Skip to content

Commit 1acbafc

Browse files
committed
[chore] Move persistentQueue state management entirely into QueueMetadata
1 parent c27de9b commit 1acbafc

File tree

2 files changed

+34
-76
lines changed

2 files changed

+34
-76
lines changed

exporter/exporterhelper/internal/queuebatch/persistent_queue.go

Lines changed: 31 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T]
118118
pq.hasMoreElements = sync.NewCond(&pq.mu)
119119
pq.hasMoreSpace = newCond(&pq.mu)
120120
pq.drainingComplete = sync.NewCond(&pq.mu)
121+
pq.metadata = &persistentqueue.QueueMetadata{}
122+
123+
if sizerType, err := persistentqueue.SizerTypeToProto(set.sizerType); err == nil {
124+
pq.metadata.SizerType = sizerType
125+
}
126+
121127
return pq
122128
}
123129

@@ -197,7 +203,7 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
197203
// Load legacy dispatched items
198204
if itemKeysBuf, err := pq.client.Get(ctx, legacyCurrentlyDispatchedItemsKey); err == nil {
199205
if dispatchedItems, err := bytesToItemIndexArray(itemKeysBuf); err == nil {
200-
pq.currentlyDispatchedItems = dispatchedItems
206+
pq.metadata.CurrentlyDispatchedItems = dispatchedItems
201207
}
202208
}
203209

@@ -239,24 +245,18 @@ func (pq *persistentQueue[T]) loadQueueMetadata(ctx context.Context) error {
239245
pq.sizerTypeMismatch.CompareAndSwap(false, true)
240246
}
241247

242-
pq.readIndex = metadata.ReadIndex
243-
pq.writeIndex = metadata.WriteIndex
244-
pq.currentlyDispatchedItems = metadata.CurrentlyDispatchedItems
245-
246-
switch metadata.SizerType {
247-
case persistentqueue.SizerType_REQUESTS:
248+
pq.metadata = metadata
249+
if metadata.SizerType == persistentqueue.SizerType_REQUESTS {
248250
// If the queue is sized by the number of requests, no need to read the queue size from metadata.
251+
queueSize := pq.metadata.WriteIndex - pq.metadata.ReadIndex
249252
//nolint:gosec
250-
pq.queueSize = int64(pq.writeIndex - pq.readIndex)
251-
default:
252-
pq.queueSize = metadata.QueueSize
253+
pq.metadata.QueueSize = int64(queueSize)
253254
}
254-
255255
pq.logger.Info("Loaded queue metadata",
256-
zap.Uint64("readIndex", pq.readIndex),
257-
zap.Uint64("writeIndex", pq.writeIndex),
258-
zap.Int64("queueSize", pq.queueSize),
259-
zap.Int("dispatchedItems", len(pq.currentlyDispatchedItems)))
256+
zap.Uint64("readIndex", pq.metadata.ReadIndex),
257+
zap.Uint64("writeIndex", pq.metadata.WriteIndex),
258+
zap.Int64("queueSize", pq.metadata.QueueSize),
259+
zap.Int("dispatchedItems", len(pq.metadata.CurrentlyDispatchedItems)))
260260

261261
return nil
262262
}
@@ -277,31 +277,9 @@ func (pq *persistentQueue[T]) cleanupLegacyKeys(ctx context.Context) {
277277
}
278278
}
279279

280-
func (pq *persistentQueue[T]) currentMetadata() (*persistentqueue.QueueMetadata, error) {
281-
sizeType, err := persistentqueue.SizerTypeToProto(pq.set.sizerType)
282-
if err != nil {
283-
pq.logger.Error("Failed to marshal sizer type", zap.Error(err))
284-
return nil, err
285-
}
286-
287-
meta := persistentqueue.QueueMetadata{
288-
SizerType: sizeType,
289-
ReadIndex: pq.readIndex,
290-
WriteIndex: pq.writeIndex,
291-
CurrentlyDispatchedItems: pq.currentlyDispatchedItems,
292-
QueueSize: pq.queueSize,
293-
}
294-
return &meta, nil
295-
}
296-
297280
// backupCurrentMetadata is used for standalone metadata persistence like in Shutdown or initialization.
298281
func (pq *persistentQueue[T]) backupCurrentMetadata(ctx context.Context) error {
299-
metaData, err := pq.currentMetadata()
300-
if err != nil {
301-
return err
302-
}
303-
304-
metadataBytes, err := metadataToBytes(metaData)
282+
metadataBytes, err := metadataToBytes(pq.metadata)
305283
if err != nil {
306284
return err
307285
}
@@ -369,37 +347,26 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
369347
return err
370348
}
371349

372-
// Prepare metadata for update
373-
metadataToStore, err := pq.currentMetadata()
350+
itemIndex := pq.metadata.WriteIndex
351+
pq.metadata.WriteIndex++
352+
pq.metadata.QueueSize += reqSize
353+
metadataBytes, err := metadataToBytes(pq.metadata)
374354
if err != nil {
375355
return err
376356
}
377357

