Skip to content

Commit 8cc2954

Browse files
authored
[exporterhelper] Preserve client metadata in the persistent queue (#13220)
Preserve client metadata key values in persistent queue. It allows request client metadata to propagate through the persistent queue used by the exporters. The same way as it's done for the in-memory queue. Currently, it is behind the exporter.PersistRequestContext feature gate, which can be enabled by adding `--feature-gates=exporter.PersistRequestContext` to the collector command line. An exporter buffer stored by a previous version of the collector (or by a collector with the feature gate disabled) can be read by a newer collector with the feature enabled. However, the reverse is not supported: a buffer stored by a newer collector with the feature enabled cannot be read by an older collector (or by a collector with the feature gate disabled). Resolves #10110, #11780 and open-telemetry/opentelemetry-collector-contrib#38666 Follow-up work: To fully propagate the client context, we also add support for serializing remaining `client.Client` fields: ``` Addr net.Addr Auth AuthData ``` Then, we probably want to make this configurable or rely on the transform processor to be able to manipulate the context
1 parent 8af46bc commit 8cc2954

File tree

13 files changed

+244
-98
lines changed

13 files changed

+244
-98
lines changed

.chloggen/persist-request-context.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@ change_type: enhancement
55
component: exporterhelper
66

77
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8-
note: Add an option to preserve request span context in the persistent queue
8+
note: Preserve request span context and client metadata in the persistent queue.
99

1010
# One or more tracking issues or pull requests related to the change
11-
issues: [11740]
11+
issues: [11740, 13220]
1212

1313
# (Optional) One or more lines of additional information to render under the primary note.
1414
# These lines will be padded with 2 spaces and then inserted directly into the document.
1515
# Use pipe (|) for multiline entries.
1616
subtext: |
17+
It allows internal collector spans and client metadata to propagate through the persistent queue used by
18+
the exporters. The same way as it's done for the in-memory queue.
1719
Currently, it is behind the exporter.PersistRequestContext feature gate, which can be enabled by adding
1820
`--feature-gates=exporter.PersistRequestContext` to the collector command line. An exporter buffer stored by
1921
a previous version of the collector (or by a collector with the feature gate disabled) can be read by a newer

exporter/debugexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ require (
4141
github.com/modern-go/reflect2 v1.0.2 // indirect
4242
github.com/pmezard/go-difflib v1.0.0 // indirect
4343
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
44+
go.opentelemetry.io/collector/client v1.34.0 // indirect
4445
go.opentelemetry.io/collector/config/configretry v1.34.0 // indirect
4546
go.opentelemetry.io/collector/consumer/consumererror v0.128.0 // indirect
4647
go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.128.0 // indirect

exporter/exporterhelper/xexporterhelper/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ require (
3737
github.com/modern-go/reflect2 v1.0.2 // indirect
3838
github.com/pmezard/go-difflib v1.0.0 // indirect
3939
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
40+
go.opentelemetry.io/collector/client v1.34.0 // indirect
4041
go.opentelemetry.io/collector/config/configretry v1.34.0 // indirect
4142
go.opentelemetry.io/collector/extension v1.34.0 // indirect
4243
go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect

exporter/exportertest/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ require (
3333
github.com/modern-go/reflect2 v1.0.2 // indirect
3434
github.com/pmezard/go-difflib v1.0.0 // indirect
3535
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
36+
go.opentelemetry.io/collector/client v1.34.0 // indirect
3637
go.opentelemetry.io/collector/consumer/xconsumer v0.128.0 // indirect
3738
go.opentelemetry.io/collector/extension v1.34.0 // indirect
3839
go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect

pdata/xpdata/go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.23.0
55
require (
66
github.com/gogo/protobuf v1.3.2
77
github.com/stretchr/testify v1.10.0
8+
go.opentelemetry.io/collector/client v0.0.0-00010101000000-000000000000
89
go.opentelemetry.io/collector/pdata v1.34.0
910
go.opentelemetry.io/collector/pdata/pprofile v0.128.0
1011
go.opentelemetry.io/collector/pdata/testdata v0.128.0
@@ -33,3 +34,7 @@ replace go.opentelemetry.io/collector/pdata => ..
3334
replace go.opentelemetry.io/collector/pdata/pprofile => ../pprofile
3435

3536
replace go.opentelemetry.io/collector/pdata/testdata => ../testdata
37+
38+
replace go.opentelemetry.io/collector/client => ../../client
39+
40+
replace go.opentelemetry.io/collector/consumer => ../../consumer

pdata/xpdata/request/context.go

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,94 @@ import (
88

99
"go.opentelemetry.io/otel/trace"
1010

11+
"go.opentelemetry.io/collector/client"
12+
pdataint "go.opentelemetry.io/collector/pdata/internal"
13+
protocommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1"
14+
"go.opentelemetry.io/collector/pdata/pcommon"
1115
"go.opentelemetry.io/collector/pdata/xpdata/request/internal"
1216
)
1317

18+
var readOnlyState = pdataint.StateReadOnly
19+
1420
// encodeContext encodes the context into a map of strings.
1521
func encodeContext(ctx context.Context) internal.RequestContext {
16-
rc := internal.RequestContext{}
22+
return internal.RequestContext{
23+
SpanContext: encodeSpanContext(ctx),
24+
ClientMetadata: encodeClientMetadata(ctx),
25+
}
26+
}
27+
28+
func encodeSpanContext(ctx context.Context) *internal.SpanContext {
1729
spanCtx := trace.SpanContextFromContext(ctx)
18-
if spanCtx.IsValid() {
19-
traceID := spanCtx.TraceID()
20-
spanID := spanCtx.SpanID()
21-
rc.SpanContext = &internal.SpanContext{
22-
TraceId: traceID[:],
23-
SpanId: spanID[:],
24-
TraceFlags: uint32(spanCtx.TraceFlags()),
25-
TraceState: spanCtx.TraceState().String(),
26-
Remote: spanCtx.IsRemote(),
30+
if !spanCtx.IsValid() {
31+
return nil
32+
}
33+
traceID := spanCtx.TraceID()
34+
spanID := spanCtx.SpanID()
35+
return &internal.SpanContext{
36+
TraceId: traceID[:],
37+
SpanId: spanID[:],
38+
TraceFlags: uint32(spanCtx.TraceFlags()),
39+
TraceState: spanCtx.TraceState().String(),
40+
Remote: spanCtx.IsRemote(),
41+
}
42+
}
43+
44+
func encodeClientMetadata(ctx context.Context) []protocommon.KeyValue {
45+
clientMetadata := client.FromContext(ctx).Metadata
46+
metadataMap, metadataFound := pcommon.Map{}, false
47+
for k := range clientMetadata.Keys() {
48+
if !metadataFound {
49+
metadataMap, metadataFound = pcommon.NewMap(), true
50+
}
51+
vals := metadataMap.PutEmptySlice(k)
52+
for i := 0; i < len(clientMetadata.Get(k)); i++ {
53+
vals.AppendEmpty().SetStr(clientMetadata.Get(k)[i])
2754
}
2855
}
29-
return rc
56+
if metadataFound {
57+
return *pdataint.GetOrigMap(pdataint.Map(metadataMap))
58+
}
59+
return nil
3060
}
3161

3262
// decodeContext decodes the context from the bytes map.
33-
func decodeContext(rc *internal.RequestContext) context.Context {
34-
if rc == nil || rc.SpanContext == nil {
35-
return context.Background()
63+
func decodeContext(ctx context.Context, rc *internal.RequestContext) context.Context {
64+
if rc == nil {
65+
return ctx
66+
}
67+
ctx = decodeSpanContext(ctx, rc.SpanContext)
68+
return decodeClientMetadata(ctx, rc.ClientMetadata)
69+
}
70+
71+
func decodeSpanContext(ctx context.Context, sc *internal.SpanContext) context.Context {
72+
if sc == nil {
73+
return ctx
3674
}
3775
traceID := trace.TraceID{}
38-
copy(traceID[:], rc.SpanContext.TraceId)
76+
copy(traceID[:], sc.TraceId)
3977
spanID := trace.SpanID{}
40-
copy(spanID[:], rc.SpanContext.SpanId)
41-
traceState, _ := trace.ParseTraceState(rc.SpanContext.TraceState)
78+
copy(spanID[:], sc.SpanId)
79+
traceState, _ := trace.ParseTraceState(sc.TraceState)
4280
return trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{
4381
TraceID: traceID,
4482
SpanID: spanID,
45-
TraceFlags: trace.TraceFlags(rc.SpanContext.TraceFlags),
83+
TraceFlags: trace.TraceFlags(sc.TraceFlags),
4684
TraceState: traceState,
47-
Remote: rc.SpanContext.Remote,
85+
Remote: sc.Remote,
4886
}))
4987
}
88+
89+
func decodeClientMetadata(ctx context.Context, clientMetadata []protocommon.KeyValue) context.Context {
90+
if len(clientMetadata) == 0 {
91+
return ctx
92+
}
93+
metadataMap := make(map[string][]string, len(clientMetadata))
94+
for k, vals := range pcommon.Map(pdataint.NewMap(&clientMetadata, &readOnlyState)).All() {
95+
metadataMap[k] = make([]string, vals.Slice().Len())
96+
for i, v := range vals.Slice().All() {
97+
metadataMap[k][i] = v.Str()
98+
}
99+
}
100+
return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(metadataMap)})
101+
}

pdata/xpdata/request/context_test.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,38 @@ import (
77
"context"
88
"testing"
99

10+
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
1112
"go.opentelemetry.io/otel/trace"
1213

14+
"go.opentelemetry.io/collector/client"
1315
"go.opentelemetry.io/collector/pdata/xpdata/request/internal"
1416
)
1517

16-
func BenchmarkEncodeDecodeContext(b *testing.B) {
17-
spanCtx := fakeSpanContext(b)
18+
func TestEncodeDecodeContext(t *testing.T) {
19+
spanCtx := fakeSpanContext(t)
20+
clientMetadata := client.NewMetadata(map[string][]string{
21+
"key1": {"value1"},
22+
"key2": {"value2", "value3"},
23+
})
24+
25+
// Encode a context with a span and client metadata
1826
ctx := trace.ContextWithSpanContext(context.Background(), spanCtx)
27+
ctx = client.NewContext(ctx, client.Info{
28+
Metadata: clientMetadata,
29+
})
30+
reqCtx := encodeContext(ctx)
31+
buf, err := reqCtx.Marshal()
32+
require.NoError(t, err)
33+
34+
// Decode the context
35+
gotReqCtx := internal.RequestContext{}
36+
err = gotReqCtx.Unmarshal(buf)
37+
require.NoError(t, err)
38+
gotCtx := decodeContext(context.Background(), &gotReqCtx)
39+
assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx))
40+
assert.Equal(t, clientMetadata, client.FromContext(gotCtx).Metadata)
1941

20-
b.ResetTimer()
21-
b.ReportAllocs()
22-
for i := 0; i < b.N; i++ {
23-
reqCtx := encodeContext(ctx)
24-
buf, err := reqCtx.Marshal()
25-
require.NoError(b, err)
26-
27-
gotReqCtx := internal.RequestContext{}
28-
err = gotReqCtx.Unmarshal(buf)
29-
require.NoError(b, err)
30-
gotCtx := decodeContext(&gotReqCtx)
31-
require.Equal(b, spanCtx, trace.SpanContextFromContext(gotCtx))
32-
}
42+
// Decode nil request context
43+
assert.Equal(t, context.Background(), decodeContext(context.Background(), nil))
3344
}

0 commit comments

Comments
 (0)