Skip to content

Commit 629d70a

Browse files
Add tests for merge metrics
1 parent 2d44c70 commit 629d70a

File tree

2 files changed

+87
-26
lines changed

2 files changed

+87
-26
lines changed

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,8 @@ void abort() {
509509
// {@code IndexWriter} checks the abort flag internally, while running the merge.
510510
// The segments of an aborted merge become available to subsequent merges.
511511
onGoingMerge.getMerge().setAborted();
512+
513+
mergeMetrics.moveQueuedMergeBytesToRunning(onGoingMerge, mergeMemoryEstimateBytes);
512514
try {
513515
if (verbose()) {
514516
message(String.format(Locale.ROOT, "merge task %s start abort", this));

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 85 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import static org.hamcrest.Matchers.is;
4747
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4848
import static org.mockito.ArgumentMatchers.any;
49+
import static org.mockito.ArgumentMatchers.anyLong;
4950
import static org.mockito.Mockito.doAnswer;
5051
import static org.mockito.Mockito.mock;
5152
import static org.mockito.Mockito.times;
@@ -74,13 +75,14 @@ public void testMergesExecuteInSizeOrder() throws IOException {
7475
nodeEnvironment = newNodeEnvironment(settings);
7576
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
7677
.getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), settings, nodeEnvironment);
78+
var mergeMetrics = mock(MergeMetrics.class);
7779
try (
7880
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
7981
new ShardId("index", "_na_", 1),
8082
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
8183
threadPoolMergeExecutorService,
8284
merge -> 0,
83-
MergeMetrics.NOOP
85+
mergeMetrics
8486
)
8587
) {
8688
List<OneMerge> executedMergesList = new ArrayList<>();
@@ -98,9 +100,19 @@ public void testMergesExecuteInSizeOrder() throws IOException {
98100
return null;
99101
}).when(mergeSource).merge(any(OneMerge.class));
100102
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
103+
104+
// verify queued byte metric is recorded for each merge
105+
verify(mergeMetrics, times(i + 1)).incrementQueuedMergeBytes(any(), anyLong());
101106
}
107+
102108
threadPoolTaskQueue.runAllTasks();
103109
assertThat(executedMergesList.size(), is(mergeCount));
110+
111+
// verify metrics are reported for each merge
112+
verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong());
113+
verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any());
114+
verify(mergeMetrics, times(mergeCount)).markMergeMetrics(any(), anyLong(), anyLong());
115+
104116
// assert merges are executed in ascending size order
105117
for (int i = 1; i < mergeCount; i++) {
106118
assertThat(
@@ -114,6 +126,7 @@ public void testMergesExecuteInSizeOrder() throws IOException {
114126

115127
public void testSimpleMergeTaskBacklogging() {
116128
int mergeExecutorThreadCount = randomIntBetween(1, 5);
129+
var mergeMetrics = mock(MergeMetrics.class);
117130
Settings mergeSchedulerSettings = Settings.builder()
118131
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeExecutorThreadCount)
119132
.build();
@@ -124,7 +137,7 @@ public void testSimpleMergeTaskBacklogging() {
124137
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
125138
threadPoolMergeExecutorService,
126139
merge -> 0,
127-
MergeMetrics.NOOP
140+
mergeMetrics
128141
);
129142
// more merge tasks than merge threads
130143
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
@@ -145,6 +158,9 @@ public void testSimpleMergeTaskBacklogging() {
145158
}
146159
assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(mergeExecutorThreadCount));
147160
assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(mergeCount - mergeExecutorThreadCount));
161+
162+
// verify no metrics are recorded as no merges have been queued or executed through the merge scheduler
163+
verifyNoInteractions(mergeMetrics);
148164
}
149165

