Skip to content

Commit 5aab536

Browse files
committed
Stabilize exporter.UsePullingBasedExporterQueueBatcher and remove old batch sender
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 9ea6963 commit 5aab536

File tree

6 files changed

+631
-1314
lines changed

6 files changed

+631
-1314
lines changed

.chloggen/rm-batcher.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Stabilize exporter.UsePullingBasedExporterQueueBatcher and remove old batch sender
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: []
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api, user]

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 51 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -48,94 +48,66 @@ func newNoopExportSender() Sender[request.Request] {
4848
}
4949

5050
func TestBaseExporter(t *testing.T) {
51-
runTest := func(testName string, enableQueueBatcher bool) {
52-
t.Run(testName, func(t *testing.T) {
53-
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
54-
be, err := NewBaseExporter(defaultSettings, defaultSignal)
55-
require.NoError(t, err)
56-
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
57-
require.NoError(t, be.Shutdown(context.Background()))
58-
})
59-
}
60-
runTest("enable_queue_batcher", true)
61-
runTest("disable_queue_batcher", false)
51+
be, err := NewBaseExporter(defaultSettings, defaultSignal)
52+
require.NoError(t, err)
53+
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
54+
require.NoError(t, be.Shutdown(context.Background()))
6255
}
6356

6457
func TestBaseExporterWithOptions(t *testing.T) {
65-
runTest := func(testName string, enableQueueBatcher bool) {
66-
t.Run(testName, func(t *testing.T) {
67-
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
68-
want := errors.New("my error")
69-
be, err := NewBaseExporter(
70-
defaultSettings, defaultSignal,
71-
WithStart(func(context.Context, component.Host) error { return want }),
72-
WithShutdown(func(context.Context) error { return want }),
73-
WithTimeout(NewDefaultTimeoutConfig()),
74-
)
75-
require.NoError(t, err)
76-
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
77-
require.Equal(t, want, be.Shutdown(context.Background()))
78-
})
79-
}
80-
runTest("enable_queue_batcher", true)
81-
runTest("disable_queue_batcher", false)
58+
want := errors.New("my error")
59+
be, err := NewBaseExporter(
60+
defaultSettings, defaultSignal,
61+
WithStart(func(context.Context, component.Host) error { return want }),
62+
WithShutdown(func(context.Context) error { return want }),
63+
WithTimeout(NewDefaultTimeoutConfig()),
64+
)
65+
require.NoError(t, err)
66+
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
67+
require.Equal(t, want, be.Shutdown(context.Background()))
8268
}
8369

8470
func TestQueueOptionsWithRequestExporter(t *testing.T) {
85-
runTest := func(testName string, enableQueueBatcher bool) {
86-
t.Run(testName, func(t *testing.T) {
87-
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
88-
bs, err := NewBaseExporter(exportertest.NewNopSettingsWithType(exportertest.NopType), defaultSignal,
89-
WithRetry(configretry.NewDefaultBackOffConfig()))
90-
require.NoError(t, err)
91-
require.Nil(t, bs.Marshaler)
92-
require.Nil(t, bs.Unmarshaler)
93-
_, err = NewBaseExporter(exportertest.NewNopSettingsWithType(exportertest.NopType), defaultSignal,
94-
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
95-
require.Error(t, err)
71+
bs, err := NewBaseExporter(exportertest.NewNopSettingsWithType(exportertest.NopType), defaultSignal,
72+
WithRetry(configretry.NewDefaultBackOffConfig()))
73+
require.NoError(t, err)
74+
require.Nil(t, bs.Marshaler)
75+
require.Nil(t, bs.Unmarshaler)
76+
_, err = NewBaseExporter(exportertest.NewNopSettingsWithType(exportertest.NopType), defaultSignal,
77+
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
78+
require.Error(t, err)
9679

97-
_, err = NewBaseExporter(exportertest.NewNopSettingsWithType(exportertest.NopType), defaultSignal,
98-
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
99-
WithRetry(configretry.NewDefaultBackOffConfig()),
100-
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[request.Request]()))
101-
require.Error(t, err)
102-
})
103-
}
104-
runTest("enable_queue_batcher", true)
105-
runTest("disable_queue_batcher", false)
80+
_, err = NewBaseExporter(exportertest.NewNopSettingsWithType(exportertest.NopType), defaultSignal,
81+
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
82+
WithRetry(configretry.NewDefaultBackOffConfig()),
83+
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[request.Request]()))
84+
require.Error(t, err)
10685
}
10786