378-
// Update the metadata with the new writeIndex and queueSize
379-
pendingWriteIndex := pq.writeIndex + 1
380-
pendingQueueSize := pq.queueSize + reqSize
381-
metadataToStore.WriteIndex = pendingWriteIndex
382-
metadataToStore.QueueSize = pendingQueueSize
383-
384-
metadataBytes, err := metadataToBytes(metadataToStore)
385-
if err != nil {
386-
return err
387-
}
388-
389-
// Carry out a transaction where we both add the item and update the write index
358+
// Carry out a transaction where we both add the item and update the metadata.
390359
ops := []*storage.Operation{
391360
storage.SetOperation(queueMetadataKey, metadataBytes),
392-
storage.SetOperation(getItemKey(pq.writeIndex), reqBuf),
361+
storage.SetOperation(getItemKey(itemIndex), reqBuf),
393362
}
394363
if err = pq.client.Batch(ctx, ops...); err != nil {
364+
pq.metadata.WriteIndex--
365+
pq.metadata.QueueSize -= reqSize
395366
return err
396367
}
397368

398-
// Update in-memory state only after successful persistence
399-
pq.writeIndex = pendingWriteIndex
400-
pq.queueSize = pendingQueueSize
401369
pq.hasMoreElements.Signal()
402-
403370
return nil
404371
}
405372

@@ -440,16 +407,12 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
440407
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
441408
index := pq.metadata.ReadIndex
442409
// Increase here, so even if errors happen below, it always iterates
443-
pq.readIndex++
444-
pq.currentlyDispatchedItems = append(pq.currentlyDispatchedItems, index)
410+
pq.metadata.ReadIndex++
411+
pq.metadata.CurrentlyDispatchedItems = append(pq.metadata.CurrentlyDispatchedItems, index)
445412

446-
metadataToStore, err := pq.currentMetadata() // Reflects updated readIndex and currentlyDispatchedItems
413+
metadataBytes, err := metadataToBytes(pq.metadata)
447414
var request T
448-
if err != nil {
449-
return 0, request, false
450-
}
451415

452-
metadataBytes, err := metadataToBytes(metadataToStore)
453416
if err != nil {
454417
return 0, request, false
455418
}
@@ -513,7 +476,7 @@ func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr erro
513476
}
514477

515478
// Check if we've completed draining during sizer type mismatch.
516-
if pq.sizerTypeMismatch.Load() && pq.readIndex == pq.writeIndex && len(pq.currentlyDispatchedItems) == 0 && pq.queueSize == 0 {
479+
if pq.sizerTypeMismatch.Load() && pq.metadata.ReadIndex == pq.metadata.WriteIndex && len(pq.metadata.CurrentlyDispatchedItems) == 0 && pq.metadata.QueueSize == 0 {
517480
pq.logger.Info("Queue drain completed due to sizer type mismatch, allowing new requests")
518481

519482
pq.set.sizer = pq.originalConfiguredSizer
@@ -531,7 +494,7 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
531494
defer pq.mu.Unlock()
532495
pq.logger.Debug("Checking if there are items left for dispatch by consumers")
533496

534-
dispatchedItems := pq.currentlyDispatchedItems
497+
dispatchedItems := pq.metadata.CurrentlyDispatchedItems
535498

536499
if len(dispatchedItems) == 0 {
537500
pq.logger.Debug("No items left for dispatch by consumers")
@@ -540,7 +503,7 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
540503

541504
pq.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems,
542505
len(dispatchedItems)))
543-
pq.currentlyDispatchedItems = pq.currentlyDispatchedItems[:0]
506+
pq.metadata.CurrentlyDispatchedItems = pq.metadata.CurrentlyDispatchedItems[:0]
544507

545508
retrieveBatch := make([]*storage.Operation, len(dispatchedItems))
546509
cleanupBatch := make([]*storage.Operation, len(dispatchedItems))
@@ -598,12 +561,7 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u
598561
}
599562
}
600563

601-
metadataToStore, err := pq.currentMetadata()
602-
if err != nil {
603-
return err
604-
}
605-
606-
metadataBytes, err := metadataToBytes(metadataToStore)
564+
metadataBytes, err := metadataToBytes(pq.metadata)
607565
if err != nil {
608566
return err
609567
}

exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,10 +1235,10 @@ func TestPersistentQueue_SizerLegacyFormatMigration(t *testing.T) {
12351235
expectedQueueSize := int64(queueSize) + int64(len(currentlyDispatchedItems))*pq.set.sizer.Sizeof(req)
12361236

12371237
// Assert queue state
1238-
assert.Equal(t, readIndex, pq.readIndex)
1239-
assert.Equal(t, expectedWriteIndex, pq.writeIndex)
1238+
assert.Equal(t, readIndex, pq.metadata.ReadIndex)
1239+
assert.Equal(t, expectedWriteIndex, pq.metadata.WriteIndex)
12401240
assert.Equal(t, expectedQueueSize, pq.Size())
1241-
assert.Empty(t, pq.currentlyDispatchedItems)
1241+
assert.Empty(t, pq.metadata.CurrentlyDispatchedItems)
12421242

12431243
// Assert new metadata in storage
12441244
metadataBytes, err := pq.client.Get(context.Background(), queueMetadataKey)

0 commit comments

Comments
 (0)