diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 295b1c8ffb..c3d031b130 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -134,9 +134,13 @@ public synchronized void start() { version.getSdkVersion(), version.getCommit(), version.getBuiltTime()); - final var clientVersion = Version.clientVersion(); log.info("Client version: {}", clientVersion); + + // need to create new thread pools if we're restarting because they've been shut down when we + // previously stopped + configurationService.getExecutorServiceManager().start(configurationService); + // first start the controller manager before leader election, // the leader election would start subsequently the processor if on controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java index 3921f9f84e..bf90a230a6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java @@ -17,32 +17,37 @@ public class AbstractConfigurationService implements ConfigurationService { private final Version version; private Cloner cloner; private ObjectMapper mapper; + private ExecutorServiceManager executorServiceManager; public AbstractConfigurationService(Version version) { - this(version, null, null); + this(version, null, null, null); } public AbstractConfigurationService(Version version, Cloner cloner) { - this(version, cloner, null); + this(version, cloner, null, null); } - public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) { + public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper, + ExecutorServiceManager executorServiceManager) { this.version = version; - init(cloner, mapper); + init(cloner, mapper, executorServiceManager); } /** - * Subclasses can call this method to more easily initialize the {@link Cloner} and - * {@link ObjectMapper} associated with this ConfigurationService implementation. This is useful - * in situations where the cloner depends on a mapper that might require additional configuration - * steps before it's ready to be used. + * Subclasses can call this method to more easily initialize the {@link Cloner} + * {@link ObjectMapper} and {@link ExecutorServiceManager} associated with this + * ConfigurationService implementation. This is useful in situations where the cloner depends on a + * mapper that might require additional configuration steps before it's ready to be used. * * @param cloner the {@link Cloner} instance to be used * @param mapper the {@link ObjectMapper} instance to be used + * @param executorServiceManager the {@link ExecutorServiceManager} instance to be used */ - protected void init(Cloner cloner, ObjectMapper mapper) { + protected void init(Cloner cloner, ObjectMapper mapper, + ExecutorServiceManager executorServiceManager) { this.cloner = cloner != null ? cloner : ConfigurationService.super.getResourceCloner(); this.mapper = mapper != null ? mapper : ConfigurationService.super.getObjectMapper(); + this.executorServiceManager = executorServiceManager; } protected void register(ControllerConfiguration config) { @@ -132,4 +137,13 @@ public Cloner getResourceCloner() { public ObjectMapper getObjectMapper() { return mapper; } + + @Override + public ExecutorServiceManager getExecutorServiceManager() { + // lazy init to avoid initializing thread pools for nothing in an overriding scenario + if (executorServiceManager == null) { + executorServiceManager = ConfigurationService.super.getExecutorServiceManager(); + } + return executorServiceManager; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java index 46343a13d0..bd939d4b32 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java @@ -45,7 +45,7 @@ public BaseConfigurationService(Version version) { } public BaseConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) { - super(version, cloner, mapper); + super(version, cloner, mapper, null); } public BaseConfigurationService(Version version, Cloner cloner) { @@ -62,6 +62,7 @@ protected void logMissingReconcilerWarning(String reconcilerKey, String reconcil reconcilersNameMessage); } + @SuppressWarnings("unused") public String getLoggerName() { return LOGGER_NAME; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index dce6033fd0..52d78fb1e9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -4,9 +4,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.slf4j.Logger; @@ -24,6 +21,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor; + /** An interface from which to retrieve configuration information. */ public interface ConfigurationService { @@ -164,15 +163,13 @@ default Metrics getMetrics() { } default ExecutorService getExecutorService() { - return new ThreadPoolExecutor(minConcurrentReconciliationThreads(), - concurrentReconciliationThreads(), - 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); + return newThreadPoolExecutor(minConcurrentReconciliationThreads(), + concurrentReconciliationThreads()); } default ExecutorService getWorkflowExecutorService() { - return new ThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(), - concurrentWorkflowExecutorThreads(), - 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); + return newThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(), + concurrentWorkflowExecutorThreads()); } default boolean closeClientOnStop() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index e8fa2a1577..8d9f4200bc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -57,13 +57,21 @@ public ConfigurationServiceOverrider withConcurrentWorkflowExecutorThreads(int t return this; } + private int minimumMaxValueFor(Integer minValue) { + return minValue != null ? (minValue < 0 ? 0 : minValue) + 1 : 1; + } + public ConfigurationServiceOverrider withMinConcurrentReconciliationThreads(int threadNumber) { - this.minConcurrentReconciliationThreads = threadNumber; + this.minConcurrentReconciliationThreads = Utils.ensureValid(threadNumber, + "minimum reconciliation threads", ExecutorServiceManager.MIN_THREAD_NUMBER, + original.minConcurrentReconciliationThreads()); return this; } public ConfigurationServiceOverrider withMinConcurrentWorkflowExecutorThreads(int threadNumber) { - this.minConcurrentWorkflowExecutorThreads = threadNumber; + this.minConcurrentWorkflowExecutorThreads = Utils.ensureValid(threadNumber, + "minimum workflow execution threads", ExecutorServiceManager.MIN_THREAD_NUMBER, + original.minConcurrentWorkflowExecutorThreads()); return this; } @@ -150,14 +158,22 @@ public boolean checkCRDAndValidateLocalModel() { @Override public int concurrentReconciliationThreads() { - return concurrentReconciliationThreads != null ? concurrentReconciliationThreads - : original.concurrentReconciliationThreads(); + return Utils.ensureValid( + concurrentReconciliationThreads != null ? concurrentReconciliationThreads + : original.concurrentReconciliationThreads(), + "maximum reconciliation threads", + minimumMaxValueFor(minConcurrentReconciliationThreads), + original.concurrentReconciliationThreads()); } @Override public int concurrentWorkflowExecutorThreads() { - return concurrentWorkflowExecutorThreads != null ? concurrentWorkflowExecutorThreads - : original.concurrentWorkflowExecutorThreads(); + return Utils.ensureValid( + concurrentWorkflowExecutorThreads != null ? concurrentWorkflowExecutorThreads + : original.concurrentWorkflowExecutorThreads(), + "maximum workflow execution threads", + minimumMaxValueFor(minConcurrentWorkflowExecutorThreads), + original.concurrentWorkflowExecutorThreads()); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index f57af9fc30..4685fb1d57 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -8,6 +8,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -20,16 +22,24 @@ import io.javaoperatorsdk.operator.OperatorException; public class ExecutorServiceManager { + private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); - private final ExecutorService executor; - private final ExecutorService workflowExecutor; - private final ExecutorService cachingExecutorService; + public static final int MIN_THREAD_NUMBER = 0; + private ExecutorService executor; + private ExecutorService workflowExecutor; + private ExecutorService cachingExecutorService; + private boolean started; ExecutorServiceManager(ConfigurationService configurationService) { - this.cachingExecutorService = Executors.newCachedThreadPool(); - this.executor = new InstrumentedExecutorService(configurationService.getExecutorService()); - this.workflowExecutor = - new InstrumentedExecutorService(configurationService.getWorkflowExecutorService()); + start(configurationService); + } + + public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) { + minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER); + maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1); + + return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES, + new LinkedBlockingDeque<>()); } /** @@ -92,6 +102,16 @@ public ExecutorService cachingExecutorService() { return cachingExecutorService; } + public void start(ConfigurationService configurationService) { + if (!started) { + this.cachingExecutorService = Executors.newCachedThreadPool(); + this.executor = new InstrumentedExecutorService(configurationService.getExecutorService()); + this.workflowExecutor = + new InstrumentedExecutorService(configurationService.getWorkflowExecutorService()); + started = true; + } + } + public void stop(Duration gracefulShutdownTimeout) { try { var parallelExec = Executors.newFixedThreadPool(3); @@ -100,6 +120,7 @@ public void stop(Duration gracefulShutdownTimeout) { shutdown(workflowExecutor, gracefulShutdownTimeout), shutdown(cachingExecutorService, gracefulShutdownTimeout))); parallelExec.shutdownNow(); + started = false; } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); Thread.currentThread().interrupt(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java index 2028d3d5bc..19d8a73689 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java @@ -72,6 +72,24 @@ public static Version loadFromProperties() { builtTime); } + public static int ensureValid(int value, String description, int minValue) { + return ensureValid(value, description, minValue, minValue); + } + + public static int ensureValid(int value, String description, int minValue, int defaultValue) { + if (value < minValue) { + if (defaultValue < minValue) { + throw new IllegalArgumentException( + "Default value for " + description + " must be greater than " + minValue); + } + log.warn("Requested " + description + " should be greater than " + minValue + ". Requested: " + + value + ", using " + defaultValue + (defaultValue == minValue ? "" : " (default)") + + " instead"); + value = defaultValue; + } + return value; + } + @SuppressWarnings("unused") // this is used in the Quarkus extension public static boolean isValidateCustomResourcesEnvVarSet() { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java index 60ca5fcee3..e9d7eda37f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java @@ -1,7 +1,6 @@ package io.javaoperatorsdk.operator.api.config; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.junit.jupiter.api.Test; @@ -13,6 +12,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; class ConfigurationServiceOverriderTest { @@ -42,31 +42,11 @@ public Config getClientConfiguration() { return new ConfigBuilder().withNamespace("namespace").build(); } - @Override - public int concurrentReconciliationThreads() { - return -1; - } - - @Override - public int getTerminationTimeoutSeconds() { - return -1; - } - @Override public Metrics getMetrics() { return METRICS; } - @Override - public ExecutorService getExecutorService() { - return null; - } - - @Override - public boolean closeClientOnStop() { - return true; - } - @Override public ObjectMapper getObjectMapper() { return OBJECT_MAPPER; @@ -121,4 +101,24 @@ public R clone(R object) { overridden.getLeaderElectionConfiguration()); } + @Test + void shouldReplaceInvalidValues() { + final var original = new BaseConfigurationService(); + + final var service = ConfigurationService.newOverriddenConfigurationService(original, + o -> o + .withConcurrentReconciliationThreads(0) + .withMinConcurrentReconciliationThreads(-1) + .withConcurrentWorkflowExecutorThreads(2) + .withMinConcurrentWorkflowExecutorThreads(3)); + + assertEquals(original.minConcurrentReconciliationThreads(), + service.minConcurrentReconciliationThreads()); + assertEquals(original.concurrentReconciliationThreads(), + service.concurrentReconciliationThreads()); + assertEquals(3, service.minConcurrentWorkflowExecutorThreads()); + assertEquals(original.concurrentWorkflowExecutorThreads(), + service.concurrentWorkflowExecutorThreads()); + } + }