Skip to content

OpenAI/plugin #956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions temporalio/contrib/openai_agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,19 @@
Use with caution in production environments.
"""

from temporalio.contrib.openai_agents._invoke_model_activity import ModelActivity
from temporalio.contrib.openai_agents._model_parameters import ModelActivityParameters
from temporalio.contrib.openai_agents._trace_interceptor import (
OpenAIAgentsTracingInterceptor,
)
from temporalio.contrib.openai_agents.temporal_openai_agents import (
OpenAIAgentsPlugin,
TestModel,
TestModelProvider,
set_open_ai_agent_temporal_overrides,
)

from . import workflow

__all__ = [
"ModelActivity",
"OpenAIAgentsPlugin",
"ModelActivityParameters",
"workflow",
"set_open_ai_agent_temporal_overrides",
"OpenAIAgentsTracingInterceptor",
"TestModel",
"TestModelProvider",
]
127 changes: 127 additions & 0 deletions temporalio/contrib/openai_agents/temporal_openai_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@
from agents.tracing.provider import DefaultTraceProvider
from openai.types.responses import ResponsePromptParam

import temporalio.client
import temporalio.worker
from temporalio.client import ClientConfig
from temporalio.contrib.openai_agents._invoke_model_activity import ModelActivity
from temporalio.contrib.openai_agents._model_parameters import ModelActivityParameters
from temporalio.contrib.openai_agents._openai_runner import TemporalOpenAIRunner
from temporalio.contrib.openai_agents._temporal_trace_provider import (
TemporalTraceProvider,
)
from temporalio.contrib.openai_agents._trace_interceptor import (
OpenAIAgentsTracingInterceptor,
)
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.worker import Worker, WorkerConfig


@contextmanager
Expand Down Expand Up @@ -133,3 +142,121 @@ def stream_response(
) -> AsyncIterator[TResponseStreamEvent]:
"""Get a streamed response from the model. Unimplemented."""
raise NotImplementedError()


class OpenAIAgentsPlugin(temporalio.client.Plugin, temporalio.worker.Plugin):
"""Temporal plugin for integrating OpenAI agents with Temporal workflows.

.. warning::
This class is experimental and may change in future versions.
Use with caution in production environments.

This plugin provides seamless integration between the OpenAI Agents SDK and
Temporal workflows. It automatically configures the necessary interceptors,
activities, and data converters to enable OpenAI agents to run within
Temporal workflows with proper tracing and model execution.

The plugin:
1. Configures the Pydantic data converter for type-safe serialization
2. Sets up tracing interceptors for OpenAI agent interactions
3. Registers model execution activities
4. Manages the OpenAI agent runtime overrides during worker execution

Args:
model_params: Configuration parameters for Temporal activity execution
of model calls. If None, default parameters will be used.
model_provider: Optional model provider for custom model implementations.
Useful for testing or custom model integrations.

Example:
>>> from temporalio.client import Client
>>> from temporalio.worker import Worker
>>> from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
>>> from datetime import timedelta
>>>
>>> # Configure model parameters
>>> model_params = ModelActivityParameters(
... start_to_close_timeout=timedelta(seconds=30),
... retry_policy=RetryPolicy(maximum_attempts=3)
... )
>>>
>>> # Create plugin
>>> plugin = OpenAIAgentsPlugin(model_params=model_params)
>>>
>>> # Use with client and worker
>>> client = await Client.connect(
... "localhost:7233",
... plugins=[plugin]
... )
>>> worker = Worker(
... client,
... task_queue="my-task-queue",
... workflows=[MyWorkflow],
... )
"""

def __init__(
self,
model_params: Optional[ModelActivityParameters] = None,
model_provider: Optional[ModelProvider] = None,
) -> None:
"""Initialize the OpenAI agents plugin.

Args:
model_params: Configuration parameters for Temporal activity execution
of model calls. If None, default parameters will be used.
model_provider: Optional model provider for custom model implementations.
Useful for testing or custom model integrations.
"""
self._model_params = model_params
self._model_provider = model_provider

def configure_client(self, config: ClientConfig) -> ClientConfig:
"""Configure the Temporal client for OpenAI agents integration.

This method sets up the Pydantic data converter to enable proper
serialization of OpenAI agent objects and responses.

Args:
config: The client configuration to modify.

Returns:
The modified client configuration.
"""
config["data_converter"] = pydantic_data_converter
return super().configure_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
"""Configure the Temporal worker for OpenAI agents integration.

This method adds the necessary interceptors and activities for OpenAI
agent execution:
- Adds tracing interceptors for OpenAI agent interactions
- Registers model execution activities

Args:
config: The worker configuration to modify.

Returns:
The modified worker configuration.
"""
config["interceptors"] = list(config.get("interceptors") or []) + [
OpenAIAgentsTracingInterceptor()
]
config["activities"] = list(config.get("activities") or []) + [
ModelActivity(self._model_provider).invoke_model_activity
]
return super().configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
"""Run the worker with OpenAI agents temporal overrides.

This method sets up the necessary runtime overrides for OpenAI agents
to work within the Temporal worker context, including custom runners
and trace providers.

Args:
worker: The worker instance to run.
"""
with set_open_ai_agent_temporal_overrides(self._model_params):
await super().run_worker(worker)
24 changes: 23 additions & 1 deletion temporalio/contrib/openai_agents/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,26 @@ async def run_operation(ctx: RunContextWrapper[Any], input: str) -> Any:


class ToolSerializationError(TemporalError):
"""Error that occurs when a tool output could not be serialized."""
"""Error that occurs when a tool output could not be serialized.

.. warning::
This exception is experimental and may change in future versions.
Use with caution in production environments.

This exception is raised when a tool (created from an activity or Nexus operation)
returns a value that cannot be properly serialized for use by the OpenAI agent.
All tool outputs must be convertible to strings for the agent to process them.

The error typically occurs when:
- A tool returns a complex object that doesn't have a meaningful string representation
- The returned object cannot be converted using str()
- Custom serialization is needed but not implemented

Example:
>>> @activity.defn
>>> def problematic_tool() -> ComplexObject:
... return ComplexObject() # This might cause ToolSerializationError

To fix this error, ensure your tool returns string-convertible values or
modify the tool to return a string representation of the result.
"""
Loading
Loading