Skip to content

Clean orphaned traces #2330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright Splunk Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.splunk.opentelemetry.profiler.snapshot;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import java.io.Closeable;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* Clears snapshot profiling info for traces whose root span was never ended. If a bug in the
* instrumentation prevents the root span that triggered profiling from being ended, the trace id
* leaks in the trace registry and the stack sampler keeps running. To reduce the impact of such
* instrumentation bugs, this class triggers cleanup when the root span for which profiling was
* started is garbage collected.
*/
class OrphanedTraceCleaner implements Closeable {
// Keys of the spans currently being watched. Backed by a ConcurrentHashMap because it is
// modified both by callers of register/unregister and by the cleaner thread.
private final Set<Key> traces = Collections.newSetFromMap(new ConcurrentHashMap<>());
// Queue handed to every WeakKey created in register(); the cleaner thread takes entries from it.
private final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();

// Registry from which the trace id of an orphaned trace is removed.
private final TraceRegistry traceRegistry;
// Supplies the sampler that is asked to stop sampling an orphaned trace.
private final Supplier<StackTraceSampler> sampler;
// Daemon thread running unregisterOrphanedTraces(); interrupted by close().
private final Thread thread;

OrphanedTraceCleaner(TraceRegistry traceRegistry, Supplier<StackTraceSampler> sampler) {
this.traceRegistry = traceRegistry;
this.sampler = sampler;

thread = new Thread(this::unregisterOrphanedTraces);
thread.setName("orphaned-trace-cleaner");
thread.setDaemon(true);
thread.start();
}

/**
 * Starts watching the given span. The span is held only through a weak reference tied to this
 * cleaner's reference queue, while its trace id and span id are captured by the key.
 *
 * @param span span to watch
 */
void register(Span span) {
  Key weakKey = new WeakKey(span, referenceQueue);
  traces.add(weakKey);
}

/**
 * Stops watching the span identified by the given context. A {@code LookupKey} is used for the
 * removal because it compares equal to the stored key with the same trace id and span id.
 *
 * @param spanContext context of the span to stop watching
 */
void unregister(SpanContext spanContext) {
  Key lookupKey = new LookupKey(spanContext);
  traces.remove(lookupKey);
}
Comment on lines +55 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I grok the reasons why, but the asymmetry between register(span) and unregister(context) gives me pause. 🤷🏻


// visible for tests
// Returns the number of keys currently held in the traces set, i.e. spans that were registered
// and have not yet been unregistered or cleaned up.
int getTracesSize() {
return traces.size();
}

/**
 * Body of the cleaner thread. Blocks on the reference queue and, for every key whose span has
 * been garbage collected while still registered, removes the trace id from the trace registry
 * and asks the sampler to stop sampling that trace.
 *
 * <p>The loop ends when the thread is interrupted, either between iterations or while blocked
 * on the queue.
 *
 * <p>NOTE(review): an unchecked exception thrown by the registry or the sampler is not caught
 * here and would terminate the cleaner thread.
 */
private void unregisterOrphanedTraces() {
  try {
    while (!Thread.interrupted()) {
      // ReferenceQueue.remove() blocks until a reference is available and never returns null,
      // so no null check is needed. Only WeakKey instances are created with this queue.
      Key key = (Key) referenceQueue.remove();
      // If the key is no longer in the set, the span was already unregistered normally.
      if (traces.remove(key)) {
        traceRegistry.unregister(key.getTraceId());
        sampler.get().stop(key.getTraceId(), key.getSpanId());
      }
    }
  } catch (InterruptedException e) {
    // Restore the interrupt flag and let the thread finish.
    Thread.currentThread().interrupt();
  }
}

/**
 * Interrupts the cleaner thread so that its loop in {@code unregisterOrphanedTraces} ends. Does
 * not wait for the thread to finish and does not clear the set of watched spans.
 */
@Override
public void close() {
thread.interrupt();
}

/**
 * Identity of a watched span. Both implementations in this class define equality and hash code
 * from the trace id and span id, so one kind of key can be used to look up the other.
 */
private interface Key {
/** Returns the trace id of the span this key identifies. */
String getTraceId();

/** Returns the span id of the span this key identifies. */
String getSpanId();
}

/**
 * {@link Key} backed by a {@link SpanContext}. Used to find and remove a stored key with the
 * same trace id and span id without needing the span itself.
 */
private static class LookupKey implements Key {
  private final SpanContext context;

