Skip to content

Commit 15f6952

Browse files
committed
make dataloader work inside defer blocks
1 parent 149bcdc commit 15f6952

14 files changed

+513
-353
lines changed

src/main/java/graphql/execution/AsyncExecutionStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
7272
for (FieldValueInfo completeValueInfo : completeValueInfos) {
7373
fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject());
7474
}
75-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos);
75+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos, parameters);
7676
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
7777
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
7878
}).exceptionally((ex) -> {
7979
// if there are any issues with combining/handling the field results,
8080
// complete the future at all costs and bubble up any thrown exception so
8181
// the execution does not hang.
82-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex);
82+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex, parameters);
8383
executionStrategyCtx.onFieldValuesException();
8484
overallResult.completeExceptionally(ex);
8585
return null;

src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ private Object resolveSerialField(ExecutionContext executionContext,
7474
if (fieldWithInfo instanceof CompletableFuture) {
7575
//noinspection unchecked
7676
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(fvi -> {
77-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
77+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi), newParameters);
7878
CompletableFuture<Object> fieldValueFuture = fvi.getFieldValueFuture();
7979
return fieldValueFuture;
8080
});
8181
} else {
8282
FieldValueInfo fvi = (FieldValueInfo) fieldWithInfo;
83-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
83+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi), newParameters);
8484
return fvi.getFieldValueObject();
8585
}
8686
}

src/main/java/graphql/execution/DataLoaderDispatchStrategy.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ default void executionSerialStrategy(ExecutionContext executionContext, Executio
2222

2323
}
2424

25-
default void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList) {
25+
default void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
2626

2727
}
2828

29-
default void executionStrategyOnFieldValuesException(Throwable t) {
29+
default void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) {
3030

3131
}
3232

@@ -39,6 +39,10 @@ default void executeObjectOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoL
3939

4040
}
4141

42+
default void deferredOnFieldValue(String resultKey, FieldValueInfo fieldValueInfo, Throwable throwable, ExecutionStrategyParameters parameters) {
43+
44+
}
45+
4246
default void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) {
4347

4448
}
@@ -59,4 +63,8 @@ default DataFetcher<?> modifyDataFetcher(DataFetcher<?> dataFetcher) {
5963
default void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, ExecutionStrategyParameters executionStrategyParameters) {
6064

6165
}
66+
67+
default void startIncrementalCall() {
68+
69+
}
6270
}

src/main/java/graphql/execution/Execution.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import graphql.ExecutionInput;
77
import graphql.ExecutionResult;
88
import graphql.ExecutionResultImpl;
9-
import graphql.ExperimentalApi;
109
import graphql.GraphQLContext;
1110
import graphql.GraphQLError;
1211
import graphql.Internal;
@@ -16,7 +15,6 @@
1615
import graphql.execution.instrumentation.InstrumentationState;
1716
import graphql.execution.instrumentation.dataloader.FallbackDataLoaderDispatchStrategy;
1817
import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategy;
19-
import graphql.execution.instrumentation.dataloader.PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch;
2018
import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters;
2119
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters;
2220
import graphql.extensions.ExtensionsBuilder;
@@ -37,7 +35,6 @@
3735
import java.util.Collections;
3836
import java.util.List;
3937
import java.util.Map;
40-
import java.util.Optional;
4138
import java.util.concurrent.CompletableFuture;
4239
import java.util.function.Supplier;
4340

@@ -261,9 +258,9 @@ private DataLoaderDispatchStrategy createDataLoaderDispatchStrategy(ExecutionCon
261258
boolean deferEnabled = executionContext.hasIncrementalSupport();
262259

263260
// Dedicated strategy for defer support, for safety purposes.
264-
return deferEnabled ?
265-
new PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(executionContext) :
266-
new PerLevelDataLoaderDispatchStrategy(executionContext);
261+
// return deferEnabled ?
262+
// new PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch(executionContext) :
263+
return new PerLevelDataLoaderDispatchStrategy(executionContext);
267264
} else {
268265
return new FallbackDataLoaderDispatchStrategy(executionContext);
269266
}

