diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java index f3a9e5db28047..9129292d43837 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java @@ -88,12 +88,14 @@ class TestInternalEngine extends org.elasticsearch.index.engine.InternalEngine { protected ElasticsearchMergeScheduler createMergeScheduler( ShardId shardId, IndexSettings indexSettings, - @Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService + @Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService, + MergeMetrics mergeMetrics ) { ElasticsearchMergeScheduler mergeScheduler = super.createMergeScheduler( shardId, indexSettings, - threadPoolMergeExecutorService + threadPoolMergeExecutorService, + mergeMetrics ); assertThat(mergeScheduler, instanceOf(ThreadPoolMergeScheduler.class)); // assert there is a single merge executor service for all shards diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 5da1e1d3ba024..6dafab431500e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.MapperMetrics; @@ -680,7 +681,8 @@ public static final IndexShard newIndexShard( null, MapperMetrics.NOOP, new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index a5166a5e68da9..520df8a8ebeca 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -88,7 +88,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index 42410c6b8025e..42ab9ae362509 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperMetrics; @@ -181,6 +182,7 @@ public interface DirectoryWrapper { private final MapperMetrics mapperMetrics; private final IndexingStatsSettings indexingStatsSettings; private final SearchStatsSettings searchStatsSettings; + private final MergeMetrics mergeMetrics; /** * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins @@ -190,6 +192,7 @@ public interface DirectoryWrapper { * @param analysisRegistry the analysis registry * @param engineFactory the engine factory * @param directoryFactories the available store types + * @param mergeMetrics */ public IndexModule( final IndexSettings indexSettings, @@ -203,7 +206,8 @@ public IndexModule( final MapperMetrics mapperMetrics, final List searchOperationListeners, final IndexingStatsSettings indexingStatsSettings, - final SearchStatsSettings searchStatsSettings + final SearchStatsSettings searchStatsSettings, + final MergeMetrics mergeMetrics ) { this.indexSettings = indexSettings; this.analysisRegistry = analysisRegistry; @@ -220,6 +224,7 @@ public IndexModule( this.mapperMetrics = mapperMetrics; this.indexingStatsSettings = indexingStatsSettings; this.searchStatsSettings = searchStatsSettings; + this.mergeMetrics = mergeMetrics; } /** @@ -557,7 +562,8 @@ public IndexService newIndexService( mapperMetrics, queryRewriteInterceptor, indexingStatsSettings, - searchStatsSettings + searchStatsSettings, + mergeMetrics ); success = true; return indexService; diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 2c9616e0791e5..b5180f70ae845 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; @@ -172,6 +173,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final QueryRewriteInterceptor queryRewriteInterceptor; private final IndexingStatsSettings indexingStatsSettings; private final SearchStatsSettings searchStatsSettings; + private final MergeMetrics mergeMetrics; @SuppressWarnings("this-escape") public IndexService( @@ -210,7 +212,8 @@ public IndexService( MapperMetrics mapperMetrics, QueryRewriteInterceptor queryRewriteInterceptor, IndexingStatsSettings indexingStatsSettings, - SearchStatsSettings searchStatsSettings + SearchStatsSettings searchStatsSettings, + MergeMetrics mergeMetrics ) { super(indexSettings); assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS @@ -297,6 +300,7 @@ public IndexService( } this.indexingStatsSettings = indexingStatsSettings; this.searchStatsSettings = searchStatsSettings; + this.mergeMetrics = mergeMetrics; updateFsyncTaskIfNecessary(); } @@ -588,7 +592,8 @@ public synchronized IndexShard createShard( indexCommitListener, mapperMetrics, indexingStatsSettings, - searchStatsSettings + searchStatsSettings, + mergeMetrics ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 6137aed83ec7b..cdb8a39d4713b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -149,6 +149,8 @@ public Supplier retentionLeasesSupplier() { private final EngineResetLock engineResetLock; + private final MergeMetrics mergeMetrics; + /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ @@ -181,7 +183,8 @@ public EngineConfig( Engine.IndexCommitListener indexCommitListener, boolean promotableToPrimary, MapperService mapperService, - EngineResetLock engineResetLock + EngineResetLock engineResetLock, + MergeMetrics mergeMetrics ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -229,6 +232,7 @@ public EngineConfig( // always use compound on flush - reduces # of file-handles on refresh this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true); this.engineResetLock = engineResetLock; + this.mergeMetrics = mergeMetrics; } /** @@ -477,4 +481,8 @@ public MapperService getMapperService() { public EngineResetLock getEngineResetLock() { return engineResetLock; } + + public MergeMetrics getMergeMetrics() { + return mergeMetrics; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0a6084ff9cf11..170ccc2cc7bbb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -257,7 +257,8 @@ public InternalEngine(EngineConfig engineConfig) { mergeScheduler = createMergeScheduler( engineConfig.getShardId(), engineConfig.getIndexSettings(), - engineConfig.getThreadPoolMergeExecutorService() + engineConfig.getThreadPoolMergeExecutorService(), + engineConfig.getMergeMetrics() ); scheduler = mergeScheduler.getMergeScheduler(); throttle = new IndexThrottle(pauseIndexingOnThrottle); @@ -2908,10 +2909,11 @@ protected void doRun() { protected ElasticsearchMergeScheduler createMergeScheduler( ShardId shardId, IndexSettings indexSettings, - @Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService + @Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService, + MergeMetrics mergeMetrics ) { if (threadPoolMergeExecutorService != null) { - return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService); + return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService, mergeMetrics); } else { return new EngineConcurrentMergeScheduler(shardId, indexSettings); } @@ -2921,9 +2923,10 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu EngineThreadPoolMergeScheduler( ShardId shardId, IndexSettings indexSettings, - ThreadPoolMergeExecutorService threadPoolMergeExecutorService + ThreadPoolMergeExecutorService threadPoolMergeExecutorService, + MergeMetrics mergeMetrics ) { - super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes); + super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, mergeMetrics); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/MergeMetrics.java b/server/src/main/java/org/elasticsearch/index/engine/MergeMetrics.java new file mode 100644 index 0000000000000..f381f442d05a0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/MergeMetrics.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.MergePolicy; +import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.LongWithAttributes; +import org.elasticsearch.telemetry.metric.MeterRegistry; + +import java.util.concurrent.atomic.AtomicLong; + +public class MergeMetrics { + + public static final String MERGE_SEGMENTS_SIZE = "es.merge.segments.size"; + public static final String MERGE_DOCS_TOTAL = "es.merge.docs.total"; + public static final String MERGE_SEGMENTS_QUEUED_USAGE = "es.merge.segments.queued.usage"; + public static final String MERGE_SEGMENTS_RUNNING_USAGE = "es.merge.segments.running.usage"; + public static final String MERGE_SEGMENTS_MERGED_SIZE = "es.merge.segments.merged.size"; + public static final String MERGE_QUEUED_ESTIMATED_MEMORY_SIZE = "es.merge.segments.memory.size"; + public static final String MERGE_TIME_IN_SECONDS = "es.merge.time"; + public static MergeMetrics NOOP = new MergeMetrics(TelemetryProvider.NOOP.getMeterRegistry()); + + private final LongCounter mergeSizeInBytes; + private final LongCounter mergeMergedSegmentSizeInBytes; + private final LongCounter mergeNumDocs; + private final LongHistogram mergeTimeInSeconds; + + private final AtomicLong runningMergeSizeInBytes = new AtomicLong(); + private final AtomicLong queuedMergeSizeInBytes = new AtomicLong(); + private final AtomicLong queuedEstimatedMergeMemoryInBytes = new AtomicLong(); + + public MergeMetrics(MeterRegistry meterRegistry) { + mergeSizeInBytes = meterRegistry.registerLongCounter(MERGE_SEGMENTS_SIZE, "Total size of segments merged", "bytes"); + meterRegistry.registerLongGauge( + MERGE_SEGMENTS_QUEUED_USAGE, + "Total usage of segments queued to be merged", + "bytes", + () -> new LongWithAttributes(queuedMergeSizeInBytes.get()) + ); + meterRegistry.registerLongGauge( + MERGE_SEGMENTS_RUNNING_USAGE, + "Total usage of segments currently being merged", + "bytes", + () -> new LongWithAttributes(runningMergeSizeInBytes.get()) + ); + mergeMergedSegmentSizeInBytes = meterRegistry.registerLongCounter( + MERGE_SEGMENTS_MERGED_SIZE, + "Total size of the new merged segments", + "bytes" + ); + mergeNumDocs = meterRegistry.registerLongCounter(MERGE_DOCS_TOTAL, "Total number of documents merged", "documents"); + mergeTimeInSeconds = meterRegistry.registerLongHistogram(MERGE_TIME_IN_SECONDS, "Merge time in seconds", "seconds"); + meterRegistry.registerLongGauge( + MERGE_QUEUED_ESTIMATED_MEMORY_SIZE, + "Estimated memory usage for queued merges", + "bytes", + () -> new LongWithAttributes(queuedEstimatedMergeMemoryInBytes.get()) + ); + } + + public void incrementQueuedMergeBytes(OnGoingMerge currentMerge, long estimatedMemorySize) { + queuedMergeSizeInBytes.getAndAdd(currentMerge.getTotalBytesSize()); + queuedEstimatedMergeMemoryInBytes.getAndAdd(estimatedMemorySize); + } + + public void moveQueuedMergeBytesToRunning(OnGoingMerge currentMerge, long estimatedMemorySize) { + long totalSize = currentMerge.getTotalBytesSize(); + queuedMergeSizeInBytes.getAndAdd(-totalSize); + runningMergeSizeInBytes.getAndAdd(totalSize); + queuedEstimatedMergeMemoryInBytes.getAndAdd(-estimatedMemorySize); + } + + public void decrementRunningMergeBytes(OnGoingMerge currentMerge) { + runningMergeSizeInBytes.getAndAdd(-currentMerge.getTotalBytesSize()); + } + + public void markMergeMetrics(MergePolicy.OneMerge currentMerge, long mergedSegmentSize, long tookMillis) { + mergeSizeInBytes.incrementBy(currentMerge.totalBytesSize()); + mergeMergedSegmentSizeInBytes.incrementBy(mergedSegmentSize); + mergeNumDocs.incrementBy(currentMerge.totalNumDocs()); + mergeTimeInSeconds.record(tookMillis / 1000); + } + + public long getQueuedMergeSizeInBytes() { + return queuedMergeSizeInBytes.get(); + } + + public long getRunningMergeSizeInBytes() { + return runningMergeSizeInBytes.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index e3c76374a62a7..7f1ad5adf3181 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -64,6 +64,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics private final MergeSchedulerConfig config; protected final Logger logger; private final MergeTracking mergeTracking; + private final MergeMetrics mergeMetrics; private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final PriorityQueue backloggedMergeTasks = new PriorityQueue<>( 16, @@ -86,16 +87,19 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics * @param indexSettings used to obtain the {@link MergeSchedulerConfig} * @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler * @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take + * @param mergeMetrics metrics related to merges */ public ThreadPoolMergeScheduler( ShardId shardId, IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService, - MergeMemoryEstimateProvider mergeMemoryEstimateProvider + MergeMemoryEstimateProvider mergeMemoryEstimateProvider, + MergeMetrics mergeMetrics ) { this.shardId = shardId; this.config = indexSettings.getMergeSchedulerConfig(); this.logger = Loggers.getLogger(getClass(), shardId); + this.mergeMetrics = mergeMetrics; this.mergeTracking = new MergeTracking( logger, () -> this.config.isAutoThrottle() @@ -226,6 +230,7 @@ protected void handleMergeException(Throwable t) { boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) { try { MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger); + mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes()); mergeQueued(mergeTask.onGoingMerge); return threadPoolMergeExecutorService.submitMergeTask(mergeTask); } finally { @@ -310,6 +315,7 @@ synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) { private void mergeTaskDone(OnGoingMerge merge) { doneMergeTaskCount.incrementAndGet(); + mergeMetrics.decrementRunningMergeBytes(merge); mergeExecutedOrAborted(merge); checkMergeTaskThrottling(); } @@ -437,6 +443,7 @@ public void run() { assert hasStartedRunning() == false; assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) : "runNowOrBacklog must be invoked before actually running the merge task"; + boolean success = false; try { beforeMerge(onGoingMerge); try { @@ -444,11 +451,13 @@ public void run() { throw new IllegalStateException("The merge task is already started or aborted"); } mergeTracking.mergeStarted(onGoingMerge); + mergeMetrics.moveQueuedMergeBytesToRunning(onGoingMerge, mergeMemoryEstimateBytes); if (verbose()) { message(String.format(Locale.ROOT, "merge task %s start", this)); } try { doMerge(mergeSource, onGoingMerge.getMerge()); + success = onGoingMerge.getMerge().isAborted() == false; if (verbose()) { message( String.format( @@ -468,6 +477,10 @@ public void run() { } } finally { long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get()); + if (success) { + long newSegmentSize = getNewSegmentSize(onGoingMerge.getMerge()); + mergeMetrics.markMergeMetrics(onGoingMerge.getMerge(), newSegmentSize, tookMS); + } mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS); } } finally { @@ -508,6 +521,8 @@ void abort() { // {@code IndexWriter} checks the abort flag internally, while running the merge. // The segments of an aborted merge become available to subsequent merges. onGoingMerge.getMerge().setAborted(); + + mergeMetrics.moveQueuedMergeBytesToRunning(onGoingMerge, mergeMemoryEstimateBytes); try { if (verbose()) { message(String.format(Locale.ROOT, "merge task %s start abort", this)); @@ -554,6 +569,21 @@ public OnGoingMerge getOnGoingMerge() { return onGoingMerge; } + private static long getNewSegmentSize(MergePolicy.OneMerge currentMerge) { + try { + return currentMerge.getMergeInfo() != null ? currentMerge.getMergeInfo().sizeInBytes() : currentMerge.estimatedMergeBytes; + } catch (IOException e) { + // For stateless only: It is (rarely) possible that the merged segment could be merged away by the IndexWriter prior to + // reaching this point. Once the IW creates the new segment, it could be exposed to be included in a new merge. That + // merge can be executed concurrently if more than 1 merge threads are configured. That new merge allows this IW to + // delete segment created by this merge. Although the files may still be available in the object store for executing + // searches, the IndexDirectory will no longer have references to the underlying segment files and will throw file not + // found if we try to read them. In this case, we will ignore that exception (which would otherwise fail the shard) and + // use the originally estimated merge size for metrics. + return currentMerge.estimatedMergeBytes; + } + } + @Override public String toString() { return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : ""); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 01b7e63280d77..1c9fd9b305f37 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -91,6 +91,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; @@ -269,6 +270,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final MeanMetric externalRefreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); private final CounterMetric periodicFlushMetric = new CounterMetric(); + private final MergeMetrics mergeMetrics; private final ShardEventListener shardEventListener = new ShardEventListener(); @@ -343,7 +345,8 @@ public IndexShard( final Engine.IndexCommitListener indexCommitListener, final MapperMetrics mapperMetrics, final IndexingStatsSettings indexingStatsSettings, - final SearchStatsSettings searchStatsSettings + final SearchStatsSettings searchStatsSettings, + final MergeMetrics mergeMetrics ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -432,6 +435,7 @@ public IndexShard( this.refreshFieldHasValueListener = new RefreshFieldHasValueListener(); this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier; this.indexCommitListener = indexCommitListener; + this.mergeMetrics = mergeMetrics; } public ThreadPool getThreadPool() { @@ -3755,7 +3759,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { indexCommitListener, routingEntry().isPromotableToPrimary(), mapperService(), - engineResetLock + engineResetLock, + mergeMetrics ); } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index ed94017a600ed..8fdc53e6b795f 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -98,6 +98,7 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; @@ -283,6 +284,7 @@ public class IndicesService extends AbstractLifecycleComponent final SlowLogFieldProvider slowLogFieldProvider; // pkg-private for testingå private final IndexingStatsSettings indexStatsSettings; private final SearchStatsSettings searchStatsSettings; + private final MergeMetrics mergeMetrics; @Override protected void doStart() { @@ -358,6 +360,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.requestCacheKeyDifferentiator = builder.requestCacheKeyDifferentiator; this.queryRewriteInterceptor = builder.queryRewriteInterceptor; this.mapperMetrics = builder.mapperMetrics; + this.mergeMetrics = builder.mergeMetrics; // doClose() is called when shutting down a node, yet there might still be ongoing requests // that we need to wait for before closing some resources such as the caches. In order to // avoid closing these resources while ongoing requests are still being processed, we use a @@ -801,7 +804,8 @@ private synchronized IndexService createIndexService( mapperMetrics, searchOperationListeners, indexStatsSettings, - searchStatsSettings + searchStatsSettings, + mergeMetrics ); for (IndexingOperationListener operationListener : indexingOperationListeners) { indexModule.addIndexOperationListener(operationListener); @@ -900,7 +904,8 @@ public synchronized MapperService createIndexMapperServiceForValidation(IndexMet mapperMetrics, searchOperationListeners, indexStatsSettings, - searchStatsSettings + searchStatsSettings, + mergeMetrics ); pluginsService.forEach(p -> p.onIndexModule(indexModule)); return indexModule.newIndexMapperService(clusterService, parserConfig, mapperRegistry, scriptService); diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java index df3ed42db37a0..3b7f4d24869f2 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.SlowLogFields; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; import org.elasticsearch.index.shard.SearchOperationListener; @@ -79,6 +80,7 @@ public class IndicesServiceBuilder { @Nullable CheckedBiConsumer requestCacheKeyDifferentiator; MapperMetrics mapperMetrics; + MergeMetrics mergeMetrics; List searchOperationListener = List.of(); QueryRewriteInterceptor queryRewriteInterceptor = null; SlowLogFieldProvider slowLogFieldProvider = new SlowLogFieldProvider() { @@ -206,6 +208,11 @@ public IndicesServiceBuilder mapperMetrics(MapperMetrics mapperMetrics) { return this; } + public IndicesServiceBuilder mergeMetrics(MergeMetrics mergeMetrics) { + this.mergeMetrics = mergeMetrics; + return this; + } + public List searchOperationListeners() { return searchOperationListener; } @@ -244,6 +251,7 @@ public IndicesService build() { Objects.requireNonNull(indexFoldersDeletionListeners); Objects.requireNonNull(snapshotCommitSuppliers); Objects.requireNonNull(mapperMetrics); + Objects.requireNonNull(mergeMetrics); Objects.requireNonNull(searchOperationListener); Objects.requireNonNull(slowLogFieldProvider); diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index f4ec7f245e78b..73a2efabb9e81 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -116,6 +116,7 @@ import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.index.SlowLogFields; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.SourceFieldMetrics; import org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics; @@ -806,6 +807,9 @@ private void construct( threadPool::relativeTimeInMillis ); MapperMetrics mapperMetrics = new MapperMetrics(sourceFieldMetrics); + + MergeMetrics mergeMetrics = new MergeMetrics(telemetryProvider.getMeterRegistry()); + final List searchOperationListeners = List.of( new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry()) ); @@ -894,6 +898,7 @@ public Map queryFields() { .valuesSourceRegistry(searchModule.getValuesSourceRegistry()) .requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator()) .mapperMetrics(mapperMetrics) + .mergeMetrics(mergeMetrics) .searchOperationListeners(searchOperationListeners) .slowLogFieldProvider(slowLogFieldProvider) .build(); @@ -1290,6 +1295,7 @@ public Map queryFields() { b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics); b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService); b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService); + b.bind(MergeMetrics.class).toInstance(mergeMetrics); }); if (ReadinessService.enabled(environment)) { diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index fcd20894b5a3c..db29c9a61e007 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -60,6 +60,7 @@ import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.fielddata.IndexFieldDataCache; @@ -256,7 +257,8 @@ public void testWrapperIsBound() throws IOException { MapperMetrics.NOOP, emptyList(), new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); module.setReaderWrapper(s -> new Wrapper()); @@ -286,7 +288,8 @@ public void testRegisterIndexStore() throws IOException { MapperMetrics.NOOP, emptyList(), new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); final IndexService indexService = newIndexService(module); @@ -314,7 +317,8 @@ public void testDirectoryWrapper() throws IOException { MapperMetrics.NOOP, emptyList(), new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); module.setDirectoryWrapper(new TestDirectoryWrapper()); @@ -670,7 +674,8 @@ public void testRegisterCustomRecoveryStateFactory() throws IOException { MapperMetrics.NOOP, emptyList(), new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); final IndexService indexService = newIndexService(module); @@ -695,7 +700,8 @@ public void testIndexCommitListenerIsBound() throws IOException, ExecutionExcept MapperMetrics.NOOP, emptyList(), new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); final AtomicLong lastAcquiredPrimaryTerm = new AtomicLong(); @@ -800,7 +806,8 @@ private static IndexModule createIndexModule( MapperMetrics.NOOP, emptyList(), new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 3974bf9126597..680f6fca9652e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3635,7 +3635,8 @@ public void testRecoverFromForeignTranslog() throws IOException { null, true, config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -7243,7 +7244,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 92148a661ceab..d88f7c67b0bbd 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -46,6 +46,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -74,12 +75,14 @@ public void testMergesExecuteInSizeOrder() throws IOException { nodeEnvironment = newNodeEnvironment(settings); ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests .getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), settings, nodeEnvironment); + var mergeMetrics = mock(MergeMetrics.class); try ( ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", Settings.EMPTY), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mergeMetrics ) ) { List executedMergesList = new ArrayList<>(); @@ -97,9 +100,19 @@ public void testMergesExecuteInSizeOrder() throws IOException { return null; }).when(mergeSource).merge(any(OneMerge.class)); threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + + // verify queued byte metric is recorded for each merge + verify(mergeMetrics, times(i + 1)).incrementQueuedMergeBytes(any(), anyLong()); } + threadPoolTaskQueue.runAllTasks(); assertThat(executedMergesList.size(), is(mergeCount)); + + // verify metrics are reported for each merge + verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong()); + verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any()); + verify(mergeMetrics, times(mergeCount)).markMergeMetrics(any(), anyLong(), anyLong()); + // assert merges are executed in ascending size order for (int i = 1; i < mergeCount; i++) { assertThat( @@ -113,6 +126,7 @@ public void testMergesExecuteInSizeOrder() throws IOException { public void testSimpleMergeTaskBacklogging() { int mergeExecutorThreadCount = randomIntBetween(1, 5); + var mergeMetrics = mock(MergeMetrics.class); Settings mergeSchedulerSettings = Settings.builder() .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeExecutorThreadCount) .build(); @@ -122,7 +136,8 @@ public void testSimpleMergeTaskBacklogging() { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mergeMetrics ); // more merge tasks than merge threads int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5); @@ -143,6 +158,9 @@ public void testSimpleMergeTaskBacklogging() { } assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(mergeExecutorThreadCount)); assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(mergeCount - mergeExecutorThreadCount)); + + // verify no metrics are recorded as no merges have been queued or executed through the merge scheduler + verifyNoInteractions(mergeMetrics); } public void testSimpleMergeTaskReEnqueueingBySize() { @@ -156,7 +174,8 @@ public void testSimpleMergeTaskReEnqueueingBySize() { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ); // sort backlogged merges by size PriorityQueue backloggedMergeTasks = new PriorityQueue<>( @@ -388,7 +407,8 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { MergeSource mergeSource = mock(MergeSource.class); @@ -454,6 +474,7 @@ public void testMergesRunConcurrently() throws Exception { // disable fs available disk space feature for this test .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") .build(); + var mergeMetrics = mock(MergeMetrics.class); nodeEnvironment = newNodeEnvironment(settings); try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests @@ -465,7 +486,8 @@ public void testMergesRunConcurrently() throws Exception { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mergeMetrics ) ) { // at least 1 extra merge than there are concurrently allowed @@ -485,7 +507,11 @@ public void testMergesRunConcurrently() throws Exception { return null; }).when(mergeSource).merge(any(OneMerge.class)); threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + + // verify queued byte metric is recorded for each merge + verify(mergeMetrics, times(i + 1)).incrementQueuedMergeBytes(any(), anyLong()); } + for (int completedMergesCount = 0; completedMergesCount < mergeCount - mergeSchedulerMaxThreadCount; completedMergesCount++) { int finalCompletedMergesCount = completedMergesCount; @@ -530,6 +556,11 @@ public void testMergesRunConcurrently() throws Exception { runMergeSemaphore.release(); } assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone())); + + // verify metrics are recorded for each merge + verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong()); + verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any()); + verify(mergeMetrics, times(mergeCount)).markMergeMetrics(any(), anyLong(), anyLong()); } } } @@ -553,7 +584,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { CountDownLatch mergeDoneLatch = new CountDownLatch(1); @@ -626,7 +658,8 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce new ShardId("index", "_na_", 1), indexSettings, threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); @@ -656,7 +689,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { new ShardId("index", "_na_", 1), indexSettings, threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); @@ -673,7 +707,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { new ShardId("index", "_na_", 1), indexSettings, threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { // merge submitted upon closing @@ -690,7 +725,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { new ShardId("index", "_na_", 1), indexSettings, threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { // merge submitted upon closing @@ -705,29 +741,63 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { } } - public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() { - ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); - // build a scheduler that always returns true for shouldSkipMerge - ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( - new ShardId("index", "_na_", 1), - IndexSettingsModule.newIndexSettings("index", Settings.builder().build()), - threadPoolMergeExecutorService, - merge -> 0 + public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() throws IOException { + DeterministicTaskQueue threadPoolTaskQueue = new DeterministicTaskQueue(); + Settings settings = Settings.builder() + // disable fs available disk space feature for this test + .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s") + .build(); + nodeEnvironment = newNodeEnvironment(settings); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests + .getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), settings, nodeEnvironment); + var mergeMetrics = mock(MergeMetrics.class); + try ( + // build a scheduler that always returns true for shouldSkipMerge + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", Settings.EMPTY), + threadPoolMergeExecutorService, + merge -> 0, + mergeMetrics + ) { + @Override + protected boolean shouldSkipMerge() { + return true; + } + } ) { - @Override - protected boolean shouldSkipMerge() { - return true; + int mergeCount = randomIntBetween(2, 10); + for (int i = 0; i < mergeCount; i++) { + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + + // create the merge task + MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values())); + + // verify that calling schedule on the merge task indicates the merge should be aborted + Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask); + assertThat(schedule, is(Schedule.ABORT)); + + // run the merge through the scheduler + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + + // verify queued merge byte metrics are still recorded for each merge + verify(mergeMetrics, times(i + 1)).incrementQueuedMergeBytes(any(), anyLong()); } - }; - MergeSource mergeSource = mock(MergeSource.class); - OneMerge oneMerge = mock(OneMerge.class); - when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); - when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); - when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); - MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values())); - // verify that calling schedule on the merge task indicates the merge should be aborted - Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask); - assertThat(schedule, is(Schedule.ABORT)); + + // run all merges; they should all be aborted + threadPoolTaskQueue.runAllTasks(); + + // verify queued bytes metrics are moved to running and decremented + verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong()); + verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any()); + + // verify we did not mark the merges as merged + verify(mergeMetrics, times(0)).markMergeMetrics(any(), anyLong(), anyLong()); + } } private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) { @@ -746,7 +816,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService ) { - super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0); + super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0, MergeMetrics.NOOP); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 10df837c8d4f2..cc682901876b6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5064,7 +5064,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); return new InternalEngine(configWithWarmer); }); @@ -5346,7 +5347,8 @@ public void afterRefresh(boolean didRefresh) throws IOException {} config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); lazyEngineConfig.set(engineConfigWithBlockingRefreshListener); return new InternalEngine(engineConfigWithBlockingRefreshListener) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index d031ef187b70b..8f0604956a98b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -175,7 +176,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { null, true, EngineTestCase.createMapperService(), - new EngineResetLock() + new EngineResetLock(), + MergeMetrics.NOOP ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 572c7110beaee..7477e648e054b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -145,6 +145,7 @@ import org.elasticsearch.index.IndexSettingProviders; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; @@ -2483,6 +2484,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { .client(client) .metaStateService(new MetaStateService(nodeEnv, namedXContentRegistry)) .mapperMetrics(MapperMetrics.NOOP) + .mergeMetrics(MergeMetrics.NOOP) .build(); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 3a134a3d0d9e5..2327ac06b9e81 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -169,6 +169,7 @@ public abstract class EngineTestCase extends ESTestCase { protected InternalEngine engine; protected InternalEngine replicaEngine; + protected MergeMetrics mergeMetrics; protected IndexSettings defaultSettings; protected String codecName; @@ -263,6 +264,7 @@ public void setUp() throws Exception { primaryTranslogDir = createTempDir("translog-primary"); mapperService = createMapperService(defaultSettings.getSettings(), defaultMapping(), extraMappers()); translogHandler = createTranslogHandler(mapperService); + mergeMetrics = MergeMetrics.NOOP; engine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy()); LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); @@ -313,7 +315,8 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } @@ -347,7 +350,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } @@ -381,7 +385,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } @@ -887,7 +892,8 @@ public EngineConfig config( indexCommitListener, true, mapperService, - new EngineResetLock() + new EngineResetLock(), + mergeMetrics ); } @@ -929,7 +935,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 6ea9f29a5b6b2..aed1d10342768 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.mapper.MapperMetrics; @@ -562,7 +563,8 @@ protected IndexShard newShard( null, MapperMetrics.NOOP, new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 5e7481602a77a..54bdbd0c0e91c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.engine.TranslogHandler; @@ -284,7 +285,8 @@ public void onFailedEngine(String reason, Exception e) { null, true, mapperService, - new EngineResetLock() + new EngineResetLock(), + MergeMetrics.NOOP ); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index 0faac6422cffb..e480fb69b61e2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.search.stats.SearchStatsSettings; import org.elasticsearch.index.shard.IndexingStatsSettings; @@ -464,7 +465,8 @@ public void testOnIndexModuleIsNoOpWithSecurityDisabled() throws Exception { MapperMetrics.NOOP, List.of(), new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); security.onIndexModule(indexModule); // indexReaderWrapper is a SetOnce so if Security#onIndexModule had already set an ReaderWrapper we would get an exception here diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java index 1cfa5669363c4..4ef1fdc276f9d 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.search.stats.SearchStatsSettings; import org.elasticsearch.index.shard.IndexingStatsSettings; @@ -76,7 +77,8 @@ public void testWatcherDisabledTests() throws Exception { MapperMetrics.NOOP, List.of(), new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), - new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); // this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it watcher.onIndexModule(indexModule);