-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Subscription results keep in upstream order #3574
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Updated it to have a
|
when request one by one private CompletableFuture<Flux<Object>> mkFluxDF(DataFetchingEnvironment env) {
// async deliver of the publisher with random snoozing between values
return CompletableFuture.supplyAsync(() -> Flux.generate(() -> 0, (counter, sink) -> {
sink.next(mkValue(counter));
snooze(rand(10, 100));
if (counter == 10) {
sink.complete();
}
return counter + 1;
}).doOnNext(System.out::println));
}
when request 10 @Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
System.out.println("onSubscribe");
subscription.request(10);
}
@Override
public void onNext(Object o) {
System.out.println("\tonNext : " + o);
// subscription.request(1);
} {id=0, counter=0}
|
…aks on tests and how we handle things
…aks on tests and how we handle things
…it out the different publishers and also took out the subscribers
…e tests and more code tweaks
… subscribers now inhrent from each other and outstanding futures are now cancelled if the Publisher fails
* source publisher. But this can be changed to {@link Boolean#TRUE} to keep them in order. | ||
*/ | ||
public static final String KEEP_SUBSCRIPTION_EVENTS_ORDERED = "KEEP_SUBSCRIPTION_EVENTS_ORDERED"; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By default this is off (old behavior) but it can be opted into
* @param <U> the upstream type to be mapped to | ||
*/ | ||
@Internal | ||
public class CompletionStageMappingOrderedPublisher<D, U> extends CompletionStageMappingPublisher<D, U> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it extends CompletionStageMappingPublisher
because they have a LOT in common - just the ordering on inflight objects is different
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note also this has been split out of the publisher class
private final Publisher<U> upstreamPublisher; | ||
private final Function<U, CompletionStage<D>> mapper; | ||
protected final Publisher<U> upstreamPublisher; | ||
protected final Function<U, CompletionStage<D>> mapper; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now designed to be inherited from its ordered peer
@@ -52,126 +54,4 @@ public Publisher<U> getUpstreamPublisher() { | |||
return upstreamPublisher; | |||
} | |||
|
|||
@SuppressWarnings("ReactiveStreamsSubscriberImplementation") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was moved to its own class up in the same package
* @param <D> mapped downstream values | ||
*/ | ||
@Internal | ||
public class CompletionStageOrderedSubscriber<U, D> extends CompletionStageSubscriber<U, D> implements Subscriber<U> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It inherents from CompletionStageSubscriber because again they are very much the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
except for ordering of source events
…ked method name on review
if (throwable instanceof CompletionException & throwable.getCause() != null) { | ||
return throwable.getCause(); | ||
} | ||
return throwable; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the onError will be the underling exception not the wrapper CompletionException that comes from CF usage
* @param <D> mapped downstream values | ||
*/ | ||
@Internal | ||
public class CompletionStageSubscriber<U, D> implements Subscriber<U> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was split out from the peer Publisher code as its own file. Its complicated enough to deserve that.
protected final Queue<CompletionStage<?>> inFlightDataQ; | ||
protected final LockKit.ReentrantLock lock = new LockKit.ReentrantLock(); | ||
protected final AtomicReference<Runnable> onCompleteRun; | ||
protected final AtomicBoolean isTerminal; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Designed to be inheret from by CompletionStageOrderedSubscriber
text | ||
} | ||
} | ||
""").graphQLContext([(SubscriptionExecutionStrategy.KEEP_SUBSCRIPTION_EVENTS_ORDERED): true]).build() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
our hint to make them ordered. Here is an example
import java.util.concurrent.CompletionStage | ||
import java.util.function.Function | ||
|
||
class CompletionStageMappingOrderedPublisherTest extends Specification { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests for the two classes kinda repeat themselves - but I figuered that even though they inherent from each other - the tests should stress them the same and make no assumptions on how they are implemented
With even more work maybe the tests become common ones use where:
but I didnt go that far - copy pasta rather
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And - no I ended up refactoring so that the same tests are run for both implementations - one in order and one not in order
|
||
import static graphql.execution.reactive.CompletionStageSubscriberTest.mapperThatDoesNotComplete | ||
|
||
class CompletionStageOrderedSubscriberTest extends Specification { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by breaking out the Subscriber classes into their own class - we can unit test them better
@@ -0,0 +1,71 @@ | |||
package graphql.execution.reactive.tck; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have made a series of new TestNG compliance tests that use the TCK to ensure they are valid publishers
* This reproduction is to see what's happening with Subscriptions and whether they keep their | ||
* order when values are async. | ||
*/ | ||
public class SubscriptionReproduction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a main()
reproduction class - not a test - I wanted to find out what was happening and I thought this was a good way to find out. Other code now tests that the fix is in place
…bined the tests so there is less repeated test code
@@ -117,6 +117,7 @@ dependencies { | |||
|
|||
testImplementation 'org.reactivestreams:reactive-streams-tck:' + reactiveStreamsVersion | |||
testImplementation "io.reactivex.rxjava2:rxjava:2.2.21" | |||
testImplementation "io.projectreactor:reactor-core:3.6.5" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added so we can do Reactor specific tests if we choose
@@ -40,9 +35,16 @@ public CompletionStageMappingPublisher(Publisher<U> upstreamPublisher, Function< | |||
|
|||
@Override | |||
public void subscribe(Subscriber<? super D> downstreamSubscriber) { | |||
upstreamPublisher.subscribe(new CompletionStageSubscriber(downstreamSubscriber)); | |||
assertNotNullWithNPE(downstreamSubscriber, () -> "Subscriber passed to subscribe must not be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spec says it MUST be non null
|
||
@NotNull | ||
protected Subscriber<? super U> createSubscriber(Subscriber<? super D> downstreamSubscriber) { | ||
return new CompletionStageSubscriber<>(mapper, downstreamSubscriber); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ordering comes from the Subscriber impl
onCompleteOrError(() -> { | ||
onCompleteOrErrorRunCalled.set(true); | ||
downstreamSubscriber.onError(t); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See in the old impl we waited until ALL inflights finished before we called onError - well now we dont.
We fail fast after discussing it with the Spring folks
CompletableFuture<?> cf = cs.toCompletableFuture(); | ||
if (cf.isDone()) { | ||
// take it off the queue | ||
inFlightDataQ.poll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in a lock so we can rely on it not changing here on the second mutative read
downstreamSubscriber.onError(t); | ||
cancelInFlightFutures(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now fails fast
if (isTerminal.compareAndSet(false, true)) { | ||
downstreamSubscriber.onComplete(); | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waits for all inflight CFs to complete
public CompletionStageSubscriber(Function<U, CompletionStage<D>> mapper, Subscriber<? super D> downstreamSubscriber) { | ||
this.mapper = mapper; | ||
this.downstreamSubscriber = downstreamSubscriber; | ||
inFlightDataQ = new ArrayDeque<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: to a non-native English speaker it's going to be confusing what the "Q" is (it being a homophone for "queue"). I can see it's not introduced in this PR, it's based off naming from previous code.
Could rename these methods/variables to "Queue" or mention in a comment that the "Q" means queue.
See spring-projects/spring-graphql#949
This PR has a reproduction of the problem reported and also a series fo fixes
However after discussions with the Spring team, its not apparent that the old "stream what completes first" approach is wrong.
So I have added two modes - the older
CompletionStageMappingPublisher
which will emit objects as soon as they complete and a newerCompletionStageMappingOrderedPublisher
that will emit elements in the original upstream Publisher order.Both have bufferring (for elements that have not completed when they are mapped into graphql objects) but once keeps them in original presentation order while the old one does what it has always done.
I have split the inner Subscriber classes out into their own classes and hence can unit test them better
I have changed the tests considerable. I have tested more edge cases.
I have also made them finish up when
onError
is called and they cancel any futures in flight.