Skip to content

Commit 6b962c4

Browse files
Hazhzengvrdmr
andauthored
Enable servicebus batch trigger (cardinality=many) (#73)
* Add cardinality == many * Add service bus to handle multiple messages in batch * Fix flake8 and mypy * Add deltatime implementation * Add timedelta parser implementation * Implement array parser * Fix mypy typing issue * Fix rich binding * Amend warning message for UserProperties are not set * Fixes PR issues * Clean up not yet exposed servicebus fields Co-authored-by: Varad Meru <[email protected]>
1 parent 47ebec2 commit 6b962c4

File tree

12 files changed

+1238
-202
lines changed

12 files changed

+1238
-202
lines changed

.gitignore

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,11 @@ celerybeat-schedule
7979
# SageMath parsed files
8080
*.sage.py
8181

82-
# dotenv
83-
.env
84-
8582
# virtualenv
86-
.venv*
83+
.env*/
84+
.venv*/
8785
venv/
88-
ENV/
86+
env/
8987
py3env/
9088

9189
# Spyder project settings
@@ -110,4 +108,7 @@ py3env/
110108

111109
# PyCharm related files
112110
.idea/
113-
.idea_modules/
111+
.idea_modules/
112+
113+
# vscode setting
114+
.vscode/

azure/functions/_servicebus.py

Lines changed: 256 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,114 +3,335 @@
33

44
import abc
55
import datetime
6-
import typing
6+
from typing import Optional, Dict, Any
77

88

99
class ServiceBusMessage(abc.ABC):
1010

1111
@abc.abstractmethod
1212
def get_body(self) -> bytes:
13-
"""Return message body as bytes."""
13+
"""Get the message body from ServiceBus
14+
15+
Returns:
16+
--------
17+
bytes
18+
The ServiceBus message body in bytes form
19+
"""
1420
pass
1521

1622
@property
1723
@abc.abstractmethod
18-
def content_type(self) -> typing.Optional[str]:
19-
"""Message content type."""
24+
def content_type(self) -> Optional[str]:
25+
"""Optionally describes the payload of the message,
26+
with a descriptor following the format of RFC2045
27+
28+
Returns:
29+
--------
30+
Optional[str]
31+
If content type is set, returns a string.
32+
Otherwise, returns None.
33+
"""
2034
pass
2135

2236
@property
2337
@abc.abstractmethod
24-
def correlation_id(self) -> typing.Optional[str]:
25-
"""Message correlation identifier."""
38+
def correlation_id(self) -> Optional[str]:
39+
"""Enables an application to specify a context for the message for the
40+
purposes of correlation
41+
42+
Returns:
43+
--------
44+
Optional[str]
45+
If correlation id set, returns a string.
46+
Otherwise, returns None.
47+
"""
2648
pass
2749

2850
@property
2951
@abc.abstractmethod
30-
def delivery_count(self) -> typing.Optional[int]:
31-
"""Number of times delivery has been attempted."""
52+
def dead_letter_source(self) -> Optional[str]:
53+
"""Only set in messages that have been dead-lettered and subsequently
54+
auto-forwarded from the dead-letter queue to another entity.
55+
Indicates the entity in which the message was dead-lettered.
56+
This property is read-only.
57+
58+
Returns:
59+
--------
60+
Optional[str]
61+
If dead letter source is set, returns a string.
62+
Otherwise, returns None.
63+
"""
3264
pass
3365

3466
@property
3567
@abc.abstractmethod
36-
def enqueued_time_utc(self) -> typing.Optional[datetime.datetime]:
37-
"""The date and time in UTC at which the message is enqueued"""
68+
def delivery_count(self) -> Optional[int]:
69+
"""Number of deliveries that have been attempted for this message.
70+
The count is incremented when a message lock expires,
71+
or the message is explicitly abandoned by the receiver.
72+
This property is read-only.
73+
74+
Returns:
75+
--------
76+
Optional[str]
77+
If delivery count is set, returns a string.
78+
Otherwise, returns None.
79+
"""
3880
pass
3981

4082
@property
4183
@abc.abstractmethod
42-
def expires_at_utc(self) -> typing.Optional[datetime.datetime]:
43-
"""The date and time in UTC at which the message is set to expire."""
84+
def enqueued_time_utc(self) -> Optional[datetime.datetime]:
85+
"""The UTC instant at which the message has been accepted and stored
86+
in the entity. This value can be used as an authoritative and neutral
87+
arrival time indicator when the receiver does not want to trust the
88+
sender's clock. This property is read-only.
89+
90+
Returns:
91+
--------
92+
Optional[datetime.datetime]
93+
If enqueued time utc is set, returns a datetime.
94+
Otherwise, returns None.
95+
"""
4496
pass
4597

4698
@property
4799
@abc.abstractmethod
48-
def expiration_time(self) -> typing.Optional[datetime.datetime]:
49-
"""The date and time in UTC at which the message is set to expire."""
100+
def expires_at_utc(self) -> Optional[datetime.datetime]:
101+
"""The UTC instant at which the message is marked for removal and no
102+
longer available for retrieval from the entity due to its expiration.
103+
Expiry is controlled by the TimeToLive property and this property is
104+
computed from EnqueuedTimeUtc+TimeToLive. This property is read-only.
105+
106+
Returns:
107+
--------
108+
Optional[datetime.datetime]
109+
If expires at utc is set, returns a datetime.
110+
Otherwise, returns None.
111+
"""
50112
pass
51113

52114
@property
53115
@abc.abstractmethod
54-
def label(self) -> typing.Optional[str]:
55-
"""Application specific label."""
116+
def expiration_time(self) -> Optional[datetime.datetime]:
117+
"""(Deprecated, use expires_at_utc instead)"""
118+
pass
119+
120+
@property
121+
@abc.abstractmethod
122+
def label(self) -> Optional[str]:
123+
"""This property enables the application to indicate the purpose of
124+
the message to the receiver in a standardized fashion, similar to an
125+
email subject line.
126+
127+
Returns:
128+
--------
129+
Optional[str]
130+
If label is set, returns a string.
131+
Otherwise, returns None.
132+
"""
133+
pass
134+
135+
@property
136+
@abc.abstractmethod
137+
def lock_token(self) -> Optional[str]:
138+
""" The lock token is a reference to the lock that is being held by
139+
the broker in peek-lock receive mode. The token can be used to pin the
140+
lock permanently through the Deferral API and, with that, take the
141+
message out of the regular delivery state flow.
142+
This property is read-only.
143+
144+
Returns:
145+
--------
146+
Optional[str]
147+
If local token is set, returns a string.
148+
Otherwise, returns None.
149+
"""
56150
pass
57151

58152
@property
59153
@abc.abstractmethod
60154
def message_id(self) -> str:
61-
"""Identifier used to identify the message."""
155+
"""The message identifier is an application-defined value that
156+
uniquely identifies the message and its payload.
157+
The identifier is a free-form string and can reflect a GUID or an
158+
identifier derived from the application context. If enabled, the
159+
duplicate detection feature identifies and removes second and further
160+
submissions of messages with the same MessageId.
161+
162+
Returns:
163+
--------
164+
str
165+
The message identifier
166+
"""
167+
pass
168+
169+
@property
170+
@abc.abstractmethod
171+
def partition_key(self) -> Optional[str]:
172+
""" For partitioned entities, setting this value enables assigning
173+
related messages to the same internal partition, so that submission
174+
sequence order is correctly recorded. The partition is chosen by a
175+
hash function over this value and cannot be chosen directly. For
176+
session-aware entities, the SessionId property overrides this value.
177+
178+
Returns:
179+
--------
180+
Optional[str]
181+
If partition key is set, returns a string.
182+
Otherwise, returns None.
183+
"""
62184
pass
63185

64186
@property
65187
@abc.abstractmethod
66-
def partition_key(self) -> typing.Optional[str]:
67-
"""Message partition key."""
188+
def reply_to(self) -> Optional[str]:
189+
"""This optional and application-defined value is a standard way to
190+
express a reply path to the receiver of the message. When a sender
191+
expects a reply, it sets the value to the absolute or relative path
192+
of the queue or topic it expects the reply to be sent to.
193+
194+
Returns:
195+
--------
196+
Optional[str]
197+
If reply to is set, returns a string.
198+
Otherwise, returns None.
199+
"""
68200
pass
69201

70202
@property
71203
@abc.abstractmethod
72-
def reply_to(self) -> typing.Optional[str]:
73-
"""The address of an entity to send replies to."""
204+
def reply_to_session_id(self) -> Optional[str]:
205+
"""This value augments the ReplyTo information and specifies which
206+
SessionId should be set for the reply when sent to the reply entity.
207+
208+
Returns:
209+
--------
210+
Optional[str]
211+
If reply to session id is set, returns a string.
212+
Otherwise, returns None.
213+
"""
74214
pass
75215

76216
@property
77217
@abc.abstractmethod
78-
def reply_to_session_id(self) -> typing.Optional[str]:
79-
"""A session identifier augmenting the reply_to address."""
218+
def scheduled_enqueue_time(self) -> Optional[datetime.datetime]:
219+
"""(Deprecated, use scheduled_enqueue_time_utc instead)"""
80220
pass
81221

82222
@property
83223
@abc.abstractmethod
84-
def scheduled_enqueue_time(self) -> typing.Optional[datetime.datetime]:
85-
"""The date and time in UTC at which the message will be enqueued."""
224+
def scheduled_enqueue_time_utc(self) -> Optional[datetime.datetime]:
225+
"""For messages that are only made available for retrieval after a
226+
delay, this property defines the UTC instant at which the message
227+
will be logically enqueued, sequenced, and therefore made available
228+
for retrieval.
229+
230+
Returns:
231+
--------
232+
Optional[datetime.datetime]
233+
If scheduled enqueue time utc is set, returns a string.
234+
Otherwise, returns None.
235+
"""
86236
pass
87237

88238
@property
89239
@abc.abstractmethod
90-
def session_id(self) -> typing.Optional[str]:
91-
"""The session identifier for a session-aware entity."""
240+
def sequence_number(self) -> Optional[int]:
241+
"""The sequence number is a unique 64-bit integer assigned to a message
242+
as it is accepted and stored by the broker and functions as its true
243+
identifier. For partitioned entities, the topmost 16 bits reflect the
244+
partition identifier. Sequence numbers monotonically increase and are
245+
gapless. They roll over to 0 when the 48-64 bit range is exhausted.
246+
This property is read-only.
247+
248+
Returns:
249+
--------
250+
Optional[int]
251+
If sequence number is set, returns an integer.
252+
Otherwise, returns None.
253+
"""
92254
pass
93255

94256
@property
95257
@abc.abstractmethod
96-
def time_to_live(self) -> typing.Optional[datetime.timedelta]:
97-
"""The TTL time interval."""
258+
def session_id(self) -> Optional[str]:
259+
"""For session-aware entities, this application-defined value
260+
specifies the session affiliation of the message. Messages with the
261+
same session identifier are subject to summary locking and enable
262+
exact in-order processing and demultiplexing. For entities that are
263+
not session-aware, this value is ignored.
264+
265+
Returns:
266+
--------
267+
Optional[str]
268+
If session id is set, returns a string.
269+
Otherwise, returns None.
270+
"""
98271
pass
99272

100273
@property
101274
@abc.abstractmethod
102-
def to(self) -> typing.Optional[str]:
103-
"""The address of an entity the message is addressed."""
275+
def time_to_live(self) -> Optional[datetime.timedelta]:
276+
""" This value is the relative duration after which the message
277+
expires, starting from the instant the message has been accepted and
278+
stored by the broker, as captured in EnqueueTimeUtc. When not set
279+
explicitly, the assumed value is the DefaultTimeToLive for the
280+
respective queue or topic. A message-level TimeToLive value cannot
281+
be longer than the entity's DefaultTimeToLive setting.
282+
If it is longer, it is silently adjusted.
283+
284+
Returns:
285+
--------
286+
Optional[datetime.timedelta]
287+
If time to live is set, returns a timedelta.
288+
Otherwise, returns None.
289+
"""
104290
pass
105291

106292
@property
107293
@abc.abstractmethod
108-
def user_properties(self) -> typing.Dict[str, object]:
109-
"""User-defined message metadata."""
294+
def to(self) -> Optional[str]:
295+
""" This property is reserved for future use in routing scenarios and
296+
currently ignored by the broker itself. Applications can use this
297+
value in rule-driven auto-forward chaining scenarios to indicate the
298+
intended logical destination of the message.
299+
300+
Returns:
301+
--------
302+
Optional[str]
303+
If the recipient is set, returns a string.
304+
Otherwise, returns None.
305+
"""
110306
pass
111307

112308
@property
113309
@abc.abstractmethod
114-
def metadata(self) -> typing.Optional[typing.Mapping[str, typing.Any]]:
115-
"""The serialized JSON string from trigger metadata"""
310+
def user_properties(self) -> Dict[str, Any]:
311+
"""Contains user defined message properties.
312+
313+
Returns:
314+
--------
315+
Dict[str, Any]:
316+
If user has set properties for the message, returns a dictionary.
317+
If nothing is set, returns an empty dictionary.
318+
"""
319+
pass
320+
321+
@property
322+
@abc.abstractmethod
323+
def metadata(self) -> Optional[Dict[str, Any]]:
324+
"""Getting read-only trigger metadata in a Python dictionary.
325+
326+
Exposing the raw trigger_metadata to our customer. For cardinality=many
327+
scenarios, each event points to the common metadata of all the events.
328+
329+
So when using metadata field when cardinality=many, it only needs to
330+
take one of the events to get all the data (e.g. events[0].metadata).
331+
332+
Returns:
333+
--------
334+
Dict[str, object]
335+
Return the Python dictionary of trigger metadata
336+
"""
116337
pass

0 commit comments

Comments
 (0)