Skip to content

Commit bb3faf0

Browse files
committed
Change the memory queue implementation to not pre-allocate capacity objects.
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 2a6150d commit bb3faf0

File tree

5 files changed

+200
-80
lines changed

5 files changed

+200
-80
lines changed

.chloggen/sized-queue.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Change the memory queue implementation to not pre-allocate capacity objects.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12070]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: This change improves memory usage of the collector under low utilization and is a prerequisite for supporting different other size limitations (number of items, bytes).
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [user]

exporter/exporterqueue/bounded_memory_queue.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
// the producer are dropped.
1717
type boundedMemoryQueue[T any] struct {
1818
component.StartFunc
19-
*sizedChannel[memQueueEl[T]]
19+
*sizedQueue[memQueueEl[T]]
2020
}
2121

2222
// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -29,17 +29,17 @@ type memoryQueueSettings[T any] struct {
2929
// callback for dropped items (e.g. useful to emit metrics).
3030
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
3131
return &boundedMemoryQueue[T]{
32-
sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}),
32+
sizedQueue: newSizedQueue[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}),
3333
}
3434
}
3535

3636
// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
3737
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
38-
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req})
38+
return q.sizedQueue.push(memQueueEl[T]{ctx: ctx, req: req})
3939
}
4040

4141
func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
42-
item, ok := q.sizedChannel.pop()
42+
item, ok := q.sizedQueue.pop()
4343
return 0, item.ctx, item.req, ok
4444
}
4545

@@ -50,7 +50,7 @@ func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {
5050

5151
// Shutdown closes the queue channel to initiate draining of the queue.
5252
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
53-
q.sizedChannel.shutdown()
53+
q.sizedQueue.shutdown()
5454
return nil
5555
}
5656

exporter/exporterqueue/sized_channel.go

Lines changed: 0 additions & 71 deletions
This file was deleted.

exporter/exporterqueue/sized_queue.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
5+
6+
import (
7+
"errors"
8+
"sync"
9+
)
10+
11+
var errInvalidSize = errors.New("invalid element size")
12+
13+
type node[T any] struct {
14+
data T
15+
size int64
16+
next *node[T]
17+
}
18+
19+
type linkedQueue[T any] struct {
20+
head *node[T]
21+
tail *node[T]
22+
}
23+
24+
func (l *linkedQueue[T]) push(data T, size int64) {
25+
n := &node[T]{data: data, size: size}
26+
if l.tail == nil {
27+
l.head = n
28+
l.tail = n
29+
return
30+
}
31+
l.tail.next = n
32+
l.tail = n
33+
}
34+
35+
func (l *linkedQueue[T]) pop() (T, int64) {
36+
n := l.head
37+
l.head = n.next
38+
if l.head == nil {
39+
l.tail = nil
40+
}
41+
n.next = nil
42+
return n.data, n.size
43+
}
44+
45+
// sizedQueue is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
46+
// The channel will accept elements until the total size of the elements reaches the capacity.
47+
type sizedQueue[T any] struct {
48+
sizer sizer[T]
49+
cap int64
50+
51+
mu sync.Mutex
52+
hasElements *sync.Cond
53+
items *linkedQueue[T]
54+
size int64
55+
stopped bool
56+
}
57+
58+
// newSizedQueue creates a sized elements channel. Each element is assigned a size by the provided sizer.
59+
// capacity is the capacity of the queue.
60+
func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] {
61+
sq := &sizedQueue[T]{
62+
sizer: sizer,
63+
cap: capacity,
64+
items: &linkedQueue[T]{},
65+
}
66+
sq.hasElements = sync.NewCond(&sq.mu)
67+
return sq
68+
}
69+
70+
// push puts the element into the queue with the given sized if there is enough capacity.
71+
// Returns an error if the queue is full.
72+
func (sq *sizedQueue[T]) push(el T) error {
73+
elSize := sq.sizer.Sizeof(el)
74+
if elSize == 0 {
75+
return nil
76+
}
77+
78+
if elSize <= 0 {
79+
return errInvalidSize
80+
}
81+
82+
sq.mu.Lock()
83+
defer sq.mu.Unlock()
84+
85+
if sq.size+elSize > sq.cap {
86+
return ErrQueueIsFull
87+
}
88+
89+
sq.size += elSize
90+
sq.items.push(el, elSize)
91+
// Signal one consumer if any.
92+
sq.hasElements.Signal()
93+
return nil
94+
}
95+
96+
// pop removes the element from the queue and returns it.
97+
// The call blocks until there is an item available or the queue is stopped.
98+
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
99+
func (sq *sizedQueue[T]) pop() (T, bool) {
100+
sq.mu.Lock()
101+
defer sq.mu.Unlock()
102+
103+
for {
104+
if sq.size > 0 {
105+
el, elSize := sq.items.pop()
106+
sq.size -= elSize
107+
return el, true
108+
}
109+
110+
if sq.stopped {
111+
var el T
112+
return el, false
113+
}
114+
115+
sq.hasElements.Wait()
116+
}
117+
}
118+
119+
// shutdown closes the queue channel to initiate draining of the queue.
120+
func (sq *sizedQueue[T]) shutdown() {
121+
sq.mu.Lock()
122+
defer sq.mu.Unlock()
123+
sq.stopped = true
124+
sq.hasElements.Broadcast()
125+
}
126+
127+
func (sq *sizedQueue[T]) Size() int {
128+
sq.mu.Lock()
129+
defer sq.mu.Unlock()
130+
return int(sq.size)
131+
}
132+
133+
func (sq *sizedQueue[T]) Capacity() int {
134+
return int(sq.cap)
135+
}

