-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Added support for reactive Publishers to be returned from data fetchers #3731
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
* @param <T> for two | ||
* @param <S> for subscription | ||
*/ | ||
private static abstract class PublisherToCompletableFuture<T, S> extends CompletableFuture<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is heavily inspired from the Reactor reactor.core.publisher.MonoToCompletableFuture
class. Its uses pretty much the same pattern to turn a Publisher into a CF
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note it "IS-A" CompletableFuture class
@@ -197,8 +199,8 @@ public static String mkNameForPath(List<Field> currentField) { | |||
* @throws NonNullableFieldWasNullException in the {@link CompletableFuture} if a non-null field resolved to a null value | |||
*/ | |||
@SuppressWarnings("unchecked") | |||
protected Object /* CompletableFuture<Map<String, Object>> | Map<String, Object> */ | |||
executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException { | |||
@DuckTyped(shape = "CompletableFuture<Map<String, Object>> | Map<String, Object>") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this annotation reads better in the IDEA rather than a comment. Maybe we can use it later in some way but honestly its just a more type safe way of saying the same as we try to say in the comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice idea. Makes it more official than a comment
public class ReactiveSupport { | ||
|
||
@DuckTyped(shape = "CompletableFuture | Object") | ||
public static Object fetchedObject(Object fetchedObject) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also ducked typed
…rs - review tweaks
val = ReactiveSupport.fetchedObject(cf) | ||
then: | ||
val === cf | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wont touch CFs or plain old objects
!cf.isDone() | ||
|
||
when: | ||
def cfCancelled = cf.cancel(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cancelled before it produce a value
return counter + 1; | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a test helper
reactiveObject || _ | ||
Mono.just("X") || _ | ||
toFlow(Mono.just("X")) || _ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tests with both Reactor and Java Flows
} | ||
|
||
|
||
def "can get a reactive Flux and only take one value and make a CF from it"() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even a Flux of N values only has 1 value taken from it because that is what the graphql contact is - a Query field is 1 value not a stream of values - at least not yet (aka @stream future work perhaps)
cfField : "cf", | ||
materialisedField: "materialised" | ||
] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
full integration tests showing it working.
We know CF error handling works already so there is no need for integration tests showing all that
@@ -498,6 +500,8 @@ Async.CombinedBuilder<FieldValueInfo> getAsyncFieldValueInfo( | |||
executionContext.getDataLoaderDispatcherStrategy().fieldFetched(executionContext, parameters, dataFetcher, fetchedObject); | |||
fetchCtx.onDispatched(); | |||
fetchCtx.onFetchedValue(fetchedObject); | |||
// possible convert reactive objects into CompletableFutures | |||
fetchedObject = ReactiveSupport.fetchedObject(fetchedObject); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main chance. One liner. Turns a reactive publisher into a CF
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe
…rs - never on subscriptions
…rs - never on subscriptions with tests working
*/ | ||
public boolean isSubscriptionOperation() { | ||
return isOpType(OperationDefinition.Operation.SUBSCRIPTION); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hav long wanted this. Checking operationDefinition.getOperation() is a PITA
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice
if (!executionContext.isSubscriptionOperation()) { | ||
// possible convert reactive objects into CompletableFutures | ||
fetchedObject = ReactiveSupport.fetchedObject(fetchedObject); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main code - if its NOT a Subscription, we turns the Pubisher into a CF
@Target(value = {METHOD, PARAMETER}) | ||
public @interface DuckTyped { | ||
String shape(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is better than a comment
@DuckedTyped(shape="Object | CompletableFuture<Object>")
versus
/* Object | CompletableFuture<Object> */
return reactivePublisherToCF((Publisher<?>) fetchedObject); | ||
} | ||
return fetchedObject; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaves other things alone = only acts on Flow.Publisher
and Pubisher
This adds support for Reactive Publishers, both reactive streams Publishers and JDK Flow Publishers, to be returned from a
DataFetcher
as a value.They will be turned into
CompletableFuture
by using a subscriber to ask for one value from them.This is a lot like how reactor
Mono
has a.toFuture()
method except now they can just return the Mono itself, since a Mono is a reactive streams Publisher.The cost of this is 2 more
instanceof
checks like we already for forCompleteableFuture
s