Skip to content

Cloner interface for Custom Resource instead of ObjectMapper #611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.javaoperatorsdk.operator.api.config;

import io.fabric8.kubernetes.client.CustomResource;

public interface Cloner {

CustomResource<?, ?> clone(CustomResource<?, ?> object);

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,24 @@
import io.javaoperatorsdk.operator.Metrics;
import io.javaoperatorsdk.operator.api.ResourceController;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/** An interface from which to retrieve configuration information. */
public interface ConfigurationService {

ObjectMapper OBJECT_MAPPER = new ObjectMapper();
Cloner DEFAULT_CLONER = new Cloner() {
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Override
public CustomResource<?, ?> clone(CustomResource<?, ?> object) {
try {
return OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsString(object), object.getClass());
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
}
};

/**
* Retrieves the configuration associated with the specified controller
Expand Down Expand Up @@ -79,14 +91,12 @@ default int concurrentReconciliationThreads() {
}

/**
* The {@link ObjectMapper} that the operator should use to de-/serialize resources. This is
* particularly useful when frameworks can configure a specific mapper that should also be used by
* the SDK.
*
* Used to clone custom resources.
*
* @return the ObjectMapper to use
*/
default ObjectMapper getObjectMapper() {
return OBJECT_MAPPER;
default Cloner getResourceCloner() {
return DEFAULT_CLONER;
}

int DEFAULT_TERMINATION_TIMEOUT_SECONDS = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
import io.javaoperatorsdk.operator.Metrics;
import io.javaoperatorsdk.operator.api.ResourceController;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ConfigurationServiceOverrider {
private final ConfigurationService original;
private Metrics metrics;
private Config clientConfig;
private boolean checkCR;
private int threadNumber;
private ObjectMapper mapper;
private Cloner cloner;
private int timeoutSeconds;

public ConfigurationServiceOverrider(
Expand All @@ -24,7 +22,7 @@ public ConfigurationServiceOverrider(
this.clientConfig = original.getClientConfiguration();
this.checkCR = original.checkCRDAndValidateLocalModel();
this.threadNumber = original.concurrentReconciliationThreads();
this.mapper = original.getObjectMapper();
this.cloner = original.getResourceCloner();
this.timeoutSeconds = original.getTerminationTimeoutSeconds();
this.metrics = original.getMetrics();
}
Expand All @@ -45,8 +43,8 @@ public ConfigurationServiceOverrider withConcurrentReconciliationThreads(int thr
return this;
}

public ConfigurationServiceOverrider withObjectMapper(ObjectMapper mapper) {
this.mapper = mapper;
public ConfigurationServiceOverrider withResourceCloner(Cloner cloner) {
this.cloner = cloner;
return this;
}

Expand Down Expand Up @@ -94,8 +92,8 @@ public int concurrentReconciliationThreads() {
}

@Override
public ObjectMapper getObjectMapper() {
return mapper;
public Cloner getResourceCloner() {
return cloner;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

/**
* Event handler that makes sure that events are processed in a "single threaded" way per resource
Expand Down Expand Up @@ -188,33 +189,34 @@ void eventProcessingFinished(
if (!running) {
return;
}

CustomResourceID customResourceID = executionScope.getCustomResourceID();
log.debug(
"Event processing finished. Scope: {}, PostExecutionControl: {}",
executionScope,
postExecutionControl);
unsetUnderExecution(executionScope.getCustomResourceID());
unsetUnderExecution(customResourceID);

// If a delete event present at this phase, it was received during reconciliation.
// So we either removed the finalizer during reconciliation or we don't use finalizers.
// Either way we don't want to retry.
if (retry != null && postExecutionControl.exceptionDuringExecution() &&
!eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) {
if (isRetryConfigured() && postExecutionControl.exceptionDuringExecution() &&
!eventMarker.deleteEventPresent(customResourceID)) {
handleRetryOnException(executionScope);
// todo revisit monitoring since events are not present anymore
// final var monitor = monitor(); executionScope.getEvents().forEach(e ->
// monitor.failedEvent(executionScope.getCustomResourceID(), e));
return;
}

if (retry != null) {
handleSuccessfulExecutionRegardingRetry(executionScope);
}
if (eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) {
cleanupOnSuccessfulExecution(executionScope);
if (eventMarker.deleteEventPresent(customResourceID)) {
cleanupForDeletedEvent(executionScope.getCustomResourceID());
} else {
if (eventMarker.eventPresent(executionScope.getCustomResourceID())) {
submitReconciliationExecution(executionScope.getCustomResourceID());
if (eventMarker.eventPresent(customResourceID)) {
if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) {
submitReconciliationExecution(customResourceID);
} else {
postponeReconciliationAndHandleCacheSyncEvent(customResourceID);
}
} else {
reScheduleExecutionIfInstructed(postExecutionControl,
executionScope.getCustomResource());
Expand All @@ -225,10 +227,38 @@ void eventProcessingFinished(
}
}

private void postponeReconciliationAndHandleCacheSyncEvent(CustomResourceID customResourceID) {
eventSourceManager.getCustomResourceEventSource().whitelistNextEvent(customResourceID);
}

private boolean isCacheReadyForInstantReconciliation(ExecutionScope<R> executionScope,
PostExecutionControl<R> postExecutionControl) {
if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
return true;
}
String originalResourceVersion = getVersion(executionScope.getCustomResource());
String customResourceVersionAfterExecution = getVersion(postExecutionControl
.getUpdatedCustomResource().get());
String cachedCustomResourceVersion = getVersion(resourceCache
.getCustomResource(executionScope.getCustomResourceID()).get());

if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) {
return true;
}
if (cachedCustomResourceVersion.equals(originalResourceVersion)) {
return false;
}
// If the cached resource version equals neither the version before of after execution
// probably an update happened on the custom resource independent of the framework during
// reconciliation. We cannot tell at this point if it happened before our update or before.
// (Well we could if we would parse resource version, but that should not be done by definition)
return true;
}

private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecutionControl,
R customResource) {
postExecutionControl.getReScheduleDelay().ifPresent(delay -> eventSourceManager
.getRetryTimerEventSource()
.getRetryAndRescheduleTimerEventSource()
.scheduleOnce(customResource, delay));
}

Expand Down Expand Up @@ -258,19 +288,21 @@ private void handleRetryOnException(ExecutionScope<R> executionScope) {
delay,
customResourceID);
eventSourceManager
.getRetryTimerEventSource()
.getRetryAndRescheduleTimerEventSource()
.scheduleOnce(executionScope.getCustomResource(), delay);
},
() -> log.error("Exhausted retries for {}", executionScope));
}

private void handleSuccessfulExecutionRegardingRetry(ExecutionScope<R> executionScope) {
private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
log.debug(
"Marking successful execution for resource: {}",
"Cleanup for successful execution for resource: {}",
getName(executionScope.getCustomResource()));
retryState.remove(executionScope.getCustomResourceID());
if (isRetryConfigured()) {
retryState.remove(executionScope.getCustomResourceID());
}
eventSourceManager
.getRetryTimerEventSource()
.getRetryAndRescheduleTimerEventSource()
.cancelOnceSchedule(executionScope.getCustomResourceID());
}

Expand Down Expand Up @@ -300,6 +332,10 @@ private void unsetUnderExecution(CustomResourceID customResourceUid) {
underProcessing.remove(customResourceUid);
}

private boolean isRetryConfigured() {
return retry != null;
}

@Override
public void close() {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(name, namespace);
}

@Override
public String toString() {
return "CustomResourceID{" +
"name='" + name + '\'' +
", namespace='" + namespace + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class DefaultEventSourceManager<R extends CustomResource<?, ?>>
private final ReentrantLock lock = new ReentrantLock();
private final Set<EventSource> eventSources = Collections.synchronizedSet(new HashSet<>());
private DefaultEventHandler<R> defaultEventHandler;
private TimerEventSource<R> retryTimerEventSource;
private TimerEventSource<R> retryAndRescheduleTimerEventSource;
private CustomResourceEventSource customResourceEventSource;

DefaultEventSourceManager(DefaultEventHandler<R> defaultEventHandler) {
Expand All @@ -39,8 +39,8 @@ private void init(DefaultEventHandler<R> defaultEventHandler) {
this.defaultEventHandler = defaultEventHandler;
defaultEventHandler.setEventSourceManager(this);

this.retryTimerEventSource = new TimerEventSource<>();
registerEventSource(retryTimerEventSource);
this.retryAndRescheduleTimerEventSource = new TimerEventSource<>();
registerEventSource(retryAndRescheduleTimerEventSource);
}

@Override
Expand Down Expand Up @@ -98,8 +98,8 @@ public void cleanupForCustomResource(CustomResourceID customResourceUid) {
}
}

public TimerEventSource getRetryTimerEventSource() {
return retryTimerEventSource;
public TimerEventSource getRetryAndRescheduleTimerEventSource() {
return retryAndRescheduleTimerEventSource;
}

@Override
Expand All @@ -108,7 +108,7 @@ public Set<EventSource> getRegisteredEventSources() {
}

@Override
public CustomResourceEventSource getCustomResourceEventSource() {
public CustomResourceEventSource<R> getCustomResourceEventSource() {
return customResourceEventSource;
}

Expand Down
Loading