diff --git a/tests/endtoend/cosmosdb_functions/cosmosdb_functions_stein/generic/function_app.py b/tests/endtoend/cosmosdb_functions/cosmosdb_functions_stein/generic/function_app.py new file mode 100644 index 000000000..dee78952a --- /dev/null +++ b/tests/endtoend/cosmosdb_functions/cosmosdb_functions_stein/generic/function_app.py @@ -0,0 +1,62 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import azure.functions as func + +app = func.FunctionApp() + + +@app.generic_trigger(arg_name="req", type="httpTrigger") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_input_binding( + arg_name="docs", + type="cosmosDB", + database_name="test", + collection_name="items", + id="cosmosdb-input-test", + connection_string_setting="AzureWebJobsCosmosDBConnectionString") +def cosmosdb_input(req: func.HttpRequest, docs: func.DocumentList) -> str: + return func.HttpResponse(docs[0].to_json(), mimetype='application/json') + + +@app.generic_trigger( + arg_name="docs", + type="cosmosDBTrigger", + database_name="test", + collection_name="items", + lease_collection_name="leases", + connection_string_setting="AzureWebJobsCosmosDBConnectionString", + create_lease_collection_if_not_exists=True) +@app.generic_output_binding( + arg_name="$return", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-cosmosdb-triggered.txt") +def cosmosdb_trigger(docs: func.DocumentList) -> str: + return docs[0].to_json() + + +@app.generic_trigger(arg_name="req", type="httpTrigger") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_input_binding( + arg_name="file", + connection="AzureWebJobsStorage", + type="blob", + path="python-worker-tests/test-cosmosdb-triggered.txt") +def get_cosmosdb_triggered(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.generic_trigger(arg_name="req", type="httpTrigger") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_output_binding( + arg_name="doc", + database_name="test", + type="cosmosDB", + collection_name="items", + create_if_not_exists=True, + connection_string_setting="AzureWebJobsCosmosDBConnectionString") +def put_document(req: func.HttpRequest, doc: func.Out[func.Document]): + doc.set(func.Document.from_json(req.get_body())) + + return 'OK' diff --git a/tests/endtoend/eventhub_functions/eventhub_functions_stein/generic/function_app.py b/tests/endtoend/eventhub_functions/eventhub_functions_stein/generic/function_app.py new file mode 100644 index 000000000..e6ead2cc1 --- /dev/null +++ b/tests/endtoend/eventhub_functions/eventhub_functions_stein/generic/function_app.py @@ -0,0 +1,127 @@ +import json +import os +import typing + +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubProducerClient + +import azure.functions as func + +app = func.FunctionApp() + + +# An HttpTrigger to generating EventHub event from EventHub Output Binding +@app.function_name(name="eventhub_output") +@app.generic_trigger(arg_name="req", type="httpTrigger", + route="eventhub_output") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_output_binding(arg_name="event", type="eventHub", + event_hub_name="python-worker-ci-eventhub-one", + connection="AzureWebJobsEventHubConnectionString") +def eventhub_output(req: func.HttpRequest, event: func.Out[str]): + event.set(req.get_body().decode('utf-8')) + return 'OK' + + +# This is an actual EventHub trigger which will convert the event data +# into a storage blob. +@app.function_name(name="eventhub_trigger") +@app.generic_trigger(arg_name="event", + type="eventHubTrigger", + event_hub_name="python-worker-ci-eventhub-one", + connection="AzureWebJobsEventHubConnectionString") +@app.generic_output_binding( + arg_name="$return", + type="blob", + path="python-worker-tests/test-eventhub-triggered.txt", + connection="AzureWebJobsStorage") +def eventhub_trigger(event: func.EventHubEvent) -> bytes: + return event.get_body() + + +# Retrieve the event data from storage blob and return it as Http response +@app.function_name(name="get_eventhub_triggered") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="get_eventhub_triggered") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_input_binding( + arg_name="file", + type="blob", + path="python-worker-tests/test-eventhub-triggered.txt", + connection="AzureWebJobsStorage") +def get_eventhub_triggered(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +# Retrieve the event data from storage blob and return it as Http response +@app.function_name(name="get_metadata_triggered") +@app.generic_trigger(arg_name="req", type="httpTrigger", + route="get_metadata_triggered") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_input_binding(arg_name="file", + type="blob", + path="python-worker-tests/test-metadata-triggered" + ".txt", + connection="AzureWebJobsStorage") +async def get_metadata_triggered(req: func.HttpRequest, + file: func.InputStream) -> str: + return func.HttpResponse(body=file.read().decode('utf-8'), + status_code=200, + mimetype='application/json') + + +# An HttpTrigger to generating EventHub event from azure-eventhub SDK. +# Events generated from azure-eventhub contain the full metadata. +@app.function_name(name="metadata_output") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="metadata_output") +@app.generic_output_binding(arg_name="$return", type="http") +async def metadata_output(req: func.HttpRequest): + # Parse event metadata from http request + json_string = req.get_body().decode('utf-8') + event_dict = json.loads(json_string) + + # Create an EventHub Client and event batch + client = EventHubProducerClient.from_connection_string( + os.getenv('AzureWebJobsEventHubConnectionString'), + eventhub_name='python-worker-ci-eventhub-one-metadata') + + # Generate new event based on http request with full metadata + event_data_batch = await client.create_batch() + event_data_batch.add(EventData(event_dict.get('body'))) + + # Send out event into event hub + try: + await client.send_batch(event_data_batch) + finally: + await client.close() + + return 'OK' + + +@app.function_name(name="metadata_trigger") +@app.generic_trigger( + arg_name="event", + type="eventHubTrigger", + event_hub_name="python-worker-ci-eventhub-one-metadata", + connection="AzureWebJobsEventHubConnectionString") +@app.generic_output_binding( + arg_name="$return", + type="blob", + path="python-worker-tests/test-metadata-triggered.txt", + connection="AzureWebJobsStorage") +async def metadata_trigger(event: func.EventHubEvent) -> bytes: + event_dict: typing.Mapping[str, typing.Any] = { + 'body': event.get_body().decode('utf-8'), + # Uncomment this when the EnqueuedTimeUtc is fixed in azure-functions + # 'enqueued_time': event.enqueued_time.isoformat(), + 'partition_key': event.partition_key, + 'sequence_number': event.sequence_number, + 'offset': event.offset, + 'metadata': event.metadata + } + + return json.dumps(event_dict) diff --git a/tests/endtoend/queue_functions/queue_functions_stein/generic/function_app.py b/tests/endtoend/queue_functions/queue_functions_stein/generic/function_app.py new file mode 100644 index 000000000..e7f738dcd --- /dev/null +++ b/tests/endtoend/queue_functions/queue_functions_stein/generic/function_app.py @@ -0,0 +1,253 @@ +import json +import logging +import typing + +import azure.functions as func + +app = func.FunctionApp() + + +@app.function_name(name="get_queue_blob") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="get_queue_blob") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_input_binding( + arg_name="file", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob.txt") +def get_queue_blob(req: func.HttpRequest, file: func.InputStream) -> str: + return json.dumps({ + 'queue': json.loads(file.read().decode('utf-8')) + }) + + +@app.function_name(name="get_queue_blob_message_return") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="get_queue_blob_message_return") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_input_binding( + arg_name="file", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob-message-return.txt") +def get_queue_blob_message_return(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="get_queue_blob_return") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="get_queue_blob_return") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_input_binding(arg_name="file", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob-return" + ".txt") +def get_queue_blob_return(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="get_queue_untyped_blob_return") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="get_queue_untyped_blob_return") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_input_binding( + arg_name="file", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-untyped-blob-return.txt") +def get_queue_untyped_blob_return(req: func.HttpRequest, + file: func.InputStream) -> str: + return file.read().decode('utf-8') + + +@app.function_name(name="put_queue") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="put_queue") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_output_binding( + arg_name="msg", + type="queue", + connection="AzureWebJobsStorage", + queue_name="testqueue") +def put_queue(req: func.HttpRequest, msg: func.Out[str]): + msg.set(req.get_body()) + + return 'OK' + + +@app.function_name(name="put_queue_message_return") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="put_queue_message_return") +@app.generic_output_binding(arg_name="resp", type="http") +@app.generic_output_binding( + arg_name="$return", + type="queue", + connection="AzureWebJobsStorage", + queue_name="testqueue-message-return") +def main(req: func.HttpRequest, resp: func.Out[str]) -> bytes: + return func.QueueMessage(body=req.get_body()) + + +@app.function_name(name="put_queue_multiple_out") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="put_queue_multiple_out") +@app.generic_output_binding(arg_name="resp", type="http") +@app.generic_output_binding( + arg_name="msg", + type="queue", + connection="AzureWebJobsStorage", + queue_name="testqueue-return-multiple-outparam") +def put_queue_multiple_out(req: func.HttpRequest, + resp: func.Out[func.HttpResponse], + msg: func.Out[func.QueueMessage]) -> None: + data = req.get_body().decode() + msg.set(func.QueueMessage(body=data)) + resp.set(func.HttpResponse(body='HTTP response: {}'.format(data))) + + +@app.function_name("put_queue_return") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="put_queue_return") +@app.generic_output_binding(arg_name="resp", type="http") +@app.generic_output_binding( + arg_name="$return", + type="queue", + connection="AzureWebJobsStorage", + queue_name="testqueue-return") +def put_queue_return(req: func.HttpRequest, resp: func.Out[str]) -> bytes: + return req.get_body() + + +@app.function_name(name="put_queue_multiple_return") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="put_queue_multiple_return") +@app.generic_output_binding(arg_name="$return", type="http") +@app.generic_output_binding( + arg_name="msgs", + type="queue", + connection="AzureWebJobsStorage", + queue_name="testqueue-return-multiple") +def put_queue_multiple_return(req: func.HttpRequest, + msgs: func.Out[typing.List[str]]): + msgs.set(['one', 'two']) + + +@app.function_name(name="put_queue_untyped_return") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="put_queue_untyped_return") +@app.generic_output_binding(arg_name="resp", type="http") +@app.generic_output_binding( + arg_name="$return", + type="queue", + connection="AzureWebJobsStorage", + queue_name="testqueue-untyped-return") +def put_queue_untyped_return(req: func.HttpRequest, + resp: func.Out[str]) -> bytes: + return func.QueueMessage(body=req.get_body()) + + +@app.function_name(name="queue_trigger") +@app.generic_trigger(arg_name="msg", + type="queueTrigger", + queue_name="testqueue", + connection="AzureWebJobsStorage") +@app.generic_output_binding(arg_name="$return", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob.txt") +def queue_trigger(msg: func.QueueMessage) -> str: + result = json.dumps({ + 'id': msg.id, + 'body': msg.get_body().decode('utf-8'), + 'expiration_time': (msg.expiration_time.isoformat() + if msg.expiration_time else None), + 'insertion_time': (msg.insertion_time.isoformat() + if msg.insertion_time else None), + 'time_next_visible': (msg.time_next_visible.isoformat() + if msg.time_next_visible else None), + 'pop_receipt': msg.pop_receipt, + 'dequeue_count': msg.dequeue_count + }) + + return result + + +@app.function_name(name="queue_trigger_message_return") +@app.generic_trigger(arg_name="msg", + type="queueTrigger", + queue_name="testqueue-message-return", + connection="AzureWebJobsStorage") +@app.generic_output_binding( + arg_name="$return", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob-message-return.txt") +def queue_trigger_message_return(msg: func.QueueMessage) -> bytes: + return msg.get_body() + + +@app.function_name(name="queue_trigger_return") +@app.generic_trigger(arg_name="msg", + type="queueTrigger", + queue_name="testqueue-message-return", + connection="AzureWebJobsStorage") +@app.generic_output_binding( + arg_name="$return", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-blob-return.txt") +def queue_trigger_return(msg: func.QueueMessage) -> bytes: + return msg.get_body() + + +@app.function_name(name="queue_trigger_return_multiple") +@app.generic_trigger(arg_name="msg", + type="queueTrigger", + queue_name="testqueue-return-multiple", + connection="AzureWebJobsStorage") +def queue_trigger_return_multiple(msg: func.QueueMessage) -> None: + logging.info('trigger on message: %s', msg.get_body().decode('utf-8')) + + +@app.function_name(name="queue_trigger_untyped") +@app.generic_trigger(arg_name="msg", + type="queueTrigger", + queue_name="testqueue-untyped-return", + connection="AzureWebJobsStorage") +@app.generic_output_binding(arg_name="$return", + type="blob", + connection="AzureWebJobsStorage", + path="python-worker-tests/test-queue-untyped" + "-blob-return.txt") +def queue_trigger_untyped(msg: str) -> str: + return msg + + +@app.function_name(name="put_queue_return_multiple") +@app.generic_trigger(arg_name="req", + type="httpTrigger", + route="put_queue_return_multiple") +@app.generic_output_binding(arg_name="resp", type="http") +@app.generic_output_binding( + arg_name="msgs", + type="queue", + connection="AzureWebJobsStorage", + queue_name="testqueue-return-multiple") +def put_queue_return_multiple(req: func.HttpRequest, + resp: func.Out[str], + msgs: func.Out[typing.List[str]]): + msgs.set(['one', 'two']) diff --git a/tests/endtoend/test_cosmosdb_functions.py b/tests/endtoend/test_cosmosdb_functions.py index 4404e16a1..2a559a621 100644 --- a/tests/endtoend/test_cosmosdb_functions.py +++ b/tests/endtoend/test_cosmosdb_functions.py @@ -92,4 +92,12 @@ class TestCosmosDBFunctionsStein(TestCosmosDBFunctions): @classmethod def get_script_dir(cls): return testutils.E2E_TESTS_FOLDER / 'cosmosdb_functions' / \ - 'cosmosdb_functions_stein' + 'cosmosdb_functions_stein' + + +class TestCosmosDBFunctionsSteinGeneric(TestCosmosDBFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'cosmosdb_functions' / \ + 'cosmosdb_functions_stein' / 'generic' diff --git a/tests/endtoend/test_eventhub_functions.py b/tests/endtoend/test_eventhub_functions.py index 225ecdbd8..9a6c06697 100644 --- a/tests/endtoend/test_eventhub_functions.py +++ b/tests/endtoend/test_eventhub_functions.py @@ -34,7 +34,7 @@ def test_eventhub_trigger(self): self.assertEqual(r.status_code, 200) self.assertEqual(r.text, 'OK') - # Once the event get generated, allow function host to pool from + # Once the event get generated, allow function host to poll from # EventHub and wait for eventhub_trigger to execute, # converting the event metadata into a blob. time.sleep(5) @@ -106,3 +106,11 @@ class TestEventHubFunctionsStein(TestEventHubFunctions): def get_script_dir(cls): return testutils.E2E_TESTS_FOLDER / 'eventhub_functions' / \ 'eventhub_functions_stein' + + +class TestEventHubFunctionsSteinGeneric(TestEventHubFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'eventhub_functions' / \ + 'eventhub_functions_stein' / 'generic' diff --git a/tests/endtoend/test_queue_functions.py b/tests/endtoend/test_queue_functions.py index 4b7b85929..e67308d3e 100644 --- a/tests/endtoend/test_queue_functions.py +++ b/tests/endtoend/test_queue_functions.py @@ -99,3 +99,11 @@ class TestQueueFunctionsStein(TestQueueFunctions): def get_script_dir(cls): return testutils.E2E_TESTS_FOLDER / 'queue_functions' / \ 'queue_functions_stein' + + +class TestQueueFunctionsSteinGeneric(TestQueueFunctions): + + @classmethod + def get_script_dir(cls): + return testutils.E2E_TESTS_FOLDER / 'queue_functions' / \ + 'queue_functions_stein' / 'generic'