Skip to content

feat: reconcile-all-event mode #2894

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: next
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions notes.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- check that Cleaner interface is not present
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,14 @@ private <P extends HasMetadata> ResolvedControllerConfiguration<P> controllerCon
final var dependentFieldManager =
fieldManager.equals(CONTROLLER_NAME_AS_FIELD_MANAGER) ? name : fieldManager;

var controllerMode = annotation == null ? ControllerMode.DEFAULT : annotation.controllerMode();

InformerConfiguration<P> informerConfig =
InformerConfiguration.builder(resourceClass)
.initFromAnnotation(annotation != null ? annotation.informer() : null, context)
.buildForController();

return new ResolvedControllerConfiguration<P>(
return new ResolvedControllerConfiguration<>(
name,
generationAware,
associatedReconcilerClass,
Expand All @@ -315,7 +317,8 @@ private <P extends HasMetadata> ResolvedControllerConfiguration<P> controllerCon
null,
dependentFieldManager,
this,
informerConfig);
informerConfig,
controllerMode);
}

protected boolean createIfNeeded() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,12 @@ default String fieldManager() {
}

<C> C getConfigurationFor(DependentResourceSpec<?, P, C> spec);

default ControllerMode getControllerMode() {
return ControllerMode.DEFAULT;
}

