diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 67db4e2..0323db7 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -162,6 +162,9 @@ Object getCacheKeyWithContext(K key, Object context) { DispatchResult dispatch() { boolean batchingEnabled = loaderOptions.batchingEnabled(); + if (!loaderOptions.getDispatchPredicate().test(null, dataLoader)) { + return new DispatchResult<>(CompletableFuture.completedFuture(emptyList()), loaderQueue.size(), false); + } // // we copy the pre-loaded set of futures ready for dispatch final List keys = new ArrayList<>(); @@ -177,7 +180,7 @@ DispatchResult dispatch() { lastDispatchTime.set(now()); } if (!batchingEnabled || keys.isEmpty()) { - return new DispatchResult<>(completedFuture(emptyList()), 0); + return new DispatchResult<>(completedFuture(emptyList()), 0, false); } final int totalEntriesHandled = keys.size(); // @@ -198,7 +201,7 @@ DispatchResult dispatch() { } else { futureList = dispatchQueueBatch(keys, callContexts, queuedFutures); } - return new DispatchResult<>(futureList, totalEntriesHandled); + return new DispatchResult<>(futureList, totalEntriesHandled, true); } private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List> queuedFutures, List callContexts, int maxBatchSize) { diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index bac9476..8ebe99a 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -18,6 +18,7 @@ import org.dataloader.annotations.PublicApi; import org.dataloader.impl.Assertions; +import org.dataloader.registries.DispatchPredicate; import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.NoOpStatisticsCollector; import org.dataloader.stats.StatisticsCollector; @@ -43,6 +44,7 @@ public class DataLoaderOptions { private CacheKey cacheKeyFunction; private CacheMap cacheMap; private ValueCache valueCache; + private DispatchPredicate dispatchPredicate; private int maxBatchSize; private Supplier statisticsCollector; private BatchLoaderContextProvider environmentProvider; @@ -60,6 +62,7 @@ public DataLoaderOptions() { statisticsCollector = NoOpStatisticsCollector::new; environmentProvider = NULL_PROVIDER; valueCacheOptions = ValueCacheOptions.newOptions(); + dispatchPredicate = DispatchPredicate.dispatchAlways(); batchLoaderScheduler = null; } @@ -290,6 +293,25 @@ public DataLoaderOptions setValueCache(ValueCache valueCache) { return this; } + /** + * @return the dispatch predicate of these options + */ + public DispatchPredicate getDispatchPredicate() { + return dispatchPredicate; + } + + /** + * Sets the {@link DispatchPredicate} to use. + * + * @param dispatchPredicate a non-null DispatchPredicate to use + * + * @return the data loader options for fluent coding + */ + public DataLoaderOptions dispatchPredicate(DispatchPredicate dispatchPredicate) { + this.dispatchPredicate = nonNull(dispatchPredicate); + return this; + } + /** * @return the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used */ diff --git a/src/main/java/org/dataloader/DispatchResult.java b/src/main/java/org/dataloader/DispatchResult.java index 97711da..12e1279 100644 --- a/src/main/java/org/dataloader/DispatchResult.java +++ b/src/main/java/org/dataloader/DispatchResult.java @@ -15,10 +15,16 @@ public class DispatchResult { private final CompletableFuture> futureList; private final int keysCount; + private final boolean wasDispatched; public DispatchResult(CompletableFuture> futureList, int keysCount) { + this(futureList, keysCount, true); + } + + public DispatchResult(CompletableFuture> futureList, int keysCount, boolean wasDispatched) { this.futureList = futureList; this.keysCount = keysCount; + this.wasDispatched = wasDispatched; } public CompletableFuture> getPromisedResults() { @@ -28,4 +34,16 @@ public CompletableFuture> getPromisedResults() { public int getKeysCount() { return keysCount; } + + /** + * If the {@link org.dataloader.registries.DispatchPredicate} associated with the dataloader + * returns false, then the call to dispatch was not performed and this will return false. + *

+ * Similarly, if the set the loaded keys was empty or the batching is not enabled them this will return false + * + * @return true of the dispatch call was actually made or false if it was not + */ + public boolean wasDispatched() { + return wasDispatched; + } } diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java index d5bd31b..cd8da16 100644 --- a/src/main/java/org/dataloader/registries/DispatchPredicate.java +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -6,14 +6,16 @@ import java.util.Objects; /** - * A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not + * A predicate class used by {@link ScheduledDataLoaderRegistry}s as well as by individual + * {@link DataLoader}s to decide whether to dispatch or not. */ @FunctionalInterface public interface DispatchPredicate { /** - * This predicate tests whether the data loader should be dispatched or not. + * This predicate tests whether the data loader should be dispatched or not. If the predicate is associated direct to a {@link DataLoader} + * then the dataLoaderKey parameter will be null. * - * @param dataLoaderKey the key of the data loader when registered + * @param dataLoaderKey the key of the data loader when registered or null if this is a predicate associated direct with a {@link DataLoader} * @param dataLoader the dataloader to dispatch * * @return true if the data loader SHOULD be dispatched @@ -68,7 +70,7 @@ default DispatchPredicate or(DispatchPredicate other) { * * @param duration the length of time to check * - * @return true if the data loader has not been dispatched in duration time + * @return a predicate that returns true if the data loader has not been dispatched in duration time */ static DispatchPredicate dispatchIfLongerThan(Duration duration) { return (dataLoaderKey, dataLoader) -> { @@ -79,14 +81,32 @@ static DispatchPredicate dispatchIfLongerThan(Duration duration) { /** * This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. - * + *

* This will act as minimum batch size. There must be more than `depth` items queued for the predicate to return true. * * @param depth the value to be greater than * - * @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. + * @return a predicate that returns true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. */ static DispatchPredicate dispatchIfDepthGreaterThan(int depth) { return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth; } + + /** + * This predicate will return true always + * + * @return a predicate that returns true always + */ + static DispatchPredicate dispatchAlways() { + return (dataLoaderKey, dataLoader) -> true; + } + + /** + * This predicate will never return true + * + * @return a predicate that never returns true + */ + static DispatchPredicate dispatchNever() { + return (dataLoaderKey, dataLoader) -> false; + } } diff --git a/src/test/java/org/dataloader/DataLoaderPredicateTest.java b/src/test/java/org/dataloader/DataLoaderPredicateTest.java new file mode 100644 index 0000000..a57d910 --- /dev/null +++ b/src/test/java/org/dataloader/DataLoaderPredicateTest.java @@ -0,0 +1,119 @@ +package org.dataloader; + +import org.dataloader.registries.DispatchPredicate; +import org.dataloader.stats.SimpleStatisticsCollector; +import org.dataloader.stats.Statistics; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static java.util.Arrays.asList; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests related to dispatching predicates. + */ +public class DataLoaderPredicateTest { + + @Test + public void the_predicate_will_prevent_loading() { + BatchLoader batchLoader = CompletableFuture::completedFuture; + DataLoader loader = newDataLoader(batchLoader, + DataLoaderOptions.newOptions().setStatisticsCollector(SimpleStatisticsCollector::new) + .dispatchPredicate(DispatchPredicate.dispatchNever()) + ); + + loader.load("A"); + loader.load("B"); + loader.loadMany(asList("C", "D")); + + Statistics stats = loader.getStatistics(); + assertThat(stats.getLoadCount(), equalTo(4L)); + assertThat(stats.getBatchInvokeCount(), equalTo(0L)); + assertThat(stats.getBatchLoadCount(), equalTo(0L)); + assertThat(stats.getCacheHitCount(), equalTo(0L)); + + DispatchResult dispatchResult = loader.dispatchWithCounts(); + assertThat(dispatchResult.wasDispatched(), equalTo(false)); + assertThat(dispatchResult.getKeysCount(), equalTo(4)); + + stats = loader.getStatistics(); + assertThat(stats.getLoadCount(), equalTo(4L)); + assertThat(stats.getBatchInvokeCount(), equalTo(0L)); + assertThat(stats.getBatchLoadCount(), equalTo(0L)); + + + loader.load("A"); + loader.load("B"); + + dispatchResult = loader.dispatchWithCounts(); + assertThat(dispatchResult.wasDispatched(), equalTo(false)); + assertThat(dispatchResult.getKeysCount(), equalTo(4)); + + stats = loader.getStatistics(); + assertThat(stats.getLoadCount(), equalTo(6L)); + assertThat(stats.getBatchInvokeCount(), equalTo(0L)); + assertThat(stats.getBatchLoadCount(), equalTo(0L)); + } + + @Test + public void the_predicate_will_allow_loading_by_default() { + BatchLoader batchLoader = CompletableFuture::completedFuture; + DataLoader loader = newDataLoader(batchLoader, + DataLoaderOptions.newOptions().setStatisticsCollector(SimpleStatisticsCollector::new) + .dispatchPredicate(DispatchPredicate.dispatchAlways()) + ); + + loader.load("A"); + loader.load("B"); + loader.loadMany(asList("C", "D")); + + + DispatchResult dispatchResult = loader.dispatchWithCounts(); + assertThat(dispatchResult.wasDispatched(), equalTo(true)); + assertThat(dispatchResult.getKeysCount(), equalTo(4)); + + Statistics stats = loader.getStatistics(); + assertThat(stats.getLoadCount(), equalTo(4L)); + assertThat(stats.getBatchInvokeCount(), equalTo(1L)); + assertThat(stats.getBatchLoadCount(), equalTo(4L)); + + + loader.load("E"); + loader.load("F"); + + dispatchResult = loader.dispatchWithCounts(); + assertThat(dispatchResult.wasDispatched(), equalTo(true)); + assertThat(dispatchResult.getKeysCount(), equalTo(2)); + + stats = loader.getStatistics(); + assertThat(stats.getLoadCount(), equalTo(6L)); + assertThat(stats.getBatchInvokeCount(), equalTo(2L)); + assertThat(stats.getBatchLoadCount(), equalTo(6L)); + } + + @Test + public void dataloader_options_have_a_default_which_is_always_on() { + BatchLoader batchLoader = CompletableFuture::completedFuture; + DataLoaderOptions dataLoaderOptions = DataLoaderOptions.newOptions(); + + DispatchPredicate defaultPredicate = dataLoaderOptions.getDispatchPredicate(); + assertThat(defaultPredicate, notNullValue()); + assertThat(defaultPredicate.test(null, null), equalTo(true)); + + + DataLoader loader = newDataLoader(batchLoader, dataLoaderOptions); + + loader.load("A"); + loader.load("B"); + loader.loadMany(asList("C", "D")); + + DispatchResult dispatchResult = loader.dispatchWithCounts(); + assertThat(dispatchResult.wasDispatched(), equalTo(true)); + assertThat(dispatchResult.getKeysCount(), equalTo(4)); + + } +}