7
7
import graphql .execution .ExecutionStrategyParameters ;
8
8
import graphql .execution .FieldValueInfo ;
9
9
import graphql .schema .DataFetcher ;
10
+ import graphql .schema .DataFetchingEnvironment ;
10
11
import graphql .util .LockKit ;
11
12
import org .dataloader .DataLoaderRegistry ;
12
13
14
+ import java .util .ArrayList ;
13
15
import java .util .LinkedHashSet ;
14
16
import java .util .List ;
17
+ import java .util .Map ;
15
18
import java .util .Set ;
19
+ import java .util .concurrent .ConcurrentHashMap ;
20
+ import java .util .concurrent .CopyOnWriteArrayList ;
21
+ import java .util .concurrent .CountDownLatch ;
22
+ import java .util .concurrent .Executors ;
23
+ import java .util .concurrent .ScheduledExecutorService ;
24
+ import java .util .concurrent .TimeUnit ;
25
+ import java .util .concurrent .atomic .AtomicReference ;
26
+ import java .util .function .Supplier ;
16
27
17
28
@ Internal
18
29
public class PerLevelDataLoaderDispatchStrategy implements DataLoaderDispatchStrategy {
19
30
20
31
private final CallStack callStack ;
21
32
private final ExecutionContext executionContext ;
22
33
34
+ static final ScheduledExecutorService isolatedDLCFBatchWindowScheduler = Executors .newSingleThreadScheduledExecutor ();
35
+ static final int BATCH_WINDOW_NANO_SECONDS = 500_000 ;
36
+
23
37
24
38
private static class CallStack {
25
39
@@ -34,10 +48,27 @@ private static class CallStack {
34
48
35
49
private final Set <Integer > dispatchedLevels = new LinkedHashSet <>();
36
50
51
+ // fields only relevant when a DataLoaderCF is involved
52
+ private final List <DataLoaderCF <?>> allDataLoaderCF = new CopyOnWriteArrayList <>();
53
+ //TODO: maybe this should be cleaned up once the CF returned by these fields are completed
54
+ // otherwise this will stick around until the whole request is finished
55
+ private final Set <DataFetchingEnvironment > fieldsFinishedDispatching = ConcurrentHashMap .newKeySet ();
56
+ private final Map <Integer , Set <DataFetchingEnvironment >> levelToDFEWithDataLoaderCF = new ConcurrentHashMap <>();
57
+
58
+ private final Set <DataFetchingEnvironment > batchWindowOfIsolatedDfeToDispatch = ConcurrentHashMap .newKeySet ();
59
+
60
+ private boolean batchWindowOpen = false ;
61
+
62
+
37
63
public CallStack () {
38
64
expectedExecuteObjectCallsPerLevel .set (1 , 1 );
39
65
}
40
66
67
+ public void addDataLoaderDFE (int level , DataFetchingEnvironment dfe ) {
68
+ levelToDFEWithDataLoaderCF .computeIfAbsent (level , k -> new LinkedHashSet <>()).add (dfe );
69
+ }
70
+
71
+
41
72
void increaseExpectedFetchCount (int level , int count ) {
42
73
expectedFetchCountPerLevel .increment (level , count );
43
74
}
@@ -234,9 +265,13 @@ private int getObjectCountForList(List<FieldValueInfo> fieldValueInfos) {
234
265
public void fieldFetched (ExecutionContext executionContext ,
235
266
ExecutionStrategyParameters executionStrategyParameters ,
236
267
DataFetcher <?> dataFetcher ,
237
- Object fetchedValue ) {
268
+ Object fetchedValue ,
269
+ Supplier <DataFetchingEnvironment > dataFetchingEnvironment ) {
238
270
int level = executionStrategyParameters .getPath ().getLevel ();
239
271
boolean dispatchNeeded = callStack .lock .callLocked (() -> {
272
+ if (DataLoaderCF .isDataLoaderCF (fetchedValue )) {
273
+ callStack .addDataLoaderDFE (level , dataFetchingEnvironment .get ());
274
+ }
240
275
callStack .increaseFetchCount (level );
241
276
return dispatchIfNeeded (level );
242
277
});
@@ -275,9 +310,89 @@ private boolean levelReady(int level) {
275
310
}
276
311
277
312
void dispatch (int level ) {
278
- DataLoaderRegistry dataLoaderRegistry = executionContext .getDataLoaderRegistry ();
279
- dataLoaderRegistry .dispatchAll ();
313
+ if (callStack .levelToDFEWithDataLoaderCF .size () > 0 ) {
314
+ dispatchDLCFImpl (callStack .levelToDFEWithDataLoaderCF .get (level ));
315
+ } else {
316
+ DataLoaderRegistry dataLoaderRegistry = executionContext .getDataLoaderRegistry ();
317
+ dataLoaderRegistry .dispatchAll ();
318
+ }
280
319
}
281
320
321
+
322
+ public void dispatchDLCFImpl (Set <DataFetchingEnvironment > dfeToDispatchSet ) {
323
+
324
+ // filter out all DataLoaderCFS that are matching the fields we want to dispatch
325
+ List <DataLoaderCF <?>> relevantDataLoaderCFs = new ArrayList <>();
326
+ for (DataLoaderCF <?> dataLoaderCF : callStack .allDataLoaderCF ) {
327
+ if (dfeToDispatchSet .contains (dataLoaderCF .dfe )) {
328
+ relevantDataLoaderCFs .add (dataLoaderCF );
329
+ }
330
+ }
331
+ // we are cleaning up the list of all DataLoadersCFs
332
+ callStack .allDataLoaderCF .removeAll (relevantDataLoaderCFs );
333
+
334
+ // means we are all done dispatching the fields
335
+ if (relevantDataLoaderCFs .size () == 0 ) {
336
+ callStack .fieldsFinishedDispatching .addAll (dfeToDispatchSet );
337
+ return ;
338
+ }
339
+ // we are dispatching all data loaders and waiting for all dataLoaderCFs to complete
340
+ // and to finish their sync actions
341
+ CountDownLatch countDownLatch = new CountDownLatch (relevantDataLoaderCFs .size ());
342
+ for (DataLoaderCF dlCF : relevantDataLoaderCFs ) {
343
+ dlCF .latch = countDownLatch ;
344
+ }
345
+ // TODO: this should be done async or in a more regulated way with a configurable thread pool or so
346
+ new Thread (() -> {
347
+ try {
348
+ // waiting until all sync codes for all DL CFs are run
349
+ countDownLatch .await ();
350
+ } catch (InterruptedException e ) {
351
+ throw new RuntimeException (e );
352
+ }
353
+ // now we handle all new DataLoaders
354
+ dispatchDLCFImpl (dfeToDispatchSet );
355
+ }).start ();
356
+ // Only dispatching relevant data loaders
357
+ for (DataLoaderCF dlCF : relevantDataLoaderCFs ) {
358
+ dlCF .dfe .getDataLoader (dlCF .dataLoaderName ).dispatch ();
359
+ }
360
+ // executionContext.getDataLoaderRegistry().dispatchAll();
361
+ }
362
+
363
+
364
+ public void newDataLoaderCF (DataLoaderCF <?> dataLoaderCF ) {
365
+ System .out .println ("newDataLoaderCF" );
366
+ callStack .lock .runLocked (() -> {
367
+ callStack .allDataLoaderCF .add (dataLoaderCF );
368
+ });
369
+ if (callStack .fieldsFinishedDispatching .contains (dataLoaderCF .dfe )) {
370
+ System .out .println ("isolated dispatch" );
371
+ dispatchIsolatedDataLoader (dataLoaderCF );
372
+ }
373
+
374
+ }
375
+
376
+ private void dispatchIsolatedDataLoader (DataLoaderCF <?> dlCF ) {
377
+ callStack .lock .runLocked (() -> {
378
+ callStack .batchWindowOfIsolatedDfeToDispatch .add (dlCF .dfe );
379
+ if (!callStack .batchWindowOpen ) {
380
+ callStack .batchWindowOpen = true ;
381
+ AtomicReference <Set <DataFetchingEnvironment >> dfesToDispatch = new AtomicReference <>();
382
+ Runnable runnable = () -> {
383
+ callStack .lock .runLocked (() -> {
384
+ dfesToDispatch .set (new LinkedHashSet <>(callStack .batchWindowOfIsolatedDfeToDispatch ));
385
+ callStack .batchWindowOfIsolatedDfeToDispatch .clear ();
386
+ callStack .batchWindowOpen = false ;
387
+ });
388
+ dispatchDLCFImpl (dfesToDispatch .get ());
389
+ };
390
+ isolatedDLCFBatchWindowScheduler .schedule (runnable , BATCH_WINDOW_NANO_SECONDS , TimeUnit .NANOSECONDS );
391
+ }
392
+
393
+ });
394
+ }
395
+
396
+
282
397
}
283
398
0 commit comments