diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index bc9d5f8..01b89bc 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -13,11 +13,19 @@ jobs: MAVEN_CENTRAL_PGP_KEY: ${{ secrets.MAVEN_CENTRAL_PGP_KEY }} steps: - - uses: actions/checkout@v1 - - uses: gradle/wrapper-validation-action@v1 - - name: Set up JDK 1.8 - uses: actions/setup-java@v1 + - uses: actions/checkout@v4 + - uses: gradle/actions/wrapper-validation@v3 + - name: Set up JDK 11 + uses: actions/setup-java@v4 with: - java-version: '8.0.282' + java-version: '11' + distribution: 'temurin' + check-latest: true + # Configure Gradle for optimal use in GiHub Actions, including caching of downloaded dependencies. + # See: https://github.com/gradle/actions/blob/main/setup-gradle/README.md + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v4 - name: build test and publish run: ./gradlew assemble && ./gradlew check --info && ./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository -x check --info --stacktrace + env: + CI: true diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 13a366a..f16bf96 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -7,15 +7,25 @@ on: pull_request: branches: - master + - reactive-streams-branch + - '**' jobs: buildAndTest: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v1 - - uses: gradle/wrapper-validation-action@v1 - - name: Set up JDK 1.8 - uses: actions/setup-java@v1 + - uses: actions/checkout@v4 + - uses: gradle/actions/wrapper-validation@v3 + - name: Set up JDK 11 + uses: actions/setup-java@v4 with: - java-version: '8.0.282' + java-version: '11' + distribution: 'temurin' + check-latest: true + # Configure Gradle for optimal use in GiHub Actions, including caching of downloaded dependencies. + # See: https://github.com/gradle/actions/blob/main/setup-gradle/README.md + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v4 - name: build and test run: ./gradlew assemble && ./gradlew check --info --stacktrace + env: + CI: true diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b61d755..a574a68 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -17,11 +17,19 @@ jobs: RELEASE_VERSION: ${{ github.event.inputs.version }} steps: - - uses: actions/checkout@v1 - - uses: gradle/wrapper-validation-action@v1 - - name: Set up JDK 1.8 - uses: actions/setup-java@v1 + - uses: actions/checkout@v4 + - uses: gradle/actions/wrapper-validation@v3 + - name: Set up JDK 11 + uses: actions/setup-java@v4 with: - java-version: '8.0.282' + java-version: '11' + distribution: 'temurin' + check-latest: true + # Configure Gradle for optimal use in GiHub Actions, including caching of downloaded dependencies. + # See: https://github.com/gradle/actions/blob/main/setup-gradle/README.md + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v4 - name: build test and publish run: ./gradlew assemble && ./gradlew check --info && ./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository -x check --info --stacktrace + env: + CI: true diff --git a/README.md b/README.md index 24a65f6..fbfd620 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![Latest Release](https://maven-badges.herokuapp.com/maven-central/com.graphql-java/java-dataloader/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.graphql-java/java-dataloader/) [![Apache licensed](https://img.shields.io/hexpm/l/plug.svg?maxAge=2592000)](https://github.com/graphql-java/java-dataloader/blob/master/LICENSE) -This small and simple utility library is a pure Java 8 port of [Facebook DataLoader](https://github.com/facebook/dataloader). +This small and simple utility library is a pure Java 11 port of [Facebook DataLoader](https://github.com/facebook/dataloader). It can serve as integral part of your application's data layer to provide a consistent API over various back-ends and reduce message communication overhead through batching and caching. @@ -15,7 +15,7 @@ are resolved independently and, with a true graph of objects, you may be fetchin A naive implementation of graphql data fetchers can easily lead to the dreaded "n+1" fetch problem. Most of the code is ported directly from Facebook's reference implementation, with one IMPORTANT adaptation to make -it work for Java 8. ([more on this below](#manual-dispatching)). +it work for Java 11. ([more on this below](#manual-dispatching)). Before reading on, be sure to take a short dive into the [original documentation](https://github.com/facebook/dataloader/blob/master/README.md) provided by Lee Byron (@leebyron) @@ -67,7 +67,7 @@ repositories { } dependencies { - compile 'com.graphql-java:java-dataloader: 3.1.0' + compile 'com.graphql-java:java-dataloader: 3.3.0' } ``` @@ -286,6 +286,77 @@ For example, let's assume you want to load users from a database, you could prob // ... ``` +### Returning a stream of results from your batch publisher + +It may be that your batch loader function can use a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream. + +For example, let's say you wanted to load many users from a service without forcing the service to load all +users into its memory (which may exert considerable pressure on it). + +A `org.dataloader.BatchPublisher` may be used to load this data: + +```java + BatchPublisher batchPublisher = new BatchPublisher() { + @Override + public void load(List userIds, Subscriber userSubscriber) { + Publisher userResults = userManager.streamUsersById(userIds); + userResults.subscribe(userSubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher); + + // ... +``` + +Rather than waiting for all user values to be returned on one batch, this `DataLoader` will complete +the `CompletableFuture` returned by `Dataloader#load(Long)` as each value is +published. + +This pattern means that data loader values can (in theory) be satisfied more quickly than if we wait for +all results in the batch to be retrieved and hence the overall result may finish more quickly. + +If an exception is thrown, the remaining futures yet to be completed are completed +exceptionally. + +You *MUST* ensure that the values are streamed in the same order as the keys provided, +with the same cardinality (i.e. the number of values must match the number of keys). + +Failing to do so will result in incorrect data being returned from `DataLoader#load`. + +`BatchPublisher` is the reactive version of `BatchLoader`. + + +### Returning a mapped stream of results from your batch publisher + +Your publisher may not necessarily return values in the same order in which it processes keys and it +may not be able to find a value for each key presented. + +For example, let's say your batch publisher function loads user data which is spread across shards, +with some shards responding more quickly than others. + +In instances like these, `org.dataloader.MappedBatchPublisher` can be used. + +```java + MappedBatchPublisher mappedBatchPublisher = new MappedBatchPublisher() { + @Override + public void load(Set userIds, Subscriber> userEntrySubscriber) { + Publisher> userEntries = userManager.streamUsersById(userIds); + userEntries.subscribe(userEntrySubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher); + + // ... +``` + +Like the `BatchPublisher`, if an exception is thrown, the remaining futures yet to be completed are completed +exceptionally. + +Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys, +or even the same number of values. + +`MappedBatchPublisher` is the reactive version of `MappedBatchLoader`. + ### Error object is not a thing in a type safe Java world In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected @@ -541,6 +612,12 @@ The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invo return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(10); + scheduledCall.invoke(); + } }; ``` @@ -697,10 +774,10 @@ This library was originally written for use within a [VertX world](http://vertx. itself. All the heavy lifting has been done by this project : [vertx-dataloader](https://github.com/engagingspaces/vertx-dataloader) including the extensive testing (which itself came from Facebook). -This particular port was done to reduce the dependency on Vertx and to write a pure Java 8 implementation with no dependencies and also +This particular port was done to reduce the dependency on Vertx and to write a pure Java 11 implementation with no dependencies and also to use the more normative Java CompletableFuture. -[vertx-core](http://vertx.io/docs/vertx-core/java/) is not a lightweight library by any means so having a pure Java 8 implementation is +[vertx-core](http://vertx.io/docs/vertx-core/java/) is not a lightweight library by any means so having a pure Java 11 implementation is very desirable. diff --git a/build.gradle b/build.gradle index f5064ed..70d3918 100644 --- a/build.gradle +++ b/build.gradle @@ -3,12 +3,20 @@ import java.text.SimpleDateFormat plugins { id 'java' id 'java-library' + id 'jvm-test-suite' id 'maven-publish' id 'signing' - id "biz.aQute.bnd.builder" version "6.2.0" - id "io.github.gradle-nexus.publish-plugin" version "1.0.0" + id 'groovy' + id 'biz.aQute.bnd.builder' version '6.2.0' + id 'io.github.gradle-nexus.publish-plugin' version '1.0.0' + id 'com.github.ben-manes.versions' version '0.51.0' } +java { + toolchain { + languageVersion = JavaLanguageVersion.of(11) + } +} def getDevelopmentVersion() { def output = new StringBuilder() @@ -25,20 +33,10 @@ def getDevelopmentVersion() { version } -if (JavaVersion.current() != JavaVersion.VERSION_1_8) { - def msg = String.format("This build must be run with java 1.8 - you are running %s - gradle finds the JDK via JAVA_HOME=%s", - JavaVersion.current(), System.getenv("JAVA_HOME")) - throw new IllegalStateException(msg) -} - - -sourceCompatibility = 1.8 -targetCompatibility = 1.8 -def slf4jVersion = '1.7.30' def releaseVersion = System.env.RELEASE_VERSION version = releaseVersion ? releaseVersion : getDevelopmentVersion() group = 'com.graphql-java' -description = 'A pure Java 8 port of Facebook Dataloader' +description = 'A pure Java 11 port of Facebook Dataloader' gradle.buildFinished { buildResult -> println "*******************************" @@ -58,47 +56,64 @@ repositories { mavenLocal() } -apply plugin: 'groovy' - jar { manifest { attributes('Automatic-Module-Name': 'org.dataloader', - '-exportcontents': 'org.dataloader.*', - '-removeheaders': 'Private-Package') + '-exportcontents': 'org.dataloader.*', + '-removeheaders': 'Private-Package') } } dependencies { - api 'org.slf4j:slf4j-api:' + slf4jVersion - testImplementation 'org.slf4j:slf4j-simple:' + slf4jVersion - testImplementation 'junit:junit:4.12' - testImplementation 'org.awaitility:awaitility:2.0.0' - testImplementation 'com.github.ben-manes.caffeine:caffeine:2.9.0' + api "org.slf4j:slf4j-api:$slf4j_version" + api "org.reactivestreams:reactive-streams:$reactive_streams_version" } task sourcesJar(type: Jar) { dependsOn classes - classifier 'sources' + archiveClassifier.set('sources') from sourceSets.main.allSource } -task javadocJar(type: Jar, dependsOn: javadoc) { - classifier = 'javadoc' - from javadoc.destinationDir -} - javadoc { options.encoding = 'UTF-8' } +task javadocJar(type: Jar, dependsOn: javadoc) { + archiveClassifier.set('javadoc') + from javadoc.destinationDir +} + artifacts { archives sourcesJar archives javadocJar } -test { - testLogging { - exceptionFormat = 'full' +testing { + suites { + test { + useJUnitJupiter(junit_version) + dependencies { + // Testing dependencies + implementation platform("org.junit:junit-bom:$junit_version") + implementation 'org.junit.jupiter:junit-jupiter-api' + implementation 'org.junit.jupiter:junit-jupiter-params' + implementation 'org.junit.jupiter:junit-jupiter-engine' + implementation "org.slf4j:slf4j-simple:$slf4j_version" + implementation "org.awaitility:awaitility:$awaitility_version" + implementation "org.hamcrest:hamcrest:$hamcrest_version" + implementation "io.projectreactor:reactor-core:$reactor_core_version" + implementation "com.github.ben-manes.caffeine:caffeine:$caffeine_version" + } + + targets.configureEach { + testTask.configure { + testLogging { + exceptionFormat = 'full' + } + } + } + } } } @@ -117,7 +132,7 @@ publishing { asNode().children().last() + { resolveStrategy = Closure.DELEGATE_FIRST name 'java-dataloader' - description 'A pure Java 8 port of Facebook Dataloader' + description 'A pure Java 11 port of Facebook Dataloader' url 'https://github.com/graphql-java/java-dataloader' inceptionYear '2017' @@ -173,9 +188,15 @@ tasks.withType(PublishToMavenRepository) { dependsOn build } - -task myWrapper(type: Wrapper) { - gradleVersion = '6.6.1' - distributionUrl = "https://services.gradle.org/distributions/gradle-${gradleVersion}-all.zip" +def isNonStable = { String version -> + def stableKeyword = ['RELEASE', 'FINAL', 'GA'].any { it -> version.toUpperCase().contains(it) } + def regex = /^[0-9,.v-]+(-r)?$/ + return !stableKeyword && !(version ==~ regex) } +// https://github.com/ben-manes/gradle-versions-plugin +tasks.named("dependencyUpdates").configure { + rejectVersionIf { + isNonStable(it.candidate.version) + } +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 0394946..22d5603 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,27 @@ +# Project-wide Gradle settings. + +# For more details on how to configure your build environment visit +# http://www.gradle.org/docs/current/userguide/build_environment.html + +# Specifies the JVM arguments used for the daemon process. +# The setting is particularly useful for tweaking memory settings. +org.gradle.jvmargs=-Xmx4096m + +# When configured, Gradle will run in parallel mode. +# This option should only be used with decoupled projects. More details, visit +# http://www.gradle.org/docs/current/userguide/multi_project_builds.html#sec:decoupled_projects +org.gradle.parallel=true +org.gradle.caching=true + +# Bespoke settings. projectTitle = Java Dataloader -projectDescription = Port of Facebook Dataloader for Java \ No newline at end of file +projectDescription = Port of Facebook Dataloader for Java + +# Dependency versions. +junit_version=5.11.3 +hamcrest_version=2.2 +slf4j_version=1.7.30 +awaitility_version=2.0.0 +reactor_core_version=3.6.6 +caffeine_version=3.1.8 +reactive_streams_version=1.0.3 \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index d2880ba..e2847c8 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip +networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/settings.gradle b/settings.gradle index e69de29..47404e7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -0,0 +1,21 @@ +plugins { + id 'com.gradle.develocity' version '3.19' + id 'org.gradle.toolchains.foojay-resolver-convention' version '0.9.0' +} + +develocity { + buildScan { + final def isCI = System.getenv('CI') != null; + termsOfUseUrl = "https://gradle.com/help/legal-terms-of-use" + termsOfUseAgree = "yes" + publishing.onlyIf { true } + tag(isCI ? 'CI' : 'Local') + uploadInBackground = !isCI + } +} + +dependencyResolutionManagement { + repositories { + mavenCentral() + } +} \ No newline at end of file diff --git a/src/main/java/org/dataloader/BatchPublisher.java b/src/main/java/org/dataloader/BatchPublisher.java new file mode 100644 index 0000000..c499226 --- /dev/null +++ b/src/main/java/org/dataloader/BatchPublisher.java @@ -0,0 +1,36 @@ +package org.dataloader; + +import org.reactivestreams.Subscriber; + +import java.util.List; + +/** + * A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. + *

