Skip to content

Commit 09b2518

Browse files
committed
[exporterhelper] Fix potential deadlock in the batch sender
Concurrent handling of flush timeouts can run into a deadlock when a batch is sent simultaneously by reaching the minimum size and by the flush timeout expiring. The deadlock can happen on the following lines: - https://github.com/open-telemetry/opentelemetry-collector/blob/115bc8e28e009ca93565dc4deb4cf6608fa63622/exporter/exporterhelper/batch_sender.go#L131 - https://github.com/open-telemetry/opentelemetry-collector/blob/115bc8e28e009ca93565dc4deb4cf6608fa63622/exporter/exporterhelper/batch_sender.go#L87
1 parent 7b0c38e commit 09b2518

File tree

3 files changed

+97
-18
lines changed

3 files changed

+97
-18
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix potential deadlock in the batch sender
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [10315]
14+
15+
# Optional: The change log or logs in which this entry should be included.
16+
# e.g. '[user]' or '[user, api]'
17+
# Include 'user' if the change is relevant to end users.
18+
# Include 'api' if there is a change to a library API.
19+
change_logs: [user]

exporter/exporterhelper/batch_sender.go

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ type batchSender struct {
3333
concurrencyLimit uint64
3434
activeRequests atomic.Uint64
3535

36-
resetTimerCh chan struct{}
37-
3836
mu sync.Mutex
3937
activeBatch *batch
38+
lastFlushed time.Time
4039

4140
logger *zap.Logger
4241

@@ -57,7 +56,6 @@ func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
5756
shutdownCh: nil,
5857
shutdownCompleteCh: make(chan struct{}),
5958
stopped: &atomic.Bool{},
60-
resetTimerCh: make(chan struct{}),
6159
}
6260
return bs
6361
}
@@ -85,16 +83,17 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
8583
return
8684
case <-timer.C:
8785
bs.mu.Lock()
86+
nextFlush := bs.cfg.FlushTimeout
8887
if bs.activeBatch.request != nil {
89-
bs.exportActiveBatch()
88+
sinceLastFlush := time.Since(bs.lastFlushed)
89+
if sinceLastFlush >= bs.cfg.FlushTimeout {
90+
bs.exportActiveBatch()
91+
} else {
92+
nextFlush = bs.cfg.FlushTimeout - sinceLastFlush
93+
}
9094
}
9195
bs.mu.Unlock()
92-
timer.Reset(bs.cfg.FlushTimeout)
93-
case <-bs.resetTimerCh:
94-
if !timer.Stop() {
95-
<-timer.C
96-
}
97-
timer.Reset(bs.cfg.FlushTimeout)
96+
timer.Reset(nextFlush)
9897
}
9998
}
10099
}()
@@ -123,15 +122,10 @@ func (bs *batchSender) exportActiveBatch() {
123122
b.err = bs.nextSender.send(b.ctx, b.request)
124123
close(b.done)
125124
}(bs.activeBatch)
125+
bs.lastFlushed = time.Now()
126126
bs.activeBatch = newEmptyBatch()
127127
}
128128

129-
func (bs *batchSender) resetTimer() {
130-
if !bs.stopped.Load() {
131-
bs.resetTimerCh <- struct{}{}
132-
}
133-
}
134-
135129
// isActiveBatchReady returns true if the active batch is ready to be exported.
136130
// The batch is ready if it has reached the minimum size or the concurrency limit is reached.
137131
// Caller must hold the lock.
@@ -168,7 +162,6 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err
168162
batch := bs.activeBatch
169163
if bs.isActiveBatchReady() || len(reqs) > 1 {
170164
bs.exportActiveBatch()
171-
bs.resetTimer()
172165
}
173166
bs.mu.Unlock()
174167
<-batch.done
@@ -208,7 +201,6 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
208201
batch := bs.activeBatch
209202
if bs.isActiveBatchReady() {
210203
bs.exportActiveBatch()
211-
bs.resetTimer()
212204
}
213205
bs.mu.Unlock()
214206
<-batch.done

exporter/exporterhelper/batch_sender_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,74 @@ func TestBatchSenderWithTimeout(t *testing.T) {
535535
assert.EqualValues(t, 12, sink.itemsCount.Load())
536536
}
537537

538+
func TestBatchSenderTimerResetNoConflict(t *testing.T) {
539+
bCfg := exporterbatcher.NewDefaultConfig()
540+
bCfg.MinSizeItems = 8
541+
bCfg.FlushTimeout = 50 * time.Millisecond
542+
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
543+
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
544+
require.NoError(t, err)
545+
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
546+
547+
sink := newFakeRequestSink()
548+
549+
// Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer
550+
go func() {
551+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
552+
}()
553+
time.Sleep(50 * time.Millisecond)
554+
go func() {
555+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
556+
}()
557+
558+
// The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict
559+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
560+
assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
561+
assert.EqualValues(c, 8, sink.itemsCount.Load())
562+
}, 200*time.Millisecond, 10*time.Millisecond)
563+
564+
require.NoError(t, be.Shutdown(context.Background()))
565+
}
566+
567+
func TestBatchSenderTimerFlush(t *testing.T) {
568+
bCfg := exporterbatcher.NewDefaultConfig()
569+
bCfg.MinSizeItems = 8
570+
bCfg.FlushTimeout = 100 * time.Millisecond
571+
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
572+
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
573+
require.NoError(t, err)
574+
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
575+
576+
sink := newFakeRequestSink()
577+
578+
time.Sleep(50 * time.Millisecond)
579+
580+
// Send 2 concurrent requests that should be merged in one batch and sent immediately
581+
go func() {
582+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
583+
}()
584+
go func() {
585+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
586+
}()
587+
588+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
589+
assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
590+
assert.EqualValues(c, 8, sink.itemsCount.Load())
591+
}, 30*time.Millisecond, 5*time.Millisecond)
592+
593+
// Send another request that should be sent after the flush timeout which supposed to happen after remaining 50ms
594+
go func() {
595+
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
596+
}()
597+
598+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
599+
assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
600+
assert.EqualValues(c, 8, sink.itemsCount.Load())
601+
}, 60*time.Millisecond, 5*time.Millisecond)
602+
603+
require.NoError(t, be.Shutdown(context.Background()))
604+
}
605+
538606
func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
539607
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption,
540608
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))

0 commit comments

Comments (0)