Skip to content

Minor updates #283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 73 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,25 @@ The SDK is now ready for use. To build from source, see "Building" near the end

## Implementing a Workflow

Create the following script at `run_worker.py`:
Create the following in `activities.py`:

```python
import asyncio
from datetime import datetime, timedelta
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio import activity

@activity.defn
async def say_hello(name: str) -> str:
return f"Hello, {name}!"
```

Create the following in `workflows.py`:

```python
from datetime import timedelta
from temporalio import workflow

# Import our activity, passing it through the sandbox
with workflow.unsafe.imports_passed_through():
from .activities import say_hello

@workflow.defn
class SayHello:
Expand All @@ -145,6 +152,18 @@ class SayHello:
return await workflow.execute_activity(
say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
)
```

Create the following in `run_worker.py`:

```python
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

# Import the activity and workflow from our other files
from .activities import say_hello
from .workflows import SayHello

async def main():
# Create client connected to server at the given address
Expand Down Expand Up @@ -172,7 +191,7 @@ import asyncio
from temporalio.client import Client

# Import the workflow from the previous code
from run_worker import SayHello
from .workflows import SayHello

async def main():
# Create client connected to server at the given address
Expand All @@ -196,11 +215,16 @@ The output will be:
Result: Hello, my-name!

## Next Steps
Temporal can be implemented in your code in many different ways, to suit your application's needs. The links below will give you much more information about how Temporal works with Python:

* [Code Samples](https://github.com/temporalio/samples-python) - If you want to start with some code, we have provided some pre-built samples.
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) Our Python specific Developer's Guide will give you much more information on how to build with Temporal in your Python applications than our SDK README ever could (or should).
* [API Documentation](https://python.temporal.io) - Full Temporal Python SDK package documentation
Temporal can be implemented in your code in many different ways, to suit your application's needs. The links below will
give you much more information about how Temporal works with Python:

* [Code Samples](https://github.com/temporalio/samples-python) - If you want to start with some code, we have provided
some pre-built samples.
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) Our Python specific
Developer's Guide will give you much more information on how to build with Temporal in your Python applications than
our SDK README ever could (or should).
* [API Documentation](https://python.temporal.io) - Full Temporal Python SDK package documentation.

---

Expand Down Expand Up @@ -420,16 +444,12 @@ respectively. Here's an example of a workflow:

```python
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio import workflow

@dataclass
class GreetingInfo:
salutation: str = "Hello"
name: str = "<unknown>"
# Pass the activities through the sandbox
with workflow.unsafe.imports_passed_through():
from .my_activities import GreetingInfo, create_greeting_activity

@workflow.defn
class GreetingWorkflow:
Expand Down Expand Up @@ -477,16 +497,31 @@ class GreetingWorkflow:
async def current_greeting(self) -> str:
return self._current_greeting

```

This assumes there's an activity in `my_activities.py` like:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why do activities need the my_ prefix and workflows don't?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't specify a workflow file name in this section. If you're talking about the quick start section, neither needs a my_ prefix. I chose to add it here for clarity since it's not an end-to-end example.


```python
from dataclasses import dataclass
from temporalio import workflow

@dataclass
class GreetingInfo:
salutation: str = "Hello"
name: str = "<unknown>"

