Skip to content

Commit eee262b

Browse files
authored
[exporterhelper] Preserve client address in persistent queue (#13232)
Preserve client address in persistent queue as part of the request context A follow-up to #13220
1 parent 8cc2954 commit eee262b

File tree

5 files changed

+1674
-197
lines changed

5 files changed

+1674
-197
lines changed

.chloggen/persist-request-context.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ change_type: enhancement
55
component: exporterhelper
66

77
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8-
note: Preserve request span context and client metadata in the persistent queue.
8+
note: Preserve request span context and client information in the persistent queue.
99

1010
# One or more tracking issues or pull requests related to the change
11-
issues: [11740, 13220]
11+
issues: [11740, 13220, 13232]
1212

1313
# (Optional) One or more lines of additional information to render under the primary note.
1414
# These lines will be padded with 2 spaces and then inserted directly into the document.
1515
# Use pipe (|) for multiline entries.
1616
subtext: |
17-
It allows internal collector spans and client metadata to propagate through the persistent queue used by
17+
It allows internal collector spans and client information to propagate through the persistent queue used by
1818
the exporters. The same way as it's done for the in-memory queue.
1919
Currently, it is behind the exporter.PersistRequestContext feature gate, which can be enabled by adding
2020
`--feature-gates=exporter.PersistRequestContext` to the collector command line. An exporter buffer stored by

pdata/xpdata/request/context.go

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package request // import "go.opentelemetry.io/collector/pdata/xpdata/request"
55

66
import (
77
"context"
8+
"net"
89

910
"go.opentelemetry.io/otel/trace"
1011

@@ -19,20 +20,21 @@ var readOnlyState = pdataint.StateReadOnly
1920

2021
// encodeContext encodes the context into a map of strings.
2122
func encodeContext(ctx context.Context) internal.RequestContext {
22-
return internal.RequestContext{
23-
SpanContext: encodeSpanContext(ctx),
24-
ClientMetadata: encodeClientMetadata(ctx),
25-
}
23+
rc := internal.RequestContext{}
24+
encodeSpanContext(ctx, &rc)
25+
encodeClientMetadata(ctx, &rc)
26+
encodeClientAddress(ctx, &rc)
27+
return rc
2628
}
2729

28-
func encodeSpanContext(ctx context.Context) *internal.SpanContext {
30+
func encodeSpanContext(ctx context.Context, rc *internal.RequestContext) {
2931
spanCtx := trace.SpanContextFromContext(ctx)
3032
if !spanCtx.IsValid() {
31-
return nil
33+
return
3234
}
3335
traceID := spanCtx.TraceID()
3436
spanID := spanCtx.SpanID()
35-
return &internal.SpanContext{
37+
rc.SpanContext = &internal.SpanContext{
3638
TraceId: traceID[:],
3739
SpanId: spanID[:],
3840
TraceFlags: uint32(spanCtx.TraceFlags()),
@@ -41,7 +43,7 @@ func encodeSpanContext(ctx context.Context) *internal.SpanContext {
4143
}
4244
}
4345

44-
func encodeClientMetadata(ctx context.Context) []protocommon.KeyValue {
46+
func encodeClientMetadata(ctx context.Context, rc *internal.RequestContext) {
4547
clientMetadata := client.FromContext(ctx).Metadata
4648
metadataMap, metadataFound := pcommon.Map{}, false
4749
for k := range clientMetadata.Keys() {
@@ -54,9 +56,35 @@ func encodeClientMetadata(ctx context.Context) []protocommon.KeyValue {
5456
}
5557
}
5658
if metadataFound {
57-
return *pdataint.GetOrigMap(pdataint.Map(metadataMap))
59+
rc.ClientMetadata = *pdataint.GetOrigMap(pdataint.Map(metadataMap))
60+
}
61+
}
62+
63+
func encodeClientAddress(ctx context.Context, rc *internal.RequestContext) {
64+
switch a := client.FromContext(ctx).Addr.(type) {
65+
case *net.IPAddr:
66+
rc.ClientAddress = &internal.RequestContext_Ip{Ip: &internal.IPAddr{
67+
Ip: a.IP,
68+
Zone: a.Zone,
69+
}}
70+
case *net.TCPAddr:
71+
rc.ClientAddress = &internal.RequestContext_Tcp{Tcp: &internal.TCPAddr{
72+
Ip: a.IP,
73+
Port: int32(a.Port), //nolint:gosec // G115
74+
Zone: a.Zone,
75+
}}
76+
case *net.UDPAddr:
77+
rc.ClientAddress = &internal.RequestContext_Udp{Udp: &internal.UDPAddr{
78+
Ip: a.IP,
79+
Port: int32(a.Port), //nolint:gosec // G115
80+
Zone: a.Zone,
81+
}}
82+
case *net.UnixAddr:
83+
rc.ClientAddress = &internal.RequestContext_Unix{Unix: &internal.UnixAddr{
84+
Name: a.Name,
85+
Net: a.Net,
86+
}}
5887
}
59-
return nil
6088
}
6189

6290
// decodeContext decodes the context from the bytes map.
@@ -65,7 +93,15 @@ func decodeContext(ctx context.Context, rc *internal.RequestContext) context.Con
6593
return ctx
6694
}
6795
ctx = decodeSpanContext(ctx, rc.SpanContext)
68-
return decodeClientMetadata(ctx, rc.ClientMetadata)
96+
metadataMap := decodeClientMetadata(rc.ClientMetadata)
97+
clientAddress := decodeClientAddress(rc)
98+
if len(metadataMap) > 0 || clientAddress != nil {
99+
ctx = client.NewContext(ctx, client.Info{
100+
Metadata: client.NewMetadata(metadataMap),
101+
Addr: clientAddress,
102+
})
103+
}
104+
return ctx
69105
}
70106

71107
func decodeSpanContext(ctx context.Context, sc *internal.SpanContext) context.Context {
@@ -86,9 +122,9 @@ func decodeSpanContext(ctx context.Context, sc *internal.SpanContext) context.Co
86122
}))
87123
}
88124

89-
func decodeClientMetadata(ctx context.Context, clientMetadata []protocommon.KeyValue) context.Context {
125+
func decodeClientMetadata(clientMetadata []protocommon.KeyValue) map[string][]string {
90126
if len(clientMetadata) == 0 {
91-
return ctx
127+
return nil
92128
}
93129
metadataMap := make(map[string][]string, len(clientMetadata))
94130
for k, vals := range pcommon.Map(pdataint.NewMap(&clientMetadata, &readOnlyState)).All() {
@@ -97,5 +133,33 @@ func decodeClientMetadata(ctx context.Context, clientMetadata []protocommon.KeyV
97133
metadataMap[k][i] = v.Str()
98134
}
99135
}
100-
return client.NewContext(ctx, client.Info{Metadata: client.NewMetadata(metadataMap)})
136+
return metadataMap
137+
}
138+
139+
func decodeClientAddress(rc *internal.RequestContext) net.Addr {
140+
switch a := rc.ClientAddress.(type) {
141+
case *internal.RequestContext_Ip:
142+
return &net.IPAddr{
143+
IP: a.Ip.Ip,
144+
Zone: a.Ip.Zone,
145+
}
146+
case *internal.RequestContext_Tcp:
147+
return &net.TCPAddr{
148+
IP: a.Tcp.Ip,
149+
Port: int(a.Tcp.Port),
150+
Zone: a.Tcp.Zone,
151+
}
152+
case *internal.RequestContext_Udp:
153+
return &net.UDPAddr{
154+
IP: a.Udp.Ip,
155+
Port: int(a.Udp.Port),
156+
Zone: a.Udp.Zone,
157+
}
158+
case *internal.RequestContext_Unix:
159+
return &net.UnixAddr{
160+
Name: a.Unix.Name,
161+
Net: a.Unix.Net,
162+
}
163+
}
164+
return nil
101165
}

pdata/xpdata/request/context_test.go

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package request // import "go.opentelemetry.io/collector/pdata/xpdata/request"
55

66
import (
77
"context"
8+
"net"
89
"testing"
910

1011
"github.com/stretchr/testify/assert"
@@ -21,24 +22,74 @@ func TestEncodeDecodeContext(t *testing.T) {
2122
"key1": {"value1"},
2223
"key2": {"value2", "value3"},
2324
})
25+
tests := []struct {
26+
name string
27+
clientInfo client.Info
28+
}{
29+
{
30+
name: "without_client_address",
31+
clientInfo: client.Info{Metadata: clientMetadata},
32+
},
33+
{
34+
name: "with_client_IP_address",
35+
clientInfo: client.Info{
36+
Metadata: clientMetadata,
37+
Addr: &net.IPAddr{
38+
IP: net.IPv6loopback,
39+
Zone: "eth0",
40+
},
41+
},
42+
},
43+
{
44+
name: "with_client_TCP_address",
45+
clientInfo: client.Info{
46+
Metadata: clientMetadata,
47+
Addr: &net.TCPAddr{
48+
IP: net.IPv4(127, 0, 0, 1),
49+
Port: 8080,
50+
},
51+
},
52+
},
53+
{
54+
name: "with_client_UDP_address",
55+
clientInfo: client.Info{
56+
Metadata: clientMetadata,
57+
Addr: &net.UDPAddr{
58+
IP: net.IPv4(127, 0, 0, 1),
59+
Port: 8080,
60+
},
61+
},
62+
},
63+
{
64+
name: "with_client_unix_address",
65+
clientInfo: client.Info{
66+
Metadata: clientMetadata,
67+
Addr: &net.UnixAddr{
68+
Name: "/var/run/test.sock",
69+
Net: "unixpacket",
70+
},
71+
},
72+
},
73+
}
74+
for _, tt := range tests {
75+
t.Run(tt.name, func(t *testing.T) {
76+
// Encode a context with a span and client metadata
77+
ctx := trace.ContextWithSpanContext(context.Background(), spanCtx)
78+
ctx = client.NewContext(ctx, tt.clientInfo)
79+
reqCtx := encodeContext(ctx)
80+
buf, err := reqCtx.Marshal()
81+
require.NoError(t, err)
2482

25-
// Encode a context with a span and client metadata
26-
ctx := trace.ContextWithSpanContext(context.Background(), spanCtx)
27-
ctx = client.NewContext(ctx, client.Info{
28-
Metadata: clientMetadata,
29-
})
30-
reqCtx := encodeContext(ctx)
31-
buf, err := reqCtx.Marshal()
32-
require.NoError(t, err)
33-
34-
// Decode the context
35-
gotReqCtx := internal.RequestContext{}
36-
err = gotReqCtx.Unmarshal(buf)
37-
require.NoError(t, err)
38-
gotCtx := decodeContext(context.Background(), &gotReqCtx)
39-
assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx))
40-
assert.Equal(t, clientMetadata, client.FromContext(gotCtx).Metadata)
83+
// Decode the context
84+
gotReqCtx := internal.RequestContext{}
85+
err = gotReqCtx.Unmarshal(buf)
86+
require.NoError(t, err)
87+
gotCtx := decodeContext(context.Background(), &gotReqCtx)
88+
assert.Equal(t, spanCtx, trace.SpanContextFromContext(gotCtx))
89+
assert.Equal(t, tt.clientInfo, client.FromContext(gotCtx))
90+
})
91+
}
4192

42-
// Decode nil request context
93+
// Decode a nil context
4394
assert.Equal(t, context.Background(), decodeContext(context.Background(), nil))
4495
}

0 commit comments

Comments
 (0)