+ * The function must call the provided {@link Subscriber} to process the values it has retrieved to allow + * the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available + * (rather than when all values have been retrieved). + *

+ * NOTE: It is required that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as + * the provided keys and that you provide a value for every key provided. + * + * @param type parameter indicating the type of keys to use for data load requests. + * @param type parameter indicating the type of values returned + * @see BatchLoader for the non-reactive version + */ +public interface BatchPublisher { + /** + * Called to batch the provided keys into a stream of values. You must provide + * the same number of values as there as keys, and they must be in the order of the keys. + *

+ * The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides + * the values given the keys and then subscribe to it with the provided {@link Subscriber}. + *

+ * NOTE: It is required that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as + * the provided keys and that you provide a value for every key provided. + * + * @param keys the collection of keys to load + * @param subscriber as values arrive you must call the subscriber for each value + */ + void load(List keys, Subscriber subscriber); +} diff --git a/src/main/java/org/dataloader/BatchPublisherWithContext.java b/src/main/java/org/dataloader/BatchPublisherWithContext.java new file mode 100644 index 0000000..4eadfe9 --- /dev/null +++ b/src/main/java/org/dataloader/BatchPublisherWithContext.java @@ -0,0 +1,34 @@ +package org.dataloader; + +import org.reactivestreams.Subscriber; + +import java.util.List; + +/** + * This form of {@link BatchPublisher} is given a {@link org.dataloader.BatchLoaderEnvironment} object + * that encapsulates the calling context. A typical use case is passing in security credentials or database details + * for example. + *

+ * See {@link BatchPublisher} for more details on the design invariants that you must implement in order to + * use this interface. + */ +public interface BatchPublisherWithContext { + /** + * Called to batch the provided keys into a stream of values. You must provide + * the same number of values as there as keys, and they must be in the order of the keys. + *

+ * The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides + * the values given the keys and then subscribe to it with the provided {@link Subscriber}. + *

+ * NOTE: It is required that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as + * the provided keys and that you provide a value for every key provided. + *

+ * This is given an environment object to that maybe be useful during the call. A typical use case + * is passing in security credentials or database details for example. + * + * @param keys the collection of keys to load + * @param subscriber as values arrive you must call the subscriber for each value + * @param environment an environment object that can help with the call + */ + void load(List keys, Subscriber subscriber, BatchLoaderEnvironment environment); +} diff --git a/src/main/java/org/dataloader/CacheMap.java b/src/main/java/org/dataloader/CacheMap.java index 48d0f41..1a4a455 100644 --- a/src/main/java/org/dataloader/CacheMap.java +++ b/src/main/java/org/dataloader/CacheMap.java @@ -65,8 +65,7 @@ static CacheMap simpleMap() { /** * Gets the specified key from the cache map. *

- * May throw an exception if the key does not exist, depending on the cache map implementation that is used, - * so be sure to check {@link CacheMap#containsKey(Object)} first. + * May throw an exception if the key does not exist, depending on the cache map implementation that is used. * * @param key the key to retrieve * diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 1e4ce7d..dc4b726 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -25,10 +25,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; @@ -574,6 +571,35 @@ public CompletableFuture> loadMany(List keys, List keyContext } } + /** + * Requests to load the map of data provided by the specified keys asynchronously, and returns a composite future + * of the resulting values. + *

+ * If batching is enabled (the default), you'll have to call {@link DataLoader#dispatch()} at a later stage to + * start batch execution. If you forget this call the future will never be completed (unless already completed, + * and returned from cache). + *

+ * The key context object may be useful in the batch loader interfaces such as {@link org.dataloader.BatchLoaderWithContext} or + * {@link org.dataloader.MappedBatchLoaderWithContext} to help retrieve data. + * + * @param keysAndContexts the map of keys to their respective contexts + * + * @return the composite future of the map of keys and values + */ + public CompletableFuture> loadMany(Map keysAndContexts) { + nonNull(keysAndContexts); + + synchronized (this) { + Map> collect = new HashMap<>(keysAndContexts.size()); + for (Map.Entry entry : keysAndContexts.entrySet()) { + K key = entry.getKey(); + Object keyContext = entry.getValue(); + collect.put(key, load(key, keyContext)); + } + return CompletableFutureKit.allOf(collect); + } + } + /** * Dispatches the queued load requests to the batch execution function and returns a promise of the result. *

diff --git a/src/main/java/org/dataloader/DataLoaderFactory.java b/src/main/java/org/dataloader/DataLoaderFactory.java index 013f473..db14f2e 100644 --- a/src/main/java/org/dataloader/DataLoaderFactory.java +++ b/src/main/java/org/dataloader/DataLoaderFactory.java @@ -278,6 +278,274 @@ public static DataLoader newMappedDataLoaderWithTry(MappedBatchLoad return mkDataLoader(batchLoadFunction, options); } + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size). + * + * @param batchLoadFunction the batch load function to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoader(BatchPublisher batchLoadFunction) { + return newPublisherDataLoader(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function with the provided options + * + * @param batchLoadFunction the batch load function to use + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoader(BatchPublisher batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size) where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + *

+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then + * you can use this form to create the data loader. + *

+ * Using Try objects allows you to capture a value returned or an exception that might + * have occurred trying to get a value. . + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoaderWithTry(BatchPublisher> batchLoadFunction) { + return newPublisherDataLoaderWithTry(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function and with the provided options + * where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + * + * @see #newDataLoaderWithTry(BatchLoader) + */ + public static DataLoader newPublisherDataLoaderWithTry(BatchPublisher> batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size). + * + * @param batchLoadFunction the batch load function to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoader(BatchPublisherWithContext batchLoadFunction) { + return newPublisherDataLoader(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function with the provided options + * + * @param batchLoadFunction the batch load function to use + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoader(BatchPublisherWithContext batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size) where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + *

+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then + * you can use this form to create the data loader. + *

+ * Using Try objects allows you to capture a value returned or an exception that might + * have occurred trying to get a value. . + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoaderWithTry(BatchPublisherWithContext> batchLoadFunction) { + return newPublisherDataLoaderWithTry(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function and with the provided options + * where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + * + * @see #newPublisherDataLoaderWithTry(BatchPublisher) + */ + public static DataLoader newPublisherDataLoaderWithTry(BatchPublisherWithContext> batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size). + * + * @param batchLoadFunction the batch load function to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisher batchLoadFunction) { + return newMappedPublisherDataLoader(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function with the provided options + * + * @param batchLoadFunction the batch load function to use + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisher batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size) where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + *

+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then + * you can use this form to create the data loader. + *

+ * Using Try objects allows you to capture a value returned or an exception that might + * have occurred trying to get a value. . + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisher> batchLoadFunction) { + return newMappedPublisherDataLoaderWithTry(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function and with the provided options + * where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + * + * @see #newDataLoaderWithTry(BatchLoader) + */ + public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisher> batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size). + * + * @param batchLoadFunction the batch load function to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisherWithContext batchLoadFunction) { + return newMappedPublisherDataLoader(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function with the provided options + * + * @param batchLoadFunction the batch load function to use + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisherWithContext batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size) where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + *

+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then + * you can use this form to create the data loader. + *

+ * Using Try objects allows you to capture a value returned or an exception that might + * have occurred trying to get a value. . + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisherWithContext> batchLoadFunction) { + return newMappedPublisherDataLoaderWithTry(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function and with the provided options + * where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + * + * @see #newMappedPublisherDataLoaderWithTry(MappedBatchPublisher) + */ + public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisherWithContext> batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + static DataLoader mkDataLoader(Object batchLoadFunction, DataLoaderOptions options) { return new DataLoader<>(batchLoadFunction, options); } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 1e48a94..9cd38d6 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -3,6 +3,7 @@ import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.reactive.ReactiveSupport; import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; @@ -10,6 +11,7 @@ import org.dataloader.stats.context.IncrementCacheHitCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; +import org.reactivestreams.Subscriber; import java.time.Clock; import java.time.Instant; @@ -110,9 +112,13 @@ Optional> getIfPresent(K key) { boolean cachingEnabled = loaderOptions.cachingEnabled(); if (cachingEnabled) { Object cacheKey = getCacheKey(nonNull(key)); - if (futureCache.containsKey(cacheKey)) { - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key)); - return Optional.of(futureCache.get(cacheKey)); + try { + CompletableFuture cacheValue = futureCache.get(cacheKey); + if (cacheValue != null) { + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key)); + return Optional.of(cacheValue); + } + } catch (Exception ignored) { } } } @@ -148,11 +154,13 @@ CompletableFuture load(K key, Object loadContext) { } } + @SuppressWarnings("unchecked") Object getCacheKey(K key) { return loaderOptions.cacheKeyFunction().isPresent() ? loaderOptions.cacheKeyFunction().get().getKey(key) : key; } + @SuppressWarnings("unchecked") Object getCacheKeyWithContext(K key, Object context) { return loaderOptions.cacheKeyFunction().isPresent() ? loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key; @@ -237,10 +245,14 @@ private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List< @SuppressWarnings("unchecked") private CompletableFuture> dispatchQueueBatch(List keys, List callContexts, List> queuedFutures) { stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, callContexts)); - CompletableFuture> batchLoad = invokeLoader(keys, callContexts, loaderOptions.cachingEnabled()); + CompletableFuture> batchLoad = invokeLoader(keys, callContexts, queuedFutures, loaderOptions.cachingEnabled()); return batchLoad .thenApply(values -> { assertResultSize(keys, values); + if (isPublisher() || isMappedPublisher()) { + // We have already completed the queued futures by the time the overall batchLoad future has completed. + return values; + } List clearCacheKeys = new ArrayList<>(); for (int idx = 0; idx < queuedFutures.size(); idx++) { @@ -307,10 +319,14 @@ private void possiblyClearCacheEntriesOnExceptions(List keys) { private CompletableFuture loadFromCache(K key, Object loadContext, boolean batchingEnabled) { final Object cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext); - if (futureCache.containsKey(cacheKey)) { - // We already have a promise for this key, no need to check value cache or queue up load - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); - return futureCache.get(cacheKey); + try { + CompletableFuture cacheValue = futureCache.get(cacheKey); + if (cacheValue != null) { + // We already have a promise for this key, no need to check value cache or queue up load + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); + return cacheValue; + } + } catch (Exception ignored) { } CompletableFuture loadCallFuture = queueOrInvokeLoader(key, loadContext, batchingEnabled, true); @@ -334,14 +350,15 @@ private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, bool CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean cachingEnabled) { List keys = singletonList(key); List keyContexts = singletonList(keyContext); - return invokeLoader(keys, keyContexts, cachingEnabled) + List> queuedFutures = singletonList(new CompletableFuture<>()); + return invokeLoader(keys, keyContexts, queuedFutures, cachingEnabled) .thenApply(list -> list.get(0)) .toCompletableFuture(); } - CompletableFuture> invokeLoader(List keys, List keyContexts, boolean cachingEnabled) { + CompletableFuture> invokeLoader(List keys, List keyContexts, List> queuedFutures, boolean cachingEnabled) { if (!cachingEnabled) { - return invokeLoader(keys, keyContexts); + return invokeLoader(keys, keyContexts, queuedFutures); } CompletableFuture>> cacheCallCF = getFromValueCache(keys); return cacheCallCF.thenCompose(cachedValues -> { @@ -352,6 +369,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, List missedKeyIndexes = new ArrayList<>(); List missedKeys = new ArrayList<>(); List missedKeyContexts = new ArrayList<>(); + List> missedQueuedFutures = new ArrayList<>(); // if they return a ValueCachingNotSupported exception then we insert this special marker value, and it // means it's a total miss, we need to get all these keys via the batch loader @@ -361,6 +379,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, missedKeyIndexes.add(i); missedKeys.add(keys.get(i)); missedKeyContexts.add(keyContexts.get(i)); + missedQueuedFutures.add(queuedFutures.get(i)); } } else { assertState(keys.size() == cachedValues.size(), () -> "The size of the cached values MUST be the same size as the key list"); @@ -385,7 +404,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, // we missed some keys from cache, so send them to the batch loader // and then fill in their values // - CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts); + CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts, missedQueuedFutures); return batchLoad.thenCompose(missedValues -> { assertResultSize(missedKeys, missedValues); @@ -404,8 +423,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, }); } - - CompletableFuture> invokeLoader(List keys, List keyContexts) { + CompletableFuture> invokeLoader(List keys, List keyContexts, List> queuedFutures) { CompletableFuture> batchLoad; try { Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); @@ -413,6 +431,10 @@ CompletableFuture> invokeLoader(List keys, List keyContexts) .context(context).keyContexts(keys, keyContexts).build(); if (isMapLoader()) { batchLoad = invokeMapBatchLoader(keys, environment); + } else if (isPublisher()) { + batchLoad = invokeBatchPublisher(keys, keyContexts, queuedFutures, environment); + } else if (isMappedPublisher()) { + batchLoad = invokeMappedBatchPublisher(keys, keyContexts, queuedFutures, environment); } else { batchLoad = invokeListBatchLoader(keys, environment); } @@ -484,10 +506,72 @@ private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoade }); } + private CompletableFuture> invokeBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { + CompletableFuture> loadResult = new CompletableFuture<>(); + Subscriber subscriber = ReactiveSupport.batchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); + + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); + if (batchLoadFunction instanceof BatchPublisherWithContext) { + //noinspection unchecked + BatchPublisherWithContext loadFunction = (BatchPublisherWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber, environment); + batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, environment); + } else { + loadFunction.load(keys, subscriber, environment); + } + } else { + //noinspection unchecked + BatchPublisher loadFunction = (BatchPublisher) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber); + batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, null); + } else { + loadFunction.load(keys, subscriber); + } + } + return loadResult; + } + + private CompletableFuture> invokeMappedBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { + CompletableFuture> loadResult = new CompletableFuture<>(); + Subscriber> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); + Set setOfKeys = new LinkedHashSet<>(keys); + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); + if (batchLoadFunction instanceof MappedBatchPublisherWithContext) { + //noinspection unchecked + MappedBatchPublisherWithContext loadFunction = (MappedBatchPublisherWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber, environment); + batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, environment); + } else { + loadFunction.load(keys, subscriber, environment); + } + } else { + //noinspection unchecked + MappedBatchPublisher loadFunction = (MappedBatchPublisher) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(setOfKeys, subscriber); + batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, null); + } else { + loadFunction.load(setOfKeys, subscriber); + } + } + return loadResult; + } + private boolean isMapLoader() { return batchLoadFunction instanceof MappedBatchLoader || batchLoadFunction instanceof MappedBatchLoaderWithContext; } + private boolean isPublisher() { + return batchLoadFunction instanceof BatchPublisher; + } + + private boolean isMappedPublisher() { + return batchLoadFunction instanceof MappedBatchPublisher; + } + int dispatchDepth() { synchronized (dataLoader) { return loaderQueue.size(); @@ -538,4 +622,23 @@ private CompletableFuture> setToValueCache(List assembledValues, List private static DispatchResult emptyDispatchResult() { return (DispatchResult) EMPTY_DISPATCH_RESULT; } + + private ReactiveSupport.HelperIntegration helperIntegration() { + return new ReactiveSupport.HelperIntegration<>() { + @Override + public StatisticsCollector getStats() { + return stats; + } + + @Override + public void clearCacheView(K key) { + dataLoader.clear(key); + } + + @Override + public void clearCacheEntriesOnExceptions(List keys) { + possiblyClearCacheEntriesOnExceptions(keys); + } + }; + } } diff --git a/src/main/java/org/dataloader/MappedBatchPublisher.java b/src/main/java/org/dataloader/MappedBatchPublisher.java new file mode 100644 index 0000000..754ee52 --- /dev/null +++ b/src/main/java/org/dataloader/MappedBatchPublisher.java @@ -0,0 +1,30 @@ +package org.dataloader; + +import org.reactivestreams.Subscriber; + +import java.util.Map; +import java.util.Set; + +/** + * A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. + *

+ * The function must call the provided {@link Subscriber} to process the key/value pairs it has retrieved to allow + * the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available + * (rather than when all values have been retrieved). + * + * @param type parameter indicating the type of keys to use for data load requests. + * @param type parameter indicating the type of values returned + * @see MappedBatchLoader for the non-reactive version + */ +public interface MappedBatchPublisher { + /** + * Called to batch the provided keys into a stream of map entries of keys and values. + *

+ * The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides + * the values given the keys and then subscribe to it with the provided {@link Subscriber}. + * + * @param keys the collection of keys to load + * @param subscriber as values arrive you must call the subscriber for each value + */ + void load(Set keys, Subscriber> subscriber); +} diff --git a/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java b/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java new file mode 100644 index 0000000..2e94152 --- /dev/null +++ b/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java @@ -0,0 +1,32 @@ +package org.dataloader; + +import org.reactivestreams.Subscriber; + +import java.util.List; +import java.util.Map; + +/** + * This form of {@link MappedBatchPublisher} is given a {@link org.dataloader.BatchLoaderEnvironment} object + * that encapsulates the calling context. A typical use case is passing in security credentials or database details + * for example. + *

+ * See {@link MappedBatchPublisher} for more details on the design invariants that you must implement in order to + * use this interface. + */ +public interface MappedBatchPublisherWithContext { + + /** + * Called to batch the provided keys into a stream of map entries of keys and values. + *

+ * The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides + * the values given the keys and then subscribe to it with the provided {@link Subscriber}. + *

+ * This is given an environment object to that maybe be useful during the call. A typical use case + * is passing in security credentials or database details for example. + * + * @param keys the collection of keys to load + * @param subscriber as values arrive you must call the subscriber for each value + * @param environment an environment object that can help with the call + */ + void load(List keys, Subscriber> subscriber, BatchLoaderEnvironment environment); +} diff --git a/src/main/java/org/dataloader/impl/CompletableFutureKit.java b/src/main/java/org/dataloader/impl/CompletableFutureKit.java index 2b94d10..ebc35ec 100644 --- a/src/main/java/org/dataloader/impl/CompletableFutureKit.java +++ b/src/main/java/org/dataloader/impl/CompletableFutureKit.java @@ -3,8 +3,10 @@ import org.dataloader.annotations.Internal; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; @@ -48,10 +50,21 @@ public static boolean failed(CompletableFuture future) { } public static CompletableFuture> allOf(List> cfs) { - return CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])) + return CompletableFuture.allOf(cfs.toArray(CompletableFuture[]::new)) .thenApply(v -> cfs.stream() .map(CompletableFuture::join) .collect(toList()) ); } + + public static CompletableFuture> allOf(Map> cfs) { + return CompletableFuture.allOf(cfs.values().toArray(CompletableFuture[]::new)) + .thenApply(v -> cfs.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + task -> task.getValue().join()) + ) + ); + } } diff --git a/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java new file mode 100644 index 0000000..c2f5438 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java @@ -0,0 +1,104 @@ +package org.dataloader.reactive; + +import org.dataloader.Try; +import org.dataloader.stats.context.IncrementBatchLoadExceptionCountStatisticsContext; +import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import static org.dataloader.impl.Assertions.assertState; + +/** + * The base class for our reactive subscriber support + * + * @param for two + */ +abstract class AbstractBatchSubscriber implements Subscriber { + + final CompletableFuture> valuesFuture; + final List keys; + final List callContexts; + final List> queuedFutures; + final ReactiveSupport.HelperIntegration helperIntegration; + + List clearCacheKeys = new ArrayList<>(); + List completedValues = new ArrayList<>(); + boolean onErrorCalled = false; + boolean onCompleteCalled = false; + + AbstractBatchSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + this.valuesFuture = valuesFuture; + this.keys = keys; + this.callContexts = callContexts; + this.queuedFutures = queuedFutures; + this.helperIntegration = helperIntegration; + } + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(keys.size()); + } + + @Override + public void onNext(T v) { + assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked."); + assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked."); + } + + @Override + public void onComplete() { + assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked."); + onCompleteCalled = true; + } + + @Override + public void onError(Throwable throwable) { + assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked."); + onErrorCalled = true; + + helperIntegration.getStats().incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); + } + + /* + * A value has arrived - how do we complete the future that's associated with it in a common way + */ + void onNextValue(K key, V value, Object callContext, List> futures) { + if (value instanceof Try) { + // we allow the batch loader to return a Try so we can better represent a computation + // that might have worked or not. + //noinspection unchecked + Try tryValue = (Try) value; + if (tryValue.isSuccess()) { + futures.forEach(f -> f.complete(tryValue.get())); + } else { + helperIntegration.getStats().incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); + futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable())); + clearCacheKeys.add(key); + } + } else { + futures.forEach(f -> f.complete(value)); + } + } + + Throwable unwrapThrowable(Throwable ex) { + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + return ex; + } + + void possiblyClearCacheEntriesOnExceptions() { + helperIntegration.clearCacheEntriesOnExceptions(clearCacheKeys); + } +} diff --git a/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java new file mode 100644 index 0000000..d0b8110 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java @@ -0,0 +1,86 @@ +package org.dataloader.reactive; + +import org.dataloader.impl.DataLoaderAssertionException; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * This class can be used to subscribe to a {@link org.reactivestreams.Publisher} and then + * have the values it receives complete the data loader keys. The keys and values must be + * in index order. + *

+ * This is a reactive version of {@link org.dataloader.BatchLoader} + * + * @param the type of keys + * @param the type of values + */ +class BatchSubscriberImpl extends AbstractBatchSubscriber { + + private int idx = 0; + + BatchSubscriberImpl( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee + // correctness (at the cost of speed). + @Override + public synchronized void onNext(V value) { + super.onNext(value); + + if (idx >= keys.size()) { + // hang on they have given us more values than we asked for in keys + // we cant handle this + return; + } + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + onNextValue(key, value, callContext, List.of(future)); + + completedValues.add(value); + idx++; + } + + + @Override + public synchronized void onComplete() { + super.onComplete(); + if (keys.size() != completedValues.size()) { + // we have more or less values than promised + // we will go through all the outstanding promises and mark those that + // have not finished as failed + for (CompletableFuture queuedFuture : queuedFutures) { + if (!queuedFuture.isDone()) { + queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); + } + } + } + possiblyClearCacheEntriesOnExceptions(); + valuesFuture.complete(completedValues); + } + + @Override + public synchronized void onError(Throwable ex) { + super.onError(ex); + ex = unwrapThrowable(ex); + // Set the remaining keys to the exception. + for (int i = idx; i < queuedFutures.size(); i++) { + K key = keys.get(i); + CompletableFuture future = queuedFutures.get(i); + if (!future.isDone()) { + future.completeExceptionally(ex); + // clear any cached view of this key because it failed + helperIntegration.clearCacheView(key); + } + } + valuesFuture.completeExceptionally(ex); + } +} diff --git a/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java new file mode 100644 index 0000000..d56efa0 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java @@ -0,0 +1,103 @@ +package org.dataloader.reactive; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * This class can be used to subscribe to a {@link org.reactivestreams.Publisher} and then + * have the values it receives complete the data loader keys in a map lookup fashion. + *

+ * This is a reactive version of {@link org.dataloader.MappedBatchLoader} + * + * @param the type of keys + * @param the type of values + */ +class MappedBatchSubscriberImpl extends AbstractBatchSubscriber> { + + private final Map callContextByKey; + private final Map>> queuedFuturesByKey; + private final Map completedValuesByKey = new HashMap<>(); + + + MappedBatchSubscriberImpl( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + this.callContextByKey = new HashMap<>(); + this.queuedFuturesByKey = new HashMap<>(); + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture queuedFuture = queuedFutures.get(idx); + callContextByKey.put(key, callContext); + queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture); + } + } + + + @Override + public synchronized void onNext(Map.Entry entry) { + super.onNext(entry); + K key = entry.getKey(); + V value = entry.getValue(); + + Object callContext = callContextByKey.get(key); + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + + onNextValue(key, value, callContext, futures); + + // did we have an actual key for this value - ignore it if they send us one outside the key set + if (!futures.isEmpty()) { + completedValuesByKey.put(key, value); + } + } + + @Override + public synchronized void onComplete() { + super.onComplete(); + + possiblyClearCacheEntriesOnExceptions(); + List values = new ArrayList<>(keys.size()); + for (K key : keys) { + V value = completedValuesByKey.get(key); + values.add(value); + + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + for (CompletableFuture future : futures) { + if (!future.isDone()) { + // we have a future that never came back for that key + // but the publisher is done sending in data - it must be null + // e.g. for key X when found no value + future.complete(null); + } + } + } + valuesFuture.complete(values); + } + + @Override + public synchronized void onError(Throwable ex) { + super.onError(ex); + ex = unwrapThrowable(ex); + // Complete the futures for the remaining keys with the exception. + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + List> futures = queuedFuturesByKey.get(key); + if (!completedValuesByKey.containsKey(key)) { + for (CompletableFuture future : futures) { + future.completeExceptionally(ex); + } + // clear any cached view of this key because they all failed + helperIntegration.clearCacheView(key); + } + } + valuesFuture.completeExceptionally(ex); + } +} diff --git a/src/main/java/org/dataloader/reactive/ReactiveSupport.java b/src/main/java/org/dataloader/reactive/ReactiveSupport.java new file mode 100644 index 0000000..fc03bb0 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/ReactiveSupport.java @@ -0,0 +1,45 @@ +package org.dataloader.reactive; + +import org.dataloader.stats.StatisticsCollector; +import org.reactivestreams.Subscriber; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class ReactiveSupport { + + public static Subscriber batchSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + return new BatchSubscriberImpl<>(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + public static Subscriber> mappedBatchSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + return new MappedBatchSubscriberImpl<>(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + /** + * Just some callbacks to the data loader code to do common tasks + * + * @param for keys + */ + public interface HelperIntegration { + + StatisticsCollector getStats(); + + void clearCacheView(K key); + + void clearCacheEntriesOnExceptions(List keys); + } +} diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java index 7cddd54..e7e95d9 100644 --- a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -5,6 +5,8 @@ import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.MappedBatchLoader; +import org.dataloader.MappedBatchPublisher; +import org.dataloader.BatchPublisher; import java.util.List; import java.util.Map; @@ -42,6 +44,13 @@ interface ScheduledMappedBatchLoaderCall { CompletionStage> invoke(); } + /** + * This represents a callback that will invoke a {@link BatchPublisher} or {@link MappedBatchPublisher} function under the covers + */ + interface ScheduledBatchPublisherCall { + void invoke(); + } + /** * This is called to schedule a {@link BatchLoader} call. * @@ -71,4 +80,16 @@ interface ScheduledMappedBatchLoaderCall { * @return a promise to the values that come from the {@link BatchLoader} */ CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); + + /** + * This is called to schedule a {@link BatchPublisher} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link BatchPublisher} to proceed. + * @param keys this is the list of keys that will be passed to the {@link BatchPublisher}. + * This is provided only for informative reasons and, you can't change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link BatchPublisher} call + * @param the key type + */ + void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment); } diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index d25dfa7..9e30c90 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -1,11 +1,13 @@ import org.dataloader.BatchLoader; import org.dataloader.BatchLoaderEnvironment; import org.dataloader.BatchLoaderWithContext; +import org.dataloader.BatchPublisher; import org.dataloader.CacheMap; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderOptions; import org.dataloader.MappedBatchLoaderWithContext; +import org.dataloader.MappedBatchPublisher; import org.dataloader.Try; import org.dataloader.fixtures.SecurityCtx; import org.dataloader.fixtures.User; @@ -15,6 +17,8 @@ import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import java.time.Duration; import java.util.ArrayList; @@ -171,7 +175,7 @@ private void tryExample() { } } - private void tryBatcLoader() { + private void tryBatchLoader() { DataLoader dataLoader = DataLoaderFactory.newDataLoaderWithTry(new BatchLoader>() { @Override public CompletionStage>> load(List keys) { @@ -187,6 +191,28 @@ public CompletionStage>> load(List keys) { }); } + private void batchPublisher() { + BatchPublisher batchPublisher = new BatchPublisher() { + @Override + public void load(List userIds, Subscriber userSubscriber) { + Publisher userResults = userManager.streamUsersById(userIds); + userResults.subscribe(userSubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher); + } + + private void mappedBatchPublisher() { + MappedBatchPublisher mappedBatchPublisher = new MappedBatchPublisher() { + @Override + public void load(Set userIds, Subscriber> userEntrySubscriber) { + Publisher> userEntries = userManager.streamUsersById(userIds); + userEntries.subscribe(userEntrySubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher); + } + DataLoader userDataLoader; private void clearCacheOnError() { @@ -304,6 +330,12 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(10); + scheduledCall.invoke(); + } }; } diff --git a/src/test/java/org/dataloader/DataLoaderBatchLoaderEnvironmentTest.java b/src/test/java/org/dataloader/DataLoaderBatchLoaderEnvironmentTest.java index 36e0ed4..90adbc5 100644 --- a/src/test/java/org/dataloader/DataLoaderBatchLoaderEnvironmentTest.java +++ b/src/test/java/org/dataloader/DataLoaderBatchLoaderEnvironmentTest.java @@ -1,11 +1,8 @@ package org.dataloader; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -14,8 +11,8 @@ import static java.util.Collections.singletonList; import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; /** * Tests related to context. DataLoaderTest is getting to big and needs refactoring @@ -50,10 +47,14 @@ public void context_is_passed_to_batch_loader_function() { loader.load("A"); loader.load("B"); loader.loadMany(asList("C", "D")); + Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("E", null); + keysAndContexts.put("F", null); + loader.loadMany(keysAndContexts); List results = loader.dispatchAndJoin(); - assertThat(results, equalTo(asList("A-ctx", "B-ctx", "C-ctx", "D-ctx"))); + assertThat(results, equalTo(asList("A-ctx", "B-ctx", "C-ctx", "D-ctx", "E-ctx", "F-ctx"))); } @Test @@ -66,10 +67,14 @@ public void key_contexts_are_passed_to_batch_loader_function() { loader.load("A", "aCtx"); loader.load("B", "bCtx"); loader.loadMany(asList("C", "D"), asList("cCtx", "dCtx")); + Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("E", "eCtx"); + keysAndContexts.put("F", "fCtx"); + loader.loadMany(keysAndContexts); List results = loader.dispatchAndJoin(); - assertThat(results, equalTo(asList("A-ctx-m:aCtx-l:aCtx", "B-ctx-m:bCtx-l:bCtx", "C-ctx-m:cCtx-l:cCtx", "D-ctx-m:dCtx-l:dCtx"))); + assertThat(results, equalTo(asList("A-ctx-m:aCtx-l:aCtx", "B-ctx-m:bCtx-l:bCtx", "C-ctx-m:cCtx-l:cCtx", "D-ctx-m:dCtx-l:dCtx", "E-ctx-m:eCtx-l:eCtx", "F-ctx-m:fCtx-l:fCtx"))); } @Test @@ -82,12 +87,17 @@ public void key_contexts_are_passed_to_batch_loader_function_when_batching_disab CompletableFuture aLoad = loader.load("A", "aCtx"); CompletableFuture bLoad = loader.load("B", "bCtx"); - CompletableFuture> canDLoad = loader.loadMany(asList("C", "D"), asList("cCtx", "dCtx")); + CompletableFuture> cAndDLoad = loader.loadMany(asList("C", "D"), asList("cCtx", "dCtx")); + Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("E", "eCtx"); + keysAndContexts.put("F", "fCtx"); + CompletableFuture> eAndFLoad = loader.loadMany(keysAndContexts); List results = new ArrayList<>(asList(aLoad.join(), bLoad.join())); - results.addAll(canDLoad.join()); + results.addAll(cAndDLoad.join()); + results.addAll(eAndFLoad.join().values()); - assertThat(results, equalTo(asList("A-ctx-m:aCtx-l:aCtx", "B-ctx-m:bCtx-l:bCtx", "C-ctx-m:cCtx-l:cCtx", "D-ctx-m:dCtx-l:dCtx"))); + assertThat(results, equalTo(asList("A-ctx-m:aCtx-l:aCtx", "B-ctx-m:bCtx-l:bCtx", "C-ctx-m:cCtx-l:cCtx", "D-ctx-m:dCtx-l:dCtx", "E-ctx-m:eCtx-l:eCtx", "F-ctx-m:fCtx-l:fCtx"))); } @Test @@ -101,9 +111,14 @@ public void missing_key_contexts_are_passed_to_batch_loader_function() { loader.load("B"); loader.loadMany(asList("C", "D"), singletonList("cCtx")); + Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("E", "eCtx"); + keysAndContexts.put("F", null); + loader.loadMany(keysAndContexts); + List results = loader.dispatchAndJoin(); - assertThat(results, equalTo(asList("A-ctx-m:aCtx-l:aCtx", "B-ctx-m:null-l:null", "C-ctx-m:cCtx-l:cCtx", "D-ctx-m:null-l:null"))); + assertThat(results, equalTo(asList("A-ctx-m:aCtx-l:aCtx", "B-ctx-m:null-l:null", "C-ctx-m:cCtx-l:cCtx", "D-ctx-m:null-l:null", "E-ctx-m:eCtx-l:eCtx", "F-ctx-m:null-l:null"))); } @Test @@ -125,9 +140,14 @@ public void context_is_passed_to_map_batch_loader_function() { loader.load("B"); loader.loadMany(asList("C", "D"), singletonList("cCtx")); + Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("E", "eCtx"); + keysAndContexts.put("F", null); + loader.loadMany(keysAndContexts); + List results = loader.dispatchAndJoin(); - assertThat(results, equalTo(asList("A-ctx-aCtx", "B-ctx-null", "C-ctx-cCtx", "D-ctx-null"))); + assertThat(results, equalTo(asList("A-ctx-aCtx", "B-ctx-null", "C-ctx-cCtx", "D-ctx-null", "E-ctx-eCtx", "F-ctx-null"))); } @Test @@ -142,9 +162,14 @@ public void null_is_passed_as_context_if_you_do_nothing() { loader.load("B"); loader.loadMany(asList("C", "D")); + Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("E", null); + keysAndContexts.put("F", null); + loader.loadMany(keysAndContexts); + List results = loader.dispatchAndJoin(); - assertThat(results, equalTo(asList("A-null", "B-null", "C-null", "D-null"))); + assertThat(results, equalTo(asList("A-null", "B-null", "C-null", "D-null", "E-null", "F-null"))); } @Test @@ -160,9 +185,14 @@ public void null_is_passed_as_context_to_map_loader_if_you_do_nothing() { loader.load("B"); loader.loadMany(asList("C", "D")); + Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("E", null); + keysAndContexts.put("F", null); + loader.loadMany(keysAndContexts); + List results = loader.dispatchAndJoin(); - assertThat(results, equalTo(asList("A-null", "B-null", "C-null", "D-null"))); + assertThat(results, equalTo(asList("A-null", "B-null", "C-null", "D-null", "E-null", "F-null"))); } @Test diff --git a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java index abfc8d3..a7b82b7 100644 --- a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java +++ b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java @@ -1,6 +1,6 @@ package org.dataloader; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collection; @@ -8,8 +8,8 @@ import java.util.concurrent.CompletableFuture; import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; /** * Tests for cacheMap functionality.. diff --git a/src/test/java/org/dataloader/DataLoaderIfPresentTest.java b/src/test/java/org/dataloader/DataLoaderIfPresentTest.java index 1d897f2..f0a50d6 100644 --- a/src/test/java/org/dataloader/DataLoaderIfPresentTest.java +++ b/src/test/java/org/dataloader/DataLoaderIfPresentTest.java @@ -1,15 +1,15 @@ package org.dataloader; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertThat; /** * Tests for IfPresent and IfCompleted functionality. diff --git a/src/test/java/org/dataloader/DataLoaderMapBatchLoaderTest.java b/src/test/java/org/dataloader/DataLoaderMapBatchLoaderTest.java deleted file mode 100644 index 0fced79..0000000 --- a/src/test/java/org/dataloader/DataLoaderMapBatchLoaderTest.java +++ /dev/null @@ -1,184 +0,0 @@ -package org.dataloader; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; -import static org.awaitility.Awaitility.await; -import static org.dataloader.DataLoaderFactory.newDataLoader; -import static org.dataloader.DataLoaderOptions.newOptions; -import static org.dataloader.fixtures.TestKit.futureError; -import static org.dataloader.fixtures.TestKit.listFrom; -import static org.dataloader.impl.CompletableFutureKit.cause; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -/** - * Much of the tests that related to {@link MappedBatchLoader} also related to - * {@link org.dataloader.BatchLoader}. This is white box testing somewhat because we could have repeated - * ALL the tests in {@link org.dataloader.DataLoaderTest} here as well but chose not to because we KNOW that - * DataLoader differs only a little in how it handles the 2 types of loader functions. We choose to grab some - * common functionality for repeat testing and otherwise rely on the very complete other tests. - */ -public class DataLoaderMapBatchLoaderTest { - - MappedBatchLoader evensOnlyMappedBatchLoader = (keys) -> { - Map mapOfResults = new HashMap<>(); - - AtomicInteger index = new AtomicInteger(); - keys.forEach(k -> { - int i = index.getAndIncrement(); - if (i % 2 == 0) { - mapOfResults.put(k, k); - } - }); - return CompletableFuture.completedFuture(mapOfResults); - }; - - private static DataLoader idMapLoader(DataLoaderOptions options, List> loadCalls) { - MappedBatchLoader kvBatchLoader = (keys) -> { - loadCalls.add(new ArrayList<>(keys)); - Map map = new HashMap<>(); - //noinspection unchecked - keys.forEach(k -> map.put(k, (V) k)); - return CompletableFuture.completedFuture(map); - }; - return DataLoaderFactory.newMappedDataLoader(kvBatchLoader, options); - } - - private static DataLoader idMapLoaderBlowsUps( - DataLoaderOptions options, List> loadCalls) { - return newDataLoader((keys) -> { - loadCalls.add(new ArrayList<>(keys)); - return futureError(); - }, options); - } - - - @Test - public void basic_map_batch_loading() { - DataLoader loader = DataLoaderFactory.newMappedDataLoader(evensOnlyMappedBatchLoader); - - loader.load("A"); - loader.load("B"); - loader.loadMany(asList("C", "D")); - - List results = loader.dispatchAndJoin(); - - assertThat(results.size(), equalTo(4)); - assertThat(results, equalTo(asList("A", null, "C", null))); - } - - - @Test - public void should_map_Batch_multiple_requests() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idMapLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load(1); - CompletableFuture future2 = identityLoader.load(2); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo(1)); - assertThat(future2.get(), equalTo(2)); - assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); - } - - @Test - public void can_split_max_batch_sizes_correctly() { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idMapLoader(newOptions().setMaxBatchSize(5), loadCalls); - - for (int i = 0; i < 21; i++) { - identityLoader.load(i); - } - List> expectedCalls = new ArrayList<>(); - expectedCalls.add(listFrom(0, 5)); - expectedCalls.add(listFrom(5, 10)); - expectedCalls.add(listFrom(10, 15)); - expectedCalls.add(listFrom(15, 20)); - expectedCalls.add(listFrom(20, 21)); - - List result = identityLoader.dispatch().join(); - - assertThat(result, equalTo(listFrom(0, 21))); - assertThat(loadCalls, equalTo(expectedCalls)); - } - - @Test - public void should_Propagate_error_to_all_loads() { - List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idMapLoaderBlowsUps(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = errorLoader.load(1); - CompletableFuture future2 = errorLoader.load(2); - errorLoader.dispatch(); - - await().until(future1::isDone); - - assertThat(future1.isCompletedExceptionally(), is(true)); - Throwable cause = cause(future1); - assert cause != null; - assertThat(cause, instanceOf(IllegalStateException.class)); - assertThat(cause.getMessage(), equalTo("Error")); - - await().until(future2::isDone); - cause = cause(future2); - assert cause != null; - assertThat(cause.getMessage(), equalTo(cause.getMessage())); - - assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); - } - - @Test - public void should_work_with_duplicate_keys_when_caching_disabled() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = - idMapLoader(newOptions().setCachingEnabled(false), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - CompletableFuture future3 = identityLoader.load("A"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone() && future3.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(future3.get(), equalTo("A")); - - // the map batch functions use a set of keys as input and hence remove duplicates unlike list variant - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - } - - @Test - public void should_work_with_duplicate_keys_when_caching_enabled() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = - idMapLoader(newOptions().setCachingEnabled(true), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - CompletableFuture future3 = identityLoader.load("A"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone() && future3.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(future3.get(), equalTo("A")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - } - - -} diff --git a/src/test/java/org/dataloader/DataLoaderRegistryTest.java b/src/test/java/org/dataloader/DataLoaderRegistryTest.java index aeaf668..bd1534d 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/DataLoaderRegistryTest.java @@ -2,16 +2,16 @@ import org.dataloader.stats.SimpleStatisticsCollector; import org.dataloader.stats.Statistics; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.concurrent.CompletableFuture; import static java.util.Arrays.asList; import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertThat; public class DataLoaderRegistryTest { final BatchLoader identityBatchLoader = CompletableFuture::completedFuture; diff --git a/src/test/java/org/dataloader/DataLoaderStatsTest.java b/src/test/java/org/dataloader/DataLoaderStatsTest.java index c2faa50..b8393e6 100644 --- a/src/test/java/org/dataloader/DataLoaderStatsTest.java +++ b/src/test/java/org/dataloader/DataLoaderStatsTest.java @@ -9,19 +9,20 @@ import org.dataloader.stats.context.IncrementCacheHitCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertThat; /** * Tests related to stats. DataLoaderTest is getting to big and needs refactoring @@ -118,9 +119,10 @@ public void stats_are_collected_with_caching_disabled() { loader.load("A"); loader.load("B"); loader.loadMany(asList("C", "D")); + loader.loadMany(Map.of("E", "E", "F", "F")); Statistics stats = loader.getStatistics(); - assertThat(stats.getLoadCount(), equalTo(4L)); + assertThat(stats.getLoadCount(), equalTo(6L)); assertThat(stats.getBatchInvokeCount(), equalTo(0L)); assertThat(stats.getBatchLoadCount(), equalTo(0L)); assertThat(stats.getCacheHitCount(), equalTo(0L)); @@ -128,9 +130,9 @@ public void stats_are_collected_with_caching_disabled() { loader.dispatch(); stats = loader.getStatistics(); - assertThat(stats.getLoadCount(), equalTo(4L)); + assertThat(stats.getLoadCount(), equalTo(6L)); assertThat(stats.getBatchInvokeCount(), equalTo(1L)); - assertThat(stats.getBatchLoadCount(), equalTo(4L)); + assertThat(stats.getBatchLoadCount(), equalTo(6L)); assertThat(stats.getCacheHitCount(), equalTo(0L)); loader.load("A"); @@ -139,9 +141,9 @@ public void stats_are_collected_with_caching_disabled() { loader.dispatch(); stats = loader.getStatistics(); - assertThat(stats.getLoadCount(), equalTo(6L)); + assertThat(stats.getLoadCount(), equalTo(8L)); assertThat(stats.getBatchInvokeCount(), equalTo(2L)); - assertThat(stats.getBatchLoadCount(), equalTo(6L)); + assertThat(stats.getBatchLoadCount(), equalTo(8L)); assertThat(stats.getCacheHitCount(), equalTo(0L)); } diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index 18dd6f8..c4ae883 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -16,38 +16,54 @@ package org.dataloader; +import org.awaitility.Duration; import org.dataloader.fixtures.CustomCacheMap; import org.dataloader.fixtures.JsonObject; -import org.dataloader.fixtures.TestKit; import org.dataloader.fixtures.User; import org.dataloader.fixtures.UserManager; +import org.dataloader.fixtures.parameterized.ListDataLoaderFactory; +import org.dataloader.fixtures.parameterized.MappedDataLoaderFactory; +import org.dataloader.fixtures.parameterized.MappedPublisherDataLoaderFactory; +import org.dataloader.fixtures.parameterized.PublisherDataLoaderFactory; +import org.dataloader.fixtures.parameterized.TestDataLoaderFactory; +import org.dataloader.fixtures.parameterized.TestReactiveDataLoaderFactory; import org.dataloader.impl.CompletableFutureKit; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import org.dataloader.impl.DataLoaderAssertionException; +import org.junit.jupiter.api.Named; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; -import static java.util.Collections.singletonList; +import static java.util.Collections.*; +import static java.util.concurrent.CompletableFuture.*; import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; import static org.dataloader.DataLoaderOptions.newOptions; +import static org.dataloader.fixtures.TestKit.areAllDone; import static org.dataloader.fixtures.TestKit.listFrom; import static org.dataloader.impl.CompletableFutureKit.cause; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.fail; /** * Tests for {@link DataLoader}. @@ -64,7 +80,7 @@ public class DataLoaderTest { @Test public void should_Build_a_really_really_simple_data_loader() { AtomicBoolean success = new AtomicBoolean(); - DataLoader identityLoader = newDataLoader(keysAsValues()); + DataLoader identityLoader = newDataLoader(CompletableFuture::completedFuture); CompletionStage future1 = identityLoader.load(1); @@ -77,9 +93,42 @@ public void should_Build_a_really_really_simple_data_loader() { } @Test - public void should_Support_loading_multiple_keys_in_one_call() { + public void basic_map_batch_loading() { + MappedBatchLoader evensOnlyMappedBatchLoader = (keys) -> { + Map mapOfResults = new HashMap<>(); + + AtomicInteger index = new AtomicInteger(); + keys.forEach(k -> { + int i = index.getAndIncrement(); + if (i % 2 == 0) { + mapOfResults.put(k, k); + } + }); + return completedFuture(mapOfResults); + }; + DataLoader loader = DataLoaderFactory.newMappedDataLoader(evensOnlyMappedBatchLoader); + + final List keys = asList("C", "D"); + final Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("E", null); + keysAndContexts.put("F", null); + + loader.load("A"); + loader.load("B"); + loader.loadMany(keys); + loader.loadMany(keysAndContexts); + + List results = loader.dispatchAndJoin(); + + assertThat(results.size(), equalTo(6)); + assertThat(results, equalTo(asList("A", null, "C", null, "E", null))); + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Support_loading_multiple_keys_in_one_call_via_list(TestDataLoaderFactory factory) { AtomicBoolean success = new AtomicBoolean(); - DataLoader identityLoader = newDataLoader(keysAsValues()); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), new ArrayList<>()); CompletionStage> futureAll = identityLoader.loadMany(asList(1, 2)); futureAll.thenAccept(promisedValues -> { @@ -91,10 +140,31 @@ public void should_Support_loading_multiple_keys_in_one_call() { assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2))); } - @Test - public void should_Resolve_to_empty_list_when_no_keys_supplied() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Support_loading_multiple_keys_in_one_call_via_map(TestDataLoaderFactory factory) { + AtomicBoolean success = new AtomicBoolean(); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), new ArrayList<>()); + + final Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put(1, null); + keysAndContexts.put(2, null); + + CompletionStage> futureAll = identityLoader.loadMany(keysAndContexts); + futureAll.thenAccept(promisedValues -> { + assertThat(promisedValues.size(), is(2)); + success.set(true); + }); + identityLoader.dispatch(); + await().untilAtomic(success, is(true)); + assertThat(futureAll.toCompletableFuture().join(), equalTo(Map.of(1, 1, 2, 2))); + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Resolve_to_empty_list_when_no_keys_supplied(TestDataLoaderFactory factory) { AtomicBoolean success = new AtomicBoolean(); - DataLoader identityLoader = newDataLoader(keysAsValues()); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), new ArrayList<>()); CompletableFuture> futureEmpty = identityLoader.loadMany(emptyList()); futureEmpty.thenAccept(promisedValues -> { assertThat(promisedValues.size(), is(0)); @@ -105,10 +175,26 @@ public void should_Resolve_to_empty_list_when_no_keys_supplied() { assertThat(futureEmpty.join(), empty()); } - @Test - public void should_Return_zero_entries_dispatched_when_no_keys_supplied() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Resolve_to_empty_map_when_no_keys_supplied(TestDataLoaderFactory factory) { + AtomicBoolean success = new AtomicBoolean(); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture> futureEmpty = identityLoader.loadMany(emptyMap()); + futureEmpty.thenAccept(promisedValues -> { + assertThat(promisedValues.size(), is(0)); + success.set(true); + }); + identityLoader.dispatch(); + await().untilAtomic(success, is(true)); + assertThat(futureEmpty.join(), anEmptyMap()); + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Return_zero_entries_dispatched_when_no_keys_supplied_via_list(TestDataLoaderFactory factory) { AtomicBoolean success = new AtomicBoolean(); - DataLoader identityLoader = newDataLoader(keysAsValues()); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), new ArrayList<>()); CompletableFuture> futureEmpty = identityLoader.loadMany(emptyList()); futureEmpty.thenAccept(promisedValues -> { assertThat(promisedValues.size(), is(0)); @@ -119,10 +205,26 @@ public void should_Return_zero_entries_dispatched_when_no_keys_supplied() { assertThat(dispatchResult.getKeysCount(), equalTo(0)); } - @Test - public void should_Batch_multiple_requests() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Return_zero_entries_dispatched_when_no_keys_supplied_via_map(TestDataLoaderFactory factory) { + AtomicBoolean success = new AtomicBoolean(); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture> futureEmpty = identityLoader.loadMany(emptyMap()); + futureEmpty.thenAccept(promisedValues -> { + assertThat(promisedValues.size(), is(0)); + success.set(true); + }); + DispatchResult dispatchResult = identityLoader.dispatchWithCounts(); + await().untilAtomic(success, is(true)); + assertThat(dispatchResult.getKeysCount(), equalTo(0)); + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Batch_multiple_requests(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = identityLoader.load(1); CompletableFuture future2 = identityLoader.load(2); @@ -134,10 +236,11 @@ public void should_Batch_multiple_requests() throws ExecutionException, Interrup assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); } - @Test - public void should_Return_number_of_batched_entries() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Return_number_of_batched_entries(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = identityLoader.load(1); CompletableFuture future2 = identityLoader.load(2); @@ -148,10 +251,11 @@ public void should_Return_number_of_batched_entries() { assertThat(dispatchResult.getPromisedResults().isDone(), equalTo(true)); } - @Test - public void should_Coalesce_identical_requests() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Coalesce_identical_requests(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); CompletableFuture future1a = identityLoader.load(1); CompletableFuture future1b = identityLoader.load(1); @@ -164,10 +268,11 @@ public void should_Coalesce_identical_requests() throws ExecutionException, Inte assertThat(loadCalls, equalTo(singletonList(singletonList(1)))); } - @Test - public void should_Cache_repeated_requests() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Cache_repeated_requests(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = identityLoader.load("A"); CompletableFuture future2 = identityLoader.load("B"); @@ -199,10 +304,11 @@ public void should_Cache_repeated_requests() throws ExecutionException, Interrup assertThat(loadCalls, equalTo(asList(asList("A", "B"), singletonList("C")))); } - @Test - public void should_Not_redispatch_previous_load() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Not_redispatch_previous_load(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = identityLoader.load("A"); identityLoader.dispatch(); @@ -216,10 +322,11 @@ public void should_Not_redispatch_previous_load() throws ExecutionException, Int assertThat(loadCalls, equalTo(asList(singletonList("A"), singletonList("B")))); } - @Test - public void should_Cache_on_redispatch() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Cache_on_redispatch(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = identityLoader.load("A"); identityLoader.dispatch(); @@ -227,16 +334,24 @@ public void should_Cache_on_redispatch() throws ExecutionException, InterruptedE CompletableFuture> future2 = identityLoader.loadMany(asList("A", "B")); identityLoader.dispatch(); - await().until(() -> future1.isDone() && future2.isDone()); + Map keysAndContexts = new LinkedHashMap<>(); + keysAndContexts.put("A", null); + keysAndContexts.put("C", null); + CompletableFuture> future3 = identityLoader.loadMany(keysAndContexts); + identityLoader.dispatch(); + + await().until(() -> future1.isDone() && future2.isDone() && future3.isDone()); assertThat(future1.get(), equalTo("A")); assertThat(future2.get(), equalTo(asList("A", "B"))); - assertThat(loadCalls, equalTo(asList(singletonList("A"), singletonList("B")))); + assertThat(future3.get(), equalTo(keysAndContexts.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getKey)))); + assertThat(loadCalls, equalTo(asList(singletonList("A"), singletonList("B"), singletonList("C")))); } - @Test - public void should_Clear_single_value_in_loader() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Clear_single_value_in_loader(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = identityLoader.load("A"); CompletableFuture future2 = identityLoader.load("B"); @@ -261,10 +376,11 @@ public void should_Clear_single_value_in_loader() throws ExecutionException, Int assertThat(loadCalls, equalTo(asList(asList("A", "B"), singletonList("A")))); } - @Test - public void should_Clear_all_values_in_loader() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Clear_all_values_in_loader(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = identityLoader.load("A"); CompletableFuture future2 = identityLoader.load("B"); @@ -288,10 +404,11 @@ public void should_Clear_all_values_in_loader() throws ExecutionException, Inter assertThat(loadCalls, equalTo(asList(asList("A", "B"), asList("A", "B")))); } - @Test - public void should_Allow_priming_the_cache() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Allow_priming_the_cache(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); DataLoader dlFluency = identityLoader.prime("A", "A"); assertThat(dlFluency, equalTo(identityLoader)); @@ -306,10 +423,11 @@ public void should_Allow_priming_the_cache() throws ExecutionException, Interrup assertThat(loadCalls, equalTo(singletonList(singletonList("B")))); } - @Test - public void should_Not_prime_keys_that_already_exist() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Not_prime_keys_that_already_exist(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); identityLoader.prime("A", "X"); @@ -334,10 +452,11 @@ public void should_Not_prime_keys_that_already_exist() throws ExecutionException assertThat(loadCalls, equalTo(singletonList(singletonList("B")))); } - @Test - public void should_Allow_to_forcefully_prime_the_cache() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Allow_to_forcefully_prime_the_cache(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); identityLoader.prime("A", "X"); @@ -362,12 +481,13 @@ public void should_Allow_to_forcefully_prime_the_cache() throws ExecutionExcepti assertThat(loadCalls, equalTo(singletonList(singletonList("B")))); } - @Test - public void should_Allow_priming_the_cache_with_a_future() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Allow_priming_the_cache_with_a_future(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); - DataLoader dlFluency = identityLoader.prime("A", CompletableFuture.completedFuture("A")); + DataLoader dlFluency = identityLoader.prime("A", completedFuture("A")); assertThat(dlFluency, equalTo(identityLoader)); CompletableFuture future1 = identityLoader.load("A"); @@ -380,10 +500,11 @@ public void should_Allow_priming_the_cache_with_a_future() throws ExecutionExcep assertThat(loadCalls, equalTo(singletonList(singletonList("B")))); } - @Test - public void should_not_Cache_failed_fetches_on_complete_failure() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_not_Cache_failed_fetches_on_complete_failure(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); + DataLoader errorLoader = factory.idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = errorLoader.load(1); errorLoader.dispatch(); @@ -401,10 +522,11 @@ public void should_not_Cache_failed_fetches_on_complete_failure() { assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(1)))); } - @Test - public void should_Resolve_to_error_to_indicate_failure() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Resolve_to_error_to_indicate_failure(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); - DataLoader evenLoader = idLoaderOddEvenExceptions(new DataLoaderOptions(), loadCalls); + DataLoader evenLoader = factory.idLoaderOddEvenExceptions(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = evenLoader.load(1); evenLoader.dispatch(); @@ -423,11 +545,12 @@ public void should_Resolve_to_error_to_indicate_failure() throws ExecutionExcept // Accept any kind of key. - @Test - public void should_Represent_failures_and_successes_simultaneously() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Represent_failures_and_successes_simultaneously(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { AtomicBoolean success = new AtomicBoolean(); List> loadCalls = new ArrayList<>(); - DataLoader evenLoader = idLoaderOddEvenExceptions(new DataLoaderOptions(), loadCalls); + DataLoader evenLoader = factory.idLoaderOddEvenExceptions(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = evenLoader.load(1); CompletableFuture future2 = evenLoader.load(2); @@ -449,10 +572,11 @@ public void should_Represent_failures_and_successes_simultaneously() throws Exec // Accepts options - @Test - public void should_Cache_failed_fetches() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Cache_failed_fetches(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderAllExceptions(new DataLoaderOptions(), loadCalls); + DataLoader errorLoader = factory.idLoaderAllExceptions(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = errorLoader.load(1); errorLoader.dispatch(); @@ -471,11 +595,12 @@ public void should_Cache_failed_fetches() { assertThat(loadCalls, equalTo(singletonList(singletonList(1)))); } - @Test - public void should_NOT_Cache_failed_fetches_if_told_not_too() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_NOT_Cache_failed_fetches_if_told_not_too(TestDataLoaderFactory factory) { DataLoaderOptions options = DataLoaderOptions.newOptions().setCachingExceptionsEnabled(false); List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderAllExceptions(options, loadCalls); + DataLoader errorLoader = factory.idLoaderAllExceptions(options, loadCalls); CompletableFuture future1 = errorLoader.load(1); errorLoader.dispatch(); @@ -497,10 +622,11 @@ public void should_NOT_Cache_failed_fetches_if_told_not_too() { // Accepts object key in custom cacheKey function - @Test - public void should_Handle_priming_the_cache_with_an_error() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Handle_priming_the_cache_with_an_error(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); identityLoader.prime(1, new IllegalStateException("Error")); @@ -513,10 +639,11 @@ public void should_Handle_priming_the_cache_with_an_error() { assertThat(loadCalls, equalTo(emptyList())); } - @Test - public void should_Clear_values_from_cache_after_errors() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Clear_values_from_cache_after_errors(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); + DataLoader errorLoader = factory.idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = errorLoader.load(1); future1.handle((value, t) -> { @@ -548,10 +675,11 @@ public void should_Clear_values_from_cache_after_errors() { assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(1)))); } - @Test - public void should_Propagate_error_to_all_loads() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Propagate_error_to_all_loads(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); + DataLoader errorLoader = factory.idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); CompletableFuture future1 = errorLoader.load(1); CompletableFuture future2 = errorLoader.load(2); @@ -571,10 +699,11 @@ public void should_Propagate_error_to_all_loads() { assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); } - @Test - public void should_Accept_objects_as_keys() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Accept_objects_as_keys(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); Object keyA = new Object(); Object keyB = new Object(); @@ -612,11 +741,12 @@ public void should_Accept_objects_as_keys() { assertThat(loadCalls.get(1).toArray()[0], equalTo(keyA)); } - @Test - public void should_Disable_caching() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Disable_caching(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); DataLoader identityLoader = - idLoader(newOptions().setCachingEnabled(false), loadCalls); + factory.idLoader(newOptions().setCachingEnabled(false), loadCalls); CompletableFuture future1 = identityLoader.load("A"); CompletableFuture future2 = identityLoader.load("B"); @@ -649,11 +779,12 @@ public void should_Disable_caching() throws ExecutionException, InterruptedExcep asList("A", "C"), asList("A", "B", "C")))); } - @Test - public void should_work_with_duplicate_keys_when_caching_disabled() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_work_with_duplicate_keys_when_caching_disabled(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); DataLoader identityLoader = - idLoader(newOptions().setCachingEnabled(false), loadCalls); + factory.idLoader(newOptions().setCachingEnabled(false), loadCalls); CompletableFuture future1 = identityLoader.load("A"); CompletableFuture future2 = identityLoader.load("B"); @@ -664,14 +795,19 @@ public void should_work_with_duplicate_keys_when_caching_disabled() throws Execu assertThat(future1.get(), equalTo("A")); assertThat(future2.get(), equalTo("B")); assertThat(future3.get(), equalTo("A")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); + if (factory instanceof MappedDataLoaderFactory || factory instanceof MappedPublisherDataLoaderFactory) { + assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); + } else { + assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); + } } - @Test - public void should_work_with_duplicate_keys_when_caching_enabled() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_work_with_duplicate_keys_when_caching_enabled(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); DataLoader identityLoader = - idLoader(newOptions().setCachingEnabled(true), loadCalls); + factory.idLoader(newOptions().setCachingEnabled(true), loadCalls); CompletableFuture future1 = identityLoader.load("A"); CompletableFuture future2 = identityLoader.load("B"); @@ -687,17 +823,18 @@ public void should_work_with_duplicate_keys_when_caching_enabled() throws Execut // It is resilient to job queue ordering - @Test - public void should_Accept_objects_with_a_complex_key() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Accept_objects_with_a_complex_key(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); DataLoaderOptions options = newOptions().setCacheKeyFunction(getJsonObjectCacheMapFn()); - DataLoader identityLoader = idLoader(options, loadCalls); + DataLoader identityLoader = factory.idLoader(options, loadCalls); JsonObject key1 = new JsonObject().put("id", 123); JsonObject key2 = new JsonObject().put("id", 123); - CompletableFuture future1 = identityLoader.load(key1); - CompletableFuture future2 = identityLoader.load(key2); + CompletableFuture future1 = identityLoader.load(key1); + CompletableFuture future2 = identityLoader.load(key2); identityLoader.dispatch(); await().until(() -> future1.isDone() && future2.isDone()); @@ -708,22 +845,23 @@ public void should_Accept_objects_with_a_complex_key() throws ExecutionException // Helper methods - @Test - public void should_Clear_objects_with_complex_key() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Clear_objects_with_complex_key(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); DataLoaderOptions options = newOptions().setCacheKeyFunction(getJsonObjectCacheMapFn()); - DataLoader identityLoader = idLoader(options, loadCalls); + DataLoader identityLoader = factory.idLoader(options, loadCalls); JsonObject key1 = new JsonObject().put("id", 123); JsonObject key2 = new JsonObject().put("id", 123); - CompletableFuture future1 = identityLoader.load(key1); + CompletableFuture future1 = identityLoader.load(key1); identityLoader.dispatch(); await().until(future1::isDone); identityLoader.clear(key2); // clear equivalent object key - CompletableFuture future2 = identityLoader.load(key1); + CompletableFuture future2 = identityLoader.load(key1); identityLoader.dispatch(); await().until(future2::isDone); @@ -732,33 +870,35 @@ public void should_Clear_objects_with_complex_key() throws ExecutionException, I assertThat(future2.get(), equalTo(key1)); } - @Test - public void should_Accept_objects_with_different_order_of_keys() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Accept_objects_with_different_order_of_keys(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); DataLoaderOptions options = newOptions().setCacheKeyFunction(getJsonObjectCacheMapFn()); - DataLoader identityLoader = idLoader(options, loadCalls); + DataLoader identityLoader = factory.idLoader(options, loadCalls); JsonObject key1 = new JsonObject().put("a", 123).put("b", 321); JsonObject key2 = new JsonObject().put("b", 321).put("a", 123); // Fetches as expected - CompletableFuture future1 = identityLoader.load(key1); - CompletableFuture future2 = identityLoader.load(key2); + CompletableFuture future1 = identityLoader.load(key1); + CompletableFuture future2 = identityLoader.load(key2); identityLoader.dispatch(); await().until(() -> future1.isDone() && future2.isDone()); assertThat(loadCalls, equalTo(singletonList(singletonList(key1)))); assertThat(loadCalls.size(), equalTo(1)); assertThat(future1.get(), equalTo(key1)); - assertThat(future2.get(), equalTo(key1)); + assertThat(future2.get(), equalTo(key2)); } - @Test - public void should_Allow_priming_the_cache_with_an_object_key() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Allow_priming_the_cache_with_an_object_key(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { List> loadCalls = new ArrayList<>(); DataLoaderOptions options = newOptions().setCacheKeyFunction(getJsonObjectCacheMapFn()); - DataLoader identityLoader = idLoader(options, loadCalls); + DataLoader identityLoader = factory.idLoader(options, loadCalls); JsonObject key1 = new JsonObject().put("id", 123); JsonObject key2 = new JsonObject().put("id", 123); @@ -775,12 +915,13 @@ public void should_Allow_priming_the_cache_with_an_object_key() throws Execution assertThat(future2.get(), equalTo(key1)); } - @Test - public void should_Accept_a_custom_cache_map_implementation() throws ExecutionException, InterruptedException { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Accept_a_custom_cache_map_implementation(TestDataLoaderFactory factory) throws ExecutionException, InterruptedException { CustomCacheMap customMap = new CustomCacheMap(); List> loadCalls = new ArrayList<>(); DataLoaderOptions options = newOptions().setCacheMap(customMap); - DataLoader identityLoader = idLoader(options, loadCalls); + DataLoader identityLoader = factory.idLoader(options, loadCalls); // Fetches as expected @@ -826,11 +967,27 @@ public void should_Accept_a_custom_cache_map_implementation() throws ExecutionEx assertArrayEquals(customMap.stash.keySet().toArray(), emptyList().toArray()); } - @Test - public void batching_disabled_should_dispatch_immediately() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_degrade_gracefully_if_cache_get_throws(TestDataLoaderFactory factory) { + CacheMap cache = new ThrowingCacheMap(); + DataLoaderOptions options = newOptions().setCachingEnabled(true).setCacheMap(cache); + List> loadCalls = new ArrayList<>(); + DataLoader identityLoader = factory.idLoader(options, loadCalls); + + assertThat(identityLoader.getIfPresent("a"), equalTo(Optional.empty())); + + CompletableFuture future = identityLoader.load("a"); + identityLoader.dispatch(); + assertThat(future.join(), equalTo("a")); + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void batching_disabled_should_dispatch_immediately(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); DataLoaderOptions options = newOptions().setBatchingEnabled(false); - DataLoader identityLoader = idLoader(options, loadCalls); + DataLoader identityLoader = factory.idLoader(options, loadCalls); CompletableFuture fa = identityLoader.load("A"); CompletableFuture fb = identityLoader.load("B"); @@ -854,11 +1011,12 @@ public void batching_disabled_should_dispatch_immediately() { } - @Test - public void batching_disabled_and_caching_disabled_should_dispatch_immediately_and_forget() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void batching_disabled_and_caching_disabled_should_dispatch_immediately_and_forget(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); DataLoaderOptions options = newOptions().setBatchingEnabled(false).setCachingEnabled(false); - DataLoader identityLoader = idLoader(options, loadCalls); + DataLoader identityLoader = factory.idLoader(options, loadCalls); CompletableFuture fa = identityLoader.load("A"); CompletableFuture fb = identityLoader.load("B"); @@ -885,10 +1043,11 @@ public void batching_disabled_and_caching_disabled_should_dispatch_immediately_a } - @Test - public void batches_multiple_requests_with_max_batch_size() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void batches_multiple_requests_with_max_batch_size(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(newOptions().setMaxBatchSize(2), loadCalls); + DataLoader identityLoader = factory.idLoader(newOptions().setMaxBatchSize(2), loadCalls); CompletableFuture f1 = identityLoader.load(1); CompletableFuture f2 = identityLoader.load(2); @@ -896,7 +1055,7 @@ public void batches_multiple_requests_with_max_batch_size() { identityLoader.dispatch(); - CompletableFuture.allOf(f1, f2, f3).join(); + allOf(f1, f2, f3).join(); assertThat(f1.join(), equalTo(1)); assertThat(f2.join(), equalTo(2)); @@ -906,10 +1065,11 @@ public void batches_multiple_requests_with_max_batch_size() { } - @Test - public void can_split_max_batch_sizes_correctly() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void can_split_max_batch_sizes_correctly(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(newOptions().setMaxBatchSize(5), loadCalls); + DataLoader identityLoader = factory.idLoader(newOptions().setMaxBatchSize(5), loadCalls); for (int i = 0; i < 21; i++) { identityLoader.load(i); @@ -928,22 +1088,23 @@ public void can_split_max_batch_sizes_correctly() { } - @Test - public void should_Batch_loads_occurring_within_futures() { + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory factory) { List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(newOptions(), loadCalls); + DataLoader identityLoader = factory.idLoader(newOptions(), loadCalls); Supplier nullValue = () -> null; AtomicBoolean v4Called = new AtomicBoolean(); - CompletableFuture.supplyAsync(nullValue).thenAccept(v1 -> { + supplyAsync(nullValue).thenAccept(v1 -> { identityLoader.load("a"); - CompletableFuture.supplyAsync(nullValue).thenAccept(v2 -> { + supplyAsync(nullValue).thenAccept(v2 -> { identityLoader.load("b"); - CompletableFuture.supplyAsync(nullValue).thenAccept(v3 -> { + supplyAsync(nullValue).thenAccept(v3 -> { identityLoader.load("c"); - CompletableFuture.supplyAsync(nullValue).thenAccept( + supplyAsync(nullValue).thenAccept( v4 -> { identityLoader.load("d"); v4Called.set(true); @@ -960,12 +1121,101 @@ public void should_Batch_loads_occurring_within_futures() { singletonList(asList("a", "b", "c", "d")))); } + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_blowup_after_N_keys(TestDataLoaderFactory factory) { + if (!(factory instanceof TestReactiveDataLoaderFactory)) { + return; + } + // + // if we blow up after emitting N keys, the N keys should work but the rest of the keys + // should be exceptional + DataLoader identityLoader = ((TestReactiveDataLoaderFactory) factory).idLoaderBlowsUpsAfterN(3, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load(1); + CompletableFuture cf2 = identityLoader.load(2); + CompletableFuture cf3 = identityLoader.load(3); + CompletableFuture cf4 = identityLoader.load(4); + CompletableFuture cf5 = identityLoader.load(5); + identityLoader.dispatch(); + await().until(cf5::isDone); + + assertThat(cf1.join(), equalTo(1)); + assertThat(cf2.join(), equalTo(2)); + assertThat(cf3.join(), equalTo(3)); + assertThat(cf4.isCompletedExceptionally(), is(true)); + assertThat(cf5.isCompletedExceptionally(), is(true)); + + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void when_values_size_are_less_then_key_size(TestDataLoaderFactory factory) { + // + // what happens if we want 4 values but are only given 2 back say + // + DataLoader identityLoader = factory.onlyReturnsNValues(2, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load("A"); + CompletableFuture cf2 = identityLoader.load("B"); + CompletableFuture cf3 = identityLoader.load("C"); + CompletableFuture cf4 = identityLoader.load("D"); + identityLoader.dispatch(); + + await().atMost(Duration.FIVE_SECONDS).until(() -> areAllDone(cf1, cf2, cf3, cf4)); + + if (factory instanceof ListDataLoaderFactory) { + assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else if (factory instanceof PublisherDataLoaderFactory) { + // some have completed progressively but the other never did + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else { + // with the maps it's ok to have fewer results + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cf3.join(), equalTo(null)); + assertThat(cf4.join(), equalTo(null)); + } + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void when_values_size_are_more_then_key_size(TestDataLoaderFactory factory) { + // + // what happens if we want 4 values but only given 6 back say + // + DataLoader identityLoader = factory.idLoaderReturnsTooMany(2, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load("A"); + CompletableFuture cf2 = identityLoader.load("B"); + CompletableFuture cf3 = identityLoader.load("C"); + CompletableFuture cf4 = identityLoader.load("D"); + identityLoader.dispatch(); + await().atMost(Duration.FIVE_SECONDS).until(() -> areAllDone(cf1, cf2, cf3, cf4)); + + + if (factory instanceof ListDataLoaderFactory) { + assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else { + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cf3.join(), equalTo("C")); + assertThat(cf4.join(), equalTo("D")); + } + } + @Test public void can_call_a_loader_from_a_loader() throws Exception { List> deepLoadCalls = new ArrayList<>(); DataLoader deepLoader = newDataLoader(keys -> { deepLoadCalls.add(keys); - return CompletableFuture.completedFuture(keys); + return completedFuture(keys); }); List> aLoadCalls = new ArrayList<>(); @@ -985,7 +1235,7 @@ public void can_call_a_loader_from_a_loader() throws Exception { CompletableFuture b1 = bLoader.load("B1"); CompletableFuture b2 = bLoader.load("B2"); - CompletableFuture.allOf( + allOf( aLoader.dispatch(), deepLoader.dispatch(), bLoader.dispatch(), @@ -1011,11 +1261,10 @@ public void can_call_a_loader_from_a_loader() throws Exception { public void should_allow_composition_of_data_loader_calls() { UserManager userManager = new UserManager(); - BatchLoader userBatchLoader = userIds -> CompletableFuture - .supplyAsync(() -> userIds - .stream() - .map(userManager::loadUserById) - .collect(Collectors.toList())); + BatchLoader userBatchLoader = userIds -> supplyAsync(() -> userIds + .stream() + .map(userManager::loadUserById) + .collect(Collectors.toList())); DataLoader userLoader = newDataLoader(userBatchLoader); AtomicBoolean gandalfCalled = new AtomicBoolean(false); @@ -1051,56 +1300,21 @@ private static CacheKey getJsonObjectCacheMapFn() { .collect(Collectors.joining()); } - private static DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - @SuppressWarnings("unchecked") - List values = keys.stream() - .map(k -> (V) k) - .collect(Collectors.toList()); - return CompletableFuture.completedFuture(values); - }, options); - } - - private static DataLoader idLoaderBlowsUps( - DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - return TestKit.futureError(); - }, options); - } - - private static DataLoader idLoaderAllExceptions( - DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); + private static class ThrowingCacheMap extends CustomCacheMap { - List errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList()); - return CompletableFuture.completedFuture(errors); - }, options); - } - - private static DataLoader idLoaderOddEvenExceptions( - DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - - List errors = new ArrayList<>(); - for (Integer key : keys) { - if (key % 2 == 0) { - errors.add(key); - } else { - errors.add(new IllegalStateException("Error")); - } - } - return CompletableFuture.completedFuture(errors); - }, options); + @Override + public CompletableFuture get(String key) { + throw new RuntimeException("Cache implementation failed."); + } } - - private BatchLoader keysAsValues() { - return CompletableFuture::completedFuture; + private static Stream dataLoaderFactories() { + return Stream.of( + Arguments.of(Named.of("List DataLoader", new ListDataLoaderFactory())), + Arguments.of(Named.of("Mapped DataLoader", new MappedDataLoaderFactory())), + Arguments.of(Named.of("Publisher DataLoader", new PublisherDataLoaderFactory())), + Arguments.of(Named.of("Mapped Publisher DataLoader", new MappedPublisherDataLoaderFactory())) + ); } - } diff --git a/src/test/java/org/dataloader/DataLoaderTimeTest.java b/src/test/java/org/dataloader/DataLoaderTimeTest.java index ee73d85..b4d645c 100644 --- a/src/test/java/org/dataloader/DataLoaderTimeTest.java +++ b/src/test/java/org/dataloader/DataLoaderTimeTest.java @@ -1,13 +1,13 @@ package org.dataloader; import org.dataloader.fixtures.TestingClock; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.time.Instant; import static org.dataloader.fixtures.TestKit.keysAsValues; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; @SuppressWarnings("UnusedReturnValue") public class DataLoaderTimeTest { diff --git a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java index 2716fae..1fb5ea2 100644 --- a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java +++ b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java @@ -5,7 +5,7 @@ import org.dataloader.fixtures.CaffeineValueCache; import org.dataloader.fixtures.CustomValueCache; import org.dataloader.impl.DataLoaderAssertionException; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; @@ -22,11 +22,11 @@ import static org.dataloader.fixtures.TestKit.snooze; import static org.dataloader.fixtures.TestKit.sort; import static org.dataloader.impl.CompletableFutureKit.failedFuture; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class DataLoaderValueCacheTest { diff --git a/src/test/java/org/dataloader/DataLoaderWithTryTest.java b/src/test/java/org/dataloader/DataLoaderWithTryTest.java index e9e8538..fda7bd4 100644 --- a/src/test/java/org/dataloader/DataLoaderWithTryTest.java +++ b/src/test/java/org/dataloader/DataLoaderWithTryTest.java @@ -1,7 +1,7 @@ package org.dataloader; import org.hamcrest.Matchers; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.HashMap; @@ -12,9 +12,9 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.dataloader.DataLoaderFactory.*; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; public class DataLoaderWithTryTest { diff --git a/src/test/java/org/dataloader/TryTest.java b/src/test/java/org/dataloader/TryTest.java index 4da7bca..1b237e2 100644 --- a/src/test/java/org/dataloader/TryTest.java +++ b/src/test/java/org/dataloader/TryTest.java @@ -1,7 +1,7 @@ package org.dataloader; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -9,11 +9,11 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TryTest { @@ -29,7 +29,7 @@ private void expectThrowable(RunThatCanThrow runnable, Class Set asSet(T... elements) { public static Set asSet(Collection elements) { return new LinkedHashSet<>(elements); } + + public static boolean areAllDone(CompletableFuture... cfs) { + for (CompletableFuture cf : cfs) { + if (! cf.isDone()) { + return false; + } + } + return true; + } } diff --git a/src/test/java/org/dataloader/fixtures/UserManager.java b/src/test/java/org/dataloader/fixtures/UserManager.java index 24fee0d..1d2ff1f 100644 --- a/src/test/java/org/dataloader/fixtures/UserManager.java +++ b/src/test/java/org/dataloader/fixtures/UserManager.java @@ -1,5 +1,8 @@ package org.dataloader.fixtures; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -52,6 +55,14 @@ public List loadUsersById(List userIds) { return userIds.stream().map(this::loadUserById).collect(Collectors.toList()); } + public Publisher streamUsersById(List userIds) { + return Flux.fromIterable(loadUsersById(userIds)); + } + + public Publisher> streamUsersById(Set userIds) { + return Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()); + } + public Map loadMapOfUsersByIds(SecurityCtx callCtx, Set userIds) { Map map = new HashMap<>(); userIds.forEach(userId -> { diff --git a/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java new file mode 100644 index 0000000..ee1f1d7 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java @@ -0,0 +1,79 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.fixtures.TestKit; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.dataloader.DataLoaderFactory.newDataLoader; + +public class ListDataLoaderFactory implements TestDataLoaderFactory { + @Override + public DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return completedFuture(keys); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps( + DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return TestKit.futureError(); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList()); + return completedFuture(errors); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List errors = new ArrayList<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errors.add(key); + } else { + errors.add(new IllegalStateException("Error")); + } + } + return completedFuture(errors); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return completedFuture(keys.subList(0, N)); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + return completedFuture(l); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java new file mode 100644 index 0000000..8f41441 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java @@ -0,0 +1,95 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.fixtures.TestKit.futureError; + +public class MappedDataLoaderFactory implements TestDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader((keys) -> { + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + return completedFuture(map); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader((keys) -> { + loadCalls.add(new ArrayList<>(keys)); + return futureError(); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + Map errorByKey = new HashMap<>(); + keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error"))); + return completedFuture(errorByKey); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + Map errorByKey = new HashMap<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errorByKey.put(key, key); + } else { + errorByKey.put(key, new IllegalStateException("Error")); + } + } + return completedFuture(errorByKey); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + Map collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap( + k -> k, v -> v + )); + return completedFuture(collect); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Map collect = l.stream().collect(Collectors.toMap( + k -> k, v -> v + )); + return completedFuture(collect); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java new file mode 100644 index 0000000..9c92330 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java @@ -0,0 +1,108 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.Try; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry; + +public class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + Flux.fromIterable(map.entrySet()).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.>error(new IllegalStateException("Error")).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Stream>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error")))); + Flux.fromStream(failures).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + Map> errorByKey = new HashMap<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errorByKey.put(key, Try.succeeded(key)); + } else { + errorByKey.put(key, Try.failed(new IllegalStateException("Error"))); + } + } + Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.stream().limit(N).collect(toList()); + Flux> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)); + subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.stream().limit(N).collect(toList()); + Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Flux.fromIterable(l).map(k -> Map.entry(k, k)) + .subscribe(subscriber); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java new file mode 100644 index 0000000..d75ff38 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java @@ -0,0 +1,100 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.Try; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Stream; + +import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; + +public class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.fromIterable(keys).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.error(new IllegalStateException("Error")).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Stream> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error"))); + Flux.fromStream(failures).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List> errors = new ArrayList<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errors.add(Try.succeeded(key)); + } else { + errors.add(Try.failed(new IllegalStateException("Error"))); + } + } + Flux.fromIterable(errors).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux subFlux = Flux.fromIterable(nKeys); + subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux.fromIterable(nKeys) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Flux.fromIterable(l) + .subscribe(subscriber); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java new file mode 100644 index 0000000..8c1bc22 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java @@ -0,0 +1,22 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public interface TestDataLoaderFactory { + DataLoader idLoader(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls); + + DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls); + + DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls); +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java new file mode 100644 index 0000000..d45932c --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java @@ -0,0 +1,11 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.Collection; +import java.util.List; + +public interface TestReactiveDataLoaderFactory { + DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls); +} diff --git a/src/test/java/org/dataloader/impl/PromisedValuesImplTest.java b/src/test/java/org/dataloader/impl/PromisedValuesImplTest.java index 3c9ce65..6073319 100644 --- a/src/test/java/org/dataloader/impl/PromisedValuesImplTest.java +++ b/src/test/java/org/dataloader/impl/PromisedValuesImplTest.java @@ -1,6 +1,6 @@ package org.dataloader.impl; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; @@ -12,12 +12,12 @@ import static java.util.Arrays.asList; import static java.util.concurrent.CompletableFuture.supplyAsync; import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; public class PromisedValuesImplTest { diff --git a/src/test/java/org/dataloader/registries/DispatchPredicateTest.java b/src/test/java/org/dataloader/registries/DispatchPredicateTest.java index f241c2f..07a7416 100644 --- a/src/test/java/org/dataloader/registries/DispatchPredicateTest.java +++ b/src/test/java/org/dataloader/registries/DispatchPredicateTest.java @@ -4,12 +4,12 @@ import org.dataloader.DataLoader; import org.dataloader.fixtures.TestKit; import org.dataloader.fixtures.TestingClock; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.time.Duration; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class DispatchPredicateTest { diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java index 4eab564..94f5cff 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java @@ -3,7 +3,7 @@ import org.dataloader.BatchLoader; import org.dataloader.DataLoader; import org.dataloader.DataLoaderRegistry; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.concurrent.CompletableFuture; @@ -12,10 +12,9 @@ import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.fixtures.TestKit.asSet; -import static org.dataloader.registries.DispatchPredicate.DISPATCH_NEVER; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; public class ScheduledDataLoaderRegistryPredicateTest { final BatchLoader identityBatchLoader = CompletableFuture::completedFuture; diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java index 146c186..e82205d 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -1,11 +1,11 @@ package org.dataloader.registries; -import junit.framework.TestCase; import org.awaitility.core.ConditionTimeoutException; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderRegistry; import org.dataloader.fixtures.TestKit; +import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.ArrayList; @@ -22,17 +22,22 @@ import static org.awaitility.Duration.TWO_SECONDS; import static org.dataloader.fixtures.TestKit.keysAsValues; import static org.dataloader.fixtures.TestKit.snooze; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; -public class ScheduledDataLoaderRegistryTest extends TestCase { +public class ScheduledDataLoaderRegistryTest { DispatchPredicate alwaysDispatch = (key, dl) -> true; DispatchPredicate neverDispatch = (key, dl) -> false; - public void test_basic_setup_works_like_a_normal_dlr() { + @Test + public void basic_setup_works_like_a_normal_dlr() { List> aCalls = new ArrayList<>(); List> bCalls = new ArrayList<>(); @@ -63,7 +68,8 @@ public void test_basic_setup_works_like_a_normal_dlr() { assertThat(bCalls, equalTo(singletonList(asList("BK1", "BK2")))); } - public void test_predicate_always_false() { + @Test + public void predicate_always_false() { List> calls = new ArrayList<>(); DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); @@ -92,7 +98,8 @@ public void test_predicate_always_false() { assertThat(calls.size(), equalTo(0)); } - public void test_predicate_that_eventually_returns_true() { + @Test + public void predicate_that_eventually_returns_true() { AtomicInteger counter = new AtomicInteger(); @@ -123,7 +130,8 @@ public void test_predicate_that_eventually_returns_true() { assertTrue(p2.isDone()); } - public void test_dispatchAllWithCountImmediately() { + @Test + public void dispatchAllWithCountImmediately() { List> calls = new ArrayList<>(); DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); dlA.load("K1"); @@ -140,7 +148,8 @@ public void test_dispatchAllWithCountImmediately() { assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); } - public void test_dispatchAllImmediately() { + @Test + public void dispatchAllImmediately() { List> calls = new ArrayList<>(); DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); dlA.load("K1"); @@ -156,7 +165,8 @@ public void test_dispatchAllImmediately() { assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); } - public void test_rescheduleNow() { + @Test + public void rescheduleNow() { AtomicInteger i = new AtomicInteger(); DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> i.incrementAndGet() > 5; @@ -179,7 +189,8 @@ public void test_rescheduleNow() { assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); } - public void test_it_will_take_out_the_schedule_once_it_dispatches() { + @Test + public void it_will_take_out_the_schedule_once_it_dispatches() { AtomicInteger counter = new AtomicInteger(); DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> counter.incrementAndGet() > 5; @@ -220,7 +231,8 @@ public void test_it_will_take_out_the_schedule_once_it_dispatches() { assertThat(calls, equalTo(asList(asList("K1", "K2"), asList("K3", "K4")))); } - public void test_close_is_a_one_way_door() { + @Test + public void close_is_a_one_way_door() { AtomicInteger counter = new AtomicInteger(); DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> { counter.incrementAndGet(); @@ -264,7 +276,8 @@ public void test_close_is_a_one_way_door() { assertEquals(counter.get(), countThen + 1); } - public void test_can_tick_after_first_dispatch_for_chain_data_loaders() { + @Test + public void can_tick_after_first_dispatch_for_chain_data_loaders() { // delays much bigger than the tick rate will mean multiple calls to dispatch DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); @@ -293,7 +306,8 @@ public void test_can_tick_after_first_dispatch_for_chain_data_loaders() { registry.close(); } - public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() { + @Test + public void chain_data_loaders_will_hang_if_not_in_ticker_mode() { // delays much bigger than the tick rate will mean multiple calls to dispatch DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); @@ -325,7 +339,8 @@ public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() { registry.close(); } - public void test_executors_are_shutdown() { + @Test + public void executors_are_shutdown() { ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry().build(); ScheduledExecutorService executorService = registry.getScheduledExecutorService(); @@ -345,4 +360,4 @@ public void test_executors_are_shutdown() { } -} \ No newline at end of file +} diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java index beb7c18..ff9ec8e 100644 --- a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -3,7 +3,7 @@ import org.dataloader.BatchLoaderEnvironment; import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; @@ -20,8 +20,8 @@ import static org.dataloader.fixtures.TestKit.keysAsValues; import static org.dataloader.fixtures.TestKit.keysAsValuesWithContext; import static org.dataloader.fixtures.TestKit.snooze; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; public class BatchLoaderSchedulerTest { @@ -36,6 +36,11 @@ public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderC public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { return scheduledCall.invoke(); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + scheduledCall.invoke(); + } }; private BatchLoaderScheduler delayedScheduling(int ms) { @@ -56,6 +61,12 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(ms); + scheduledCall.invoke(); + } }; } @@ -139,6 +150,15 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + CompletableFuture.supplyAsync(() -> { + snooze(10); + scheduledCall.invoke(); + return null; + }); + } }; DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(funkyScheduler); diff --git a/src/test/java/org/dataloader/stats/StatisticsCollectorTest.java b/src/test/java/org/dataloader/stats/StatisticsCollectorTest.java index fbfd5e2..f1cc8d8 100644 --- a/src/test/java/org/dataloader/stats/StatisticsCollectorTest.java +++ b/src/test/java/org/dataloader/stats/StatisticsCollectorTest.java @@ -5,13 +5,13 @@ import org.dataloader.stats.context.IncrementCacheHitCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.concurrent.CompletableFuture; import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; public class StatisticsCollectorTest { diff --git a/src/test/java/org/dataloader/stats/StatisticsTest.java b/src/test/java/org/dataloader/stats/StatisticsTest.java index b900807..6c90907 100644 --- a/src/test/java/org/dataloader/stats/StatisticsTest.java +++ b/src/test/java/org/dataloader/stats/StatisticsTest.java @@ -1,11 +1,11 @@ package org.dataloader.stats; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Map; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; public class StatisticsTest {