@@ -98,11 +98,8 @@ type persistentQueue[T any] struct {
98
98
mu sync.Mutex
99
99
hasMoreElements * sync.Cond
100
100
hasMoreSpace * cond
101
- readIndex uint64
102
- writeIndex uint64
103
- currentlyDispatchedItems []uint64
104
- queueSize int64
105
- refClient int64
101
+ metadata * persistentqueue.QueueMetadata
102
+ refClient int64
106
103
stopped bool
107
104
108
105
sizerTypeMismatch atomic.Bool
@@ -137,7 +134,7 @@ func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) er
137
134
func (pq * persistentQueue [T ]) Size () int64 {
138
135
pq .mu .Lock ()
139
136
defer pq .mu .Unlock ()
140
- return pq .queueSize
137
+ return pq .metadata . QueueSize
141
138
}
142
139
143
140
func (pq * persistentQueue [T ]) Capacity () int64 {
@@ -167,11 +164,11 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
167
164
168
165
err := pq .client .Batch (ctx , riOp , wiOp )
169
166
if err == nil {
170
- pq .readIndex , err = bytesToItemIndex (riOp .Value )
167
+ pq .metadata . ReadIndex , err = bytesToItemIndex (riOp .Value )
171
168
}
172
169
173
170
if err == nil {
174
- pq .writeIndex , err = bytesToItemIndex (wiOp .Value )
171
+ pq .metadata . WriteIndex , err = bytesToItemIndex (wiOp .Value )
175
172
}
176
173
177
174
if err != nil {
@@ -180,11 +177,11 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
180
177
} else {
181
178
pq .logger .Error ("Failed getting read/write index, starting with new ones" , zap .Error (err ))
182
179
}
183
- pq .readIndex = 0
184
- pq .writeIndex = 0
180
+ pq .metadata . ReadIndex = 0
181
+ pq .metadata . WriteIndex = 0
185
182
}
186
183
187
- queueSize := pq .writeIndex - pq .readIndex
184
+ queueSize := pq .metadata . WriteIndex - pq .metadata . ReadIndex
188
185
189
186
// If the queue is sized by the number of requests, no need to read the queue size from storage.
190
187
if queueSize > 0 && ! pq .isRequestSized {
@@ -359,7 +356,7 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
359
356
// putInternal is the internal version that requires caller to hold the mutex lock.
360
357
func (pq * persistentQueue [T ]) putInternal (ctx context.Context , req T ) error {
361
358
reqSize := pq .set .sizer .Sizeof (req )
362
- for pq .queueSize + reqSize > pq .set .capacity {
359
+ for pq .metadata . QueueSize + reqSize > pq .set .capacity {
363
360
if ! pq .set .blockOnOverflow {
364
361
return ErrQueueIsFull
365
362
}
@@ -418,11 +415,11 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
418
415
}
419
416
420
417
// Read until either a successful retrieved element or no more elements in the storage.
421
- for pq .readIndex != pq .writeIndex {
418
+ for pq .metadata . ReadIndex != pq .metadata . WriteIndex {
422
419
index , req , consumed := pq .getNextItem (ctx )
423
420
// Ensure the used size and the channel size are in sync.
424
- if pq .readIndex == pq .writeIndex {
425
- pq .queueSize = 0
421
+ if pq .metadata . ReadIndex == pq .metadata . WriteIndex {
422
+ pq .metadata . QueueSize = 0
426
423
pq .hasMoreSpace .Signal ()
427
424
}
428
425
if consumed {
@@ -442,7 +439,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
442
439
// finished, the index should be called with onDone to clean up the storage. If no new item is available,
443
440
// returns false.
444
441
func (pq * persistentQueue [T ]) getNextItem (ctx context.Context ) (uint64 , T , bool ) {
445
- index := pq .readIndex
442
+ index := pq .metadata . ReadIndex
446
443
// Increase here, so even if errors happen below, it always iterates
447
444
pq .readIndex ++
448
445
pq .currentlyDispatchedItems = append (pq .currentlyDispatchedItems , index )
@@ -496,12 +493,12 @@ func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr erro
496
493
pq .mu .Unlock ()
497
494
}()
498
495
499
- pq .queueSize -= elSize
496
+ pq .metadata . QueueSize -= elSize
500
497
// The size might be not in sync with the queue in case it's restored from the disk
501
498
// because we don't flush the current queue size on the disk on every read/write.
502
499
// In that case we need to make sure it doesn't go below 0.
503
- if pq .queueSize < 0 {
504
- pq .queueSize = 0
500
+ if pq .metadata . QueueSize < 0 {
501
+ pq .metadata . QueueSize = 0
505
502
}
506
503
pq .hasMoreSpace .Signal ()
507
504
@@ -593,11 +590,11 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
593
590
594
591
// itemDispatchingFinish removes the item from the list of currently dispatched items and deletes it from the persistent queue
595
592
func (pq * persistentQueue [T ]) itemDispatchingFinish (ctx context.Context , index uint64 ) error {
596
- lenCDI := len (pq .currentlyDispatchedItems )
593
+ lenCDI := len (pq .metadata . CurrentlyDispatchedItems )
597
594
for i := 0 ; i < lenCDI ; i ++ {
598
- if pq .currentlyDispatchedItems [i ] == index {
599
- pq .currentlyDispatchedItems [i ] = pq .currentlyDispatchedItems [lenCDI - 1 ]
600
- pq .currentlyDispatchedItems = pq .currentlyDispatchedItems [:lenCDI - 1 ]
595
+ if pq .metadata . CurrentlyDispatchedItems [i ] == index {
596
+ pq .metadata . CurrentlyDispatchedItems [i ] = pq .metadata . CurrentlyDispatchedItems [lenCDI - 1 ]
597
+ pq .metadata . CurrentlyDispatchedItems = pq .metadata . CurrentlyDispatchedItems [:lenCDI - 1 ]
601
598
break
602
599
}
603
600
}
0 commit comments