From eb2b40cc2c50300ef5c913c61737f553a8549a65 Mon Sep 17 00:00:00 2001 From: Alexandre Carlton Date: Mon, 20 May 2024 22:26:16 +1000 Subject: [PATCH 1/2] Inline BatchPublisher tests into DataLoaderTest We now have the same coverage but with less code. Note that: - this is currently failing on 'duplicate keys when caching disabled'. - we still need to add tests that only make sense for the Publisher variants (e.g. half-completed keys). --- .../DataLoaderBatchPublisherTest.java | 1096 ----------------- .../DataLoaderMappedBatchPublisherTest.java | 175 --- .../java/org/dataloader/DataLoaderTest.java | 132 +- 3 files changed, 116 insertions(+), 1287 deletions(-) delete mode 100644 src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java delete mode 100644 src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java diff --git a/src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java b/src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java deleted file mode 100644 index e14d9f7..0000000 --- a/src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java +++ /dev/null @@ -1,1096 +0,0 @@ -package org.dataloader; - -import org.dataloader.fixtures.CustomCacheMap; -import org.dataloader.fixtures.JsonObject; -import org.dataloader.fixtures.User; -import org.dataloader.fixtures.UserManager; -import org.dataloader.impl.CompletableFutureKit; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; -import static java.util.Collections.singletonList; -import static org.awaitility.Awaitility.await; -import static org.dataloader.DataLoaderFactory.newDataLoader; -import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; -import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; -import static org.dataloader.DataLoaderOptions.newOptions; -import static org.dataloader.fixtures.TestKit.listFrom; -import static org.dataloader.impl.CompletableFutureKit.cause; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; - -public class DataLoaderBatchPublisherTest { - - @Test - public void should_Build_a_really_really_simple_data_loader() { - AtomicBoolean success = new AtomicBoolean(); - DataLoader identityLoader = newPublisherDataLoader(keysAsValues()); - - CompletionStage future1 = identityLoader.load(1); - - future1.thenAccept(value -> { - assertThat(value, equalTo(1)); - success.set(true); - }); - identityLoader.dispatch(); - await().untilAtomic(success, is(true)); - } - - @Test - public void should_Support_loading_multiple_keys_in_one_call() { - AtomicBoolean success = new AtomicBoolean(); - DataLoader identityLoader = newPublisherDataLoader(keysAsValues()); - - CompletionStage> futureAll = identityLoader.loadMany(asList(1, 2)); - futureAll.thenAccept(promisedValues -> { - assertThat(promisedValues.size(), is(2)); - success.set(true); - }); - identityLoader.dispatch(); - await().untilAtomic(success, is(true)); - assertThat(futureAll.toCompletableFuture().join(), equalTo(asList(1, 2))); - } - - @Test - public void should_Resolve_to_empty_list_when_no_keys_supplied() { - AtomicBoolean success = new AtomicBoolean(); - DataLoader identityLoader = newPublisherDataLoader(keysAsValues()); - CompletableFuture> futureEmpty = identityLoader.loadMany(emptyList()); - futureEmpty.thenAccept(promisedValues -> { - assertThat(promisedValues.size(), is(0)); - success.set(true); - }); - identityLoader.dispatch(); - await().untilAtomic(success, is(true)); - assertThat(futureEmpty.join(), empty()); - } - - @Test - public void should_Return_zero_entries_dispatched_when_no_keys_supplied() { - AtomicBoolean success = new AtomicBoolean(); - DataLoader identityLoader = newPublisherDataLoader(keysAsValues()); - CompletableFuture> futureEmpty = identityLoader.loadMany(emptyList()); - futureEmpty.thenAccept(promisedValues -> { - assertThat(promisedValues.size(), is(0)); - success.set(true); - }); - DispatchResult dispatchResult = identityLoader.dispatchWithCounts(); - await().untilAtomic(success, is(true)); - assertThat(dispatchResult.getKeysCount(), equalTo(0)); - } - - @Test - public void should_Batch_multiple_requests() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load(1); - CompletableFuture future2 = identityLoader.load(2); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo(1)); - assertThat(future2.get(), equalTo(2)); - assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); - } - - @Test - public void should_Return_number_of_batched_entries() { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load(1); - CompletableFuture future2 = identityLoader.load(2); - DispatchResult dispatchResult = identityLoader.dispatchWithCounts(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(dispatchResult.getKeysCount(), equalTo(2)); // its two because it's the number dispatched (by key) not the load calls - assertThat(dispatchResult.getPromisedResults().isDone(), equalTo(true)); - } - - @Test - public void should_Coalesce_identical_requests() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1a = identityLoader.load(1); - CompletableFuture future1b = identityLoader.load(1); - assertThat(future1a, equalTo(future1b)); - identityLoader.dispatch(); - - await().until(future1a::isDone); - assertThat(future1a.get(), equalTo(1)); - assertThat(future1b.get(), equalTo(1)); - assertThat(loadCalls, equalTo(singletonList(singletonList(1)))); - } - - @Test - public void should_Cache_repeated_requests() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - - CompletableFuture future1a = identityLoader.load("A"); - CompletableFuture future3 = identityLoader.load("C"); - identityLoader.dispatch(); - - await().until(() -> future1a.isDone() && future3.isDone()); - assertThat(future1a.get(), equalTo("A")); - assertThat(future3.get(), equalTo("C")); - assertThat(loadCalls, equalTo(asList(asList("A", "B"), singletonList("C")))); - - CompletableFuture future1b = identityLoader.load("A"); - CompletableFuture future2a = identityLoader.load("B"); - CompletableFuture future3a = identityLoader.load("C"); - identityLoader.dispatch(); - - await().until(() -> future1b.isDone() && future2a.isDone() && future3a.isDone()); - assertThat(future1b.get(), equalTo("A")); - assertThat(future2a.get(), equalTo("B")); - assertThat(future3a.get(), equalTo("C")); - assertThat(loadCalls, equalTo(asList(asList("A", "B"), singletonList("C")))); - } - - @Test - public void should_Not_redispatch_previous_load() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - identityLoader.dispatch(); - - CompletableFuture future2 = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(loadCalls, equalTo(asList(singletonList("A"), singletonList("B")))); - } - - @Test - public void should_Cache_on_redispatch() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - identityLoader.dispatch(); - - CompletableFuture> future2 = identityLoader.loadMany(asList("A", "B")); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo(asList("A", "B"))); - assertThat(loadCalls, equalTo(asList(singletonList("A"), singletonList("B")))); - } - - @Test - public void should_Clear_single_value_in_loader() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - - // fluency - DataLoader dl = identityLoader.clear("A"); - assertThat(dl, equalTo(identityLoader)); - - CompletableFuture future1a = identityLoader.load("A"); - CompletableFuture future2a = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1a.isDone() && future2a.isDone()); - assertThat(future1a.get(), equalTo("A")); - assertThat(future2a.get(), equalTo("B")); - assertThat(loadCalls, equalTo(asList(asList("A", "B"), singletonList("A")))); - } - - @Test - public void should_Clear_all_values_in_loader() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - - DataLoader dlFluent = identityLoader.clearAll(); - assertThat(dlFluent, equalTo(identityLoader)); // fluency - - CompletableFuture future1a = identityLoader.load("A"); - CompletableFuture future2a = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1a.isDone() && future2a.isDone()); - assertThat(future1a.get(), equalTo("A")); - assertThat(future2a.get(), equalTo("B")); - assertThat(loadCalls, equalTo(asList(asList("A", "B"), asList("A", "B")))); - } - - @Test - public void should_Allow_priming_the_cache() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - DataLoader dlFluency = identityLoader.prime("A", "A"); - assertThat(dlFluency, equalTo(identityLoader)); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(loadCalls, equalTo(singletonList(singletonList("B")))); - } - - @Test - public void should_Not_prime_keys_that_already_exist() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - identityLoader.prime("A", "X"); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - CompletableFuture> composite = identityLoader.dispatch(); - - await().until(composite::isDone); - assertThat(future1.get(), equalTo("X")); - assertThat(future2.get(), equalTo("B")); - - identityLoader.prime("A", "Y"); - identityLoader.prime("B", "Y"); - - CompletableFuture future1a = identityLoader.load("A"); - CompletableFuture future2a = identityLoader.load("B"); - CompletableFuture> composite2 = identityLoader.dispatch(); - - await().until(composite2::isDone); - assertThat(future1a.get(), equalTo("X")); - assertThat(future2a.get(), equalTo("B")); - assertThat(loadCalls, equalTo(singletonList(singletonList("B")))); - } - - @Test - public void should_Allow_to_forcefully_prime_the_cache() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - identityLoader.prime("A", "X"); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - CompletableFuture> composite = identityLoader.dispatch(); - - await().until(composite::isDone); - assertThat(future1.get(), equalTo("X")); - assertThat(future2.get(), equalTo("B")); - - identityLoader.clear("A").prime("A", "Y"); - identityLoader.clear("B").prime("B", "Y"); - - CompletableFuture future1a = identityLoader.load("A"); - CompletableFuture future2a = identityLoader.load("B"); - CompletableFuture> composite2 = identityLoader.dispatch(); - - await().until(composite2::isDone); - assertThat(future1a.get(), equalTo("Y")); - assertThat(future2a.get(), equalTo("Y")); - assertThat(loadCalls, equalTo(singletonList(singletonList("B")))); - } - - @Test - public void should_Allow_priming_the_cache_with_a_future() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - DataLoader dlFluency = identityLoader.prime("A", CompletableFuture.completedFuture("A")); - assertThat(dlFluency, equalTo(identityLoader)); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(loadCalls, equalTo(singletonList(singletonList("B")))); - } - - @Test - public void should_not_Cache_failed_fetches_on_complete_failure() { - List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = errorLoader.load(1); - errorLoader.dispatch(); - - await().until(future1::isDone); - assertThat(future1.isCompletedExceptionally(), is(true)); - assertThat(cause(future1), instanceOf(IllegalStateException.class)); - - CompletableFuture future2 = errorLoader.load(1); - errorLoader.dispatch(); - - await().until(future2::isDone); - assertThat(future2.isCompletedExceptionally(), is(true)); - assertThat(cause(future2), instanceOf(IllegalStateException.class)); - assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(1)))); - } - - @Test - public void should_Resolve_to_error_to_indicate_failure() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader evenLoader = idLoaderOddEvenExceptions(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = evenLoader.load(1); - evenLoader.dispatch(); - - await().until(future1::isDone); - assertThat(future1.isCompletedExceptionally(), is(true)); - assertThat(cause(future1), instanceOf(IllegalStateException.class)); - - CompletableFuture future2 = evenLoader.load(2); - evenLoader.dispatch(); - - await().until(future2::isDone); - assertThat(future2.get(), equalTo(2)); - assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(2)))); - } - - // Accept any kind of key. - - @Test - public void should_Represent_failures_and_successes_simultaneously() throws ExecutionException, InterruptedException { - AtomicBoolean success = new AtomicBoolean(); - List> loadCalls = new ArrayList<>(); - DataLoader evenLoader = idLoaderOddEvenExceptions(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = evenLoader.load(1); - CompletableFuture future2 = evenLoader.load(2); - CompletableFuture future3 = evenLoader.load(3); - CompletableFuture future4 = evenLoader.load(4); - CompletableFuture> result = evenLoader.dispatch(); - result.thenAccept(promisedValues -> success.set(true)); - - await().untilAtomic(success, is(true)); - - assertThat(future1.isCompletedExceptionally(), is(true)); - assertThat(cause(future1), instanceOf(IllegalStateException.class)); - assertThat(future2.get(), equalTo(2)); - assertThat(future3.isCompletedExceptionally(), is(true)); - assertThat(future4.get(), equalTo(4)); - - assertThat(loadCalls, equalTo(singletonList(asList(1, 2, 3, 4)))); - } - - // Accepts options - - @Test - public void should_Cache_failed_fetches() { - List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderAllExceptions(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = errorLoader.load(1); - errorLoader.dispatch(); - - await().until(future1::isDone); - assertThat(future1.isCompletedExceptionally(), is(true)); - assertThat(cause(future1), instanceOf(IllegalStateException.class)); - - CompletableFuture future2 = errorLoader.load(1); - errorLoader.dispatch(); - - await().until(future2::isDone); - assertThat(future2.isCompletedExceptionally(), is(true)); - assertThat(cause(future2), instanceOf(IllegalStateException.class)); - - assertThat(loadCalls, equalTo(singletonList(singletonList(1)))); - } - - @Test - public void should_NOT_Cache_failed_fetches_if_told_not_too() { - DataLoaderOptions options = DataLoaderOptions.newOptions().setCachingExceptionsEnabled(false); - List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderAllExceptions(options, loadCalls); - - CompletableFuture future1 = errorLoader.load(1); - errorLoader.dispatch(); - - await().until(future1::isDone); - assertThat(future1.isCompletedExceptionally(), is(true)); - assertThat(cause(future1), instanceOf(IllegalStateException.class)); - - CompletableFuture future2 = errorLoader.load(1); - errorLoader.dispatch(); - - await().until(future2::isDone); - assertThat(future2.isCompletedExceptionally(), is(true)); - assertThat(cause(future2), instanceOf(IllegalStateException.class)); - - assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(1)))); - } - - - // Accepts object key in custom cacheKey function - - @Test - public void should_Handle_priming_the_cache_with_an_error() { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - identityLoader.prime(1, new IllegalStateException("Error")); - - CompletableFuture future1 = identityLoader.load(1); - identityLoader.dispatch(); - - await().until(future1::isDone); - assertThat(future1.isCompletedExceptionally(), is(true)); - assertThat(cause(future1), instanceOf(IllegalStateException.class)); - assertThat(loadCalls, equalTo(emptyList())); - } - - @Test - public void should_Clear_values_from_cache_after_errors() { - List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = errorLoader.load(1); - future1.handle((value, t) -> { - if (t != null) { - // Presumably determine if this error is transient, and only clear the cache in that case. - errorLoader.clear(1); - } - return null; - }); - errorLoader.dispatch(); - - await().until(future1::isDone); - assertThat(future1.isCompletedExceptionally(), is(true)); - assertThat(cause(future1), instanceOf(IllegalStateException.class)); - - CompletableFuture future2 = errorLoader.load(1); - future2.handle((value, t) -> { - if (t != null) { - // Again, only do this if you can determine the error is transient. - errorLoader.clear(1); - } - return null; - }); - errorLoader.dispatch(); - - await().until(future2::isDone); - assertThat(future2.isCompletedExceptionally(), is(true)); - assertThat(cause(future2), instanceOf(IllegalStateException.class)); - assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(1)))); - } - - @Test - public void should_Propagate_error_to_all_loads() { - List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idLoaderBlowsUps(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = errorLoader.load(1); - CompletableFuture future2 = errorLoader.load(2); - errorLoader.dispatch(); - - await().until(future1::isDone); - assertThat(future1.isCompletedExceptionally(), is(true)); - Throwable cause = cause(future1); - assert cause != null; - assertThat(cause, instanceOf(IllegalStateException.class)); - assertThat(cause.getMessage(), equalTo("Error")); - - await().until(future2::isDone); - cause = cause(future2); - assert cause != null; - assertThat(cause.getMessage(), equalTo(cause.getMessage())); - assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); - } - - @Test - public void should_Accept_objects_as_keys() { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(new DataLoaderOptions(), loadCalls); - - Object keyA = new Object(); - Object keyB = new Object(); - - // Fetches as expected - - identityLoader.load(keyA); - identityLoader.load(keyB); - - identityLoader.dispatch().thenAccept(promisedValues -> { - assertThat(promisedValues.get(0), equalTo(keyA)); - assertThat(promisedValues.get(1), equalTo(keyB)); - }); - - assertThat(loadCalls.size(), equalTo(1)); - assertThat(loadCalls.get(0).size(), equalTo(2)); - assertThat(loadCalls.get(0).toArray()[0], equalTo(keyA)); - assertThat(loadCalls.get(0).toArray()[1], equalTo(keyB)); - - // Caching - identityLoader.clear(keyA); - //noinspection SuspiciousMethodCalls - loadCalls.remove(keyA); - - identityLoader.load(keyA); - identityLoader.load(keyB); - - identityLoader.dispatch().thenAccept(promisedValues -> { - assertThat(promisedValues.get(0), equalTo(keyA)); - assertThat(identityLoader.getCacheKey(keyB), equalTo(keyB)); - }); - - assertThat(loadCalls.size(), equalTo(2)); - assertThat(loadCalls.get(1).size(), equalTo(1)); - assertThat(loadCalls.get(1).toArray()[0], equalTo(keyA)); - } - - @Test - public void should_Disable_caching() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = - idLoader(newOptions().setCachingEnabled(false), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - - CompletableFuture future1a = identityLoader.load("A"); - CompletableFuture future3 = identityLoader.load("C"); - identityLoader.dispatch(); - - await().until(() -> future1a.isDone() && future3.isDone()); - assertThat(future1a.get(), equalTo("A")); - assertThat(future3.get(), equalTo("C")); - assertThat(loadCalls, equalTo(asList(asList("A", "B"), asList("A", "C")))); - - CompletableFuture future1b = identityLoader.load("A"); - CompletableFuture future2a = identityLoader.load("B"); - CompletableFuture future3a = identityLoader.load("C"); - identityLoader.dispatch(); - - await().until(() -> future1b.isDone() && future2a.isDone() && future3a.isDone()); - assertThat(future1b.get(), equalTo("A")); - assertThat(future2a.get(), equalTo("B")); - assertThat(future3a.get(), equalTo("C")); - assertThat(loadCalls, equalTo(asList(asList("A", "B"), - asList("A", "C"), asList("A", "B", "C")))); - } - - @Test - public void should_work_with_duplicate_keys_when_caching_disabled() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = - idLoader(newOptions().setCachingEnabled(false), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - CompletableFuture future3 = identityLoader.load("A"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone() && future3.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(future3.get(), equalTo("A")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); - } - - @Test - public void should_work_with_duplicate_keys_when_caching_enabled() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = - idLoader(newOptions().setCachingEnabled(true), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - CompletableFuture future3 = identityLoader.load("A"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone() && future3.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(future3.get(), equalTo("A")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - } - - // It is resilient to job queue ordering - - @Test - public void should_Accept_objects_with_a_complex_key() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setCacheKeyFunction(getJsonObjectCacheMapFn()); - DataLoader identityLoader = idLoader(options, loadCalls); - - JsonObject key1 = new JsonObject().put("id", 123); - JsonObject key2 = new JsonObject().put("id", 123); - - CompletableFuture future1 = identityLoader.load(key1); - CompletableFuture future2 = identityLoader.load(key2); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(loadCalls, equalTo(singletonList(singletonList(key1)))); - assertThat(future1.get(), equalTo(key1)); - assertThat(future2.get(), equalTo(key1)); - } - - // Helper methods - - @Test - public void should_Clear_objects_with_complex_key() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setCacheKeyFunction(getJsonObjectCacheMapFn()); - DataLoader identityLoader = idLoader(options, loadCalls); - - JsonObject key1 = new JsonObject().put("id", 123); - JsonObject key2 = new JsonObject().put("id", 123); - - CompletableFuture future1 = identityLoader.load(key1); - identityLoader.dispatch(); - - await().until(future1::isDone); - identityLoader.clear(key2); // clear equivalent object key - - CompletableFuture future2 = identityLoader.load(key1); - identityLoader.dispatch(); - - await().until(future2::isDone); - assertThat(loadCalls, equalTo(asList(singletonList(key1), singletonList(key1)))); - assertThat(future1.get(), equalTo(key1)); - assertThat(future2.get(), equalTo(key1)); - } - - @Test - public void should_Accept_objects_with_different_order_of_keys() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setCacheKeyFunction(getJsonObjectCacheMapFn()); - DataLoader identityLoader = idLoader(options, loadCalls); - - JsonObject key1 = new JsonObject().put("a", 123).put("b", 321); - JsonObject key2 = new JsonObject().put("b", 321).put("a", 123); - - // Fetches as expected - - CompletableFuture future1 = identityLoader.load(key1); - CompletableFuture future2 = identityLoader.load(key2); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(loadCalls, equalTo(singletonList(singletonList(key1)))); - assertThat(loadCalls.size(), equalTo(1)); - assertThat(future1.get(), equalTo(key1)); - assertThat(future2.get(), equalTo(key2)); - } - - @Test - public void should_Allow_priming_the_cache_with_an_object_key() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setCacheKeyFunction(getJsonObjectCacheMapFn()); - DataLoader identityLoader = idLoader(options, loadCalls); - - JsonObject key1 = new JsonObject().put("id", 123); - JsonObject key2 = new JsonObject().put("id", 123); - - identityLoader.prime(key1, key1); - - CompletableFuture future1 = identityLoader.load(key1); - CompletableFuture future2 = identityLoader.load(key2); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(loadCalls, equalTo(emptyList())); - assertThat(future1.get(), equalTo(key1)); - assertThat(future2.get(), equalTo(key1)); - } - - @Test - public void should_Accept_a_custom_cache_map_implementation() throws ExecutionException, InterruptedException { - CustomCacheMap customMap = new CustomCacheMap(); - List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setCacheMap(customMap); - DataLoader identityLoader = idLoader(options, loadCalls); - - // Fetches as expected - - CompletableFuture future1 = identityLoader.load("a"); - CompletableFuture future2 = identityLoader.load("b"); - CompletableFuture> composite = identityLoader.dispatch(); - - await().until(composite::isDone); - assertThat(future1.get(), equalTo("a")); - assertThat(future2.get(), equalTo("b")); - - assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); - assertArrayEquals(customMap.stash.keySet().toArray(), asList("a", "b").toArray()); - - CompletableFuture future3 = identityLoader.load("c"); - CompletableFuture future2a = identityLoader.load("b"); - composite = identityLoader.dispatch(); - - await().until(composite::isDone); - assertThat(future3.get(), equalTo("c")); - assertThat(future2a.get(), equalTo("b")); - - assertThat(loadCalls, equalTo(asList(asList("a", "b"), singletonList("c")))); - assertArrayEquals(customMap.stash.keySet().toArray(), asList("a", "b", "c").toArray()); - - // Supports clear - - identityLoader.clear("b"); - assertArrayEquals(customMap.stash.keySet().toArray(), asList("a", "c").toArray()); - - CompletableFuture future2b = identityLoader.load("b"); - composite = identityLoader.dispatch(); - - await().until(composite::isDone); - assertThat(future2b.get(), equalTo("b")); - assertThat(loadCalls, equalTo(asList(asList("a", "b"), - singletonList("c"), singletonList("b")))); - assertArrayEquals(customMap.stash.keySet().toArray(), asList("a", "c", "b").toArray()); - - // Supports clear all - - identityLoader.clearAll(); - assertArrayEquals(customMap.stash.keySet().toArray(), emptyList().toArray()); - } - - @Test - public void should_degrade_gracefully_if_cache_get_throws() { - CacheMap cache = new ThrowingCacheMap(); - DataLoaderOptions options = newOptions().setCachingEnabled(true).setCacheMap(cache); - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(options, loadCalls); - - assertThat(identityLoader.getIfPresent("a"), equalTo(Optional.empty())); - - CompletableFuture future = identityLoader.load("a"); - identityLoader.dispatch(); - assertThat(future.join(), equalTo("a")); - } - - @Test - public void batching_disabled_should_dispatch_immediately() { - List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setBatchingEnabled(false); - DataLoader identityLoader = idLoader(options, loadCalls); - - CompletableFuture fa = identityLoader.load("A"); - CompletableFuture fb = identityLoader.load("B"); - - // caching is on still - CompletableFuture fa1 = identityLoader.load("A"); - CompletableFuture fb1 = identityLoader.load("B"); - - List values = CompletableFutureKit.allOf(asList(fa, fb, fa1, fb1)).join(); - - assertThat(fa.join(), equalTo("A")); - assertThat(fb.join(), equalTo("B")); - assertThat(fa1.join(), equalTo("A")); - assertThat(fb1.join(), equalTo("B")); - - assertThat(values, equalTo(asList("A", "B", "A", "B"))); - - assertThat(loadCalls, equalTo(asList( - singletonList("A"), - singletonList("B")))); - - } - - @Test - public void batching_disabled_and_caching_disabled_should_dispatch_immediately_and_forget() { - List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setBatchingEnabled(false).setCachingEnabled(false); - DataLoader identityLoader = idLoader(options, loadCalls); - - CompletableFuture fa = identityLoader.load("A"); - CompletableFuture fb = identityLoader.load("B"); - - // caching is off - CompletableFuture fa1 = identityLoader.load("A"); - CompletableFuture fb1 = identityLoader.load("B"); - - List values = CompletableFutureKit.allOf(asList(fa, fb, fa1, fb1)).join(); - - assertThat(fa.join(), equalTo("A")); - assertThat(fb.join(), equalTo("B")); - assertThat(fa1.join(), equalTo("A")); - assertThat(fb1.join(), equalTo("B")); - - assertThat(values, equalTo(asList("A", "B", "A", "B"))); - - assertThat(loadCalls, equalTo(asList( - singletonList("A"), - singletonList("B"), - singletonList("A"), - singletonList("B") - ))); - - } - - @Test - public void batches_multiple_requests_with_max_batch_size() { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(newOptions().setMaxBatchSize(2), loadCalls); - - CompletableFuture f1 = identityLoader.load(1); - CompletableFuture f2 = identityLoader.load(2); - CompletableFuture f3 = identityLoader.load(3); - - identityLoader.dispatch(); - - CompletableFuture.allOf(f1, f2, f3).join(); - - assertThat(f1.join(), equalTo(1)); - assertThat(f2.join(), equalTo(2)); - assertThat(f3.join(), equalTo(3)); - - assertThat(loadCalls, equalTo(asList(asList(1, 2), singletonList(3)))); - - } - - @Test - public void can_split_max_batch_sizes_correctly() { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(newOptions().setMaxBatchSize(5), loadCalls); - - for (int i = 0; i < 21; i++) { - identityLoader.load(i); - } - List> expectedCalls = new ArrayList<>(); - expectedCalls.add(listFrom(0, 5)); - expectedCalls.add(listFrom(5, 10)); - expectedCalls.add(listFrom(10, 15)); - expectedCalls.add(listFrom(15, 20)); - expectedCalls.add(listFrom(20, 21)); - - List result = identityLoader.dispatch().join(); - - assertThat(result, equalTo(listFrom(0, 21))); - assertThat(loadCalls, equalTo(expectedCalls)); - - } - - @Test - public void should_Batch_loads_occurring_within_futures() { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idLoader(newOptions(), loadCalls); - - Supplier nullValue = () -> null; - - AtomicBoolean v4Called = new AtomicBoolean(); - - CompletableFuture.supplyAsync(nullValue).thenAccept(v1 -> { - identityLoader.load("a"); - CompletableFuture.supplyAsync(nullValue).thenAccept(v2 -> { - identityLoader.load("b"); - CompletableFuture.supplyAsync(nullValue).thenAccept(v3 -> { - identityLoader.load("c"); - CompletableFuture.supplyAsync(nullValue).thenAccept( - v4 -> { - identityLoader.load("d"); - v4Called.set(true); - }); - }); - }); - }); - - await().untilTrue(v4Called); - - identityLoader.dispatchAndJoin(); - - assertThat(loadCalls, equalTo( - singletonList(asList("a", "b", "c", "d")))); - } - - @Test - public void can_call_a_loader_from_a_loader() throws Exception { - List> deepLoadCalls = new ArrayList<>(); - DataLoader deepLoader = newDataLoader(keys -> { - deepLoadCalls.add(keys); - return CompletableFuture.completedFuture(keys); - }); - - List> aLoadCalls = new ArrayList<>(); - DataLoader aLoader = newDataLoader(keys -> { - aLoadCalls.add(keys); - return deepLoader.loadMany(keys); - }); - - List> bLoadCalls = new ArrayList<>(); - DataLoader bLoader = newDataLoader(keys -> { - bLoadCalls.add(keys); - return deepLoader.loadMany(keys); - }); - - CompletableFuture a1 = aLoader.load("A1"); - CompletableFuture a2 = aLoader.load("A2"); - CompletableFuture b1 = bLoader.load("B1"); - CompletableFuture b2 = bLoader.load("B2"); - - CompletableFuture.allOf( - aLoader.dispatch(), - deepLoader.dispatch(), - bLoader.dispatch(), - deepLoader.dispatch() - ).join(); - - assertThat(a1.get(), equalTo("A1")); - assertThat(a2.get(), equalTo("A2")); - assertThat(b1.get(), equalTo("B1")); - assertThat(b2.get(), equalTo("B2")); - - assertThat(aLoadCalls, equalTo( - singletonList(asList("A1", "A2")))); - - assertThat(bLoadCalls, equalTo( - singletonList(asList("B1", "B2")))); - - assertThat(deepLoadCalls, equalTo( - asList(asList("A1", "A2"), asList("B1", "B2")))); - } - - @Test - public void should_allow_composition_of_data_loader_calls() { - UserManager userManager = new UserManager(); - - BatchLoader userBatchLoader = userIds -> CompletableFuture - .supplyAsync(() -> userIds - .stream() - .map(userManager::loadUserById) - .collect(Collectors.toList())); - DataLoader userLoader = newDataLoader(userBatchLoader); - - AtomicBoolean gandalfCalled = new AtomicBoolean(false); - AtomicBoolean sarumanCalled = new AtomicBoolean(false); - - userLoader.load(1L) - .thenAccept(user -> userLoader.load(user.getInvitedByID()) - .thenAccept(invitedBy -> { - gandalfCalled.set(true); - assertThat(invitedBy.getName(), equalTo("Manwë")); - })); - - userLoader.load(2L) - .thenAccept(user -> userLoader.load(user.getInvitedByID()) - .thenAccept(invitedBy -> { - sarumanCalled.set(true); - assertThat(invitedBy.getName(), equalTo("Aulë")); - })); - - List allResults = userLoader.dispatchAndJoin(); - - await().untilTrue(gandalfCalled); - await().untilTrue(sarumanCalled); - - assertThat(allResults.size(), equalTo(4)); - } - - private static CacheKey getJsonObjectCacheMapFn() { - return key -> key.stream() - .map(entry -> entry.getKey() + ":" + entry.getValue()) - .sorted() - .collect(Collectors.joining()); - } - - private static DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoader((BatchPublisher) (keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Flux.fromIterable(keys).subscribe(subscriber); - }, options); - } - - private static DataLoader idLoaderBlowsUps( - DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoader((BatchPublisher) (keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Flux.error(new IllegalStateException("Error")).subscribe(subscriber); - }, options); - } - - private static DataLoader idLoaderAllExceptions( - DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoaderWithTry((BatchPublisher>) (keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Stream> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error"))); - Flux.fromStream(failures).subscribe(subscriber); - }, options); - } - - private static DataLoader idLoaderOddEvenExceptions( - DataLoaderOptions options, List> loadCalls) { - return newPublisherDataLoaderWithTry((BatchPublisher>) (keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - - List> errors = new ArrayList<>(); - for (Integer key : keys) { - if (key % 2 == 0) { - errors.add(Try.succeeded(key)); - } else { - errors.add(Try.failed(new IllegalStateException("Error"))); - } - } - Flux.fromIterable(errors).subscribe(subscriber); - }, options); - } - - private static BatchPublisher keysAsValues() { - return (keys, subscriber) -> Flux.fromIterable(keys).subscribe(subscriber); - } - - private static class ThrowingCacheMap extends CustomCacheMap { - @Override - public CompletableFuture get(String key) { - throw new RuntimeException("Cache implementation failed."); - } - } -} diff --git a/src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java b/src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java deleted file mode 100644 index 5b9ca0b..0000000 --- a/src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java +++ /dev/null @@ -1,175 +0,0 @@ -package org.dataloader; - -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; - -import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; -import static org.awaitility.Awaitility.await; -import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; -import static org.dataloader.DataLoaderOptions.newOptions; -import static org.dataloader.fixtures.TestKit.listFrom; -import static org.dataloader.impl.CompletableFutureKit.cause; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; - -public class DataLoaderMappedBatchPublisherTest { - - MappedBatchPublisher evensOnlyMappedBatchLoader = (keys, subscriber) -> { - Map mapOfResults = new HashMap<>(); - - AtomicInteger index = new AtomicInteger(); - keys.forEach(k -> { - int i = index.getAndIncrement(); - if (i % 2 == 0) { - mapOfResults.put(k, k); - } - }); - Flux.fromIterable(mapOfResults.entrySet()).subscribe(subscriber); - }; - - private static DataLoader idMapLoader(DataLoaderOptions options, List> loadCalls) { - MappedBatchPublisher kvBatchLoader = (keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Map map = new HashMap<>(); - //noinspection unchecked - keys.forEach(k -> map.put(k, (V) k)); - Flux.fromIterable(map.entrySet()).subscribe(subscriber); - }; - return DataLoaderFactory.newMappedPublisherDataLoader(kvBatchLoader, options); - } - - private static DataLoader idMapLoaderBlowsUps( - DataLoaderOptions options, List> loadCalls) { - return newMappedPublisherDataLoader((MappedBatchPublisher) (keys, subscriber) -> { - loadCalls.add(new ArrayList<>(keys)); - Flux.>error(new IllegalStateException("Error")).subscribe(subscriber); - }, options); - } - - - @Test - public void basic_map_batch_loading() { - DataLoader loader = DataLoaderFactory.newMappedPublisherDataLoader(evensOnlyMappedBatchLoader); - - loader.load("A"); - loader.load("B"); - loader.loadMany(asList("C", "D")); - - List results = loader.dispatchAndJoin(); - - assertThat(results.size(), equalTo(4)); - assertThat(results, equalTo(asList("A", null, "C", null))); - } - - @Test - public void should_map_Batch_multiple_requests() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idMapLoader(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = identityLoader.load(1); - CompletableFuture future2 = identityLoader.load(2); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone()); - assertThat(future1.get(), equalTo(1)); - assertThat(future2.get(), equalTo(2)); - assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); - } - - @Test - public void can_split_max_batch_sizes_correctly() { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = idMapLoader(newOptions().setMaxBatchSize(5), loadCalls); - - for (int i = 0; i < 21; i++) { - identityLoader.load(i); - } - List> expectedCalls = new ArrayList<>(); - expectedCalls.add(listFrom(0, 5)); - expectedCalls.add(listFrom(5, 10)); - expectedCalls.add(listFrom(10, 15)); - expectedCalls.add(listFrom(15, 20)); - expectedCalls.add(listFrom(20, 21)); - - List result = identityLoader.dispatch().join(); - - assertThat(result, equalTo(listFrom(0, 21))); - assertThat(loadCalls, equalTo(expectedCalls)); - } - - @Test - public void should_Propagate_error_to_all_loads() { - List> loadCalls = new ArrayList<>(); - DataLoader errorLoader = idMapLoaderBlowsUps(new DataLoaderOptions(), loadCalls); - - CompletableFuture future1 = errorLoader.load(1); - CompletableFuture future2 = errorLoader.load(2); - errorLoader.dispatch(); - - await().until(future1::isDone); - - assertThat(future1.isCompletedExceptionally(), is(true)); - Throwable cause = cause(future1); - assert cause != null; - assertThat(cause, instanceOf(IllegalStateException.class)); - assertThat(cause.getMessage(), equalTo("Error")); - - await().until(future2::isDone); - cause = cause(future2); - assert cause != null; - assertThat(cause.getMessage(), equalTo(cause.getMessage())); - - assertThat(loadCalls, equalTo(singletonList(asList(1, 2)))); - } - - @Test - public void should_work_with_duplicate_keys_when_caching_disabled() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = - idMapLoader(newOptions().setCachingEnabled(false), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - CompletableFuture future3 = identityLoader.load("A"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone() && future3.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(future3.get(), equalTo("A")); - - // the map batch functions use a set of keys as input and hence remove duplicates unlike list variant - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - } - - @Test - public void should_work_with_duplicate_keys_when_caching_enabled() throws ExecutionException, InterruptedException { - List> loadCalls = new ArrayList<>(); - DataLoader identityLoader = - idMapLoader(newOptions().setCachingEnabled(true), loadCalls); - - CompletableFuture future1 = identityLoader.load("A"); - CompletableFuture future2 = identityLoader.load("B"); - CompletableFuture future3 = identityLoader.load("A"); - identityLoader.dispatch(); - - await().until(() -> future1.isDone() && future2.isDone() && future3.isDone()); - assertThat(future1.get(), equalTo("A")); - assertThat(future2.get(), equalTo("B")); - assertThat(future3.get(), equalTo("A")); - assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); - } - -} diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index db71c1e..ea4b2b9 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; import java.util.ArrayList; import java.util.Collection; @@ -49,6 +50,10 @@ import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; import static org.dataloader.DataLoaderOptions.newOptions; import static org.dataloader.fixtures.TestKit.futureError; import static org.dataloader.fixtures.TestKit.listFrom; @@ -727,7 +732,7 @@ public void should_work_with_duplicate_keys_when_caching_disabled(TestDataLoader assertThat(future1.get(), equalTo("A")); assertThat(future2.get(), equalTo("B")); assertThat(future3.get(), equalTo("A")); - if (factory instanceof ListDataLoaderFactory) { + if (factory instanceof ListDataLoaderFactory || factory instanceof PublisherDataLoaderFactory) { assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); } else { assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); @@ -1147,32 +1152,30 @@ private static CacheKey getJsonObjectCacheMapFn() { private static Stream dataLoaderFactories() { return Stream.of( Arguments.of(Named.of("List DataLoader", new ListDataLoaderFactory())), - Arguments.of(Named.of("Mapped DataLoader", new MappedDataLoaderFactory())) + Arguments.of(Named.of("Mapped DataLoader", new MappedDataLoaderFactory())), + Arguments.of(Named.of("Publisher DataLoader", new PublisherDataLoaderFactory())), + Arguments.of(Named.of("Mapped Publisher DataLoader", new MappedPublisherDataLoaderFactory())) ); } public interface TestDataLoaderFactory { - DataLoader idLoader(DataLoaderOptions options, List> loadCalls); - DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls); + DataLoader idLoader(DataLoaderOptions options, List> loadCalls); + DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls); DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls); DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls); } private static class ListDataLoaderFactory implements TestDataLoaderFactory { @Override - public DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { + public DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { return newDataLoader(keys -> { loadCalls.add(new ArrayList<>(keys)); - @SuppressWarnings("unchecked") - List values = keys.stream() - .map(k -> (V) k) - .collect(Collectors.toList()); - return CompletableFuture.completedFuture(values); + return CompletableFuture.completedFuture(keys); }, options); } @Override - public DataLoader idLoaderBlowsUps( + public DataLoader idLoaderBlowsUps( DataLoaderOptions options, List> loadCalls) { return newDataLoader(keys -> { loadCalls.add(new ArrayList<>(keys)); @@ -1211,19 +1214,18 @@ public DataLoader idLoaderOddEvenExceptions(DataLoaderOptions o private static class MappedDataLoaderFactory implements TestDataLoaderFactory { @Override - public DataLoader idLoader( + public DataLoader idLoader( DataLoaderOptions options, List> loadCalls) { return newMappedDataLoader((keys) -> { loadCalls.add(new ArrayList<>(keys)); - Map map = new HashMap<>(); - //noinspection unchecked - keys.forEach(k -> map.put(k, (V) k)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); return CompletableFuture.completedFuture(map); }, options); } @Override - public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { return newMappedDataLoader((keys) -> { loadCalls.add(new ArrayList<>(keys)); return futureError(); @@ -1260,6 +1262,104 @@ public DataLoader idLoaderOddEvenExceptions( } } + private static class PublisherDataLoaderFactory implements TestDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.fromIterable(keys).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.error(new IllegalStateException("Error")).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Stream> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error"))); + Flux.fromStream(failures).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List> errors = new ArrayList<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errors.add(Try.succeeded(key)); + } else { + errors.add(Try.failed(new IllegalStateException("Error"))); + } + } + Flux.fromIterable(errors).subscribe(subscriber); + }, options); + } + } + + private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + Flux.fromIterable(map.entrySet()).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.>error(new IllegalStateException("Error")).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Stream>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error")))); + Flux.fromStream(failures).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + Map> errorByKey = new HashMap<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errorByKey.put(key, Try.succeeded(key)); + } else { + errorByKey.put(key, Try.failed(new IllegalStateException("Error"))); + } + } + Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber); + }, options); + } + } + private static class ThrowingCacheMap extends CustomCacheMap { @Override public CompletableFuture get(String key) { From 651e5611f3beecf6a74f5388431033fd260704dd Mon Sep 17 00:00:00 2001 From: Alexandre Carlton Date: Mon, 20 May 2024 22:49:59 +1000 Subject: [PATCH 2/2] Fix MappedBatchPublisher loaders to work without cache If we did not cache the futures, then the MappedBatchPublisher DataLoader would not work as we were only completing the last future for a given key. --- .../java/org/dataloader/DataLoaderHelper.java | 20 ++++++++++--------- .../java/org/dataloader/DataLoaderTest.java | 6 +++--- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index ee8d78b..f4e3915 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -714,7 +714,7 @@ private class DataLoaderMapEntrySubscriber implements Subscriber private final List callContexts; private final List> queuedFutures; private final Map callContextByKey; - private final Map> queuedFutureByKey; + private final Map>> queuedFuturesByKey; private final List clearCacheKeys = new ArrayList<>(); private final Map completedValuesByKey = new HashMap<>(); @@ -733,13 +733,13 @@ private DataLoaderMapEntrySubscriber( this.queuedFutures = queuedFutures; this.callContextByKey = new HashMap<>(); - this.queuedFutureByKey = new HashMap<>(); + this.queuedFuturesByKey = new HashMap<>(); for (int idx = 0; idx < queuedFutures.size(); idx++) { K key = keys.get(idx); Object callContext = callContexts.get(idx); CompletableFuture queuedFuture = queuedFutures.get(idx); callContextByKey.put(key, callContext); - queuedFutureByKey.put(key, queuedFuture); + queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture); } } @@ -756,20 +756,20 @@ public void onNext(Map.Entry entry) { V value = entry.getValue(); Object callContext = callContextByKey.get(key); - CompletableFuture future = queuedFutureByKey.get(key); + List> futures = queuedFuturesByKey.get(key); if (value instanceof Try) { // we allow the batch loader to return a Try so we can better represent a computation // that might have worked or not. Try tryValue = (Try) value; if (tryValue.isSuccess()) { - future.complete(tryValue.get()); + futures.forEach(f -> f.complete(tryValue.get())); } else { stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); - future.completeExceptionally(tryValue.getThrowable()); + futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable())); clearCacheKeys.add(key); } } else { - future.complete(value); + futures.forEach(f -> f.complete(value)); } completedValuesByKey.put(key, value); @@ -801,9 +801,11 @@ public void onError(Throwable ex) { // Complete the futures for the remaining keys with the exception. for (int idx = 0; idx < queuedFutures.size(); idx++) { K key = keys.get(idx); - CompletableFuture future = queuedFutureByKey.get(key); + List> futures = queuedFuturesByKey.get(key); if (!completedValuesByKey.containsKey(key)) { - future.completeExceptionally(ex); + for (CompletableFuture future : futures) { + future.completeExceptionally(ex); + } // clear any cached view of this key because they all failed dataLoader.clear(key); } diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index ea4b2b9..a87c77d 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -732,10 +732,10 @@ public void should_work_with_duplicate_keys_when_caching_disabled(TestDataLoader assertThat(future1.get(), equalTo("A")); assertThat(future2.get(), equalTo("B")); assertThat(future3.get(), equalTo("A")); - if (factory instanceof ListDataLoaderFactory || factory instanceof PublisherDataLoaderFactory) { - assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); - } else { + if (factory instanceof MappedDataLoaderFactory) { assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); + } else { + assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); } }