Skip to content

Commit 0a27e76

Browse files
committed
introducing of a special CompletableFuture enabling scheduling of nested/chained DataLoader calls
1 parent bac784b commit 0a27e76

File tree

7 files changed

+516
-7
lines changed

7 files changed

+516
-7
lines changed

src/main/java/graphql/execution/DataLoaderDispatchStrategy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import graphql.Internal;
44
import graphql.schema.DataFetcher;
5+
import graphql.schema.DataFetchingEnvironment;
56

67
import java.util.List;
8+
import java.util.function.Supplier;
79

810
@Internal
911
public interface DataLoaderDispatchStrategy {
@@ -44,7 +46,7 @@ default void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyP
4446
default void fieldFetched(ExecutionContext executionContext,
4547
ExecutionStrategyParameters executionStrategyParameters,
4648
DataFetcher<?> dataFetcher,
47-
Object fetchedValue) {
49+
Object fetchedValue, Supplier<DataFetchingEnvironment> dataFetchingEnvironment) {
4850

4951
}
5052

src/main/java/graphql/execution/Execution.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import graphql.schema.GraphQLObjectType;
3030
import graphql.schema.GraphQLSchema;
3131
import graphql.schema.impl.SchemaUtil;
32-
import org.jspecify.annotations.NonNull;
3332
import graphql.util.FpKit;
33+
import org.jspecify.annotations.NonNull;
3434
import org.reactivestreams.Publisher;
3535

3636
import java.util.Collections;
@@ -58,6 +58,8 @@ public class Execution {
5858
private final ValueUnboxer valueUnboxer;
5959
private final boolean doNotAutomaticallyDispatchDataLoader;
6060

61+
public static final String EXECUTION_CONTEXT_KEY = "__GraphQL_Java_ExecutionContext";
62+
6163
public Execution(ExecutionStrategy queryStrategy,
6264
ExecutionStrategy mutationStrategy,
6365
ExecutionStrategy subscriptionStrategy,
@@ -114,6 +116,8 @@ public CompletableFuture<ExecutionResult> execute(Document document, GraphQLSche
114116
.build();
115117

116118
executionContext.getGraphQLContext().put(ResultNodesInfo.RESULT_NODES_INFO, executionContext.getResultNodesInfo());
119+
executionContext.getGraphQLContext().put(EXECUTION_CONTEXT_KEY, executionContext);
120+
117121

118122
InstrumentationExecutionParameters parameters = new InstrumentationExecutionParameters(
119123
executionInput, graphQLSchema

src/main/java/graphql/execution/ExecutionStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec
496496
dataFetcher = instrumentation.instrumentDataFetcher(dataFetcher, instrumentationFieldFetchParams, executionContext.getInstrumentationState());
497497
dataFetcher = executionContext.getDataLoaderDispatcherStrategy().modifyDataFetcher(dataFetcher);
498498
Object fetchedObject = invokeDataFetcher(executionContext, parameters, fieldDef, dataFetchingEnvironment, dataFetcher);
499-
executionContext.getDataLoaderDispatcherStrategy().fieldFetched(executionContext, parameters, dataFetcher, fetchedObject);
499+
executionContext.getDataLoaderDispatcherStrategy().fieldFetched(executionContext, parameters, dataFetcher, fetchedObject, dataFetchingEnvironment);
500500
fetchCtx.onDispatched();
501501
fetchCtx.onFetchedValue(fetchedObject);
502502
// if it's a subscription, leave any reactive objects alone
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package graphql.execution.instrumentation.dataloader;
2+
3+
import graphql.ExperimentalApi;
4+
import graphql.Internal;
5+
import graphql.execution.DataLoaderDispatchStrategy;
6+
import graphql.execution.ExecutionContext;
7+
import graphql.schema.DataFetchingEnvironment;
8+
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.function.Supplier;
12+
13+
import static graphql.execution.Execution.EXECUTION_CONTEXT_KEY;
14+
15+
@Internal
16+
public class DataLoaderCF<T> extends CompletableFuture<T> {
17+
final DataFetchingEnvironment dfe;
18+
final String dataLoaderName;
19+
final Object key;
20+
final CompletableFuture<Object> dataLoaderCF;
21+
22+
volatile CountDownLatch latch;
23+
24+
public DataLoaderCF(DataFetchingEnvironment dfe, String dataLoaderName, Object key) {
25+
this.dfe = dfe;
26+
this.dataLoaderName = dataLoaderName;
27+
this.key = key;
28+
if (dataLoaderName != null) {
29+
dataLoaderCF = dfe.getDataLoaderRegistry().getDataLoader(dataLoaderName).load(key);
30+
dataLoaderCF.whenComplete((value, throwable) -> {
31+
System.out.println("underlying DataLoader completed");
32+
if (throwable != null) {
33+
completeExceptionally(throwable);
34+
} else {
35+
complete((T) value);
36+
}
37+
// post completion hook
38+
if (latch != null) {
39+
latch.countDown();
40+
}
41+
});
42+
} else {
43+
dataLoaderCF = null;
44+
}
45+
}
46+
47+
DataLoaderCF() {
48+
this.dfe = null;
49+
this.dataLoaderName = null;
50+
this.key = null;
51+
dataLoaderCF = null;
52+
}
53+
54+
@Override
55+
public <U> CompletableFuture<U> newIncompleteFuture() {
56+
return new DataLoaderCF<>();
57+
}
58+
59+
public static boolean isDataLoaderCF(Object object) {
60+
return object instanceof DataLoaderCF;
61+
}
62+
63+
@ExperimentalApi
64+
public static <T> CompletableFuture<T> newDataLoaderCF(DataFetchingEnvironment dfe, String dataLoaderName, Object key) {
65+
DataLoaderCF<T> result = new DataLoaderCF<>(dfe, dataLoaderName, key);
66+
ExecutionContext executionContext = dfe.getGraphQlContext().get(EXECUTION_CONTEXT_KEY);
67+
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
68+
if (dataLoaderDispatcherStrategy instanceof PerLevelDataLoaderDispatchStrategy) {
69+
((PerLevelDataLoaderDispatchStrategy) dataLoaderDispatcherStrategy).newDataLoaderCF(result);
70+
}
71+
return result;
72+
}
73+
74+
75+
@ExperimentalApi
76+
public static <U> CompletableFuture<U> supplyAsyncDataLoaderCF(DataFetchingEnvironment env, Supplier<U> supplier) {
77+
DataLoaderCF<U> d = new DataLoaderCF<>(env, null, null);
78+
d.defaultExecutor().execute(() -> {
79+
d.complete(supplier.get());
80+
});
81+
return d;
82+
83+
}
84+
85+
@ExperimentalApi
86+
public static <U> CompletableFuture<U> wrap(DataFetchingEnvironment env, CompletableFuture<U> completableFuture) {
87+
DataLoaderCF<U> d = new DataLoaderCF<>(env, null, null);
88+
completableFuture.whenComplete((u, ex) -> {
89+
if (ex != null) {
90+
d.completeExceptionally(ex);
91+
} else {
92+
d.complete(u);
93+
}
94+
});
95+
return d;
96+
}
97+
98+
99+
}

src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java

Lines changed: 118 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,33 @@
77
import graphql.execution.ExecutionStrategyParameters;
88
import graphql.execution.FieldValueInfo;
99
import graphql.schema.DataFetcher;
10+
import graphql.schema.DataFetchingEnvironment;
1011
import graphql.util.LockKit;
1112
import org.dataloader.DataLoaderRegistry;
1213

14+
import java.util.ArrayList;
1315
import java.util.LinkedHashSet;
1416
import java.util.List;
17+
import java.util.Map;
1518
import java.util.Set;
19+
import java.util.concurrent.ConcurrentHashMap;
20+
import java.util.concurrent.CopyOnWriteArrayList;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
import java.util.function.Supplier;
1627

1728
@Internal
1829
public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStrategy {
1930

2031
private final CallStack callStack;
2132
private final ExecutionContext executionContext;
2233

34+
static final ScheduledExecutorService isolatedDLCFBatchWindowScheduler = Executors.newSingleThreadScheduledExecutor();
35+
static final int BATCH_WINDOW_NANO_SECONDS = 500_000;
36+
2337

2438
private static class CallStack {
2539

@@ -34,10 +48,27 @@ private static class CallStack {
3448

3549
private final Set<Integer> dispatchedLevels = new LinkedHashSet<>();
3650

51+
// fields only relevant when a DataLoaderCF is involved
52+
private final List<DataLoaderCF<?>> allDataLoaderCF = new CopyOnWriteArrayList<>();
53+
//TODO: maybe this should be cleaned up once the CF returned by these fields are completed
54+
// otherwise this will stick around until the whole request is finished
55+
private final Set<DataFetchingEnvironment> fieldsFinishedDispatching = ConcurrentHashMap.newKeySet();
56+
private final Map<Integer, Set<DataFetchingEnvironment>> levelToDFEWithDataLoaderCF = new ConcurrentHashMap<>();
57+
58+
private final Set<DataFetchingEnvironment> batchWindowOfIsolatedDfeToDispatch = ConcurrentHashMap.newKeySet();
59+
60+
private boolean batchWindowOpen = false;
61+
62+
3763
public CallStack() {
3864
expectedExecuteObjectCallsPerLevel.set(1, 1);
3965
}
4066

67+
public void addDataLoaderDFE(int level, DataFetchingEnvironment dfe) {
68+
levelToDFEWithDataLoaderCF.computeIfAbsent(level, k -> new LinkedHashSet<>()).add(dfe);
69+
}
70+
71+
4172
void increaseExpectedFetchCount(int level, int count) {
4273
expectedFetchCountPerLevel.increment(level, count);
4374
}
@@ -234,9 +265,13 @@ private int getObjectCountForList(List<FieldValueInfo> fieldValueInfos) {
234265
public void fieldFetched(ExecutionContext executionContext,
235266
ExecutionStrategyParameters executionStrategyParameters,
236267
DataFetcher<?> dataFetcher,
237-
Object fetchedValue) {
268+
Object fetchedValue,
269+
Supplier<DataFetchingEnvironment> dataFetchingEnvironment) {
238270
int level = executionStrategyParameters.getPath().getLevel();
239271
boolean dispatchNeeded = callStack.lock.callLocked(() -> {
272+
if (DataLoaderCF.isDataLoaderCF(fetchedValue)) {
273+
callStack.addDataLoaderDFE(level, dataFetchingEnvironment.get());
274+
}
240275
callStack.increaseFetchCount(level);
241276
return dispatchIfNeeded(level);
242277
});
@@ -275,9 +310,89 @@ private boolean levelReady(int level) {
275310
}
276311

277312
void dispatch(int level) {
278-
DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry();
279-
dataLoaderRegistry.dispatchAll();
313+
if (callStack.levelToDFEWithDataLoaderCF.size() > 0) {
314+
dispatchDLCFImpl(callStack.levelToDFEWithDataLoaderCF.get(level));
315+
} else {
316+
DataLoaderRegistry dataLoaderRegistry = executionContext.getDataLoaderRegistry();
317+
dataLoaderRegistry.dispatchAll();
318+
}
280319
}
281320

321+
322+
public void dispatchDLCFImpl(Set<DataFetchingEnvironment> dfeToDispatchSet) {
323+
324+
// filter out all DataLoaderCFS that are matching the fields we want to dispatch
325+
List<DataLoaderCF<?>> relevantDataLoaderCFs = new ArrayList<>();
326+
for (DataLoaderCF<?> dataLoaderCF : callStack.allDataLoaderCF) {
327+
if (dfeToDispatchSet.contains(dataLoaderCF.dfe)) {
328+
relevantDataLoaderCFs.add(dataLoaderCF);
329+
}
330+
}
331+
// we are cleaning up the list of all DataLoadersCFs
332+
callStack.allDataLoaderCF.removeAll(relevantDataLoaderCFs);
333+
334+
// means we are all done dispatching the fields
335+
if (relevantDataLoaderCFs.size() == 0) {
336+
callStack.fieldsFinishedDispatching.addAll(dfeToDispatchSet);
337+
return;
338+
}
339+
// we are dispatching all data loaders and waiting for all dataLoaderCFs to complete
340+
// and to finish their sync actions
341+
CountDownLatch countDownLatch = new CountDownLatch(relevantDataLoaderCFs.size());
342+
for (DataLoaderCF dlCF : relevantDataLoaderCFs) {
343+
dlCF.latch = countDownLatch;
344+
}
345+
// TODO: this should be done async or in a more regulated way with a configurable thread pool or so
346+
new Thread(() -> {
347+
try {
348+
// waiting until all sync codes for all DL CFs are run
349+
countDownLatch.await();
350+
} catch (InterruptedException e) {
351+
throw new RuntimeException(e);
352+
}
353+
// now we handle all new DataLoaders
354+
dispatchDLCFImpl(dfeToDispatchSet);
355+
}).start();
356+
// Only dispatching relevant data loaders
357+
for (DataLoaderCF dlCF : relevantDataLoaderCFs) {
358+
dlCF.dfe.getDataLoader(dlCF.dataLoaderName).dispatch();
359+
}
360+
// executionContext.getDataLoaderRegistry().dispatchAll();
361+
}
362+
363+
364+
public void newDataLoaderCF(DataLoaderCF<?> dataLoaderCF) {
365+
System.out.println("newDataLoaderCF");
366+
callStack.lock.runLocked(() -> {
367+
callStack.allDataLoaderCF.add(dataLoaderCF);
368+
});
369+
if (callStack.fieldsFinishedDispatching.contains(dataLoaderCF.dfe)) {
370+
System.out.println("isolated dispatch");
371+
dispatchIsolatedDataLoader(dataLoaderCF);
372+
}
373+
374+
}
375+
376+
private void dispatchIsolatedDataLoader(DataLoaderCF<?> dlCF) {
377+
callStack.lock.runLocked(() -> {
378+
callStack.batchWindowOfIsolatedDfeToDispatch.add(dlCF.dfe);
379+
if (!callStack.batchWindowOpen) {
380+
callStack.batchWindowOpen = true;
381+
AtomicReference<Set<DataFetchingEnvironment>> dfesToDispatch = new AtomicReference<>();
382+
Runnable runnable = () -> {
383+
callStack.lock.runLocked(() -> {
384+
dfesToDispatch.set(new LinkedHashSet<>(callStack.batchWindowOfIsolatedDfeToDispatch));
385+
callStack.batchWindowOfIsolatedDfeToDispatch.clear();
386+
callStack.batchWindowOpen = false;
387+
});
388+
dispatchDLCFImpl(dfesToDispatch.get());
389+
};
390+
isolatedDLCFBatchWindowScheduler.schedule(runnable, BATCH_WINDOW_NANO_SECONDS, TimeUnit.NANOSECONDS);
391+
}
392+
393+
});
394+
}
395+
396+
282397
}
283398

src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
import graphql.execution.ExecutionStrategyParameters;
88
import graphql.execution.FieldValueInfo;
99
import graphql.schema.DataFetcher;
10+
import graphql.schema.DataFetchingEnvironment;
1011
import graphql.util.LockKit;
1112
import org.dataloader.DataLoaderRegistry;
1213

1314
import java.util.LinkedHashSet;
1415
import java.util.List;
1516
import java.util.Set;
1617
import java.util.concurrent.atomic.AtomicBoolean;
18+
import java.util.function.Supplier;
1719

1820
/**
1921
* The execution of a query can be divided into 2 phases: first, the non-deferred fields are executed and only once
@@ -173,7 +175,7 @@ public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyPa
173175
public void fieldFetched(ExecutionContext executionContext,
174176
ExecutionStrategyParameters parameters,
175177
DataFetcher<?> dataFetcher,
176-
Object fetchedValue) {
178+
Object fetchedValue, Supplier<DataFetchingEnvironment> dataFetchingEnvironment) {
177179

178180
final boolean dispatchNeeded;
179181

0 commit comments

Comments
 (0)