150166
public void testSimpleMergeTaskReEnqueueingBySize() {
@@ -444,6 +460,7 @@ public void testMergesRunConcurrently() throws Exception {
444460
// disable fs available disk space feature for this test
445461
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s")
446462
.build();
463+
var mergeMetrics = mock(MergeMetrics.class);
447464
nodeEnvironment = newNodeEnvironment(settings);
448465
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
449466
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
@@ -456,13 +473,13 @@ public void testMergesRunConcurrently() throws Exception {
456473
IndexSettingsModule.newIndexSettings("index", settings),
457474
threadPoolMergeExecutorService,
458475
merge -> 0,
459-
MergeMetrics.NOOP
476+
mergeMetrics
460477
)
461478
) {
462479
// at least 1 extra merge than there are concurrently allowed
463480
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 10);
464481
Semaphore runMergeSemaphore = new Semaphore(0);
465-
for (int i = 0; i < mergeCount; i++) {
482+
for (int i = 1; i <= mergeCount; i++) {
466483
MergeSource mergeSource = mock(MergeSource.class);
467484
OneMerge oneMerge = mock(OneMerge.class);
468485
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
@@ -476,7 +493,11 @@ public void testMergesRunConcurrently() throws Exception {
476493
return null;
477494
}).when(mergeSource).merge(any(OneMerge.class));
478495
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
496+
497+
// verify queued byte metric is recorded for each merge
498+
verify(mergeMetrics, times(i)).incrementQueuedMergeBytes(any(), anyLong());
479499
}
500+
480501
for (int completedMergesCount = 0; completedMergesCount < mergeCount
481502
- mergeSchedulerMaxThreadCount; completedMergesCount++) {
482503
int finalCompletedMergesCount = completedMergesCount;
@@ -521,6 +542,11 @@ public void testMergesRunConcurrently() throws Exception {
521542
runMergeSemaphore.release();
522543
}
523544
assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone()));
545+
546+
// verify metrics are recorded for each merge
547+
verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong());
548+
verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any());
549+
verify(mergeMetrics, times(mergeCount)).markMergeMetrics(any(), anyLong(), anyLong());
524550
}
525551
}
526552
}
@@ -701,30 +727,63 @@ public void testAutoIOThrottleForMergeTasks() throws Exception {
701727
}
702728
}
703729

704-
public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() {
705-
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
706-
// build a scheduler that always returns true for shouldSkipMerge
707-
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
708-
new ShardId("index", "_na_", 1),
709-
IndexSettingsModule.newIndexSettings("index", Settings.builder().build()),
710-
threadPoolMergeExecutorService,
711-
merge -> 0,
712-
MergeMetrics.NOOP
730+
public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() throws IOException {
731+
DeterministicTaskQueue threadPoolTaskQueue = new DeterministicTaskQueue();
732+
Settings settings = Settings.builder()
733+
// disable fs available disk space feature for this test
734+
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s")
735+
.build();
736+
nodeEnvironment = newNodeEnvironment(settings);
737+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
738+
.getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), settings, nodeEnvironment);
739+
var mergeMetrics = mock(MergeMetrics.class);
740+
try (
741+
// build a scheduler that always returns true for shouldSkipMerge
742+
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
743+
new ShardId("index", "_na_", 1),
744+
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
745+
threadPoolMergeExecutorService,
746+
merge -> 0,
747+
mergeMetrics
748+
) {
749+
@Override
750+
protected boolean shouldSkipMerge() {
751+
return true;
752+
}
753+
}
713754
) {
714-
@Override
715-
protected boolean shouldSkipMerge() {
716-
return true;
755+
int mergeCount = randomIntBetween(2, 10);
756+
for (int i = 0; i < mergeCount; i++) {
757+
MergeSource mergeSource = mock(MergeSource.class);
758+
OneMerge oneMerge = mock(OneMerge.class);
759+
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
760+
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
761+
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
762+
763+
// create the merge task
764+
MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values()));
765+
766+
// verify that calling schedule on the merge task indicates the merge should be aborted
767+
Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask);
768+
assertThat(schedule, is(Schedule.ABORT));
769+
770+
// run the merge through the scheduler
771+
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
772+
773+
// verify queued merge byte metrics are still recorded for each merge
774+
verify(mergeMetrics, times(i + 1)).incrementQueuedMergeBytes(any(), anyLong());
717775
}
718-
};
719-
MergeSource mergeSource = mock(MergeSource.class);
720-
OneMerge oneMerge = mock(OneMerge.class);
721-
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
722-
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
723-
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
724-
MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values()));
725-
// verify that calling schedule on the merge task indicates the merge should be aborted
726-
Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask);
727-
assertThat(schedule, is(Schedule.ABORT));
776+
777+
// run all merges; they should all be aborted
778+
threadPoolTaskQueue.runAllTasks();
779+
780+
// verify queued bytes metrics are moved to running and decremented
781+
verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong());
782+
verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any());
783+
784+
// verify we did not mark the merges as merged
785+
verify(mergeMetrics, times(0)).markMergeMetrics(any(), anyLong(), anyLong());
786+
}
728787
}
729788

730789
private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {

0 commit comments

Comments
 (0)