  private LookupKey(SpanContext spanContext) {
    this.context = spanContext;
  }

  @Override
  public String getTraceId() {
    return context.getTraceId();
  }

  @Override
  public String getSpanId() {
    return context.getSpanId();
  }

  /** Hash of trace id and span id; matches the hash of any other {@link Key} with the same ids. */
  @Override
  public int hashCode() {
    String trace = getTraceId();
    String span = getSpanId();
    return Objects.hash(trace, span);
  }

  /** Two keys are equal when both are {@link Key}s with equal trace id and span id. */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj instanceof Key) {
      Key other = (Key) obj;
      if (!Objects.equals(getTraceId(), other.getTraceId())) {
        return false;
      }
      return Objects.equals(getSpanId(), other.getSpanId());
    }
    return false;
  }
}

/**
 * {@link Key} that refers to its span only weakly. The trace id and span id are copied out of
 * the span at construction time, so they remain available after the span has been collected.
 */
private static class WeakKey extends WeakReference<Span> implements Key {
  private final String traceId;
  private final String spanId;

  /**
   * @param referent span to reference weakly; its ids are read eagerly
   * @param queue queue this reference is registered with
   */
  public WeakKey(Span referent, ReferenceQueue<Object> queue) {
    super(referent, queue);
    traceId = referent.getSpanContext().getTraceId();
    spanId = referent.getSpanContext().getSpanId();
  }

  @Override
  public String getTraceId() {
    return traceId;
  }

  @Override
  public String getSpanId() {
    return spanId;
  }

  /** Hash of trace id and span id; matches the hash of any other {@link Key} with the same ids. */
  @Override
  public int hashCode() {
    return Objects.hash(traceId, spanId);
  }

  /** Two keys are equal when both are {@link Key}s with equal trace id and span id. */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj instanceof Key) {
      Key other = (Key) obj;
      if (!Objects.equals(traceId, other.getTraceId())) {
        return false;
      }
      return Objects.equals(spanId, other.getSpanId());
    }
    return false;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public void start(SpanContext spanContext) {
}

