
Commit 0e7819b

ran-isenberg, leandrodamascena, heitorlessa, Release bot, and peterschutt authored
feat(parser): add KafkaMskEventModel and KafkaSelfManagedEventModel (#1499)
Co-authored-by: Leandro Damascena <[email protected]>
Co-authored-by: Heitor Lessa <[email protected]>
Co-authored-by: heitorlessa <[email protected]>
Co-authored-by: Release bot <[email protected]>
Co-authored-by: Peter Schutt <[email protected]>
Co-authored-by: Ran Isenberg <[email protected]>
Co-authored-by: Ran Isenberg <[email protected]>
1 parent 217bd6c commit 0e7819b

File tree

12 files changed: +242 -5 lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
@@ -2393,4 +2393,4 @@
 [v1.0.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v1.0.0...v1.0.1
 [v1.0.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.11.0...v1.0.0
 [v0.11.0]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.1...v0.11.0
-[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1
+[v0.10.1]: https://github.com/awslabs/aws-lambda-powertools-python/compare/v0.10.0...v0.10.1

aws_lambda_powertools/shared/functions.py

Lines changed: 20 additions & 0 deletions
@@ -1,5 +1,10 @@
+import base64
+import logging
+from binascii import Error as BinAsciiError
 from typing import Optional, Union

+logger = logging.getLogger(__name__)
+

 def strtobool(value: str) -> bool:
     """Convert a string representation of truth to True or False.
@@ -58,3 +63,18 @@ def resolve_env_var_choice(
         resolved choice as either bool or environment value
     """
     return choice if choice is not None else env
+
+
+def base64_decode(value: str) -> bytes:
+    try:
+        logger.debug("Decoding base64 Kafka record item before parsing")
+        return base64.b64decode(value)
+    except (BinAsciiError, TypeError):
+        raise ValueError("base64 decode failed")
+
+
+def bytes_to_string(value: bytes) -> str:
+    try:
+        return value.decode("utf-8")
+    except (BinAsciiError, TypeError):
+        raise ValueError("base64 UTF-8 decode failed")
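
For context, a minimal sketch of how the two new helpers chain together when decoding a Kafka record payload; the sample payload below is illustrative, not taken from this commit:

import base64

from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string

# Simulate what a Kafka record carries: a base64-encoded payload string
encoded = base64.b64encode(b'{"key": "value"}').decode("utf-8")

decoded_bytes = base64_decode(encoded)         # -> b'{"key": "value"}'
decoded_text = bytes_to_string(decoded_bytes)  # -> '{"key": "value"}'
assert decoded_text == '{"key": "value"}'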

aws_lambda_powertools/utilities/data_classes/kafka_event.py

Lines changed: 3 additions & 2 deletions
@@ -85,10 +85,11 @@ def get_header_value(


 class KafkaEvent(DictWrapper):
-    """Self-managed Apache Kafka event trigger
+    """Self-managed or MSK Apache Kafka event trigger
     Documentation:
     --------------
     - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
     """

     @property
@@ -98,7 +99,7 @@ def event_source(self) -> str:

     @property
     def event_source_arn(self) -> Optional[str]:
-        """The AWS service ARN from which the Kafka event record originated."""
+        """The AWS service ARN from which the Kafka event record originated, mandatory for AWS MSK."""
         return self.get("eventSourceArn")

     @property
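
A hedged illustration of the updated docstrings in use (the handler below is a sketch, not part of the diff):

from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEvent

def lambda_handler(event: dict, context):
    kafka_event = KafkaEvent(event)
    # "aws:kafka" for MSK triggers, "aws:SelfManagedKafka" for self-managed clusters
    source = kafka_event.event_source
    # Only populated for MSK events, hence Optional[str]
    arn = kafka_event.event_source_arn
    return {"eventSource": source, "eventSourceArn": arn}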

aws_lambda_powertools/utilities/parser/envelopes/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -4,6 +4,7 @@
 from .cloudwatch import CloudWatchLogsEnvelope
 from .dynamodb import DynamoDBStreamEnvelope
 from .event_bridge import EventBridgeEnvelope
+from .kafka import KafkaEnvelope
 from .kinesis import KinesisDataStreamEnvelope
 from .lambda_function_url import LambdaFunctionUrlEnvelope
 from .sns import SnsEnvelope, SnsSqsEnvelope
@@ -20,5 +21,6 @@
     "SnsEnvelope",
     "SnsSqsEnvelope",
     "SqsEnvelope",
+    "KafkaEnvelope",
     "BaseEnvelope",
 ]
aws_lambda_powertools/utilities/parser/envelopes/kafka.py

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+import logging
+from typing import Any, Dict, List, Optional, Type, Union, cast
+
+from ..models import KafkaMskEventModel, KafkaSelfManagedEventModel
+from ..types import Model
+from .base import BaseEnvelope
+
+logger = logging.getLogger(__name__)
+
+
+class KafkaEnvelope(BaseEnvelope):
+    """Kafka event envelope to extract data within body key
+    The record's body parameter is a string, though it can also be a JSON encoded string.
+    Regardless of its type it'll be parsed into a BaseModel object.
+
+    Note: Records will be parsed the same way so if model is str,
+    all items in the list will be parsed as str and not as JSON (and vice versa)
+    """
+
+    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
+        """Parses data found with model provided
+
+        Parameters
+        ----------
+        data : Dict
+            Lambda event to be parsed
+        model : Type[Model]
+            Data model provided to parse after extracting data using envelope
+
+        Returns
+        -------
+        List
+            List of records parsed with model provided
+        """
+        event_source = cast(dict, data).get("eventSource")
+        model_parse_event = KafkaMskEventModel if event_source == "aws:kafka" else KafkaSelfManagedEventModel
+
+        logger.debug(f"Parsing incoming data with Kafka event model {model_parse_event}")
+        parsed_envelope = model_parse_event.parse_obj(data)
+        logger.debug(f"Parsing Kafka event records in `value` with {model}")
+        ret_list = []
+        for records in parsed_envelope.records.values():
+            ret_list += [self._parse(data=record.value, model=model) for record in records]
+        return ret_list
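
A short usage sketch for the new envelope, assuming each record's `value` decodes to a JSON object matching the user model; the model name here is illustrative:

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes, parse

class OrderCreated(BaseModel):  # hypothetical business model for the record value
    key: str

def lambda_handler(event: dict, context):
    # Returns one parsed model per Kafka record, across all topic partitions
    orders = parse(event=event, model=OrderCreated, envelope=envelopes.KafkaEnvelope)
    return {"count": len(orders)}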

aws_lambda_powertools/utilities/parser/models/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -17,6 +17,7 @@
 from .cloudwatch import CloudWatchLogsData, CloudWatchLogsDecode, CloudWatchLogsLogEvent, CloudWatchLogsModel
 from .dynamodb import DynamoDBStreamChangedRecordModel, DynamoDBStreamModel, DynamoDBStreamRecordModel
 from .event_bridge import EventBridgeModel
+from .kafka import KafkaBaseEventModel, KafkaMskEventModel, KafkaRecordModel, KafkaSelfManagedEventModel
 from .kinesis import KinesisDataStreamModel, KinesisDataStreamRecord, KinesisDataStreamRecordPayload
 from .lambda_function_url import LambdaFunctionUrlModel
 from .s3 import S3Model, S3RecordModel
@@ -98,4 +99,8 @@
     "APIGatewayEventRequestContext",
     "APIGatewayEventAuthorizer",
     "APIGatewayEventIdentity",
+    "KafkaSelfManagedEventModel",
+    "KafkaRecordModel",
+    "KafkaMskEventModel",
+    "KafkaBaseEventModel",
 ]
aws_lambda_powertools/utilities/parser/models/kafka.py

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+from datetime import datetime
+from typing import Dict, List, Type, Union
+
+from pydantic import BaseModel, validator
+
+from aws_lambda_powertools.shared.functions import base64_decode, bytes_to_string
+from aws_lambda_powertools.utilities.parser.types import Literal
+
+SERVERS_DELIMITER = ","
+
+
+class KafkaRecordModel(BaseModel):
+    topic: str
+    partition: int
+    offset: int
+    timestamp: datetime
+    timestampType: str
+    key: bytes
+    value: Union[str, Type[BaseModel]]
+    headers: List[Dict[str, bytes]]
+
+    # validators
+    _decode_key = validator("key", allow_reuse=True)(base64_decode)
+
+    @validator("value", pre=True, allow_reuse=True)
+    def data_base64_decode(cls, value):
+        as_bytes = base64_decode(value)
+        return bytes_to_string(as_bytes)
+
+    @validator("headers", pre=True, allow_reuse=True)
+    def decode_headers_list(cls, value):
+        for header in value:
+            for key, values in header.items():
+                header[key] = bytes(values)
+        return value
+
+
+class KafkaBaseEventModel(BaseModel):
+    bootstrapServers: List[str]
+    records: Dict[str, List[KafkaRecordModel]]
+
+    @validator("bootstrapServers", pre=True, allow_reuse=True)
+    def split_servers(cls, value):
+        return None if not value else value.split(SERVERS_DELIMITER)
+
+
+class KafkaSelfManagedEventModel(KafkaBaseEventModel):
+    """Self-managed Apache Kafka event trigger
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+    """
+
+    eventSource: Literal["aws:SelfManagedKafka"]
+
+
+class KafkaMskEventModel(KafkaBaseEventModel):
+    """Fully-managed AWS Apache Kafka event trigger
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
+    """
+
+    eventSource: Literal["aws:kafka"]
+    eventSourceArn: str
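
To see the validators above in action, a hedged sketch that feeds a hand-built MSK-style payload straight into the new model; the ARN, server names, and offsets are made up, and the base64 strings decode to "recordKey" and "hello":

from aws_lambda_powertools.utilities.parser import parse
from aws_lambda_powertools.utilities.parser.models import KafkaMskEventModel

raw_event = {
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:012345678901:cluster/demo-cluster-1/abcd1234",  # placeholder ARN
    "bootstrapServers": "b-2.demo.kafka.us-east-1.amazonaws.com:9092,b-1.demo.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "cmVjb3JkS2V5",  # base64 of "recordKey"
                "value": "aGVsbG8=",    # base64 of "hello"
                "headers": [{"headerKey": [104, 101, 108, 108, 111]}],
            }
        ]
    },
}

event = parse(event=raw_event, model=KafkaMskEventModel)
# bootstrapServers is split on "," and record key/value are base64-decoded by the validators
assert event.records["mytopic-0"][0].value == "hello"
assert event.records["mytopic-0"][0].key == b"recordKey"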

docs/utilities/parser.md

Lines changed: 3 additions & 0 deletions
@@ -168,6 +168,8 @@ Parser comes with the following built-in models:
 | **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
 | **APIGatewayProxyEventV2Model** | Lambda Event Source payload for Amazon API Gateway v2 payload |
 | **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
+| **KafkaSelfManagedEventModel** | Lambda Event Source payload for self managed Kafka payload |
+| **KafkaMskEventModel** | Lambda Event Source payload for AWS MSK payload |

 ### extending built-in models

@@ -308,6 +310,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
 | **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
 | **ApiGatewayV2Envelope** | 1. Parses data using `APIGatewayProxyEventV2Model`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
 | **LambdaFunctionUrlEnvelope** | 1. Parses data using `LambdaFunctionUrlModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
+| **KafkaEnvelope** | 1. Parses data using `KafkaRecordModel`. <br/> 2. Parses `value` key using your model and returns it. | `Model` |

 ### Bringing your own envelope
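
A hedged sketch of the decorator form the parser docs describe, wired to the new envelope; `MyRecordValue` is a placeholder model, not taken from this commit:

from typing import List

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes, event_parser

class MyRecordValue(BaseModel):  # placeholder for whatever each record's `value` deserializes into
    key: str

@event_parser(model=MyRecordValue, envelope=envelopes.KafkaEnvelope)
def lambda_handler(event: List[MyRecordValue], context):
    # The envelope hands the handler a list: one parsed model per Kafka record
    return {"count": len(event)}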

tests/events/kafkaEventSelfManaged.json

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 {
-   "eventSource":"aws:aws:SelfManagedKafka",
+   "eventSource":"aws:SelfManagedKafka",
    "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "records":{
       "mytopic-0":[

tests/functional/parser/schemas.py

Lines changed: 4 additions & 0 deletions
@@ -91,3 +91,7 @@ class MyApiGatewayBusiness(BaseModel):
 class MyALambdaFuncUrlBusiness(BaseModel):
     message: str
     username: str
+
+
+class MyLambdaKafkaBusiness(BaseModel):
+    key: str
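
A possible functional test pairing the new schema with the fixture above; this is a sketch only and assumes the fixture's base64 `value` decodes to a JSON object with a `key` field, which is what `MyLambdaKafkaBusiness` implies:

import json

from aws_lambda_powertools.utilities.parser import envelopes, parse
from tests.functional.parser.schemas import MyLambdaKafkaBusiness

def test_kafka_self_managed_event_with_envelope():
    with open("tests/events/kafkaEventSelfManaged.json") as f:
        raw_event = json.load(f)

    parsed = parse(event=raw_event, model=MyLambdaKafkaBusiness, envelope=envelopes.KafkaEnvelope)

    assert len(parsed) >= 1
    assert isinstance(parsed[0], MyLambdaKafkaBusiness)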