@activity.defn
async def create_greeting_activity(info: GreetingInfo) -> str:
return f"{info.salutation}, {info.name}!"
```

Some things to note about the above code:
Some things to note about the above workflow code:

* Workflows run in a sandbox by default. Users are encouraged to define workflows in files with no side effects or other
complicated code or unnecessary imports to other third party libraries. See the [Workflow Sandbox](#workflow-sandbox)
section for more details.
* Workflows run in a sandbox by default.
* Users are encouraged to define workflows in files with no side effects or other complicated code or unnecessary
imports to other third party libraries.
* Non-standard-library, non-`temporalio` imports should usually be "passed through" the sandbox. See the
[Workflow Sandbox](#workflow-sandbox) section for more details.
* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
a different signal
* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
Expand Down Expand Up @@ -791,7 +826,17 @@ early. Users are encouraged to define their workflows in files with no other sid

The sandbox offers a mechanism to pass through modules from outside the sandbox. By default this already includes all
standard library modules and Temporal modules. **For performance and behavior reasons, users are encouraged to pass
through all third party modules whose calls will be deterministic.** See "Passthrough Modules" below on how to do this.
through all third party modules whose calls will be deterministic.** This includes modules containing the activities to
be referenced in workflows. See "Passthrough Modules" below on how to do this.

If you are getting an error like:

> temporalio.worker.workflow_sandbox._restrictions.RestrictedWorkflowAccessError: Cannot access
> http.client.IncompleteRead.\_\_mro_entries\_\_ from inside a workflow. If this is code from a module not used in a
> workflow or known to only be used deterministically from a workflow, mark the import as pass through.

Then you are either using an invalid construct from the workflow, this is a known limitation of the sandbox, or most
commonly this is from a module that is safe to pass through (see "Passthrough Modules" section below).

##### How the Sandbox Works

Expand Down Expand Up @@ -1093,9 +1138,10 @@ occurs. Synchronous activities cannot call any of the `async` functions.

##### Heartbeating and Cancellation

In order for a non-local activity to be notified of cancellation requests, it must invoke
`temporalio.activity.heartbeat()`. It is strongly recommended that all but the fastest executing activities call this
function regularly. "Types of Activities" has specifics on cancellation for asynchronous and synchronous activities.
In order for a non-local activity to be notified of cancellation requests, it must be given a `heartbeat_timeout` at
invocation time and invoke `temporalio.activity.heartbeat()` inside the activity. It is strongly recommended that all
Comment on lines +1141 to +1142
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not true, you don't have to specify a heartbeat_timeout in order to be cancellable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that it's not strictly true but the sentence reads better with "must X and Y" than "must Y and should X".

but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you might want to mention throttling here, up to you.

for asynchronous and synchronous activities.

In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server
for retrieval during activity retry. If an activity calls `temporalio.activity.heartbeat(123, 456)` and then fails and
Expand Down
6 changes: 3 additions & 3 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1544,15 +1544,15 @@ async def heartbeat(

async def complete(
self,
result: Optional[Any] = None,
result: Optional[Any] = temporalio.common._arg_unset,
*,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> None:
"""Complete the activity.

Args:
result: Result of the activity.
result: Result of the activity if any.
rpc_metadata: Headers used on the RPC call. Keys here override
client-level RPC metadata keys.
rpc_timeout: Optional RPC deadline to set for the RPC call.
Expand Down Expand Up @@ -4421,7 +4421,7 @@ async def heartbeat_async_activity(
async def complete_async_activity(self, input: CompleteAsyncActivityInput) -> None:
result = (
None
if not input.result
if input.result is temporalio.common._arg_unset
else await self._client.data_converter.encode_wrapper([input.result])
)
if isinstance(input.id_or_token, AsyncActivityIDReference):
Expand Down
6 changes: 6 additions & 0 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ def __init__(
if defn.name in self._activities:
raise ValueError(f"More than one activity named {defn.name}")

# Do not allow classes, __call__ based activities must be instances
if inspect.isclass(activity):
raise TypeError(
f"Activity named {defn.name} is a class instead of an instance"
)

# Some extra requirements for sync functions
if not defn.is_async:
if not activity_executor:
Expand Down
8 changes: 5 additions & 3 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1460,7 +1460,7 @@ async def handle_signal(self, input: HandleSignalInput) -> None:
handler = self._instance.workflow_get_signal_handler(None)
dynamic = True
# Technically this is checked before the interceptor is invoked, but
# an # interceptor could have changed the name
# an interceptor could have changed the name
if not handler:
raise RuntimeError(
f"Signal handler for {input.signal} expected but not found"
Expand All @@ -1480,10 +1480,12 @@ async def handle_query(self, input: HandleQueryInput) -> Any:
handler = self._instance.workflow_get_query_handler(None)
dynamic = True
# Technically this is checked before the interceptor is invoked, but
# an # interceptor could have changed the name
# an interceptor could have changed the name
if not handler:
known_queries = sorted([k for k in self._instance._queries.keys() if k])
raise RuntimeError(
f"Query handler for '{input.query}' expected but not found"
f"Query handler for '{input.query}' expected but not found, "
f"known queries: [{' '.join(known_queries)}]"
)
# Put name first if dynamic
args = list(input.args) if not dynamic else [input.query] + list(input.args)
Expand Down
7 changes: 6 additions & 1 deletion temporalio/worker/workflow_sandbox/_restrictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ class RestrictedWorkflowAccessError(temporalio.workflow.NondeterminismError):

def __init__(self, qualified_name: str) -> None:
"""Create restricted workflow access error."""
super().__init__(f"Cannot access {qualified_name} from inside a workflow.")
super().__init__(
f"Cannot access {qualified_name} from inside a workflow. "
"If this is code from a module not used in a workflow or known to "
"only be used deterministically from a workflow, mark the import "
"as pass through."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd consider linking to some doc that provides more information.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no stable link I can provide here

)
self.qualified_name = qualified_name


Expand Down
13 changes: 11 additions & 2 deletions tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ def __init__(self) -> None:
self._info_set = asyncio.Event()

@activity.defn
async def run(self) -> str:
async def run(self) -> Optional[str]:
self._info = activity.info()
self._info_set.set()
activity.raise_complete_async()
Expand All @@ -1012,15 +1012,24 @@ def async_handle(self, client: Client, use_task_token: bool) -> AsyncActivityHan
async def test_activity_async_success(
client: Client, worker: ExternalWorker, use_task_token: bool
):
wrapper = AsyncActivityWrapper()
# Start task, wait for info, complete with value, wait on workflow
wrapper = AsyncActivityWrapper()
task = asyncio.create_task(
_execute_workflow_with_activity(client, worker, wrapper.run)
)
await wrapper.wait_info()
await wrapper.async_handle(client, use_task_token).complete("some value")
assert "some value" == (await task).result

# Do again with a None value
wrapper = AsyncActivityWrapper()
task = asyncio.create_task(
_execute_workflow_with_activity(client, worker, wrapper.run)
)
await wrapper.wait_info()
await wrapper.async_handle(client, use_task_token).complete(None)
assert (await task).result is None


@pytest.mark.parametrize("use_task_token", [True, False])
async def test_activity_async_heartbeat_and_fail(
Expand Down
19 changes: 16 additions & 3 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ def bad_signal(self) -> NoReturn:
def bad_query(self) -> NoReturn:
raise ApplicationError("query fail", 456)

@workflow.query
def other_query(self) -> str:
raise NotImplementedError


async def test_workflow_signal_and_query_errors(client: Client):
async with new_worker(client, SignalAndQueryErrorsWorkflow) as worker:
Expand All @@ -374,9 +378,9 @@ async def test_workflow_signal_and_query_errors(client: Client):
# Unrecognized query
with pytest.raises(WorkflowQueryFailedError) as rpc_err:
await handle.query("non-existent query")
assert (
str(rpc_err.value)
== "Query handler for 'non-existent query' expected but not found"
assert str(rpc_err.value) == (
"Query handler for 'non-existent query' expected but not found,"
" known queries: [__stack_trace bad_query other_query]"
)


Expand Down Expand Up @@ -2260,6 +2264,15 @@ async def test_workflow_activity_callable_class(client: Client):
assert result == MyDataClass(field1="in worker, workflow param")


async def test_workflow_activity_callable_class_bad_register(client: Client):
# Try to register the class instead of the instance
with pytest.raises(TypeError) as err:
new_worker(
client, ActivityCallableClassWorkflow, activities=[CallableClassActivity]
)
assert "is a class instead of an instance" in str(err.value)


class MethodActivity:
def __init__(self, orig_field1: str) -> None:
self.orig_field1 = orig_field1
Expand Down