Skip to content

Commit c05c5b3

Browse files
authored
Port span correlation (#2251)
* Add a custom ContextStorage to track the currently active span for traces selected for snapshotting. * Configure active span tracking in SnapshotProfilingSdkCustomizer. * Add the SpanTracker interface. * Include the active span details on collected stack traces when available. * Activate span stracking in SnapshotProfilingSdkCustomizer. * Remove missed debugging statements in test. * Remove unnecessary log record span context setting from InMemoryOtelLogger. * Include the currently active span ID on collected stack traces. * Introduce the SpanTrackingActivator interface to add a mechanism for tests to reliably inject the snapshot profiling active span tracker into the OpenTelemetry SDK runtime. * Introduce a SpanTrackerProvider to remove the need for ActiveSpanTracker to be implemented as a Singleton. * Remove unnecessary 'null' check in ActiveSpanTracker. * Add tests for InterceptingContextStorageSpanTrackingActivator. * Add tests for SpanTrackerProvider. * Consider whether a span is sampled, not recording, when deciding whether or not to track it. * Remove unnecessary fake ContextStorage in ActiveSpanTrackerTest. * Add a builder subclass for FakeSpan in ActiveSpanTrackerTest. * Move SpanContext builder into Snapshotting test helper class. * Remove helper method for generating random trace IDs from the Snapshotting class in favor of direct use of OpenTelemetry's IdGenerator. * Restore unnecessary changes to InMemoryOtelLogger. * Remove FakeSpan builder in ActiveSpanTrackerTest in favor of the SpanContextBuilder obtained from Snapshotting. * Add easier way to build SpanContext instances using trace IDs from other spans. * Do not use traceId from the active span tracker when creating StackTrace instances so to always have a trace ID associated even if span tracking is offline. * Remove the possibility that the SpanTracker in SpanTrackerProvider can ever be null. * Add ActiveSpanTrackerTest asserting that the active span is restored to a parent span upon scope closing. * Renaming class. * Expect span ID on exported logs. * Apply spotless code formatting. * Track active spans per thread rather than trace ID to account for traces that span multiple threads. * Replace FakeSpan in ActiveSpanTrackerTest with Span.wrap. * Add ActiveSpanTracker test to ensure that invalid spans are not tracked. * Track active spans by Thread rather than thread ID and use a map of weak reference keys in ActiveSpanTracker to ensure than map entries are automatically removed if there are a thread terminates without the OpenTelemetry scope being closed properly. * Add a CountDownLatch to allow tests to explicitly release background tasks to ensure those tasks execute on multiple threads. * Apply spotless code formatting.
1 parent 7bb828a commit c05c5b3

22 files changed

+926
-124
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import io.opentelemetry.api.trace.Span;
20+
import io.opentelemetry.api.trace.SpanContext;
21+
import io.opentelemetry.context.Context;
22+
import io.opentelemetry.context.ContextStorage;
23+
import io.opentelemetry.context.Scope;
24+
import io.opentelemetry.instrumentation.api.internal.cache.Cache;
25+
import java.util.Optional;
26+
import javax.annotation.Nullable;
27+
28+
class ActiveSpanTracker implements ContextStorage, SpanTracker {
29+
private final Cache<Thread, SpanContext> cache = Cache.weak();
30+
31+
private final ContextStorage delegate;
32+
private final TraceRegistry registry;
33+
34+
ActiveSpanTracker(ContextStorage delegate, TraceRegistry registry) {
35+
this.delegate = delegate;
36+
this.registry = registry;
37+
}
38+
39+
@Override
40+
public Scope attach(Context toAttach) {
41+
Scope scope = delegate.attach(toAttach);
42+
SpanContext newSpanContext = Span.fromContext(toAttach).getSpanContext();
43+
if (doNotTrack(newSpanContext)) {
44+
return scope;
45+
}
46+
47+
Thread thread = Thread.currentThread();
48+
SpanContext oldSpanContext = cache.get(thread);
49+
if (oldSpanContext == newSpanContext) {
50+
return scope;
51+
}
52+
53+
cache.put(thread, newSpanContext);
54+
return () -> {
55+
if (oldSpanContext != null) {
56+
cache.put(thread, oldSpanContext);
57+
} else {
58+
cache.remove(thread);
59+
}
60+
scope.close();
61+
};
62+
}
63+
64+
private boolean doNotTrack(SpanContext spanContext) {
65+
return !spanContext.isSampled() || !registry.isRegistered(spanContext);
66+
}
67+
68+
@Nullable
69+
@Override
70+
public Context current() {
71+
return delegate.current();
72+
}
73+
74+
public Optional<SpanContext> getActiveSpan(Thread thread) {
75+
return Optional.ofNullable(cache.get(thread));
76+
}
77+
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/AsyncStackTraceExporter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private Runnable pprofExporter(Logger otelLogger, List<StackTrace> stackTraces)
6565
stackTrace.getStackFrames(),
6666
stackTrace.getTimestamp(),
6767
stackTrace.getTraceId(),
68-
null);
68+
stackTrace.getSpanId());
6969
}
7070
cpuEventExporter.flush();
7171
} catch (Exception e) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import com.google.common.annotations.VisibleForTesting;
20+
import io.opentelemetry.context.ContextStorage;
21+
import java.util.function.Consumer;
22+
import java.util.function.UnaryOperator;
23+
24+
class InterceptingContextStorageSpanTrackingActivator implements SpanTrackingActivator {
25+
private final Consumer<UnaryOperator<ContextStorage>> contextStorageWrappingFunction;
26+
27+
InterceptingContextStorageSpanTrackingActivator() {
28+
this(ContextStorage::addWrapper);
29+
}
30+
31+
@VisibleForTesting
32+
InterceptingContextStorageSpanTrackingActivator(
33+
Consumer<UnaryOperator<ContextStorage>> contextStorageWrappingFunction) {
34+
this.contextStorageWrappingFunction = contextStorageWrappingFunction;
35+
}
36+
37+
@Override
38+
public void activate(TraceRegistry registry) {
39+
contextStorageWrappingFunction.accept(
40+
contextStorage -> {
41+
ActiveSpanTracker tracker = new ActiveSpanTracker(contextStorage, registry);
42+
SpanTrackerProvider.INSTANCE.configure(tracker);
43+
return tracker;
44+
});
45+
}
46+
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSampler.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,13 @@ class ScheduledExecutorStackTraceSampler implements StackTraceSampler {
4040
new ConcurrentHashMap<>();
4141
private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
4242
private final StagingArea stagingArea;
43+
private final Supplier<SpanTracker> spanTracker;
4344
private final Duration samplingPeriod;
4445

45-
ScheduledExecutorStackTraceSampler(StagingArea stagingArea, Duration samplingPeriod) {
46+
ScheduledExecutorStackTraceSampler(
47+
StagingArea stagingArea, Supplier<SpanTracker> spanTracker, Duration samplingPeriod) {
4648
this.stagingArea = stagingArea;
49+
this.spanTracker = spanTracker;
4750
this.samplingPeriod = samplingPeriod;
4851
}
4952

@@ -55,7 +58,7 @@ public void start(SpanContext spanContext) {
5558
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
5659
scheduler.scheduleAtFixedRate(
5760
new StackTraceGatherer(
58-
samplingPeriod, spanContext.getTraceId(), Thread.currentThread().getId()),
61+
samplingPeriod, spanContext.getTraceId(), Thread.currentThread()),
5962
SCHEDULER_INITIAL_DELAY,
6063
samplingPeriod.toMillis(),
6164
TimeUnit.MILLISECONDS);
@@ -75,26 +78,32 @@ public void stop(SpanContext spanContext) {
7578
class StackTraceGatherer implements Runnable {
7679
private final Duration samplingPeriod;
7780
private final String traceId;
78-
private final long threadId;
81+
private final Thread thread;
7982

80-
StackTraceGatherer(Duration samplingPeriod, String traceId, long threadId) {
83+
StackTraceGatherer(Duration samplingPeriod, String traceId, Thread thread) {
8184
this.samplingPeriod = samplingPeriod;
8285
this.traceId = traceId;
83-
this.threadId = threadId;
86+
this.thread = thread;
8487
}
8588

8689
@Override
8790
public void run() {
8891
try {
8992
Instant now = Instant.now();
90-
ThreadInfo threadInfo = threadMXBean.getThreadInfo(threadId, Integer.MAX_VALUE);
91-
StackTrace stackTrace = StackTrace.from(now, samplingPeriod, traceId, threadInfo);
93+
ThreadInfo threadInfo = threadMXBean.getThreadInfo(thread.getId(), Integer.MAX_VALUE);
94+
SpanContext spanContext = retrieveActiveSpan(thread);
95+
StackTrace stackTrace =
96+
StackTrace.from(now, samplingPeriod, threadInfo, traceId, spanContext.getSpanId());
9297
stagingArea.stage(traceId, stackTrace);
9398
} catch (Exception e) {
94-
logger.log(Level.SEVERE, e, samplerErrorMessage(traceId, threadId));
99+
logger.log(Level.SEVERE, e, samplerErrorMessage(traceId, thread.getId()));
95100
}
96101
}
97102

103+
private SpanContext retrieveActiveSpan(Thread thread) {
104+
return spanTracker.get().getActiveSpan(thread).orElse(SpanContext.getInvalid());
105+
}
106+
98107
private Supplier<String> samplerErrorMessage(String traceId, long threadId) {
99108
return () ->
100109
"Exception thrown attempting to stage callstacks for trace ID ' "

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizer.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,35 +35,46 @@
3535
public class SnapshotProfilingSdkCustomizer implements AutoConfigurationCustomizerProvider {
3636
private final TraceRegistry registry;
3737
private final Function<ConfigProperties, StackTraceSampler> samplerProvider;
38+
private final SpanTrackingActivator spanTrackingActivator;
3839

3940
public SnapshotProfilingSdkCustomizer() {
40-
this(new TraceRegistry(), stackTraceSamplerProvider());
41+
this(
42+
new TraceRegistry(),
43+
stackTraceSamplerProvider(),
44+
new InterceptingContextStorageSpanTrackingActivator());
4145
}
4246

4347
private static Function<ConfigProperties, StackTraceSampler> stackTraceSamplerProvider() {
4448
return properties -> {
4549
Duration samplingPeriod = Configuration.getSnapshotProfilerSamplingInterval(properties);
4650
return new ScheduledExecutorStackTraceSampler(
47-
new AccumulatingStagingArea(StackTraceExporterProvider.INSTANCE), samplingPeriod);
51+
new AccumulatingStagingArea(StackTraceExporterProvider.INSTANCE),
52+
SpanTrackerProvider.INSTANCE,
53+
samplingPeriod);
4854
};
4955
}
5056

5157
@VisibleForTesting
52-
SnapshotProfilingSdkCustomizer(TraceRegistry registry, StackTraceSampler samplerProvider) {
53-
this(registry, properties -> samplerProvider);
58+
SnapshotProfilingSdkCustomizer(
59+
TraceRegistry registry, StackTraceSampler sampler, SpanTrackingActivator activator) {
60+
this(registry, properties -> sampler, activator);
5461
}
5562

5663
private SnapshotProfilingSdkCustomizer(
57-
TraceRegistry registry, Function<ConfigProperties, StackTraceSampler> samplerProvider) {
64+
TraceRegistry registry,
65+
Function<ConfigProperties, StackTraceSampler> samplerProvider,
66+
SpanTrackingActivator spanTrackingActivator) {
5867
this.registry = registry;
5968
this.samplerProvider = samplerProvider;
69+
this.spanTrackingActivator = spanTrackingActivator;
6070
}
6171

6272
@Override
6373
public void customize(AutoConfigurationCustomizer autoConfigurationCustomizer) {
6474
autoConfigurationCustomizer
6575
.addPropertiesCustomizer(autoConfigureSnapshotVolumePropagator())
66-
.addTracerProviderCustomizer(snapshotProfilingSpanProcessor(registry));
76+
.addTracerProviderCustomizer(snapshotProfilingSpanProcessor(registry))
77+
.addPropertiesCustomizer(startTrackingActiveSpans(registry));
6778
}
6879

6980
private BiFunction<SdkTracerProviderBuilder, ConfigProperties, SdkTracerProviderBuilder>
@@ -111,6 +122,16 @@ private boolean includeTraceContextPropagator(Set<String> configuredPropagators)
111122
return configuredPropagators.isEmpty();
112123
}
113124

125+
private Function<ConfigProperties, Map<String, String>> startTrackingActiveSpans(
126+
TraceRegistry registry) {
127+
return properties -> {
128+
if (snapshotProfilingEnabled(properties)) {
129+
spanTrackingActivator.activate(registry);
130+
}
131+
return Collections.emptyMap();
132+
};
133+
}
134+
114135
private boolean snapshotProfilingEnabled(ConfigProperties properties) {
115136
return Configuration.isSnapshotProfilingEnabled(properties);
116137
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import io.opentelemetry.api.trace.SpanContext;
20+
import java.util.Optional;
21+
22+
interface SpanTracker {
23+
SpanTracker NOOP = thread -> Optional.empty();
24+
25+
Optional<SpanContext> getActiveSpan(Thread thread);
26+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
import java.util.Objects;
20+
import java.util.function.Supplier;
21+
22+
class SpanTrackerProvider implements Supplier<SpanTracker> {
23+
public static final SpanTrackerProvider INSTANCE = new SpanTrackerProvider();
24+
25+
private SpanTracker tracker = SpanTracker.NOOP;
26+
27+
@Override
28+
public SpanTracker get() {
29+
return tracker;
30+
}
31+
32+
void configure(SpanTracker tracker) {
33+
this.tracker = Objects.requireNonNull(tracker);
34+
}
35+
36+
private SpanTrackerProvider() {}
37+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright Splunk Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.splunk.opentelemetry.profiler.snapshot;
18+
19+
interface SpanTrackingActivator {
20+
void activate(TraceRegistry registry);
21+
}

0 commit comments

Comments
 (0)