Skip to content

Added support for reactive Publishers to be returned from data fetchers #3731

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 28, 2024

Conversation

bbakerman
Copy link
Member

This adds support for Reactive Publishers, both reactive streams Publishers and JDK Flow Publishers, to be returned from a DataFetcher as a value.

They will be turned into CompletableFuture by using a subscriber to ask for one value from them.

This is a lot like how reactor Mono has a .toFuture() method except now they can just return the Mono itself, since a Mono is a reactive streams Publisher.

The cost of this is 2 more instanceof checks like we already for for CompleteableFutures

@bbakerman bbakerman added this to the 23.x breaking changes milestone Oct 26, 2024
* @param <T> for two
* @param <S> for subscription
*/
private static abstract class PublisherToCompletableFuture<T, S> extends CompletableFuture<T> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is heavily inspired from the Reactor reactor.core.publisher.MonoToCompletableFuture class. Its uses pretty much the same pattern to turn a Publisher into a CF

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note it "IS-A" CompletableFuture class

@@ -197,8 +199,8 @@ public static String mkNameForPath(List<Field> currentField) {
* @throws NonNullableFieldWasNullException in the {@link CompletableFuture} if a non-null field resolved to a null value
*/
@SuppressWarnings("unchecked")
protected Object /* CompletableFuture<Map<String, Object>> | Map<String, Object> */
executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
@DuckTyped(shape = "CompletableFuture<Map<String, Object>> | Map<String, Object>")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this annotation reads better in the IDEA rather than a comment. Maybe we can use it later in some way but honestly its just a more type safe way of saying the same as we try to say in the comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea. Makes it more official than a comment

public class ReactiveSupport {

@DuckTyped(shape = "CompletableFuture | Object")
public static Object fetchedObject(Object fetchedObject) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also ducked typed

val = ReactiveSupport.fetchedObject(cf)
then:
val === cf
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wont touch CFs or plain old objects

!cf.isDone()

when:
def cfCancelled = cf.cancel(true)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancelled before it produce a value

return counter + 1;
})
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a test helper

reactiveObject || _
Mono.just("X") || _
toFlow(Mono.just("X")) || _
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests with both Reactor and Java Flows

}


def "can get a reactive Flux and only take one value and make a CF from it"() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even a Flux of N values only has 1 value taken from it because that is what the graphql contact is - a Query field is 1 value not a stream of values - at least not yet (aka @stream future work perhaps)

cfField : "cf",
materialisedField: "materialised"
]
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

full integration tests showing it working.

We know CF error handling works already so there is no need for integration tests showing all that

@@ -498,6 +500,8 @@ Async.CombinedBuilder<FieldValueInfo> getAsyncFieldValueInfo(
executionContext.getDataLoaderDispatcherStrategy().fieldFetched(executionContext, parameters, dataFetcher, fetchedObject);
fetchCtx.onDispatched();
fetchCtx.onFetchedValue(fetchedObject);
// possible convert reactive objects into CompletableFutures
fetchedObject = ReactiveSupport.fetchedObject(fetchedObject);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main chance. One liner. Turns a reactive publisher into a CF

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe

*/
public boolean isSubscriptionOperation() {
return isOpType(OperationDefinition.Operation.SUBSCRIPTION);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hav long wanted this. Checking operationDefinition.getOperation() is a PITA

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

if (!executionContext.isSubscriptionOperation()) {
// possible convert reactive objects into CompletableFutures
fetchedObject = ReactiveSupport.fetchedObject(fetchedObject);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main code - if its NOT a Subscription, we turns the Pubisher into a CF

@Target(value = {METHOD, PARAMETER})
public @interface DuckTyped {
String shape();
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is better than a comment

@DuckedTyped(shape="Object | CompletableFuture<Object>")

versus

/* Object | CompletableFuture<Object> */

return reactivePublisherToCF((Publisher<?>) fetchedObject);
}
return fetchedObject;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaves other things alone = only acts on Flow.Publisher and Pubisher

@bbakerman bbakerman merged commit d854425 into master Nov 28, 2024
1 check passed
@dondonz dondonz deleted the reactive-values-from-datafetchers branch November 30, 2024 07:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants