From 78d067ea70a6f85e355f614d4d8bd130a80cdc63 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 16 Nov 2022 17:08:21 +0100 Subject: [PATCH 01/15] fix: run event source start on specific thread pool Fixes #1603 --- .../api/config/ExecutorServiceManager.java | 20 ++++++++ .../processing/event/EventSourceManager.java | 17 ++++--- .../source/informer/InformerManager.java | 50 +++++++++++-------- 3 files changed, 59 insertions(+), 28 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 38ed240c97..e77600de25 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -5,6 +5,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -17,6 +18,9 @@ public class ExecutorServiceManager { private static ExecutorServiceManager instance; private final ExecutorService executor; private final ExecutorService workflowExecutor; + + private static final ForkJoinPool threadPool = new ForkJoinPool( + Runtime.getRuntime().availableProcessors()); private final int terminationTimeoutSeconds; private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, @@ -68,6 +72,21 @@ public ExecutorService workflowExecutorService() { return workflowExecutor; } + public static void executeInParallel(Runnable callable) { + executeInParallel(() -> { + callable.run(); + return null; + }); + } + + public static T executeInParallel(Callable callable) { + try { + return 
threadPool.submit(callable).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + private void doStop() { try { log.debug("Closing executor"); @@ -80,6 +99,7 @@ private void doStop() { executor.shutdownNow(); // if we timed out, waiting, cancel everything } + threadPool.shutdown(); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index a85633bb00..0a5dc7d476 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -13,6 +13,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.MissingCRDException; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.processing.Controller; @@ -65,20 +66,24 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() { @Override public synchronized void start() { startEventSource(eventSources.namedControllerResourceEventSource()); - eventSources.additionalNamedEventSources() + + // starting event sources on the workflow executor which shouldn't be used at this point + ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources() .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) - .parallel().forEach(this::startEventSource); - eventSources.additionalNamedEventSources() + 
.parallel() + .forEach(this::startEventSource)); + + ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources() .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)) - .parallel().forEach(this::startEventSource); + .parallel().forEach(this::startEventSource)); } @Override public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); - eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource); + ExecutorServiceManager.executeInParallel( + () -> eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource)); eventSources.clear(); - } @SuppressWarnings("rawtypes") diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 0d22ccdef0..77ec79170a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -24,6 +24,7 @@ import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.Cloner; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -45,7 +46,8 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start)); } void initSources(MixedOperation, Resource> client, @@ -86,18 +88,19 @@ public void changeNamespaces(Set namespaces) { log.debug("Stopped informer {} for namespaces: {}", 
this, sourcesToRemove); sourcesToRemove.forEach(k -> sources.remove(k).stop()); - namespaces.forEach(ns -> { - if (!sources.containsKey(ns)) { - final var source = - createEventSource( - client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()), - eventHandler, ns); - source.addIndexers(this.indexers); - source.start(); - log.debug("Registered new {} -> {} for namespace: {}", this, source, - ns); - } - }); + ExecutorServiceManager.executeInParallel( + () -> namespaces.forEach(ns -> { + if (!sources.containsKey(ns)) { + final var source = + createEventSource( + client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()), + eventHandler, ns); + source.addIndexers(this.indexers); + source.start(); + log.debug("Registered new {} -> {} for namespace: {}", this, source, + ns); + } + })); } @@ -113,15 +116,18 @@ private InformerWrapper createEventSource( @Override public void stop() { - log.info("Stopping {}", this); - sources.forEach((ns, source) -> { - try { - log.debug("Stopping informer for namespace: {} -> {}", ns, source); - source.stop(); - } catch (Exception e) { - log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e); - } - }); + ExecutorServiceManager.executeInParallel( + () -> { + log.info("Stopping {}", this); + sources.forEach((ns, source) -> { + try { + log.debug("Stopping informer for namespace: {} -> {}", ns, source); + source.stop(); + } catch (Exception e) { + log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e); + } + }); + }); } @Override From 339dddc26bc9d0b244159a08a4f05c3791947f73 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 17 Nov 2022 10:20:31 +0100 Subject: [PATCH 02/15] fix: make common thread pool attached to instance instead of static --- .../operator/api/config/ExecutorServiceManager.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git 
a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index e77600de25..fd6f24cc2f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -18,9 +18,8 @@ public class ExecutorServiceManager { private static ExecutorServiceManager instance; private final ExecutorService executor; private final ExecutorService workflowExecutor; - - private static final ForkJoinPool threadPool = new ForkJoinPool( - Runtime.getRuntime().availableProcessors()); + private final ForkJoinPool threadPool = + new ForkJoinPool(Runtime.getRuntime().availableProcessors()); private final int terminationTimeoutSeconds; private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, @@ -73,13 +72,13 @@ public ExecutorService workflowExecutorService() { } public static void executeInParallel(Runnable callable) { - executeInParallel(() -> { + instance().executeInParallel(() -> { callable.run(); return null; }); } - public static T executeInParallel(Callable callable) { + public T executeInParallel(Callable callable) { try { return threadPool.submit(callable).get(); } catch (InterruptedException | ExecutionException e) { @@ -90,6 +89,7 @@ public static T executeInParallel(Callable callable) { private void doStop() { try { log.debug("Closing executor"); + threadPool.shutdown(); executor.shutdown(); workflowExecutor.shutdown(); if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { @@ -98,8 +98,6 @@ private void doStop() { if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { executor.shutdownNow(); // if we timed out, waiting, cancel everything } - - threadPool.shutdown(); } catch 
(InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); } From fec3817eb93f50649bce2324a538a583cae64a36 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 21 Nov 2022 10:53:06 +0100 Subject: [PATCH 03/15] fix: use workflow service to start/stop event sources IO-bound operations shouldn't use ForkJoinPools and workflow service shouldn't be in use during these operations so it should be available to run these tasks. --- .../api/config/ExecutorServiceManager.java | 22 ++++++------------- .../processing/event/EventSourceManager.java | 21 ++++++++++-------- .../source/informer/InformerManager.java | 9 ++++---- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index fd6f24cc2f..c122511283 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -5,7 +5,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -13,13 +12,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.javaoperatorsdk.operator.OperatorException; + public class ExecutorServiceManager { private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); private static ExecutorServiceManager instance; private final ExecutorService executor; private final ExecutorService workflowExecutor; - private final ForkJoinPool threadPool = - new 
ForkJoinPool(Runtime.getRuntime().availableProcessors()); private final int terminationTimeoutSeconds; private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, @@ -71,25 +70,18 @@ public ExecutorService workflowExecutorService() { return workflowExecutor; } - public static void executeInParallel(Runnable callable) { - instance().executeInParallel(() -> { - callable.run(); - return null; - }); - } - - public T executeInParallel(Callable callable) { + public static void executeAndWaitForCompletion(Runnable task) { try { - return threadPool.submit(callable).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); + instance().workflowExecutorService().submit(task) + .get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new OperatorException("Couldn't execute task", e); } } private void doStop() { try { log.debug("Closing executor"); - threadPool.shutdown(); executor.shutdown(); workflowExecutor.shutdown(); if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 0a5dc7d476..68c165aedf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -68,21 +68,24 @@ public synchronized void start() { startEventSource(eventSources.namedControllerResourceEventSource()); // starting event sources on the workflow executor which shouldn't be used at this point - ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources() - .filter(es -> 
es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) - .parallel() - .forEach(this::startEventSource)); + ExecutorServiceManager + .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources() + .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) + .parallel() + .forEach(this::startEventSource)); - ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources() - .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)) - .parallel().forEach(this::startEventSource)); + ExecutorServiceManager + .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources() + .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)) + .parallel().forEach(this::startEventSource)); } @Override public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); - ExecutorServiceManager.executeInParallel( - () -> eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource)); + ExecutorServiceManager + .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel() + .forEach(this::stopEventSource)); eventSources.clear(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 77ec79170a..34a34d37e0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -46,8 +46,9 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start)); + // make sure informers are all started before proceeding further + 
ExecutorServiceManager.executeAndWaitForCompletion( + () -> sources.values().parallelStream().forEach(LifecycleAware::start)); } void initSources(MixedOperation, Resource> client, @@ -88,7 +89,7 @@ public void changeNamespaces(Set namespaces) { log.debug("Stopped informer {} for namespaces: {}", this, sourcesToRemove); sourcesToRemove.forEach(k -> sources.remove(k).stop()); - ExecutorServiceManager.executeInParallel( + ExecutorServiceManager.executeAndWaitForCompletion( () -> namespaces.forEach(ns -> { if (!sources.containsKey(ns)) { final var source = @@ -116,7 +117,7 @@ private InformerWrapper createEventSource( @Override public void stop() { - ExecutorServiceManager.executeInParallel( + ExecutorServiceManager.instance().workflowExecutorService().execute( () -> { log.info("Stopping {}", this); sources.forEach((ns, source) -> { From a30dc4b13d87f126c22533b2ed100ad521b59512 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 21 Nov 2022 10:58:07 +0100 Subject: [PATCH 04/15] refactor: enable ExecutorService configuration, add doc --- .../api/config/ExecutorServiceManager.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index c122511283..78368cdfe9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -70,10 +70,26 @@ public ExecutorService workflowExecutorService() { return workflowExecutor; } + /** + * Runs the specified I/O-bound task and waits for its completion using the ExecutorService + * provided by {@link #workflowExecutorService()} + * + * @param task task to run concurrently + */ public static void 
executeAndWaitForCompletion(Runnable task) { + executeAndWaitForCompletion(task, instance().workflowExecutorService()); + } + + /** + * Executes the specified I/O-bound task using the specified ExecutorService and waits for its + * completion for at most {@link #terminationTimeoutSeconds} seconds. + * + * @param task task to run concurrently + * @param executor ExecutorService used to run the task + */ + public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor) { try { - instance().workflowExecutorService().submit(task) - .get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS); + executor.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new OperatorException("Couldn't execute task", e); } From a0e1c56502f3874143e26e23ec4b65b865857af9 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 21 Nov 2022 11:01:21 +0100 Subject: [PATCH 05/15] refactor: enable ExecutorService configuration, add doc --- .../operator/api/config/ExecutorServiceManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 78368cdfe9..12cb8de1d1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -73,7 +73,7 @@ public ExecutorService workflowExecutorService() { /** * Runs the specified I/O-bound task and waits for its completion using the ExecutorService * provided by {@link #workflowExecutorService()} - * + * * @param task task to run concurrently */ public static void executeAndWaitForCompletion(Runnable task) { @@ -83,7 +83,7 @@ public 
static void executeAndWaitForCompletion(Runnable task) { /** * Executes the specified I/O-bound task using the specified ExecutorService and waits for its * completion for at most {@link #terminationTimeoutSeconds} seconds. - * + * * @param task task to run concurrently * @param executor ExecutorService used to run the task */ From ce8eebae48874b43eab789da6c6c750302f97451 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 22 Nov 2022 15:07:11 +0100 Subject: [PATCH 06/15] fix: use separate executor to start/stop EventSources --- .../api/config/ExecutorServiceManager.java | 36 +++++++++++++------ .../processing/event/EventSourceManager.java | 6 ++-- .../source/informer/InformerManager.java | 4 +-- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 12cb8de1d1..621fd0a450 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -5,6 +5,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -19,12 +20,15 @@ public class ExecutorServiceManager { private static ExecutorServiceManager instance; private final ExecutorService executor; private final ExecutorService workflowExecutor; + private final ExecutorService ioBoundExecutor; private final int terminationTimeoutSeconds; private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, int terminationTimeoutSeconds) { this.executor = new 
InstrumentedExecutorService(executor); this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor); + this.ioBoundExecutor = new InstrumentedExecutorService( + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); this.terminationTimeoutSeconds = terminationTimeoutSeconds; } @@ -76,8 +80,8 @@ public ExecutorService workflowExecutorService() { * * @param task task to run concurrently */ - public static void executeAndWaitForCompletion(Runnable task) { - executeAndWaitForCompletion(task, instance().workflowExecutorService()); + public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) { + executeAndWaitForCompletion(task, instance().ioBoundExecutor, threadNamePrefix); } /** @@ -87,30 +91,40 @@ public static void executeAndWaitForCompletion(Runnable task) { * @param task task to run concurrently * @param executor ExecutorService used to run the task */ - public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor) { + public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor, + String threadNamePrefix) { + // change thread name for easier debugging + final var thread = Thread.currentThread(); + final var name = thread.getName(); try { + thread.setName(threadNamePrefix + "-" + thread.getId()); executor.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new OperatorException("Couldn't execute task", e); + } finally { + // restore original name + thread.setName(name); } } private void doStop() { try { log.debug("Closing executor"); - executor.shutdown(); - workflowExecutor.shutdown(); - if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { - workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything - } - if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { - 
executor.shutdownNow(); // if we timed out, waiting, cancel everything - } + shutdown(executor); + shutdown(workflowExecutor); + shutdown(ioBoundExecutor); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); } } + private void shutdown(ExecutorService executorService) throws InterruptedException { + executorService.shutdown(); + if (!executorService.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + executorService.shutdownNow(); // if we timed out, waiting, cancel everything + } + } + private static class InstrumentedExecutorService implements ExecutorService { private final boolean debug; private final ExecutorService executor; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 68c165aedf..e020bc71f6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -72,12 +72,12 @@ public synchronized void start() { .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources() .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) .parallel() - .forEach(this::startEventSource)); + .forEach(this::startEventSource), "LowLevelEventSourceStart"); ExecutorServiceManager .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources() .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)) - .parallel().forEach(this::startEventSource)); + .parallel().forEach(this::startEventSource), "DefaultEventSourceStart"); } @Override @@ -85,7 +85,7 @@ public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); ExecutorServiceManager 
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel() - .forEach(this::stopEventSource)); + .forEach(this::stopEventSource), "EventSourceStop"); eventSources.clear(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 34a34d37e0..cf8990b13d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -48,7 +48,7 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start)); + () -> sources.values().parallelStream().forEach(LifecycleAware::start), "InformerStart"); } void initSources(MixedOperation, Resource> client, @@ -101,7 +101,7 @@ public void changeNamespaces(Set namespaces) { log.debug("Registered new {} -> {} for namespace: {}", this, source, ns); } - })); + }), "InformerStart"); } From d03a9c275393787b8c1703b604f518555c5f965f Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 23 Nov 2022 14:33:17 +0100 Subject: [PATCH 07/15] fix: some operations just need to spawn, not be waited on to finish --- .../operator/ControllerManager.java | 18 ++++++++++------ .../api/config/ExecutorServiceManager.java | 13 ++++++++++++ .../processing/event/EventSourceManager.java | 21 +++++++++++-------- .../source/informer/InformerManager.java | 10 +++++---- 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java index 44aa430ec2..b766d3cfda 100644 --- 
a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java @@ -8,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.processing.Controller; /** @@ -32,20 +33,25 @@ public synchronized void shouldStart() { } public synchronized void start(boolean startEventProcessor) { - controllers().parallelStream().forEach(c -> c.start(startEventProcessor)); + ExecutorServiceManager.executeIOBoundTask( + () -> controllers().parallelStream().forEach(c -> c.start(startEventProcessor)), + "ControllerStart"); started = true; } public synchronized void stop() { - controllers().parallelStream().forEach(closeable -> { - log.debug("closing {}", closeable); - closeable.stop(); - }); + ExecutorServiceManager.executeAndWaitForCompletion( + () -> controllers().parallelStream().forEach(closeable -> { + log.debug("closing {}", closeable); + closeable.stop(); + }), "ControllerStop"); started = false; } public synchronized void startEventProcessing() { - controllers().parallelStream().forEach(Controller::startEventProcessing); + ExecutorServiceManager.executeIOBoundTask( + () -> controllers().parallelStream().forEach(Controller::startEventProcessing), + "ControllerEventProcessing"); } @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 621fd0a450..711d37a388 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -107,6 +107,19 @@ public static void 
executeAndWaitForCompletion(Runnable task, ExecutorService ex } } + public static void executeIOBoundTask(Runnable task, String threadNamePrefix) { + // change thread name for easier debugging + final var thread = Thread.currentThread(); + final var name = thread.getName(); + try { + thread.setName(threadNamePrefix + "-" + thread.getId()); + instance().ioBoundExecutor.execute(task); + } finally { + // restore original name + thread.setName(name); + } + } + private void doStop() { try { log.debug("Closing executor"); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index e020bc71f6..14bf7142f6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -68,24 +68,27 @@ public synchronized void start() { startEventSource(eventSources.namedControllerResourceEventSource()); // starting event sources on the workflow executor which shouldn't be used at this point - ExecutorServiceManager - .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources() + ExecutorServiceManager.executeAndWaitForCompletion( + () -> eventSources.additionalNamedEventSources() .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) .parallel() - .forEach(this::startEventSource), "LowLevelEventSourceStart"); + .forEach(this::startEventSource), + "LowLevelEventSourceStart"); - ExecutorServiceManager - .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources() + ExecutorServiceManager.executeAndWaitForCompletion( + () -> eventSources.additionalNamedEventSources() .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)) - .parallel().forEach(this::startEventSource), 
"DefaultEventSourceStart"); + .parallel().forEach(this::startEventSource), + "DefaultEventSourceStart"); } @Override public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); - ExecutorServiceManager - .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel() - .forEach(this::stopEventSource), "EventSourceStop"); + ExecutorServiceManager.executeIOBoundTask( + () -> eventSources.additionalNamedEventSources().parallel() + .forEach(this::stopEventSource), + "EventSourceStop"); eventSources.clear(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index cf8990b13d..d2a7bf106c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -47,8 +47,9 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start), "InformerStart"); + ExecutorServiceManager.executeIOBoundTask( + () -> sources.values().parallelStream().forEach(LifecycleAware::start), + "InformerStart"); } void initSources(MixedOperation, Resource> client, @@ -117,7 +118,7 @@ private InformerWrapper createEventSource( @Override public void stop() { - ExecutorServiceManager.instance().workflowExecutorService().execute( + ExecutorServiceManager.executeAndWaitForCompletion( () -> { log.info("Stopping {}", this); sources.forEach((ns, source) -> { @@ -128,7 +129,8 @@ public void stop() { log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e); } }); - }); + }, + "StopInformer"); } @Override From a50e2a6afd7f6870053cf2e5d22e35b0e3bb24d4 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: 
Wed, 23 Nov 2022 16:46:08 +0100 Subject: [PATCH 08/15] fix: if a stop handler is set, use it by default --- .../operator/api/config/ConfigurationService.java | 5 ++++- .../operator/api/config/ConfigurationServiceOverrider.java | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 1fe9485588..07422dc7e9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -164,9 +164,12 @@ default Optional getLeaderElectionConfiguration() { * if false, the startup will ignore recoverable errors, caused for example by RBAC issues, and * will try to reconnect periodically in the background. *

+ *

+ * NOTE: this setting is ignored if an informer stop handler is set + *

*/ default boolean stopOnInformerErrorDuringStartup() { - return true; + return getInformerStoppedHandler().isEmpty(); } /** diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 2b8aee9708..8d35527278 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -188,7 +188,9 @@ public Optional getInformerStoppedHandler() { @Override public boolean stopOnInformerErrorDuringStartup() { - return stopOnInformerErrorDuringStartup != null ? stopOnInformerErrorDuringStartup + // only stop on informer error if we didn't set a stop handler for the informers + return stopOnInformerErrorDuringStartup != null && informerStoppedHandler == null + ? 
stopOnInformerErrorDuringStartup : super.stopOnInformerErrorDuringStartup(); } From 407e9a574fa8008e09feaf29396f20e453e0dfea Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 23 Nov 2022 17:40:12 +0100 Subject: [PATCH 09/15] fix: use a new executor service for each task to run --- .../operator/ControllerManager.java | 4 +- .../api/config/ExecutorServiceManager.java | 43 ++++--------------- .../processing/event/EventSourceManager.java | 2 +- .../source/informer/InformerManager.java | 17 +++++++- .../source/informer/InformerWrapper.java | 6 ++- .../event/EventSourceManagerTest.java | 5 ++- .../informer/InformerEventSourceTest.java | 3 +- 7 files changed, 36 insertions(+), 44 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java index b766d3cfda..095b059362 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java @@ -33,7 +33,7 @@ public synchronized void shouldStart() { } public synchronized void start(boolean startEventProcessor) { - ExecutorServiceManager.executeIOBoundTask( + ExecutorServiceManager.executeAndWaitForCompletion( () -> controllers().parallelStream().forEach(c -> c.start(startEventProcessor)), "ControllerStart"); started = true; @@ -49,7 +49,7 @@ public synchronized void stop() { } public synchronized void startEventProcessing() { - ExecutorServiceManager.executeIOBoundTask( + ExecutorServiceManager.executeAndWaitForCompletion( () -> controllers().parallelStream().forEach(Controller::startEventProcessing), "ControllerEventProcessing"); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java 
index 711d37a388..5b96efec1f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -20,15 +20,12 @@ public class ExecutorServiceManager { private static ExecutorServiceManager instance; private final ExecutorService executor; private final ExecutorService workflowExecutor; - private final ExecutorService ioBoundExecutor; private final int terminationTimeoutSeconds; private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, int terminationTimeoutSeconds) { this.executor = new InstrumentedExecutorService(executor); this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor); - this.ioBoundExecutor = new InstrumentedExecutorService( - Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); this.terminationTimeoutSeconds = terminationTimeoutSeconds; } @@ -75,30 +72,22 @@ public ExecutorService workflowExecutorService() { } /** - * Runs the specified I/O-bound task and waits for its completion using the ExecutorService - * provided by {@link #workflowExecutorService()} + * Runs the specified I/O-bound task and waits for its completion using the new ExecutorService * * @param task task to run concurrently + * @param threadNamePrefix the prefix with which to prefix thread names when tasks are run this + * way */ public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) { - executeAndWaitForCompletion(task, instance().ioBoundExecutor, threadNamePrefix); - } - - /** - * Executes the specified I/O-bound task using the specified ExecutorService and waits for its - * completion for at most {@link #terminationTimeoutSeconds} seconds. 
- * - * @param task task to run concurrently - * @param executor ExecutorService used to run the task - */ - public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor, - String threadNamePrefix) { + final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + ExecutorService instrumented = new InstrumentedExecutorService(executor); // change thread name for easier debugging final var thread = Thread.currentThread(); final var name = thread.getName(); try { thread.setName(threadNamePrefix + "-" + thread.getId()); - executor.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS); + instrumented.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS); + shutdown(instrumented); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new OperatorException("Couldn't execute task", e); } finally { @@ -107,33 +96,19 @@ public static void executeAndWaitForCompletion(Runnable task, ExecutorService ex } } - public static void executeIOBoundTask(Runnable task, String threadNamePrefix) { - // change thread name for easier debugging - final var thread = Thread.currentThread(); - final var name = thread.getName(); - try { - thread.setName(threadNamePrefix + "-" + thread.getId()); - instance().ioBoundExecutor.execute(task); - } finally { - // restore original name - thread.setName(name); - } - } - private void doStop() { try { log.debug("Closing executor"); shutdown(executor); shutdown(workflowExecutor); - shutdown(ioBoundExecutor); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); } } - private void shutdown(ExecutorService executorService) throws InterruptedException { + private static void shutdown(ExecutorService executorService) throws InterruptedException { executorService.shutdown(); - if (!executorService.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + if 
(!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) { executorService.shutdownNow(); // if we timed out, waiting, cancel everything } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 14bf7142f6..2d16c0f87e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -85,7 +85,7 @@ public synchronized void start() { @Override public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); - ExecutorServiceManager.executeIOBoundTask( + ExecutorServiceManager.executeAndWaitForCompletion( () -> eventSources.additionalNamedEventSources().parallel() .forEach(this::stopEventSource), "EventSourceStop"); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index d2a7bf106c..7f8f022378 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -47,8 +47,21 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start), + ExecutorServiceManager.executeAndWaitForCompletion( + () -> sources.values().parallelStream().forEach(source -> { + // change thread name for easier debugging + final var thread = Thread.currentThread(); + final var name = thread.getName(); + try { + thread.setName(source.informerInfo() + " " + 
thread.getId()); + source.start(); + } catch (Exception e) { + throw new OperatorException("Couldn't start informer: " + source, e); + } finally { + // restore original name + thread.setName(name); + } + }), "InformerStart"); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 601cb0c10c..4535aa6bc5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -143,6 +143,10 @@ public List byIndex(String indexName, String indexKey) { @Override public String toString() { - return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')'; + return informerInfo() + " (" + informer + ')'; + } + + String informerInfo() { + return "InformerWrapper [" + versionedFullResourceName() + "]"; } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index ab345145da..51a4b597f7 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -4,6 +4,7 @@ import org.junit.jupiter.api.Test; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.OperatorException; @@ -171,9 +172,9 @@ void changesNamespacesOnControllerAndInformerEventSources() { } private 
EventSourceManager initManager() { - final var configuration = MockControllerConfiguration.forResource(HasMetadata.class); + final var configuration = MockControllerConfiguration.forResource(ConfigMap.class); final Controller controller = new Controller(mock(Reconciler.class), configuration, - MockKubernetesClient.client(HasMetadata.class)); + MockKubernetesClient.client(ConfigMap.class)); return new EventSourceManager(controller); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 8b5c0202c0..a01350d18f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -19,7 +19,6 @@ import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; @@ -259,7 +258,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { // by default informer fails to start if there is an exception in the client on start. // Throws the exception further. 
- assertThrows(RuntimeException.class, () -> informerEventSource.start()); + informerEventSource.start(); verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception)); } finally { ConfigurationServiceProvider.reset(); From de6079c53e0510d9b898143f3ba9c0c032a1cda5 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 23 Nov 2022 18:12:51 +0100 Subject: [PATCH 10/15] fix: use cache sync timeout instead of termination one --- .../operator/api/config/ExecutorServiceManager.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 5b96efec1f..8cd21cb565 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -86,7 +86,9 @@ public static void executeAndWaitForCompletion(Runnable task, String threadNameP final var name = thread.getName(); try { thread.setName(threadNamePrefix + "-" + thread.getId()); - instrumented.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS); + instrumented.submit(task) + .get(ConfigurationServiceProvider.instance().cacheSyncTimeout().toSeconds(), + TimeUnit.SECONDS); shutdown(instrumented); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new OperatorException("Couldn't execute task", e); From 06a7e13981742aa474b75bfe7a1a04836a8389e8 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 23 Nov 2022 22:01:28 +0100 Subject: [PATCH 11/15] fix: controllers starting is not io-bound per se --- .../operator/ControllerManager.java | 18 ++++++------------ .../operator/processing/Controller.java | 2 +- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git 
a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java index 095b059362..44aa430ec2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java @@ -8,7 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.processing.Controller; /** @@ -33,25 +32,20 @@ public synchronized void shouldStart() { } public synchronized void start(boolean startEventProcessor) { - ExecutorServiceManager.executeAndWaitForCompletion( - () -> controllers().parallelStream().forEach(c -> c.start(startEventProcessor)), - "ControllerStart"); + controllers().parallelStream().forEach(c -> c.start(startEventProcessor)); started = true; } public synchronized void stop() { - ExecutorServiceManager.executeAndWaitForCompletion( - () -> controllers().parallelStream().forEach(closeable -> { - log.debug("closing {}", closeable); - closeable.stop(); - }), "ControllerStop"); + controllers().parallelStream().forEach(closeable -> { + log.debug("closing {}", closeable); + closeable.stop(); + }); started = false; } public synchronized void startEventProcessing() { - ExecutorServiceManager.executeAndWaitForCompletion( - () -> controllers().parallelStream().forEach(Controller::startEventProcessing), - "ControllerEventProcessing"); + controllers().parallelStream().forEach(Controller::startEventProcessing); } @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index a34c4b38f1..ca27ad4f2c 100644 --- 
a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -360,8 +360,8 @@ public void changeNamespaces(Set namespaces) { } public synchronized void startEventProcessing() { - log.info("Started event processing for controller: {}", configuration.getName()); eventProcessor.start(); + log.info("Started event processing for controller: {}", configuration.getName()); } private void throwMissingCRDException(String crdName, String specVersion, String controllerName) { From 47a8891a2237f81bb9a43b99b9d09196c1a39a21 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 24 Nov 2022 09:51:14 +0100 Subject: [PATCH 12/15] revert: fix: if a stop handler is set, use it by default This reverts commit 7d007db5fb4636637e782a5b48ca3dc2a616022e. --- .../operator/api/config/ConfigurationService.java | 5 +---- .../operator/api/config/ConfigurationServiceOverrider.java | 4 +--- .../event/source/informer/InformerEventSourceTest.java | 4 +++- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 07422dc7e9..1fe9485588 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -164,12 +164,9 @@ default Optional getLeaderElectionConfiguration() { * if false, the startup will ignore recoverable errors, caused for example by RBAC issues, and * will try to reconnect periodically in the background. *

- *

- * NOTE: this setting is ignored if an informer stop handler is set - *

*/ default boolean stopOnInformerErrorDuringStartup() { - return getInformerStoppedHandler().isEmpty(); + return true; } /** diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 8d35527278..2b8aee9708 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -188,9 +188,7 @@ public Optional getInformerStoppedHandler() { @Override public boolean stopOnInformerErrorDuringStartup() { - // only stop on informer error if we didn't set a stop handler for the informers - return stopOnInformerErrorDuringStartup != null && informerStoppedHandler == null - ? stopOnInformerErrorDuringStartup + return stopOnInformerErrorDuringStartup != null ? 
stopOnInformerErrorDuringStartup : super.stopOnInformerErrorDuringStartup(); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index a01350d18f..a33658415e 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -10,6 +10,7 @@ import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.MockKubernetesClient; +import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; @@ -19,6 +20,7 @@ import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; @@ -258,7 +260,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { // by default informer fails to start if there is an exception in the client on start. // Throws the exception further. 
- informerEventSource.start(); + assertThrows(OperatorException.class, () -> informerEventSource.start()); verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception)); } finally { ConfigurationServiceProvider.reset(); From 09bd45cad6feecff65ffdb5a8fb648933f367cd5 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 24 Nov 2022 10:19:33 +0100 Subject: [PATCH 13/15] fix: re-interrupt thread on InterruptedException --- .../operator/api/config/ExecutorServiceManager.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 8cd21cb565..c8932ddeb5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -90,7 +90,10 @@ public static void executeAndWaitForCompletion(Runnable task, String threadNameP .get(ConfigurationServiceProvider.instance().cacheSyncTimeout().toSeconds(), TimeUnit.SECONDS); shutdown(instrumented); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException e) { + thread.interrupt(); + throw new OperatorException("Couldn't execute task", e); + } catch (ExecutionException | TimeoutException e) { throw new OperatorException("Couldn't execute task", e); } finally { // restore original name @@ -105,6 +108,7 @@ private void doStop() { shutdown(workflowExecutor); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); + Thread.currentThread().interrupt(); } } From 824c192491e71c2de0d6734a22bcfe1bb18c6cd6 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 24 Nov 2022 12:39:08 +0100 Subject: [PATCH 14/15] fix: informers are already started 
asynchronously --- .../source/informer/InformerManager.java | 65 +++++++------------ .../source/informer/InformerWrapper.java | 9 ++- 2 files changed, 30 insertions(+), 44 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 7f8f022378..801e3e3778 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -24,7 +24,6 @@ import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.Cloner; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; -import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -47,22 +46,7 @@ public class InformerManager sources.values().parallelStream().forEach(source -> { - // change thread name for easier debugging - final var thread = Thread.currentThread(); - final var name = thread.getName(); - try { - thread.setName(source.informerInfo() + " " + thread.getId()); - source.start(); - } catch (Exception e) { - throw new OperatorException("Couldn't start informer: " + source, e); - } finally { - // restore original name - thread.setName(name); - } - }), - "InformerStart"); + sources.values().parallelStream().forEach(InformerWrapper::start); } void initSources(MixedOperation, Resource> client, @@ -103,19 +87,18 @@ public void changeNamespaces(Set namespaces) { log.debug("Stopped informer {} for namespaces: {}", this, sourcesToRemove); sourcesToRemove.forEach(k 
-> sources.remove(k).stop()); - ExecutorServiceManager.executeAndWaitForCompletion( - () -> namespaces.forEach(ns -> { - if (!sources.containsKey(ns)) { - final var source = - createEventSource( - client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()), - eventHandler, ns); - source.addIndexers(this.indexers); - source.start(); - log.debug("Registered new {} -> {} for namespace: {}", this, source, - ns); - } - }), "InformerStart"); + namespaces.forEach(ns -> { + if (!sources.containsKey(ns)) { + final var source = + createEventSource( + client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()), + eventHandler, ns); + source.addIndexers(this.indexers); + source.start(); + log.debug("Registered new {} -> {} for namespace: {}", this, source, + ns); + } + }); } @@ -131,19 +114,15 @@ private InformerWrapper createEventSource( @Override public void stop() { - ExecutorServiceManager.executeAndWaitForCompletion( - () -> { - log.info("Stopping {}", this); - sources.forEach((ns, source) -> { - try { - log.debug("Stopping informer for namespace: {} -> {}", ns, source); - source.stop(); - } catch (Exception e) { - log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e); - } - }); - }, - "StopInformer"); + log.info("Stopping {}", this); + sources.forEach((ns, source) -> { + try { + log.debug("Stopping informer for namespace: {} -> {}", ns, source); + source.stop(); + } catch (Exception e) { + log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e); + } + }); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 4535aa6bc5..de81815a96 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ 
b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -64,7 +64,11 @@ public void start() throws OperatorException { if (!configService.stopOnInformerErrorDuringStartup()) { informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t)); } + // change thread name for easier debugging + final var thread = Thread.currentThread(); + final var name = thread.getName(); try { + thread.setName(informerInfo() + " " + thread.getId()); var start = informer.start(); // note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is // false, and there is a rbac issue the get never returns; therefore operator never really @@ -81,6 +85,9 @@ public void start() throws OperatorException { } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); + } finally { + // restore original name + thread.setName(name); } } catch (Exception e) { @@ -146,7 +153,7 @@ public String toString() { return informerInfo() + " (" + informer + ')'; } - String informerInfo() { + private String informerInfo() { return "InformerWrapper [" + versionedFullResourceName() + "]"; } } From 83c828b11f68289a414ff8a065072fa20a046fa5 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 24 Nov 2022 17:58:53 +0100 Subject: [PATCH 15/15] fix: make sure we run each task on a different thread --- .../api/config/ExecutorServiceManager.java | 60 +++++++++---------- .../processing/event/EventSourceManager.java | 45 ++++++++------ .../processing/event/NamedEventSource.java | 7 +++ .../source/informer/InformerManager.java | 1 - 4 files changed, 62 insertions(+), 51 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index c8932ddeb5..c4193b9a2a 100644 --- 
a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -9,12 +9,13 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.javaoperatorsdk.operator.OperatorException; - public class ExecutorServiceManager { private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); private static ExecutorServiceManager instance; @@ -63,6 +64,31 @@ public synchronized static ExecutorServiceManager instance() { return instance; } + public static void executeAndWaitForAllToComplete(Stream stream, + Function task, Function threadNamer) { + final var instrumented = new InstrumentedExecutorService( + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); + + try { + instrumented.invokeAll(stream.parallel().map(item -> (Callable) () -> { + // change thread name for easier debugging + final var thread = Thread.currentThread(); + final var name = thread.getName(); + thread.setName(threadNamer.apply(item)); + try { + task.apply(item); + return null; + } finally { + // restore original name + thread.setName(name); + } + }).collect(Collectors.toList())); + shutdown(instrumented); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + public ExecutorService executorService() { return executor; } @@ -71,36 +97,6 @@ public ExecutorService workflowExecutorService() { return workflowExecutor; } - /** - * Runs the specified I/O-bound task and waits for its completion using the new ExecutorService - * - * @param task task to run concurrently - * @param threadNamePrefix the prefix with which to prefix thread names when 
tasks are run this - * way - */ - public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) { - final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - ExecutorService instrumented = new InstrumentedExecutorService(executor); - // change thread name for easier debugging - final var thread = Thread.currentThread(); - final var name = thread.getName(); - try { - thread.setName(threadNamePrefix + "-" + thread.getId()); - instrumented.submit(task) - .get(ConfigurationServiceProvider.instance().cacheSyncTimeout().toSeconds(), - TimeUnit.SECONDS); - shutdown(instrumented); - } catch (InterruptedException e) { - thread.interrupt(); - throw new OperatorException("Couldn't execute task", e); - } catch (ExecutionException | TimeoutException e) { - throw new OperatorException("Couldn't execute task", e); - } finally { - // restore original name - thread.setName(name); - } - } - private void doStop() { try { log.debug("Closing executor"); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 2d16c0f87e..8e9c981031 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -67,28 +68,34 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() { public synchronized void start() { startEventSource(eventSources.namedControllerResourceEventSource()); - // starting event sources on the workflow executor which shouldn't be used at this point - 
ExecutorServiceManager.executeAndWaitForCompletion( - () -> eventSources.additionalNamedEventSources() - .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) - .parallel() - .forEach(this::startEventSource), - "LowLevelEventSourceStart"); + ExecutorServiceManager.executeAndWaitForAllToComplete( + eventSources.additionalNamedEventSources() + .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)), + this::startEventSource, + getThreadNamer("start")); - ExecutorServiceManager.executeAndWaitForCompletion( - () -> eventSources.additionalNamedEventSources() - .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)) - .parallel().forEach(this::startEventSource), - "DefaultEventSourceStart"); + ExecutorServiceManager.executeAndWaitForAllToComplete( + eventSources.additionalNamedEventSources() + .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)), + this::startEventSource, + getThreadNamer("start")); + } + + private static Function getThreadNamer(String stage) { + return es -> { + final var name = es.name(); + return es.priority() + " " + stage + " -> " + + (es.isNameSet() ? 
name + " " + es.original().getClass().getSimpleName() : name); + }; } @Override public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); - ExecutorServiceManager.executeAndWaitForCompletion( - () -> eventSources.additionalNamedEventSources().parallel() - .forEach(this::stopEventSource), - "EventSourceStop"); + ExecutorServiceManager.executeAndWaitForAllToComplete( + eventSources.additionalNamedEventSources(), + this::stopEventSource, + getThreadNamer("stop")); eventSources.clear(); } @@ -105,7 +112,7 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) { } } - private void startEventSource(NamedEventSource eventSource) { + private Void startEventSource(NamedEventSource eventSource) { try { logEventSourceEvent(eventSource, "Starting"); eventSource.start(); @@ -115,9 +122,10 @@ private void startEventSource(NamedEventSource eventSource) { } catch (Exception e) { throw new OperatorException("Couldn't start source " + eventSource.name(), e); } + return null; } - private void stopEventSource(NamedEventSource eventSource) { + private Void stopEventSource(NamedEventSource eventSource) { try { logEventSourceEvent(eventSource, "Stopping"); eventSource.stop(); @@ -125,6 +133,7 @@ private void stopEventSource(NamedEventSource eventSource) { } catch (Exception e) { log.warn("Error closing {} -> {}", eventSource.name(), e); } + return null; } public final void registerEventSource(EventSource eventSource) throws OperatorException { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java index 13d5a10323..4787e675c9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java @@ 
-4,6 +4,7 @@ import java.util.Optional; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.processing.event.source.Configurable; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; @@ -13,10 +14,12 @@ class NamedEventSource implements EventSource, EventSourceMetadata { private final EventSource original; private final String name; + private final boolean nameSet; NamedEventSource(EventSource original, String name) { this.original = original; this.name = name; + nameSet = !name.equals(EventSourceInitializer.generateNameFor(original)); } @Override @@ -95,4 +98,8 @@ public int hashCode() { public EventSourceStartPriority priority() { return original.priority(); } + + public boolean isNameSet() { + return nameSet; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 801e3e3778..ba7aa9f67f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -114,7 +114,6 @@ private InformerWrapper createEventSource( @Override public void stop() { - log.info("Stopping {}", this); sources.forEach((ns, source) -> { try { log.debug("Stopping informer for namespace: {} -> {}", ns, source);