From 20fb4f5bdc7ffcdf3763fae8b6914a8a9be249f6 Mon Sep 17 00:00:00 2001 From: Maheer Iqbal <42051041+maiqbal11@users.noreply.github.com> Date: Thu, 22 Aug 2019 10:19:12 -0700 Subject: [PATCH 1/3] Adding EventHub batching behavior --- .ci/linux_devops_build.sh | 2 +- azure-pipelines-e2e.yml | 2 +- azure_functions_worker/bindings/datumdef.py | 6 ++ azure_functions_worker/constants.py | 1 + azure_functions_worker/dispatcher.py | 1 + setup.py | 2 +- .../eventhub_multiple/__init__.py | 16 ++++ .../eventhub_multiple/function.json | 22 +++++ .../eventhub_output_batch/__init__.py | 6 ++ .../eventhub_output_batch/function.json | 18 ++++ .../get_eventhub_batch_triggered/__init__.py | 5 + .../function.json | 27 ++++++ .../eventhub_trigger/function.json | 2 +- .../endtoend/test_eventhub_batch_functions.py | 94 +++++++++++++++++++ 14 files changed, 200 insertions(+), 4 deletions(-) create mode 100644 tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py create mode 100644 tests/endtoend/eventhub_batch_functions/eventhub_multiple/function.json create mode 100644 tests/endtoend/eventhub_batch_functions/eventhub_output_batch/__init__.py create mode 100644 tests/endtoend/eventhub_batch_functions/eventhub_output_batch/function.json create mode 100644 tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/__init__.py create mode 100644 tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/function.json create mode 100644 tests/endtoend/test_eventhub_batch_functions.py diff --git a/.ci/linux_devops_build.sh b/.ci/linux_devops_build.sh index 21855340f..627b6d597 100644 --- a/.ci/linux_devops_build.sh +++ b/.ci/linux_devops_build.sh @@ -2,5 +2,5 @@ set -e -x -python -m pip install -U -e .[dev] +python -m pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple -U -e .[dev] python setup.py webhost \ No newline at end of file diff --git a/azure-pipelines-e2e.yml b/azure-pipelines-e2e.yml index a68dbbc4c..dd782ef20 100644 --- a/azure-pipelines-e2e.yml +++ b/azure-pipelines-e2e.yml @@ -36,7 +36,7 @@ jobs: displayName: 'Install dotnet' - bash: | set -e -x - python -m pip install -U -e .[dev] + python -m pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple -U -e .[dev] python setup.py webhost displayName: 'Build' - bash: | diff --git a/azure_functions_worker/bindings/datumdef.py b/azure_functions_worker/bindings/datumdef.py index efd15be82..f6f88e7cf 100644 --- a/azure_functions_worker/bindings/datumdef.py +++ b/azure_functions_worker/bindings/datumdef.py @@ -49,6 +49,12 @@ def from_typed_data(cls, td: protos.TypedData): val = td.bytes elif tt == 'json': val = td.json + elif tt == 'collection_bytes': + val = td.collection_bytes + elif tt == 'collection_string': + val = td.collection_string + elif tt == 'collection_sint64': + val = td.collection_sint64 elif tt is None: return None else: diff --git a/azure_functions_worker/constants.py b/azure_functions_worker/constants.py index 61fff37f7..62df7a022 100644 --- a/azure_functions_worker/constants.py +++ b/azure_functions_worker/constants.py @@ -1,2 +1,3 @@ # Capabilities RAW_HTTP_BODY_BYTES = "RawHttpBodyBytes" +TYPED_DATA_COLLECTION = "TypedDataCollection" diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 2709713dc..58ed246f2 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -216,6 +216,7 @@ async def _handle__worker_init_request(self, req): capabilities = dict() capabilities[constants.RAW_HTTP_BODY_BYTES] = "true" + capabilities[constants.TYPED_DATA_COLLECTION] = "true" return protos.StreamingMessage( request_id=self.request_id, diff --git a/setup.py b/setup.py index dad5080e2..f73da2fa9 100644 --- a/setup.py +++ b/setup.py @@ -269,7 +269,7 @@ def run(self): ], extras_require={ 'dev': [ - 'azure-functions==1.0.0b5', + 'azure-functions==1.0.0', 'flake8~=3.5.0', 'mypy', 'pytest', diff --git a/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py b/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py new file mode 100644 index 000000000..42a7e46d2 --- /dev/null +++ b/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py @@ -0,0 +1,16 @@ +import typing +import json + +import azure.functions as func + + +def main(events: typing.List[func.EventHubEvent]): + table_entries = [] + for event in events: + json_entry = event.get_body() + table_entry = json.loads(json_entry) + table_entries.append(table_entry) + + table_json = json.dumps(table_entries) + + return table_json diff --git a/tests/endtoend/eventhub_batch_functions/eventhub_multiple/function.json b/tests/endtoend/eventhub_batch_functions/eventhub_multiple/function.json new file mode 100644 index 000000000..01fee0ce1 --- /dev/null +++ b/tests/endtoend/eventhub_batch_functions/eventhub_multiple/function.json @@ -0,0 +1,22 @@ +{ + "scriptFile": "__init__.py", + + "bindings": [ + { + "type": "eventHubTrigger", + "name": "events", + "direction": "in", + "cardinality": "many", + "dataType": "string", + "eventHubName": "python-worker-ci-eventhub-batch", + "connection": "AzureWebJobsEventHubConnectionString" + }, + { + "direction": "out", + "type": "table", + "name": "$return", + "tableName": "EventHubBatchTest", + "connection": "AzureWebJobsStorage" + } + ] +} diff --git a/tests/endtoend/eventhub_batch_functions/eventhub_output_batch/__init__.py b/tests/endtoend/eventhub_batch_functions/eventhub_output_batch/__init__.py new file mode 100644 index 000000000..22bd098b1 --- /dev/null +++ b/tests/endtoend/eventhub_batch_functions/eventhub_output_batch/__init__.py @@ -0,0 +1,6 @@ +import azure.functions as func + + +def main(req: func.HttpRequest) -> str: + events = req.get_body().decode('utf-8') + return events diff --git a/tests/endtoend/eventhub_batch_functions/eventhub_output_batch/function.json b/tests/endtoend/eventhub_batch_functions/eventhub_output_batch/function.json new file mode 100644 index 000000000..f885b2168 --- /dev/null +++ b/tests/endtoend/eventhub_batch_functions/eventhub_output_batch/function.json @@ -0,0 +1,18 @@ +{ + "scriptFile": "__init__.py", + + "bindings": [ + { + "type": "httpTrigger", + "direction": "in", + "name": "req" + }, + { + "type": "eventHub", + "name": "$return", + "direction": "out", + "eventHubName": "python-worker-ci-eventhub-batch", + "connection": "AzureWebJobsEventHubConnectionString" + } + ] +} diff --git a/tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/__init__.py b/tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/__init__.py new file mode 100644 index 000000000..d3f88317c --- /dev/null +++ b/tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/__init__.py @@ -0,0 +1,5 @@ +import azure.functions as func + + +def main(req: func.HttpRequest, testEntities): + return func.HttpResponse(status_code=200, body=testEntities) diff --git a/tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/function.json b/tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/function.json new file mode 100644 index 000000000..278338c85 --- /dev/null +++ b/tests/endtoend/eventhub_batch_functions/get_eventhub_batch_triggered/function.json @@ -0,0 +1,27 @@ +{ + "scriptFile": "__init__.py", + "bindings": [ + { + "type": "httpTrigger", + "direction": "in", + "authLevel": "anonymous", + "methods": [ + "get" + ], + "name": "req" + }, + { + "direction": "in", + "type": "table", + "name": "testEntities", + "partitionKey": "WillBePopulated", + "tableName": "EventHubBatchTest", + "connection": "AzureWebJobsStorage" + }, + { + "type": "http", + "direction": "out", + "name": "$return" + } + ] +} \ No newline at end of file diff --git a/tests/endtoend/eventhub_functions/eventhub_trigger/function.json b/tests/endtoend/eventhub_functions/eventhub_trigger/function.json index 7350f7304..f06d72f7b 100644 --- a/tests/endtoend/eventhub_functions/eventhub_trigger/function.json +++ b/tests/endtoend/eventhub_functions/eventhub_trigger/function.json @@ -6,7 +6,7 @@ "type": "eventHubTrigger", "name": "event", "direction": "in", - "eventHubName": "python-worker-ci", + "eventHubName": "python-worker-eventhub-ci-linux", "connection": "AzureWebJobsEventHubConnectionString" }, { diff --git a/tests/endtoend/test_eventhub_batch_functions.py b/tests/endtoend/test_eventhub_batch_functions.py new file mode 100644 index 000000000..fc7214630 --- /dev/null +++ b/tests/endtoend/test_eventhub_batch_functions.py @@ -0,0 +1,94 @@ +import json +import time +import pathlib + +from azure_functions_worker import testutils + + +class TestEventHubFunctions(testutils.WebHostTestCase): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'eventhub_batch_functions' + + def test_eventhub_multiple(self): + NUM_EVENTS = 3 + all_row_keys_seen = dict([(str(i), True) for i in range(NUM_EVENTS)]) + partition_key = str(round(time.time())) + + # Dynamically rewrite function.json to point to new partition key + # for recording EventHub state + old_partition_key = self._get_table_partition_key() + self._set_table_partition_key(partition_key) + + # wait for host to restart after change + time.sleep(1) + + docs = [] + for i in range(NUM_EVENTS): + doc = {'PartitionKey': partition_key, 'RowKey': i} + docs.append(doc) + + r = self.webhost.request('POST', 'eventhub_output_batch', + data=json.dumps(docs)) + self.assertEqual(r.status_code, 200) + + max_retries = 30 + + row_keys = [str(i) for i in range(NUM_EVENTS)] + seen = [False] * NUM_EVENTS + row_keys_seen = dict(zip(row_keys, seen)) + for try_no in range(max_retries): + # Allow trigger to fire. + time.sleep(2) + + try: + r = self.webhost.request('GET', + 'get_eventhub_batch_triggered') + self.assertEqual(r.status_code, 200) + entries = r.json() + for entry in entries: + self.assertEqual(entry['PartitionKey'], partition_key) + row_key = entry['RowKey'] + row_keys_seen[row_key] = True + + self.assertDictEqual(all_row_keys_seen, row_keys_seen) + + except AssertionError as e: + if try_no == max_retries - 1: + self._cleanup(old_partition_key) + raise + else: + break + + self._cleanup(old_partition_key) + + def _cleanup(self, old_partition_key): + self._set_table_partition_key(old_partition_key) + + def _get_table_partition_key(self): + func_dict = self._get_table_function_json_dict() + partition_key = func_dict['bindings'][1]['partitionKey'] + return partition_key + + def _set_table_partition_key(self, partition_key): + full_json_path = self._get_table_function_json_path() + + func_dict = self._get_table_function_json_dict() + func_dict['bindings'][1]['partitionKey'] = partition_key + + with open(full_json_path, 'w') as f: + json.dump(func_dict, f, indent=2) + + def _get_table_function_json_dict(self): + full_json_path = self._get_table_function_json_path() + + with open(full_json_path, 'r') as f: + func_dict = json.load(f) + + return func_dict + + def _get_table_function_json_path(self): + script_dir = pathlib.Path(self.get_script_dir()) + json_path = pathlib.Path('get_eventhub_batch_triggered/function.json') + return testutils.TESTS_ROOT / script_dir / json_path From f03cfde9dcffd503b9902ae4723359272cbc14b2 Mon Sep 17 00:00:00 2001 From: Maheer Iqbal <42051041+maiqbal11@users.noreply.github.com> Date: Thu, 22 Aug 2019 15:51:36 -0700 Subject: [PATCH 2/3] Remove input annotation and use patched azure-functions --- setup.py | 2 +- .../eventhub_batch_functions/eventhub_multiple/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index f73da2fa9..fb79d1650 100644 --- a/setup.py +++ b/setup.py @@ -269,7 +269,7 @@ def run(self): ], extras_require={ 'dev': [ - 'azure-functions==1.0.0', + 'azure-functions==1.0.3', 'flake8~=3.5.0', 'mypy', 'pytest', diff --git a/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py b/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py index 42a7e46d2..e01ee7df3 100644 --- a/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py +++ b/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py @@ -4,7 +4,7 @@ import azure.functions as func -def main(events: typing.List[func.EventHubEvent]): +def main(events): table_entries = [] for event in events: json_entry = event.get_body() From d5343052ee7e545683463906e89f8bb17ae25b76 Mon Sep 17 00:00:00 2001 From: Maheer Iqbal <42051041+maiqbal11@users.noreply.github.com> Date: Thu, 22 Aug 2019 15:58:27 -0700 Subject: [PATCH 3/3] Fix code quality --- .../eventhub_batch_functions/eventhub_multiple/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py b/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py index e01ee7df3..87a60ef6e 100644 --- a/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py +++ b/tests/endtoend/eventhub_batch_functions/eventhub_multiple/__init__.py @@ -1,8 +1,5 @@ -import typing import json -import azure.functions as func - def main(events): table_entries = []