-
Notifications
You must be signed in to change notification settings - Fork 112
Allow converter failures to fail workflow and other minor things #329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e99e62f
43cc9bf
242f2a1
41bcb44
18fdc40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -259,6 +259,7 @@ def activate( | |
self._time_ns = act.timestamp.ToNanoseconds() | ||
self._is_replaying = act.is_replaying | ||
|
||
activation_err: Optional[Exception] = None | ||
try: | ||
# Split into job sets with patches, then signals, then non-queries, then | ||
# queries | ||
|
@@ -287,7 +288,17 @@ def activate( | |
# be checked in patch jobs (first index) or query jobs (last | ||
# index). | ||
self._run_once(check_conditions=index == 1 or index == 2) | ||
except temporalio.exceptions.FailureError as err: | ||
# We want failure errors during activation, like those that can | ||
# happen during payload conversion, to fail the workflow not the | ||
# task | ||
try: | ||
self._set_workflow_failure(err) | ||
Comment on lines
+292
to
+296
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Making sure - this is only happening if they explicitly raise something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, this catch clause is for |
||
except Exception as inner_err: | ||
activation_err = inner_err | ||
except Exception as err: | ||
activation_err = err | ||
if activation_err: | ||
logger.warning( | ||
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}", | ||
exc_info=True, | ||
|
@@ -296,7 +307,7 @@ def activate( | |
self._current_completion.failed.failure.SetInParent() | ||
try: | ||
self._failure_converter.to_failure( | ||
err, | ||
activation_err, | ||
self._payload_converter, | ||
self._current_completion.failed.failure, | ||
) | ||
|
@@ -1134,6 +1145,9 @@ def _convert_payloads( | |
payloads, | ||
type_hints=types, | ||
) | ||
except temporalio.exceptions.FailureError: | ||
# Don't wrap payload conversion errors that would fail the workflow | ||
raise | ||
except Exception as err: | ||
raise RuntimeError("Failed decoding arguments") from err | ||
|
||
|
@@ -1227,33 +1241,26 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None: | |
# cancel later on will show the workflow as cancelled. But this is | ||
# a Temporal limitation in that cancellation is a state not an | ||
# event. | ||
if self._cancel_requested and ( | ||
isinstance(err, temporalio.exceptions.CancelledError) | ||
or ( | ||
( | ||
isinstance(err, temporalio.exceptions.ActivityError) | ||
or isinstance(err, temporalio.exceptions.ChildWorkflowError) | ||
) | ||
and isinstance(err.cause, temporalio.exceptions.CancelledError) | ||
) | ||
if self._cancel_requested and temporalio.exceptions.is_cancelled_exception( | ||
err | ||
): | ||
self._add_command().cancel_workflow_execution.SetInParent() | ||
elif isinstance(err, temporalio.exceptions.FailureError): | ||
# All other failure errors fail the workflow | ||
failure = self._add_command().fail_workflow_execution.failure | ||
failure.SetInParent() | ||
try: | ||
self._failure_converter.to_failure( | ||
err, self._payload_converter, failure | ||
) | ||
except Exception as inner_err: | ||
raise ValueError( | ||
"Failed converting workflow exception" | ||
) from inner_err | ||
self._set_workflow_failure(err) | ||
else: | ||
# All other exceptions fail the task | ||
self._current_activation_error = err | ||
|
||
def _set_workflow_failure(self, err: temporalio.exceptions.FailureError) -> None: | ||
# All other failure errors fail the workflow | ||
failure = self._add_command().fail_workflow_execution.failure | ||
failure.SetInParent() | ||
try: | ||
self._failure_converter.to_failure(err, self._payload_converter, failure) | ||
except Exception as inner_err: | ||
raise ValueError("Failed converting workflow exception") from inner_err | ||
|
||
async def _signal_external_workflow( | ||
self, | ||
# Should not have seq set | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh boy this whole thing is not fun lol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we basically write this same thing in every SDK