10887
func TestBaseExporterLogging(t *testing.T) {
109-
runTest := func(testName string, enableQueueBatcher bool) {
110-
t.Run(testName, func(t *testing.T) {
111-
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
112-
set := exportertest.NewNopSettingsWithType(exportertest.NopType)
113-
logger, observed := observer.New(zap.DebugLevel)
114-
set.Logger = zap.New(logger)
115-
rCfg := configretry.NewDefaultBackOffConfig()
116-
rCfg.Enabled = false
117-
qCfg := exporterqueue.NewDefaultConfig()
118-
qCfg.Enabled = false
119-
bs, err := NewBaseExporter(set, defaultSignal,
120-
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[request.Request]()),
121-
WithBatcher(exporterbatcher.NewDefaultConfig()),
122-
WithRetry(rCfg))
123-
require.NoError(t, err)
124-
require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost()))
125-
sink := requesttest.NewSink()
126-
sendErr := bs.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("my error")})
127-
require.Error(t, sendErr)
88+
set := exportertest.NewNopSettingsWithType(exportertest.NopType)
89+
logger, observed := observer.New(zap.DebugLevel)
90+
set.Logger = zap.New(logger)
91+
rCfg := configretry.NewDefaultBackOffConfig()
92+
rCfg.Enabled = false
93+
qCfg := exporterqueue.NewDefaultConfig()
94+
qCfg.Enabled = false
95+
bs, err := NewBaseExporter(set, defaultSignal,
96+
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[request.Request]()),
97+
WithBatcher(exporterbatcher.NewDefaultConfig()),
98+
WithRetry(rCfg))
99+
require.NoError(t, err)
100+
require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost()))
101+
sink := requesttest.NewSink()
102+
sendErr := bs.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("my error")})
103+
require.Error(t, sendErr)
128104

129-
require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 2)
130-
assert.Contains(t, observed.All()[0].Message, "Exporting failed. Dropping data.")
131-
assert.Equal(t, "my error", observed.All()[0].ContextMap()["error"])
132-
assert.Contains(t, observed.All()[1].Message, "Exporting failed. Rejecting data.")
133-
assert.Equal(t, "my error", observed.All()[1].ContextMap()["error"])
134-
require.NoError(t, bs.Shutdown(context.Background()))
135-
})
136-
}
137-
runTest("enable_queue_batcher", true)
138-
runTest("disable_queue_batcher", false)
105+
require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 2)
106+
assert.Contains(t, observed.All()[0].Message, "Exporting failed. Dropping data.")
107+
assert.Equal(t, "my error", observed.All()[0].ContextMap()["error"])
108+
assert.Contains(t, observed.All()[1].Message, "Exporting failed. Rejecting data.")
109+
assert.Equal(t, "my error", observed.All()[1].ContextMap()["error"])
110+
require.NoError(t, bs.Shutdown(context.Background()))
139111
}
140112

141113
func TestQueueRetryWithDisabledQueue(t *testing.T) {
@@ -177,13 +149,8 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
177149
},
178150
}
179151

180-
runTest := func(testName string, enableQueueBatcher bool, tt struct {
181-
name string
182-
queueOptions []Option
183-
},
184-
) {
185-
t.Run(testName, func(t *testing.T) {
186-
setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)
152+
for _, tt := range tests {
153+
t.Run(tt.name, func(t *testing.T) {
187154
set := exportertest.NewNopSettingsWithType(exportertest.NopType)
188155
logger, observed := observer.New(zap.ErrorLevel)
189156
set.Logger = zap.New(logger)
@@ -199,8 +166,4 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
199166
assert.Empty(t, 0, sink.RequestsCount())
200167
})
201168
}
202-
for _, tt := range tests {
203-
runTest(tt.name+"_enable_queue_batcher", true, tt)
204-
runTest(tt.name+"_disable_queue_batcher", false, tt)
205-
}
206169
}

0 commit comments

Comments
 (0)