-
Notifications
You must be signed in to change notification settings - Fork 435
Description
Use case
When working with DynamoDB, depending on the action being performed, there are quality-of-life improvements that can be made to simplify the overall experience. For example:
- When calling `GetItem`, automatically populate `ExpressionAttributeNames` from a list of attributes
- When calling `Query` with `Limit`, automatically handle paginating with `LastEvaluatedKey` until the desired number of items are returned
  - This could also include methods for making defining `KeyCondition` easier and populating `ExpressionAttributeNames` and `ExpressionAttributeValues`.
- When calling `BatchGetItems`, automatically retry `UnprocessedKeys` using exponential backoff with jitter.
- Automatically serialize and deserialize DynamoDB objects into Python objects
Solution/User Experience
A helper class that implements some of these methods, such as:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
import time
from typing import Dict, Any, List, TYPE_CHECKING, Generator, Tuple
from aws_lambda_powertools import Logger
import boto3
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.config import Config
if TYPE_CHECKING:
from mypy_boto3_dynamodb import DynamoDBClient
logger = Logger(child=True)
__all__ = ["DynamoDBHelpers", "TransactionWriter"]
class TransactionWriter:
    """
    Automatically handle building a transaction to DynamoDB.

    Buffers Put/Update/Delete/ConditionCheck requests and sends them via
    ``transact_write_items`` whenever ``flush_amount`` requests have
    accumulated. Use as a context manager so any remainder is flushed on exit.
    """

    def __init__(self, table_name: str, client: "DynamoDBClient", flush_amount: int = 100) -> None:
        """
        :param table_name: Table every buffered request targets.
        :param client: Low-level DynamoDB client used to send transactions.
        :param flush_amount: Number of buffered requests that triggers a flush.
        """
        self._table_name = table_name
        self._client = client
        self._flush_amount = flush_amount
        self._items_buffer: List[Dict[str, Any]] = []

    def put_item(self, item: Dict[str, Any]) -> None:
        """Buffer a Put request; the caller's dict is not mutated."""
        self._add_request_and_process("Put", item)

    def update_item(self, item: Dict[str, Any]) -> None:
        """Buffer an Update request; the caller's dict is not mutated."""
        self._add_request_and_process("Update", item)

    def delete_item(self, item: Dict[str, Any]) -> None:
        """Buffer a Delete request; the caller's dict is not mutated."""
        self._add_request_and_process("Delete", item)

    def check_item(self, item: Dict[str, Any]) -> None:
        """Buffer a ConditionCheck request; the caller's dict is not mutated."""
        self._add_request_and_process("ConditionCheck", item)

    def _add_request_and_process(self, action: str, item: Dict[str, Any]) -> None:
        # Copy before injecting TableName so we never mutate caller state.
        self._items_buffer.append({action: {**item, "TableName": self._table_name}})
        self._flush_if_needed()

    def _flush_if_needed(self) -> None:
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self) -> None:
        # Send at most one transaction's worth; keep the remainder buffered.
        items_to_send = self._items_buffer[: self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount :]
        self._client.transact_write_items(TransactItems=items_to_send)
        logger.debug(
            f"Transaction write sent {len(items_to_send)}, unprocessed: {len(self._items_buffer)}"
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()
class DynamoDBHelpers:
    """
    Convenience helpers for working with a single DynamoDB table.

    Wraps the low-level client to provide page-limited queries, batch and
    transactional reads (with retry of unprocessed keys), and automatic
    (de)serialization between Python values and DynamoDB attribute values.
    """

    # DynamoDB service limit for BatchGetItem requests
    MAX_BATCH_GET_SIZE = 100
    # DynamoDB service limit for TransactWriteItems requests
    MAX_TRANSACTION_WRITE_SIZE = 100
    # Maximum number of retries (also passed to botocore's standard retry mode)
    MAX_RETRIES = 15
    # Minimum sleep time in seconds; base unit for backoff-with-jitter sleeps
    MIN_SLEEP_TIME = 1e-2
    _deserializer = TypeDeserializer()
    _serializer = TypeSerializer()
    _client = None

    def __init__(self, table_name: str, region: str, session: boto3.Session = None) -> None:
        """
        :param table_name: DynamoDB table all operations target.
        :param region: AWS region hosting the table.
        :param session: Optional boto3 session; falls back to the default session.
        """
        if not session:
            # NOTE(review): boto3._get_default_session() is a private API;
            # boto3 exposes no public accessor for the shared default session.
            session = boto3._get_default_session()
        config = Config(
            retries={
                "max_attempts": self.MAX_RETRIES,
                "mode": "standard",
            }
        )
        self._client: "DynamoDBClient" = session.client(
            "dynamodb", region_name=region, config=config
        )
        self._table_name = table_name

    @property
    def client(self):
        """The underlying low-level DynamoDB client."""
        return self._client

    def transaction_writer(self) -> TransactionWriter:
        """Create a TransactionWriter bound to this table and client."""
        return TransactionWriter(self._table_name, self._client, self.MAX_TRANSACTION_WRITE_SIZE)

    def paginated_query(
        self, params: Dict[str, Any], page_size: int, acc: List[Dict[str, Any]] = None
    ) -> Tuple[List, bool]:
        """
        Query, following ``LastEvaluatedKey``, until ``page_size`` items are
        collected or the result set is exhausted.

        :param params: Keyword arguments passed straight to ``Query``.
        :param page_size: Desired number of items in the returned page.
        :param acc: Items accumulated so far (used when resuming a page).
        :returns: Tuple of (items, whether more items are available).

        @see https://dev.to/andyrichardsonn/graphql-pagination-with-dynamodb-putting-it-together-20mn
        """
        # Iterative rather than recursive so a query spanning many sparse
        # pages cannot hit Python's recursion limit.
        acc = list(acc) if acc else []
        params = dict(params)  # copy so the caller's params are never mutated
        while True:
            logger.debug("paginated_query", params=params, page_size=page_size)
            remaining = page_size - len(acc)
            result = self._client.query(**params)
            new_items = result.get("Items", [])
            acc = [*acc, *new_items[:remaining]]
            cursor = result.get("LastEvaluatedKey")
            if not cursor:
                # Result set exhausted: extra items exist only if this page
                # returned more than we needed.
                return acc, len(new_items) > remaining
            if len(acc) >= page_size and len(new_items) > remaining:
                # Page is full and at least one more item is known to exist.
                return acc, True
            params["ExclusiveStartKey"] = cursor

    def get_items(
        self, keys: List[Dict[str, Any]], attributes: List[str] = None, in_transaction: bool = False
    ) -> Generator[Dict[str, Any], None, None]:
        """
        Yield deserialized items for ``keys``, chunking requests so each call
        stays within the DynamoDB batch size limit.

        :param keys: Primary keys of the items to fetch (plain Python values).
        :param attributes: Optional attribute names; when given, a
            ``ProjectionExpression`` and ``ExpressionAttributeNames`` are
            built automatically so reserved words are safe to request.
        :param in_transaction: Fetch via ``TransactGetItems`` instead of
            ``BatchGetItem``.
        """
        projection_attrs = {}
        if attributes:
            # Placeholders (#a0, #a1, ...) keep reserved words usable.
            names = {f"#a{idx}": attribute for idx, attribute in enumerate(attributes)}
            projection_attrs["ExpressionAttributeNames"] = names
            projection_attrs["ProjectionExpression"] = ",".join(names)
        for key_chunk in self._batch_keys(keys):
            if in_transaction:
                items = self._get_items_transaction(key_chunk, projection_attrs)
            else:
                items = self._get_items_batch(key_chunk, projection_attrs)
            yield from items

    def _get_items_batch(
        self, keys: List[Dict[str, Any]], attributes: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        """
        Gets a batch of items from Amazon DynamoDB in a batch from a single
        table, retrying ``UnprocessedKeys`` with capped exponential backoff
        and jitter.

        :raises ValueError: If more than MAX_BATCH_GET_SIZE keys are given.
        """
        if not keys:
            return []
        num_keys = len(keys)
        if num_keys > self.MAX_BATCH_GET_SIZE:
            raise ValueError(
                f"{num_keys} exceeds maximum batch get size of {self.MAX_BATCH_GET_SIZE}"
            )
        request_items = {
            self._table_name: {
                # BatchGetItem expects a *list* of serialized key maps;
                # serializing the whole list at once would wrap it in a single
                # {"L": ...} attribute value and fail request validation.
                "Keys": [self._serialize(key) for key in keys],
            }
        }
        if attributes:
            request_items[self._table_name] |= attributes
        response = self._client.batch_get_item(RequestItems=request_items)
        items = response.get("Responses", {}).get(self._table_name, [])
        results: List[Dict[str, Any]] = [self._deserialize(item) for item in items]
        num_retries = 0
        while response.get("UnprocessedKeys", {}).get(self._table_name, None) is not None:
            # Count the actual unprocessed keys, not the entries of the
            # per-table request dict (which would always be ~1).
            num_keys = len(response["UnprocessedKeys"][self._table_name].get("Keys", []))
            num_retries += 1
            if num_retries > self.MAX_RETRIES:
                # Re-randomize the exponent instead of giving up, keeping the
                # sleep bounded. NOTE(review): this retries indefinitely until
                # every key is processed — confirm that is intended.
                num_retries = random.randint(1, self.MAX_RETRIES)
            # Exponential backoff with jitter: MIN_SLEEP_TIME * U[1, 2**n]
            sleep_time = self.MIN_SLEEP_TIME * random.randint(1, 2**num_retries)
            logger.debug(
                f"Re-fetching {num_keys} keys, retry {num_retries} of {self.MAX_RETRIES}, sleeping for {sleep_time} seconds"
            )
            time.sleep(sleep_time)
            response = self._client.batch_get_item(RequestItems=response["UnprocessedKeys"])
            items = response.get("Responses", {}).get(self._table_name, [])
            results.extend(self._deserialize(item) for item in items)
        return results

    def _get_items_transaction(
        self, keys: List[Dict[str, Any]], attributes: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        """
        Gets a batch of items from Amazon DynamoDB in a transaction.

        :raises ValueError: If more than MAX_BATCH_GET_SIZE keys are given.
        """
        if not keys:
            return []
        num_keys = len(keys)
        if num_keys > self.MAX_BATCH_GET_SIZE:
            raise ValueError(
                f"{num_keys} exceeds maximum batch get size of {self.MAX_BATCH_GET_SIZE}"
            )
        items = []
        for key in keys:
            item = {
                "Get": {
                    "Key": self._serialize(key),
                    "TableName": self._table_name,
                }
            }
            if attributes:
                item["Get"] |= attributes
            items.append(item)
        response = self._client.transact_get_items(TransactItems=items)
        responses = response.get("Responses", [])
        # NOTE(review): entries for keys that don't exist lack an "Item" key
        # and would raise KeyError here — confirm all requested keys exist.
        return [self._deserialize(entry["Item"]) for entry in responses]

    @classmethod
    def _batch_keys(cls, keys: List[Dict[str, Any]]) -> Generator[List[Dict[str, Any]], None, None]:
        """Yield ``keys`` in chunks of at most MAX_BATCH_GET_SIZE."""
        end = len(keys)
        for index in range(0, end, cls.MAX_BATCH_GET_SIZE):
            yield keys[index : min(index + cls.MAX_BATCH_GET_SIZE, end)]

    @classmethod
    def _deserialize(cls, item: Any) -> Any:
        """Convert a DynamoDB attribute value (or raw item map) to Python values."""
        if not item:
            return item
        if isinstance(item, dict) and "M" not in item:
            # Raw item maps from the API are not wrapped; wrap them as a map
            # attribute value so TypeDeserializer accepts them.
            item = {"M": item}
        return cls._deserializer.deserialize(item)

    @classmethod
    def _serialize(cls, obj: Any) -> Dict[str, Any]:
        """Convert a Python value to DynamoDB wire format, unwrapping map values."""
        result = cls._serializer.serialize(obj)
        if "M" in result:
            # API call sites expect bare attribute maps for items and keys.
            result = result["M"]
        return result
Alternative solutions
PynamoDB is one alternative that has its own model structure. I'm implementing a single-table design and wanted more control over the items being inserted into the table.
What does a good UX look like?
Users shouldn't be concerned with how DynamoDB internally represents typing, so all outside-in and inside-out data conversions should be run through the DynamoDB `TypeSerializer` and `TypeDeserializer` libraries. Fetching individual attributes can also be simplified to a `List[str]`, and then internally we build the `ProjectionExpression` and populate `ExpressionAttributeNames`.
Acknowledgment
- This feature request meets Lambda Powertools Tenets
- Should this be considered in other Lambda Powertools languages? i.e. Java, TypeScript
Metadata
Metadata
Assignees
Labels
Type
Projects
Status