Skip to content

Commit 753db47

Browse files
authored
Merge branch 'dev' into devcontainer
2 parents 4256905 + 4b9eccc commit 753db47

File tree

6 files changed

+468
-2
lines changed

6 files changed

+468
-2
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
import azure.functions as func
4+
5+
app = func.FunctionApp()
6+
7+
8+
@app.generic_trigger(arg_name="req", type="httpTrigger")
9+
@app.generic_output_binding(arg_name="$return", type="http")
10+
@app.generic_input_binding(
11+
arg_name="docs",
12+
type="cosmosDB",
13+
database_name="test",
14+
collection_name="items",
15+
id="cosmosdb-input-test",
16+
connection_string_setting="AzureWebJobsCosmosDBConnectionString")
17+
def cosmosdb_input(req: func.HttpRequest, docs: func.DocumentList) -> str:
18+
return func.HttpResponse(docs[0].to_json(), mimetype='application/json')
19+
20+
21+
@app.generic_trigger(
22+
arg_name="docs",
23+
type="cosmosDBTrigger",
24+
database_name="test",
25+
collection_name="items",
26+
lease_collection_name="leases",
27+
connection_string_setting="AzureWebJobsCosmosDBConnectionString",
28+
create_lease_collection_if_not_exists=True)
29+
@app.generic_output_binding(
30+
arg_name="$return",
31+
type="blob",
32+
connection="AzureWebJobsStorage",
33+
path="python-worker-tests/test-cosmosdb-triggered.txt")
34+
def cosmosdb_trigger(docs: func.DocumentList) -> str:
35+
return docs[0].to_json()
36+
37+
38+
@app.generic_trigger(arg_name="req", type="httpTrigger")
39+
@app.generic_output_binding(arg_name="$return", type="http")
40+
@app.generic_input_binding(
41+
arg_name="file",
42+
connection="AzureWebJobsStorage",
43+
type="blob",
44+
path="python-worker-tests/test-cosmosdb-triggered.txt")
45+
def get_cosmosdb_triggered(req: func.HttpRequest,
46+
file: func.InputStream) -> str:
47+
return file.read().decode('utf-8')
48+
49+
50+
@app.generic_trigger(arg_name="req", type="httpTrigger")
51+
@app.generic_output_binding(arg_name="$return", type="http")
52+
@app.generic_output_binding(
53+
arg_name="doc",
54+
database_name="test",
55+
type="cosmosDB",
56+
collection_name="items",
57+
create_if_not_exists=True,
58+
connection_string_setting="AzureWebJobsCosmosDBConnectionString")
59+
def put_document(req: func.HttpRequest, doc: func.Out[func.Document]):
60+
doc.set(func.Document.from_json(req.get_body()))
61+
62+
return 'OK'
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import json
2+
import os
3+
import typing
4+
5+
from azure.eventhub import EventData
6+
from azure.eventhub.aio import EventHubProducerClient
7+
8+
import azure.functions as func
9+
10+
app = func.FunctionApp()
11+
12+
13+
# An HttpTrigger to generating EventHub event from EventHub Output Binding
14+
@app.function_name(name="eventhub_output")
15+
@app.generic_trigger(arg_name="req", type="httpTrigger",
16+
route="eventhub_output")
17+
@app.generic_output_binding(arg_name="$return", type="http")
18+
@app.generic_output_binding(arg_name="event", type="eventHub",
19+
event_hub_name="python-worker-ci-eventhub-one",
20+
connection="AzureWebJobsEventHubConnectionString")
21+
def eventhub_output(req: func.HttpRequest, event: func.Out[str]):
22+
event.set(req.get_body().decode('utf-8'))
23+
return 'OK'
24+
25+
26+
# This is an actual EventHub trigger which will convert the event data
27+
# into a storage blob.
28+
@app.function_name(name="eventhub_trigger")
29+
@app.generic_trigger(arg_name="event",
30+
type="eventHubTrigger",
31+
event_hub_name="python-worker-ci-eventhub-one",
32+
connection="AzureWebJobsEventHubConnectionString")
33+
@app.generic_output_binding(
34+
arg_name="$return",
35+
type="blob",
36+
path="python-worker-tests/test-eventhub-triggered.txt",
37+
connection="AzureWebJobsStorage")
38+
def eventhub_trigger(event: func.EventHubEvent) -> bytes:
39+
return event.get_body()
40+
41+
42+
# Retrieve the event data from storage blob and return it as Http response
43+
@app.function_name(name="get_eventhub_triggered")
44+
@app.generic_trigger(arg_name="req",
45+
type="httpTrigger",
46+
route="get_eventhub_triggered")
47+
@app.generic_output_binding(arg_name="$return", type="http")
48+
@app.generic_input_binding(
49+
arg_name="file",
50+
type="blob",
51+
path="python-worker-tests/test-eventhub-triggered.txt",
52+
connection="AzureWebJobsStorage")
53+
def get_eventhub_triggered(req: func.HttpRequest,
54+
file: func.InputStream) -> str:
55+
return file.read().decode('utf-8')
56+
57+
58+
# Retrieve the event data from storage blob and return it as Http response
59+
@app.function_name(name="get_metadata_triggered")
60+
@app.generic_trigger(arg_name="req", type="httpTrigger",
61+
route="get_metadata_triggered")
62+
@app.generic_output_binding(arg_name="$return", type="http")
63+
@app.generic_input_binding(arg_name="file",
64+
type="blob",
65+
path="python-worker-tests/test-metadata-triggered"
66+
".txt",
67+
connection="AzureWebJobsStorage")
68+
async def get_metadata_triggered(req: func.HttpRequest,
69+
file: func.InputStream) -> str:
70+
return func.HttpResponse(body=file.read().decode('utf-8'),
71+
status_code=200,
72+
mimetype='application/json')
73+
74+
75+
# An HttpTrigger to generating EventHub event from azure-eventhub SDK.
76+
# Events generated from azure-eventhub contain the full metadata.
77+
@app.function_name(name="metadata_output")
78+
@app.generic_trigger(arg_name="req",
79+
type="httpTrigger",
80+
route="metadata_output")
81+
@app.generic_output_binding(arg_name="$return", type="http")
82+
async def metadata_output(req: func.HttpRequest):
83+
# Parse event metadata from http request
84+
json_string = req.get_body().decode('utf-8')
85+
event_dict = json.loads(json_string)
86+
87+
# Create an EventHub Client and event batch
88+
client = EventHubProducerClient.from_connection_string(
89+
os.getenv('AzureWebJobsEventHubConnectionString'),
90+
eventhub_name='python-worker-ci-eventhub-one-metadata')
91+
92+
# Generate new event based on http request with full metadata
93+
event_data_batch = await client.create_batch()
94+
event_data_batch.add(EventData(event_dict.get('body')))
95+
96+
# Send out event into event hub
97+
try:
98+
await client.send_batch(event_data_batch)
99+
finally:
100+
await client.close()
101+
102+
return 'OK'
103+
104+
105+
@app.function_name(name="metadata_trigger")
106+
@app.generic_trigger(
107+
arg_name="event",
108+
type="eventHubTrigger",
109+
event_hub_name="python-worker-ci-eventhub-one-metadata",
110+
connection="AzureWebJobsEventHubConnectionString")
111+
@app.generic_output_binding(
112+
arg_name="$return",
113+
type="blob",
114+
path="python-worker-tests/test-metadata-triggered.txt",
115+
connection="AzureWebJobsStorage")
116+
async def metadata_trigger(event: func.EventHubEvent) -> bytes:
117+
event_dict: typing.Mapping[str, typing.Any] = {
118+
'body': event.get_body().decode('utf-8'),
119+
# Uncomment this when the EnqueuedTimeUtc is fixed in azure-functions
120+
# 'enqueued_time': event.enqueued_time.isoformat(),
121+
'partition_key': event.partition_key,
122+
'sequence_number': event.sequence_number,
123+
'offset': event.offset,
124+
'metadata': event.metadata
125+
}
126+
127+
return json.dumps(event_dict)

0 commit comments

Comments
 (0)