src/main/java/graphql/execution/ExecutionStrategy.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import graphql.EngineRunningState;
66
import graphql.ExecutionResult;
77
import graphql.ExecutionResultImpl;
8-
import graphql.ExperimentalApi;
98
import graphql.GraphQLError;
109
import graphql.Internal;
1110
import graphql.PublicSpi;
@@ -50,7 +49,6 @@
5049
import java.util.Collections;
5150
import java.util.List;
5251
import java.util.Map;
53-
import java.util.Optional;
5452
import java.util.OptionalInt;
5553
import java.util.concurrent.CompletableFuture;
5654
import java.util.concurrent.CompletionException;
@@ -300,7 +298,7 @@ DeferredExecutionSupport createDeferredExecutionSupport(ExecutionContext executi
300298
) {
301299
MergedSelectionSet fields = parameters.getFields();
302300

303-
executionContext.getIncrementalCallState().enqueue(deferredExecutionSupport.createCalls(parameters));
301+
executionContext.getIncrementalCallState().enqueue(deferredExecutionSupport.createCalls());
304302

305303
// Only non-deferred fields should be considered for calculating the expected size of futures.
306304
Async.CombinedBuilder<FieldValueInfo> futures = Async
@@ -400,7 +398,6 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec
400398
}
401399

402400
MergedField field = parameters.getField();
403-
String pathString = parameters.getPath().toString();
404401
GraphQLObjectType parentType = (GraphQLObjectType) parameters.getExecutionStepInfo().getUnwrappedNonNullType();
405402

406403
// if the DF (like PropertyDataFetcher) does not use the arguments or execution step info then dont build any
@@ -435,6 +432,7 @@ private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext exec
435432
.parentType(parentType)
436433
.selectionSet(fieldCollector)
437434
.queryDirectives(queryDirectives)
435+
.deferredCallContext(parameters.getDeferredCallContext())
438436
.build();
439437
});
440438

