Skip to content

Commit b7b555d

Browse files
authored
Merge branch 'dev' into pthummar/eventGrid_E2E_tests
2 parents f85cd6b + 5f28c27 commit b7b555d

File tree

2 files changed

+232
-0
lines changed

2 files changed

+232
-0
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import json
4+
import os
5+
import typing
6+
import azure.functions as func
7+
from azure.eventhub import EventHubProducerClient, EventData
8+
9+
app = func.FunctionApp()
10+
11+
12+
# This is an actual EventHub trigger which handles Eventhub events in batches.
13+
# It serializes multiple event data into a json and store it into a blob.
14+
@app.function_name(name="eventhub_multiple")
15+
@app.event_hub_message_trigger(
16+
arg_name="events",
17+
event_hub_name="python-worker-ci-eventhub-batch",
18+
connection="AzureWebJobsEventHubConnectionString",
19+
data_type="string",
20+
cardinality="many")
21+
@app.write_table(arg_name="$return",
22+
connection="AzureWebJobsStorage",
23+
table_name="EventHubBatchTest")
24+
def eventhub_multiple(events):
25+
table_entries = []
26+
for event in events:
27+
json_entry = event.get_body()
28+
table_entry = json.loads(json_entry)
29+
table_entries.append(table_entry)
30+
31+
table_json = json.dumps(table_entries)
32+
33+
return table_json
34+
35+
36+
# An HttpTrigger to generating EventHub event from EventHub Output Binding
37+
@app.function_name(name="eventhub_output_batch")
38+
@app.write_event_hub_message(arg_name="$return",
39+
connection="AzureWebJobsEventHubConnectionString",
40+
event_hub_name="python-worker-ci-eventhub-batch")
41+
@app.route(route="eventhub_output_batch", binding_arg_name="out")
42+
def eventhub_output_batch(req: func.HttpRequest, out: func.Out[str]) -> str:
43+
events = req.get_body().decode('utf-8')
44+
out.set('hello')
45+
return events
46+
47+
48+
# Retrieve the event data from storage blob and return it as Http response
49+
@app.function_name(name="get_eventhub_batch_triggered")
50+
@app.route(route="get_eventhub_batch_triggered/{id}")
51+
@app.read_table(arg_name="testEntities",
52+
connection="AzureWebJobsStorage",
53+
table_name="EventHubBatchTest",
54+
partition_key="{id}")
55+
def get_eventhub_batch_triggered(req: func.HttpRequest, testEntities):
56+
return func.HttpResponse(status_code=200, body=testEntities)
57+
58+
59+
# Retrieve the event data from storage blob and return it as Http response
60+
@app.function_name(name="get_metadata_batch_triggered")
61+
@app.route(route="get_metadata_batch_triggered")
62+
@app.read_blob(arg_name="file",
63+
path="python-worker-tests/test-metadata-batch-triggered.txt",
64+
connection="AzureWebJobsStorage")
65+
def get_metadata_batch_triggered(req: func.HttpRequest,
66+
file: func.InputStream) -> str:
67+
return func.HttpResponse(body=file.read().decode('utf-8'),
68+
status_code=200,
69+
mimetype='application/json')
70+
71+
72+
# This is an actual EventHub trigger which handles Eventhub events in batches.
73+
# It serializes multiple event data into a json and store it into a blob.
74+
@app.function_name(name="metadata_multiple")
75+
@app.event_hub_message_trigger(
76+
arg_name="events",
77+
event_hub_name="python-worker-ci-eventhub-batch-metadata",
78+
connection="AzureWebJobsEventHubConnectionString",
79+
data_type="binary",
80+
cardinality="many")
81+
@app.write_blob(arg_name="$return",
82+
path="python-worker-tests/test-metadata-batch-triggered.txt",
83+
connection="AzureWebJobsStorage")
84+
def metadata_multiple(events: typing.List[func.EventHubEvent]) -> bytes:
85+
event_list = []
86+
for event in events:
87+
event_dict: typing.Mapping[str, typing.Any] = {
88+
'body': event.get_body().decode('utf-8'),
89+
'enqueued_time': event.enqueued_time.isoformat(),
90+
'partition_key': event.partition_key,
91+
'sequence_number': event.sequence_number,
92+
'offset': event.offset,
93+
'metadata': event.metadata
94+
}
95+
event_list.append(event_dict)
96+
97+
return json.dumps(event_list)
98+
99+
100+
# An HttpTrigger to generating EventHub event from azure-eventhub SDK.
101+
# Events generated from azure-eventhub contain the full metadata.
102+
@app.function_name(name="metadata_output_batch")
103+
@app.route(route="metadata_output_batch")
104+
def main(req: func.HttpRequest):
105+
# Get event count from http request query parameter
106+
count = int(req.params.get('count', '1'))
107+
108+
# Parse event metadata from http request
109+
json_string = req.get_body().decode('utf-8')
110+
event_dict = json.loads(json_string)
111+
112+
# Create an EventHub Client and event batch
113+
client = EventHubProducerClient.from_connection_string(
114+
os.getenv('AzureWebJobsEventHubConnectionString'),
115+
eventhub_name='python-worker-ci-eventhub-batch-metadata')
116+
117+
# Generate new event based on http request with full metadata
118+
event_data_batch = client.create_batch()
119+
random_number = int(event_dict.get('body', '0'))
120+
for i in range(count):
121+
event_data_batch.add(EventData(str(random_number + i)))
122+
123+
# Send out event into event hub
124+
with client:
125+
client.send_batch(event_data_batch)
126+
127+
return 'OK'

