From e5485adc8040cb7227416ec8295be4fc54d7b1ec Mon Sep 17 00:00:00 2001 From: Raymie Stata Date: Sun, 11 Sep 2022 08:46:15 -0700 Subject: [PATCH 01/18] Try.getThrowable - fix exception msg. --- src/main/java/org/dataloader/Try.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/Try.java b/src/main/java/org/dataloader/Try.java index 3f9a129..c7eac67 100644 --- a/src/main/java/org/dataloader/Try.java +++ b/src/main/java/org/dataloader/Try.java @@ -167,7 +167,7 @@ public V get() { */ public Throwable getThrowable() { if (isSuccess()) { - throw new UnsupportedOperationException("You have called Try.getThrowable() with a failed Try", throwable); + throw new UnsupportedOperationException("You have called Try.getThrowable() with a successful Try"); } return throwable; } From 50886585d902d45234a58a4f2050898afa0835a0 Mon Sep 17 00:00:00 2001 From: dondonz <13839920+dondonz@users.noreply.github.com> Date: Tue, 7 Mar 2023 14:51:34 +1100 Subject: [PATCH 02/18] Prepend 0.0.0 to build version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 8d8c392..f5064ed 100644 --- a/build.gradle +++ b/build.gradle @@ -20,7 +20,7 @@ def getDevelopmentVersion() { println "git hash is empty: error: ${error.toString()}" throw new IllegalStateException("git hash could not be determined") } - def version = new SimpleDateFormat('yyyy-MM-dd\'T\'HH-mm-ss').format(new Date()) + "-" + gitHash + def version = "0.0.0-" + new SimpleDateFormat('yyyy-MM-dd\'T\'HH-mm-ss').format(new Date()) + "-" + gitHash println "created development version: $version" version } From 32ce3cbfa9cfce55c2c42e04b3351a5969139b3c Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 8 May 2023 13:08:12 +1000 Subject: [PATCH 03/18] Added BatchLoaderScheduler into the mix --- README.md | 45 ++++- .../java/org/dataloader/DataLoaderHelper.java | 35 +++- .../org/dataloader/DataLoaderOptions.java | 24 +++ .../scheduler/BatchLoaderScheduler.java | 74 ++++++++ .../java/org/dataloader/fixtures/TestKit.java | 27 +++ .../scheduler/BatchLoaderSchedulerTest.java | 167 ++++++++++++++++++ 6 files changed, 367 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java create mode 100644 src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java diff --git a/README.md b/README.md index e48de1d..ef19d9c 100644 --- a/README.md +++ b/README.md @@ -510,7 +510,50 @@ and there are also gains to this different mode of operation: However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs. -## Scheduled Dispatching +## The BatchLoader Scheduler + +By default, when `dataLoader.dispatch()` is called the `BatchLoader` / `MappedBatchLoader` function will be invoked +immediately. + +However, you can provide your own `BatchLoaderScheduler` that allows this call to be done some time into +the future. + +You will be passed a callback (`ScheduledBatchLoaderCall` / `ScheduledMapBatchLoaderCall`) and you are expected +to eventually call this callback method to make the batch loading happen. + +The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invoking the batch loading functions. + +```java + new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; +``` + +You are given the keys to be loaded and an optional `BatchLoaderEnvironment` for informative purposes. You can't change the list of +keys that will be loaded via this mechanism say. + +Also note, because there is a max batch size, it is possible for this scheduling to happen N times for a given `dispatch()` +call. The total set of keys will be sliced into batches themselves and then the `BatchLoaderScheduler` will be called for +each batch of keys. + +Do not assume that a single call to `dispatch()` results in a single call to `BatchLoaderScheduler`. + +## Scheduled Registry Dispatching `ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked. diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 883189c..67db4e2 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -3,6 +3,7 @@ import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; import org.dataloader.stats.context.IncrementBatchLoadExceptionCountStatisticsContext; @@ -417,10 +418,23 @@ CompletableFuture> invokeLoader(List keys, List keyContexts) @SuppressWarnings("unchecked") private CompletableFuture> invokeListBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof BatchLoaderWithContext) { - loadResult = ((BatchLoaderWithContext) batchLoadFunction).load(keys, environment); + BatchLoaderWithContext loadFunction = (BatchLoaderWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchLoaderCall loadCall = () -> loadFunction.load(keys, environment); + loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, environment); + } else { + loadResult = loadFunction.load(keys, environment); + } } else { - loadResult = ((BatchLoader) batchLoadFunction).load(keys); + BatchLoader loadFunction = (BatchLoader) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchLoaderCall loadCall = () -> loadFunction.load(keys); + loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, null); + } else { + loadResult = loadFunction.load(keys); + } } return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); } @@ -434,10 +448,23 @@ private CompletableFuture> invokeListBatchLoader(List keys, BatchLoad private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; Set setOfKeys = new LinkedHashSet<>(keys); + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof MappedBatchLoaderWithContext) { - loadResult = ((MappedBatchLoaderWithContext) batchLoadFunction).load(setOfKeys, environment); + MappedBatchLoaderWithContext loadFunction = (MappedBatchLoaderWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledMappedBatchLoaderCall loadCall = () -> loadFunction.load(setOfKeys, environment); + loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, environment); + } else { + loadResult = loadFunction.load(setOfKeys, environment); + } } else { - loadResult = ((MappedBatchLoader) batchLoadFunction).load(setOfKeys); + MappedBatchLoader loadFunction = (MappedBatchLoader) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledMappedBatchLoaderCall loadCall = () -> loadFunction.load(setOfKeys); + loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, null); + } else { + loadResult = loadFunction.load(setOfKeys); + } } CompletableFuture> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); return mapBatchLoad.thenApply(map -> { diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index 4c79296..bac9476 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -18,6 +18,7 @@ import org.dataloader.annotations.PublicApi; import org.dataloader.impl.Assertions; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.NoOpStatisticsCollector; import org.dataloader.stats.StatisticsCollector; @@ -46,6 +47,7 @@ public class DataLoaderOptions { private Supplier statisticsCollector; private BatchLoaderContextProvider environmentProvider; private ValueCacheOptions valueCacheOptions; + private BatchLoaderScheduler batchLoaderScheduler; /** * Creates a new data loader options with default settings. @@ -58,6 +60,7 @@ public DataLoaderOptions() { statisticsCollector = NoOpStatisticsCollector::new; environmentProvider = NULL_PROVIDER; valueCacheOptions = ValueCacheOptions.newOptions(); + batchLoaderScheduler = null; } /** @@ -77,6 +80,7 @@ public DataLoaderOptions(DataLoaderOptions other) { this.statisticsCollector = other.statisticsCollector; this.environmentProvider = other.environmentProvider; this.valueCacheOptions = other.valueCacheOptions; + batchLoaderScheduler = other.batchLoaderScheduler; } /** @@ -304,4 +308,24 @@ public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOption this.valueCacheOptions = Assertions.nonNull(valueCacheOptions); return this; } + + /** + * @return the {@link BatchLoaderScheduler} to use, which can be null + */ + public BatchLoaderScheduler getBatchLoaderScheduler() { + return batchLoaderScheduler; + } + + /** + * Sets in a new {@link BatchLoaderScheduler} that allows the call to a {@link BatchLoader} function to be scheduled + * to some future time. + * + * @param batchLoaderScheduler the scheduler + * + * @return the data loader options for fluent coding + */ + public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) { + this.batchLoaderScheduler = batchLoaderScheduler; + return this; + } } diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java new file mode 100644 index 0000000..21e63f7 --- /dev/null +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -0,0 +1,74 @@ +package org.dataloader.scheduler; + +import org.dataloader.BatchLoader; +import org.dataloader.BatchLoaderEnvironment; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.MappedBatchLoader; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +/** + * By default, when {@link DataLoader#dispatch()} is called the {@link BatchLoader} / {@link MappedBatchLoader} function will be invoked + * immediately. However, you can provide your own {@link BatchLoaderScheduler} that allows this call to be done some time into + * the future. You will be passed a callback ({@link ScheduledBatchLoaderCall} / {@link ScheduledMappedBatchLoaderCall} and you are expected + * to eventually call this callback method to make the batch loading happen. + *

