Skip to content

Commit 81afadb

Browse files
feat(parser): add KinesisFirehoseModel (#1556)
Co-authored-by: Leandro Damascena <[email protected]>
1 parent a5755d4 commit 81afadb

File tree

12 files changed

+262
-12
lines changed

12 files changed

+262
-12
lines changed

aws_lambda_powertools/shared/functions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def resolve_env_var_choice(
7171

7272
def base64_decode(value: str) -> bytes:
7373
try:
74-
logger.debug("Decoding base64 Kafka record item before parsing")
74+
logger.debug("Decoding base64 record item before parsing")
7575
return base64.b64decode(value)
7676
except (BinAsciiError, TypeError):
7777
raise ValueError("base64 decode failed")

aws_lambda_powertools/utilities/parser/envelopes/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from .event_bridge import EventBridgeEnvelope
77
from .kafka import KafkaEnvelope
88
from .kinesis import KinesisDataStreamEnvelope
9+
from .kinesis_firehose import KinesisFirehoseEnvelope
910
from .lambda_function_url import LambdaFunctionUrlEnvelope
1011
from .sns import SnsEnvelope, SnsSqsEnvelope
1112
from .sqs import SqsEnvelope
@@ -17,6 +18,7 @@
1718
"DynamoDBStreamEnvelope",
1819
"EventBridgeEnvelope",
1920
"KinesisDataStreamEnvelope",
21+
"KinesisFirehoseEnvelope",
2022
"LambdaFunctionUrlEnvelope",
2123
"SnsEnvelope",
2224
"SnsSqsEnvelope",

aws_lambda_powertools/utilities/parser/envelopes/kinesis.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class KinesisDataStreamEnvelope(BaseEnvelope):
1616
Regardless of its type it'll be parsed into a BaseModel object.
1717
1818
Note: Records will be parsed the same way so if model is str,
19-
all items in the list will be parsed as str and npt as JSON (and vice versa)
19+
all items in the list will be parsed as str and not as JSON (and vice versa)
2020
"""
2121

2222
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import logging
2+
from typing import Any, Dict, List, Optional, Type, Union, cast
3+
4+
from ..models import KinesisFirehoseModel
5+
from ..types import Model
6+
from .base import BaseEnvelope
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class KinesisFirehoseEnvelope(BaseEnvelope):
    """Kinesis Firehose Envelope to extract array of Records

    The record's data parameter is a base64 encoded string which is parsed into a bytes array,
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)

    https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
        """Parses records found with model provided

        The event is first validated against ``KinesisFirehoseModel``; each record's
        ``data`` payload is then decoded as UTF-8 text and handed to ``self._parse``
        together with ``model``.

        Parameters
        ----------
        data : Dict
            Lambda event to be parsed
        model : Type[Model]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        List
            List of records parsed with model provided, in the same order as
            the records of the incoming event
        """
        logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}")
        parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data)
        # Firehose records carry their payload in `data` (there is no `body` field).
        logger.debug(f"Parsing Kinesis Firehose records in `data` with {model}")
        models = []
        for record in parsed_envelope.records:
            # We allow either AWS expected contract (bytes) or a custom Model, see #943
            # A dedicated local name keeps the incoming `data` event argument intact.
            payload = cast(bytes, record.data)
            models.append(self._parse(data=payload.decode("utf-8"), model=model))
        return models

