Skip to content

Commit 677e436

Browse files
committed
simplify delayed dataloader dispatching by not using a batch window
1 parent e8028d9 commit 677e436

File tree

6 files changed

+12
-231
lines changed

6 files changed

+12
-231
lines changed

src/main/java/graphql/GraphQLUnusualConfiguration.java

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
package graphql;
22

33
import graphql.execution.ResponseMapFactory;
4-
import graphql.execution.instrumentation.dataloader.DelayedDataLoaderDispatcherExecutorFactory;
54
import graphql.introspection.GoodFaithIntrospection;
65
import graphql.parser.ParserOptions;
76
import graphql.schema.PropertyDataFetcherHelper;
87

9-
import java.time.Duration;
10-
118
import static graphql.Assert.assertNotNull;
12-
import static graphql.execution.instrumentation.dataloader.DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS;
13-
import static graphql.execution.instrumentation.dataloader.DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_DISPATCHING_EXECUTOR_FACTORY;
149
import static graphql.execution.instrumentation.dataloader.DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING;
1510

1611
/**
@@ -364,42 +359,6 @@ public DataloaderConfig enableDataLoaderChaining(boolean enable) {
364359
return this;
365360
}
366361

367-
/**
368-
* @return the batch window duration size for delayed DataLoaders.
369-
*/
370-
public Duration delayedDataLoaderBatchWindowSize() {
371-
Long d = contextConfig.get(DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS);
372-
return d != null ? Duration.ofNanos(d) : null;
373-
}
374-
375-
/**
376-
* Sets the batch window duration size for delayed DataLoaders.
377-
* That is for DataLoaders, that are not batched as part of the normal per level
378-
* dispatching, because they were created after the level was already dispatched.
379-
*/
380-
@ExperimentalApi
381-
public DataloaderConfig delayedDataLoaderBatchWindowSize(Duration batchWindowSize) {
382-
contextConfig.put(DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS, batchWindowSize.toNanos());
383-
return this;
384-
}
385-
386-
/**
387-
* @return the instance of {@link DelayedDataLoaderDispatcherExecutorFactory} that is used to create the
388-
* {@link java.util.concurrent.ScheduledExecutorService} for the delayed DataLoader dispatching.
389-
*/
390-
public DelayedDataLoaderDispatcherExecutorFactory delayedDataLoaderExecutorFactory() {
391-
return contextConfig.get(DELAYED_DATA_LOADER_DISPATCHING_EXECUTOR_FACTORY);
392-
}
393-
394-
/**
395-
* Sets the instance of {@link DelayedDataLoaderDispatcherExecutorFactory} that is used to create the
396-
* {@link java.util.concurrent.ScheduledExecutorService} for the delayed DataLoader dispatching.
397-
*/
398-
@ExperimentalApi
399-
public DataloaderConfig delayedDataLoaderExecutorFactory(DelayedDataLoaderDispatcherExecutorFactory delayedDataLoaderDispatcherExecutorFactory) {
400-
contextConfig.put(DELAYED_DATA_LOADER_DISPATCHING_EXECUTOR_FACTORY, delayedDataLoaderDispatcherExecutorFactory);
401-
return this;
402-
}
403362
}
404363

