From 657178b6da4335ed8d88fd8c825e10dc1eec31c9 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 5 May 2023 22:02:55 +0200 Subject: [PATCH 1/3] fix: enforce valid values when creating ExecutorServices Fixes #1889 --- .../config/AbstractConfigurationService.java | 32 ++++++++++---- .../api/config/BaseConfigurationService.java | 3 +- .../api/config/ConfigurationService.java | 15 +++---- .../config/ConfigurationServiceOverrider.java | 28 ++++++++++--- .../api/config/ExecutorServiceManager.java | 12 ++++++ .../operator/api/config/Utils.java | 18 ++++++++ .../ConfigurationServiceOverriderTest.java | 42 +++++++++---------- 7 files changed, 104 insertions(+), 46 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java index 3921f9f84e..bf90a230a6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java @@ -17,32 +17,37 @@ public class AbstractConfigurationService implements ConfigurationService { private final Version version; private Cloner cloner; private ObjectMapper mapper; + private ExecutorServiceManager executorServiceManager; public AbstractConfigurationService(Version version) { - this(version, null, null); + this(version, null, null, null); } public AbstractConfigurationService(Version version, Cloner cloner) { - this(version, cloner, null); + this(version, cloner, null, null); } - public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) { + public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper, + ExecutorServiceManager executorServiceManager) { this.version = version; - init(cloner, mapper); + init(cloner, mapper, executorServiceManager); } /** - * Subclasses can call this method to more easily initialize the {@link Cloner} and - * {@link ObjectMapper} associated with this ConfigurationService implementation. This is useful - * in situations where the cloner depends on a mapper that might require additional configuration - * steps before it's ready to be used. + * Subclasses can call this method to more easily initialize the {@link Cloner} + * {@link ObjectMapper} and {@link ExecutorServiceManager} associated with this + * ConfigurationService implementation. This is useful in situations where the cloner depends on a + * mapper that might require additional configuration steps before it's ready to be used. * * @param cloner the {@link Cloner} instance to be used * @param mapper the {@link ObjectMapper} instance to be used + * @param executorServiceManager the {@link ExecutorServiceManager} instance to be used */ - protected void init(Cloner cloner, ObjectMapper mapper) { + protected void init(Cloner cloner, ObjectMapper mapper, + ExecutorServiceManager executorServiceManager) { this.cloner = cloner != null ? cloner : ConfigurationService.super.getResourceCloner(); this.mapper = mapper != null ? mapper : ConfigurationService.super.getObjectMapper(); + this.executorServiceManager = executorServiceManager; } protected void register(ControllerConfiguration config) { @@ -132,4 +137,13 @@ public Cloner getResourceCloner() { public ObjectMapper getObjectMapper() { return mapper; } + + @Override + public ExecutorServiceManager getExecutorServiceManager() { + // lazy init to avoid initializing thread pools for nothing in an overriding scenario + if (executorServiceManager == null) { + executorServiceManager = ConfigurationService.super.getExecutorServiceManager(); + } + return executorServiceManager; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java index 46343a13d0..bd939d4b32 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java @@ -45,7 +45,7 @@ public BaseConfigurationService(Version version) { } public BaseConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) { - super(version, cloner, mapper); + super(version, cloner, mapper, null); } public BaseConfigurationService(Version version, Cloner cloner) { @@ -62,6 +62,7 @@ protected void logMissingReconcilerWarning(String reconcilerKey, String reconcil reconcilersNameMessage); } + @SuppressWarnings("unused") public String getLoggerName() { return LOGGER_NAME; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index dce6033fd0..52d78fb1e9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -4,9 +4,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.slf4j.Logger; @@ -24,6 +21,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor; + /** An interface from which to retrieve configuration information. */ public interface ConfigurationService { @@ -164,15 +163,13 @@ default Metrics getMetrics() { } default ExecutorService getExecutorService() { - return new ThreadPoolExecutor(minConcurrentReconciliationThreads(), - concurrentReconciliationThreads(), - 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); + return newThreadPoolExecutor(minConcurrentReconciliationThreads(), + concurrentReconciliationThreads()); } default ExecutorService getWorkflowExecutorService() { - return new ThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(), - concurrentWorkflowExecutorThreads(), - 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); + return newThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(), + concurrentWorkflowExecutorThreads()); } default boolean closeClientOnStop() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index e8fa2a1577..8d9f4200bc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -57,13 +57,21 @@ public ConfigurationServiceOverrider withConcurrentWorkflowExecutorThreads(int t return this; } + private int minimumMaxValueFor(Integer minValue) { + return minValue != null ? (minValue < 0 ? 0 : minValue) + 1 : 1; + } + public ConfigurationServiceOverrider withMinConcurrentReconciliationThreads(int threadNumber) { - this.minConcurrentReconciliationThreads = threadNumber; + this.minConcurrentReconciliationThreads = Utils.ensureValid(threadNumber, + "minimum reconciliation threads", ExecutorServiceManager.MIN_THREAD_NUMBER, + original.minConcurrentReconciliationThreads()); return this; } public ConfigurationServiceOverrider withMinConcurrentWorkflowExecutorThreads(int threadNumber) { - this.minConcurrentWorkflowExecutorThreads = threadNumber; + this.minConcurrentWorkflowExecutorThreads = Utils.ensureValid(threadNumber, + "minimum workflow execution threads", ExecutorServiceManager.MIN_THREAD_NUMBER, + original.minConcurrentWorkflowExecutorThreads()); return this; } @@ -150,14 +158,22 @@ public boolean checkCRDAndValidateLocalModel() { @Override public int concurrentReconciliationThreads() { - return concurrentReconciliationThreads != null ? concurrentReconciliationThreads - : original.concurrentReconciliationThreads(); + return Utils.ensureValid( + concurrentReconciliationThreads != null ? concurrentReconciliationThreads + : original.concurrentReconciliationThreads(), + "maximum reconciliation threads", + minimumMaxValueFor(minConcurrentReconciliationThreads), + original.concurrentReconciliationThreads()); } @Override public int concurrentWorkflowExecutorThreads() { - return concurrentWorkflowExecutorThreads != null ? concurrentWorkflowExecutorThreads - : original.concurrentWorkflowExecutorThreads(); + return Utils.ensureValid( + concurrentWorkflowExecutorThreads != null ? concurrentWorkflowExecutorThreads + : original.concurrentWorkflowExecutorThreads(), + "maximum workflow execution threads", + minimumMaxValueFor(minConcurrentWorkflowExecutorThreads), + original.concurrentWorkflowExecutorThreads()); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index f57af9fc30..3ec6c82c7d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -8,6 +8,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -20,7 +22,9 @@ import io.javaoperatorsdk.operator.OperatorException; public class ExecutorServiceManager { + private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); + public static final int MIN_THREAD_NUMBER = 0; private final ExecutorService executor; private final ExecutorService workflowExecutor; private final ExecutorService cachingExecutorService; @@ -32,6 +36,14 @@ public class ExecutorServiceManager { new InstrumentedExecutorService(configurationService.getWorkflowExecutorService()); } + public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) { + minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER); + maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1); + + return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES, + new LinkedBlockingDeque<>()); + } + /** * Uses cachingExecutorService from this manager. Use this only for tasks, that don't have dynamic * nature, in sense that won't grow with the number of inputs (thus kubernetes resources) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java index 2028d3d5bc..19d8a73689 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java @@ -72,6 +72,24 @@ public static Version loadFromProperties() { builtTime); } + public static int ensureValid(int value, String description, int minValue) { + return ensureValid(value, description, minValue, minValue); + } + + public static int ensureValid(int value, String description, int minValue, int defaultValue) { + if (value < minValue) { + if (defaultValue < minValue) { + throw new IllegalArgumentException( + "Default value for " + description + " must be greater than " + minValue); + } + log.warn("Requested " + description + " should be greater than " + minValue + ". Requested: " + + value + ", using " + defaultValue + (defaultValue == minValue ? "" : " (default)") + + " instead"); + value = defaultValue; + } + return value; + } + @SuppressWarnings("unused") // this is used in the Quarkus extension public static boolean isValidateCustomResourcesEnvVarSet() { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java index 60ca5fcee3..e9d7eda37f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java @@ -1,7 +1,6 @@ package io.javaoperatorsdk.operator.api.config; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.junit.jupiter.api.Test; @@ -13,6 +12,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; class ConfigurationServiceOverriderTest { @@ -42,31 +42,11 @@ public Config getClientConfiguration() { return new ConfigBuilder().withNamespace("namespace").build(); } - @Override - public int concurrentReconciliationThreads() { - return -1; - } - - @Override - public int getTerminationTimeoutSeconds() { - return -1; - } - @Override public Metrics getMetrics() { return METRICS; } - @Override - public ExecutorService getExecutorService() { - return null; - } - - @Override - public boolean closeClientOnStop() { - return true; - } - @Override public ObjectMapper getObjectMapper() { return OBJECT_MAPPER; @@ -121,4 +101,24 @@ public R clone(R object) { overridden.getLeaderElectionConfiguration()); } + @Test + void shouldReplaceInvalidValues() { + final var original = new BaseConfigurationService(); + + final var service = ConfigurationService.newOverriddenConfigurationService(original, + o -> o + .withConcurrentReconciliationThreads(0) + .withMinConcurrentReconciliationThreads(-1) + .withConcurrentWorkflowExecutorThreads(2) + .withMinConcurrentWorkflowExecutorThreads(3)); + + assertEquals(original.minConcurrentReconciliationThreads(), + service.minConcurrentReconciliationThreads()); + assertEquals(original.concurrentReconciliationThreads(), + service.concurrentReconciliationThreads()); + assertEquals(3, service.minConcurrentWorkflowExecutorThreads()); + assertEquals(original.concurrentWorkflowExecutorThreads(), + service.concurrentWorkflowExecutorThreads()); + } + } From cdec3cf98e1e9684a75c01e6d7984760f97f24a0 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 5 May 2023 23:37:27 +0200 Subject: [PATCH 2/3] fix: need to restart thread pools if the Operator is restarting --- .../io/javaoperatorsdk/operator/Operator.java | 6 ++++- .../api/config/ExecutorServiceManager.java | 22 +++++++++++++------ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 295b1c8ffb..c3d031b130 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -134,9 +134,13 @@ public synchronized void start() { version.getSdkVersion(), version.getCommit(), version.getBuiltTime()); - final var clientVersion = Version.clientVersion(); log.info("Client version: {}", clientVersion); + + // need to create new thread pools if we're restarting because they've been shut down when we + // previously stopped + configurationService.getExecutorServiceManager().start(configurationService); + // first start the controller manager before leader election, // the leader election would start subsequently the processor if on controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 3ec6c82c7d..4cab416450 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -25,15 +25,13 @@ public class ExecutorServiceManager { private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); public static final int MIN_THREAD_NUMBER = 0; - private final ExecutorService executor; - private final ExecutorService workflowExecutor; - private final ExecutorService cachingExecutorService; + private ExecutorService executor; + private ExecutorService workflowExecutor; + private ExecutorService cachingExecutorService; + private boolean started; ExecutorServiceManager(ConfigurationService configurationService) { - this.cachingExecutorService = Executors.newCachedThreadPool(); - this.executor = new InstrumentedExecutorService(configurationService.getExecutorService()); - this.workflowExecutor = - new InstrumentedExecutorService(configurationService.getWorkflowExecutorService()); + start(configurationService); } public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) { @@ -104,6 +102,16 @@ public ExecutorService cachingExecutorService() { return cachingExecutorService; } + public void start(ConfigurationService configurationService) { + if (!started) { + this.cachingExecutorService = Executors.newCachedThreadPool(); + this.executor = new InstrumentedExecutorService(configurationService.getExecutorService()); + this.workflowExecutor = + new InstrumentedExecutorService(configurationService.getWorkflowExecutorService()); + started = true; + } + } + public void stop(Duration gracefulShutdownTimeout) { try { var parallelExec = Executors.newFixedThreadPool(3); From 523532886cc764ab5de2f7bbeb289e36ec51fc83 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Sat, 6 May 2023 09:49:24 +0200 Subject: [PATCH 3/3] fix: started should become false after stopping --- .../operator/api/config/ExecutorServiceManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 4cab416450..4685fb1d57 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -120,6 +120,7 @@ public void stop(Duration gracefulShutdownTimeout) { shutdown(workflowExecutor, gracefulShutdownTimeout), shutdown(cachingExecutorService, gracefulShutdownTimeout))); parallelExec.shutdownNow(); + started = false; } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); Thread.currentThread().interrupt();