+ * Note: Because there is a {@link DataLoaderOptions#maxBatchSize()} it is possible for this scheduling to happen N times for a given {@link DataLoader#dispatch()} + * call. The total set of keys will be sliced into batches themselves and then the {@link BatchLoaderScheduler} will be called for + * each batch of keys. Do not assume that a single call to {@link DataLoader#dispatch()} results in a single call to {@link BatchLoaderScheduler}. + */ +public interface BatchLoaderScheduler { + + + /** + * This represents a callback that will invoke a {@link BatchLoader} function under the covers + * + * @param the value type + */ + interface ScheduledBatchLoaderCall { + CompletionStage> invoke(); + } + + /** + * This represents a callback that will invoke a {@link MappedBatchLoader} function under the covers + * + * @param the key type + * @param the value type + */ + interface ScheduledMappedBatchLoaderCall { + CompletionStage> invoke(); + } + + /** + * This is called to schedule a {@link BatchLoader} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link BatchLoader} to proceed. + * @param keys this is the list of keys that will be passed to the {@link BatchLoader}. + * This is provided only for informative reasons and you cant change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link BatchLoader} call + * @param the key type + * @param the value type + * + * @return a promise to the values that come from the {@link BatchLoader} + */ + CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); + + /** + * This is called to schedule a {@link MappedBatchLoader} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link MappedBatchLoader} to proceed. + * @param keys this is the list of keys that will be passed to the {@link MappedBatchLoader}. + * This is provided only for informative reasons and you cant change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link MappedBatchLoader} call + * @param the key type + * @param the value type + * + * @return a promise to the values that come from the {@link BatchLoader} + */ + CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); +} diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 5c87148..0206bd9 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -1,13 +1,19 @@ package org.dataloader.fixtures; import org.dataloader.BatchLoader; +import org.dataloader.BatchLoaderWithContext; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderOptions; +import org.dataloader.MappedBatchLoader; +import org.dataloader.MappedBatchLoaderWithContext; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static java.util.stream.Collectors.toList; @@ -19,6 +25,27 @@ public static BatchLoader keysAsValues() { return CompletableFuture::completedFuture; } + public static BatchLoaderWithContext keysAsValuesWithContext() { + return (keys, env) -> CompletableFuture.completedFuture(keys); + } + + public static MappedBatchLoader keysAsMapOfValues() { + return keys -> mapOfKeys(keys); + } + + public static MappedBatchLoaderWithContext keysAsMapOfValuesWithContext() { + return (keys, env) -> mapOfKeys(keys); + } + + private static CompletableFuture> mapOfKeys(Set keys) { + Map map = new HashMap<>(); + for (K key : keys) { + //noinspection unchecked + map.put(key, (V) key); + } + return CompletableFuture.completedFuture(map); + } + public static BatchLoader keysAsValues(List> loadCalls) { return keys -> { List ks = new ArrayList<>(keys); diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java new file mode 100644 index 0000000..beb7c18 --- /dev/null +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -0,0 +1,167 @@ +package org.dataloader.scheduler; + +import org.dataloader.BatchLoaderEnvironment; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import static org.awaitility.Awaitility.await; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.fixtures.TestKit.keysAsMapOfValues; +import static org.dataloader.fixtures.TestKit.keysAsMapOfValuesWithContext; +import static org.dataloader.fixtures.TestKit.keysAsValues; +import static org.dataloader.fixtures.TestKit.keysAsValuesWithContext; +import static org.dataloader.fixtures.TestKit.snooze; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class BatchLoaderSchedulerTest { + + BatchLoaderScheduler immediateScheduling = new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return scheduledCall.invoke(); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return scheduledCall.invoke(); + } + }; + + private BatchLoaderScheduler delayedScheduling(int ms) { + return new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(ms); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(ms); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + } + + private static void commonSetupAndSimpleAsserts(DataLoader identityLoader) { + CompletableFuture future1 = identityLoader.load(1); + CompletableFuture future2 = identityLoader.load(2); + + identityLoader.dispatch(); + + await().until(() -> future1.isDone() && future2.isDone()); + assertThat(future1.join(), equalTo(1)); + assertThat(future2.join(), equalTo(2)); + } + + @Test + public void can_allow_a_simple_scheduler() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_context() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newDataLoader(keysAsValuesWithContext(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_mapped_batch_load() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newMappedDataLoader(keysAsMapOfValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_mapped_batch_load_with_context() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newMappedDataLoader(keysAsMapOfValuesWithContext(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_an_async_scheduler() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(delayedScheduling(50)); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + + @Test + public void can_allow_a_funky_scheduler() { + AtomicBoolean releaseTheHounds = new AtomicBoolean(); + BatchLoaderScheduler funkyScheduler = new BatchLoaderScheduler() { + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + while (!releaseTheHounds.get()) { + snooze(10); + } + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + while (!releaseTheHounds.get()) { + snooze(10); + } + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(funkyScheduler); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + CompletableFuture future1 = identityLoader.load(1); + CompletableFuture future2 = identityLoader.load(2); + + identityLoader.dispatch(); + + // we can spin around for a while - nothing will happen until we release the hounds + for (int i = 0; i < 5; i++) { + assertThat(future1.isDone(), equalTo(false)); + assertThat(future2.isDone(), equalTo(false)); + snooze(50); + } + + releaseTheHounds.set(true); + + await().until(() -> future1.isDone() && future2.isDone()); + assertThat(future1.join(), equalTo(1)); + assertThat(future2.join(), equalTo(2)); + } + + +} From 3882bf9d3394c116a7e3be5470a118409533b89f Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 8 May 2023 13:15:07 +1000 Subject: [PATCH 04/18] Added BatchLoaderScheduler into the mix- doco update --- README.md | 4 +++- .../java/org/dataloader/scheduler/BatchLoaderScheduler.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ef19d9c..b64453e 100644 --- a/README.md +++ b/README.md @@ -512,7 +512,7 @@ in the load request queue will never be batched, and thus _will never complete_! ## The BatchLoader Scheduler -By default, when `dataLoader.dispatch()` is called the `BatchLoader` / `MappedBatchLoader` function will be invoked +By default, when `dataLoader.dispatch()` is called, the `BatchLoader` / `MappedBatchLoader` function will be invoked immediately. However, you can provide your own `BatchLoaderScheduler` that allows this call to be done some time into @@ -553,6 +553,8 @@ each batch of keys. Do not assume that a single call to `dispatch()` results in a single call to `BatchLoaderScheduler`. +This code is inspired from the scheduling code in the [reference JS implementation](https://github.com/graphql/dataloader#batch-scheduling) + ## Scheduled Registry Dispatching `ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java index 21e63f7..bcebfa0 100644 --- a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -11,7 +11,7 @@ import java.util.concurrent.CompletionStage; /** - * By default, when {@link DataLoader#dispatch()} is called the {@link BatchLoader} / {@link MappedBatchLoader} function will be invoked + * By default, when {@link DataLoader#dispatch()} is called, the {@link BatchLoader} / {@link MappedBatchLoader} function will be invoked * immediately. However, you can provide your own {@link BatchLoaderScheduler} that allows this call to be done some time into * the future. You will be passed a callback ({@link ScheduledBatchLoaderCall} / {@link ScheduledMappedBatchLoaderCall} and you are expected * to eventually call this callback method to make the batch loading happen. From 23492f208058047c0a5534492c8d55182135f790 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 8 May 2023 15:23:31 +1000 Subject: [PATCH 05/18] Readme examples so they compile --- src/test/java/ReadmeExamples.java | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index e37550e..40a0260 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -12,6 +12,7 @@ import org.dataloader.fixtures.UserManager; import org.dataloader.registries.DispatchPredicate; import org.dataloader.registries.ScheduledDataLoaderRegistry; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; @@ -23,6 +24,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Function; import java.util.stream.Collectors; import static java.lang.String.format; @@ -278,6 +280,30 @@ private void statsConfigExample() { DataLoader userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options); } + private void snooze(int i) { + } + + private void BatchLoaderSchedulerExample() { + new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + } + private void ScheduledDispatche() { DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10) .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); From 1044d9c27648360383d85e2717c8635f217383eb Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 25 Sep 2023 14:31:38 +1000 Subject: [PATCH 06/18] This adds a ticker mode to ScheduledDataLoaderRegistry --- .../ScheduledDataLoaderRegistry.java | 63 ++++++++++++++---- .../java/org/dataloader/fixtures/TestKit.java | 28 +++++++- .../ScheduledDataLoaderRegistryTest.java | 64 +++++++++++++++++++ 3 files changed, 143 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 4be317e..9f40d62 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -15,15 +15,34 @@ /** * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called - * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled - * to perform that predicate dispatch again via the {@link ScheduledExecutorService}. + * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, + * then a task is scheduled to perform that predicate dispatch again via the {@link ScheduledExecutorService}. *

- * This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case - * no rescheduling will occur and you will need to call dispatch again to restart the process. + * In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case + * no rescheduling will occur, and you will need to call dispatch again to restart the process. + *

+ * However, when {@link #tickerMode} is true, the registry will always reschedule continuously after the first ever call to {@link #dispatchAll()}. + *

+ * This will allow you to chain together {@link DataLoader} load calls like this : + *

{@code
+ *   CompletableFuture future = dataLoaderA.load("A")
+ *                                          .thenCompose(value -> dataLoaderB.load(value));
+ * }
+ *

+ * However, it may mean your batching will not be as efficient as it might be. In environments + * like graphql this might mean you are too eager in fetching. The {@link DispatchPredicate} still runs to decide if + * dispatch should happen however in ticker mode it will be continuously rescheduled. + *

+ * When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job + * on the {@link ScheduledExecutorService} that is continuously dispatching. *

* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. *

+ * By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you + * are creating a {@link ScheduledDataLoaderRegistry} per request you will want to look at sharing this {@link ScheduledExecutorService} + * to avoid creating a new thread per registry created. + *

* This code is currently marked as {@link ExperimentalApi} */ @ExperimentalApi @@ -32,6 +51,7 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A private final ScheduledExecutorService scheduledExecutorService; private final DispatchPredicate dispatchPredicate; private final Duration schedule; + private final boolean tickerMode; private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { @@ -39,6 +59,7 @@ private ScheduledDataLoaderRegistry(Builder builder) { this.scheduledExecutorService = builder.scheduledExecutorService; this.dispatchPredicate = builder.dispatchPredicate; this.schedule = builder.schedule; + this.tickerMode = builder.tickerMode; this.closed = false; } @@ -57,6 +78,13 @@ public Duration getScheduleDuration() { return schedule; } + /** + * @return true of the registry is in ticker mode or false otherwise + */ + public boolean isTickerMode() { + return tickerMode; + } + @Override public void dispatchAll() { dispatchAllWithCount(); @@ -68,11 +96,7 @@ public int dispatchAllWithCount() { for (Map.Entry> entry : dataLoaders.entrySet()) { DataLoader dataLoader = entry.getValue(); String key = entry.getKey(); - if (dispatchPredicate.test(key, dataLoader)) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); - } else { - reschedule(key, dataLoader); - } + dispatchOrReschedule(key, dataLoader); } return sum; } @@ -111,9 +135,11 @@ private void reschedule(String key, DataLoader dataLoader) { } private void dispatchOrReschedule(String key, DataLoader dataLoader) { - if (dispatchPredicate.test(key, dataLoader)) { + boolean shouldDispatch = dispatchPredicate.test(key, dataLoader); + if (shouldDispatch) { dataLoader.dispatch(); - } else { + } + if (tickerMode || !shouldDispatch) { reschedule(key, dataLoader); } } @@ -134,6 +160,7 @@ public static class Builder { private DispatchPredicate dispatchPredicate = (key, dl) -> true; private Duration schedule = Duration.ofMillis(10); private final Map> dataLoaders = new HashMap<>(); + private boolean tickerMode = false; public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); @@ -176,6 +203,20 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) { return this; } + /** + * This sets ticker mode on the registry. When ticker mode is true the registry will + * continuously reschedule the data loaders for possible dispatching after the first call + * to dispatchAll. + * + * @param tickerMode true or false + * + * @return this builder for a fluent pattern + */ + public Builder tickerMode(boolean tickerMode) { + this.tickerMode = tickerMode; + return this; + } + /** * @return the newly built {@link ScheduledDataLoaderRegistry} */ diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 5c87148..d40fdc7 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -5,6 +5,7 @@ import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderOptions; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -31,6 +32,23 @@ public static BatchLoader keysAsValues(List> loadCalls) { }; } + public static BatchLoader keysAsValuesAsync(Duration delay) { + return keysAsValuesAsync(new ArrayList<>(), delay); + } + + public static BatchLoader keysAsValuesAsync(List> loadCalls, Duration delay) { + return keys -> CompletableFuture.supplyAsync(() -> { + snooze(delay.toMillis()); + List ks = new ArrayList<>(keys); + loadCalls.add(ks); + @SuppressWarnings("unchecked") + List values = keys.stream() + .map(k -> (V) k) + .collect(toList()); + return values; + }); + } + public static DataLoader idLoader() { return idLoader(null, new ArrayList<>()); } @@ -43,6 +61,14 @@ public static DataLoader idLoader(DataLoaderOptions options, List DataLoader idLoaderAsync(Duration delay) { + return idLoaderAsync(null, new ArrayList<>(), delay); + } + + public static DataLoader idLoaderAsync(DataLoaderOptions options, List> loadCalls, Duration delay) { + return DataLoaderFactory.newDataLoader(keysAsValuesAsync(loadCalls, delay), options); + } + public static Collection listFrom(int i, int max) { List ints = new ArrayList<>(); for (int j = i; j < max; j++) { @@ -55,7 +81,7 @@ public static CompletableFuture futureError() { return failedFuture(new IllegalStateException("Error")); } - public static void snooze(int millis) { + public static void snooze(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java index 527f419..18ba41e 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -1,6 +1,7 @@ package org.dataloader.registries; import junit.framework.TestCase; +import org.awaitility.core.ConditionTimeoutException; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderRegistry; @@ -11,13 +12,17 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.awaitility.Awaitility.await; +import static org.awaitility.Duration.TWO_SECONDS; import static org.dataloader.fixtures.TestKit.keysAsValues; import static org.dataloader.fixtures.TestKit.snooze; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; public class ScheduledDataLoaderRegistryTest extends TestCase { @@ -257,4 +262,63 @@ public void test_close_is_a_one_way_door() { snooze(200); assertEquals(counter.get(), countThen + 1); } + + public void test_can_tick_after_first_dispatch_for_chain_data_loaders() { + + // delays much bigger than the tick rate will mean multiple calls to dispatch + DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); + DataLoader dlB = TestKit.idLoaderAsync(Duration.ofMillis(200)); + + CompletableFuture chainedCF = dlA.load("AK1").thenCompose(dlB::load); + + AtomicBoolean done = new AtomicBoolean(); + chainedCF.whenComplete((v, t) -> done.set(true)); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .register("b", dlB) + .dispatchPredicate(alwaysDispatch) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) + .build(); + + assertThat(registry.isTickerMode(), equalTo(true)); + + registry.dispatchAll(); + + await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); + + registry.close(); + } + + public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() { + + // delays much bigger than the tick rate will mean multiple calls to dispatch + DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); + DataLoader dlB = TestKit.idLoaderAsync(Duration.ofMillis(200)); + + CompletableFuture chainedCF = dlA.load("AK1").thenCompose(dlB::load); + + AtomicBoolean done = new AtomicBoolean(); + chainedCF.whenComplete((v, t) -> done.set(true)); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .register("b", dlB) + .dispatchPredicate(alwaysDispatch) + .schedule(Duration.ofMillis(10)) + .tickerMode(false) + .build(); + + assertThat(registry.isTickerMode(), equalTo(false)); + + registry.dispatchAll(); + + try { + await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); + fail("This should not have completed but rather timed out"); + } catch (ConditionTimeoutException expected) { + } + registry.close(); + } } \ No newline at end of file From 37c7a5f8fd88a430ddf724382fcae80efd0e789e Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 25 Sep 2023 16:07:08 +1000 Subject: [PATCH 07/18] This adds a ticker mode to ScheduledDataLoaderRegistry - added readme info --- README.md | 69 +++++++++++++++++++++++++++++++ src/test/java/ReadmeExamples.java | 36 +++++++++++++++- 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e48de1d..afbadb7 100644 --- a/README.md +++ b/README.md @@ -538,6 +538,75 @@ since it was last dispatched". The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less than or equal to 10 but if 200ms pass it will dispatch. +## Chaining DataLoader calls + +It's natural to want to have chained `DataLoader` calls. + +```java + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); +``` + +However, the challenge here is how to be efficient in batching terms. + +This is discussed in detail in the https://github.com/graphql-java/java-dataloader/issues/54 issue. + +Since CompletableFuture's are async and can complete at some time in the future, when is the best time to call +`dispatch` again when a load call has completed to maximize batching? + +The most naive approach is to immediately dispatch the second chained call as follows : + +```java + CompletableFuture chainedWithImmediateDispatch = dataLoaderA.load("user1") + .thenCompose(userAsKey -> { + CompletableFuture loadB = dataLoaderB.load(userAsKey); + dataLoaderB.dispatch(); + return loadB; + }); +``` + +The above will work however the window of batching together multiple calls to `dataLoaderB` will be very small and since +it will likely result in batch sizes of 1. + +This is a very difficult problem to solve because you have to balance two competing design ideals which is to maximize the +batching window of secondary calls in a small window of time so you customer requests don't take longer than necessary. + +* If the batching window is wide you will maximize the number of keys presented to a `BatchLoader` but your request latency will increase. + +* If the batching window is narrow you will reduce your request latency, but also you will reduce the number of keys presented to a `BatchLoader`. + + +### ScheduledDataLoaderRegistry ticker mode + +The `ScheduledDataLoaderRegistry` offers one solution to this called "ticker mode" where it will continually reschedule secondary +`DataLoader` calls after the initial `dispatch()` call is made. + +The batch window of time is controlled by the schedule duration setup at when the `ScheduledDataLoaderRegistry` is created. + +```java + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dataLoaderA) + .register("b", dataLoaderB) + .scheduledExecutorService(executorService) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) // ticker mode is on + .build(); + + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); + +``` +When ticker mode is on the chained dataloader calls will complete but the batching window size will depend on how quickly +the first level of `DataLoader` calls returned compared to the `schedule` of the `ScheduledDataLoaderRegistry`. + +If you use ticker mode, then you MUST `registry.close()` on the `ScheduledDataLoaderRegistry` at the end of the request (say) otherwise +it will continue to reschedule tasks to the `ScheduledExecutorService` associated with the registry. + +You will want to look at sharing the `ScheduledExecutorService` in some way between requests when creating the `ScheduledDataLoaderRegistry` +otherwise you will be creating a thread per `ScheduledDataLoaderRegistry` instance created and with enough concurrent requests +you may create too many threads. ## Other information sources diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index e37550e..3127a43 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -18,11 +18,14 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import static java.lang.String.format; @@ -278,7 +281,7 @@ private void statsConfigExample() { DataLoader userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options); } - private void ScheduledDispatche() { + private void ScheduledDispatcher() { DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10) .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); @@ -288,4 +291,35 @@ private void ScheduledDispatche() { .register("users", userDataLoader) .build(); } + + + DataLoader dataLoaderA = DataLoaderFactory.newDataLoader(userBatchLoader); + DataLoader dataLoaderB = DataLoaderFactory.newDataLoader(keys -> { + return CompletableFuture.completedFuture(Collections.singletonList(1L)); + }); + + private void ScheduledDispatcherChained() { + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); + + + CompletableFuture chainedWithImmediateDispatch = dataLoaderA.load("user1") + .thenCompose(userAsKey -> { + CompletableFuture loadB = dataLoaderB.load(userAsKey); + dataLoaderB.dispatch(); + return loadB; + }); + + + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dataLoaderA) + .register("b", dataLoaderB) + .scheduledExecutorService(executorService) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) // ticker mode is on + .build(); + + } } From 1d11d87dfce7f216aa3b4d63854ef4447b0e8111 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 25 Sep 2023 17:17:22 +1000 Subject: [PATCH 08/18] This adds a ticker mode to ScheduledDataLoaderRegistry - testing works bitches! Found a bug in the sum code that I refactored way --- .../registries/ScheduledDataLoaderRegistry.java | 8 +++++--- .../registries/ScheduledDataLoaderRegistryTest.java | 6 ++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 9f40d62..5b58aa6 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -96,7 +96,7 @@ public int dispatchAllWithCount() { for (Map.Entry> entry : dataLoaders.entrySet()) { DataLoader dataLoader = entry.getValue(); String key = entry.getKey(); - dispatchOrReschedule(key, dataLoader); + sum += dispatchOrReschedule(key, dataLoader); } return sum; } @@ -134,14 +134,16 @@ private void reschedule(String key, DataLoader dataLoader) { } } - private void dispatchOrReschedule(String key, DataLoader dataLoader) { + private int dispatchOrReschedule(String key, DataLoader dataLoader) { + int sum = 0; boolean shouldDispatch = dispatchPredicate.test(key, dataLoader); if (shouldDispatch) { - dataLoader.dispatch(); + sum = dataLoader.dispatchWithCounts().getKeysCount(); } if (tickerMode || !shouldDispatch) { reschedule(key, dataLoader); } + return sum; } /** diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java index 18ba41e..5e0cd9a 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -284,7 +284,8 @@ public void test_can_tick_after_first_dispatch_for_chain_data_loaders() { assertThat(registry.isTickerMode(), equalTo(true)); - registry.dispatchAll(); + int count = registry.dispatchAllWithCount(); + assertThat(count,equalTo(1)); await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); @@ -312,7 +313,8 @@ public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() { assertThat(registry.isTickerMode(), equalTo(false)); - registry.dispatchAll(); + int count = registry.dispatchAllWithCount(); + assertThat(count,equalTo(1)); try { await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); From 5ee388c1e2892ebe4946e8cef6deac7c9aa038e3 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Tue, 26 Sep 2023 09:10:40 +1000 Subject: [PATCH 09/18] This adds a ticker mode to ScheduledDataLoaderRegistry - testing works bitches! More doco based on PR feedback --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index 5a1c78d..24a65f6 100644 --- a/README.md +++ b/README.md @@ -653,6 +653,26 @@ You will want to look at sharing the `ScheduledExecutorService` in some way betw otherwise you will be creating a thread per `ScheduledDataLoaderRegistry` instance created and with enough concurrent requests you may create too many threads. +### ScheduledDataLoaderRegistry dispatching algorithm + +When ticker mode is **false** the `ScheduledDataLoaderRegistry` algorithm is as follows : + +* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time +* Then for every `DataLoader` in the registry + * The `DispatchPredicate` is called to test if the data loader should be dispatched + * if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future + * If it returns **true**, then `dataLoader.dispatch()` is called and the dataloader is not rescheduled again +* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()` + +When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as follows: + +* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time +* Then for every `DataLoader` in the registry + * The `DispatchPredicate` is called to test if the data loader should be dispatched + * if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future + * If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future +* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()` + ## Other information sources - [Facebook DataLoader Github repo](https://github.com/facebook/dataloader) From 1151ccbf4b531c2dbd12dc298f3018535fdb4ebc Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Fri, 29 Sep 2023 20:54:30 +1000 Subject: [PATCH 10/18] Adds a predicate to DataLoaderRegistry and a per dataloader map of predicates is also possible --- .../org/dataloader/DataLoaderRegistry.java | 143 ++++++++++++++++-- .../registries/DispatchPredicate.java | 10 ++ .../ScheduledDataLoaderRegistry.java | 60 +------- 3 files changed, 142 insertions(+), 71 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 9b19c29..48e8d96 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -1,6 +1,7 @@ package org.dataloader; import org.dataloader.annotations.PublicApi; +import org.dataloader.registries.DispatchPredicate; import org.dataloader.stats.Statistics; import java.util.ArrayList; @@ -21,12 +22,17 @@ @PublicApi public class DataLoaderRegistry { protected final Map> dataLoaders = new ConcurrentHashMap<>(); + protected final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); + protected final DispatchPredicate dispatchPredicate; + public DataLoaderRegistry() { + this.dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; } - private DataLoaderRegistry(Builder builder) { + protected DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); + this.dispatchPredicate = builder.dispatchPredicate; } @@ -43,6 +49,21 @@ public DataLoaderRegistry register(String key, DataLoader dataLoader) { return this; } + /** + * This will register a new dataloader and dispatch predicate associated with that data loader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate to associate with this data loader + * + * @return this registry + */ + public DataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + dataLoaders.put(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return this; + } + /** * Computes a data loader if absent or return it if it was * already registered at that key. @@ -76,6 +97,8 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { this.dataLoaders.forEach(combined::register); registry.dataLoaders.forEach(combined::register); + combined.dataLoaderPredicates.putAll(this.dataLoaderPredicates); + combined.dataLoaderPredicates.putAll(registry.dataLoaderPredicates); return combined; } @@ -101,7 +124,10 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { * @return this registry */ public DataLoaderRegistry unregister(String key) { - dataLoaders.remove(key); + DataLoader dataLoader = dataLoaders.remove(key); + if (dataLoader != null) { + dataLoaderPredicates.remove(dataLoader); + } return this; } @@ -131,7 +157,7 @@ public Set getKeys() { * {@link org.dataloader.DataLoader}s */ public void dispatchAll() { - getDataLoaders().forEach(DataLoader::dispatch); + dispatchAllWithCount(); } /** @@ -142,8 +168,12 @@ public void dispatchAll() { */ public int dispatchAllWithCount() { int sum = 0; - for (DataLoader dataLoader : getDataLoaders()) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); + for (Map.Entry> entry : dataLoaders.entrySet()) { + DataLoader dataLoader = entry.getValue(); + String key = entry.getKey(); + if (shouldDispatch(key, dataLoader)) { + sum += dataLoader.dispatchWithCounts().getKeysCount(); + } } return sum; } @@ -154,12 +184,59 @@ public int dispatchAllWithCount() { */ public int dispatchDepth() { int totalDispatchDepth = 0; - for (DataLoader dataLoader : getDataLoaders()) { - totalDispatchDepth += dataLoader.dispatchDepth(); + for (Map.Entry> entry : dataLoaders.entrySet()) { + DataLoader dataLoader = entry.getValue(); + String key = entry.getKey(); + if (shouldDispatch(key, dataLoader)) { + totalDispatchDepth += dataLoader.dispatchDepth(); + } } return totalDispatchDepth; } + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicate + */ + public void dispatchAllImmediately() { + dispatchAllWithCountImmediately(); + } + + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicate + * + * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. + */ + public int dispatchAllWithCountImmediately() { + int sum = 0; + for (Map.Entry> entry : dataLoaders.entrySet()) { + DataLoader dataLoader = entry.getValue(); + sum += dataLoader.dispatchWithCounts().getKeysCount(); + } + return sum; + } + + + /** + * Returns true if the dataloader has a predicate which returned true, OR the overall + * registry predicate returned true. + * + * @param dataLoaderKey the key in the dataloader map + * @param dataLoader the dataloader + * + * @return true if it should dispatch + */ + protected boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { + DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); + if (dispatchPredicate != null) { + if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { + return true; + } + } + return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + } + /** * @return a combined set of statistics for all data loaders in this registry presented * as the sum of all their statistics @@ -175,13 +252,22 @@ public Statistics getStatistics() { /** * @return A builder of {@link DataLoaderRegistry}s */ - public static Builder newRegistry() { + public static Builder newRegistry() { + //noinspection rawtypes return new Builder(); } - public static class Builder { + public static class Builder> { private final Map> dataLoaders = new HashMap<>(); + private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); + + private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; + + private B self() { + //noinspection unchecked + return (B) this; + } /** * This will register a new dataloader @@ -191,22 +277,51 @@ public static class Builder { * * @return this builder for a fluent pattern */ - public Builder register(String key, DataLoader dataLoader) { + public B register(String key, DataLoader dataLoader) { dataLoaders.put(key, dataLoader); - return this; + return self(); } /** - * This will combine together the data loaders in this builder with the ones + * This will register a new dataloader with a specific {@link DispatchPredicate} + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate + * + * @return this builder for a fluent pattern + */ + public B register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + register(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return self(); + } + + /** + * This will combine the data loaders in this builder with the ones * from a previous {@link DataLoaderRegistry} * * @param otherRegistry the previous {@link DataLoaderRegistry} * * @return this builder for a fluent pattern */ - public Builder registerAll(DataLoaderRegistry otherRegistry) { + public B registerAll(DataLoaderRegistry otherRegistry) { dataLoaders.putAll(otherRegistry.dataLoaders); - return this; + dataLoaderPredicates.putAll(otherRegistry.dataLoaderPredicates); + return self(); + } + + /** + * This sets a predicate on the {@link DataLoaderRegistry} that will control + * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. + * + * @param dispatchPredicate the predicate + * + * @return this builder for a fluent pattern + */ + public B dispatchPredicate(DispatchPredicate dispatchPredicate) { + this.dispatchPredicate = dispatchPredicate; + return self(); } /** diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java index d5bd31b..45e1da0 100644 --- a/src/main/java/org/dataloader/registries/DispatchPredicate.java +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -10,6 +10,16 @@ */ @FunctionalInterface public interface DispatchPredicate { + + /** + * A predicate that always returns true + */ + DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true; + /** + * A predicate that always returns false + */ + DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> true; + /** * This predicate tests whether the data loader should be dispatched or not. * diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 4be317e..3e7a327 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -5,7 +5,6 @@ import org.dataloader.annotations.ExperimentalApi; import java.time.Duration; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -30,14 +29,12 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { private final ScheduledExecutorService scheduledExecutorService; - private final DispatchPredicate dispatchPredicate; private final Duration schedule; private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { - this.dataLoaders.putAll(builder.dataLoaders); + super(builder); this.scheduledExecutorService = builder.scheduledExecutorService; - this.dispatchPredicate = builder.dispatchPredicate; this.schedule = builder.schedule; this.closed = false; } @@ -77,24 +74,6 @@ public int dispatchAllWithCount() { return sum; } - /** - * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate - */ - public void dispatchAllImmediately() { - super.dispatchAll(); - } - - /** - * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate - * - * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. - */ - public int dispatchAllWithCountImmediately() { - return super.dispatchAllWithCount(); - } - /** * This will schedule a task to check the predicate and dispatch if true right now. It will not do * a pre check of the preodicate like {@link #dispatchAll()} would @@ -111,7 +90,7 @@ private void reschedule(String key, DataLoader dataLoader) { } private void dispatchOrReschedule(String key, DataLoader dataLoader) { - if (dispatchPredicate.test(key, dataLoader)) { + if (shouldDispatch(key, dataLoader)) { dataLoader.dispatch(); } else { reschedule(key, dataLoader); @@ -128,12 +107,10 @@ public static Builder newScheduledRegistry() { return new Builder(); } - public static class Builder { + public static class Builder extends DataLoaderRegistry.Builder { private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - private DispatchPredicate dispatchPredicate = (key, dl) -> true; private Duration schedule = Duration.ofMillis(10); - private final Map> dataLoaders = new HashMap<>(); public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); @@ -145,37 +122,6 @@ public Builder schedule(Duration schedule) { return this; } - public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { - this.dispatchPredicate = nonNull(dispatchPredicate); - return this; - } - - /** - * This will register a new dataloader - * - * @param key the key to put the data loader under - * @param dataLoader the data loader to register - * - * @return this builder for a fluent pattern - */ - public Builder register(String key, DataLoader dataLoader) { - dataLoaders.put(key, dataLoader); - return this; - } - - /** - * This will combine together the data loaders in this builder with the ones - * from a previous {@link DataLoaderRegistry} - * - * @param otherRegistry the previous {@link DataLoaderRegistry} - * - * @return this builder for a fluent pattern - */ - public Builder registerAll(DataLoaderRegistry otherRegistry) { - dataLoaders.putAll(otherRegistry.getDataLoadersMap()); - return this; - } - /** * @return the newly built {@link ScheduledDataLoaderRegistry} */ From fb0a072f171e8037f2d82a21da6f19125e9a8b46 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sat, 30 Sep 2023 11:09:01 +1000 Subject: [PATCH 11/18] Adds a predicate to DataLoaderRegistry - added tests --- .../org/dataloader/DataLoaderRegistry.java | 42 ++-- .../registries/DispatchPredicate.java | 2 +- .../DataLoaderRegistryPredicateTest.java | 198 ++++++++++++++++++ .../java/org/dataloader/fixtures/TestKit.java | 11 + 4 files changed, 233 insertions(+), 20 deletions(-) create mode 100644 src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 48e8d96..aa01baa 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -15,7 +15,7 @@ import java.util.function.Function; /** - * This allows data loaders to be registered together into a single place so + * This allows data loaders to be registered together into a single place, so * they can be dispatched as one. It also allows you to retrieve data loaders by * name from a central place */ @@ -32,6 +32,7 @@ public DataLoaderRegistry() { protected DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); + this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); this.dispatchPredicate = builder.dispatchPredicate; } @@ -116,6 +117,20 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { return new LinkedHashMap<>(dataLoaders); } + /** + * @return the current dispatch predicate + */ + public DispatchPredicate getDispatchPredicate() { + return dispatchPredicate; + } + + /** + * @return a map of data loaders to specific dispatch predicates + */ + public Map, DispatchPredicate> getDataLoaderPredicates() { + return new LinkedHashMap<>(dataLoaderPredicates); + } + /** * This will unregister a new dataloader * @@ -153,7 +168,7 @@ public Set getKeys() { } /** - * This will called {@link org.dataloader.DataLoader#dispatch()} on each of the registered + * This will be called {@link org.dataloader.DataLoader#dispatch()} on each of the registered * {@link org.dataloader.DataLoader}s */ public void dispatchAll() { @@ -183,20 +198,12 @@ public int dispatchAllWithCount() { * {@link org.dataloader.DataLoader}s */ public int dispatchDepth() { - int totalDispatchDepth = 0; - for (Map.Entry> entry : dataLoaders.entrySet()) { - DataLoader dataLoader = entry.getValue(); - String key = entry.getKey(); - if (shouldDispatch(key, dataLoader)) { - totalDispatchDepth += dataLoader.dispatchDepth(); - } - } - return totalDispatchDepth; + return dataLoaders.values().stream().mapToInt(DataLoader::dispatchDepth).sum(); } /** * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate + * without testing the predicates */ public void dispatchAllImmediately() { dispatchAllWithCountImmediately(); @@ -204,17 +211,14 @@ public void dispatchAllImmediately() { /** * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate + * without testing the predicates * * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. */ public int dispatchAllWithCountImmediately() { - int sum = 0; - for (Map.Entry> entry : dataLoaders.entrySet()) { - DataLoader dataLoader = entry.getValue(); - sum += dataLoader.dispatchWithCounts().getKeysCount(); - } - return sum; + return dataLoaders.values().stream() + .mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount()) + .sum(); } diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java index 45e1da0..247a51a 100644 --- a/src/main/java/org/dataloader/registries/DispatchPredicate.java +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -18,7 +18,7 @@ public interface DispatchPredicate { /** * A predicate that always returns false */ - DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> true; + DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false; /** * This predicate tests whether the data loader should be dispatched or not. diff --git a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java new file mode 100644 index 0000000..56e4f90 --- /dev/null +++ b/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java @@ -0,0 +1,198 @@ +package org.dataloader; + +import org.dataloader.registries.DispatchPredicate; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static java.util.Arrays.asList; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.fixtures.TestKit.asSet; +import static org.dataloader.registries.DispatchPredicate.DISPATCH_NEVER; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class DataLoaderRegistryPredicateTest { + final BatchLoader identityBatchLoader = CompletableFuture::completedFuture; + + static class CountingDispatchPredicate implements DispatchPredicate { + int count = 0; + int max = 0; + + public CountingDispatchPredicate(int max) { + this.max = max; + } + + @Override + public boolean test(String dataLoaderKey, DataLoader dataLoader) { + boolean shouldFire = count >= max; + count++; + return shouldFire; + } + } + + @Test + public void predicate_registration_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOverAll) + .build(); + + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB, dlC))); + assertThat(registry.getDataLoadersMap().keySet(), equalTo(asSet("a", "b", "c"))); + assertThat(asSet(registry.getDataLoadersMap().values()), equalTo(asSet(dlA, dlB, dlC))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB, predicateC))); + + // and unregister (fluently) + DataLoaderRegistry dlR = registry.unregister("c"); + assertThat(dlR, equalTo(registry)); + + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB))); + + // direct on the registry works + registry.register("c", dlC, predicateC); + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB, dlC))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB, predicateC))); + + } + + @Test + public void predicate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOverAll) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + // none should fire + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // second firing + // one should fire + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfA.join(), equalTo("A")); + + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // third firing + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfB.join(), equalTo("B")); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // fourth firing + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + assertThat(cfC.join(), equalTo("C")); + } + + @Test + public void test_the_registry_overall_predicate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateOverAllOnThree = new CountingDispatchPredicate(3); + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("a", dlA, DISPATCH_NEVER) + .register("b", dlB, DISPATCH_NEVER) + .register("c", dlC, DISPATCH_NEVER) + .dispatchPredicate(predicateOverAllOnThree) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + // none should fire + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // second firing but the overall been asked 3 times already + assertThat(count, equalTo(3)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + } + + @Test + public void dispatch_immediate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOverAll) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCountImmediately(); // all should fire + assertThat(count, equalTo(3)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfA.join(), equalTo("A")); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfB.join(), equalTo("B")); + assertThat(cfC.isDone(), equalTo(true)); + assertThat(cfC.join(), equalTo("C")); + } + +} diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 5c87148..a26c18f 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -6,8 +6,11 @@ import org.dataloader.DataLoaderOptions; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static java.util.stream.Collectors.toList; @@ -67,4 +70,12 @@ public static void snooze(int millis) { public static List sort(Collection collection) { return collection.stream().sorted().collect(toList()); } + + public static Set asSet(T... elements) { + return new LinkedHashSet<>(Arrays.asList(elements)); + } + + public static Set asSet(Collection elements) { + return new LinkedHashSet<>(elements); + } } From c528455564dab0f849c45a5e5fbd80ed123bacf1 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sat, 30 Sep 2023 19:36:07 +1000 Subject: [PATCH 12/18] Adds a predicate to DataLoaderRegistry - added more tests --- .../dataloader/DataLoaderRegistryPredicateTest.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java index 56e4f90..579ad81 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java +++ b/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java @@ -133,13 +133,13 @@ public void test_the_registry_overall_predicate_firing_works() { DataLoader dlB = newDataLoader(identityBatchLoader); DataLoader dlC = newDataLoader(identityBatchLoader); - DispatchPredicate predicateOverAllOnThree = new CountingDispatchPredicate(3); + DispatchPredicate predicateOnSix = new CountingDispatchPredicate(6); DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() .register("a", dlA, DISPATCH_NEVER) .register("b", dlB, DISPATCH_NEVER) .register("c", dlC, DISPATCH_NEVER) - .dispatchPredicate(predicateOverAllOnThree) + .dispatchPredicate(predicateOnSix) .build(); @@ -148,13 +148,18 @@ public void test_the_registry_overall_predicate_firing_works() { CompletableFuture cfC = dlC.load("C"); int count = registry.dispatchAllWithCount(); // first firing - // none should fire assertThat(count, equalTo(0)); assertThat(cfA.isDone(), equalTo(false)); assertThat(cfB.isDone(), equalTo(false)); assertThat(cfC.isDone(), equalTo(false)); - count = registry.dispatchAllWithCount(); // second firing but the overall been asked 3 times already + count = registry.dispatchAllWithCount(); // second firing but the overall been asked 6 times already + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // third firing but the overall been asked 9 times already assertThat(count, equalTo(3)); assertThat(cfA.isDone(), equalTo(true)); assertThat(cfB.isDone(), equalTo(true)); From c0c6eef673bc0b981a35c5bf4d24765451e4f21f Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Tue, 3 Oct 2023 10:34:58 +1100 Subject: [PATCH 13/18] Adds a predicate to ScheduledDataLoaderRegistry - no longer in DLR --- .../org/dataloader/DataLoaderRegistry.java | 138 ++------------- .../org/dataloader/annotations/GuardedBy.java | 2 +- .../ScheduledDataLoaderRegistry.java | 164 +++++++++++++++++- ...duledDataLoaderRegistryPredicateTest.java} | 57 +++++- 4 files changed, 222 insertions(+), 139 deletions(-) rename src/test/java/org/dataloader/{DataLoaderRegistryPredicateTest.java => registries/ScheduledDataLoaderRegistryPredicateTest.java} (77%) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index aa01baa..3128d2c 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -1,7 +1,6 @@ package org.dataloader; import org.dataloader.annotations.PublicApi; -import org.dataloader.registries.DispatchPredicate; import org.dataloader.stats.Statistics; import java.util.ArrayList; @@ -22,18 +21,12 @@ @PublicApi public class DataLoaderRegistry { protected final Map> dataLoaders = new ConcurrentHashMap<>(); - protected final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); - protected final DispatchPredicate dispatchPredicate; - public DataLoaderRegistry() { - this.dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; } protected DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); - this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); - this.dispatchPredicate = builder.dispatchPredicate; } @@ -50,21 +43,6 @@ public DataLoaderRegistry register(String key, DataLoader dataLoader) { return this; } - /** - * This will register a new dataloader and dispatch predicate associated with that data loader - * - * @param key the key to put the data loader under - * @param dataLoader the data loader to register - * @param dispatchPredicate the dispatch predicate to associate with this data loader - * - * @return this registry - */ - public DataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { - dataLoaders.put(key, dataLoader); - dataLoaderPredicates.put(dataLoader, dispatchPredicate); - return this; - } - /** * Computes a data loader if absent or return it if it was * already registered at that key. @@ -98,8 +76,6 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { this.dataLoaders.forEach(combined::register); registry.dataLoaders.forEach(combined::register); - combined.dataLoaderPredicates.putAll(this.dataLoaderPredicates); - combined.dataLoaderPredicates.putAll(registry.dataLoaderPredicates); return combined; } @@ -117,20 +93,6 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { return new LinkedHashMap<>(dataLoaders); } - /** - * @return the current dispatch predicate - */ - public DispatchPredicate getDispatchPredicate() { - return dispatchPredicate; - } - - /** - * @return a map of data loaders to specific dispatch predicates - */ - public Map, DispatchPredicate> getDataLoaderPredicates() { - return new LinkedHashMap<>(dataLoaderPredicates); - } - /** * This will unregister a new dataloader * @@ -139,10 +101,7 @@ public DispatchPredicate getDispatchPredicate() { * @return this registry */ public DataLoaderRegistry unregister(String key) { - DataLoader dataLoader = dataLoaders.remove(key); - if (dataLoader != null) { - dataLoaderPredicates.remove(dataLoader); - } + dataLoaders.remove(key); return this; } @@ -168,11 +127,11 @@ public Set getKeys() { } /** - * This will be called {@link org.dataloader.DataLoader#dispatch()} on each of the registered + * This will called {@link org.dataloader.DataLoader#dispatch()} on each of the registered * {@link org.dataloader.DataLoader}s */ public void dispatchAll() { - dispatchAllWithCount(); + getDataLoaders().forEach(DataLoader::dispatch); } /** @@ -183,12 +142,8 @@ public void dispatchAll() { */ public int dispatchAllWithCount() { int sum = 0; - for (Map.Entry> entry : dataLoaders.entrySet()) { - DataLoader dataLoader = entry.getValue(); - String key = entry.getKey(); - if (shouldDispatch(key, dataLoader)) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); - } + for (DataLoader dataLoader : getDataLoaders()) { + sum += dataLoader.dispatchWithCounts().getKeysCount(); } return sum; } @@ -198,47 +153,11 @@ public int dispatchAllWithCount() { * {@link org.dataloader.DataLoader}s */ public int dispatchDepth() { - return dataLoaders.values().stream().mapToInt(DataLoader::dispatchDepth).sum(); - } - - /** - * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicates - */ - public void dispatchAllImmediately() { - dispatchAllWithCountImmediately(); - } - - /** - * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicates - * - * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. - */ - public int dispatchAllWithCountImmediately() { - return dataLoaders.values().stream() - .mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount()) - .sum(); - } - - - /** - * Returns true if the dataloader has a predicate which returned true, OR the overall - * registry predicate returned true. - * - * @param dataLoaderKey the key in the dataloader map - * @param dataLoader the dataloader - * - * @return true if it should dispatch - */ - protected boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { - DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); - if (dispatchPredicate != null) { - if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { - return true; - } + int totalDispatchDepth = 0; + for (DataLoader dataLoader : getDataLoaders()) { + totalDispatchDepth += dataLoader.dispatchDepth(); } - return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + return totalDispatchDepth; } /** @@ -256,19 +175,15 @@ public Statistics getStatistics() { /** * @return A builder of {@link DataLoaderRegistry}s */ - public static Builder newRegistry() { - //noinspection rawtypes + public static Builder newRegistry() { return new Builder(); } public static class Builder> { private final Map> dataLoaders = new HashMap<>(); - private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); - - private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; - private B self() { + protected B self() { //noinspection unchecked return (B) this; } @@ -287,22 +202,7 @@ public B register(String key, DataLoader dataLoader) { } /** - * This will register a new dataloader with a specific {@link DispatchPredicate} - * - * @param key the key to put the data loader under - * @param dataLoader the data loader to register - * @param dispatchPredicate the dispatch predicate - * - * @return this builder for a fluent pattern - */ - public B register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { - register(key, dataLoader); - dataLoaderPredicates.put(dataLoader, dispatchPredicate); - return self(); - } - - /** - * This will combine the data loaders in this builder with the ones + * This will combine together the data loaders in this builder with the ones * from a previous {@link DataLoaderRegistry} * * @param otherRegistry the previous {@link DataLoaderRegistry} @@ -311,20 +211,6 @@ public B register(String key, DataLoader dataLoader, DispatchPredicate dis */ public B registerAll(DataLoaderRegistry otherRegistry) { dataLoaders.putAll(otherRegistry.dataLoaders); - dataLoaderPredicates.putAll(otherRegistry.dataLoaderPredicates); - return self(); - } - - /** - * This sets a predicate on the {@link DataLoaderRegistry} that will control - * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. - * - * @param dispatchPredicate the predicate - * - * @return this builder for a fluent pattern - */ - public B dispatchPredicate(DispatchPredicate dispatchPredicate) { - this.dispatchPredicate = dispatchPredicate; return self(); } diff --git a/src/main/java/org/dataloader/annotations/GuardedBy.java b/src/main/java/org/dataloader/annotations/GuardedBy.java index c26b2ef..85c5765 100644 --- a/src/main/java/org/dataloader/annotations/GuardedBy.java +++ b/src/main/java/org/dataloader/annotations/GuardedBy.java @@ -15,7 +15,7 @@ public @interface GuardedBy { /** - * The lock that should be held. + * @return The lock that should be held. */ String value(); } diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 3e7a327..e86b93e 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -5,7 +5,9 @@ import org.dataloader.annotations.ExperimentalApi; import java.time.Duration; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -28,6 +30,8 @@ @ExperimentalApi public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { + private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); + private final DispatchPredicate dispatchPredicate; private final ScheduledExecutorService scheduledExecutorService; private final Duration schedule; private volatile boolean closed; @@ -37,6 +41,8 @@ private ScheduledDataLoaderRegistry(Builder builder) { this.scheduledExecutorService = builder.scheduledExecutorService; this.schedule = builder.schedule; this.closed = false; + this.dispatchPredicate = builder.dispatchPredicate; + this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); } /** @@ -54,6 +60,86 @@ public Duration getScheduleDuration() { return schedule; } + /** + * This will combine all the current data loaders in this registry and all the data loaders from the specified registry + * and return a new combined registry + * + * @param registry the registry to combine into this registry + * + * @return a new combined registry + */ + public DataLoaderRegistry combine(DataLoaderRegistry registry) { + Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry() + .dispatchPredicate(this.dispatchPredicate); + combinedBuilder.registerAll(this); + combinedBuilder.registerAll(registry); + return combinedBuilder.build(); + } + + + /** + * This will unregister a new dataloader + * + * @param key the key of the data loader to unregister + * + * @return this registry + */ + public ScheduledDataLoaderRegistry unregister(String key) { + DataLoader dataLoader = dataLoaders.remove(key); + if (dataLoader != null) { + dataLoaderPredicates.remove(dataLoader); + } + return this; + } + + /** + * @return the current dispatch predicate + */ + public DispatchPredicate getDispatchPredicate() { + return dispatchPredicate; + } + + /** + * @return a map of data loaders to specific dispatch predicates + */ + public Map, DispatchPredicate> getDataLoaderPredicates() { + return new LinkedHashMap<>(dataLoaderPredicates); + } + + /** + * This will register a new dataloader and dispatch predicate associated with that data loader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate to associate with this data loader + * + * @return this registry + */ + public DataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + dataLoaders.put(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return this; + } + + /** + * Returns true if the dataloader has a predicate which returned true, OR the overall + * registry predicate returned true. + * + * @param dataLoaderKey the key in the dataloader map + * @param dataLoader the dataloader + * + * @return true if it should dispatch + */ + private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { + DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); + if (dispatchPredicate != null) { + if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { + return true; + } + } + return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + } + @Override public void dispatchAll() { dispatchAllWithCount(); @@ -65,7 +151,7 @@ public int dispatchAllWithCount() { for (Map.Entry> entry : dataLoaders.entrySet()) { DataLoader dataLoader = entry.getValue(); String key = entry.getKey(); - if (dispatchPredicate.test(key, dataLoader)) { + if (shouldDispatch(key, dataLoader)) { sum += dataLoader.dispatchWithCounts().getKeysCount(); } else { reschedule(key, dataLoader); @@ -74,6 +160,28 @@ public int dispatchAllWithCount() { return sum; } + + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicates + */ + public void dispatchAllImmediately() { + dispatchAllWithCountImmediately(); + } + + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicates + * + * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. + */ + public int dispatchAllWithCountImmediately() { + return dataLoaders.values().stream() + .mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount()) + .sum(); + } + + /** * This will schedule a task to check the predicate and dispatch if true right now. It will not do * a pre check of the preodicate like {@link #dispatchAll()} would @@ -112,14 +220,64 @@ public static class Builder extends DataLoaderRegistry.Builder, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); + + private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; + public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); - return this; + return self(); } public Builder schedule(Duration schedule) { this.schedule = schedule; - return this; + return self(); + } + + + /** + * This will register a new dataloader with a specific {@link DispatchPredicate} + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate + * + * @return this builder for a fluent pattern + */ + public Builder register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + register(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return self(); + } + + /** + * This will combine the data loaders in this builder with the ones + * from a previous {@link DataLoaderRegistry} + * + * @param otherRegistry the previous {@link DataLoaderRegistry} + * + * @return this builder for a fluent pattern + */ + public Builder registerAll(DataLoaderRegistry otherRegistry) { + super.registerAll(otherRegistry); + if (otherRegistry instanceof ScheduledDataLoaderRegistry) { + ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry; + dataLoaderPredicates.putAll(other.dataLoaderPredicates); + } + return self(); + } + + /** + * This sets a predicate on the {@link DataLoaderRegistry} that will control + * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. + * + * @param dispatchPredicate the predicate + * + * @return this builder for a fluent pattern + */ + public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { + this.dispatchPredicate = dispatchPredicate; + return self(); } /** diff --git a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java similarity index 77% rename from src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java rename to src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java index 579ad81..43da82f 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryPredicateTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java @@ -1,18 +1,23 @@ -package org.dataloader; +package org.dataloader.registries; -import org.dataloader.registries.DispatchPredicate; +import org.dataloader.BatchLoader; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderRegistry; import org.junit.Test; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import static java.util.Arrays.asList; +import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.fixtures.TestKit.asSet; import static org.dataloader.registries.DispatchPredicate.DISPATCH_NEVER; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -public class DataLoaderRegistryPredicateTest { +public class ScheduledDataLoaderRegistryPredicateTest { final BatchLoader identityBatchLoader = CompletableFuture::completedFuture; static class CountingDispatchPredicate implements DispatchPredicate { @@ -43,7 +48,7 @@ public void predicate_registration_works() { DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); - DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() .register("a", dlA, predicateA) .register("b", dlB, predicateB) .register("c", dlC, predicateC) @@ -82,13 +87,14 @@ public void predicate_firing_works() { DispatchPredicate predicateB = new CountingDispatchPredicate(2); DispatchPredicate predicateC = new CountingDispatchPredicate(3); - DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + DispatchPredicate predicateOnTen = new CountingDispatchPredicate(10); - DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() .register("a", dlA, predicateA) .register("b", dlB, predicateB) .register("c", dlC, predicateC) - .dispatchPredicate(predicateOverAll) + .dispatchPredicate(predicateOnTen) + .schedule(Duration.ofHours(1000)) // make this so long its never rescheduled .build(); @@ -135,11 +141,12 @@ public void test_the_registry_overall_predicate_firing_works() { DispatchPredicate predicateOnSix = new CountingDispatchPredicate(6); - DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() .register("a", dlA, DISPATCH_NEVER) .register("b", dlB, DISPATCH_NEVER) .register("c", dlC, DISPATCH_NEVER) .dispatchPredicate(predicateOnSix) + .schedule(Duration.ofHours(1000)) .build(); @@ -178,11 +185,12 @@ public void dispatch_immediate_firing_works() { DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); - DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() .register("a", dlA, predicateA) .register("b", dlB, predicateB) .register("c", dlC, predicateC) .dispatchPredicate(predicateOverAll) + .schedule(Duration.ofHours(1000)) .build(); @@ -200,4 +208,35 @@ public void dispatch_immediate_firing_works() { assertThat(cfC.join(), equalTo("C")); } + @Test + public void test_the_registry_overall_predicate_firing_works_when_on_schedule() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateOnTwenty = new CountingDispatchPredicate(20); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA, DISPATCH_NEVER) + .register("b", dlB, DISPATCH_NEVER) + .register("c", dlC, DISPATCH_NEVER) + .dispatchPredicate(predicateOnTwenty) + .schedule(Duration.ofMillis(5)) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + assertThat(count, equalTo(0)); + + // the calls will be rescheduled until eventually the counting predicate returns true + await().until(cfA::isDone, is(true)); + + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + } } From 1c8d48c33f9e8416411f4ab995ab1bceef05d8da Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Tue, 3 Oct 2023 12:50:14 +1100 Subject: [PATCH 14/18] Adds a predicate to ScheduledDataLoaderRegistry - no longer in DLR - code tweaks --- .../registries/ScheduledDataLoaderRegistry.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index e86b93e..b109974 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -68,7 +68,7 @@ public Duration getScheduleDuration() { * * @return a new combined registry */ - public DataLoaderRegistry combine(DataLoaderRegistry registry) { + public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) { Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry() .dispatchPredicate(this.dispatchPredicate); combinedBuilder.registerAll(this); @@ -115,7 +115,7 @@ public DispatchPredicate getDispatchPredicate() { * * @return this registry */ - public DataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + public ScheduledDataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { dataLoaders.put(key, dataLoader); dataLoaderPredicates.put(dataLoader, dispatchPredicate); return this; @@ -206,8 +206,8 @@ private void dispatchOrReschedule(String key, DataLoader dataLoader) { } /** - * By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()} - * and a schedule duration of 10 milli seconds. + * By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()} + * and a schedule duration of 10 milliseconds. * * @return A builder of {@link ScheduledDataLoaderRegistry}s */ From 69528f1c41464fe312431187f058132c7c830ace Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Wed, 4 Oct 2023 09:57:00 +1100 Subject: [PATCH 15/18] Adds a predicate to ScheduledDataLoaderRegistry - removed generic builders --- .../org/dataloader/DataLoaderRegistry.java | 17 +++------ .../ScheduledDataLoaderRegistry.java | 37 +++++++++++++------ 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 3128d2c..0bc54cb 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -25,7 +25,7 @@ public class DataLoaderRegistry { public DataLoaderRegistry() { } - protected DataLoaderRegistry(Builder builder) { + private DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); } @@ -179,15 +179,10 @@ public static Builder newRegistry() { return new Builder(); } - public static class Builder> { + public static class Builder { private final Map> dataLoaders = new HashMap<>(); - protected B self() { - //noinspection unchecked - return (B) this; - } - /** * This will register a new dataloader * @@ -196,9 +191,9 @@ protected B self() { * * @return this builder for a fluent pattern */ - public B register(String key, DataLoader dataLoader) { + public Builder register(String key, DataLoader dataLoader) { dataLoaders.put(key, dataLoader); - return self(); + return this; } /** @@ -209,9 +204,9 @@ public B register(String key, DataLoader dataLoader) { * * @return this builder for a fluent pattern */ - public B registerAll(DataLoaderRegistry otherRegistry) { + public Builder registerAll(DataLoaderRegistry otherRegistry) { dataLoaders.putAll(otherRegistry.dataLoaders); - return self(); + return this; } /** diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index b109974..8ad5ecb 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -37,7 +37,8 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { - super(builder); + super(); + this.dataLoaders.putAll(builder.dataLoaders); this.scheduledExecutorService = builder.scheduledExecutorService; this.schedule = builder.schedule; this.closed = false; @@ -215,23 +216,35 @@ public static Builder newScheduledRegistry() { return new Builder(); } - public static class Builder extends DataLoaderRegistry.Builder { + public static class Builder { + private final Map> dataLoaders = new LinkedHashMap<>(); + private final Map, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>(); + private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private Duration schedule = Duration.ofMillis(10); - private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); - - private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; - public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); - return self(); + return this; } public Builder schedule(Duration schedule) { this.schedule = schedule; - return self(); + return this; + } + + /** + * This will register a new dataloader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * + * @return this builder for a fluent pattern + */ + public Builder register(String key, DataLoader dataLoader) { + dataLoaders.put(key, dataLoader); + return this; } @@ -247,7 +260,7 @@ public Builder schedule(Duration schedule) { public Builder register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { register(key, dataLoader); dataLoaderPredicates.put(dataLoader, dispatchPredicate); - return self(); + return this; } /** @@ -259,12 +272,12 @@ public Builder register(String key, DataLoader dataLoader, DispatchPredica * @return this builder for a fluent pattern */ public Builder registerAll(DataLoaderRegistry otherRegistry) { - super.registerAll(otherRegistry); + dataLoaders.putAll(otherRegistry.getDataLoadersMap()); if (otherRegistry instanceof ScheduledDataLoaderRegistry) { ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry; dataLoaderPredicates.putAll(other.dataLoaderPredicates); } - return self(); + return this; } /** @@ -277,7 +290,7 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) { */ public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { this.dispatchPredicate = dispatchPredicate; - return self(); + return this; } /** From 3099f1a9032963e99146f61d4fe8ffdf6a813eb8 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Fri, 6 Oct 2023 10:17:53 +1100 Subject: [PATCH 16/18] Adds a predicate to ScheduledDataLoaderRegistry - improved doco --- .../ScheduledDataLoaderRegistry.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 8ad5ecb..5b1af76 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -15,12 +15,15 @@ import static org.dataloader.impl.Assertions.nonNull; /** - * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called + * This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled * to perform that predicate dispatch again via the {@link ScheduledExecutorService}. *

+ * It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the + * whole {@link ScheduledDataLoaderRegistry}. + *

* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case - * no rescheduling will occur and you will need to call dispatch again to restart the process. + * no rescheduling will occur, and you will need to call dispatch again to restart the process. *

* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. @@ -94,17 +97,19 @@ public ScheduledDataLoaderRegistry unregister(String key) { } /** - * @return the current dispatch predicate + * @return a map of data loaders to specific dispatch predicates */ - public DispatchPredicate getDispatchPredicate() { - return dispatchPredicate; + public Map, DispatchPredicate> getDataLoaderPredicates() { + return new LinkedHashMap<>(dataLoaderPredicates); } /** - * @return a map of data loaders to specific dispatch predicates + * There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry} + * + * @return the default dispatch predicate */ - public Map, DispatchPredicate> getDataLoaderPredicates() { - return new LinkedHashMap<>(dataLoaderPredicates); + public DispatchPredicate getDispatchPredicate() { + return dispatchPredicate; } /** @@ -281,7 +286,7 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) { } /** - * This sets a predicate on the {@link DataLoaderRegistry} that will control + * This sets a default predicate on the {@link DataLoaderRegistry} that will control * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. * * @param dispatchPredicate the predicate From f5d79b471989924919b16a9d262c050558590967 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 8 Oct 2023 17:17:53 +1100 Subject: [PATCH 17/18] Merged in master --- .../ScheduledDataLoaderRegistry.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 2fbec3d..6d3b910 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -157,25 +157,6 @@ public ScheduledDataLoaderRegistry register(String key, DataLoader dataLoa return this; } - /** - * Returns true if the dataloader has a predicate which returned true, OR the overall - * registry predicate returned true. - * - * @param dataLoaderKey the key in the dataloader map - * @param dataLoader the dataloader - * - * @return true if it should dispatch - */ - private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { - DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); - if (dispatchPredicate != null) { - if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { - return true; - } - } - return this.dispatchPredicate.test(dataLoaderKey, dataLoader); - } - @Override public void dispatchAll() { dispatchAllWithCount(); @@ -222,6 +203,25 @@ public void rescheduleNow() { dataLoaders.forEach(this::reschedule); } + /** + * Returns true if the dataloader has a predicate which returned true, OR the overall + * registry predicate returned true. + * + * @param dataLoaderKey the key in the dataloader map + * @param dataLoader the dataloader + * + * @return true if it should dispatch + */ + private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { + DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); + if (dispatchPredicate != null) { + if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { + return true; + } + } + return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + } + private void reschedule(String key, DataLoader dataLoader) { if (!closed) { Runnable runThis = () -> dispatchOrReschedule(key, dataLoader); From 46ce1736dd609005e800ebded87cdb046dc55a43 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 8 Oct 2023 17:20:27 +1100 Subject: [PATCH 18/18] Merged in master - tweaked doco --- .../org/dataloader/registries/ScheduledDataLoaderRegistry.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 6d3b910..fada66d 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -42,7 +42,8 @@ *

* When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job * on the {@link ScheduledExecutorService} that is continuously dispatching. - *

* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and + *

+ * If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. *

* By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you