Allow converter failures to fail workflow and other minor things #329

Merged

5 commits merged on Jul 5, 2023
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
@@ -25,8 +25,6 @@ jobs:
runsOn: buildjet-4vcpu-ubuntu-2204-arm
runs-on: ${{ matrix.runsOn || matrix.os }}
steps:
- name: Print build information
run: "echo head_ref: ${{ github.head_ref }}, ref: ${{ github.ref }}, os: ${{ matrix.os }}, python: ${{ matrix.python }}"
- uses: actions/checkout@v2
with:
submodules: recursive
4 changes: 2 additions & 2 deletions README.md
@@ -306,8 +306,8 @@ This notably doesn't include any `date`, `time`, or `datetime` objects as they m
Users are strongly encouraged to use a single `dataclass` for parameter and return types so fields with defaults can be
easily added without breaking compatibility.

Classes with generics may not have the generics properly resolved. The current implementation, similar to Pydantic, does
not have generic type resolution. Users should use concrete types.
Classes with generics may not have the generics properly resolved. The current implementation does not have generic
type resolution. Users should use concrete types.

##### Custom Type Data Conversion

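As a small illustration of the "use concrete types" guidance in the README hunk above — a minimal sketch; the Wrapper and GreetParams dataclasses are illustrative assumptions, not from this repo:

from dataclasses import dataclass, field
from typing import Generic, List, TypeVar

T = TypeVar("T")


@dataclass
class Wrapper(Generic[T]):
    # Generic parameters are not resolved during payload conversion, so
    # `items` may come back as plain dicts/primitives rather than T instances.
    items: List[T] = field(default_factory=list)


@dataclass
class GreetParams:
    # Concrete field types let the default converter fully reconstruct the
    # dataclass, and defaulted fields can be added later without breaking
    # compatibility.
    names: List[str] = field(default_factory=list)
    greeting: str = "Hello"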
25 changes: 11 additions & 14 deletions poetry.lock

Some generated files are not rendered by default.

4 changes: 1 addition & 3 deletions pyproject.toml
@@ -51,9 +51,7 @@ protoc-wheel-0 = "^21.1"
psutil = "^5.9.3"
pydantic = "^1.9.1"
pydocstyle = "^6.1.1"
# TODO(cretz): Update when https://github.com/twisted/pydoctor/pull/595 released
# pydoctor = "^22.5.1"
pydoctor = { git = "https://github.com/cretz/pydoctor.git", branch = "overloads" }
pydoctor = "^23.4.1"
pytest = "^7.1.2"
pytest-asyncio = "^0.18.3"
pytest-timeout = "^2.1.0"
30 changes: 30 additions & 0 deletions temporalio/exceptions.py
@@ -1,5 +1,6 @@
"""Common Temporal exceptions."""

import asyncio
from enum import IntEnum
from typing import Any, Optional, Sequence, Tuple

@@ -322,3 +323,32 @@ def started_event_id(self) -> int:
def retry_state(self) -> Optional[RetryState]:
"""Retry state for this error."""
return self._retry_state


def is_cancelled_exception(exception: BaseException) -> bool:
"""Check whether the given exception is considered a cancellation exception
according to Temporal.

This is often used in a conditional of a catch clause to check whether a
cancel occurred inside of a workflow. This can occur from
:py:class:`asyncio.CancelledError` or :py:class:`CancelledError` or either
:py:class:`ActivityError` or :py:class:`ChildWorkflowError` if either of
those latter two has a :py:class:`CancelledError` cause.

Args:
exception: Exception to check.

Returns:
True if a cancelled exception, false if not.
"""
return (
isinstance(exception, asyncio.CancelledError)
Comment on the line above:
Member: Oh boy this whole thing is not fun lol

Member Author: Yeah, we basically write this same thing in every SDK

or isinstance(exception, CancelledError)
or (
(
isinstance(exception, ActivityError)
or isinstance(exception, ChildWorkflowError)
)
and isinstance(exception.cause, CancelledError)
)
)
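
To show how this new helper is intended to be used from workflow code, here is a minimal sketch; the workflow class and the "my_activity" name are illustrative assumptions, not part of this PR:

from datetime import timedelta

from temporalio import exceptions, workflow


@workflow.defn
class CancellationAwareWorkflow:
    @workflow.run
    async def run(self) -> str:
        try:
            return await workflow.execute_activity(
                "my_activity", start_to_close_timeout=timedelta(seconds=30)
            )
        except BaseException as err:
            # asyncio.CancelledError derives from BaseException, so a plain
            # `except Exception` would miss it. The helper also covers
            # Temporal's CancelledError and Activity/ChildWorkflow errors
            # whose cause is a CancelledError.
            if exceptions.is_cancelled_exception(err):
                workflow.logger.info("Workflow was cancelled")
            raise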
47 changes: 27 additions & 20 deletions temporalio/worker/_workflow_instance.py
@@ -259,6 +259,7 @@ def activate(
self._time_ns = act.timestamp.ToNanoseconds()
self._is_replaying = act.is_replaying

activation_err: Optional[Exception] = None
try:
# Split into job sets with patches, then signals, then non-queries, then
# queries
Expand Down Expand Up @@ -287,7 +288,17 @@ def activate(
# be checked in patch jobs (first index) or query jobs (last
# index).
self._run_once(check_conditions=index == 1 or index == 2)
except temporalio.exceptions.FailureError as err:
# We want failure errors during activation, like those that can
# happen during payload conversion, to fail the workflow not the
# task
try:
self._set_workflow_failure(err)
Comment on lines +292 to +296:
Member: Making sure - this is only happening if they explicitly raise something like ApplicationError - not on any old random exception, correct?

Member Author: Correct, this catch clause is for temporalio.exceptions.FailureError only

except Exception as inner_err:
activation_err = inner_err
except Exception as err:
activation_err = err
if activation_err:
logger.warning(
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
exc_info=True,
@@ -296,7 +307,7 @@
self._current_completion.failed.failure.SetInParent()
try:
self._failure_converter.to_failure(
err,
activation_err,
self._payload_converter,
self._current_completion.failed.failure,
)
@@ -1134,6 +1145,9 @@ def _convert_payloads(
payloads,
type_hints=types,
)
except temporalio.exceptions.FailureError:
# Don't wrap payload conversion errors that would fail the workflow
raise
except Exception as err:
raise RuntimeError("Failed decoding arguments") from err

@@ -1227,33 +1241,26 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
# cancel later on will show the workflow as cancelled. But this is
# a Temporal limitation in that cancellation is a state not an
# event.
if self._cancel_requested and (
isinstance(err, temporalio.exceptions.CancelledError)
or (
(
isinstance(err, temporalio.exceptions.ActivityError)
or isinstance(err, temporalio.exceptions.ChildWorkflowError)
)
and isinstance(err.cause, temporalio.exceptions.CancelledError)
)
if self._cancel_requested and temporalio.exceptions.is_cancelled_exception(
err
):
self._add_command().cancel_workflow_execution.SetInParent()
elif isinstance(err, temporalio.exceptions.FailureError):
# All other failure errors fail the workflow
failure = self._add_command().fail_workflow_execution.failure
failure.SetInParent()
try:
self._failure_converter.to_failure(
err, self._payload_converter, failure
)
except Exception as inner_err:
raise ValueError(
"Failed converting workflow exception"
) from inner_err
self._set_workflow_failure(err)
else:
# All other exceptions fail the task
self._current_activation_error = err

def _set_workflow_failure(self, err: temporalio.exceptions.FailureError) -> None:
# All other failure errors fail the workflow
failure = self._add_command().fail_workflow_execution.failure
failure.SetInParent()
try:
self._failure_converter.to_failure(err, self._payload_converter, failure)
except Exception as inner_err:
raise ValueError("Failed converting workflow exception") from inner_err

async def _signal_external_workflow(
self,
# Should not have seq set
44 changes: 44 additions & 0 deletions tests/worker/test_workflow.py
@@ -20,6 +20,7 @@
Optional,
Sequence,
Tuple,
Type,
cast,
)

@@ -51,6 +52,7 @@
from temporalio.converter import (
DataConverter,
DefaultFailureConverterWithEncodedAttributes,
DefaultPayloadConverter,
PayloadCodec,
PayloadConverter,
)
@@ -2719,3 +2721,45 @@ async def test_workflow_optional_param(client: Client):
task_queue=worker.task_queue,
)
assert result3 == OptionalParam(some_string="foo")


class ExceptionRaisingPayloadConverter(DefaultPayloadConverter):
bad_str = "bad-payload-str"

def from_payloads(
self, payloads: Sequence[Payload], type_hints: Optional[List] = None
) -> List[Any]:
# Check if any payloads contain the bad data
for payload in payloads:
if ExceptionRaisingPayloadConverter.bad_str.encode() in payload.data:
raise ApplicationError("Intentional converter failure")
return super().from_payloads(payloads, type_hints)


@workflow.defn
class ExceptionRaisingConverterWorkflow:
@workflow.run
async def run(self, some_param: str) -> str:
return some_param


async def test_exception_raising_converter_param(client: Client):
# Clone the client but change the data converter to use our converter
config = client.config()
config["data_converter"] = dataclasses.replace(
config["data_converter"],
payload_converter_class=ExceptionRaisingPayloadConverter,
)
client = Client(**config)

# Run workflow and confirm error
async with new_worker(client, ExceptionRaisingConverterWorkflow) as worker:
with pytest.raises(WorkflowFailureError) as err:
await client.execute_workflow(
ExceptionRaisingConverterWorkflow.run,
ExceptionRaisingPayloadConverter.bad_str,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
assert isinstance(err.value.cause, ApplicationError)
assert "Intentional converter failure" in str(err.value.cause)