aws_lambda_powertools/utilities/parser/models/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@
3737
KinesisDataStreamRecord,
3838
KinesisDataStreamRecordPayload,
3939
)
40+
from .kinesis_firehose import (
41+
KinesisFirehoseModel,
42+
KinesisFirehoseRecord,
43+
KinesisFirehoseRecordMetadata,
44+
)
4045
from .lambda_function_url import LambdaFunctionUrlModel
4146
from .s3 import S3Model, S3RecordModel
4247
from .s3_object_event import (
@@ -86,6 +91,9 @@
8691
"KinesisDataStreamModel",
8792
"KinesisDataStreamRecord",
8893
"KinesisDataStreamRecordPayload",
94+
"KinesisFirehoseModel",
95+
"KinesisFirehoseRecord",
96+
"KinesisFirehoseRecordMetadata",
8997
"LambdaFunctionUrlModel",
9098
"S3Model",
9199
"S3RecordModel",

aws_lambda_powertools/utilities/parser/models/kinesis.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
import base64
2-
import logging
3-
from binascii import Error as BinAsciiError
41
from typing import List, Type, Union
52

63
from pydantic import BaseModel, validator
74

5+
from aws_lambda_powertools.shared.functions import base64_decode
86
from aws_lambda_powertools.utilities.parser.types import Literal
97

10-
logger = logging.getLogger(__name__)
11-
128

139
class KinesisDataStreamRecordPayload(BaseModel):
1410
kinesisSchemaVersion: str
@@ -19,11 +15,7 @@ class KinesisDataStreamRecordPayload(BaseModel):
1915

2016
@validator("data", pre=True, allow_reuse=True)
2117
def data_base64_decode(cls, value):
22-
try:
23-
logger.debug("Decoding base64 Kinesis data record before parsing")
24-
return base64.b64decode(value)
25-
except (BinAsciiError, TypeError):
26-
raise ValueError("base64 decode failed")
18+
return base64_decode(value)
2719

2820

2921
class KinesisDataStreamRecord(BaseModel):
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from typing import List, Optional, Type, Union
2+
3+
from pydantic import BaseModel, PositiveInt, validator
4+
5+
from aws_lambda_powertools.shared.functions import base64_decode
6+
7+
8+
class KinesisFirehoseRecordMetadata(BaseModel):
    """Metadata block found under a Firehose record's ``kinesisRecordMetadata`` key."""

    shardId: str
    partitionKey: str
    # Validated as a strictly positive integer.
    # NOTE(review): sample events look like epoch milliseconds - confirm against AWS docs.
    approximateArrivalTimestamp: PositiveInt
    sequenceNumber: str
    subsequenceNumber: str
14+
15+
16+
class KinesisFirehoseRecord(BaseModel):
    """A single entry of the ``records`` list in a Kinesis Firehose event.

    ``data`` arrives base64 encoded and is decoded by ``data_base64_decode``
    before pydantic applies the field's own type validation.
    """

    data: Union[bytes, Type[BaseModel]]  # base64 encoded str is parsed into bytes
    recordId: str
    # Validated as a strictly positive integer.
    approximateArrivalTimestamp: PositiveInt
    # Not every record carries this block, so the default is spelled out
    # instead of relying on pydantic's implicit handling of Optional fields.
    kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None

    @validator("data", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        """Decode the raw base64 ``data`` value prior to field validation (``pre=True``)."""
        return base64_decode(value)
25+
26+
27+
class KinesisFirehoseModel(BaseModel):
    """Top-level Kinesis Firehose event: invocation details plus its list of records."""

    invocationId: str
    deliveryStreamArn: str
    region: str
    # Events may omit this key entirely, so the default is spelled out
    # instead of relying on pydantic's implicit handling of Optional fields.
    sourceKinesisStreamArn: Optional[str] = None
    records: List[KinesisFirehoseRecord]

docs/utilities/parser.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ Parser comes with the following built-in models:
163163
| **S3Model** | Lambda Event Source payload for Amazon S3 |
164164
| **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda |
165165
| **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams |
166+
| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
166167
| **SesModel** | Lambda Event Source payload for Amazon Simple Email Service |
167168
| **SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service |
168169
| **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
@@ -319,6 +320,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
319320
| **SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]` |
320321
| **CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]` |
321322
| **KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it. <br/> 2. Parses records in `Records` key using your model and returns them in a list. | `List[Model]` |
323+
| **KinesisFirehoseEnvelope** | 1. Parses data using `KinesisFirehoseModel` which will base64 decode it. <br/> 2. Parses records in the `records` key using your model and returns them in a list. | `List[Model]` |
322324
| **SnsEnvelope** | 1. Parses data using `SnsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]` |
323325
| **SnsSqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses SNS records in `body` key using `SnsNotificationModel`. <br/> 3. Parses data in `Message` key using your model and return them in a list. | `List[Model]` |
324326
| **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
3+
"sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source",
4+
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
5+
"region": "us-east-2",
6+
"records": [
7+
{
8+
"data": "SGVsbG8gV29ybGQ=",
9+
"recordId": "record1",
10+
"approximateArrivalTimestamp": 1664028820148,
11+
"kinesisRecordMetadata": {
12+
"shardId": "shardId-000000000000",
13+
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
14+
"approximateArrivalTimestamp": 1664028820148,
15+
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
16+
"subsequenceNumber": ""
17+
}
18+
},
19+
{
20+
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9",
21+
"recordId": "record2",
22+
"approximateArrivalTimestamp": 1664028793294,
23+
"kinesisRecordMetadata": {
24+
"shardId": "shardId-000000000001",
25+
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
26+
"approximateArrivalTimestamp": 1664028793294,
27+
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
28+
"subsequenceNumber": ""
29+
}
30+
}
31+
]
32+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
3+
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
4+
"region": "us-east-2",
5+
"records":[
6+
{
7+
"recordId":"record1",
8+
"approximateArrivalTimestamp":1664029185290,
9+
"data":"SGVsbG8gV29ybGQ="
10+
},
11+
{
12+
"recordId":"record2",
13+
"approximateArrivalTimestamp":1664029186945,
14+
"data":"eyJIZWxsbyI6ICJXb3JsZCJ9"
15+
}
16+
]
17+
}

0 commit comments

Comments
 (0)