
add EventHubBatch E2E pystein tests. #1052


Merged 2 commits on Jul 20, 2022
@@ -0,0 +1,127 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import os
import typing
import azure.functions as func
from azure.eventhub import EventHubProducerClient, EventData

app = func.FunctionApp()


# This is an actual EventHub trigger which handles EventHub events in batches.
# It serializes the event bodies into JSON and stores them in a storage table.
@app.function_name(name="eventhub_multiple")
@app.event_hub_message_trigger(
    arg_name="events",
    event_hub_name="python-worker-ci-eventhub-batch",
    connection="AzureWebJobsEventHubConnectionString",
    data_type="string",
    cardinality="many")
@app.write_table(arg_name="$return",
                 connection="AzureWebJobsStorage",
                 table_name="EventHubBatchTest")
def eventhub_multiple(events):
    table_entries = []
    for event in events:
        json_entry = event.get_body()
        table_entry = json.loads(json_entry)
        table_entries.append(table_entry)

    table_json = json.dumps(table_entries)

    return table_json


# An HttpTrigger that generates an EventHub event via the EventHub output binding.
@app.function_name(name="eventhub_output_batch")
@app.write_event_hub_message(arg_name="$return",
                             connection="AzureWebJobsEventHubConnectionString",
                             event_hub_name="python-worker-ci-eventhub-batch")
@app.route(route="eventhub_output_batch", binding_arg_name="out")
def eventhub_output_batch(req: func.HttpRequest, out: func.Out[str]) -> str:
    events = req.get_body().decode('utf-8')
    out.set('hello')
    return events


# Retrieve the event data from the storage table and return it as an HTTP response.
@app.function_name(name="get_eventhub_batch_triggered")
@app.route(route="get_eventhub_batch_triggered/{id}")
@app.read_table(arg_name="testEntities",
                connection="AzureWebJobsStorage",
                table_name="EventHubBatchTest",
                partition_key="{id}")
def get_eventhub_batch_triggered(req: func.HttpRequest, testEntities):
    return func.HttpResponse(status_code=200, body=testEntities)


# Retrieve the event metadata from the storage blob and return it as an HTTP response.
@app.function_name(name="get_metadata_batch_triggered")
@app.route(route="get_metadata_batch_triggered")
@app.read_blob(arg_name="file",
               path="python-worker-tests/test-metadata-batch-triggered.txt",
               connection="AzureWebJobsStorage")
def get_metadata_batch_triggered(req: func.HttpRequest,
                                 file: func.InputStream) -> str:
    return func.HttpResponse(body=file.read().decode('utf-8'),
                             status_code=200,
                             mimetype='application/json')


# This is an actual EventHub trigger which handles EventHub events in batches.
# It serializes the event metadata into JSON and stores it in a blob.
@app.function_name(name="metadata_multiple")
@app.event_hub_message_trigger(
    arg_name="events",
    event_hub_name="python-worker-ci-eventhub-batch-metadata",
    connection="AzureWebJobsEventHubConnectionString",
    data_type="binary",
    cardinality="many")
@app.write_blob(arg_name="$return",
                path="python-worker-tests/test-metadata-batch-triggered.txt",
                connection="AzureWebJobsStorage")
def metadata_multiple(events: typing.List[func.EventHubEvent]) -> bytes:
    event_list = []
    for event in events:
        event_dict: typing.Mapping[str, typing.Any] = {
            'body': event.get_body().decode('utf-8'),
            'enqueued_time': event.enqueued_time.isoformat(),
            'partition_key': event.partition_key,
            'sequence_number': event.sequence_number,
            'offset': event.offset,
            'metadata': event.metadata
        }
        event_list.append(event_dict)

    return json.dumps(event_list)


# An HttpTrigger that generates EventHub events with the azure-eventhub SDK.
# Events generated from azure-eventhub contain the full metadata.
@app.function_name(name="metadata_output_batch")
@app.route(route="metadata_output_batch")
def main(req: func.HttpRequest):
    # Get the event count from the http request query parameter
    count = int(req.params.get('count', '1'))

    # Parse event metadata from the http request
    json_string = req.get_body().decode('utf-8')
    event_dict = json.loads(json_string)

    # Create an EventHub client and event batch
    client = EventHubProducerClient.from_connection_string(
        os.getenv('AzureWebJobsEventHubConnectionString'),
        eventhub_name='python-worker-ci-eventhub-batch-metadata')

    # Generate new events based on the http request with full metadata
    event_data_batch = client.create_batch()
    random_number = int(event_dict.get('body', '0'))
    for i in range(count):
        event_data_batch.add(EventData(str(random_number + i)))

    # Send the events to the event hub
    with client:
        client.send_batch(event_data_batch)

    return 'OK'
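
For manual checks outside the E2E harness, the eventhub_output_batch trigger above can be driven with any HTTP client: the posted body is forwarded to the Event Hub output binding, while the HTTP response is simply 'hello'. A minimal sketch, assuming the Functions host is running locally on the default port 7071 and that the requests package is installed (both are assumptions, not part of this PR):

```python
import json

import requests  # assumed to be installed locally

# Three table-style entities; the posted body becomes the Event Hub message
# via the $return output binding of eventhub_output_batch.
docs = [{"PartitionKey": "manual-test", "RowKey": i} for i in range(3)]

resp = requests.post(
    "http://localhost:7071/api/eventhub_output_batch",  # assumed local host/port
    data=json.dumps(docs),
)
print(resp.status_code, resp.text)  # expect 200 and 'hello'
```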
tests/endtoend/test_eventhub_batch_functions.py (105 additions, 0 deletions)
@@ -156,3 +156,108 @@ def _get_table_function_json_path(self):
        script_dir = pathlib.Path(self.get_script_dir())
        json_path = pathlib.Path('get_eventhub_batch_triggered/function.json')
        return testutils.TESTS_ROOT / script_dir / json_path


class TestEventHubBatchFunctionsStein(testutils.WebHostTestCase):

    @classmethod
    def get_script_dir(cls):
        return testutils.E2E_TESTS_FOLDER / 'eventhub_batch_functions' / \
            'eventhub_batch_functions_stein'

    @testutils.retryable_test(3, 5)
    def test_eventhub_multiple(self):
        NUM_EVENTS = 3
        all_row_keys_seen = dict([(str(i), True) for i in range(NUM_EVENTS)])
        partition_key = str(round(time.time()))

        docs = []
        for i in range(NUM_EVENTS):
            doc = {'PartitionKey': partition_key, 'RowKey': i}
            docs.append(doc)

        r = self.webhost.request('POST', 'eventhub_output_batch',
                                 data=json.dumps(docs))
        self.assertEqual(r.status_code, 200)

        row_keys = [str(i) for i in range(NUM_EVENTS)]
        seen = [False] * NUM_EVENTS
        row_keys_seen = dict(zip(row_keys, seen))

        # Allow trigger to fire.
        time.sleep(5)

        r = self.webhost.request(
            'GET',
            f'get_eventhub_batch_triggered/{partition_key}')
        self.assertEqual(r.status_code, 200)
        entries = r.json()
        for entry in entries:
            self.assertEqual(entry['PartitionKey'], partition_key)
            row_key = entry['RowKey']
            row_keys_seen[row_key] = True

        self.assertDictEqual(all_row_keys_seen, row_keys_seen)

    @testutils.retryable_test(3, 5)
    def test_eventhub_multiple_with_metadata(self):
        # Generate a unique event body for the EventHub event.
        # Record start_time and end_time for checking the event enqueue time.
        start_time = datetime.now(tz=tz.UTC)
        count = 10
        random_number = str(round(time.time()) % 1000)
        req_body = {
            'body': random_number
        }

        # Invoke the metadata_output_batch HttpTrigger to generate EventHub
        # events from the azure-eventhub SDK.
        r = self.webhost.request('POST',
                                 f'metadata_output_batch?count={count}',
                                 data=json.dumps(req_body))
        self.assertEqual(r.status_code, 200)
        self.assertIn('OK', r.text)
        end_time = datetime.now(tz=tz.UTC)

        # Once the events are generated, allow the function host to poll from
        # EventHub and wait for metadata_multiple to execute,
        # converting the event metadata into a blob.
        time.sleep(5)

        # Call get_metadata_batch_triggered to retrieve the event metadata.
        r = self.webhost.request('GET', 'get_metadata_batch_triggered')
        self.assertEqual(r.status_code, 200)

        # Check the metadata and event count; events should be processed in batches.
        events = r.json()
        self.assertIsInstance(events, list)
        self.assertGreater(len(events), 1)

        # EventHubEvent property checks.
        for event_index in range(len(events)):
            event = events[event_index]

            # Check if the event was enqueued between start_time and end_time.
            enqueued_time = parser.isoparse(event['enqueued_time']).astimezone(
                tz=tz.UTC)
            self.assertTrue(start_time < enqueued_time < end_time)

            # Check if the event properties are properly set.
            self.assertIsNone(event['partition_key'])  # only 1 partition
            self.assertGreaterEqual(event['sequence_number'], 0)
            self.assertIsNotNone(event['offset'])

            # Check if the event.metadata field is properly set.
            self.assertIsNotNone(event['metadata'])
            metadata = event['metadata']
            sys_props_array = metadata['SystemPropertiesArray']
            sys_props = sys_props_array[event_index]
            enqueued_time = parser.isoparse(sys_props['EnqueuedTimeUtc'])

            # Check the event trigger time and other system properties.
            self.assertTrue(
                start_time.timestamp() < enqueued_time.timestamp()
                < end_time.timestamp())  # NoQA
            self.assertIsNone(sys_props['PartitionKey'])
            self.assertGreaterEqual(sys_props['SequenceNumber'], 0)
            self.assertIsNotNone(sys_props['Offset'])
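
For reference, each element returned by get_metadata_batch_triggered pairs the per-event fields with a metadata dictionary whose SystemPropertiesArray holds one entry per event in the batch; this is the shape the assertions above walk. An illustrative sketch of that shape (field values are placeholders, not captured from a real run):

```python
# Illustrative shape only; values are placeholders, not real output.
sample_event = {
    "body": "42",
    "enqueued_time": "2022-07-20T00:00:00+00:00",
    "partition_key": None,          # single-partition event hub in CI
    "sequence_number": 0,
    "offset": "0",
    "metadata": {
        "SystemPropertiesArray": [
            {
                "EnqueuedTimeUtc": "2022-07-20T00:00:00Z",
                "PartitionKey": None,
                "SequenceNumber": 0,
                "Offset": "0",
            },
        ],
    },
}

# The same checks the test applies to each event, run against the sample above.
assert sample_event["partition_key"] is None
assert sample_event["sequence_number"] >= 0
assert sample_event["metadata"]["SystemPropertiesArray"][0]["Offset"] is not None
```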