Skip to content

Provide Span.Kind for TracingInterceptor #356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions temporalio/contrib/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ def _start_as_current_span(
*,
attributes: opentelemetry.util.types.Attributes,
input: Optional[_InputWithHeaders] = None,
kind: opentelemetry.trace.SpanKind,
) -> Iterator[None]:
with self.tracer.start_as_current_span(name, attributes=attributes):
with self.tracer.start_as_current_span(name, attributes=attributes, kind=kind):
if input:
input.headers = self._context_to_headers(input.headers)
yield None
Expand Down Expand Up @@ -190,6 +191,7 @@ def _completed_workflow_span(
attributes=params.attributes,
links=links,
start_time=params.time_ns,
kind=params.kind,
)
context = opentelemetry.trace.set_span_in_context(span, context)
if params.exception:
Expand Down Expand Up @@ -218,6 +220,7 @@ async def start_workflow(
f"{prefix}:{input.workflow}",
attributes={"temporalWorkflowID": input.id},
input=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().start_workflow(input)

Expand All @@ -226,6 +229,7 @@ async def query_workflow(self, input: temporalio.client.QueryWorkflowInput) -> A
f"QueryWorkflow:{input.query}",
attributes={"temporalWorkflowID": input.id},
input=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().query_workflow(input)

Expand All @@ -236,6 +240,7 @@ async def signal_workflow(
f"SignalWorkflow:{input.signal}",
attributes={"temporalWorkflowID": input.id},
input=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
):
return await super().signal_workflow(input)

Expand All @@ -261,6 +266,7 @@ async def execute_activity(
"temporalRunID": info.workflow_run_id,
"temporalActivityID": info.activity_id,
},
kind=opentelemetry.trace.SpanKind.SERVER,
):
return await super().execute_activity(input)

Expand All @@ -283,6 +289,7 @@ class _CompletedWorkflowSpanParams:
time_ns: int
link_context: Optional[_CarrierDict]
exception: Optional[Exception]
kind: opentelemetry.trace.SpanKind


_interceptor_context_key = opentelemetry.context.create_key(
Expand Down Expand Up @@ -334,8 +341,10 @@ async def execute_workflow(
:py:meth:`temporalio.worker.WorkflowInboundInterceptor.execute_workflow`.
"""
with self._top_level_workflow_context(success_is_complete=True):
# Entrypoint of workflow should be `server` in OTel
self._completed_span(
f"RunWorkflow:{temporalio.workflow.info().workflow_type}"
f"RunWorkflow:{temporalio.workflow.info().workflow_type}",
kind=opentelemetry.trace.SpanKind.SERVER,
)
return await super().execute_workflow(input)

Expand All @@ -355,6 +364,7 @@ async def handle_signal(self, input: temporalio.worker.HandleSignalInput) -> Non
self._completed_span(
f"HandleSignal:{input.signal}",
link_context_carrier=link_context_carrier,
kind=opentelemetry.trace.SpanKind.SERVER,
)
await super().handle_signal(input)

Expand Down Expand Up @@ -388,6 +398,7 @@ async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any:
link_context_carrier=link_context_carrier,
# Create even on replay for queries
new_span_even_on_replay=True,
kind=opentelemetry.trace.SpanKind.SERVER,
)
return await super().handle_query(input)
finally:
Expand Down Expand Up @@ -437,6 +448,7 @@ def _top_level_workflow_context(
self._completed_span(
f"CompleteWorkflow:{temporalio.workflow.info().workflow_type}",
exception=exception,
kind=opentelemetry.trace.SpanKind.INTERNAL,
)
opentelemetry.context.detach(token)

Expand Down Expand Up @@ -468,6 +480,7 @@ def _completed_span(
new_span_even_on_replay: bool = False,
additional_attributes: opentelemetry.util.types.Attributes = None,
exception: Optional[Exception] = None,
kind: opentelemetry.trace.SpanKind = opentelemetry.trace.SpanKind.INTERNAL,
) -> None:
# If there is no span on the context, we do not create a span
if opentelemetry.trace.get_current_span() is opentelemetry.trace.INVALID_SPAN:
Expand Down Expand Up @@ -499,6 +512,7 @@ def _completed_span(
time_ns=temporalio.workflow.time_ns(),
link_context=link_context_carrier,
exception=exception,
kind=kind,
)
)

Expand Down Expand Up @@ -535,7 +549,9 @@ async def signal_child_workflow(
) -> None:
# Create new span and put on outbound input
self.root._completed_span(
f"SignalChildWorkflow:{input.signal}", add_to_outbound=input
f"SignalChildWorkflow:{input.signal}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.SERVER,
)
await super().signal_child_workflow(input)

Expand All @@ -544,7 +560,9 @@ async def signal_external_workflow(
) -> None:
# Create new span and put on outbound input
self.root._completed_span(
f"SignalExternalWorkflow:{input.signal}", add_to_outbound=input
f"SignalExternalWorkflow:{input.signal}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
)
await super().signal_external_workflow(input)

Expand All @@ -553,7 +571,9 @@ def start_activity(
) -> temporalio.workflow.ActivityHandle:
# Create new span and put on outbound input
self.root._completed_span(
f"StartActivity:{input.activity}", add_to_outbound=input
f"StartActivity:{input.activity}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
)
return super().start_activity(input)

Expand All @@ -562,7 +582,9 @@ async def start_child_workflow(
) -> temporalio.workflow.ChildWorkflowHandle:
# Create new span and put on outbound input
self.root._completed_span(
f"StartChildWorkflow:{input.workflow}", add_to_outbound=input
f"StartChildWorkflow:{input.workflow}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
)
return await super().start_child_workflow(input)

Expand All @@ -571,7 +593,9 @@ def start_local_activity(
) -> temporalio.workflow.ActivityHandle:
# Create new span and put on outbound input
self.root._completed_span(
f"StartActivity:{input.activity}", add_to_outbound=input
f"StartActivity:{input.activity}",
add_to_outbound=input,
kind=opentelemetry.trace.SpanKind.CLIENT,
)
return super().start_local_activity(input)

Expand Down