From 05dfd815e397c2b6dba792d45645c0a51fd7a607 Mon Sep 17 00:00:00 2001
From: pdthummar <101662222+pdthummar@users.noreply.github.com>
Date: Fri, 3 Jun 2022 10:03:19 -0500
Subject: [PATCH 1/2] added EventHubBatch E2E pystein tests.

---
 .../function_app.py                           | 125 ++++++++++++++++++
 .../endtoend/test_eventhub_batch_functions.py | 105 +++++++++++++++
 2 files changed, 230 insertions(+)
 create mode 100644 tests/endtoend/eventhub_batch_functions/eventhub_batch_functions_stein/function_app.py

diff --git a/tests/endtoend/eventhub_batch_functions/eventhub_batch_functions_stein/function_app.py b/tests/endtoend/eventhub_batch_functions/eventhub_batch_functions_stein/function_app.py
new file mode 100644
index 000000000..30f21dab0
--- /dev/null
+++ b/tests/endtoend/eventhub_batch_functions/eventhub_batch_functions_stein/function_app.py
@@ -0,0 +1,125 @@
+import json
+import os
+import typing
+import azure.functions as func
+from azure.eventhub import EventHubProducerClient, EventData
+
+app = func.FunctionApp()
+
+
+# This is an actual EventHub trigger which handles EventHub events in batches.
+# It serializes multiple event data into a json and stores it into a table.
+@app.function_name(name="eventhub_multiple")
+@app.event_hub_message_trigger(
+    arg_name="events",
+    event_hub_name="python-worker-ci-eventhub-batch",
+    connection="AzureWebJobsEventHubConnectionString",
+    data_type="string",
+    cardinality="many")
+@app.write_table(arg_name="$return",
+                 connection="AzureWebJobsStorage",
+                 table_name="EventHubBatchTest")
+def eventhub_multiple(events):
+    # The return value is bound to the EventHubBatchTest table, so the batch
+    # is returned as one JSON array holding the parsed body of every event.
+    table_entries = [json.loads(event.get_body()) for event in events]
+    return json.dumps(table_entries)
+
+
+# An HttpTrigger that publishes the request body to EventHub through the
+# EventHub output binding. The output binding takes the return value, so
+# the Http response is set through the `out` argument instead.
+@app.function_name(name="eventhub_output_batch")
+@app.write_event_hub_message(arg_name="$return",
+                             connection="AzureWebJobsEventHubConnectionString",
+                             event_hub_name="python-worker-ci-eventhub-batch")
+@app.route(route="eventhub_output_batch", binding_arg_name="out")
+def eventhub_output_batch(req: func.HttpRequest, out: func.Out[str]) -> str:
+    events = req.get_body().decode('utf-8')
+    out.set('hello')
+    return events
+
+
+# Read the rows whose partition key matches the {id} route parameter from the
+# EventHubBatchTest table and return them as the Http response body.
+@app.function_name(name="get_eventhub_batch_triggered")
+@app.route(route="get_eventhub_batch_triggered/{id}")
+@app.read_table(arg_name="testEntities",
+                connection="AzureWebJobsStorage",
+                table_name="EventHubBatchTest",
+                partition_key="{id}")
+def get_eventhub_batch_triggered(req: func.HttpRequest, testEntities):
+    return func.HttpResponse(status_code=200, body=testEntities)
+
+
+# Return the blob written by metadata_multiple as a JSON Http response.
+@app.function_name(name="get_metadata_batch_triggered")
+@app.route(route="get_metadata_batch_triggered")
+@app.read_blob(arg_name="file",
+               path="python-worker-tests/test-metadata-batch-triggered.txt",
+               connection="AzureWebJobsStorage")
+def get_metadata_batch_triggered(req: func.HttpRequest,
+                                 file: func.InputStream) -> func.HttpResponse:
+    return func.HttpResponse(body=file.read().decode('utf-8'),
+                             status_code=200,
+                             mimetype='application/json')
+
+
+# This is an actual EventHub trigger which handles EventHub events in batches.
+# It serializes the body and metadata of every event into one JSON array and
+# stores it in a blob.
+@app.function_name(name="metadata_multiple")
+@app.event_hub_message_trigger(
+    arg_name="events",
+    event_hub_name="python-worker-ci-eventhub-batch-metadata",
+    connection="AzureWebJobsEventHubConnectionString",
+    data_type="binary",
+    cardinality="many")
+@app.write_blob(arg_name="$return",
+                path="python-worker-tests/test-metadata-batch-triggered.txt",
+                connection="AzureWebJobsStorage")
+def metadata_multiple(events: typing.List[func.EventHubEvent]) -> str:
+    event_list: typing.List[typing.Mapping[str, typing.Any]] = [
+        # Build one JSON-serializable record per event in the batch.
+        {
+            'body': event.get_body().decode('utf-8'),
+            'enqueued_time': event.enqueued_time.isoformat(),
+            'partition_key': event.partition_key,
+            'sequence_number': event.sequence_number,
+            'offset': event.offset,
+            'metadata': event.metadata
+        }
+        for event in events
+    ]
+
+    return json.dumps(event_list)
+
+
+# An HttpTrigger for generating EventHub events with the azure-eventhub SDK.
+# Events generated from azure-eventhub contain the full metadata.
+@app.function_name(name="metadata_output_batch")
+@app.route(route="metadata_output_batch")
+def main(req: func.HttpRequest):
+    # Number of events to publish, taken from the `count` query parameter.
+    count = int(req.params.get('count', '1'))
+
+    # The request body is a JSON object; its 'body' value seeds the payloads.
+    payload = json.loads(req.get_body().decode('utf-8'))
+    first_value = int(payload.get('body', '0'))
+
+    # Publish through the azure-eventhub SDK rather than an output binding.
+    client = EventHubProducerClient.from_connection_string(
+        os.getenv('AzureWebJobsEventHubConnectionString'),
+        eventhub_name='python-worker-ci-eventhub-batch-metadata')
+
+    # Build and send the batch inside the `with` block so the client is
+    # closed even if creating or filling the batch raises.
+    with client:
+        event_data_batch = client.create_batch()
+        # Bodies are consecutive integers starting at the seed value.
+        for offset in range(count):
+            event_data_batch.add(EventData(str(first_value + offset)))
+
+        client.send_batch(event_data_batch)
+
+    return 'OK'
diff --git a/tests/endtoend/test_eventhub_batch_functions.py b/tests/endtoend/test_eventhub_batch_functions.py
index e0851174b..27d15f811 100644
--- a/tests/endtoend/test_eventhub_batch_functions.py
+++ b/tests/endtoend/test_eventhub_batch_functions.py
@@ -156,3 +156,108 @@ def _get_table_function_json_path(self):
         script_dir = pathlib.Path(self.get_script_dir())
         json_path = pathlib.Path('get_eventhub_batch_triggered/function.json')
         return testutils.TESTS_ROOT / script_dir / json_path
