Skip to content

A PR for reactive streams support #151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 41 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
6e30220
Add a proof-of-concept for "Observer-like" batch loading
AlexandreCarlton May 12, 2024
95540ff
reactive streams support branch
bbakerman May 17, 2024
2cdba8a
Merge remote-tracking branch 'origin/master' into reactive-streams-br…
bbakerman May 17, 2024
1d78255
reactive streams support branch - merged master
bbakerman May 17, 2024
2032e33
Merge remote-tracking branch 'origin/master' into observer-batch-load…
AlexandreCarlton May 18, 2024
6b5a732
Eliminate *BatchObserver in favour of Publisher
AlexandreCarlton May 18, 2024
68d7f54
Use internal Assertions over Java's raw assert
AlexandreCarlton May 18, 2024
a3132b7
Remove handling of Throwable passed into onNext
AlexandreCarlton May 18, 2024
fbeffae
Expose `new*DataLoader` methods for *PublisherBatchLoader
AlexandreCarlton May 18, 2024
b2a662d
Copy/tweak original/ DataLoader tests for publisher equivalents
AlexandreCarlton May 18, 2024
0d0b2f8
Rename '*PublisherBatchLoader' to 'BatchPublisher'
AlexandreCarlton May 18, 2024
14002f6
Ensure DataLoaderSubscriber is only called by one thread
AlexandreCarlton May 18, 2024
0f303a8
Document Subscriber#onNext invocation order
AlexandreCarlton May 18, 2024
ce115fd
Merge branch 'reactive-streams-branch' into observer-batch-loader-pro…
bbakerman May 19, 2024
288be41
Merge pull request #148 from AlexandreCarlton/observer-batch-loader-p…
bbakerman May 19, 2024
e16fa65
Merge remote-tracking branch 'origin/master' into reactive-streams-br…
bbakerman May 20, 2024
a93112a
reactive streams support branch - getting it compiling
bbakerman May 20, 2024
74567fe
Making the Subscribers use a common base class
bbakerman May 21, 2024
4396624
Making the Subscribers use a common base class- synchronized on each …
bbakerman May 21, 2024
8a64483
Making the Subscribers use a common base class- now with failing test…
bbakerman May 21, 2024
3e8ac9c
Making the Subscribers use a common base class- fail the overall CF o…
bbakerman May 21, 2024
eb2b40c
Inline BatchPublisher tests into DataLoaderTest
AlexandreCarlton May 20, 2024
651e561
Fix MappedBatchPublisher loaders to work without cache
AlexandreCarlton May 20, 2024
8295396
Merge pull request #155 from AlexandreCarlton/migrate-publisher-tests
bbakerman May 22, 2024
86ec5c8
Merge remote-tracking branch 'origin/reactive-streams-branch' into re…
bbakerman May 22, 2024
6d3c4eb
Making the Subscribers use a common base class - merged in main branch
bbakerman May 22, 2024
3fddb8b
Merge pull request #154 from graphql-java/reactive-streams-common-pub…
bbakerman May 22, 2024
034c68f
More tests for Publishers
bbakerman May 22, 2024
b09ac60
Merge remote-tracking branch 'origin/master' into reactive-streams-br…
bbakerman May 23, 2024
5d826b8
Merge remote-tracking branch 'origin/master' into reactive-streams-br…
bbakerman May 23, 2024
8b344db
Now the builds pass - broken out the fixtures
bbakerman May 23, 2024
e9bfc2b
Merge pull request #158 from graphql-java/reactive-streams-branch-ext…
bbakerman May 23, 2024
91d3036
This moves the reactive code pout into its own package because DataLo…
bbakerman May 24, 2024
e98621b
renamed classes inline with their counterparts
bbakerman May 24, 2024
6523015
made them non public and created a static factory support class
bbakerman May 24, 2024
170ccf8
reorged method placement
bbakerman May 24, 2024
77fd0dd
Merge pull request #159 from graphql-java/reactive-streams-branch-mov…
bbakerman May 24, 2024
4b9356e
Added javadoc to publisher interfaces
bbakerman May 24, 2024
3c3cc99
Have MappedBatchPublisher take in a Set<K> keys
AlexandreCarlton May 26, 2024
2e82858
Add README sections for `*BatchPublisher`
AlexandreCarlton May 26, 2024
c3e6ee5
Merge pull request #160 from AlexandreCarlton/add-documentation-for-p…
bbakerman May 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,66 @@ For example, let's assume you want to load users from a database, you could prob
// ...
```

### Returning a stream of results from your batch publisher

It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream.

For example, let's say you wanted to load many users from a service without forcing the service to load all
users into its memory (which may exert considerable pressure on it).

A `org.dataloader.BatchPublisher` may be used to load this data:

```java
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
@Override
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
userManager.publishUsersById(userIds, userSubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);

// ...
```

Rather than waiting for all values to be returned, this `DataLoader` will complete
the `CompletableFuture<User>` returned by `Dataloader#load(Long)` as each value is
processed.

