Skip to content

Commit 3ef58fd

Browse files
authored
[exporterhelper] Preserve request span context in the persistent queue (#13188)
Alternative to #13176 that doesn't involve custom encoder but exposes new public module `pdata/xpdata/request` The actual change set is pretty small. Most of the code is generated protobuf
1 parent 1921e21 commit 3ef58fd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1732
-112
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: breaking
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: exporterhelper
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: QueueBatchEncoding interface is changed to support marshaling and unmarshaling of request context.
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [13188]
12+
13+
# Optional: The change log or logs in which this entry should be included.
14+
# e.g. '[user]' or '[user, api]'
15+
# Include 'user' if the change is relevant to end users.
16+
# Include 'api' if there is a change to a library API.
17+
# Default: '[user]'
18+
change_logs: [api]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
5+
component: exporterhelper
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Add an option to preserve request span context in the persistent queue
9+
10+
# One or more tracking issues or pull requests related to the change
11+
issues: [11740]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
Currently, it is behind the exporter.PersistRequestContext feature gate, which can be enabled by adding
18+
`--feature-gates=exporter.PersistRequestContext` to the collector command line. An exporter buffer stored by
19+
a previous version of the collector (or by a collector with the feature gate disabled) can be read by a newer
20+
collector with the feature enabled. However, the reverse is not supported: a buffer stored by a newer collector with
21+
the feature enabled cannot be read by an older collector (or by a collector with the feature gate disabled).
22+
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: [user]

.codecov.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ coverage:
1919

2020
ignore:
2121
- "pdata/internal/data/protogen/**/*"
22+
- "**/*.pb.go"
2223
- "cmd/mdatagen/third_party/**/*"

.github/workflows/build-and-test.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ jobs:
125125
run: |
126126
make gogenerate
127127
git diff --exit-code || (echo 'Generated code is out of date, please run "make gogenerate" and commit the changes in this PR.' && exit 1)
128+
- name: Generate proto files
129+
run: |
130+
make genproto
131+
git diff --exit-code || (echo 'Generated code is out of date, please run "make genproto" and commit the changes in this PR.' && exit 1)
128132
- name: Gen Pdata
129133
run: |
130134
make genpdata

.github/workflows/utils/cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,7 @@
445445
"xexporter",
446446
"xexporterhelper",
447447
"xextension",
448+
"xpdata",
448449
"xpipeline",
449450
"xprocessor",
450451
"xprocessorhelper",

Makefile

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ gotidy:
9494
gogenerate:
9595
cd cmd/mdatagen && $(GOCMD) install .
9696
@$(MAKE) for-all-target TARGET="generate"
97-
$(MAKE) genproto_internal
9897
$(MAKE) fmt
9998

10099
.PHONY: addlicense
@@ -206,6 +205,7 @@ genproto: genproto-cleanup
206205
curl -sSL https://api.github.com/repos/open-telemetry/opentelemetry-proto/tarball/${OPENTELEMETRY_PROTO_VERSION} | tar xz --strip 1 -C ${OPENTELEMETRY_PROTO_SRC_DIR}
207206
# Call a sub-make to ensure OPENTELEMETRY_PROTO_FILES is populated
208207
$(MAKE) genproto_sub
208+
$(MAKE) genproto_internal
209209
$(MAKE) fmt
210210
$(MAKE) genproto-cleanup
211211

@@ -250,10 +250,9 @@ genpdata:
250250
pushd pdata/ && $(GOCMD) run ./internal/cmd/pdatagen/main.go && popd
251251
$(MAKE) fmt
252252

253-
INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/queue
254-
# INTERNAL_PROTO_SRC_DIRS += path/to/other/proto/dirs
253+
INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/queue pdata/xpdata/request/internal
255254
INTERNAL_PROTO_FILES := $(foreach dir,$(INTERNAL_PROTO_SRC_DIRS),$(wildcard $(dir)/*.proto))
256-
INTERNAL_PROTOC := $(DOCKERCMD) run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD} --go_out=${PWD}
255+
INTERNAL_PROTOC := $(DOCKERCMD) run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD} -I/usr/include/github.com/gogo/protobuf -I${PWD}/$(PROTO_INTERMEDIATE_DIR) --go_out=${PWD}
257256

258257
.PHONY: genproto_internal
259258
genproto_internal:

cmd/builder/internal/builder/main_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ var replaceModules = []string{
9595
"/pdata",
9696
"/pdata/testdata",
9797
"/pdata/pprofile",
98+
"/pdata/xpdata",
9899
"/pipeline",
99100
"/pipeline/xpipeline",
100101
"/processor",

cmd/otelcorecol/builder-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ replaces:
9696
- go.opentelemetry.io/collector/pdata => ../../pdata
9797
- go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata
9898
- go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile
99+
- go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata
99100
- go.opentelemetry.io/collector/pipeline => ../../pipeline
100101
- go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline
101102
- go.opentelemetry.io/collector/processor => ../../processor

cmd/otelcorecol/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ require (
122122
go.opentelemetry.io/collector/pdata v1.34.0 // indirect
123123
go.opentelemetry.io/collector/pdata/pprofile v0.128.0 // indirect
124124
go.opentelemetry.io/collector/pdata/testdata v0.128.0 // indirect
125+
go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect
125126
go.opentelemetry.io/collector/pipeline v0.128.0 // indirect
126127
go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect
127128
go.opentelemetry.io/collector/processor/processorhelper v0.128.0 // indirect
@@ -291,6 +292,8 @@ replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata
291292

292293
replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile
293294

295+
replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata
296+
294297
replace go.opentelemetry.io/collector/pipeline => ../../pipeline
295298

296299
replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline

exporter/debugexporter/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ require (
5050
go.opentelemetry.io/collector/extension/xextension v0.128.0 // indirect
5151
go.opentelemetry.io/collector/featuregate v1.34.0 // indirect
5252
go.opentelemetry.io/collector/internal/telemetry v0.128.0 // indirect
53+
go.opentelemetry.io/collector/pdata/xpdata v0.0.0-00010101000000-000000000000 // indirect
5354
go.opentelemetry.io/collector/pipeline v0.128.0 // indirect
5455
go.opentelemetry.io/collector/pipeline/xpipeline v0.128.0 // indirect
5556
go.opentelemetry.io/collector/receiver v1.34.0 // indirect
@@ -127,3 +128,5 @@ replace go.opentelemetry.io/collector/extension/xextension => ../../extension/xe
127128
replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry
128129

129130
replace go.opentelemetry.io/collector/client => ../../client
131+
132+
replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,10 @@ func newFakeQueueBatch() QueueBatchSettings[request.Request] {
162162

163163
type fakeEncoding struct{}
164164

165-
func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
165+
func (f fakeEncoding) Marshal(context.Context, request.Request) ([]byte, error) {
166166
return []byte("mockRequest"), nil
167167
}
168168

169-
func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
170-
return &requesttest.FakeRequest{}, nil
169+
func (f fakeEncoding) Unmarshal([]byte) (context.Context, request.Request, error) {
170+
return context.Background(), &requesttest.FakeRequest{}, nil
171171
}

exporter/exporterhelper/internal/oteltest/tracetest.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/require"
1010
"go.opentelemetry.io/otel/codes"
1111
sdktrace "go.opentelemetry.io/otel/sdk/trace"
12+
"go.opentelemetry.io/otel/trace"
1213
)
1314

1415
func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
@@ -19,3 +20,17 @@ func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
1920
require.Equal(t, codes.Unset, sd.Status().Code)
2021
}
2122
}
23+
24+
func FakeSpanContext(t *testing.T) trace.SpanContext {
25+
traceID, err := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10")
26+
require.NoError(t, err)
27+
spanID, err := trace.SpanIDFromHex("0102030405060708")
28+
require.NoError(t, err)
29+
return trace.NewSpanContext(trace.SpanContextConfig{
30+
TraceID: traceID,
31+
SpanID: spanID,
32+
TraceFlags: 0x01,
33+
TraceState: trace.TraceState{},
34+
Remote: true,
35+
})
36+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue"
5+
6+
import "go.opentelemetry.io/collector/featuregate"
7+
8+
// PersistRequestContextFeatureGate controls whether request context should be preserved in the persistent queue.
9+
var PersistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister(
10+
"exporter.PersistRequestContext",
11+
featuregate.StageAlpha,
12+
featuregate.WithRegisterFromVersion("v0.128.0"),
13+
featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"),
14+
)
15+
16+
// assign the feature gate to separate variables to make it possible to override the behavior in tests
17+
// on write and read paths separately.
18+
var (
19+
PersistRequestContextOnRead = PersistRequestContextFeatureGate.IsEnabled()
20+
PersistRequestContextOnWrite = PersistRequestContextFeatureGate.IsEnabled()
21+
)

exporter/exporterhelper/internal/queue/persistent_queue.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
253253
}
254254
}
255255

256-
reqBuf, err := pq.set.encoding.Marshal(req)
256+
reqBuf, err := pq.set.encoding.Marshal(ctx, req)
257257
if err != nil {
258258
return err
259259
}
@@ -294,7 +294,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
294294

295295
// Read until either a successful retrieved element or no more elements in the storage.
296296
for pq.metadata.ReadIndex != pq.metadata.WriteIndex {
297-
index, req, consumed := pq.getNextItem(ctx)
297+
index, req, reqCtx, consumed := pq.getNextItem(ctx)
298298
// Ensure the used size and the channel size are in sync.
299299
if pq.metadata.ReadIndex == pq.metadata.WriteIndex {
300300
pq.metadata.QueueSize = 0
@@ -303,7 +303,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
303303
if consumed {
304304
id := indexDonePool.Get().(*indexDone)
305305
id.reset(index, pq.set.sizer.Sizeof(req), pq)
306-
return context.Background(), req, id, true
306+
return reqCtx, req, id, true
307307
}
308308
}
309309

@@ -316,7 +316,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
316316
// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
317317
// finished, the index should be called with onDone to clean up the storage. If no new item is available,
318318
// returns false.
319-
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
319+
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, context.Context, bool) {
320320
index := pq.metadata.ReadIndex
321321
// Increase here, so even if errors happen below, it always iterates
322322
pq.metadata.ReadIndex++
@@ -328,8 +328,9 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
328328
getOp)
329329

330330
var request T
331+
restoredCtx := context.Background()
331332
if err == nil {
332-
request, err = pq.set.encoding.Unmarshal(getOp.Value)
333+
restoredCtx, request, err = pq.set.encoding.Unmarshal(getOp.Value)
333334
}
334335

335336
if err != nil {
@@ -339,14 +340,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
339340
pq.logger.Error("Error deleting item from queue", zap.Error(err))
340341
}
341342

342-
return 0, request, false
343+
return 0, request, restoredCtx, false
343344
}
344345

345346
// Increase the reference count, so the client is not closed while the request is being processed.
346347
// The client cannot be closed because we hold the lock since last we checked `stopped`.
347348
pq.refClient++
348349

349-
return index, request, true
350+
return index, request, restoredCtx, true
350351
}
351352

352353
// onDone should be called to remove the item of the given index from the queue once processing is finished.
@@ -438,13 +439,13 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
438439
pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
439440
continue
440441
}
441-
req, err := pq.set.encoding.Unmarshal(op.Value)
442+
reqCtx, req, err := pq.set.encoding.Unmarshal(op.Value)
442443
// If error happened or item is nil, it will be efficiently ignored
443444
if err != nil {
444445
pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
445446
continue
446447
}
447-
if pq.putInternal(ctx, req) != nil {
448+
if pq.putInternal(reqCtx, req) != nil {
448449
errCount++
449450
}
450451
}

exporter/exporterhelper/internal/queue/persistent_queue_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ func (is *itemsSizer) Sizeof(val uint64) int64 {
4343

4444
type uint64Encoding struct{}
4545

46-
func (uint64Encoding) Marshal(val uint64) ([]byte, error) {
46+
func (uint64Encoding) Marshal(_ context.Context, val uint64) ([]byte, error) {
4747
return binary.LittleEndian.AppendUint64([]byte{}, val), nil
4848
}
4949

50-
func (uint64Encoding) Unmarshal(bytes []byte) (uint64, error) {
50+
func (uint64Encoding) Unmarshal(bytes []byte) (context.Context, uint64, error) {
5151
if len(bytes) < 8 {
52-
return 0, errInvalidValue
52+
return context.Background(), 0, errInvalidValue
5353
}
54-
return binary.LittleEndian.Uint64(bytes), nil
54+
return context.Background(), binary.LittleEndian.Uint64(bytes), nil
5555
}
5656

5757
func newFakeBoundedStorageClient(maxSizeInBytes int) *fakeBoundedStorageClient {
@@ -913,7 +913,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {
913913
}
914914

915915
func TestPersistentQueue_StorageFull(t *testing.T) {
916-
marshaled, err := uint64Encoding{}.Marshal(uint64(50))
916+
marshaled, err := uint64Encoding{}.Marshal(context.Background(), uint64(50))
917917
require.NoError(t, err)
918918
maxSizeInBytes := len(marshaled) * 5 // arbitrary small number
919919

exporter/exporterhelper/internal/queue/queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ import (
1414

1515
type Encoding[T any] interface {
1616
// Marshal is a function that can marshal a request into bytes.
17-
Marshal(T) ([]byte, error)
17+
Marshal(context.Context, T) ([]byte, error)
1818

1919
// Unmarshal is a function that can unmarshal bytes into a request.
20-
Unmarshal([]byte) (T, error)
20+
Unmarshal([]byte) (context.Context, T, error)
2121
}
2222

2323
// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full and setup to

exporter/exporterhelper/internal/queuebatch/queue_batch_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ type fakeEncoding struct {
4545
mr request.Request
4646
}
4747

48-
func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
48+
func (f fakeEncoding) Marshal(context.Context, request.Request) ([]byte, error) {
4949
return []byte("mockRequest"), nil
5050
}
5151

52-
func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
53-
return f.mr, nil
52+
func (f fakeEncoding) Unmarshal([]byte) (context.Context, request.Request, error) {
53+
return context.Background(), f.mr, nil
5454
}
5555

5656
func newFakeEncoding(mr request.Request) queue.Encoding[request.Request] {

exporter/exporterhelper/logs.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ import (
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1920
"go.opentelemetry.io/collector/pdata/plog"
21+
pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request"
2022
"go.opentelemetry.io/collector/pipeline"
2123
)
2224

@@ -57,16 +59,32 @@ func newLogsRequest(ld plog.Logs) Request {
5759

5860
type logsEncoding struct{}
5961

60-
func (logsEncoding) Unmarshal(bytes []byte) (Request, error) {
62+
var _ QueueBatchEncoding[Request] = logsEncoding{}
63+
64+
func (logsEncoding) Unmarshal(bytes []byte) (context.Context, Request, error) {
65+
if queue.PersistRequestContextOnRead {
66+
ctx, logs, err := pdatareq.UnmarshalLogs(bytes)
67+
if errors.Is(err, pdatareq.ErrInvalidFormat) {
68+
// fall back to unmarshaling without context
69+
logs, err = logsUnmarshaler.UnmarshalLogs(bytes)
70+
}
71+
return ctx, newLogsRequest(logs), err
72+
}
73+
6174
logs, err := logsUnmarshaler.UnmarshalLogs(bytes)
6275
if err != nil {
63-
return nil, err
76+
var req Request
77+
return context.Background(), req, err
6478
}
65-
return newLogsRequest(logs), nil
79+
return context.Background(), newLogsRequest(logs), nil
6680
}
6781

68-
func (logsEncoding) Marshal(req Request) ([]byte, error) {
69-
return logsMarshaler.MarshalLogs(req.(*logsRequest).ld)
82+
func (logsEncoding) Marshal(ctx context.Context, req Request) ([]byte, error) {
83+
logs := req.(*logsRequest).ld
84+
if queue.PersistRequestContextOnWrite {
85+
return pdatareq.MarshalLogs(ctx, logs)
86+
}
87+
return logsMarshaler.MarshalLogs(logs)
7088
}
7189

7290
func (req *logsRequest) OnError(err error) Request {

0 commit comments

Comments
 (0)