-
Notifications
You must be signed in to change notification settings - Fork 40
Clean orphaned traces #2330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Clean orphaned traces #2330
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
/* | ||
* Copyright Splunk Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.splunk.opentelemetry.profiler.snapshot; | ||
|
||
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import java.io.Closeable;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
|
||
/** | ||
* Clear snapshot profiling info for traces where root span was not ended. If there is a bug in the | ||
* instrumentation that causes the root span that triggered profiling not to be ended we'll end up | ||
* leaking the trace id in the trace registry and the stack sampler will continue running. To reduce | ||
* the impact of such instrumentation bugs this class triggers cleanup when the root span for which | ||
* the profiling was started is garbage collected. | ||
*/ | ||
class OrphanedTraceCleaner implements Closeable { | ||
private final Set<Key> traces = Collections.newSetFromMap(new ConcurrentHashMap<>()); | ||
private final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>(); | ||
|
||
private final TraceRegistry traceRegistry; | ||
private final Supplier<StackTraceSampler> sampler; | ||
private final Thread thread; | ||
|
||
OrphanedTraceCleaner(TraceRegistry traceRegistry, Supplier<StackTraceSampler> sampler) { | ||
this.traceRegistry = traceRegistry; | ||
this.sampler = sampler; | ||
|
||
thread = new Thread(this::unregisterOrphanedTraces); | ||
thread.setName("orphaned-trace-cleaner"); | ||
thread.setDaemon(true); | ||
thread.start(); | ||
} | ||
|
||
void register(Span span) { | ||
traces.add(new WeakKey(span, referenceQueue)); | ||
} | ||
|
||
void unregister(SpanContext spanContext) { | ||
traces.remove(new LookupKey(spanContext)); | ||
} | ||
|
||
// visible for tests | ||
int getTracesSize() { | ||
return traces.size(); | ||
} | ||
|
||
private void unregisterOrphanedTraces() { | ||
try { | ||
while (!Thread.interrupted()) { | ||
Object reference = referenceQueue.remove(); | ||
if (reference != null) { | ||
Key key = (Key) reference; | ||
if (traces.remove(key)) { | ||
traceRegistry.unregister(key.getTraceId()); | ||
sampler.get().stop(key.getTraceId(), key.getSpanId()); | ||
} | ||
} | ||
} | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
thread.interrupt(); | ||
} | ||
|
||
private interface Key { | ||
String getTraceId(); | ||
|
||
String getSpanId(); | ||
} | ||
|
||
private static class LookupKey implements Key { | ||
private final SpanContext spanContext; | ||
|
||
private LookupKey(SpanContext spanContext) { | ||
this.spanContext = spanContext; | ||
} | ||
|
||
@Override | ||
public String getTraceId() { | ||
return spanContext.getTraceId(); | ||
} | ||
|
||
@Override | ||
public String getSpanId() { | ||
return spanContext.getSpanId(); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(getTraceId(), getSpanId()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (obj == this) { | ||
return true; | ||
} | ||
if (!(obj instanceof Key)) { | ||
return false; | ||
} | ||
Key key = (Key) obj; | ||
return Objects.equals(getTraceId(), key.getTraceId()) | ||
&& Objects.equals(getSpanId(), key.getSpanId()); | ||
} | ||
} | ||
|
||
private static class WeakKey extends WeakReference<Span> implements Key { | ||
private final String traceId; | ||
private final String spanId; | ||
|
||
public WeakKey(Span referent, ReferenceQueue<Object> queue) { | ||
super(referent, queue); | ||
this.traceId = referent.getSpanContext().getTraceId(); | ||
this.spanId = referent.getSpanContext().getSpanId(); | ||
} | ||
|
||
@Override | ||
public String getTraceId() { | ||
return traceId; | ||
} | ||
|
||
@Override | ||
public String getSpanId() { | ||
return spanId; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(getTraceId(), getSpanId()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (obj == this) { | ||
return true; | ||
} | ||
if (!(obj instanceof Key)) { | ||
return false; | ||
} | ||
Key key = (Key) obj; | ||
return Objects.equals(getTraceId(), key.getTraceId()) | ||
&& Objects.equals(getSpanId(), key.getSpanId()); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
import static com.splunk.opentelemetry.profiler.ProfilingSemanticAttributes.SNAPSHOT_PROFILING; | ||
|
||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.sdk.common.CompletableResultCode; | ||
import io.opentelemetry.sdk.trace.ReadWriteSpan; | ||
import io.opentelemetry.sdk.trace.ReadableSpan; | ||
import io.opentelemetry.sdk.trace.SpanProcessor; | ||
|
@@ -38,10 +39,12 @@ | |
public class SnapshotProfilingSpanProcessor implements SpanProcessor { | ||
private final TraceRegistry registry; | ||
private final Supplier<StackTraceSampler> sampler; | ||
private final OrphanedTraceCleaner orphanedTraceCleaner; | ||
|
||
SnapshotProfilingSpanProcessor(TraceRegistry registry, Supplier<StackTraceSampler> sampler) { | ||
this.registry = registry; | ||
this.sampler = sampler; | ||
this.orphanedTraceCleaner = new OrphanedTraceCleaner(registry, sampler); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: inject it or make a factory method, because this violates DI principles. |
||
} | ||
|
||
@Override | ||
|
@@ -50,6 +53,7 @@ public void onStart(Context context, ReadWriteSpan span) { | |
Volume volume = Volume.from(context); | ||
if (volume == Volume.HIGHEST) { | ||
registry.register(span.getSpanContext()); | ||
orphanedTraceCleaner.register(span); | ||
} | ||
} | ||
|
||
|
@@ -75,6 +79,7 @@ public boolean isStartRequired() { | |
public void onEnd(ReadableSpan span) { | ||
if (isEntry(span)) { | ||
registry.unregister(span.getSpanContext()); | ||
orphanedTraceCleaner.unregister(span.getSpanContext()); | ||
sampler.get().stop(span.getSpanContext()); | ||
} | ||
} | ||
|
@@ -87,4 +92,10 @@ private boolean isEntry(ReadableSpan span) { | |
public boolean isEndRequired() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public CompletableResultCode shutdown() { | ||
orphanedTraceCleaner.close(); | ||
return SpanProcessor.super.shutdown(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,15 +24,27 @@ | |
class TraceRegistry { | ||
private final Set<String> traceIds = Collections.newSetFromMap(new ConcurrentHashMap<>()); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this make the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yes, it does |
||
|
||
public void register(SpanContext spanContext) { | ||
traceIds.add(spanContext.getTraceId()); | ||
public final void register(SpanContext spanContext) { | ||
register(spanContext.getTraceId()); | ||
} | ||
|
||
public boolean isRegistered(SpanContext spanContext) { | ||
return traceIds.contains(spanContext.getTraceId()); | ||
public void register(String traceId) { | ||
traceIds.add(traceId); | ||
} | ||
|
||
public void unregister(SpanContext spanContext) { | ||
traceIds.remove(spanContext.getTraceId()); | ||
public final boolean isRegistered(SpanContext spanContext) { | ||
return isRegistered(spanContext.getTraceId()); | ||
} | ||
|
||
public boolean isRegistered(String traceId) { | ||
return traceIds.contains(traceId); | ||
} | ||
|
||
public final void unregister(SpanContext spanContext) { | ||
unregister(spanContext.getTraceId()); | ||
} | ||
|
||
public void unregister(String traceId) { | ||
traceIds.remove(traceId); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright Splunk Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.splunk.opentelemetry.profiler.snapshot; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.awaitility.Awaitility.await; | ||
|
||
import io.opentelemetry.api.trace.Span; | ||
import io.opentelemetry.instrumentation.test.utils.GcUtils; | ||
import java.lang.ref.WeakReference; | ||
import java.time.Duration; | ||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.Test; | ||
|
||
public class OrphanedTraceCleanerTest { | ||
private final TraceRegistry registry = new TraceRegistry(); | ||
private final ObservableStackTraceSampler sampler = new ObservableStackTraceSampler(); | ||
private final OrphanedTraceCleaner cleaner = new OrphanedTraceCleaner(registry, () -> sampler); | ||
|
||
@AfterEach | ||
void teardown() { | ||
cleaner.close(); | ||
} | ||
|
||
@Test | ||
void registerAndUnregister() { | ||
var spanContext = Snapshotting.spanContext().build(); | ||
var span = Span.wrap(spanContext); | ||
cleaner.register(span); | ||
|
||
assertThat(cleaner.getTracesSize()).isEqualTo(1); | ||
|
||
cleaner.unregister(spanContext); | ||
|
||
assertThat(cleaner.getTracesSize()).isEqualTo(0); | ||
} | ||
|
||
@Test | ||
void unregisterOrphanedTraces() throws Exception { | ||
var spanContext = Snapshotting.spanContext().build(); | ||
registry.register(spanContext); | ||
var span = Span.wrap(spanContext); | ||
cleaner.register(span); | ||
|
||
var spanReference = new WeakReference<>(span); | ||
span = null; | ||
GcUtils.awaitGc(spanReference, Duration.ofSeconds(10)); | ||
|
||
await().untilAsserted(() -> assertThat(registry.isRegistered(spanContext)).isFalse()); | ||
} | ||
|
||
@Test | ||
void stopSamplingForOrphanedTraces() throws Exception { | ||
var spanContext = Snapshotting.spanContext().build(); | ||
registry.register(spanContext); | ||
sampler.start(spanContext); | ||
var span = Span.wrap(spanContext); | ||
cleaner.register(span); | ||
|
||
assertThat(sampler.isBeingSampled(spanContext)).isTrue(); | ||
|
||
var spanReference = new WeakReference<>(span); | ||
span = null; | ||
GcUtils.awaitGc(spanReference, Duration.ofSeconds(10)); | ||
|
||
await().untilAsserted(() -> assertThat(sampler.isBeingSampled(spanContext)).isFalse()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I grok the reasons why, but the asymmetry between
register(span)
and unregister(context)
gives me pause. 🤷🏻