Skip to content

Commit c0d42df

Browse files
committed
Predicates in DataLoaders
1 parent db61b14 commit c0d42df

File tree

5 files changed

+188
-8
lines changed

5 files changed

+188
-8
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ Object getCacheKeyWithContext(K key, Object context) {
161161

162162
DispatchResult<V> dispatch() {
163163
boolean batchingEnabled = loaderOptions.batchingEnabled();
164+
if (!loaderOptions.getDispatchPredicate().test(null, dataLoader)) {
165+
return new DispatchResult<>(CompletableFuture.completedFuture(emptyList()), loaderQueue.size(), false);
166+
}
164167
//
165168
// we copy the pre-loaded set of futures ready for dispatch
166169
final List<K> keys = new ArrayList<>();
@@ -176,7 +179,7 @@ DispatchResult<V> dispatch() {
176179
lastDispatchTime.set(now());
177180
}
178181
if (!batchingEnabled || keys.isEmpty()) {
179-
return new DispatchResult<>(completedFuture(emptyList()), 0);
182+
return new DispatchResult<>(completedFuture(emptyList()), 0, false);
180183
}
181184
final int totalEntriesHandled = keys.size();
182185
//
@@ -197,7 +200,7 @@ DispatchResult<V> dispatch() {
197200
} else {
198201
futureList = dispatchQueueBatch(keys, callContexts, queuedFutures);
199202
}
200-
return new DispatchResult<>(futureList, totalEntriesHandled);
203+
return new DispatchResult<>(futureList, totalEntriesHandled, true);
201204
}
202205

203206
private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {

src/main/java/org/dataloader/DataLoaderOptions.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.dataloader.annotations.PublicApi;
2020
import org.dataloader.impl.Assertions;
21+
import org.dataloader.registries.DispatchPredicate;
2122
import org.dataloader.stats.NoOpStatisticsCollector;
2223
import org.dataloader.stats.StatisticsCollector;
2324

@@ -42,6 +43,7 @@ public class DataLoaderOptions {
4243
private CacheKey<?> cacheKeyFunction;
4344
private CacheMap<?, ?> cacheMap;
4445
private ValueCache<?, ?> valueCache;
46+
private DispatchPredicate dispatchPredicate;
4547
private int maxBatchSize;
4648
private Supplier<StatisticsCollector> statisticsCollector;
4749
private BatchLoaderContextProvider environmentProvider;
@@ -58,6 +60,7 @@ public DataLoaderOptions() {
5860
statisticsCollector = NoOpStatisticsCollector::new;
5961
environmentProvider = NULL_PROVIDER;
6062
valueCacheOptions = ValueCacheOptions.newOptions();
63+
dispatchPredicate = DispatchPredicate.dispatchAlways();
6164
}
6265

6366
/**
@@ -286,6 +289,25 @@ public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
286289
return this;
287290
}
288291

292+
/**
293+
* @return the dispatch predicate of these options
294+
*/
295+
public DispatchPredicate getDispatchPredicate() {
296+
return dispatchPredicate;
297+
}
298+
299+
/**
300+
* Sets the {@link DispatchPredicate} to use for.
301+
*
302+
* @param dispatchPredicate the non-null DispatchPredicate to use
303+
*
304+
* @return the data loader options for fluent coding
305+
*/
306+
public DataLoaderOptions dispatchPredicate(DispatchPredicate dispatchPredicate) {
307+
this.dispatchPredicate = nonNull(dispatchPredicate);
308+
return this;
309+
}
310+
289311
/**
290312
* @return the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
291313
*/

src/main/java/org/dataloader/DispatchResult.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,16 @@
1515
public class DispatchResult<T> {
1616
private final CompletableFuture<List<T>> futureList;
1717
private final int keysCount;
18+
private final boolean wasDispatched;
1819

1920
public DispatchResult(CompletableFuture<List<T>> futureList, int keysCount) {
21+
this(futureList, keysCount, true);
22+
}
23+
24+
public DispatchResult(CompletableFuture<List<T>> futureList, int keysCount, boolean wasDispatched) {
2025
this.futureList = futureList;
2126
this.keysCount = keysCount;
27+
this.wasDispatched = wasDispatched;
2228
}
2329

2430
public CompletableFuture<List<T>> getPromisedResults() {
@@ -28,4 +34,14 @@ public CompletableFuture<List<T>> getPromisedResults() {
2834
public int getKeysCount() {
2935
return keysCount;
3036
}
37+
38+
/**
39+
* If the {@link org.dataloader.registries.DispatchPredicate} associated with the dataloader
40+
* returns false, then the call to dispatch was not performed and this will return false.
41+
*
42+
* @return true of the dispatch call was actually made or false if it was not
43+
*/
44+
public boolean wasDispatched() {
45+
return wasDispatched;
46+
}
3147
}

src/main/java/org/dataloader/registries/DispatchPredicate.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66
import java.util.Objects;
77

88
/**
9-
* A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not
9+
* A predicate class used by {@link ScheduledDataLoaderRegistry}s as well as by individual
10+
* {@link DataLoader}s to decide whether to dispatch or not.
1011
*/
1112
@FunctionalInterface
1213
public interface DispatchPredicate {
1314
/**
14-
* This predicate tests whether the data loader should be dispatched or not.
15+
* This predicate tests whether the data loader should be dispatched or not. If the predicate is associated direct to a {@link DataLoader}
16+
* then the dataLoaderKey parameter will be null.
1517
*
16-
* @param dataLoaderKey the key of the data loader when registered
18+
* @param dataLoaderKey the key of the data loader when registered or null if this is a predicate associated direct with a {@link DataLoader}
1719
* @param dataLoader the dataloader to dispatch
1820
*
1921
* @return true if the data loader SHOULD be dispatched
@@ -68,7 +70,7 @@ default DispatchPredicate or(DispatchPredicate other) {
6870
*
6971
* @param duration the length of time to check
7072
*
71-
* @return true if the data loader has not been dispatched in duration time
73+
* @return a predicate that returns true if the data loader has not been dispatched in duration time
7274
*/
7375
static DispatchPredicate dispatchIfLongerThan(Duration duration) {
7476
return (dataLoaderKey, dataLoader) -> {
@@ -79,14 +81,32 @@ static DispatchPredicate dispatchIfLongerThan(Duration duration) {
7981

8082
/**
8183
* This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
82-
*
84+
* <p>
8385
* This will act as minimum batch size. There must be more than `depth` items queued for the predicate to return true.
8486
*
8587
* @param depth the value to be greater than
8688
*
87-
* @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
89+
* @return a predicate that returns true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
8890
*/
8991
static DispatchPredicate dispatchIfDepthGreaterThan(int depth) {
9092
return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth;
9193
}
94+
95+
/**
96+
* This predicate will return true always
97+
*
98+
* @return a predicate that returns true always
99+
*/
100+
static DispatchPredicate dispatchAlways() {
101+
return (dataLoaderKey, dataLoader) -> true;
102+
}
103+
104+
/**
105+
* This predicate will never return true
106+
*
107+
* @return a predicate that never returns true
108+
*/
109+
static DispatchPredicate dispatchNever() {
110+
return (dataLoaderKey, dataLoader) -> false;
111+
}
92112
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package org.dataloader;
2+
3+
import org.dataloader.registries.DispatchPredicate;
4+
import org.dataloader.stats.SimpleStatisticsCollector;
5+
import org.dataloader.stats.Statistics;
6+
import org.junit.Test;
7+
8+
import java.util.concurrent.CompletableFuture;
9+
10+
import static java.util.Arrays.asList;
11+
import static org.dataloader.DataLoaderFactory.newDataLoader;
12+
import static org.hamcrest.Matchers.equalTo;
13+
import static org.hamcrest.Matchers.notNullValue;
14+
import static org.junit.Assert.assertThat;
15+
16+
/**
17+
* Tests related to dispatching predicates.
18+
*/
19+
public class DataLoaderPredicateTest {
20+
21+
@Test
22+
public void the_predicate_will_prevent_loading() {
23+
BatchLoader<String, String> batchLoader = CompletableFuture::completedFuture;
24+
DataLoader<String, String> loader = newDataLoader(batchLoader,
25+
DataLoaderOptions.newOptions().setStatisticsCollector(SimpleStatisticsCollector::new)
26+
.dispatchPredicate(DispatchPredicate.dispatchNever())
27+
);
28+
29+
loader.load("A");
30+
loader.load("B");
31+
loader.loadMany(asList("C", "D"));
32+
33+
Statistics stats = loader.getStatistics();
34+
assertThat(stats.getLoadCount(), equalTo(4L));
35+
assertThat(stats.getBatchInvokeCount(), equalTo(0L));
36+
assertThat(stats.getBatchLoadCount(), equalTo(0L));
37+
assertThat(stats.getCacheHitCount(), equalTo(0L));
38+
39+
DispatchResult<String> dispatchResult = loader.dispatchWithCounts();
40+
assertThat(dispatchResult.wasDispatched(), equalTo(false));
41+
assertThat(dispatchResult.getKeysCount(), equalTo(4));
42+
43+
stats = loader.getStatistics();
44+
assertThat(stats.getLoadCount(), equalTo(4L));
45+
assertThat(stats.getBatchInvokeCount(), equalTo(0L));
46+
assertThat(stats.getBatchLoadCount(), equalTo(0L));
47+
48+
49+
loader.load("A");
50+
loader.load("B");
51+
52+
dispatchResult = loader.dispatchWithCounts();
53+
assertThat(dispatchResult.wasDispatched(), equalTo(false));
54+
assertThat(dispatchResult.getKeysCount(), equalTo(4));
55+
56+
stats = loader.getStatistics();
57+
assertThat(stats.getLoadCount(), equalTo(6L));
58+
assertThat(stats.getBatchInvokeCount(), equalTo(0L));
59+
assertThat(stats.getBatchLoadCount(), equalTo(0L));
60+
}
61+
62+
@Test
63+
public void the_predicate_will_allow_loading_by_default() {
64+
BatchLoader<String, String> batchLoader = CompletableFuture::completedFuture;
65+
DataLoader<String, String> loader = newDataLoader(batchLoader,
66+
DataLoaderOptions.newOptions().setStatisticsCollector(SimpleStatisticsCollector::new)
67+
.dispatchPredicate(DispatchPredicate.dispatchAlways())
68+
);
69+
70+
loader.load("A");
71+
loader.load("B");
72+
loader.loadMany(asList("C", "D"));
73+
74+
75+
DispatchResult<String> dispatchResult = loader.dispatchWithCounts();
76+
assertThat(dispatchResult.wasDispatched(), equalTo(true));
77+
assertThat(dispatchResult.getKeysCount(), equalTo(4));
78+
79+
Statistics stats = loader.getStatistics();
80+
assertThat(stats.getLoadCount(), equalTo(4L));
81+
assertThat(stats.getBatchInvokeCount(), equalTo(1L));
82+
assertThat(stats.getBatchLoadCount(), equalTo(4L));
83+
84+
85+
loader.load("E");
86+
loader.load("F");
87+
88+
dispatchResult = loader.dispatchWithCounts();
89+
assertThat(dispatchResult.wasDispatched(), equalTo(true));
90+
assertThat(dispatchResult.getKeysCount(), equalTo(2));
91+
92+
stats = loader.getStatistics();
93+
assertThat(stats.getLoadCount(), equalTo(6L));
94+
assertThat(stats.getBatchInvokeCount(), equalTo(2L));
95+
assertThat(stats.getBatchLoadCount(), equalTo(6L));
96+
}
97+
98+
@Test
99+
public void dataloader_options_have_a_default_which_is_always_on() {
100+
BatchLoader<String, String> batchLoader = CompletableFuture::completedFuture;
101+
DataLoaderOptions dataLoaderOptions = DataLoaderOptions.newOptions();
102+
103+
DispatchPredicate defaultPredicate = dataLoaderOptions.getDispatchPredicate();
104+
assertThat(defaultPredicate, notNullValue());
105+
assertThat(defaultPredicate.test(null, null), equalTo(true));
106+
107+
108+
DataLoader<String, String> loader = newDataLoader(batchLoader, dataLoaderOptions);
109+
110+
loader.load("A");
111+
loader.load("B");
112+
loader.loadMany(asList("C", "D"));
113+
114+
DispatchResult<String> dispatchResult = loader.dispatchWithCounts();
115+
assertThat(dispatchResult.wasDispatched(), equalTo(true));
116+
assertThat(dispatchResult.getKeysCount(), equalTo(4));
117+
118+
}
119+
}

0 commit comments

Comments
 (0)