Skip to content

Adds a predicate to DataLoaderRegistry and a per dataloader map of pedicates is also possible #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 8, 2023
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/DataLoaderRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.function.Function;

/**
* This allows data loaders to be registered together into a single place so
* This allows data loaders to be registered together into a single place, so
* they can be dispatched as one. It also allows you to retrieve data loaders by
* name from a central place
*/
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/annotations/GuardedBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
public @interface GuardedBy {

/**
* The lock that should be held.
* @return The lock that should be held.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopped a javadoc warning

*/
String value();
}
10 changes: 10 additions & 0 deletions src/main/java/org/dataloader/registries/DispatchPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@
*/
@FunctionalInterface
public interface DispatchPredicate {

/**
* A predicate that always returns true
*/
DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true;
/**
* A predicate that always returns false
*/
DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false;

/**
* This predicate tests whether the data loader should be dispatched or not.
*
Expand Down
164 changes: 143 additions & 21 deletions src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@
import org.dataloader.annotations.ExperimentalApi;

import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.dataloader.impl.Assertions.nonNull;

/**
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
* This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
* <p>
* It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the
* whole {@link ScheduledDataLoaderRegistry}.
* <p>
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
* no rescheduling will occur and you will need to call dispatch again to restart the process.
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
* <p>
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
* call {@link #rescheduleNow()}.
Expand All @@ -29,17 +33,20 @@
@ExperimentalApi
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {

private final ScheduledExecutorService scheduledExecutorService;
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();
private final DispatchPredicate dispatchPredicate;
private final ScheduledExecutorService scheduledExecutorService;
private final Duration schedule;
private volatile boolean closed;

private ScheduledDataLoaderRegistry(Builder builder) {
super();
this.dataLoaders.putAll(builder.dataLoaders);
this.scheduledExecutorService = builder.scheduledExecutorService;
this.dispatchPredicate = builder.dispatchPredicate;
this.schedule = builder.schedule;
this.closed = false;
this.dispatchPredicate = builder.dispatchPredicate;
this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates);
}

/**
Expand All @@ -57,6 +64,88 @@ public Duration getScheduleDuration() {
return schedule;
}

/**
* This will combine all the current data loaders in this registry and all the data loaders from the specified registry
* and return a new combined registry
*
* @param registry the registry to combine into this registry
*
* @return a new combined registry
*/
public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) {
Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry()
.dispatchPredicate(this.dispatchPredicate);
combinedBuilder.registerAll(this);
combinedBuilder.registerAll(registry);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth checking here, or inside registerAll if the other registry is also a ScheduledDataLoaderRegistry and if so picking up its dispatchPredicates too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checked - it does

return combinedBuilder.build();
}


/**
* This will unregister a new dataloader
*
* @param key the key of the data loader to unregister
*
* @return this registry
*/
public ScheduledDataLoaderRegistry unregister(String key) {
DataLoader<?, ?> dataLoader = dataLoaders.remove(key);
if (dataLoader != null) {
dataLoaderPredicates.remove(dataLoader);
}
return this;
}

/**
* @return a map of data loaders to specific dispatch predicates
*/
public Map<DataLoader<?, ?>, DispatchPredicate> getDataLoaderPredicates() {
return new LinkedHashMap<>(dataLoaderPredicates);
}
Copy link

@rstoyanchev rstoyanchev Oct 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that a DispatchPredicate can be registered per DataLoader, this should ideally be called defaultDispatchPredicate. However, I understand that would involve renaming, and/or a deprecation and a property with the new name. In the very least, the Javadoc should be updated to indicate that it applies by default, if there isn't already a registration for that DataLoader.

Also, really minor but the field and getter for dispatchPredicate could be ordered after the map of predicates, like it is in the builder to reflect it comes after the other registrations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. I didnt deprecate but I did redocument


/**
* There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry}
*
* @return the default dispatch predicate
*/
public DispatchPredicate getDispatchPredicate() {
return dispatchPredicate;
}

/**
* This will register a new dataloader and dispatch predicate associated with that data loader
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param dispatchPredicate the dispatch predicate to associate with this data loader
*
* @return this registry
*/
public ScheduledDataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
dataLoaders.put(key, dataLoader);
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
return this;
}