exporter/exporterqueue/sized_channel_test.go renamed to exporter/exporterqueue/sized_queue_test.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ func (s sizerInt) Sizeof(el int) int64 {
1616
return int64(el)
1717
}
1818

19-
func TestSizedChannel(t *testing.T) {
20-
q := newSizedChannel[int](7, sizerInt{})
19+
func TestSizedQueue(t *testing.T) {
20+
q := newSizedQueue[int](7, sizerInt{})
2121
require.NoError(t, q.push(1))
2222
assert.Equal(t, 1, q.Size())
2323
assert.Equal(t, 7, q.Capacity())
@@ -45,7 +45,38 @@ func TestSizedChannel(t *testing.T) {
4545
assert.Equal(t, 0, el)
4646
}
4747

48+
func TestSizedQueue_DrainAllElements(t *testing.T) {
49+
q := newSizedQueue[int](7, sizerInt{})
50+
require.NoError(t, q.push(1))
51+
require.NoError(t, q.push(3))
52+
53+
el, ok := q.pop()
54+
assert.Equal(t, 1, el)
55+
assert.True(t, ok)
56+
assert.Equal(t, 3, q.Size())
57+
58+
q.shutdown()
59+
el, ok = q.pop()
60+
assert.Equal(t, 3, el)
61+
assert.True(t, ok)
62+
assert.Equal(t, 0, q.Size())
63+
64+
el, ok = q.pop()
65+
assert.False(t, ok)
66+
assert.Equal(t, 0, el)
67+
}
68+
4869
func TestSizedChannel_OfferInvalidSize(t *testing.T) {
49-
q := newSizedChannel[int](1, sizerInt{})
50-
require.ErrorIs(t, q.push(0), errInvalidSize)
70+
q := newSizedQueue[int](1, sizerInt{})
71+
require.ErrorIs(t, q.push(-1), errInvalidSize)
72+
}
73+
74+
func TestSizedChannel_OfferZeroSize(t *testing.T) {
75+
q := newSizedQueue[int](1, sizerInt{})
76+
require.NoError(t, q.push(0))
77+
q.shutdown()
78+
// Because the size 0 is ignored, nothing to drain.
79+
el, ok := q.pop()
80+
assert.False(t, ok)
81+
assert.Equal(t, 0, el)
5182
}

0 commit comments

Comments
 (0)