-
Notifications
You must be signed in to change notification settings - Fork 94
Adds a predicate to DataLoaderRegistry and a per dataloader map of pedicates is also possible #133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1151ccb
fb0a072
9320fb4
c528455
c0c6eef
1c8d48c
69528f1
3099f1a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,21 +5,25 @@ | |
import org.dataloader.annotations.ExperimentalApi; | ||
|
||
import java.time.Duration; | ||
import java.util.HashMap; | ||
import java.util.LinkedHashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.dataloader.impl.Assertions.nonNull; | ||
|
||
/** | ||
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called | ||
* This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called | ||
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled | ||
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}. | ||
* <p> | ||
* It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the | ||
* whole {@link ScheduledDataLoaderRegistry}. | ||
* <p> | ||
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case | ||
* no rescheduling will occur and you will need to call dispatch again to restart the process. | ||
* no rescheduling will occur, and you will need to call dispatch again to restart the process. | ||
* <p> | ||
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and | ||
* call {@link #rescheduleNow()}. | ||
|
@@ -29,17 +33,20 @@ | |
@ExperimentalApi | ||
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { | ||
|
||
private final ScheduledExecutorService scheduledExecutorService; | ||
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); | ||
private final DispatchPredicate dispatchPredicate; | ||
private final ScheduledExecutorService scheduledExecutorService; | ||
private final Duration schedule; | ||
private volatile boolean closed; | ||
|
||
private ScheduledDataLoaderRegistry(Builder builder) { | ||
super(); | ||
this.dataLoaders.putAll(builder.dataLoaders); | ||
this.scheduledExecutorService = builder.scheduledExecutorService; | ||
this.dispatchPredicate = builder.dispatchPredicate; | ||
this.schedule = builder.schedule; | ||
this.closed = false; | ||
this.dispatchPredicate = builder.dispatchPredicate; | ||
this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); | ||
} | ||
|
||
/** | ||
|
@@ -57,6 +64,88 @@ public Duration getScheduleDuration() { | |
return schedule; | ||
} | ||
|
||
/** | ||
* This will combine all the current data loaders in this registry and all the data loaders from the specified registry | ||
* and return a new combined registry | ||
* | ||
* @param registry the registry to combine into this registry | ||
* | ||
* @return a new combined registry | ||
*/ | ||
public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) { | ||
Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry() | ||
.dispatchPredicate(this.dispatchPredicate); | ||
combinedBuilder.registerAll(this); | ||
combinedBuilder.registerAll(registry); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe worth checking here, or inside There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. checked - it does |
||
return combinedBuilder.build(); | ||
} | ||
|
||
|
||
/** | ||
* This will unregister a new dataloader | ||
* | ||
* @param key the key of the data loader to unregister | ||
* | ||
* @return this registry | ||
*/ | ||
public ScheduledDataLoaderRegistry unregister(String key) { | ||
DataLoader<?, ?> dataLoader = dataLoaders.remove(key); | ||
if (dataLoader != null) { | ||
dataLoaderPredicates.remove(dataLoader); | ||
} | ||
return this; | ||
} | ||
|
||
/** | ||
* @return a map of data loaders to specific dispatch predicates | ||
*/ | ||
public Map<DataLoader<?, ?>, DispatchPredicate> getDataLoaderPredicates() { | ||
return new LinkedHashMap<>(dataLoaderPredicates); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that a Also, really minor but the field and getter for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. I didnt deprecate but I did redocument |
||
|
||
/** | ||
* There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry} | ||
* | ||
* @return the default dispatch predicate | ||
*/ | ||
public DispatchPredicate getDispatchPredicate() { | ||
return dispatchPredicate; | ||
} | ||
|
||
/** | ||
* This will register a new dataloader and dispatch predicate associated with that data loader | ||
* | ||
* @param key the key to put the data loader under | ||
* @param dataLoader the data loader to register | ||
* @param dispatchPredicate the dispatch predicate to associate with this data loader | ||
* | ||
* @return this registry | ||
*/ | ||
public ScheduledDataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) { | ||
dataLoaders.put(key, dataLoader); | ||
dataLoaderPredicates.put(dataLoader, dispatchPredicate); | ||
return this; | ||
} | ||
|
||
/** | ||
* Returns true if the dataloader has a predicate which returned true, OR the overall | ||
* registry predicate returned true. | ||
* | ||
* @param dataLoaderKey the key in the dataloader map | ||
* @param dataLoader the dataloader | ||
* | ||
* @return true if it should dispatch | ||
*/ | ||
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) { | ||
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); | ||
if (dispatchPredicate != null) { | ||
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { | ||
return true; | ||
} | ||
} | ||
return this.dispatchPredicate.test(dataLoaderKey, dataLoader); | ||
} | ||
|
||
@Override | ||
public void dispatchAll() { | ||
dispatchAllWithCount(); | ||
|
@@ -68,7 +157,7 @@ public int dispatchAllWithCount() { | |
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) { | ||
DataLoader<?, ?> dataLoader = entry.getValue(); | ||
String key = entry.getKey(); | ||
if (dispatchPredicate.test(key, dataLoader)) { | ||
if (shouldDispatch(key, dataLoader)) { | ||
sum += dataLoader.dispatchWithCounts().getKeysCount(); | ||
} else { | ||
reschedule(key, dataLoader); | ||
|
@@ -77,24 +166,28 @@ public int dispatchAllWithCount() { | |
return sum; | ||
} | ||
|
||
|
||
/** | ||
* This will immediately dispatch the {@link DataLoader}s in the registry | ||
* without testing the predicate | ||
* without testing the predicates | ||
*/ | ||
public void dispatchAllImmediately() { | ||
super.dispatchAll(); | ||
dispatchAllWithCountImmediately(); | ||
} | ||
|
||
/** | ||
* This will immediately dispatch the {@link DataLoader}s in the registry | ||
* without testing the predicate | ||
* without testing the predicates | ||
* | ||
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. | ||
*/ | ||
public int dispatchAllWithCountImmediately() { | ||
return super.dispatchAllWithCount(); | ||
return dataLoaders.values().stream() | ||
.mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount()) | ||
.sum(); | ||
} | ||
|
||
|
||
/** | ||
* This will schedule a task to check the predicate and dispatch if true right now. It will not do | ||
* a pre check of the preodicate like {@link #dispatchAll()} would | ||
|
@@ -111,16 +204,16 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) { | |
} | ||
|
||
private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) { | ||
if (dispatchPredicate.test(key, dataLoader)) { | ||
if (shouldDispatch(key, dataLoader)) { | ||
dataLoader.dispatch(); | ||
} else { | ||
reschedule(key, dataLoader); | ||
} | ||
} | ||
|
||
/** | ||
* By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()} | ||
* and a schedule duration of 10 milli seconds. | ||
* By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()} | ||
* and a schedule duration of 10 milliseconds. | ||
* | ||
* @return A builder of {@link ScheduledDataLoaderRegistry}s | ||
*/ | ||
|
@@ -130,10 +223,11 @@ public static Builder newScheduledRegistry() { | |
|
||
public static class Builder { | ||
|
||
private final Map<String, DataLoader<?, ?>> dataLoaders = new LinkedHashMap<>(); | ||
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>(); | ||
private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; | ||
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); | ||
private DispatchPredicate dispatchPredicate = (key, dl) -> true; | ||
private Duration schedule = Duration.ofMillis(10); | ||
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>(); | ||
|
||
public Builder scheduledExecutorService(ScheduledExecutorService executorService) { | ||
this.scheduledExecutorService = nonNull(executorService); | ||
|
@@ -145,11 +239,6 @@ public Builder schedule(Duration schedule) { | |
return this; | ||
} | ||
|
||
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { | ||
this.dispatchPredicate = nonNull(dispatchPredicate); | ||
return this; | ||
} | ||
|
||
/** | ||
* This will register a new dataloader | ||
* | ||
|
@@ -163,8 +252,24 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) { | |
return this; | ||
} | ||
|
||
|
||
/** | ||
* This will combine together the data loaders in this builder with the ones | ||
* This will register a new dataloader with a specific {@link DispatchPredicate} | ||
* | ||
* @param key the key to put the data loader under | ||
* @param dataLoader the data loader to register | ||
* @param dispatchPredicate the dispatch predicate | ||
* | ||
* @return this builder for a fluent pattern | ||
*/ | ||
public Builder register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) { | ||
register(key, dataLoader); | ||
dataLoaderPredicates.put(dataLoader, dispatchPredicate); | ||
return this; | ||
} | ||
|
||
/** | ||
* This will combine the data loaders in this builder with the ones | ||
* from a previous {@link DataLoaderRegistry} | ||
* | ||
* @param otherRegistry the previous {@link DataLoaderRegistry} | ||
|
@@ -173,6 +278,23 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) { | |
*/ | ||
public Builder registerAll(DataLoaderRegistry otherRegistry) { | ||
dataLoaders.putAll(otherRegistry.getDataLoadersMap()); | ||
if (otherRegistry instanceof ScheduledDataLoaderRegistry) { | ||
ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry; | ||
dataLoaderPredicates.putAll(other.dataLoaderPredicates); | ||
} | ||
return this; | ||
} | ||
|
||
/** | ||
* This sets a default predicate on the {@link DataLoaderRegistry} that will control | ||
* whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. | ||
* | ||
* @param dispatchPredicate the predicate | ||
* | ||
* @return this builder for a fluent pattern | ||
*/ | ||
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { | ||
this.dispatchPredicate = dispatchPredicate; | ||
return this; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stopped a javadoc warning