Skip to content

Handle eventhub cardinality #527

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/linux_devops_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

set -e -x

python -m pip install -U -e .[dev]
python -m pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple -U -e .[dev]
python setup.py webhost
2 changes: 1 addition & 1 deletion azure-pipelines-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
displayName: 'Install dotnet'
- bash: |
set -e -x
python -m pip install -U -e .[dev]
python -m pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple -U -e .[dev]
python setup.py webhost
displayName: 'Build'
- bash: |
Expand Down
6 changes: 6 additions & 0 deletions azure_functions_worker/bindings/datumdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ def from_typed_data(cls, td: protos.TypedData):
val = td.bytes
elif tt == 'json':
val = td.json
elif tt == 'collection_bytes':
val = td.collection_bytes
elif tt == 'collection_string':
val = td.collection_string
elif tt == 'collection_sint64':
val = td.collection_sint64
elif tt is None:
return None
else:
Expand Down
1 change: 1 addition & 0 deletions azure_functions_worker/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Capabilities
RAW_HTTP_BODY_BYTES = "RawHttpBodyBytes"
TYPED_DATA_COLLECTION = "TypedDataCollection"
1 change: 1 addition & 0 deletions azure_functions_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ async def _handle__worker_init_request(self, req):

capabilities = dict()
capabilities[constants.RAW_HTTP_BODY_BYTES] = "true"
capabilities[constants.TYPED_DATA_COLLECTION] = "true"

return protos.StreamingMessage(
request_id=self.request_id,
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def run(self):
],
extras_require={
'dev': [
'azure-functions==1.0.0b5',
'azure-functions==1.0.3',
'flake8~=3.5.0',
'mypy',
'pytest',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import json


def main(events):
table_entries = []
for event in events:
json_entry = event.get_body()
table_entry = json.loads(json_entry)
table_entries.append(table_entry)

table_json = json.dumps(table_entries)

return table_json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"scriptFile": "__init__.py",

"bindings": [
{
"type": "eventHubTrigger",
"name": "events",
"direction": "in",
"cardinality": "many",
"dataType": "string",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not specify dataType. String is default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are failing on a JSON deserialization step when dataType is not specified as string. The SDK must be assuming that we are writing a JSON deserializable string by default. I was able to repro in Node. Will open an issue against the SDK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue on the EventHub extension: Azure/azure-functions-eventhubs-extension#17.

"eventHubName": "python-worker-ci-eventhub-batch",
"connection": "AzureWebJobsEventHubConnectionString"
},
{
"direction": "out",
"type": "table",
"name": "$return",
"tableName": "EventHubBatchTest",
"connection": "AzureWebJobsStorage"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import azure.functions as func


def main(req: func.HttpRequest) -> str:
events = req.get_body().decode('utf-8')
return events
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"scriptFile": "__init__.py",

"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "eventHub",
"name": "$return",
"direction": "out",
"eventHubName": "python-worker-ci-eventhub-batch",
"connection": "AzureWebJobsEventHubConnectionString"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import azure.functions as func


def main(req: func.HttpRequest, testEntities):
return func.HttpResponse(status_code=200, body=testEntities)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"authLevel": "anonymous",
"methods": [
"get"
],
"name": "req"
},
{
"direction": "in",
"type": "table",
"name": "testEntities",
"partitionKey": "WillBePopulated",
"tableName": "EventHubBatchTest",
"connection": "AzureWebJobsStorage"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"type": "eventHubTrigger",
"name": "event",
"direction": "in",
"eventHubName": "python-worker-ci",
"eventHubName": "python-worker-eventhub-ci-linux",
"connection": "AzureWebJobsEventHubConnectionString"
},
{
Expand Down
94 changes: 94 additions & 0 deletions tests/endtoend/test_eventhub_batch_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import json
import time
import pathlib

from azure_functions_worker import testutils


class TestEventHubFunctions(testutils.WebHostTestCase):

@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'eventhub_batch_functions'

def test_eventhub_multiple(self):
NUM_EVENTS = 3
all_row_keys_seen = dict([(str(i), True) for i in range(NUM_EVENTS)])
partition_key = str(round(time.time()))

# Dynamically rewrite function.json to point to new partition key
# for recording EventHub state
old_partition_key = self._get_table_partition_key()
self._set_table_partition_key(partition_key)

# wait for host to restart after change
time.sleep(1)

docs = []
for i in range(NUM_EVENTS):
doc = {'PartitionKey': partition_key, 'RowKey': i}
docs.append(doc)

r = self.webhost.request('POST', 'eventhub_output_batch',
data=json.dumps(docs))
self.assertEqual(r.status_code, 200)

max_retries = 30

row_keys = [str(i) for i in range(NUM_EVENTS)]
seen = [False] * NUM_EVENTS
row_keys_seen = dict(zip(row_keys, seen))
for try_no in range(max_retries):
# Allow trigger to fire.
time.sleep(2)

try:
r = self.webhost.request('GET',
'get_eventhub_batch_triggered')
self.assertEqual(r.status_code, 200)
entries = r.json()
for entry in entries:
self.assertEqual(entry['PartitionKey'], partition_key)
row_key = entry['RowKey']
row_keys_seen[row_key] = True

self.assertDictEqual(all_row_keys_seen, row_keys_seen)

except AssertionError as e:
if try_no == max_retries - 1:
self._cleanup(old_partition_key)
raise
else:
break

self._cleanup(old_partition_key)

def _cleanup(self, old_partition_key):
self._set_table_partition_key(old_partition_key)

def _get_table_partition_key(self):
func_dict = self._get_table_function_json_dict()
partition_key = func_dict['bindings'][1]['partitionKey']
return partition_key

def _set_table_partition_key(self, partition_key):
full_json_path = self._get_table_function_json_path()

func_dict = self._get_table_function_json_dict()
func_dict['bindings'][1]['partitionKey'] = partition_key

with open(full_json_path, 'w') as f:
json.dump(func_dict, f, indent=2)

def _get_table_function_json_dict(self):
full_json_path = self._get_table_function_json_path()

with open(full_json_path, 'r') as f:
func_dict = json.load(f)

return func_dict

def _get_table_function_json_path(self):
script_dir = pathlib.Path(self.get_script_dir())
json_path = pathlib.Path('get_eventhub_batch_triggered/function.json')
return testutils.TESTS_ROOT / script_dir / json_path