@Override
public void stop(SpanContext spanContext) {
public void stop(String traceId, String spanId) {
samplers.computeIfPresent(
spanContext.getTraceId(),
(traceId, sampler) -> {
if (spanContext.equals(sampler.getSpanContext())) {
traceId,
(unused, sampler) -> {
if (spanId.equals(sampler.getSpanContext().getSpanId())) {
sampler.shutdown();
waitForShutdown(sampler);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.splunk.opentelemetry.profiler.ProfilingSemanticAttributes.SNAPSHOT_PROFILING;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
Expand All @@ -38,10 +39,12 @@
public class SnapshotProfilingSpanProcessor implements SpanProcessor {
private final TraceRegistry registry;
private final Supplier<StackTraceSampler> sampler;
private final OrphanedTraceCleaner orphanedTraceCleaner;

SnapshotProfilingSpanProcessor(TraceRegistry registry, Supplier<StackTraceSampler> sampler) {
this.registry = registry;
this.sampler = sampler;
this.orphanedTraceCleaner = new OrphanedTraceCleaner(registry, sampler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: inject it or make a factory method, because this violates DI principles.

}

@Override
Expand All @@ -50,6 +53,7 @@ public void onStart(Context context, ReadWriteSpan span) {
Volume volume = Volume.from(context);
if (volume == Volume.HIGHEST) {
registry.register(span.getSpanContext());
orphanedTraceCleaner.register(span);
}
}

Expand All @@ -75,6 +79,7 @@ public boolean isStartRequired() {
public void onEnd(ReadableSpan span) {
if (isEntry(span)) {
registry.unregister(span.getSpanContext());
orphanedTraceCleaner.unregister(span.getSpanContext());
sampler.get().stop(span.getSpanContext());
}
}
Expand All @@ -87,4 +92,10 @@ private boolean isEntry(ReadableSpan span) {
public boolean isEndRequired() {
return true;
}

@Override
public CompletableResultCode shutdown() {
orphanedTraceCleaner.close();
return SpanProcessor.super.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ interface StackTraceSampler extends Closeable {
public void start(SpanContext spanContext) {}

@Override
public void stop(SpanContext spanContext) {}
public void stop(String traceId, String spanId) {}
};
ConfigurableSupplier<StackTraceSampler> SUPPLIER = new ConfigurableSupplier<>(NOOP);

void start(SpanContext spanContext);

void stop(SpanContext spanContext);
default void stop(SpanContext spanContext) {
stop(spanContext.getTraceId(), spanContext.getSpanId());
}

void stop(String traceId, String spanId);

default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,27 @@
class TraceRegistry {
private final Set<String> traceIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this make the Set also concurrent/threadsafe?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it does


public void register(SpanContext spanContext) {
traceIds.add(spanContext.getTraceId());
public final void register(SpanContext spanContext) {
register(spanContext.getTraceId());
}

public boolean isRegistered(SpanContext spanContext) {
return traceIds.contains(spanContext.getTraceId());
public void register(String traceId) {
traceIds.add(traceId);
}

public void unregister(SpanContext spanContext) {
traceIds.remove(spanContext.getTraceId());
public final boolean isRegistered(SpanContext spanContext) {
return isRegistered(spanContext.getTraceId());
}

public boolean isRegistered(String traceId) {
return traceIds.contains(traceId);
}

public final void unregister(SpanContext spanContext) {
unregister(spanContext.getTraceId());
}

public void unregister(String traceId) {
traceIds.remove(traceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public void start(SpanContext spanContext) {
}

@Override
public void stop(SpanContext spanContext) {
traceIds.remove(spanContext.getTraceId());
public void stop(String traceId, String spanId) {
traceIds.remove(traceId);
}

boolean isBeingSampled(SpanContext spanContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Splunk Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.splunk.opentelemetry.profiler.snapshot;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.test.utils.GcUtils;
import java.lang.ref.WeakReference;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link OrphanedTraceCleaner}: explicit register/unregister bookkeeping, and cleanup
 * of the trace registry and the sampler once a registered span has been garbage collected.
 */
public class OrphanedTraceCleanerTest {
private final TraceRegistry registry = new TraceRegistry();
private final ObservableStackTraceSampler sampler = new ObservableStackTraceSampler();
private final OrphanedTraceCleaner cleaner = new OrphanedTraceCleaner(registry, () -> sampler);

// Interrupts the cleaner's background thread after each test.
@AfterEach
void teardown() {
cleaner.close();
}

// Registering a span adds one tracked entry; unregistering by its span context removes it.
@Test
void registerAndUnregister() {
var spanContext = Snapshotting.spanContext().build();
var span = Span.wrap(spanContext);
cleaner.register(span);

assertThat(cleaner.getTracesSize()).isEqualTo(1);

cleaner.unregister(spanContext);

assertThat(cleaner.getTracesSize()).isEqualTo(0);
}

// A span that is registered but never unregistered must be removed from the trace registry
// after it has been garbage collected.
@Test
void unregisterOrphanedTraces() throws Exception {
var spanContext = Snapshotting.spanContext().build();
registry.register(spanContext);
var span = Span.wrap(spanContext);
cleaner.register(span);

var spanReference = new WeakReference<>(span);
// Drop the only strong reference held by this test so the span can be collected.
span = null;
// presumably waits (up to 10 seconds) until spanReference has been cleared — see GcUtils
GcUtils.awaitGc(spanReference, Duration.ofSeconds(10));

// Cleanup happens on the cleaner thread, so poll until the registry no longer has the trace.
await().untilAsserted(() -> assertThat(registry.isRegistered(spanContext)).isFalse());
}

// A span that is registered but never unregistered must also have its sampling stopped after
// it has been garbage collected.
@Test
void stopSamplingForOrphanedTraces() throws Exception {
var spanContext = Snapshotting.spanContext().build();
registry.register(spanContext);
sampler.start(spanContext);
var span = Span.wrap(spanContext);
cleaner.register(span);

assertThat(sampler.isBeingSampled(spanContext)).isTrue();

var spanReference = new WeakReference<>(span);
// Drop the only strong reference held by this test so the span can be collected.
span = null;
// presumably waits (up to 10 seconds) until spanReference has been cleared — see GcUtils
GcUtils.awaitGc(spanReference, Duration.ofSeconds(10));

// Cleanup happens on the cleaner thread, so poll until the sampler reports sampling stopped.
await().untilAsserted(() -> assertThat(sampler.isBeingSampled(spanContext)).isFalse());
}
}
Loading