default boolean isAllEventReconcileMode() {
return getControllerMode() == ControllerMode.ALL_EVENT_RECONCILE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class ControllerConfigurationOverrider<R extends HasMetadata> {
private Duration reconciliationMaxInterval;
private Map<DependentResourceSpec, Object> configurations;
private final InformerConfiguration<R>.Builder config;
private ControllerMode controllerMode;

private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
this.finalizer = original.getFinalizerName();
Expand All @@ -42,6 +43,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
this.rateLimiter = original.getRateLimiter();
this.name = original.getName();
this.fieldManager = original.fieldManager();
this.controllerMode = original.getControllerMode();
}

public ControllerConfigurationOverrider<R> withFinalizer(String finalizer) {
Expand Down Expand Up @@ -154,6 +156,11 @@ public ControllerConfigurationOverrider<R> withFieldManager(String dependentFiel
return this;
}

public ControllerConfigurationOverrider<R> withControllerMode(ControllerMode controllerMode) {
this.controllerMode = controllerMode;
return this;
}

/**
* Sets a max page size limit when starting the informer. This will result in pagination while
* populating the cache. This means that longer lists will take multiple requests to fetch. See
Expand Down Expand Up @@ -198,6 +205,7 @@ public ControllerConfiguration<R> build() {
fieldManager,
original.getConfigurationService(),
config.buildForController(),
controllerMode,
original.getWorkflowSpec().orElse(null));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

public enum ControllerMode {
DEFAULT,
ALL_EVENT_RECONCILE
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class ResolvedControllerConfiguration<P extends HasMetadata>
private final ConfigurationService configurationService;
private final String fieldManager;
private WorkflowSpec workflowSpec;
private ControllerMode controllerMode;

public ResolvedControllerConfiguration(ControllerConfiguration<P> other) {
this(
Expand All @@ -44,6 +45,7 @@ public ResolvedControllerConfiguration(ControllerConfiguration<P> other) {
other.fieldManager(),
other.getConfigurationService(),
other.getInformerConfig(),
other.getControllerMode(),
other.getWorkflowSpec().orElse(null));
}

Expand All @@ -59,6 +61,7 @@ public ResolvedControllerConfiguration(
String fieldManager,
ConfigurationService configurationService,
InformerConfiguration<P> informerConfig,
ControllerMode controllerMode,
WorkflowSpec workflowSpec) {
this(
name,
Expand All @@ -71,7 +74,8 @@ public ResolvedControllerConfiguration(
configurations,
fieldManager,
configurationService,
informerConfig);
informerConfig,
controllerMode);
setWorkflowSpec(workflowSpec);
}

Expand All @@ -86,7 +90,8 @@ protected ResolvedControllerConfiguration(
Map<DependentResourceSpec, Object> configurations,
String fieldManager,
ConfigurationService configurationService,
InformerConfiguration<P> informerConfig) {
InformerConfiguration<P> informerConfig,
ControllerMode controllerMode) {
this.informerConfig = informerConfig;
this.configurationService = configurationService;
this.name = ControllerConfiguration.ensureValidName(name, associatedReconcilerClassName);
Expand All @@ -99,6 +104,7 @@ protected ResolvedControllerConfiguration(
this.finalizer =
ControllerConfiguration.ensureValidFinalizerName(finalizer, getResourceTypeName());
this.fieldManager = fieldManager;
this.controllerMode = controllerMode;
}

protected ResolvedControllerConfiguration(
Expand All @@ -117,7 +123,8 @@ protected ResolvedControllerConfiguration(
null,
null,
configurationService,
InformerConfiguration.builder(resourceClass).buildForController());
InformerConfiguration.builder(resourceClass).buildForController(),
null);
}

@Override
Expand Down Expand Up @@ -207,4 +214,9 @@ public <C> C getConfigurationFor(DependentResourceSpec<?, P, C> spec) {
public String fieldManager() {
return fieldManager;
}

@Override
public ControllerMode getControllerMode() {
return controllerMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,8 @@ default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
* @return {@code true} is another reconciliation is already scheduled, {@code false} otherwise
*/
boolean isNextReconciliationImminent();

boolean isDeleteEventPresent();

boolean isDeleteFinalStateUnknown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import io.javaoperatorsdk.operator.api.config.ControllerMode;
import io.javaoperatorsdk.operator.api.config.informer.Informer;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
Expand Down Expand Up @@ -77,4 +78,6 @@ MaxReconciliationInterval maxReconciliationInterval() default
* @return the name used as field manager for SSA operations
*/
String fieldManager() default CONTROLLER_NAME_AS_FIELD_MANAGER;

ControllerMode controllerMode() default ControllerMode.DEFAULT;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,21 @@ public class DefaultContext<P extends HasMetadata> implements Context<P> {
private final ControllerConfiguration<P> controllerConfiguration;
private final DefaultManagedWorkflowAndDependentResourceContext<P>
defaultManagedDependentResourceContext;

public DefaultContext(RetryInfo retryInfo, Controller<P> controller, P primaryResource) {
private final boolean isDeleteEventPresent;
private final boolean isDeleteFinalStateUnknown;

public DefaultContext(
RetryInfo retryInfo,
Controller<P> controller,
P primaryResource,
boolean isDeleteEventPresent,
boolean isDeleteFinalStateUnknown) {
this.retryInfo = retryInfo;
this.controller = controller;
this.primaryResource = primaryResource;
this.controllerConfiguration = controller.getConfiguration();
this.isDeleteEventPresent = isDeleteEventPresent;
this.isDeleteFinalStateUnknown = isDeleteFinalStateUnknown;
this.defaultManagedDependentResourceContext =
new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this);
}
Expand Down Expand Up @@ -119,6 +128,16 @@ public boolean isNextReconciliationImminent() {
.isNextReconciliationImminent(ResourceID.fromResource(primaryResource));
}

@Override
public boolean isDeleteEventPresent() {
return isDeleteEventPresent;
}

@Override
public boolean isDeleteFinalStateUnknown() {
return isDeleteFinalStateUnknown;
}

public DefaultContext<P> setRetryInfo(RetryInfo retryInfo) {
this.retryInfo = retryInfo;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
import io.javaoperatorsdk.operator.processing.event.source.Cache;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
import io.javaoperatorsdk.operator.processing.retry.Retry;
Expand Down Expand Up @@ -122,7 +123,7 @@ public synchronized void handleEvent(Event event) {
}

private void handleMarkedEventForResource(ResourceState state) {
if (state.deleteEventPresent()) {
if (state.deleteEventPresent() && !controllerConfiguration.isAllEventReconcileMode()) {
cleanupForDeletedEvent(state.getId());
} else if (!state.processedMarkForDeletionPresent()) {
submitReconciliationExecution(state);
Expand Down Expand Up @@ -179,7 +180,9 @@ private void handleEventMarking(Event event, ResourceState state) {
if (event instanceof ResourceEvent resourceEvent) {
if (resourceEvent.getAction() == ResourceAction.DELETED) {
log.debug("Marking delete event received for: {}", relatedCustomResourceID);
state.markDeleteEventReceived();
state.markDeleteEventReceived(
resourceEvent.getResource().orElseThrow(),
((ResourceDeleteEvent) resourceEvent).isDeletedFinalStateUnknown());
} else {
if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) {
log.debug(
Expand Down Expand Up @@ -244,14 +247,15 @@ synchronized void eventProcessingFinished(
// Either way we don't want to retry.
if (isRetryConfigured()
&& postExecutionControl.exceptionDuringExecution()
&& !state.deleteEventPresent()) {
&& (!state.deleteEventPresent() || controllerConfiguration.isAllEventReconcileMode())) {
handleRetryOnException(
executionScope, postExecutionControl.getRuntimeException().orElseThrow());
return;
}
cleanupOnSuccessfulExecution(executionScope);
metrics.finishedReconciliation(executionScope.getResource(), metricsMetadata);
if (state.deleteEventPresent()) {
if ((controllerConfiguration.isAllEventReconcileMode() && executionScope.isDeleteEvent())
|| (!controllerConfiguration.isAllEventReconcileMode() && state.deleteEventPresent())) {
cleanupForDeletedEvent(executionScope.getResourceID());
} else if (postExecutionControl.isFinalizerRemoved()) {
state.markProcessedMarkForDeletion();
Expand Down Expand Up @@ -450,6 +454,7 @@ private ReconcilerExecutor(ResourceID resourceID, ExecutionScope<P> executionSco
}

@Override
@SuppressWarnings("unchecked")
public void run() {
if (!running) {
// this is needed for the case when controller stopped, but there is a graceful shutdown
Expand All @@ -464,8 +469,26 @@ public void run() {
try {
var actualResource = cache.get(resourceID);
if (actualResource.isEmpty()) {
log.debug("Skipping execution; primary resource missing from cache: {}", resourceID);
return;
if (controllerConfiguration.isAllEventReconcileMode()) {
log.debug(
"Resource not found in the cache, checking for delete event resource: {}",
resourceID);
var state = resourceStateManager.get(resourceID);
actualResource =
(Optional<P>)
state
.filter(ResourceState::deleteEventPresent)
.map(ResourceState::getLastKnownResource);
if (actualResource.isEmpty()) {
log.debug(
"Skipping execution; delete event resource not found in state: {}", resourceID);
return;
}
executionScope.setDeleteEvent(true);
} else {
log.debug("Skipping execution; primary resource missing from cache: {}", resourceID);
return;
}
}
actualResource.ifPresent(executionScope::setResource);
MDCUtils.addResourceInfo(executionScope.getResource());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class ExecutionScope<R extends HasMetadata> {
// the latest custom resource from cache
private R resource;
private final RetryInfo retryInfo;
private boolean deleteEvent = false;
private boolean isDeleteFinalStateUnknown = false;

ExecutionScope(RetryInfo retryInfo) {
this.retryInfo = retryInfo;
Expand All @@ -26,6 +28,22 @@ public ResourceID getResourceID() {
return ResourceID.fromResource(resource);
}

public boolean isDeleteEvent() {
return deleteEvent;
}

public void setDeleteEvent(boolean deleteEvent) {
this.deleteEvent = deleteEvent;
}

public boolean isDeleteFinalStateUnknown() {
return isDeleteFinalStateUnknown;
}

public void setDeleteFinalStateUnknown(boolean deleteFinalStateUnknown) {
isDeleteFinalStateUnknown = deleteFinalStateUnknown;
}

@Override
public String toString() {
if (resource == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ private PostExecutionControl<P> handleDispatch(ExecutionScope<P> executionScope)
originalResource.getMetadata().getNamespace());

final var markedForDeletion = originalResource.isMarkedForDeletion();
if (markedForDeletion && shouldNotDispatchToCleanupWhenMarkedForDeletion(originalResource)) {
if (!configuration().isAllEventReconcileMode()
&& markedForDeletion
&& shouldNotDispatchToCleanupWhenMarkedForDeletion(originalResource)) {
log.debug(
"Skipping cleanup of resource {} because finalizer(s) {} don't allow processing yet",
getName(originalResource),
Expand All @@ -90,8 +92,13 @@ private PostExecutionControl<P> handleDispatch(ExecutionScope<P> executionScope)
}

Context<P> context =
new DefaultContext<>(executionScope.getRetryInfo(), controller, resourceForExecution);
if (markedForDeletion) {
new DefaultContext<>(
executionScope.getRetryInfo(),
controller,
resourceForExecution,
executionScope.isDeleteEvent(),
executionScope.isDeleteFinalStateUnknown());
if (markedForDeletion && !configuration().isAllEventReconcileMode()) {
return handleCleanup(resourceForExecution, originalResource, context);
} else {
return handleReconcile(executionScope, resourceForExecution, originalResource, context);
Expand All @@ -110,7 +117,8 @@ private PostExecutionControl<P> handleReconcile(
P originalResource,
Context<P> context)
throws Exception {
if (controller.useFinalizer()
if (!configuration().isAllEventReconcileMode()
&& controller.useFinalizer()
&& !originalResource.hasFinalizer(configuration().getFinalizerName())) {
/*
* We always add the finalizer if missing and the controller is configured to use a finalizer.
Expand Down
Loading