src/main/java/graphql/execution/ExecutionStrategyParameters.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public ExecutionStrategyParameters getParent() {
9494
* @return the deferred call context or null if we're not in the scope of a deferred call
9595
*/
9696
@Nullable
97+
@Internal
9798
public DeferredCallContext getDeferredCallContext() {
9899
return deferredCallContext;
99100
}

src/main/java/graphql/execution/incremental/DeferredCallContext.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,22 @@
1818
@Internal
1919
public class DeferredCallContext {
2020

21+
private final int startLevel;
22+
private final int fields;
23+
24+
public DeferredCallContext(int startLevel, int fields) {
25+
this.startLevel = startLevel;
26+
this.fields = fields;
27+
}
28+
29+
public int getStartLevel() {
30+
return startLevel;
31+
}
32+
33+
public int getFields() {
34+
return fields;
35+
}
36+
2137
private final List<GraphQLError> errors = new CopyOnWriteArrayList<>();
2238

2339
public void addErrors(List<GraphQLError> errors) {
@@ -34,4 +50,6 @@ public void addError(GraphQLError graphqlError) {
3450
public List<GraphQLError> getErrors() {
3551
return errors;
3652
}
53+
54+
3755
}

src/main/java/graphql/execution/incremental/DeferredExecutionSupport.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public interface DeferredExecutionSupport {
4545

4646
List<String> getNonDeferredFieldNames(List<String> allFieldNames);
4747

48-
Set<IncrementalCall<? extends IncrementalPayload>> createCalls(ExecutionStrategyParameters executionStrategyParameters);
48+
Set<IncrementalCall<? extends IncrementalPayload>> createCalls();
4949

5050
DeferredExecutionSupport NOOP = new DeferredExecutionSupport.NoOp();
5151

@@ -106,23 +106,25 @@ public List<String> getNonDeferredFieldNames(List<String> allFieldNames) {
106106
}
107107

108108
@Override
109-
public Set<IncrementalCall<? extends IncrementalPayload>> createCalls(ExecutionStrategyParameters executionStrategyParameters) {
109+
public Set<IncrementalCall<? extends IncrementalPayload>> createCalls() {
110110
ImmutableSet<DeferredExecution> deferredExecutions = deferredExecutionToFields.keySet();
111111
Set<IncrementalCall<? extends IncrementalPayload>> set = new HashSet<>(deferredExecutions.size());
112112
for (DeferredExecution deferredExecution : deferredExecutions) {
113-
set.add(this.createDeferredFragmentCall(deferredExecution, executionStrategyParameters));
113+
set.add(this.createDeferredFragmentCall(deferredExecution));
114114
}
115115
return set;
116116
}
117117

118-
private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferredExecution, ExecutionStrategyParameters executionStrategyParameters) {
119-
DeferredCallContext deferredCallContext = new DeferredCallContext();
118+
private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferredExecution) {
119+
int level = parameters.getPath().getLevel() + 1;
120+
System.out.println("new DeferredFragmentCall for level " + level + " with fields " + deferredFields.size());
121+
DeferredCallContext deferredCallContext = new DeferredCallContext(level, deferredFields.size());
120122

121123
List<MergedField> mergedFields = deferredExecutionToFields.get(deferredExecution);
122124

123125
List<Supplier<CompletableFuture<DeferredFragmentCall.FieldWithExecutionResult>>> calls = FpKit.arrayListSizedTo(mergedFields);
124126
for (MergedField currentField : mergedFields) {
125-
calls.add(this.createResultSupplier(currentField, deferredCallContext, executionStrategyParameters));
127+
calls.add(this.createResultSupplier(currentField, deferredCallContext));
126128
}
127129

128130
return new DeferredFragmentCall(
@@ -135,13 +137,12 @@ private DeferredFragmentCall createDeferredFragmentCall(DeferredExecution deferr
135137

136138
private Supplier<CompletableFuture<DeferredFragmentCall.FieldWithExecutionResult>> createResultSupplier(
137139
MergedField currentField,
138-
DeferredCallContext deferredCallContext,
139-
ExecutionStrategyParameters executionStrategyParameters
140+
DeferredCallContext deferredCallContext
140141
) {
141142
Map<String, MergedField> fields = new LinkedHashMap<>();
142143
fields.put(currentField.getResultKey(), currentField);
143144

144-
ExecutionStrategyParameters callParameters = parameters.transform(builder ->
145+
ExecutionStrategyParameters executionStrategyParameters = parameters.transform(builder ->
145146
{
146147
MergedSelectionSet mergedSelectionSet = MergedSelectionSet.newMergedSelectionSet().subFields(fields).build();
147148
ResultPath path = parameters.getPath().segment(currentField.getResultKey());
@@ -158,22 +159,23 @@ private Supplier<CompletableFuture<DeferredFragmentCall.FieldWithExecutionResult
158159

159160
instrumentation.beginDeferredField(executionContext.getInstrumentationState());
160161

162+
// todo: handle cached computations
161163
return dfCache.computeIfAbsent(
162164
currentField.getResultKey(),
163165
// The same field can be associated with multiple defer executions, so
164166
// we memoize the field resolution to avoid multiple calls to the same data fetcher
165167
key -> FpKit.interThreadMemoize(() -> {
166-
CompletableFuture<FieldValueInfo> fieldValueResult = resolveFieldWithInfoFn
167-
.apply(executionContext, callParameters);
168+
CompletableFuture<FieldValueInfo> fieldValueResult = resolveFieldWithInfoFn.apply(executionContext, executionStrategyParameters);
169+
170+
fieldValueResult.whenComplete((fieldValueInfo, throwable) -> {
171+
executionContext.getDataLoaderDispatcherStrategy().deferredOnFieldValue(currentField.getResultKey(), fieldValueInfo, throwable, executionStrategyParameters);
172+
});
168173

169-
CompletableFuture<ExecutionResult> executionResultCF = fieldValueResult
170-
.thenCompose(fvi -> {
171-
executionContext.getDataLoaderDispatcherStrategy().executeDeferredOnFieldValueInfo(fvi, executionStrategyParameters);
172174

173-
return fvi
174-
.getFieldValueFuture()
175-
.thenApply(fv -> ExecutionResultImpl.newExecutionResult().data(fv).build());
176-
}
175+
CompletableFuture<ExecutionResult> executionResultCF = fieldValueResult
176+
.thenCompose(fvi -> fvi
177+
.getFieldValueFuture()
178+
.thenApply(fv -> ExecutionResultImpl.newExecutionResult().data(fv).build())
177179
);
178180

179181
return executionResultCF
@@ -207,7 +209,7 @@ public List<String> getNonDeferredFieldNames(List<String> allFieldNames) {
207209
}
208210

209211
@Override
210-
public Set<IncrementalCall<? extends IncrementalPayload>> createCalls(ExecutionStrategyParameters executionStrategyParameters) {
212+
public Set<IncrementalCall<? extends IncrementalPayload>> createCalls() {
211213
return Collections.emptySet();
212214
}
213215
}

src/main/java/graphql/execution/incremental/IncrementalCallState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,5 @@ private Supplier<SingleSubscriberPublisher<DelayedIncrementalPartialResult>> cre
103103
public Publisher<DelayedIncrementalPartialResult> startDeferredCalls() {
104104
return publisher.get();
105105
}
106+
106107
}

0 commit comments

Comments
 (0)