
Commit b6453e7

Merge branch 'main' into add-featuregate-examples
2 parents c7869df + 56433bc commit b6453e7

File tree

24 files changed (+89, -160 lines)


cmd/otelcorecol/go.mod

Lines changed: 0 additions & 1 deletion
@@ -74,7 +74,6 @@ require (
 	github.com/prometheus/client_model v0.6.2 // indirect
 	github.com/prometheus/common v0.64.0 // indirect
 	github.com/prometheus/procfs v0.16.1 // indirect
-	github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
 	github.com/rs/cors v1.11.1 // indirect
 	github.com/shirou/gopsutil/v4 v4.25.5 // indirect
 	github.com/spf13/cobra v1.9.1 // indirect

cmd/otelcorecol/go.sum

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default.

exporter/debugexporter/go.mod

Lines changed: 0 additions & 1 deletion
@@ -40,7 +40,6 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
 	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
 	go.opentelemetry.io/collector/config/configretry v1.33.0 // indirect
 	go.opentelemetry.io/collector/consumer/consumererror v0.127.0 // indirect

exporter/debugexporter/go.sum

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default.

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 31 additions & 44 deletions
@@ -6,56 +6,45 @@ import (
 	"context"
 	"sync"
 
-	"github.com/puzpuzpuz/xsync/v3"
-
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
 )
 
+type batcherSettings[T any] struct {
+	sizerType   request.SizerType
+	sizer       request.Sizer[T]
+	partitioner Partitioner[T]
+	next        sender.SendFunc[T]
+	maxWorkers  int
+}
+
 type multiBatcher struct {
 	cfg         BatchConfig
-	workerPool  chan struct{}
+	wp          *workerPool
 	sizerType   request.SizerType
 	sizer       request.Sizer[request.Request]
 	partitioner Partitioner[request.Request]
 	consumeFunc sender.SendFunc[request.Request]
 
 	singleShard *shardBatcher
-	shards      *xsync.MapOf[string, *shardBatcher]
+	shards      sync.Map
 }
 
 var _ Batcher[request.Request] = (*multiBatcher)(nil)
 
 func newMultiBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *multiBatcher {
-	var workerPool chan struct{}
-	if bSet.maxWorkers != 0 {
-		workerPool = make(chan struct{}, bSet.maxWorkers)
-		for i := 0; i < bSet.maxWorkers; i++ {
-			workerPool <- struct{}{}
-		}
-	}
 	mb := &multiBatcher{
 		cfg:         bCfg,
-		workerPool:  workerPool,
+		wp:          newWorkerPool(bSet.maxWorkers),
 		sizerType:   bSet.sizerType,
 		sizer:       bSet.sizer,
 		partitioner: bSet.partitioner,
 		consumeFunc: bSet.next,
 	}
 
 	if bSet.partitioner == nil {
-		mb.singleShard = &shardBatcher{
-			cfg:         bCfg,
-			workerPool:  mb.workerPool,
-			sizerType:   bSet.sizerType,
-			sizer:       bSet.sizer,
-			consumeFunc: bSet.next,
-			stopWG:      sync.WaitGroup{},
-			shutdownCh:  make(chan struct{}, 1),
-		}
-	} else {
-		mb.shards = xsync.NewMapOf[string, *shardBatcher]()
+		mb.singleShard = newShard(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc)
 	}
 	return mb
 }
@@ -66,25 +55,24 @@ func (mb *multiBatcher) getShard(ctx context.Context, req request.Request) *shardBatcher {
 	}
 
 	key := mb.partitioner.GetKey(ctx, req)
-	result, _ := mb.shards.LoadOrCompute(key, func() *shardBatcher {
-		s := &shardBatcher{
-			cfg:         mb.cfg,
-			workerPool:  mb.workerPool,
-			sizerType:   mb.sizerType,
-			sizer:       mb.sizer,
-			consumeFunc: mb.consumeFunc,
-			stopWG:      sync.WaitGroup{},
-			shutdownCh:  make(chan struct{}, 1),
-		}
-		s.start(ctx, nil)
-		return s
-	})
-	return result
+	// Fast path, shard already created.
+	s, found := mb.shards.Load(key)
+	if found {
+		return s.(*shardBatcher)
+	}
+	newS := newShard(mb.cfg, mb.sizerType, mb.sizer, mb.wp, mb.consumeFunc)
+	_ = newS.Start(ctx, nil)
+	s, loaded := mb.shards.LoadOrStore(key, newS)
+	// If loaded, another goroutine won the race and already stored a shard for this key; shut down the newly created one.
+	if loaded {
+		_ = newS.Shutdown(ctx)
+	}
+	return s.(*shardBatcher)
 }
 
 func (mb *multiBatcher) Start(ctx context.Context, host component.Host) error {
 	if mb.singleShard != nil {
-		mb.singleShard.start(ctx, host)
+		return mb.singleShard.Start(ctx, host)
 	}
 	return nil
 }
@@ -96,16 +84,15 @@ func (mb *multiBatcher) Consume(ctx context.Context, req request.Request, done Done) {
 
 func (mb *multiBatcher) Shutdown(ctx context.Context) error {
 	if mb.singleShard != nil {
-		mb.singleShard.shutdown(ctx)
-		return nil
+		return mb.singleShard.Shutdown(ctx)
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(mb.shards.Size())
-	mb.shards.Range(func(_ string, shard *shardBatcher) bool {
+	mb.shards.Range(func(_ any, shard any) bool {
+		wg.Add(1)
 		go func() {
-			shard.shutdown(ctx)
-			wg.Done()
+			defer wg.Done()
+			_ = shard.(*shardBatcher).Shutdown(ctx)
 		}()
 		return true
 	})
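
A minimal, self-contained sketch of the sync.Map pattern the new getShard relies on. It is not part of the commit: the shard and registry names are hypothetical stand-ins for shardBatcher and multiBatcher, and only the Load / LoadOrStore handling mirrors the diff above. Concurrent callers may each build a candidate, but only one is stored; a loser is discarded, which in the batcher corresponds to shutting down the freshly started shard.

// Sketch of the Load / LoadOrStore race-resolution pattern (assumed names; not the collector's code).
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type shard struct{ id int64 }

type registry struct {
	shards sync.Map     // key string -> *shard
	nextID atomic.Int64 // only used to show which candidate won the race
}

func (r *registry) getShard(key string) *shard {
	// Fast path: a shard for this key already exists.
	if s, ok := r.shards.Load(key); ok {
		return s.(*shard)
	}
	// Slow path: build a candidate, then let LoadOrStore pick a single winner.
	candidate := &shard{id: r.nextID.Add(1)}
	actual, loaded := r.shards.LoadOrStore(key, candidate)
	if loaded {
		// Another goroutine stored its shard first; in the batcher above the
		// losing candidate is the one that gets shut down.
		fmt.Println("candidate", candidate.id, "lost the creation race")
	}
	return actual.(*shard)
}

func main() {
	r := &registry{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.getShard("traces")
		}()
	}
	wg.Wait()
	s, _ := r.shards.Load("traces")
	fmt.Println("all callers share shard", s.(*shard).id)
}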

exporter/exporterhelper/internal/queuebatch/shard_batcher.go

Lines changed: 46 additions & 33 deletions
@@ -21,18 +21,10 @@ type batch struct {
 	done multiDone
 }
 
-type batcherSettings[T any] struct {
-	sizerType   request.SizerType
-	sizer       request.Sizer[T]
-	partitioner Partitioner[T]
-	next        sender.SendFunc[T]
-	maxWorkers  int
-}
-
 // shardBatcher continuously batches incoming requests and flushes asynchronously if the minimum size limit is met or on timeout.
 type shardBatcher struct {
 	cfg         BatchConfig
-	workerPool  chan struct{}
+	wp          *workerPool
 	sizerType   request.SizerType
 	sizer       request.Sizer[request.Request]
 	consumeFunc sender.SendFunc[request.Request]
@@ -43,6 +35,17 @@ type shardBatcher struct {
 	shutdownCh chan struct{}
 }
 
+func newShard(cfg BatchConfig, sizerType request.SizerType, sizer request.Sizer[request.Request], wp *workerPool, next sender.SendFunc[request.Request]) *shardBatcher {
+	return &shardBatcher{
+		cfg:         cfg,
+		wp:          wp,
+		sizerType:   sizerType,
+		sizer:       sizer,
+		consumeFunc: next,
+		shutdownCh:  make(chan struct{}, 1),
+	}
+}
+
 func (qb *shardBatcher) resetTimer() {
 	if qb.cfg.FlushTimeout > 0 {
 		qb.timer.Reset(qb.cfg.FlushTimeout)
@@ -88,7 +91,7 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done Done) {
 	}
 
 	reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
-	// If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch.
+	// If failed to merge signal all Done callbacks from the current batch as well as the current request and reset the current batch.
 	if mergeSplitErr != nil || len(reqList) == 0 {
 		done.OnDone(mergeSplitErr)
 		qb.currentBatchMu.Unlock()
@@ -102,7 +105,7 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done Done) {
 
 	// We have at least one result in the reqList; if there are more, here is what that means:
 	// - First result will contain items from the current batch + some results from the current request.
-	// - All other results except first will contain items only from current request.
+	// - All other results except first will contain items only from the current request.
 	// - Last result may not have enough data to be flushed.
 
 	// Logic on how to deal with the current batch:
@@ -145,8 +148,12 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done Done) {
 	}
 }
 
-// startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout.
-func (qb *shardBatcher) startTimeBasedFlushingGoroutine() {
+// Start starts the goroutine that reads from the queue and flushes asynchronously.
+func (qb *shardBatcher) Start(context.Context, component.Host) error {
+	if qb.cfg.FlushTimeout <= 0 {
+		return nil
+	}
+	qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
 	qb.stopWG.Add(1)
 	go func() {
 		defer qb.stopWG.Done()
@@ -159,14 +166,16 @@ func (qb *shardBatcher) startTimeBasedFlushingGoroutine() {
 			}
 		}
 	}()
+	return nil
 }
 
-// Start starts the goroutine that reads from the queue and flushes asynchronously.
-func (qb *shardBatcher) start(_ context.Context, _ component.Host) {
-	if qb.cfg.FlushTimeout > 0 {
-		qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
-		qb.startTimeBasedFlushingGoroutine()
-	}
+// Shutdown ensures that the queue and all Batchers are stopped.
+func (qb *shardBatcher) Shutdown(context.Context) error {
+	close(qb.shutdownCh)
+	// Make sure to execute one last flush if necessary.
+	qb.flushCurrentBatchIfNecessary()
+	qb.stopWG.Wait()
+	return nil
 }
 
 // flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
@@ -188,24 +197,28 @@ func (qb *shardBatcher) flushCurrentBatchIfNecessary() {
 // flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
 func (qb *shardBatcher) flush(ctx context.Context, req request.Request, done Done) {
 	qb.stopWG.Add(1)
-	if qb.workerPool != nil {
-		<-qb.workerPool
-	}
-	go func() {
+	qb.wp.execute(func() {
 		defer qb.stopWG.Done()
 		done.OnDone(qb.consumeFunc(ctx, req))
-		if qb.workerPool != nil {
-			qb.workerPool <- struct{}{}
-		}
-	}()
+	})
 }
 
-// Shutdown ensures that queue and all Batcher are stopped.
-func (qb *shardBatcher) shutdown(_ context.Context) {
-	close(qb.shutdownCh)
-	// Make sure execute one last flush if necessary.
-	qb.flushCurrentBatchIfNecessary()
-	qb.stopWG.Wait()
+type workerPool struct {
+	workers chan struct{}
+}
+
+func newWorkerPool(maxWorkers int) *workerPool {
+	workers := make(chan struct{}, maxWorkers)
+	for i := 0; i < maxWorkers; i++ {
+		workers <- struct{}{}
+	}
+	return &workerPool{workers: workers}
+}
+
+func (wp *workerPool) execute(f func()) {
+	<-wp.workers
+	go f()
+	wp.workers <- struct{}{}
 }
 
 type multiDone []Done
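
The new workerPool uses a buffered channel as a pool of tokens. The standalone sketch below illustrates that channel-as-semaphore idea under assumed names (pool, run); it is not the collector's code. In this variant the token is returned only after the submitted function finishes, so at most maxWorkers tasks run concurrently, whereas execute above hands the token back right after spawning the goroutine.

// Sketch of a channel-backed token pool (assumed names; not the collector's code).
package main

import (
	"fmt"
	"sync"
	"time"
)

type pool struct {
	tokens chan struct{}
}

func newPool(maxWorkers int) *pool {
	tokens := make(chan struct{}, maxWorkers)
	for i := 0; i < maxWorkers; i++ {
		tokens <- struct{}{} // one token per worker slot
	}
	return &pool{tokens: tokens}
}

// run blocks until a token is free, executes f asynchronously, and returns the
// token once f completes, capping concurrency at maxWorkers.
func (p *pool) run(f func()) {
	<-p.tokens
	go func() {
		defer func() { p.tokens <- struct{}{} }()
		f()
	}()
}

func main() {
	p := newPool(2)
	var wg sync.WaitGroup
	start := time.Now()
	for i := 0; i < 4; i++ {
		wg.Add(1)
		p.run(func() {
			defer wg.Done()
			time.Sleep(50 * time.Millisecond) // stand-in for a flush/consume call
		})
	}
	wg.Wait()
	// Two workers and four ~50ms tasks finish in roughly two waves (~100ms).
	fmt.Println("elapsed:", time.Since(start).Round(10*time.Millisecond))
}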

0 commit comments