If an exception is thrown, the remaining futures yet to be completed are completed
exceptionally.

You *MUST* ensure that the values are streamed in the same order as the keys provided,
with the same cardinality (i.e. the number of values must match the number of keys).
Failing to do so will result in incorrect data being returned from `DataLoader#load`.


### Returning a mapped stream of results from your batch publisher

Your publisher may not necessarily return values in the same order in which it processes keys.

For example, let's say your batch publisher function loads user data which is spread across shards,
with some shards responding more quickly than others.

In instances like these, `org.dataloader.MappedBatchPublisher` can be used.

```java
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
@Override
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
userManager.publishUsersById(userIds, userEntrySubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);

// ...
```

Like the `BatchPublisher`, if an exception is thrown, the remaining futures yet to be completed are completed
exceptionally.

Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys,
or even the same number of values.

### Error object is not a thing in a type safe Java world

In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected
Expand Down Expand Up @@ -541,6 +601,12 @@ The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invo
return scheduledCall.invoke();
}).thenCompose(Function.identity());
}

@Override
public <K> void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
snooze(10);
scheduledCall.invoke();
}
};
```

Expand Down
8 changes: 7 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def getDevelopmentVersion() {
version
}

def slf4jVersion = '1.7.30'
def releaseVersion = System.env.RELEASE_VERSION
version = releaseVersion ? releaseVersion : getDevelopmentVersion()
group = 'com.graphql-java'
Expand Down Expand Up @@ -64,16 +63,23 @@ jar {
}
}

def slf4jVersion = '1.7.30'
def reactiveStreamsVersion = '1.0.3'

dependencies {
api 'org.slf4j:slf4j-api:' + slf4jVersion
api 'org.reactivestreams:reactive-streams:' + reactiveStreamsVersion

testImplementation 'org.slf4j:slf4j-simple:' + slf4jVersion
testImplementation 'org.awaitility:awaitility:2.0.0'
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation 'io.projectreactor:reactor-core:3.6.6'
testImplementation 'com.github.ben-manes.caffeine:caffeine:2.9.0'
testImplementation platform('org.junit:junit-bom:5.10.2')
testImplementation 'org.junit.jupiter:junit-jupiter-api'
testImplementation 'org.junit.jupiter:junit-jupiter-params'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testImplementation 'io.projectreactor:reactor-core:3.6.6'
}

task sourcesJar(type: Jar) {
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/org/dataloader/BatchPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.dataloader;

import org.reactivestreams.Subscriber;

import java.util.List;

/**
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
* <p>
* The function <b>must</b> call the provided {@link Subscriber} to process the values it has retrieved to allow
* the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available
* (rather than when all values have been retrieved).
* <p>
* <b>NOTE:</b> It is <b>required</b> that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as
* the provided keys and that you provide a value for every key provided.
*
* @param <K> type parameter indicating the type of keys to use for data load requests.
* @param <V> type parameter indicating the type of values returned
* @see BatchLoader for the non-reactive version
*/
public interface BatchPublisher<K, V> {
/**
* Called to batch the provided keys into a stream of values. You <b>must</b> provide
* the same number of values as there as keys, and they <b>must</b> be in the order of the keys.
* <p>
* The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides
* the values given the keys and then subscribe to it with the provided {@link Subscriber}.
* <p>
* <b>NOTE:</b> It is <b>required</b> that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as
* the provided keys and that you provide a value for every key provided.
*
* @param keys the collection of keys to load
* @param subscriber as values arrive you must call the subscriber for each value
*/
void load(List<K> keys, Subscriber<V> subscriber);
}
34 changes: 34 additions & 0 deletions src/main/java/org/dataloader/BatchPublisherWithContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.dataloader;

import org.reactivestreams.Subscriber;

import java.util.List;

/**
* This form of {@link BatchPublisher} is given a {@link org.dataloader.BatchLoaderEnvironment} object
* that encapsulates the calling context. A typical use case is passing in security credentials or database details
* for example.
* <p>
* See {@link BatchPublisher} for more details on the design invariants that you must implement in order to
* use this interface.
*/
public interface BatchPublisherWithContext<K, V> {
/**
* Called to batch the provided keys into a stream of values. You <b>must</b> provide
* the same number of values as there as keys, and they <b>must</b> be in the order of the keys.
* <p>
* The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides
* the values given the keys and then subscribe to it with the provided {@link Subscriber}.
* <p>
* <b>NOTE:</b> It is <b>required</b> that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as
* the provided keys and that you provide a value for every key provided.
* <p>
* This is given an environment object to that maybe be useful during the call. A typical use case
* is passing in security credentials or database details for example.
*
* @param keys the collection of keys to load
* @param subscriber as values arrive you must call the subscriber for each value
* @param environment an environment object that can help with the call
*/
void load(List<K> keys, Subscriber<V> subscriber, BatchLoaderEnvironment environment);
}
Loading
Loading