
Kafka extension implementation #62

Merged · 3 commits · Jun 26, 2020
5 changes: 5 additions & 0 deletions azure/functions/__init__.py
@@ -5,6 +5,7 @@
from ._http import HttpRequest # NoQA
from ._http import HttpResponse # NoQA
from ._http_wsgi import WsgiMiddleware # NoQA
from .kafka import KafkaEvent, KafkaConverter, KafkaTriggerConverter # NoQA
from ._queue import QueueMessage # NoQA
from ._servicebus import ServiceBusMessage # NoQA
from ._durable_functions import OrchestrationContext # NoQA
@@ -15,6 +16,7 @@
from . import eventgrid # NoQA
from . import eventhub # NoQA
from . import http # NoQA
from . import kafka # NoQA
from . import queue # NoQA
from . import servicebus # NoQA
from . import timer # NoQA
@@ -37,6 +39,9 @@
'HttpRequest',
'HttpResponse',
'InputStream',
'KafkaEvent',
'KafkaConverter',
'KafkaTriggerConverter',
'OrchestrationContext',
'QueueMessage',
'ServiceBusMessage',
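For reference, these new exports are what user function code imports. A minimal, hypothetical single-event trigger function (the function name, parameter name, and logging calls are illustrative only, not part of this PR) could look like:

import logging

import azure.functions as func


def main(kevent: func.KafkaEvent) -> None:
    # get_body() returns the raw payload bytes; decoding is the caller's concern.
    logging.info('Kafka body: %s', kevent.get_body().decode('utf-8'))
    logging.info('Kafka metadata: %s', kevent.metadata)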
34 changes: 34 additions & 0 deletions azure/functions/_kafka.py
@@ -0,0 +1,34 @@
import abc
import typing


class AbstractKafkaEvent(abc.ABC):
    """An abstract class defining the interface of a Kafka event message."""

@abc.abstractmethod
def get_body(self) -> bytes:
pass

@property
@abc.abstractmethod
def key(self) -> typing.Optional[str]:
pass

@property
@abc.abstractmethod
def offset(self) -> typing.Optional[int]:
pass

@property
@abc.abstractmethod
def partition(self) -> typing.Optional[int]:
pass

@property
@abc.abstractmethod
def topic(self) -> typing.Optional[str]:
pass

@property
@abc.abstractmethod
def timestamp(self) -> typing.Optional[str]:
pass
278 changes: 278 additions & 0 deletions azure/functions/kafka.py
@@ -0,0 +1,278 @@
import typing
import json

from typing import Any, List

from . import meta

from ._kafka import AbstractKafkaEvent


class KafkaEvent(AbstractKafkaEvent):
"""A concrete implementation of Kafka event message type."""

def __init__(self, *,
body: bytes,
                 trigger_metadata: typing.Optional[
                     typing.Mapping[str, meta.Datum]] = None,
key: typing.Optional[str] = None,
offset: typing.Optional[int] = None,
partition: typing.Optional[int] = None,
topic: typing.Optional[str] = None,
timestamp: typing.Optional[str] = None) -> None:
self.__body = body
self.__trigger_metadata = trigger_metadata
self.__key = key
self.__offset = offset
self.__partition = partition
self.__topic = topic
self.__timestamp = timestamp

# Cache for trigger metadata after Python object conversion
self._trigger_metadata_pyobj: typing.Optional[
typing.Mapping[str, typing.Any]] = None

def get_body(self) -> bytes:
return self.__body

@property
def key(self) -> typing.Optional[str]:
return self.__key

@property
def offset(self) -> typing.Optional[int]:
return self.__offset

@property
def partition(self) -> typing.Optional[int]:
return self.__partition

@property
def topic(self) -> typing.Optional[str]:
return self.__topic

@property
def timestamp(self) -> typing.Optional[str]:
return self.__timestamp

@property
def metadata(self) -> typing.Optional[typing.Mapping[str, typing.Any]]:
if self.__trigger_metadata is None:
return None

if self._trigger_metadata_pyobj is None:
self._trigger_metadata_pyobj = {}

for k, v in self.__trigger_metadata.items():
self._trigger_metadata_pyobj[k] = v.value

return self._trigger_metadata_pyobj

def __repr__(self) -> str:
return (
f'<azure.KafkaEvent '
f'key={self.key} '
            f'partition={self.partition} '
f'offset={self.offset} '
f'topic={self.topic} '
f'timestamp={self.timestamp} '
f'at 0x{id(self):0x}>'
)
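As a quick illustration of the accessors above, a KafkaEvent can be constructed directly, for example in a unit test; all values below are made up:

event = KafkaEvent(
    body=b'{"id": 1}',
    key='my-key',
    offset=42,
    partition=0,
    topic='orders',
    timestamp='2020-06-26T00:00:00Z',
)

assert event.get_body() == b'{"id": 1}'
assert event.partition == 0
assert event.metadata is None  # no trigger metadata was supplied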


class KafkaConverter(meta.InConverter, meta.OutConverter, binding='kafka'):
@classmethod
def check_input_type_annotation(cls, pytype) -> bool:
        valid_types = (KafkaEvent,)

return (
meta.is_iterable_type_annotation(pytype, valid_types)
or (isinstance(pytype, type) and issubclass(pytype, valid_types))
)

@classmethod
def check_output_type_annotation(cls, pytype) -> bool:
valid_types = (str, bytes)
return (
meta.is_iterable_type_annotation(pytype, str)
or (isinstance(pytype, type) and issubclass(pytype, valid_types))
)

@classmethod
def decode(
cls, data: meta.Datum, *, trigger_metadata
) -> typing.Union[KafkaEvent, typing.List[KafkaEvent]]:
data_type = data.type

if (data_type == 'string' or data_type == 'bytes'
or data_type == 'json'):
return cls.decode_single_event(data, trigger_metadata)

elif (data_type == 'collection_bytes'
or data_type == 'collection_string'):
return cls.decode_multiple_events(data, trigger_metadata)

else:
raise NotImplementedError(
f'unsupported event data payload type: {data_type}')

@classmethod
def decode_single_event(cls, data: meta.Datum,
trigger_metadata) -> KafkaEvent:
data_type = data.type

if data_type == 'string':
body = data.value.encode('utf-8')

elif data_type == 'bytes':
body = data.value

elif data_type == 'json':
body = data.value.encode('utf-8')

else:
raise NotImplementedError(
f'unsupported event data payload type: {data_type}')

return KafkaEvent(body=body)

@classmethod
def decode_multiple_events(cls, data: meta.Datum,
trigger_metadata) -> typing.List[KafkaEvent]:
if data.type == 'collection_bytes':
parsed_data = data.value.bytes

        elif data.type == 'collection_string':
            parsed_data = data.value.string

        else:
            raise NotImplementedError(
                f'unsupported event data payload type: {data.type}')

events = [KafkaEvent(body=pd) for pd in parsed_data]

return events

@classmethod
def encode(cls, obj: typing.Any, *,
expected_type: typing.Optional[type]) -> meta.Datum:
raise NotImplementedError('Output bindings are not '
'supported for Kafka')


class KafkaTriggerConverter(KafkaConverter,
binding='kafkaTrigger', trigger=True):

@classmethod
def decode(
cls, data: meta.Datum, *, trigger_metadata
) -> typing.Union[KafkaEvent, typing.List[KafkaEvent]]:

data_type = data.type

if (data_type == 'string' or data_type == 'bytes'
or data_type == 'json'):
return cls.decode_single_event(data, trigger_metadata)
elif (data_type == 'collection_bytes'
or data_type == 'collection_string'):
return cls.decode_multiple_events(data, trigger_metadata)
else:
raise NotImplementedError(
f'unsupported event data payload type: {data_type}')

@classmethod
def decode_single_event(cls, data: meta.Datum,
trigger_metadata) -> KafkaEvent:
data_type = data.type

if data_type == 'string':
body = data.value.encode('utf-8')

elif data_type == 'bytes':
body = data.value

elif data_type == 'json':
body = data.value.encode('utf-8')

else:
raise NotImplementedError(
f'unsupported event data payload type: {data_type}')

return KafkaEvent(
body=body,
timestamp=cls._decode_trigger_metadata_field(
trigger_metadata, 'Timestamp', python_type=str),
key=cls._decode_trigger_metadata_field(
trigger_metadata, 'Key', python_type=str),
partition=cls._decode_trigger_metadata_field(
trigger_metadata, 'Partition', python_type=int),
offset=cls._decode_trigger_metadata_field(
trigger_metadata, 'Offset', python_type=int),
topic=cls._decode_trigger_metadata_field(
trigger_metadata, 'Topic', python_type=str),
trigger_metadata=trigger_metadata
)

@classmethod
def decode_multiple_events(cls, data: meta.Datum,
trigger_metadata) -> typing.List[KafkaEvent]:
if data.type == 'collection_bytes':
parsed_data = data.value.bytes

        elif data.type == 'collection_string':
            parsed_data = data.value.string

        else:
            raise NotImplementedError(
                f'unsupported event data payload type: {data.type}')

timestamp_props = trigger_metadata.get('TimestampArray')
key_props = trigger_metadata.get('KeyArray')
partition_props = trigger_metadata.get('PartitionArray')
offset_props = trigger_metadata.get('OffsetArray')
topic_props = trigger_metadata.get('TopicArray')

parsed_timestamp_props: List[Any] = cls.get_parsed_props(
timestamp_props, parsed_data)

parsed_key_props = cls.get_parsed_props(
key_props, parsed_data)

parsed_partition_props = cls.get_parsed_props(
partition_props, parsed_data)

parsed_offset_props: List[Any] = []
if offset_props is not None:
parsed_offset_props = [v for v in offset_props.value.sint64]
if len(parsed_offset_props) != len(parsed_data):
raise AssertionError(
'Number of bodies and metadata mismatched')

        parsed_topic_props: List[Any] = []
if topic_props is not None:
parsed_topic_props = [v for v in topic_props.value.string]

events = []

for i in range(len(parsed_data)):
event = KafkaEvent(
body=parsed_data[i],
timestamp=parsed_timestamp_props[i],
key=cls._decode_typed_data(
parsed_key_props[i], python_type=str),
partition=parsed_partition_props[i],
offset=parsed_offset_props[i],
topic=parsed_topic_props[i],
trigger_metadata=trigger_metadata
)
events.append(event)

return events

@classmethod
def encode(cls, obj: typing.Any, *,
expected_type: typing.Optional[type]) -> meta.Datum:
raise NotImplementedError('Output bindings are not '
'supported for Kafka')

@classmethod
def get_parsed_props(
cls, props: meta.Datum, parsed_data) -> List[Any]:
parsed_props: List[Any] = []
if props is not None:
parsed_props = json.loads(props.value)
if len(parsed_data) != len(parsed_props):
raise AssertionError('Number of bodies and metadata mismatched')
return parsed_props
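End to end, the trigger converter can be exercised with a unit-test style sketch like the one below. It assumes meta.Datum is constructed as Datum(value, type), mirroring how datums are used elsewhere in this library; the payload and metadata values are invented:

from azure.functions import meta
from azure.functions.kafka import KafkaEvent, KafkaTriggerConverter

single = meta.Datum('{"id": 1}', 'string')
trigger_metadata = {
    'Key': meta.Datum('my-key', 'string'),
    'Offset': meta.Datum(42, 'int'),
    'Partition': meta.Datum(0, 'int'),
    'Topic': meta.Datum('orders', 'string'),
    'Timestamp': meta.Datum('2020-06-26T00:00:00Z', 'string'),
}

event = KafkaTriggerConverter.decode(single, trigger_metadata=trigger_metadata)
assert isinstance(event, KafkaEvent)
assert event.get_body() == b'{"id": 1}'
assert event.offset == 42
assert event.topic == 'orders'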