405364
public static class ResponseMapFactoryConfig extends BaseContextConfig {

src/main/java/graphql/execution/instrumentation/dataloader/DataLoaderDispatchingContextKeys.java

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import graphql.Internal;
66
import org.jspecify.annotations.NullMarked;
77

8-
import java.time.Duration;
9-
108
/**
119
* GraphQLContext keys related to DataLoader dispatching.
1210
*/
@@ -16,26 +14,6 @@ public final class DataLoaderDispatchingContextKeys {
1614
private DataLoaderDispatchingContextKeys() {
1715
}
1816

19-
/**
20-
* In nano seconds, the batch window size for delayed DataLoaders.
21-
* That is for DataLoaders, that are not batched as part of the normal per level
22-
* dispatching, because they were created after the level was already dispatched.
23-
* <p>
24-
* Expect Long values
25-
* <p>
26-
* Default is 500_000 (0.5 ms)
27-
*/
28-
public static final String DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS = "__GJ_delayed_data_loader_batch_window_size_nano_seconds";
29-
30-
/**
31-
* An instance of {@link DelayedDataLoaderDispatcherExecutorFactory} that is used to create the
32-
* {@link java.util.concurrent.ScheduledExecutorService} for the delayed DataLoader dispatching.
33-
* <p>
34-
* Default is one static executor thread pool with a single thread.
35-
*/
36-
public static final String DELAYED_DATA_LOADER_DISPATCHING_EXECUTOR_FACTORY = "__GJ_delayed_data_loader_dispatching_executor_factory";
37-
38-
3917
/**
4018
* Enables the ability to chain DataLoader dispatching.
4119
* <p>
@@ -57,27 +35,4 @@ public static void setEnableDataLoaderChaining(GraphQLContext graphQLContext, bo
5735
}
5836

5937

60-
/**
61-
* Sets nanoseconds the batch window duration size for delayed DataLoaders.
62-
* That is for DataLoaders, that are not batched as part of the normal per level
63-
* dispatching, because they were created after the level was already dispatched.
64-
*
65-
* @param graphQLContext
66-
* @param batchWindowSize
67-
*/
68-
public static void setDelayedDataLoaderBatchWindowSize(GraphQLContext graphQLContext, Duration batchWindowSize) {
69-
graphQLContext.put(DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS, batchWindowSize.toNanos());
70-
}
71-
72-
/**
73-
* Sets the instance of {@link DelayedDataLoaderDispatcherExecutorFactory} that is used to create the
74-
* {@link java.util.concurrent.ScheduledExecutorService} for the delayed DataLoader dispatching.
75-
* <p>
76-
*
77-
* @param graphQLContext
78-
* @param delayedDataLoaderDispatcherExecutorFactory
79-
*/
80-
public static void setDelayedDataLoaderDispatchingExecutorFactory(GraphQLContext graphQLContext, DelayedDataLoaderDispatcherExecutorFactory delayedDataLoaderDispatcherExecutorFactory) {
81-
graphQLContext.put(DELAYED_DATA_LOADER_DISPATCHING_EXECUTOR_FACTORY, delayedDataLoaderDispatcherExecutorFactory);
82-
}
8338
}

src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import graphql.execution.incremental.AlternativeCallContext;
1212
import graphql.schema.DataFetcher;
1313
import graphql.schema.DataFetchingEnvironment;
14-
import graphql.util.InterThreadMemoizedSupplier;
1514
import graphql.util.LockKit;
1615
import org.dataloader.DataLoader;
1716
import org.dataloader.DataLoaderRegistry;
@@ -26,9 +25,6 @@
2625
import java.util.Set;
2726
import java.util.concurrent.CompletableFuture;
2827
import java.util.concurrent.ConcurrentHashMap;
29-
import java.util.concurrent.Executors;
30-
import java.util.concurrent.ScheduledExecutorService;
31-
import java.util.concurrent.TimeUnit;
3228
import java.util.concurrent.atomic.AtomicReference;
3329
import java.util.function.Supplier;
3430
import java.util.stream.Collectors;
@@ -39,13 +35,8 @@ public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStr
3935

4036
private final CallStack initialCallStack;
4137
private final ExecutionContext executionContext;
42-
private final long batchWindowNs;
4338
private final boolean enableDataLoaderChaining;
4439

45-
private final InterThreadMemoizedSupplier<ScheduledExecutorService> delayedDataLoaderDispatchExecutor;
46-
47-
static final InterThreadMemoizedSupplier<ScheduledExecutorService> defaultDelayedDLCFBatchWindowScheduler
48-
= new InterThreadMemoizedSupplier<>(() -> Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()));
4940

