From 78d067ea70a6f85e355f614d4d8bd130a80cdc63 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Wed, 16 Nov 2022 17:08:21 +0100
Subject: [PATCH 01/15] fix: run event source start on specific thread pool
Fixes #1603
---
.../api/config/ExecutorServiceManager.java | 20 ++++++++
.../processing/event/EventSourceManager.java | 17 ++++---
.../source/informer/InformerManager.java | 50 +++++++++++--------
3 files changed, 59 insertions(+), 28 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 38ed240c97..e77600de25 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -5,6 +5,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -17,6 +18,9 @@ public class ExecutorServiceManager {
private static ExecutorServiceManager instance;
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
+
+ private static final ForkJoinPool threadPool = new ForkJoinPool(
+ Runtime.getRuntime().availableProcessors());
private final int terminationTimeoutSeconds;
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
@@ -68,6 +72,21 @@ public ExecutorService workflowExecutorService() {
return workflowExecutor;
}
+ public static void executeInParallel(Runnable callable) {
+ executeInParallel(() -> {
+ callable.run();
+ return null;
+ });
+ }
+
+ public static T executeInParallel(Callable callable) {
+ try {
+ return threadPool.submit(callable).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private void doStop() {
try {
log.debug("Closing executor");
@@ -80,6 +99,7 @@ private void doStop() {
executor.shutdownNow(); // if we timed out, waiting, cancel everything
}
+ threadPool.shutdown();
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index a85633bb00..0a5dc7d476 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -13,6 +13,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.OperatorException;
+import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.processing.Controller;
@@ -65,20 +66,24 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
@Override
public synchronized void start() {
startEventSource(eventSources.namedControllerResourceEventSource());
- eventSources.additionalNamedEventSources()
+
+ // starting event sources on the workflow executor which shouldn't be used at this point
+ ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
- .parallel().forEach(this::startEventSource);
- eventSources.additionalNamedEventSources()
+ .parallel()
+ .forEach(this::startEventSource));
+
+ ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
- .parallel().forEach(this::startEventSource);
+ .parallel().forEach(this::startEventSource));
}
@Override
public synchronized void stop() {
stopEventSource(eventSources.namedControllerResourceEventSource());
- eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource);
+ ExecutorServiceManager.executeInParallel(
+ () -> eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource));
eventSources.clear();
-
}
@SuppressWarnings("rawtypes")
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index 0d22ccdef0..77ec79170a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -24,6 +24,7 @@
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
+import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -45,7 +46,8 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start));
}
void initSources(MixedOperation, Resource> client,
@@ -86,18 +88,19 @@ public void changeNamespaces(Set namespaces) {
log.debug("Stopped informer {} for namespaces: {}", this, sourcesToRemove);
sourcesToRemove.forEach(k -> sources.remove(k).stop());
- namespaces.forEach(ns -> {
- if (!sources.containsKey(ns)) {
- final var source =
- createEventSource(
- client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()),
- eventHandler, ns);
- source.addIndexers(this.indexers);
- source.start();
- log.debug("Registered new {} -> {} for namespace: {}", this, source,
- ns);
- }
- });
+ ExecutorServiceManager.executeInParallel(
+ () -> namespaces.forEach(ns -> {
+ if (!sources.containsKey(ns)) {
+ final var source =
+ createEventSource(
+ client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()),
+ eventHandler, ns);
+ source.addIndexers(this.indexers);
+ source.start();
+ log.debug("Registered new {} -> {} for namespace: {}", this, source,
+ ns);
+ }
+ }));
}
@@ -113,15 +116,18 @@ private InformerWrapper createEventSource(
@Override
public void stop() {
- log.info("Stopping {}", this);
- sources.forEach((ns, source) -> {
- try {
- log.debug("Stopping informer for namespace: {} -> {}", ns, source);
- source.stop();
- } catch (Exception e) {
- log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
- }
- });
+ ExecutorServiceManager.executeInParallel(
+ () -> {
+ log.info("Stopping {}", this);
+ sources.forEach((ns, source) -> {
+ try {
+ log.debug("Stopping informer for namespace: {} -> {}", ns, source);
+ source.stop();
+ } catch (Exception e) {
+ log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
+ }
+ });
+ });
}
@Override
From 339dddc26bc9d0b244159a08a4f05c3791947f73 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Thu, 17 Nov 2022 10:20:31 +0100
Subject: [PATCH 02/15] fix: make common thread pool attached to instance
instead of static
---
.../operator/api/config/ExecutorServiceManager.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index e77600de25..fd6f24cc2f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -18,9 +18,8 @@ public class ExecutorServiceManager {
private static ExecutorServiceManager instance;
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
-
- private static final ForkJoinPool threadPool = new ForkJoinPool(
- Runtime.getRuntime().availableProcessors());
+ private final ForkJoinPool threadPool =
+ new ForkJoinPool(Runtime.getRuntime().availableProcessors());
private final int terminationTimeoutSeconds;
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
@@ -73,13 +72,13 @@ public ExecutorService workflowExecutorService() {
}
public static void executeInParallel(Runnable callable) {
- executeInParallel(() -> {
+ instance().executeInParallel(() -> {
callable.run();
return null;
});
}
- public static T executeInParallel(Callable callable) {
+ public T executeInParallel(Callable callable) {
try {
return threadPool.submit(callable).get();
} catch (InterruptedException | ExecutionException e) {
@@ -90,6 +89,7 @@ public static T executeInParallel(Callable callable) {
private void doStop() {
try {
log.debug("Closing executor");
+ threadPool.shutdown();
executor.shutdown();
workflowExecutor.shutdown();
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
@@ -98,8 +98,6 @@ private void doStop() {
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
executor.shutdownNow(); // if we timed out, waiting, cancel everything
}
-
- threadPool.shutdown();
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
}
From fec3817eb93f50649bce2324a538a583cae64a36 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Mon, 21 Nov 2022 10:53:06 +0100
Subject: [PATCH 03/15] fix: use workflow service to start/stop event sources
IO-bound operations shouldn't use ForkJoinPools and workflow service
should be in use during these operations so it should be available to
run these tasks.
---
.../api/config/ExecutorServiceManager.java | 22 ++++++-------------
.../processing/event/EventSourceManager.java | 21 ++++++++++--------
.../source/informer/InformerManager.java | 9 ++++----
3 files changed, 24 insertions(+), 28 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index fd6f24cc2f..c122511283 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -5,7 +5,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -13,13 +12,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.javaoperatorsdk.operator.OperatorException;
+
public class ExecutorServiceManager {
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
private static ExecutorServiceManager instance;
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
- private final ForkJoinPool threadPool =
- new ForkJoinPool(Runtime.getRuntime().availableProcessors());
private final int terminationTimeoutSeconds;
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
@@ -71,25 +70,18 @@ public ExecutorService workflowExecutorService() {
return workflowExecutor;
}
- public static void executeInParallel(Runnable callable) {
- instance().executeInParallel(() -> {
- callable.run();
- return null;
- });
- }
-
- public T executeInParallel(Callable callable) {
+ public static void executeAndWaitForCompletion(Runnable task) {
try {
- return threadPool.submit(callable).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
+ instance().workflowExecutorService().submit(task)
+ .get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new OperatorException("Couldn't execute task", e);
}
}
private void doStop() {
try {
log.debug("Closing executor");
- threadPool.shutdown();
executor.shutdown();
workflowExecutor.shutdown();
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index 0a5dc7d476..68c165aedf 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -68,21 +68,24 @@ public synchronized void start() {
startEventSource(eventSources.namedControllerResourceEventSource());
// starting event sources on the workflow executor which shouldn't be used at this point
- ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
- .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
- .parallel()
- .forEach(this::startEventSource));
+ ExecutorServiceManager
+ .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
+ .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
+ .parallel()
+ .forEach(this::startEventSource));
- ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
- .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
- .parallel().forEach(this::startEventSource));
+ ExecutorServiceManager
+ .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
+ .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
+ .parallel().forEach(this::startEventSource));
}
@Override
public synchronized void stop() {
stopEventSource(eventSources.namedControllerResourceEventSource());
- ExecutorServiceManager.executeInParallel(
- () -> eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource));
+ ExecutorServiceManager
+ .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel()
+ .forEach(this::stopEventSource));
eventSources.clear();
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index 77ec79170a..34a34d37e0 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -46,8 +46,9 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start));
+ // make sure informers are all started before proceeding further
+ ExecutorServiceManager.executeAndWaitForCompletion(
+ () -> sources.values().parallelStream().forEach(LifecycleAware::start));
}
void initSources(MixedOperation, Resource> client,
@@ -88,7 +89,7 @@ public void changeNamespaces(Set namespaces) {
log.debug("Stopped informer {} for namespaces: {}", this, sourcesToRemove);
sourcesToRemove.forEach(k -> sources.remove(k).stop());
- ExecutorServiceManager.executeInParallel(
+ ExecutorServiceManager.executeAndWaitForCompletion(
() -> namespaces.forEach(ns -> {
if (!sources.containsKey(ns)) {
final var source =
@@ -116,7 +117,7 @@ private InformerWrapper createEventSource(
@Override
public void stop() {
- ExecutorServiceManager.executeInParallel(
+ ExecutorServiceManager.instance().workflowExecutorService().execute(
() -> {
log.info("Stopping {}", this);
sources.forEach((ns, source) -> {
From a30dc4b13d87f126c22533b2ed100ad521b59512 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Mon, 21 Nov 2022 10:58:07 +0100
Subject: [PATCH 04/15] refactor: enable ExecutorService configuration, add doc
---
.../api/config/ExecutorServiceManager.java | 20 +++++++++++++++++--
1 file changed, 18 insertions(+), 2 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index c122511283..78368cdfe9 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -70,10 +70,26 @@ public ExecutorService workflowExecutorService() {
return workflowExecutor;
}
+ /**
+ * Runs the specified I/O-bound task and waits for its completion using the ExecutorService
+ * provided by {@link #workflowExecutorService()}
+ *
+ * @param task task to run concurrently
+ */
public static void executeAndWaitForCompletion(Runnable task) {
+ executeAndWaitForCompletion(task, instance().workflowExecutorService());
+ }
+
+ /**
+ * Executes the specified I/O-bound task using the specified ExecutorService and waits for its
+ * completion for at most {@link #terminationTimeoutSeconds} seconds.
+ *
+ * @param task task to run concurrently
+ * @param executor ExecutorService used to run the task
+ */
+ public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor) {
try {
- instance().workflowExecutorService().submit(task)
- .get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
+ executor.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new OperatorException("Couldn't execute task", e);
}
From a0e1c56502f3874143e26e23ec4b65b865857af9 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Mon, 21 Nov 2022 11:01:21 +0100
Subject: [PATCH 05/15] refactor: enable ExecutorService configuration, add doc
---
.../operator/api/config/ExecutorServiceManager.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 78368cdfe9..12cb8de1d1 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -73,7 +73,7 @@ public ExecutorService workflowExecutorService() {
/**
* Runs the specified I/O-bound task and waits for its completion using the ExecutorService
* provided by {@link #workflowExecutorService()}
- *
+ *
* @param task task to run concurrently
*/
public static void executeAndWaitForCompletion(Runnable task) {
@@ -83,7 +83,7 @@ public static void executeAndWaitForCompletion(Runnable task) {
/**
* Executes the specified I/O-bound task using the specified ExecutorService and waits for its
* completion for at most {@link #terminationTimeoutSeconds} seconds.
- *
+ *
* @param task task to run concurrently
* @param executor ExecutorService used to run the task
*/
From ce8eebae48874b43eab789da6c6c750302f97451 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Tue, 22 Nov 2022 15:07:11 +0100
Subject: [PATCH 06/15] fix: use separate executor to start/stop EventSources
---
.../api/config/ExecutorServiceManager.java | 36 +++++++++++++------
.../processing/event/EventSourceManager.java | 6 ++--
.../source/informer/InformerManager.java | 4 +--
3 files changed, 30 insertions(+), 16 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 12cb8de1d1..621fd0a450 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -5,6 +5,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -19,12 +20,15 @@ public class ExecutorServiceManager {
private static ExecutorServiceManager instance;
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
+ private final ExecutorService ioBoundExecutor;
private final int terminationTimeoutSeconds;
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
int terminationTimeoutSeconds) {
this.executor = new InstrumentedExecutorService(executor);
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
+ this.ioBoundExecutor = new InstrumentedExecutorService(
+ Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
}
@@ -76,8 +80,8 @@ public ExecutorService workflowExecutorService() {
*
* @param task task to run concurrently
*/
- public static void executeAndWaitForCompletion(Runnable task) {
- executeAndWaitForCompletion(task, instance().workflowExecutorService());
+ public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) {
+ executeAndWaitForCompletion(task, instance().ioBoundExecutor, threadNamePrefix);
}
/**
@@ -87,30 +91,40 @@ public static void executeAndWaitForCompletion(Runnable task) {
* @param task task to run concurrently
* @param executor ExecutorService used to run the task
*/
- public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor) {
+ public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor,
+ String threadNamePrefix) {
+ // change thread name for easier debugging
+ final var thread = Thread.currentThread();
+ final var name = thread.getName();
try {
+ thread.setName(threadNamePrefix + "-" + thread.getId());
executor.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new OperatorException("Couldn't execute task", e);
+ } finally {
+ // restore original name
+ thread.setName(name);
}
}
private void doStop() {
try {
log.debug("Closing executor");
- executor.shutdown();
- workflowExecutor.shutdown();
- if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
- workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything
- }
- if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
- executor.shutdownNow(); // if we timed out, waiting, cancel everything
- }
+ shutdown(executor);
+ shutdown(workflowExecutor);
+ shutdown(ioBoundExecutor);
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
}
}
+ private void shutdown(ExecutorService executorService) throws InterruptedException {
+ executorService.shutdown();
+ if (!executorService.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
+ executorService.shutdownNow(); // if we timed out, waiting, cancel everything
+ }
+ }
+
private static class InstrumentedExecutorService implements ExecutorService {
private final boolean debug;
private final ExecutorService executor;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index 68c165aedf..e020bc71f6 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -72,12 +72,12 @@ public synchronized void start() {
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
.parallel()
- .forEach(this::startEventSource));
+ .forEach(this::startEventSource), "LowLevelEventSourceStart");
ExecutorServiceManager
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
- .parallel().forEach(this::startEventSource));
+ .parallel().forEach(this::startEventSource), "DefaultEventSourceStart");
}
@Override
@@ -85,7 +85,7 @@ public synchronized void stop() {
stopEventSource(eventSources.namedControllerResourceEventSource());
ExecutorServiceManager
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel()
- .forEach(this::stopEventSource));
+ .forEach(this::stopEventSource), "EventSourceStop");
eventSources.clear();
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index 34a34d37e0..cf8990b13d 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -48,7 +48,7 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start));
+ () -> sources.values().parallelStream().forEach(LifecycleAware::start), "InformerStart");
}
void initSources(MixedOperation, Resource> client,
@@ -101,7 +101,7 @@ public void changeNamespaces(Set namespaces) {
log.debug("Registered new {} -> {} for namespace: {}", this, source,
ns);
}
- }));
+ }), "InformerStart");
}
From d03a9c275393787b8c1703b604f518555c5f965f Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Wed, 23 Nov 2022 14:33:17 +0100
Subject: [PATCH 07/15] fix: some operations just need to spawn, not be waited
on to finish
---
.../operator/ControllerManager.java | 18 ++++++++++------
.../api/config/ExecutorServiceManager.java | 13 ++++++++++++
.../processing/event/EventSourceManager.java | 21 +++++++++++--------
.../source/informer/InformerManager.java | 10 +++++----
4 files changed, 43 insertions(+), 19 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
index 44aa430ec2..b766d3cfda 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
@@ -8,6 +8,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.processing.Controller;
/**
@@ -32,20 +33,25 @@ public synchronized void shouldStart() {
}
public synchronized void start(boolean startEventProcessor) {
- controllers().parallelStream().forEach(c -> c.start(startEventProcessor));
+ ExecutorServiceManager.executeIOBoundTask(
+ () -> controllers().parallelStream().forEach(c -> c.start(startEventProcessor)),
+ "ControllerStart");
started = true;
}
public synchronized void stop() {
- controllers().parallelStream().forEach(closeable -> {
- log.debug("closing {}", closeable);
- closeable.stop();
- });
+ ExecutorServiceManager.executeAndWaitForCompletion(
+ () -> controllers().parallelStream().forEach(closeable -> {
+ log.debug("closing {}", closeable);
+ closeable.stop();
+ }), "ControllerStop");
started = false;
}
public synchronized void startEventProcessing() {
- controllers().parallelStream().forEach(Controller::startEventProcessing);
+ ExecutorServiceManager.executeIOBoundTask(
+ () -> controllers().parallelStream().forEach(Controller::startEventProcessing),
+ "ControllerEventProcessing");
}
@SuppressWarnings({"unchecked", "rawtypes"})
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 621fd0a450..711d37a388 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -107,6 +107,19 @@ public static void executeAndWaitForCompletion(Runnable task, ExecutorService ex
}
}
+ public static void executeIOBoundTask(Runnable task, String threadNamePrefix) {
+ // change thread name for easier debugging
+ final var thread = Thread.currentThread();
+ final var name = thread.getName();
+ try {
+ thread.setName(threadNamePrefix + "-" + thread.getId());
+ instance().ioBoundExecutor.execute(task);
+ } finally {
+ // restore original name
+ thread.setName(name);
+ }
+ }
+
private void doStop() {
try {
log.debug("Closing executor");
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index e020bc71f6..14bf7142f6 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -68,24 +68,27 @@ public synchronized void start() {
startEventSource(eventSources.namedControllerResourceEventSource());
// starting event sources on the workflow executor which shouldn't be used at this point
- ExecutorServiceManager
- .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
+ ExecutorServiceManager.executeAndWaitForCompletion(
+ () -> eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
.parallel()
- .forEach(this::startEventSource), "LowLevelEventSourceStart");
+ .forEach(this::startEventSource),
+ "LowLevelEventSourceStart");
- ExecutorServiceManager
- .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
+ ExecutorServiceManager.executeAndWaitForCompletion(
+ () -> eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
- .parallel().forEach(this::startEventSource), "DefaultEventSourceStart");
+ .parallel().forEach(this::startEventSource),
+ "DefaultEventSourceStart");
}
@Override
public synchronized void stop() {
stopEventSource(eventSources.namedControllerResourceEventSource());
- ExecutorServiceManager
- .executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel()
- .forEach(this::stopEventSource), "EventSourceStop");
+ ExecutorServiceManager.executeIOBoundTask(
+ () -> eventSources.additionalNamedEventSources().parallel()
+ .forEach(this::stopEventSource),
+ "EventSourceStop");
eventSources.clear();
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index cf8990b13d..d2a7bf106c 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -47,8 +47,9 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start), "InformerStart");
+ ExecutorServiceManager.executeIOBoundTask(
+ () -> sources.values().parallelStream().forEach(LifecycleAware::start),
+ "InformerStart");
}
void initSources(MixedOperation, Resource> client,
@@ -117,7 +118,7 @@ private InformerWrapper createEventSource(
@Override
public void stop() {
- ExecutorServiceManager.instance().workflowExecutorService().execute(
+ ExecutorServiceManager.executeAndWaitForCompletion(
() -> {
log.info("Stopping {}", this);
sources.forEach((ns, source) -> {
@@ -128,7 +129,8 @@ public void stop() {
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
}
});
- });
+ },
+ "StopInformer");
}
@Override
From a50e2a6afd7f6870053cf2e5d22e35b0e3bb24d4 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Wed, 23 Nov 2022 16:46:08 +0100
Subject: [PATCH 08/15] fix: if a stop handler is set, use it by default
---
.../operator/api/config/ConfigurationService.java | 5 ++++-
.../operator/api/config/ConfigurationServiceOverrider.java | 4 +++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
index 1fe9485588..07422dc7e9 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
@@ -164,9 +164,12 @@ default Optional getLeaderElectionConfiguration() {
* if false, the startup will ignore recoverable errors, caused for example by RBAC issues, and
* will try to reconnect periodically in the background.
*
+ *
+ * NOTE: this setting is ignored if an informer stop handler is set
+ *
*/
default boolean stopOnInformerErrorDuringStartup() {
- return true;
+ return getInformerStoppedHandler().isEmpty();
}
/**
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
index 2b8aee9708..8d35527278 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
@@ -188,7 +188,9 @@ public Optional getInformerStoppedHandler() {
@Override
public boolean stopOnInformerErrorDuringStartup() {
- return stopOnInformerErrorDuringStartup != null ? stopOnInformerErrorDuringStartup
+ // only stop on informer error if we didn't set a stop handler for the informers
+ return stopOnInformerErrorDuringStartup != null && informerStoppedHandler == null
+ ? stopOnInformerErrorDuringStartup
: super.stopOnInformerErrorDuringStartup();
}
From 407e9a574fa8008e09feaf29396f20e453e0dfea Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Wed, 23 Nov 2022 17:40:12 +0100
Subject: [PATCH 09/15] fix: use a new executor service for each task to run
---
.../operator/ControllerManager.java | 4 +-
.../api/config/ExecutorServiceManager.java | 43 ++++---------------
.../processing/event/EventSourceManager.java | 2 +-
.../source/informer/InformerManager.java | 17 +++++++-
.../source/informer/InformerWrapper.java | 6 ++-
.../event/EventSourceManagerTest.java | 5 ++-
.../informer/InformerEventSourceTest.java | 3 +-
7 files changed, 36 insertions(+), 44 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
index b766d3cfda..095b059362 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
@@ -33,7 +33,7 @@ public synchronized void shouldStart() {
}
public synchronized void start(boolean startEventProcessor) {
- ExecutorServiceManager.executeIOBoundTask(
+ ExecutorServiceManager.executeAndWaitForCompletion(
() -> controllers().parallelStream().forEach(c -> c.start(startEventProcessor)),
"ControllerStart");
started = true;
@@ -49,7 +49,7 @@ public synchronized void stop() {
}
public synchronized void startEventProcessing() {
- ExecutorServiceManager.executeIOBoundTask(
+ ExecutorServiceManager.executeAndWaitForCompletion(
() -> controllers().parallelStream().forEach(Controller::startEventProcessing),
"ControllerEventProcessing");
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 711d37a388..5b96efec1f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -20,15 +20,12 @@ public class ExecutorServiceManager {
private static ExecutorServiceManager instance;
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
- private final ExecutorService ioBoundExecutor;
private final int terminationTimeoutSeconds;
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
int terminationTimeoutSeconds) {
this.executor = new InstrumentedExecutorService(executor);
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
- this.ioBoundExecutor = new InstrumentedExecutorService(
- Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
}
@@ -75,30 +72,22 @@ public ExecutorService workflowExecutorService() {
}
/**
- * Runs the specified I/O-bound task and waits for its completion using the ExecutorService
- * provided by {@link #workflowExecutorService()}
+ * Runs the specified I/O-bound task and waits for its completion using the new ExecutorService
*
* @param task task to run concurrently
+ * @param threadNamePrefix the prefix with which to prefix thread names when tasks are run this
+ * way
*/
public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) {
- executeAndWaitForCompletion(task, instance().ioBoundExecutor, threadNamePrefix);
- }
-
- /**
- * Executes the specified I/O-bound task using the specified ExecutorService and waits for its
- * completion for at most {@link #terminationTimeoutSeconds} seconds.
- *
- * @param task task to run concurrently
- * @param executor ExecutorService used to run the task
- */
- public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor,
- String threadNamePrefix) {
+ final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ ExecutorService instrumented = new InstrumentedExecutorService(executor);
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
thread.setName(threadNamePrefix + "-" + thread.getId());
- executor.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
+ instrumented.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
+ shutdown(instrumented);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new OperatorException("Couldn't execute task", e);
} finally {
@@ -107,33 +96,19 @@ public static void executeAndWaitForCompletion(Runnable task, ExecutorService ex
}
}
- public static void executeIOBoundTask(Runnable task, String threadNamePrefix) {
- // change thread name for easier debugging
- final var thread = Thread.currentThread();
- final var name = thread.getName();
- try {
- thread.setName(threadNamePrefix + "-" + thread.getId());
- instance().ioBoundExecutor.execute(task);
- } finally {
- // restore original name
- thread.setName(name);
- }
- }
-
private void doStop() {
try {
log.debug("Closing executor");
shutdown(executor);
shutdown(workflowExecutor);
- shutdown(ioBoundExecutor);
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
}
}
- private void shutdown(ExecutorService executorService) throws InterruptedException {
+ private static void shutdown(ExecutorService executorService) throws InterruptedException {
executorService.shutdown();
- if (!executorService.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
+ if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) {
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
}
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index 14bf7142f6..2d16c0f87e 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -85,7 +85,7 @@ public synchronized void start() {
@Override
public synchronized void stop() {
stopEventSource(eventSources.namedControllerResourceEventSource());
- ExecutorServiceManager.executeIOBoundTask(
+ ExecutorServiceManager.executeAndWaitForCompletion(
() -> eventSources.additionalNamedEventSources().parallel()
.forEach(this::stopEventSource),
"EventSourceStop");
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index d2a7bf106c..7f8f022378 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -47,8 +47,21 @@ public class InformerManager sources.values().parallelStream().forEach(LifecycleAware::start),
+ ExecutorServiceManager.executeAndWaitForCompletion(
+ () -> sources.values().parallelStream().forEach(source -> {
+ // change thread name for easier debugging
+ final var thread = Thread.currentThread();
+ final var name = thread.getName();
+ try {
+ thread.setName(source.informerInfo() + " " + thread.getId());
+ source.start();
+ } catch (Exception e) {
+ throw new OperatorException("Couldn't start informer: " + source, e);
+ } finally {
+ // restore original name
+ thread.setName(name);
+ }
+ }),
"InformerStart");
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
index 601cb0c10c..4535aa6bc5 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
@@ -143,6 +143,10 @@ public List byIndex(String indexName, String indexKey) {
@Override
public String toString() {
- return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')';
+ return informerInfo() + " (" + informer + ')';
+ }
+
+ String informerInfo() {
+ return "InformerWrapper [" + versionedFullResourceName() + "]";
}
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java
index ab345145da..51a4b597f7 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java
@@ -4,6 +4,7 @@
import org.junit.jupiter.api.Test;
+import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.MockKubernetesClient;
import io.javaoperatorsdk.operator.OperatorException;
@@ -171,9 +172,9 @@ void changesNamespacesOnControllerAndInformerEventSources() {
}
private EventSourceManager initManager() {
- final var configuration = MockControllerConfiguration.forResource(HasMetadata.class);
+ final var configuration = MockControllerConfiguration.forResource(ConfigMap.class);
final Controller controller = new Controller(mock(Reconciler.class), configuration,
- MockKubernetesClient.client(HasMetadata.class));
+ MockKubernetesClient.client(ConfigMap.class));
return new EventSourceManager(controller);
}
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
index 8b5c0202c0..a01350d18f 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
@@ -19,7 +19,6 @@
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
@@ -259,7 +258,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
// by default informer fails to start if there is an exception in the client on start.
// Throws the exception further.
- assertThrows(RuntimeException.class, () -> informerEventSource.start());
+ informerEventSource.start();
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
} finally {
ConfigurationServiceProvider.reset();
From de6079c53e0510d9b898143f3ba9c0c032a1cda5 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Wed, 23 Nov 2022 18:12:51 +0100
Subject: [PATCH 10/15] fix: use cache sync timeout instead of termination one
---
.../operator/api/config/ExecutorServiceManager.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 5b96efec1f..8cd21cb565 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -86,7 +86,9 @@ public static void executeAndWaitForCompletion(Runnable task, String threadNameP
final var name = thread.getName();
try {
thread.setName(threadNamePrefix + "-" + thread.getId());
- instrumented.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
+ instrumented.submit(task)
+ .get(ConfigurationServiceProvider.instance().cacheSyncTimeout().toSeconds(),
+ TimeUnit.SECONDS);
shutdown(instrumented);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new OperatorException("Couldn't execute task", e);
From 06a7e13981742aa474b75bfe7a1a04836a8389e8 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Wed, 23 Nov 2022 22:01:28 +0100
Subject: [PATCH 11/15] fix: controllers starting is not io-bound per se
---
.../operator/ControllerManager.java | 18 ++++++------------
.../operator/processing/Controller.java | 2 +-
2 files changed, 7 insertions(+), 13 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
index 095b059362..44aa430ec2 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java
@@ -8,7 +8,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.processing.Controller;
/**
@@ -33,25 +32,20 @@ public synchronized void shouldStart() {
}
public synchronized void start(boolean startEventProcessor) {
- ExecutorServiceManager.executeAndWaitForCompletion(
- () -> controllers().parallelStream().forEach(c -> c.start(startEventProcessor)),
- "ControllerStart");
+ controllers().parallelStream().forEach(c -> c.start(startEventProcessor));
started = true;
}
public synchronized void stop() {
- ExecutorServiceManager.executeAndWaitForCompletion(
- () -> controllers().parallelStream().forEach(closeable -> {
- log.debug("closing {}", closeable);
- closeable.stop();
- }), "ControllerStop");
+ controllers().parallelStream().forEach(closeable -> {
+ log.debug("closing {}", closeable);
+ closeable.stop();
+ });
started = false;
}
public synchronized void startEventProcessing() {
- ExecutorServiceManager.executeAndWaitForCompletion(
- () -> controllers().parallelStream().forEach(Controller::startEventProcessing),
- "ControllerEventProcessing");
+ controllers().parallelStream().forEach(Controller::startEventProcessing);
}
@SuppressWarnings({"unchecked", "rawtypes"})
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
index a34c4b38f1..ca27ad4f2c 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
@@ -360,8 +360,8 @@ public void changeNamespaces(Set namespaces) {
}
public synchronized void startEventProcessing() {
- log.info("Started event processing for controller: {}", configuration.getName());
eventProcessor.start();
+ log.info("Started event processing for controller: {}", configuration.getName());
}
private void throwMissingCRDException(String crdName, String specVersion, String controllerName) {
From 47a8891a2237f81bb9a43b99b9d09196c1a39a21 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Thu, 24 Nov 2022 09:51:14 +0100
Subject: [PATCH 12/15] revert: fix: if a stop handler is set, use it by
default
This reverts commit 7d007db5fb4636637e782a5b48ca3dc2a616022e.
---
.../operator/api/config/ConfigurationService.java | 5 +----
.../operator/api/config/ConfigurationServiceOverrider.java | 4 +---
.../event/source/informer/InformerEventSourceTest.java | 4 +++-
3 files changed, 5 insertions(+), 8 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
index 07422dc7e9..1fe9485588 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
@@ -164,12 +164,9 @@ default Optional getLeaderElectionConfiguration() {
* if false, the startup will ignore recoverable errors, caused for example by RBAC issues, and
* will try to reconnect periodically in the background.
*
- *
- * NOTE: this setting is ignored if an informer stop handler is set
- *
*/
default boolean stopOnInformerErrorDuringStartup() {
- return getInformerStoppedHandler().isEmpty();
+ return true;
}
/**
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
index 8d35527278..2b8aee9708 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java
@@ -188,9 +188,7 @@ public Optional getInformerStoppedHandler() {
@Override
public boolean stopOnInformerErrorDuringStartup() {
- // only stop on informer error if we didn't set a stop handler for the informers
- return stopOnInformerErrorDuringStartup != null && informerStoppedHandler == null
- ? stopOnInformerErrorDuringStartup
+ return stopOnInformerErrorDuringStartup != null ? stopOnInformerErrorDuringStartup
: super.stopOnInformerErrorDuringStartup();
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
index a01350d18f..a33658415e 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
@@ -10,6 +10,7 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.MockKubernetesClient;
+import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
@@ -19,6 +20,7 @@
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
@@ -258,7 +260,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
// by default informer fails to start if there is an exception in the client on start.
// Throws the exception further.
- informerEventSource.start();
+ assertThrows(OperatorException.class, () -> informerEventSource.start());
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
} finally {
ConfigurationServiceProvider.reset();
From 09bd45cad6feecff65ffdb5a8fb648933f367cd5 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Thu, 24 Nov 2022 10:19:33 +0100
Subject: [PATCH 13/15] fix: re-interrupt thread on InterruptedException
---
.../operator/api/config/ExecutorServiceManager.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 8cd21cb565..c8932ddeb5 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -90,7 +90,10 @@ public static void executeAndWaitForCompletion(Runnable task, String threadNameP
.get(ConfigurationServiceProvider.instance().cacheSyncTimeout().toSeconds(),
TimeUnit.SECONDS);
shutdown(instrumented);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ } catch (InterruptedException e) {
+ thread.interrupt();
+ throw new OperatorException("Couldn't execute task", e);
+ } catch (ExecutionException | TimeoutException e) {
throw new OperatorException("Couldn't execute task", e);
} finally {
// restore original name
@@ -105,6 +108,7 @@ private void doStop() {
shutdown(workflowExecutor);
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
+ Thread.currentThread().interrupt();
}
}
From 824c192491e71c2de0d6734a22bcfe1bb18c6cd6 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Thu, 24 Nov 2022 12:39:08 +0100
Subject: [PATCH 14/15] fix: informers are already started asynchronously
---
.../source/informer/InformerManager.java | 65 +++++++------------
.../source/informer/InformerWrapper.java | 9 ++-
2 files changed, 30 insertions(+), 44 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index 7f8f022378..801e3e3778 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -24,7 +24,6 @@
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
-import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -47,22 +46,7 @@ public class InformerManager sources.values().parallelStream().forEach(source -> {
- // change thread name for easier debugging
- final var thread = Thread.currentThread();
- final var name = thread.getName();
- try {
- thread.setName(source.informerInfo() + " " + thread.getId());
- source.start();
- } catch (Exception e) {
- throw new OperatorException("Couldn't start informer: " + source, e);
- } finally {
- // restore original name
- thread.setName(name);
- }
- }),
- "InformerStart");
+ sources.values().parallelStream().forEach(InformerWrapper::start);
}
void initSources(MixedOperation, Resource> client,
@@ -103,19 +87,18 @@ public void changeNamespaces(Set namespaces) {
log.debug("Stopped informer {} for namespaces: {}", this, sourcesToRemove);
sourcesToRemove.forEach(k -> sources.remove(k).stop());
- ExecutorServiceManager.executeAndWaitForCompletion(
- () -> namespaces.forEach(ns -> {
- if (!sources.containsKey(ns)) {
- final var source =
- createEventSource(
- client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()),
- eventHandler, ns);
- source.addIndexers(this.indexers);
- source.start();
- log.debug("Registered new {} -> {} for namespace: {}", this, source,
- ns);
- }
- }), "InformerStart");
+ namespaces.forEach(ns -> {
+ if (!sources.containsKey(ns)) {
+ final var source =
+ createEventSource(
+ client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()),
+ eventHandler, ns);
+ source.addIndexers(this.indexers);
+ source.start();
+ log.debug("Registered new {} -> {} for namespace: {}", this, source,
+ ns);
+ }
+ });
}
@@ -131,19 +114,15 @@ private InformerWrapper createEventSource(
@Override
public void stop() {
- ExecutorServiceManager.executeAndWaitForCompletion(
- () -> {
- log.info("Stopping {}", this);
- sources.forEach((ns, source) -> {
- try {
- log.debug("Stopping informer for namespace: {} -> {}", ns, source);
- source.stop();
- } catch (Exception e) {
- log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
- }
- });
- },
- "StopInformer");
+ log.info("Stopping {}", this);
+ sources.forEach((ns, source) -> {
+ try {
+ log.debug("Stopping informer for namespace: {} -> {}", ns, source);
+ source.stop();
+ } catch (Exception e) {
+ log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
+ }
+ });
}
@Override
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
index 4535aa6bc5..de81815a96 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
@@ -64,7 +64,11 @@ public void start() throws OperatorException {
if (!configService.stopOnInformerErrorDuringStartup()) {
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
}
+ // change thread name for easier debugging
+ final var thread = Thread.currentThread();
+ final var name = thread.getName();
try {
+ thread.setName(informerInfo() + " " + thread.getId());
var start = informer.start();
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
// false, and there is a rbac issue the get never returns; therefore operator never really
@@ -81,6 +85,9 @@ public void start() throws OperatorException {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
+ } finally {
+ // restore original name
+ thread.setName(name);
}
} catch (Exception e) {
@@ -146,7 +153,7 @@ public String toString() {
return informerInfo() + " (" + informer + ')';
}
- String informerInfo() {
+ private String informerInfo() {
return "InformerWrapper [" + versionedFullResourceName() + "]";
}
}
From 83c828b11f68289a414ff8a065072fa20a046fa5 Mon Sep 17 00:00:00 2001
From: Chris Laprun
Date: Thu, 24 Nov 2022 17:58:53 +0100
Subject: [PATCH 15/15] fix: make sure we run each task on a different thread
---
.../api/config/ExecutorServiceManager.java | 60 +++++++++----------
.../processing/event/EventSourceManager.java | 45 ++++++++------
.../processing/event/NamedEventSource.java | 7 +++
.../source/informer/InformerManager.java | 1 -
4 files changed, 62 insertions(+), 51 deletions(-)
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index c8932ddeb5..c4193b9a2a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -9,12 +9,13 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.javaoperatorsdk.operator.OperatorException;
-
public class ExecutorServiceManager {
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
private static ExecutorServiceManager instance;
@@ -63,6 +64,31 @@ public synchronized static ExecutorServiceManager instance() {
return instance;
}
+ public static void executeAndWaitForAllToComplete(Stream stream,
+ Function task, Function threadNamer) {
+ final var instrumented = new InstrumentedExecutorService(
+ Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
+
+ try {
+ instrumented.invokeAll(stream.parallel().map(item -> (Callable) () -> {
+ // change thread name for easier debugging
+ final var thread = Thread.currentThread();
+ final var name = thread.getName();
+ thread.setName(threadNamer.apply(item));
+ try {
+ task.apply(item);
+ return null;
+ } finally {
+ // restore original name
+ thread.setName(name);
+ }
+ }).collect(Collectors.toList()));
+ shutdown(instrumented);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
public ExecutorService executorService() {
return executor;
}
@@ -71,36 +97,6 @@ public ExecutorService workflowExecutorService() {
return workflowExecutor;
}
- /**
- * Runs the specified I/O-bound task and waits for its completion using the new ExecutorService
- *
- * @param task task to run concurrently
- * @param threadNamePrefix the prefix with which to prefix thread names when tasks are run this
- * way
- */
- public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) {
- final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- ExecutorService instrumented = new InstrumentedExecutorService(executor);
- // change thread name for easier debugging
- final var thread = Thread.currentThread();
- final var name = thread.getName();
- try {
- thread.setName(threadNamePrefix + "-" + thread.getId());
- instrumented.submit(task)
- .get(ConfigurationServiceProvider.instance().cacheSyncTimeout().toSeconds(),
- TimeUnit.SECONDS);
- shutdown(instrumented);
- } catch (InterruptedException e) {
- thread.interrupt();
- throw new OperatorException("Couldn't execute task", e);
- } catch (ExecutionException | TimeoutException e) {
- throw new OperatorException("Couldn't execute task", e);
- } finally {
- // restore original name
- thread.setName(name);
- }
- }
-
private void doStop() {
try {
log.debug("Closing executor");
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index 2d16c0f87e..8e9c981031 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -4,6 +4,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -67,28 +68,34 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
public synchronized void start() {
startEventSource(eventSources.namedControllerResourceEventSource());
- // starting event sources on the workflow executor which shouldn't be used at this point
- ExecutorServiceManager.executeAndWaitForCompletion(
- () -> eventSources.additionalNamedEventSources()
- .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
- .parallel()
- .forEach(this::startEventSource),
- "LowLevelEventSourceStart");
+ ExecutorServiceManager.executeAndWaitForAllToComplete(
+ eventSources.additionalNamedEventSources()
+ .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
+ this::startEventSource,
+ getThreadNamer("start"));
- ExecutorServiceManager.executeAndWaitForCompletion(
- () -> eventSources.additionalNamedEventSources()
- .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
- .parallel().forEach(this::startEventSource),
- "DefaultEventSourceStart");
+ ExecutorServiceManager.executeAndWaitForAllToComplete(
+ eventSources.additionalNamedEventSources()
+ .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
+ this::startEventSource,
+ getThreadNamer("start"));
+ }
+
+ private static Function getThreadNamer(String stage) {
+ return es -> {
+ final var name = es.name();
+ return es.priority() + " " + stage + " -> "
+ + (es.isNameSet() ? name + " " + es.original().getClass().getSimpleName() : name);
+ };
}
@Override
public synchronized void stop() {
stopEventSource(eventSources.namedControllerResourceEventSource());
- ExecutorServiceManager.executeAndWaitForCompletion(
- () -> eventSources.additionalNamedEventSources().parallel()
- .forEach(this::stopEventSource),
- "EventSourceStop");
+ ExecutorServiceManager.executeAndWaitForAllToComplete(
+ eventSources.additionalNamedEventSources(),
+ this::stopEventSource,
+ getThreadNamer("stop"));
eventSources.clear();
}
@@ -105,7 +112,7 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) {
}
}
- private void startEventSource(NamedEventSource eventSource) {
+ private Void startEventSource(NamedEventSource eventSource) {
try {
logEventSourceEvent(eventSource, "Starting");
eventSource.start();
@@ -115,9 +122,10 @@ private void startEventSource(NamedEventSource eventSource) {
} catch (Exception e) {
throw new OperatorException("Couldn't start source " + eventSource.name(), e);
}
+ return null;
}
- private void stopEventSource(NamedEventSource eventSource) {
+ private Void stopEventSource(NamedEventSource eventSource) {
try {
logEventSourceEvent(eventSource, "Stopping");
eventSource.stop();
@@ -125,6 +133,7 @@ private void stopEventSource(NamedEventSource eventSource) {
} catch (Exception e) {
log.warn("Error closing {} -> {}", eventSource.name(), e);
}
+ return null;
}
public final void registerEventSource(EventSource eventSource) throws OperatorException {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java
index 13d5a10323..4787e675c9 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java
@@ -4,6 +4,7 @@
import java.util.Optional;
import io.javaoperatorsdk.operator.OperatorException;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.processing.event.source.Configurable;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
@@ -13,10 +14,12 @@ class NamedEventSource implements EventSource, EventSourceMetadata {
private final EventSource original;
private final String name;
+ private final boolean nameSet;
NamedEventSource(EventSource original, String name) {
this.original = original;
this.name = name;
+ nameSet = !name.equals(EventSourceInitializer.generateNameFor(original));
}
@Override
@@ -95,4 +98,8 @@ public int hashCode() {
public EventSourceStartPriority priority() {
return original.priority();
}
+
+ public boolean isNameSet() {
+ return nameSet;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index 801e3e3778..ba7aa9f67f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -114,7 +114,6 @@ private InformerWrapper createEventSource(
@Override
public void stop() {
- log.info("Stopping {}", this);
sources.forEach((ns, source) -> {
try {
log.debug("Stopping informer for namespace: {} -> {}", ns, source);