Skip to content

Commit 040358d

Browse files
committed
support kinesis response
1 parent e51f6ff commit 040358d

File tree

4 files changed

+192
-9
lines changed

4 files changed

+192
-9
lines changed

aws_lambda_powertools/utilities/data_classes/__init__.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@
1515
from .event_source import event_source
1616
from .kafka_event import KafkaEvent
1717
from .kinesis_firehose_event import KinesisFirehoseEvent
18+
from .kinesis_firehose_response import (
19+
FirehoseStateDropped,
20+
FirehoseStateFailed,
21+
FirehoseStateOk,
22+
KinesisFirehoseResponce,
23+
KinesisFirehoseResponceFactory,
24+
KinesisFirehoseResponceRecord,
25+
KinesisFirehoseResponceRecordFactory,
26+
KinesisFirehoseResponseRecordMetadata,
27+
KinesisFirehoseResponseRecordMetadataFactory,
28+
)
1829
from .kinesis_stream_event import KinesisStreamEvent
1930
from .lambda_function_url_event import LambdaFunctionUrlEvent
2031
from .s3_event import S3Event, S3EventBridgeNotificationEvent
@@ -37,6 +48,15 @@
3748
"KafkaEvent",
3849
"KinesisFirehoseEvent",
3950
"KinesisStreamEvent",
51+
"KinesisFirehoseResponce",
52+
"KinesisFirehoseResponceRecord",
53+
"KinesisFirehoseResponseRecordMetadata",
54+
"FirehoseStateOk",
55+
"FirehoseStateDropped",
56+
"FirehoseStateFailed",
57+
"KinesisFirehoseResponceFactory",
58+
"KinesisFirehoseResponceRecordFactory",
59+
"KinesisFirehoseResponseRecordMetadataFactory",
4060
"LambdaFunctionUrlEvent",
4161
"S3Event",
4262
"S3EventBridgeNotificationEvent",
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
from __future__ import annotations
2+
3+
import base64
4+
from typing import Callable, Iterator, List, Optional, Union
5+
6+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
7+
8+
FirehoseStateOk = "Ok"
9+
FirehoseStateDropped = "Dropped"
10+
FirehoseStateFailed = "ProcessingFailed"
11+
12+
13+
class KinesisFirehoseResponseRecordMetadata(DictWrapper):
14+
"""
15+
Documentation:
16+
--------------
17+
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
18+
"""
19+
20+
@property
21+
def _metadata(self) -> Optional[dict]:
22+
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
23+
return self["metadata"] # could raise KeyError
24+
25+
@property
26+
def partition_keys(self) -> Optional[dict[str, str]]:
27+
"""Kinesis stream partition key; present only when Kinesis Stream is source"""
28+
return self._metadata["partitionKeys"]
29+
30+
31+
def KinesisFirehoseResponseRecordMetadataFactory(
32+
partition_keys: dict[str, str],
33+
json_deserializer: Optional[Callable] = None,
34+
) -> KinesisFirehoseResponseRecordMetadata:
35+
data = {
36+
"metadata": {
37+
"partitionKeys": partition_keys,
38+
},
39+
}
40+
return KinesisFirehoseResponseRecordMetadata(data=data, json_deserializer=json_deserializer)
41+
42+
43+
class KinesisFirehoseResponceRecord(DictWrapper):
44+
"""Record in Kinesis Data Firehose event
45+
46+
Documentation:
47+
--------------
48+
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
49+
"""
50+
51+
@property
52+
def record_id(self) -> str:
53+
"""Record ID; uniquely identifies this record within the current batch"""
54+
return self["recordId"]
55+
56+
@property
57+
def result(self) -> Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed]:
58+
"""processing result, supported value: Ok, Dropped, ProcessingFailed"""
59+
return self["result"]
60+
61+
@property
62+
def data(self) -> str:
63+
"""The data blob, base64-encoded"""
64+
return self["data"]
65+
66+
@property
67+
def metadata(self) -> Optional[KinesisFirehoseResponseRecordMetadata]:
68+
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
69+
return KinesisFirehoseResponseRecordMetadata(self._data) if self.get("metadata") else None
70+
71+
@property
72+
def data_as_bytes(self) -> bytes:
73+
"""Decoded base64-encoded data as bytes"""
74+
return base64.b64decode(self.data)
75+
76+
@property
77+
def data_as_text(self) -> str:
78+
"""Decoded base64-encoded data as text"""
79+
return self.data_as_bytes.decode("utf-8")
80+
81+
@property
82+
def data_as_json(self) -> dict:
83+
"""Decoded base64-encoded data loaded to json"""
84+
if self._json_data is None:
85+
self._json_data = self._json_deserializer(self.data_as_text)
86+
return self._json_data
87+
88+
89+
def KinesisFirehoseResponceRecordFactory(
90+
record_id: str,
91+
result: Union[FirehoseStateOk, FirehoseStateDropped, FirehoseStateFailed],
92+
data: str,
93+
metadata: Optional[KinesisFirehoseResponseRecordMetadata] = None,
94+
json_deserializer: Optional[Callable] = None,
95+
) -> KinesisFirehoseResponceRecord:
96+
pass_data = {
97+
"recordId": record_id,
98+
"result": result,
99+
"data": base64.b64encode(data.encode("utf-8")).decode("utf-8"),
100+
}
101+
if metadata:
102+
data["metadata"] = metadata
103+
return KinesisFirehoseResponceRecord(data=pass_data, json_deserializer=json_deserializer)
104+
105+
106+
class KinesisFirehoseResponce(DictWrapper):
107+
"""Kinesis Data Firehose event
108+
109+
Documentation:
110+
--------------
111+
- https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
112+
"""
113+
114+
@property
115+
def records(self) -> Iterator[KinesisFirehoseResponceRecord]:
116+
for record in self["records"]:
117+
yield KinesisFirehoseResponceRecord(data=record, json_deserializer=self._json_deserializer)
118+
119+
120+
def KinesisFirehoseResponceFactory(
121+
records: List[KinesisFirehoseResponceRecord],
122+
json_deserializer: Optional[Callable] = None,
123+
) -> KinesisFirehoseResponce:
124+
pass_data = {"records": records}
125+
return KinesisFirehoseResponce(data=pass_data, json_deserializer=json_deserializer)
Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,30 @@
1-
import base64
21
import json
32

43
from aws_lambda_powertools.utilities.data_classes import (
4+
FirehoseStateOk,
55
KinesisFirehoseEvent,
6+
KinesisFirehoseResponce,
7+
KinesisFirehoseResponceRecordFactory,
68
event_source,
79
)
810
from aws_lambda_powertools.utilities.typing import LambdaContext
911

1012

1113
@event_source(data_class=KinesisFirehoseEvent)
1214
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
13-
result = []
15+
result = KinesisFirehoseResponce({})
1416

1517
for record in event.records:
1618
# if data was delivered as json; caches loaded value
1719
data = record.data_as_json
1820

19-
processed_record = {
20-
"recordId": record.record_id,
21-
"data": base64.b64encode(json.dumps(data).encode("utf-8")),
22-
"result": "Ok",
23-
}
21+
processed_record = KinesisFirehoseResponceRecordFactory(
22+
record_id=record.record_id,
23+
result=FirehoseStateOk,
24+
data=(json.dumps(data)),
25+
)
2426

25-
result.append(processed_record)
27+
result.add_record(processed_record)
2628

2729
# return transformed records
28-
return {"records": result}
30+
return result
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from aws_lambda_powertools.utilities.data_classes import (
2+
FirehoseStateOk,
3+
KinesisFirehoseEvent,
4+
KinesisFirehoseResponceFactory,
5+
KinesisFirehoseResponceRecordFactory,
6+
)
7+
from tests.functional.utils import load_event
8+
9+
10+
def test_kinesis_firehose_response():
11+
raw_event = load_event("kinesisFirehoseKinesisEvent.json")
12+
parsed_event = KinesisFirehoseEvent(raw_event)
13+
14+
result = []
15+
for record in parsed_event.records:
16+
# if data was delivered as json; caches loaded value
17+
data = record.data_as_text
18+
19+
processed_record = KinesisFirehoseResponceRecordFactory(
20+
record_id=record.record_id,
21+
result=FirehoseStateOk,
22+
data=(data),
23+
)
24+
25+
result.append(processed_record)
26+
response = KinesisFirehoseResponceFactory(result)
27+
28+
res_records = list(response.records)
29+
assert len(res_records) == 2
30+
record_01, record_02 = res_records[:]
31+
record01_raw = raw_event["records"][0]
32+
assert record_01.result == FirehoseStateOk
33+
assert record_01.record_id == record01_raw["recordId"]
34+
assert record_01.data_as_bytes == b"Hello World"
35+
assert record_01.data_as_text == "Hello World"
36+
assert record_01.data == record01_raw["data"]

0 commit comments

Comments
 (0)