5041
static final long DEFAULT_BATCH_WINDOW_NANO_SECONDS_DEFAULT = 500_000L;
5142
private final Profiler profiler;
@@ -209,15 +200,6 @@ public PerLevelDataLoaderDispatchStrategy(ExecutionContext executionContext) {
209200
this.executionContext = executionContext;
210201

211202
GraphQLContext graphQLContext = executionContext.getGraphQLContext();
212-
this.batchWindowNs = graphQLContext.getOrDefault(DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS, DEFAULT_BATCH_WINDOW_NANO_SECONDS_DEFAULT);
213-
214-
this.delayedDataLoaderDispatchExecutor = new InterThreadMemoizedSupplier<>(() -> {
215-
DelayedDataLoaderDispatcherExecutorFactory delayedDataLoaderDispatcherExecutorFactory = graphQLContext.get(DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_DISPATCHING_EXECUTOR_FACTORY);
216-
if (delayedDataLoaderDispatcherExecutorFactory != null) {
217-
return delayedDataLoaderDispatcherExecutorFactory.createExecutor(executionContext.getExecutionId(), graphQLContext);
218-
}
219-
return defaultDelayedDLCFBatchWindowScheduler.get();
220-
});
221203

222204
this.enableDataLoaderChaining = graphQLContext.getBoolean(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING, false);
223205
this.profiler = executionContext.getProfiler();
@@ -607,14 +589,15 @@ public void run() {
607589
}
608590

609591
private void newDelayedDataLoader(ResultPathWithDataLoader resultPathWithDataLoader, CallStack callStack) {
610-
callStack.lock.runLocked(() -> {
611-
callStack.batchWindowOfDelayedDataLoaderToDispatch.add(resultPathWithDataLoader.resultPath);
612-
if (!callStack.batchWindowOpen) {
613-
callStack.batchWindowOpen = true;
614-
delayedDataLoaderDispatchExecutor.get().schedule(new DispatchDelayedDataloader(callStack), this.batchWindowNs, TimeUnit.NANOSECONDS);
615-
}
616-
617-
});
592+
dispatchDLCFImpl(Set.of(resultPathWithDataLoader.resultPath), null, callStack);
593+
// callStack.lock.runLocked(() -> {
594+
// callStack.batchWindowOfDelayedDataLoaderToDispatch.add(resultPathWithDataLoader.resultPath);
595+
// if (!callStack.batchWindowOpen) {
596+
// callStack.batchWindowOpen = true;
597+
// delayedDataLoaderDispatchExecutor.get().schedule(new DispatchDelayedDataloader(callStack), this.batchWindowNs, TimeUnit.NANOSECONDS);
598+
// }
599+
//
600+
// });
618601
}
619602

620603
private static class ResultPathWithDataLoader {

src/test/groovy/graphql/ChainedDataLoaderTest.groovy

Lines changed: 2 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package graphql
22

3-
import graphql.execution.ExecutionId
4-
import graphql.execution.instrumentation.dataloader.DataLoaderDispatchingContextKeys
5-
import graphql.execution.instrumentation.dataloader.DelayedDataLoaderDispatcherExecutorFactory
3+
64
import graphql.schema.DataFetcher
75
import org.awaitility.Awaitility
86
import org.dataloader.BatchLoader
@@ -12,10 +10,6 @@ import org.dataloader.DataLoaderRegistry
1210
import spock.lang.RepeatUntilFailure
1311
import spock.lang.Specification
1412

15-
import java.time.Duration
16-
import java.util.concurrent.Executors
17-
import java.util.concurrent.ScheduledExecutorService
18-
import java.util.concurrent.TimeUnit
1913
import java.util.concurrent.atomic.AtomicInteger
2014

2115
import static graphql.ExecutionInput.newExecutionInput
@@ -390,78 +384,14 @@ class ChainedDataLoaderTest extends Specification {
390384
def ei = eiBuilder.dataLoaderRegistry(dataLoaderRegistry).build()
391385
setEnableDataLoaderChaining(ei.graphQLContext, true);
392386

393-
// make the window 250ms
394-
DataLoaderDispatchingContextKeys.setDelayedDataLoaderBatchWindowSize(ei.graphQLContext, Duration.ofMillis(250))
395387

396388
when:
397389
def efCF = graphQL.executeAsync(ei)
398390
Awaitility.await().until { efCF.isDone() }
399391
def er = efCF.get()
400392
then:
401393
er.data == [foo: "fooFirstValue", bar: "barFirstValue"]
402-
batchLoadCalls.get() == 1
403-
}
404-
405-
def "executor for delayed dispatching can be configured"() {
406-
given:
407-
def sdl = '''
408-
409-
type Query {
410-
foo: String
411-
bar: String
412-
}
413-
'''
414-
BatchLoader<String, String> batchLoader = { keys ->
415-
return supplyAsync {
416-
Thread.sleep(250)
417-
return keys;
418-
}
419-
}
420-
421-
DataLoader<String, String> nameDataLoader = DataLoaderFactory.newDataLoader(batchLoader);
422-
423-
DataLoaderRegistry dataLoaderRegistry = new DataLoaderRegistry();
424-
dataLoaderRegistry.register("dl", nameDataLoader);
425-
426-
def fooDF = { env ->
427-
return supplyAsync {
428-
Thread.sleep(1000)
429-
return "fooFirstValue"
430-
}.thenCompose {
431-
return env.getDataLoader("dl").load(it)
432-
}
433-
} as DataFetcher
434-
435-
436-
def fetchers = ["Query": ["foo": fooDF]]
437-
def schema = TestUtil.schema(sdl, fetchers)
438-
def graphQL = GraphQL.newGraphQL(schema).build()
439-
440-
def query = "{ foo } "
441-
def ei = newExecutionInput(query).dataLoaderRegistry(dataLoaderRegistry).build()
442-
setEnableDataLoaderChaining(ei.graphQLContext, true);
443-
444-
445-
ScheduledExecutorService scheduledExecutorService = Mock()
446-
DataLoaderDispatchingContextKeys.setDelayedDataLoaderDispatchingExecutorFactory(ei.getGraphQLContext(), new DelayedDataLoaderDispatcherExecutorFactory() {
447-
@Override
448-
ScheduledExecutorService createExecutor(ExecutionId executionId, GraphQLContext graphQLContext) {
449-
return scheduledExecutorService
450-
}
451-
})
452-
453-
454-
when:
455-
def efCF = graphQL.executeAsync(ei)
456-
Awaitility.await().until { efCF.isDone() }
457-
def er = efCF.get()
458-
459-
then:
460-
er.data == [foo: "fooFirstValue"]
461-
1 * scheduledExecutorService.schedule(_ as Runnable, _ as Long, _ as TimeUnit) >> { Runnable runnable, Long delay, TimeUnit timeUnit ->
462-
return Executors.newSingleThreadScheduledExecutor().schedule(runnable, delay, timeUnit)
463-
}
464-
394+
batchLoadCalls.get() == 1 || batchLoadCalls.get() == 2 // depending on timing, it can be 1 or 2 calls
465395
}
466396

467397
def "handling of chained DataLoaders is disabled by default"() {

src/test/groovy/graphql/config/GraphQLUnusualConfigurationTest.groovy

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@ import graphql.ExecutionInput
44
import graphql.ExperimentalApi
55
import graphql.GraphQL
66
import graphql.GraphQLContext
7-
import graphql.execution.instrumentation.dataloader.DataLoaderDispatchingContextKeys
8-
import graphql.execution.instrumentation.dataloader.DelayedDataLoaderDispatcherExecutorFactory
97
import graphql.execution.ResponseMapFactory
8+
import graphql.execution.instrumentation.dataloader.DataLoaderDispatchingContextKeys
109
import graphql.introspection.GoodFaithIntrospection
1110
import graphql.parser.ParserOptions
1211
import graphql.schema.PropertyDataFetcherHelper
1312
import spock.lang.Specification
1413

15-
import java.time.Duration
16-
1714
import static graphql.parser.ParserOptions.newParserOptions
1815

1916
class GraphQLUnusualConfigurationTest extends Specification {
@@ -146,45 +143,6 @@ class GraphQLUnusualConfigurationTest extends Specification {
146143
GraphQL.unusualConfiguration(graphqlContext).dataloaderConfig().isDataLoaderChainingEnabled()
147144
}
148145

149-
def "can set data loader chaining config for extra config"() {
150-
when:
151-
def graphqlContext = GraphQLContext.newContext().build()
152-
GraphQL.unusualConfiguration(graphqlContext).dataloaderConfig().delayedDataLoaderBatchWindowSize(Duration.ofMillis(10))
153-
154-
then:
155-
graphqlContext.get(DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS) == Duration.ofMillis(10).toNanos()
156-
GraphQL.unusualConfiguration(graphqlContext).dataloaderConfig().delayedDataLoaderBatchWindowSize() == Duration.ofMillis(10)
157-
158-
when:
159-
DelayedDataLoaderDispatcherExecutorFactory factory = {}
160-
graphqlContext = GraphQLContext.newContext().build()
161-
GraphQL.unusualConfiguration(graphqlContext).dataloaderConfig().delayedDataLoaderExecutorFactory(factory)
162-
163-
then:
164-
graphqlContext.get(DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_DISPATCHING_EXECUTOR_FACTORY) == factory
165-
GraphQL.unusualConfiguration(graphqlContext).dataloaderConfig().delayedDataLoaderExecutorFactory() == factory
166-
167-
when:
168-
graphqlContext = GraphQLContext.newContext().build()
169-
// just to show we we can navigate the DSL
170-
GraphQL.unusualConfiguration(graphqlContext)
171-
.incrementalSupport()
172-
.enableIncrementalSupport(false)
173-
.enableIncrementalSupport(true)
174-
.then()
175-
.dataloaderConfig()
176-
.enableDataLoaderChaining(true)
177-
.then()
178-
.dataloaderConfig()
179-
.delayedDataLoaderBatchWindowSize(Duration.ofMillis(10))
180-
.delayedDataLoaderExecutorFactory(factory)
181-
182-
then:
183-
graphqlContext.get(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING) == true
184-
graphqlContext.get(DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS) == Duration.ofMillis(10).toNanos()
185-
graphqlContext.get(DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_DISPATCHING_EXECUTOR_FACTORY) == factory
186-
}
187-
188146
def "we can access via the ExecutionInput"() {
189147
when:
190148
def eiBuilder = ExecutionInput.newExecutionInput("query q {f}")

src/test/groovy/graphql/execution/instrumentation/dataloader/DeferWithDataLoaderTest.groovy

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import org.dataloader.DataLoaderRegistry
1414
import spock.lang.RepeatUntilFailure
1515
import spock.lang.Specification
1616

17-
import java.time.Duration
1817
import java.util.concurrent.CompletableFuture
1918

2019
import static graphql.ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT
@@ -350,7 +349,6 @@ class DeferWithDataLoaderTest extends Specification {
350349
}
351350

352351
@RepeatUntilFailure(maxAttempts = 50, ignoreRest = false)
353-
// skip until
354352
def "dataloader in initial result and chained dataloader inside nested defer block"() {
355353
given:
356354
def sdl = '''
@@ -392,7 +390,6 @@ class DeferWithDataLoaderTest extends Specification {
392390
}
393391
BatchLoader addressBatchLoader = { List<String> keys ->
394392
println "addressBatchLoader called with $keys"
395-
assert keys.size() == 3
396393
return CompletableFuture.completedFuture(keys.collect { it ->
397394
if (it == "owner-1") {
398395
return "Address 1"
@@ -450,7 +447,6 @@ class DeferWithDataLoaderTest extends Specification {
450447
def ei = ExecutionInput.newExecutionInput(query).dataLoaderRegistry(dataLoaderRegistry).build()
451448
ei.getGraphQLContext().put(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT, true)
452449
ei.getGraphQLContext().put(DataLoaderDispatchingContextKeys.ENABLE_DATA_LOADER_CHAINING, true)
453-
ei.getGraphQLContext().put(DataLoaderDispatchingContextKeys.DELAYED_DATA_LOADER_BATCH_WINDOW_SIZE_NANO_SECONDS, Duration.ofSeconds(1).toNanos())
454450

455451
when:
456452
CompletableFuture<IncrementalExecutionResult> erCF = graphQL.executeAsync(ei)

0 commit comments

Comments
 (0)