+
+
+class TestEventHubBatchFunctionsStein(testutils.WebHostTestCase):
+    """E2E tests for the EventHub batch functions defined with decorators in
+    eventhub_batch_functions_stein/function_app.py."""
+
+    @classmethod
+    def get_script_dir(cls):
+        return testutils.E2E_TESTS_FOLDER / 'eventhub_batch_functions' / \
+            'eventhub_batch_functions_stein'
+
+    @testutils.retryable_test(3, 5)
+    def test_eventhub_multiple(self):
+        NUM_EVENTS = 3
+        partition_key = str(round(time.time()))
+        row_keys = [str(i) for i in range(NUM_EVENTS)]
+        all_row_keys_seen = dict.fromkeys(row_keys, True)
+        row_keys_seen = dict.fromkeys(row_keys, False)
+
+        # Every event is a table row sharing one partition key, so this run
+        # can be queried back by that key.
+        docs = [{'PartitionKey': partition_key, 'RowKey': i}
+                for i in range(NUM_EVENTS)]
+
+        # Send all rows in one request through the EventHub output binding.
+        r = self.webhost.request('POST', 'eventhub_output_batch',
+                                 data=json.dumps(docs))
+        self.assertEqual(r.status_code, 200)
+
+        # Allow trigger to fire.
+        time.sleep(5)
+
+        r = self.webhost.request(
+            'GET',
+            f'get_eventhub_batch_triggered/{partition_key}')
+        self.assertEqual(r.status_code, 200)
+
+        # Mark each row key that came back; every expected key must be seen.
+        for entry in r.json():
+            self.assertEqual(entry['PartitionKey'], partition_key)
+            row_keys_seen[entry['RowKey']] = True
+
+        self.assertDictEqual(all_row_keys_seen, row_keys_seen)
+
+    @testutils.retryable_test(3, 5)
+    def test_eventhub_multiple_with_metadata(self):
+        # Generate a unique event body for EventHub event
+        # Record the start_time and end_time for checking event enqueue time
+        start_time = datetime.now(tz=tz.UTC)
+        count = 10
+        random_number = str(round(time.time()) % 1000)
+        req_body = {
+            'body': random_number
+        }
+
+        # Invoke metadata_output_batch HttpTrigger to generate EventHub
+        # events from azure-eventhub SDK
+        r = self.webhost.request('POST',
+                                 f'metadata_output_batch?count={count}',
+                                 data=json.dumps(req_body))
+        self.assertEqual(r.status_code, 200)
+        self.assertIn('OK', r.text)
+        end_time = datetime.now(tz=tz.UTC)
+
+        # Once the events get generated, allow function host to poll from
+        # EventHub and wait for metadata_multiple to execute,
+        # converting the event metadata into a blob.
+        time.sleep(5)
+
+        # Call get_metadata_batch_triggered to retrieve event metadata
+        r = self.webhost.request('GET', 'get_metadata_batch_triggered')
+        self.assertEqual(r.status_code, 200)
+
+        # Check metadata and events length, events should be batch processed
+        events = r.json()
+        self.assertIsInstance(events, list)
+        self.assertGreater(len(events), 1)
+
+        # EventHubEvent property check
+        for event_index, event in enumerate(events):
+            # Check if the event is enqueued between start_time and end_time
+            enqueued_time = parser.isoparse(event['enqueued_time']).astimezone(
+                tz=tz.UTC)
+            self.assertTrue(start_time < enqueued_time < end_time)
+
+            # Check if event properties are properly set
+            self.assertIsNone(event['partition_key'])  # only 1 partition
+            self.assertGreaterEqual(event['sequence_number'], 0)
+            self.assertIsNotNone(event['offset'])
+
+            # Check if event.metadata field is properly set
+            self.assertIsNotNone(event['metadata'])
+            metadata = event['metadata']
+            sys_props_array = metadata['SystemPropertiesArray']
+            # Read the system properties entry at this event's index.
+            sys_props = sys_props_array[event_index]
+            enqueued_time = parser.isoparse(sys_props['EnqueuedTimeUtc'])
+
+            # Check event trigger time and other system properties
+            self.assertTrue(
+                start_time.timestamp() < enqueued_time.timestamp()
+                < end_time.timestamp())  # NoQA
+            self.assertIsNone(sys_props['PartitionKey'])
+            self.assertGreaterEqual(sys_props['SequenceNumber'], 0)
+            self.assertIsNotNone(sys_props['Offset'])

From 10da1b463a32f95d48ee3b9b5d13163657786be2 Mon Sep 17 00:00:00 2001
From: pdthummar <101662222+pdthummar@users.noreply.github.com>
Date: Fri, 3 Jun 2022 10:15:48 -0500
Subject: [PATCH 2/2] added copyright lic

---
 .../eventhub_batch_functions_stein/function_app.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/endtoend/eventhub_batch_functions/eventhub_batch_functions_stein/function_app.py b/tests/endtoend/eventhub_batch_functions/eventhub_batch_functions_stein/function_app.py
index 30f21dab0..a50f80fa6 100644
--- a/tests/endtoend/eventhub_batch_functions/eventhub_batch_functions_stein/function_app.py
+++ b/tests/endtoend/eventhub_batch_functions/eventhub_batch_functions_stein/function_app.py
@@ -1,3 +1,5 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
 import json
 import os
 import typing