Skip to content

Bring over merge metrics from stateless #128617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ class TestInternalEngine extends org.elasticsearch.index.engine.InternalEngine {
protected ElasticsearchMergeScheduler createMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMetrics mergeMetrics
) {
ElasticsearchMergeScheduler mergeScheduler = super.createMergeScheduler(
shardId,
indexSettings,
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
mergeMetrics
);
assertThat(mergeScheduler, instanceOf(ThreadPoolMergeScheduler.class));
// assert there is a single merge executor service for all shards
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.MapperMetrics;
Expand Down Expand Up @@ -680,7 +681,8 @@ public static final IndexShard newIndexShard(
null,
MapperMetrics.NOOP,
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService(),
config.getEngineResetLock()
config.getEngineResetLock(),
config.getMergeMetrics()
);
}

Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperMetrics;
Expand Down Expand Up @@ -181,6 +182,7 @@ public interface DirectoryWrapper {
private final MapperMetrics mapperMetrics;
private final IndexingStatsSettings indexingStatsSettings;
private final SearchStatsSettings searchStatsSettings;
private final MergeMetrics mergeMetrics;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -190,6 +192,7 @@ public interface DirectoryWrapper {
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param directoryFactories the available store types
* @param mergeMetrics
*/
public IndexModule(
final IndexSettings indexSettings,
Expand All @@ -203,7 +206,8 @@ public IndexModule(
final MapperMetrics mapperMetrics,
final List<SearchOperationListener> searchOperationListeners,
final IndexingStatsSettings indexingStatsSettings,
final SearchStatsSettings searchStatsSettings
final SearchStatsSettings searchStatsSettings,
final MergeMetrics mergeMetrics
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -220,6 +224,7 @@ public IndexModule(
this.mapperMetrics = mapperMetrics;
this.indexingStatsSettings = indexingStatsSettings;
this.searchStatsSettings = searchStatsSettings;
this.mergeMetrics = mergeMetrics;
}

/**
Expand Down Expand Up @@ -557,7 +562,8 @@ public IndexService newIndexService(
mapperMetrics,
queryRewriteInterceptor,
indexingStatsSettings,
searchStatsSettings
searchStatsSettings,
mergeMetrics
);
success = true;
return indexService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.IndexFieldData;
Expand Down Expand Up @@ -172,6 +173,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final QueryRewriteInterceptor queryRewriteInterceptor;
private final IndexingStatsSettings indexingStatsSettings;
private final SearchStatsSettings searchStatsSettings;
private final MergeMetrics mergeMetrics;

@SuppressWarnings("this-escape")
public IndexService(
Expand Down Expand Up @@ -210,7 +212,8 @@ public IndexService(
MapperMetrics mapperMetrics,
QueryRewriteInterceptor queryRewriteInterceptor,
IndexingStatsSettings indexingStatsSettings,
SearchStatsSettings searchStatsSettings
SearchStatsSettings searchStatsSettings,
MergeMetrics mergeMetrics
) {
super(indexSettings);
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
Expand Down Expand Up @@ -297,6 +300,7 @@ public IndexService(
}
this.indexingStatsSettings = indexingStatsSettings;
this.searchStatsSettings = searchStatsSettings;
this.mergeMetrics = mergeMetrics;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -588,7 +592,8 @@ public synchronized IndexShard createShard(
indexCommitListener,
mapperMetrics,
indexingStatsSettings,
searchStatsSettings
searchStatsSettings,
mergeMetrics
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final EngineResetLock engineResetLock;

private final MergeMetrics mergeMetrics;

/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
Expand Down Expand Up @@ -181,7 +183,8 @@ public EngineConfig(
Engine.IndexCommitListener indexCommitListener,
boolean promotableToPrimary,
MapperService mapperService,
EngineResetLock engineResetLock
EngineResetLock engineResetLock,
MergeMetrics mergeMetrics
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -229,6 +232,7 @@ public EngineConfig(
// always use compound on flush - reduces # of file-handles on refresh
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
this.engineResetLock = engineResetLock;
this.mergeMetrics = mergeMetrics;
}

/**
Expand Down Expand Up @@ -477,4 +481,8 @@ public MapperService getMapperService() {
public EngineResetLock getEngineResetLock() {
return engineResetLock;
}

public MergeMetrics getMergeMetrics() {
return mergeMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = createMergeScheduler(
engineConfig.getShardId(),
engineConfig.getIndexSettings(),
engineConfig.getThreadPoolMergeExecutorService()
engineConfig.getThreadPoolMergeExecutorService(),
engineConfig.getMergeMetrics()
);
scheduler = mergeScheduler.getMergeScheduler();
throttle = new IndexThrottle(pauseIndexingOnThrottle);
Expand Down Expand Up @@ -2908,10 +2909,11 @@ protected void doRun() {
protected ElasticsearchMergeScheduler createMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMetrics mergeMetrics
) {
if (threadPoolMergeExecutorService != null) {
return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService);
return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService, mergeMetrics);
} else {
return new EngineConcurrentMergeScheduler(shardId, indexSettings);
}
Expand All @@ -2921,9 +2923,10 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
EngineThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMetrics mergeMetrics
) {
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, mergeMetrics);
}

@Override
Expand Down
101 changes: 101 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/MergeMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.concurrent.atomic.AtomicLong;

public class MergeMetrics {

public static final String MERGE_SEGMENTS_SIZE = "es.merge.segments.size";
public static final String MERGE_DOCS_TOTAL = "es.merge.docs.total";
public static final String MERGE_SEGMENTS_QUEUED_USAGE = "es.merge.segments.queued.usage";
public static final String MERGE_SEGMENTS_RUNNING_USAGE = "es.merge.segments.running.usage";
public static final String MERGE_SEGMENTS_MERGED_SIZE = "es.merge.segments.merged.size";
public static final String MERGE_QUEUED_ESTIMATED_MEMORY_SIZE = "es.merge.segments.memory.size";
public static final String MERGE_TIME_IN_SECONDS = "es.merge.time";
public static MergeMetrics NOOP = new MergeMetrics(TelemetryProvider.NOOP.getMeterRegistry());

private final LongCounter mergeSizeInBytes;
private final LongCounter mergeMergedSegmentSizeInBytes;
private final LongCounter mergeNumDocs;
private final LongHistogram mergeTimeInSeconds;

private final AtomicLong runningMergeSizeInBytes = new AtomicLong();
private final AtomicLong queuedMergeSizeInBytes = new AtomicLong();
private final AtomicLong queuedEstimatedMergeMemoryInBytes = new AtomicLong();

public MergeMetrics(MeterRegistry meterRegistry) {
mergeSizeInBytes = meterRegistry.registerLongCounter(MERGE_SEGMENTS_SIZE, "Total size of segments merged", "bytes");
meterRegistry.registerLongGauge(
MERGE_SEGMENTS_QUEUED_USAGE,
"Total usage of segments queued to be merged",
"bytes",
() -> new LongWithAttributes(queuedMergeSizeInBytes.get())
);
meterRegistry.registerLongGauge(
MERGE_SEGMENTS_RUNNING_USAGE,
"Total usage of segments currently being merged",
"bytes",
() -> new LongWithAttributes(runningMergeSizeInBytes.get())
);
mergeMergedSegmentSizeInBytes = meterRegistry.registerLongCounter(
MERGE_SEGMENTS_MERGED_SIZE,
"Total size of the new merged segments",
"bytes"
);
mergeNumDocs = meterRegistry.registerLongCounter(MERGE_DOCS_TOTAL, "Total number of documents merged", "documents");
mergeTimeInSeconds = meterRegistry.registerLongHistogram(MERGE_TIME_IN_SECONDS, "Merge time in seconds", "seconds");
meterRegistry.registerLongGauge(
MERGE_QUEUED_ESTIMATED_MEMORY_SIZE,
"Estimated memory usage for queued merges",
"bytes",
() -> new LongWithAttributes(queuedEstimatedMergeMemoryInBytes.get())
);
}

public void incrementQueuedMergeBytes(OnGoingMerge currentMerge, long estimatedMemorySize) {
queuedMergeSizeInBytes.getAndAdd(currentMerge.getTotalBytesSize());
queuedEstimatedMergeMemoryInBytes.getAndAdd(estimatedMemorySize);
}

public void moveQueuedMergeBytesToRunning(OnGoingMerge currentMerge, long estimatedMemorySize) {
long totalSize = currentMerge.getTotalBytesSize();
queuedMergeSizeInBytes.getAndAdd(-totalSize);
runningMergeSizeInBytes.getAndAdd(totalSize);
queuedEstimatedMergeMemoryInBytes.getAndAdd(-estimatedMemorySize);
}

public void decrementRunningMergeBytes(OnGoingMerge currentMerge) {
runningMergeSizeInBytes.getAndAdd(-currentMerge.getTotalBytesSize());
}

public void markMergeMetrics(MergePolicy.OneMerge currentMerge, long mergedSegmentSize, long tookMillis) {
mergeSizeInBytes.incrementBy(currentMerge.totalBytesSize());
mergeMergedSegmentSizeInBytes.incrementBy(mergedSegmentSize);
mergeNumDocs.incrementBy(currentMerge.totalNumDocs());
mergeTimeInSeconds.record(tookMillis / 1000);
}

public long getQueuedMergeSizeInBytes() {
return queuedMergeSizeInBytes.get();
}

public long getRunningMergeSizeInBytes() {
return runningMergeSizeInBytes.get();
}
}
Loading