tests/endtoend/test_eventhub_batch_functions.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,108 @@ def _get_table_function_json_path(self):
156156
script_dir = pathlib.Path(self.get_script_dir())
157157
json_path = pathlib.Path('get_eventhub_batch_triggered/function.json')
158158
return testutils.TESTS_ROOT / script_dir / json_path
159+
160+
161+
class TestEventHubBatchFunctionsStein(testutils.WebHostTestCase):
162+
163+
@classmethod
164+
def get_script_dir(cls):
165+
return testutils.E2E_TESTS_FOLDER / 'eventhub_batch_functions' / \
166+
'eventhub_batch_functions_stein'
167+
168+
@testutils.retryable_test(3, 5)
169+
def test_eventhub_multiple(self):
170+
NUM_EVENTS = 3
171+
all_row_keys_seen = dict([(str(i), True) for i in range(NUM_EVENTS)])
172+
partition_key = str(round(time.time()))
173+
174+
docs = []
175+
for i in range(NUM_EVENTS):
176+
doc = {'PartitionKey': partition_key, 'RowKey': i}
177+
docs.append(doc)
178+
179+
r = self.webhost.request('POST', 'eventhub_output_batch',
180+
data=json.dumps(docs))
181+
self.assertEqual(r.status_code, 200)
182+
183+
row_keys = [str(i) for i in range(NUM_EVENTS)]
184+
seen = [False] * NUM_EVENTS
185+
row_keys_seen = dict(zip(row_keys, seen))
186+
187+
# Allow trigger to fire.
188+
time.sleep(5)
189+
190+
r = self.webhost.request(
191+
'GET',
192+
f'get_eventhub_batch_triggered/{partition_key}')
193+
self.assertEqual(r.status_code, 200)
194+
entries = r.json()
195+
for entry in entries:
196+
self.assertEqual(entry['PartitionKey'], partition_key)
197+
row_key = entry['RowKey']
198+
row_keys_seen[row_key] = True
199+
200+
self.assertDictEqual(all_row_keys_seen, row_keys_seen)
201+
202+
@testutils.retryable_test(3, 5)
203+
def test_eventhub_multiple_with_metadata(self):
204+
# Generate a unique event body for EventHub event
205+
# Record the start_time and end_time for checking event enqueue time
206+
start_time = datetime.now(tz=tz.UTC)
207+
count = 10
208+
random_number = str(round(time.time()) % 1000)
209+
req_body = {
210+
'body': random_number
211+
}
212+
213+
# Invoke metadata_output HttpTrigger to generate an EventHub event
214+
# from azure-eventhub SDK
215+
r = self.webhost.request('POST',
216+
f'metadata_output_batch?count={count}',
217+
data=json.dumps(req_body))
218+
self.assertEqual(r.status_code, 200)
219+
self.assertIn('OK', r.text)
220+
end_time = datetime.now(tz=tz.UTC)
221+
222+
# Once the event get generated, allow function host to pool from
223+
# EventHub and wait for metadata_multiple to execute,
224+
# converting the event metadata into a blob.
225+
time.sleep(5)
226+
227+
# Call get_metadata_batch_triggered to retrieve event metadata
228+
r = self.webhost.request('GET', 'get_metadata_batch_triggered')
229+
self.assertEqual(r.status_code, 200)
230+
231+
# Check metadata and events length, events should be batched processed
232+
events = r.json()
233+
self.assertIsInstance(events, list)
234+
self.assertGreater(len(events), 1)
235+
236+
# EventhubEvent property check
237+
for event_index in range(len(events)):
238+
event = events[event_index]
239+
240+
# Check if the event is enqueued between start_time and end_time
241+
enqueued_time = parser.isoparse(event['enqueued_time']).astimezone(
242+
tz=tz.UTC)
243+
self.assertTrue(start_time < enqueued_time < end_time)
244+
245+
# Check if event properties are properly set
246+
self.assertIsNone(event['partition_key']) # only 1 partition
247+
self.assertGreaterEqual(event['sequence_number'], 0)
248+
self.assertIsNotNone(event['offset'])
249+
250+
# Check if event.metadata field is properly set
251+
self.assertIsNotNone(event['metadata'])
252+
metadata = event['metadata']
253+
sys_props_array = metadata['SystemPropertiesArray']
254+
sys_props = sys_props_array[event_index]
255+
enqueued_time = parser.isoparse(sys_props['EnqueuedTimeUtc'])
256+
257+
# Check event trigger time and other system properties
258+
self.assertTrue(
259+
start_time.timestamp() < enqueued_time.timestamp()
260+
< end_time.timestamp()) # NoQA
261+
self.assertIsNone(sys_props['PartitionKey'])
262+
self.assertGreaterEqual(sys_props['SequenceNumber'], 0)
263+
self.assertIsNotNone(sys_props['Offset'])

0 commit comments

Comments
 (0)