diff --git a/README.md b/README.md index bb8444bae..5c0eec5f2 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ event loop. This means task management, sleep, cancellation, etc have all been d - [Usage](#usage) - [Client](#client) - [Data Conversion](#data-conversion) + - [Custom Type Data Conversion](#custom-type-data-conversion) - [Workers](#workers) - [Workflows](#workflows) - [Definition](#definition) @@ -268,21 +269,118 @@ The default data converter supports converting multiple types including: * Iterables including ones JSON dump may not support by default, e.g. `set` * Any class with a `dict()` method and a static `parse_obj()` method, e.g. [Pydantic models](https://pydantic-docs.helpmanual.io/usage/models) - * Note, this doesn't mean every Pydantic field can be converted, only fields which the data converter supports + * The default data converter is deprecated for Pydantic models and will warn if used since not all fields work. + See [this sample](https://github.com/temporalio/samples-python/tree/main/pydantic_converter) for the recommended + approach. * [IntEnum, StrEnum](https://docs.python.org/3/library/enum.html) based enumerates * [UUID](https://docs.python.org/3/library/uuid.html) This notably doesn't include any `date`, `time`, or `datetime` objects as they may not work across SDKs. +Users are strongly encouraged to use a single `dataclass` for parameter and return types so fields with defaults can be +easily added without breaking compatibility. + Classes with generics may not have the generics properly resolved. The current implementation, similar to Pydantic, does not have generic type resolution. Users should use concrete types. +##### Custom Type Data Conversion + For converting from JSON, the workflow/activity type hint is taken into account to convert to the proper type. 
Care has been taken to support all common typings including `Optional`, `Union`, all forms of iterables and mappings, `NewType`, etc in addition to the regular JSON values mentioned before. -Users are strongly encouraged to use a single `dataclass` for parameter and return types so fields with defaults can be -easily added without breaking compatibility. +Data converters contain a reference to a payload converter class that is used to convert to/from payloads/values. This +is a class and not an instance because it is instantiated on every workflow run inside the sandbox. The payload +converter is usually a `CompositePayloadConverter` which contains multiple `EncodingPayloadConverter`s it uses to try +to serialize/deserialize payloads. Upon serialization, each `EncodingPayloadConverter` is tried until one succeeds. The +`EncodingPayloadConverter` provides an "encoding" string serialized onto the payload so that, upon deserialization, the +specific `EncodingPayloadConverter` for the given "encoding" is used. + +The default data converter uses the `DefaultPayloadConverter` which is simply a `CompositePayloadConverter` with a known +set of default `EncodingPayloadConverter`s. To implement a custom encoding for a custom type, a new +`EncodingPayloadConverter` can be created for the new type. 
For example, to support `IPv4Address` types: + +```python +class IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter): + @property + def encoding(self) -> str: + return "text/ipv4-address" + + def to_payload(self, value: Any) -> Optional[Payload]: + if isinstance(value, ipaddress.IPv4Address): + return Payload( + metadata={"encoding": self.encoding.encode()}, + data=str(value).encode(), + ) + else: + return None + + def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any: + assert not type_hint or type_hint is ipaddress.IPv4Address + return ipaddress.IPv4Address(payload.data.decode()) + +class IPv4AddressPayloadConverter(CompositePayloadConverter): + def __init__(self) -> None: + # Just add ours as first before the defaults + super().__init__( + IPv4AddressEncodingPayloadConverter(), + *DefaultPayloadConverter.default_encoding_payload_converters, + ) + +my_data_converter = dataclasses.replace( + DataConverter.default, + payload_converter_class=IPv4AddressPayloadConverter, +) +``` + +Imports are left off for brevity. + +This is good for many custom types. However, sometimes you want to override the behavior of just the existing JSON +encoding payload converter to support a new type. It is already the last encoding payload converter in the list, so it's +the fall-through behavior for any otherwise unknown type. Customizing the existing JSON converter has the benefit of +making the type work in lists, unions, etc. + +The `JSONPlainPayloadConverter` uses the Python [json](https://docs.python.org/3/library/json.html) library with an +advanced JSON encoder by default and a custom value conversion method to turn `json.load`ed values to their type hints. +The conversion can be customized for serialization with a custom `json.JSONEncoder` and deserialization with a custom +`JSONTypeConverter`. 
For example, to support `IPv4Address` types in existing JSON conversion: + +```python +class IPv4AddressJSONEncoder(AdvancedJSONEncoder): + def default(self, o: Any) -> Any: + if isinstance(o, ipaddress.IPv4Address): + return str(o) + return super().default(o) +class IPv4AddressJSONTypeConverter(JSONTypeConverter): + def to_typed_value( + self, hint: Type, value: Any + ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]: + if issubclass(hint, ipaddress.IPv4Address): + return ipaddress.IPv4Address(value) + return JSONTypeConverter.Unhandled + +class IPv4AddressPayloadConverter(CompositePayloadConverter): + def __init__(self) -> None: + # Replace default JSON plain with our own that has our encoder and type + # converter + json_converter = JSONPlainPayloadConverter( + encoder=IPv4AddressJSONEncoder, + custom_type_converters=[IPv4AddressJSONTypeConverter()], + ) + super().__init__( + *[ + c if not isinstance(c, JSONPlainPayloadConverter) else json_converter + for c in DefaultPayloadConverter.default_encoding_payload_converters + ] + ) + +my_data_converter = dataclasses.replace( + DataConverter.default, + payload_converter_class=IPv4AddressPayloadConverter, +) +``` + +Now `IPv4Address` can be used in type hints including collections, optionals, etc. ### Workers diff --git a/temporalio/converter.py b/temporalio/converter.py index 567271484..f78b9fe53 100644 --- a/temporalio/converter.py +++ b/temporalio/converter.py @@ -23,6 +23,7 @@ Dict, List, Mapping, + NewType, Optional, Sequence, Tuple, @@ -458,6 +459,7 @@ def __init__( encoder: Optional[Type[json.JSONEncoder]] = AdvancedJSONEncoder, decoder: Optional[Type[json.JSONDecoder]] = None, encoding: str = "json/plain", + custom_type_converters: Sequence[JSONTypeConverter] = [], ) -> None: """Initialize a JSON data converter. @@ -465,11 +467,14 @@ def __init__( encoder: Custom encoder class object to use. decoder: Custom decoder class object to use. encoding: Encoding name to use. 
+ custom_type_converters: Set of custom type converters that are used + when converting from a payload to type-hinted values. """ super().__init__() self._encoder = encoder self._decoder = decoder self._encoding = encoding + self._custom_type_converters = custom_type_converters @property def encoding(self) -> str: @@ -500,12 +505,43 @@ def from_payload( try: obj = json.loads(payload.data, cls=self._decoder) if type_hint: - obj = value_to_type(type_hint, obj) + obj = value_to_type(type_hint, obj, self._custom_type_converters) return obj except json.JSONDecodeError as err: raise RuntimeError("Failed parsing") from err +_JSONTypeConverterUnhandled = NewType("_JSONTypeConverterUnhandled", object) + + +class JSONTypeConverter(ABC): + """Converter for converting an object from Python :py:func:`json.loads` + result (e.g. scalar, list, or dict) to a known type. + """ + + Unhandled = _JSONTypeConverterUnhandled(object()) + """Sentinel value that must be used as the result of + :py:meth:`to_typed_value` to say the given type is not handled by this + converter.""" + + @abstractmethod + def to_typed_value( + self, hint: Type, value: Any + ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]: + """Convert the given value to a type based on the given hint. + + Args: + hint: Type hint to use to help in converting the value. + value: Value as returned by :py:func:`json.loads`. Usually a scalar, + list, or dict. + + Returns: + The converted value or :py:attr:`Unhandled` if this converter does + not handle this situation. + """ + raise NotImplementedError + + class PayloadCodec(ABC): """Codec for encoding/decoding to/from bytes. @@ -1112,7 +1148,11 @@ def decode_search_attributes( return ret -def value_to_type(hint: Type, value: Any) -> Any: +def value_to_type( + hint: Type, + value: Any, + custom_converters: Sequence[JSONTypeConverter] = [], +) -> Any: """Convert a given value to the given type hint. 
This is used internally to convert a raw JSON loaded value to a specific @@ -1121,6 +1161,10 @@ def value_to_type(hint: Type, value: Any) -> Any: Args: hint: Type hint to convert the value to. value: Raw value (e.g. primitive, dict, or list) to convert from. + custom_converters: Set of custom converters to try before doing default + conversion. Converters are tried in order and the first value that + is not :py:attr:`JSONTypeConverter.Unhandled` will be returned from + this function instead of doing default behavior. Returns: Converted value. @@ -1128,6 +1172,12 @@ def value_to_type(hint: Type, value: Any) -> Any: Raises: TypeError: Unable to convert to the given hint. """ + # Try custom converters + for conv in custom_converters: + ret = conv.to_typed_value(hint, value) + if ret is not JSONTypeConverter.Unhandled: + return ret + # Any or primitives if hint is Any: return value diff --git a/tests/test_converter.py b/tests/test_converter.py index 75496f933..7459bbb9e 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,6 +1,7 @@ from __future__ import annotations import dataclasses +import ipaddress import logging import sys import traceback @@ -22,6 +23,7 @@ Set, Text, Tuple, + Type, Union, ) from uuid import UUID, uuid4 @@ -37,11 +39,16 @@ from temporalio.api.common.v1 import Payloads from temporalio.api.failure.v1 import Failure from temporalio.converter import ( + AdvancedJSONEncoder, BinaryProtoPayloadConverter, + CompositePayloadConverter, DataConverter, DefaultFailureConverterWithEncodedAttributes, + DefaultPayloadConverter, JSONPlainPayloadConverter, + JSONTypeConverter, PayloadCodec, + _JSONTypeConverterUnhandled, decode_search_attributes, encode_search_attribute_values, ) @@ -516,3 +523,56 @@ async def test_failure_encoded_attributes(): not in failure.application_failure_info.details.payloads[0].metadata ) assert failure == orig_failure + + +class IPv4AddressPayloadConverter(CompositePayloadConverter): + def __init__(self) -> None: + # 
Replace default JSON plain with our own that has our type converter + json_converter = JSONPlainPayloadConverter( + encoder=IPv4AddressJSONEncoder, + custom_type_converters=[IPv4AddressJSONTypeConverter()], + ) + super().__init__( + *[ + c if not isinstance(c, JSONPlainPayloadConverter) else json_converter + for c in DefaultPayloadConverter.default_encoding_payload_converters + ] + ) + + +class IPv4AddressJSONEncoder(AdvancedJSONEncoder): + def default(self, o: Any) -> Any: + if isinstance(o, ipaddress.IPv4Address): + return str(o) + return super().default(o) + + +class IPv4AddressJSONTypeConverter(JSONTypeConverter): + def to_typed_value( + self, hint: Type, value: Any + ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]: + if issubclass(hint, ipaddress.IPv4Address): + return ipaddress.IPv4Address(value) + return JSONTypeConverter.Unhandled + + +async def test_json_type_converter(): + addr = ipaddress.IPv4Address("1.2.3.4") + custom_conv = dataclasses.replace( + DataConverter.default, payload_converter_class=IPv4AddressPayloadConverter + ) + + # Fails to encode with default + with pytest.raises(TypeError): + await DataConverter.default.encode([addr]) + + # But encodes with custom + payload = (await custom_conv.encode([addr]))[0] + assert '"1.2.3.4"' == payload.data.decode() + + # Fails to decode with default + with pytest.raises(TypeError): + await DataConverter.default.decode([payload], [ipaddress.IPv4Address]) + + # But decodes with custom + assert addr == (await custom_conv.decode([payload], [ipaddress.IPv4Address]))[0]