/**
* Returns true if the dataloader has a predicate which returned true, OR the overall
* registry predicate returned true.
*
* @param dataLoaderKey the key in the dataloader map
* @param dataLoader the dataloader
*
* @return true if it should dispatch
*/
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
if (dispatchPredicate != null) {
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
return true;
}
}
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
}

@Override
public void dispatchAll() {
dispatchAllWithCount();
Expand All @@ -68,7 +157,7 @@ public int dispatchAllWithCount() {
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
DataLoader<?, ?> dataLoader = entry.getValue();
String key = entry.getKey();
if (dispatchPredicate.test(key, dataLoader)) {
if (shouldDispatch(key, dataLoader)) {
sum += dataLoader.dispatchWithCounts().getKeysCount();
} else {
reschedule(key, dataLoader);
Expand All @@ -77,24 +166,28 @@ public int dispatchAllWithCount() {
return sum;
}


/**
* This will immediately dispatch the {@link DataLoader}s in the registry
* without testing the predicate
* without testing the predicates
*/
public void dispatchAllImmediately() {
super.dispatchAll();
dispatchAllWithCountImmediately();
}

/**
* This will immediately dispatch the {@link DataLoader}s in the registry
* without testing the predicate
* without testing the predicates
*
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
*/
public int dispatchAllWithCountImmediately() {
return super.dispatchAllWithCount();
return dataLoaders.values().stream()
.mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount())
.sum();
}


/**
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
* a pre check of the preodicate like {@link #dispatchAll()} would
Expand All @@ -111,16 +204,16 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) {
}

private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
if (dispatchPredicate.test(key, dataLoader)) {
if (shouldDispatch(key, dataLoader)) {
dataLoader.dispatch();
} else {
reschedule(key, dataLoader);
}
}

/**
* By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
* and a schedule duration of 10 milli seconds.
* By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
* and a schedule duration of 10 milliseconds.
*
* @return A builder of {@link ScheduledDataLoaderRegistry}s
*/
Expand All @@ -130,10 +223,11 @@ public static Builder newScheduledRegistry() {

public static class Builder {

private final Map<String, DataLoader<?, ?>> dataLoaders = new LinkedHashMap<>();
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>();
private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
private Duration schedule = Duration.ofMillis(10);
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();

public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
this.scheduledExecutorService = nonNull(executorService);
Expand All @@ -145,11 +239,6 @@ public Builder schedule(Duration schedule) {
return this;
}

public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
this.dispatchPredicate = nonNull(dispatchPredicate);
return this;
}

/**
* This will register a new dataloader
*
Expand All @@ -163,8 +252,24 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
return this;
}


/**
* This will combine together the data loaders in this builder with the ones
* This will register a new dataloader with a specific {@link DispatchPredicate}
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param dispatchPredicate the dispatch predicate
*
* @return this builder for a fluent pattern
*/
public Builder register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
register(key, dataLoader);
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
return this;
}

/**
* This will combine the data loaders in this builder with the ones
* from a previous {@link DataLoaderRegistry}
*
* @param otherRegistry the previous {@link DataLoaderRegistry}
Expand All @@ -173,6 +278,23 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
*/
public Builder registerAll(DataLoaderRegistry otherRegistry) {
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
if (otherRegistry instanceof ScheduledDataLoaderRegistry) {
ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry;
dataLoaderPredicates.putAll(other.dataLoaderPredicates);
}
return this;
}

/**
* This sets a default predicate on the {@link DataLoaderRegistry} that will control
* whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
*
* @param dispatchPredicate the predicate
*
* @return this builder for a fluent pattern
*/
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
this.dispatchPredicate = dispatchPredicate;
return this;
}

Expand Down
11 changes: 11 additions & 0 deletions src/test/java/org/dataloader/fixtures/TestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
import org.dataloader.MappedBatchLoaderWithContext;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -94,4 +97,12 @@ public static void snooze(int millis) {
public static <T> List<T> sort(Collection<? extends T> collection) {
return collection.stream().sorted().collect(toList());
}

public static <T> Set<T> asSet(T... elements) {
return new LinkedHashSet<>(Arrays.asList(elements));
}

public static <T> Set<T> asSet(Collection<T> elements) {
return new LinkedHashSet<>(elements);
}
}
Loading