Skip to content

Breaking change - adds a name to a DataLoader #193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 20 additions & 308 deletions src/main/java/org/dataloader/DataLoader.java

Large diffs are not rendered by default.

352 changes: 331 additions & 21 deletions src/main/java/org/dataloader/DataLoaderFactory.java

Large diffs are not rendered by default.

79 changes: 69 additions & 10 deletions src/main/java/org/dataloader/DataLoaderRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.dataloader.instrumentation.DataLoaderInstrumentation;
import org.dataloader.instrumentation.DataLoaderInstrumentationHelper;
import org.dataloader.stats.Statistics;
import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -16,6 +18,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static org.dataloader.impl.Assertions.assertState;

/**
* This allows data loaders to be registered together into a single place, so
* they can be dispatched as one. It also allows you to retrieve data loaders by
Expand All @@ -35,9 +39,10 @@
* are the same object, then nothing is changed, since the same instrumentation code is being run.
*/
@PublicApi
@NullMarked
public class DataLoaderRegistry {
protected final Map<String, DataLoader<?, ?>> dataLoaders;
protected final DataLoaderInstrumentation instrumentation;
protected final @Nullable DataLoaderInstrumentation instrumentation;


public DataLoaderRegistry() {
Expand All @@ -48,27 +53,30 @@ private DataLoaderRegistry(Builder builder) {
this(builder.dataLoaders, builder.instrumentation);
}

protected DataLoaderRegistry(Map<String, DataLoader<?, ?>> dataLoaders, DataLoaderInstrumentation instrumentation) {
protected DataLoaderRegistry(Map<String, DataLoader<?, ?>> dataLoaders, @Nullable DataLoaderInstrumentation instrumentation) {
this.dataLoaders = instrumentDLs(dataLoaders, instrumentation);
this.instrumentation = instrumentation;
}

private Map<String, DataLoader<?, ?>> instrumentDLs(Map<String, DataLoader<?, ?>> incomingDataLoaders, DataLoaderInstrumentation registryInstrumentation) {
private Map<String, DataLoader<?, ?>> instrumentDLs(Map<String, DataLoader<?, ?>> incomingDataLoaders, @Nullable DataLoaderInstrumentation registryInstrumentation) {
Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>(incomingDataLoaders);
if (registryInstrumentation != null) {
dataLoaders.replaceAll((k, existingDL) -> instrumentDL(registryInstrumentation, existingDL));
dataLoaders.replaceAll((k, existingDL) -> nameAndInstrumentDL(k, registryInstrumentation, existingDL));
}
return dataLoaders;
}

/**
* Can be called to tweak a {@link DataLoader} so that it has the registry {@link DataLoaderInstrumentation} added as the first one.
*
* @param key the key used to register the data loader
* @param registryInstrumentation the common registry {@link DataLoaderInstrumentation}
* @param existingDL the existing data loader
* @return a new {@link DataLoader} or the same one if there is nothing to change
*/
private static DataLoader<?, ?> instrumentDL(DataLoaderInstrumentation registryInstrumentation, DataLoader<?, ?> existingDL) {
private static DataLoader<?, ?> nameAndInstrumentDL(String key, @Nullable DataLoaderInstrumentation registryInstrumentation, DataLoader<?, ?> existingDL) {
existingDL = checkAndSetName(key, existingDL);

if (registryInstrumentation == null) {
return existingDL;
}
Expand Down Expand Up @@ -97,6 +105,15 @@ protected DataLoaderRegistry(Map<String, DataLoader<?, ?>> dataLoaders, DataLoad
}
}

private static DataLoader<?, ?> checkAndSetName(String key, DataLoader<?, ?> dataLoader) {
if (dataLoader.getName() == null) {
return dataLoader.transform(b -> b.name(key));
}
assertState(key.equals(dataLoader.getName()),
() -> String.format("Data loader name '%s' is not the same as registered key '%s'", dataLoader.getName(), key));
return dataLoader;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It validates that a DL named "Foo" has been registered as the key "Foo" because dlr.register("Bar", fooDL) makes not sense


private static DataLoader<?, ?> mkInstrumentedDataLoader(DataLoader<?, ?> existingDL, DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation) {
return existingDL.transform(builder -> builder.options(setInInstrumentation(options, newInstrumentation)));
}
Expand All @@ -108,28 +125,70 @@ private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options,
/**
* @return the {@link DataLoaderInstrumentation} associated with this registry which can be null
*/
public DataLoaderInstrumentation getInstrumentation() {
public @Nullable DataLoaderInstrumentation getInstrumentation() {
return instrumentation;
}

/**
* This will register a new dataloader
* This will register a new named dataloader. The {@link DataLoader} must be named something and
* cannot have a null name.
* <p>
* Note: Registration can change the data loader instance since it might get an {@link DataLoaderInstrumentation} applied to
* it. So the {@link DataLoader} instance your read via {@link DataLoaderRegistry#getDataLoader(String)} might not be the same
* object that was registered.
*
* @param dataLoader the named data loader to register
* @return this registry
*/
public DataLoaderRegistry register(DataLoader<?, ?> dataLoader) {
String name = dataLoader.getName();
assertState(name != null, () -> "The DataLoader must have a non null name");
dataLoaders.put(name, nameAndInstrumentDL(name, instrumentation, dataLoader));
return this;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New - we can register "named" DL now


/**
* This will register a new {@link DataLoader}
* <p>
* Note: Registration can change the data loader instance since it might get an {@link DataLoaderInstrumentation} applied to
* it. So the {@link DataLoader} instance your read via {@link DataLoaderRegistry#getDataLoader(String)} might not be the same
* object that was registered.
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @return this registry
*/
public DataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since i can give a name to a dataloader and register it with a different name, or register the same DL multiple times with a different name (different options attached), what about providing a pass through cache key into the instrumentation i.e.

DataLoader loader, String dataLoaderKey in the instrumentation rather than attaching an explicit name to the loader itself. This way register is the SoT. I feel if we allow both we might introduce some slight oddities of dataLoaderKey != dataLoaderName else you fail at runtime with Data loader name '%s' is not the same as registered key '%s' If I attempt to do Factory create with name then register with a different name later.

Don't allow creation with a name, only allow association at registry time and pass through to instrumentation since keys can be only a registry thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave some thought to this but there is no real link between a DataLoaderRegistry and a DataLoader.

So the instrumentation for a DL can be run without it coming from a DLR

So while you can register them "wrong" - so be it - it validates

dataLoaders.put(key, instrumentDL(instrumentation, dataLoader));
dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader));
return this;
}

/**
* This will register a new {@link DataLoader} and then return it.
* <p>
* Note: Registration can change the data loader instance since it might get an {@link DataLoaderInstrumentation} applied to
* it. So the {@link DataLoader} instance your read via {@link DataLoaderRegistry#getDataLoader(String)} might not be the same
* object that was registered.
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @return the data loader instance that was registered
*/
public <K, V> DataLoader<K, V> registerAndGet(String key, DataLoader<?, ?> dataLoader) {
dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader));
return getDataLoader(key);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New - gives back the DL immediately - it may have changed

}

/**
* Computes a data loader if absent or return it if it was
* already registered at that key.
* <p>
* Note: The entire method invocation is performed atomically,
* so the function is applied at most once per key.
* <p>
* Note: Registration can change the data loader instance since it might get an {@link DataLoaderInstrumentation} applied to
* it. So the {@link DataLoader} instance your read via {@link DataLoaderRegistry#getDataLoader(String)} might not be the same
* object that was registered.
*
* @param key the key of the data loader
* @param mappingFunction the function to compute a data loader
Expand All @@ -142,7 +201,7 @@ public <K, V> DataLoader<K, V> computeIfAbsent(final String key,
final Function<String, DataLoader<?, ?>> mappingFunction) {
return (DataLoader<K, V>) dataLoaders.computeIfAbsent(key, (k) -> {
DataLoader<?, ?> dl = mappingFunction.apply(k);
return instrumentDL(instrumentation, dl);
return nameAndInstrumentDL(key, instrumentation, dl);
});
}

Expand Down Expand Up @@ -262,7 +321,7 @@ public static Builder newRegistry() {
public static class Builder {

private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();
private DataLoaderInstrumentation instrumentation;
private @Nullable DataLoaderInstrumentation instrumentation;

/**
* This will register a new dataloader
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/dataloader/DelegatingDataLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
* CompletableFuture<String> cf = super.load(key, keyContext);
* return cf.thenApply(v -> "|" + v + "|");
* }
*};
*}</pre>
* };
* }</pre>
*
* @param <K> type parameter indicating the type of the data load keys
* @param <V> type parameter indicating the type of the data that is returned
Expand Down Expand Up @@ -58,7 +58,7 @@ public static <K, V> DataLoader<K, V> unwrap(DataLoader<K, V> dataLoader) {
}

public DelegatingDataLoader(DataLoader<K, V> delegate) {
super(delegate.getBatchLoadFunction(), delegate.getOptions());
super(delegate.getName(), delegate.getBatchLoadFunction(), delegate.getOptions());
this.delegate = delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderRegistry;
import org.dataloader.annotations.ExperimentalApi;
import org.dataloader.impl.Assertions;
import org.dataloader.instrumentation.DataLoaderInstrumentation;
import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

import java.time.Duration;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -54,6 +57,7 @@
* This code is currently marked as {@link ExperimentalApi}
*/
@ExperimentalApi
@NullMarked
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {

private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();
Expand All @@ -66,7 +70,7 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A

private ScheduledDataLoaderRegistry(Builder builder) {
super(builder.dataLoaders, builder.instrumentation);
this.scheduledExecutorService = builder.scheduledExecutorService;
this.scheduledExecutorService = Assertions.nonNull(builder.scheduledExecutorService);
this.defaultExecutorUsed = builder.defaultExecutorUsed;
this.schedule = builder.schedule;
this.tickerMode = builder.tickerMode;
Expand Down Expand Up @@ -112,7 +116,6 @@ public boolean isTickerMode() {
* and return a new combined registry
*
* @param registry the registry to combine into this registry
*
* @return a new combined registry
*/
public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) {
Expand All @@ -128,7 +131,6 @@ public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) {
* This will unregister a new dataloader
*
* @param key the key of the data loader to unregister
*
* @return this registry
*/
public ScheduledDataLoaderRegistry unregister(String key) {
Expand Down Expand Up @@ -161,7 +163,6 @@ public DispatchPredicate getDispatchPredicate() {
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param dispatchPredicate the dispatch predicate to associate with this data loader
*
* @return this registry
*/
public ScheduledDataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
Expand Down Expand Up @@ -222,7 +223,6 @@ public void rescheduleNow() {
*
* @param dataLoaderKey the key in the dataloader map
* @param dataLoader the dataloader
*
* @return true if it should dispatch
*/
private boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
Expand Down Expand Up @@ -267,19 +267,18 @@ public static class Builder {
private final Map<String, DataLoader<?, ?>> dataLoaders = new LinkedHashMap<>();
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>();
private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
private ScheduledExecutorService scheduledExecutorService;
private @Nullable ScheduledExecutorService scheduledExecutorService;
private boolean defaultExecutorUsed = false;
private Duration schedule = Duration.ofMillis(10);
private boolean tickerMode = false;
private DataLoaderInstrumentation instrumentation;
private @Nullable DataLoaderInstrumentation instrumentation;


/**
* If you provide a {@link ScheduledExecutorService} then it will NOT be shutdown when
* {@link ScheduledDataLoaderRegistry#close()} is called. This is left to the code that made this setup code
*
* @param executorService the executor service to run the ticker on
*
* @return this builder for a fluent pattern
*/
public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
Expand All @@ -297,7 +296,6 @@ public Builder schedule(Duration schedule) {
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
*
* @return this builder for a fluent pattern
*/
public Builder register(String key, DataLoader<?, ?> dataLoader) {
Expand All @@ -312,7 +310,6 @@ public Builder register(String key, DataLoader<?, ?> dataLoader) {
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
* @param dispatchPredicate the dispatch predicate
*
* @return this builder for a fluent pattern
*/
public Builder register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
Expand All @@ -326,7 +323,6 @@ public Builder register(String key, DataLoader<?, ?> dataLoader, DispatchPredica
* from a previous {@link DataLoaderRegistry}
*
* @param otherRegistry the previous {@link DataLoaderRegistry}
*
* @return this builder for a fluent pattern
*/
public Builder registerAll(DataLoaderRegistry otherRegistry) {
Expand All @@ -343,7 +339,6 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) {
* whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
*
* @param dispatchPredicate the predicate
*
* @return this builder for a fluent pattern
*/
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
Expand All @@ -357,7 +352,6 @@ public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
* to dispatchAll.
*
* @param tickerMode true or false
*
* @return this builder for a fluent pattern
*/
public Builder tickerMode(boolean tickerMode) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/dataloader/ClockDataLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public ClockDataLoader(Object batchLoadFunction, Clock clock) {
}

public ClockDataLoader(Object batchLoadFunction, DataLoaderOptions options, Clock clock) {
super(batchLoadFunction, options, clock);
super(null, batchLoadFunction, options, clock);
}

}
51 changes: 51 additions & 0 deletions src/test/java/org/dataloader/DataLoaderFactoryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.dataloader;

import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class DataLoaderFactoryTest {

@Test
void can_create_via_builder() {
BatchLoaderWithContext<String, String> loader = (keys, environment) -> CompletableFuture.completedFuture(keys);
DataLoaderOptions options = DataLoaderOptions.newOptionsBuilder().setBatchingEnabled(true).build();

DataLoader<String, String> dl = DataLoaderFactory.<String, String>builder()
.name("x").batchLoader(loader).options(options).build();

assertNotNull(dl.getName());
assertThat(dl.getName(), equalTo("x"));
assertThat(dl.getBatchLoadFunction(), equalTo(loader));
assertThat(dl.getOptions(), equalTo(options));

BatchLoaderWithContext<String, Try<String>> loaderTry = (keys, environment)
-> CompletableFuture.completedFuture(keys.stream().map(Try::succeeded).collect(Collectors.toList()));

DataLoader<String, Try<String>> dlTry = DataLoaderFactory.<String, Try<String>>builder()
.name("try").batchLoader(loaderTry).options(options).build();

assertNotNull(dlTry.getName());
assertThat(dlTry.getName(), equalTo("try"));
assertThat(dlTry.getBatchLoadFunction(), equalTo(loaderTry));
assertThat(dlTry.getOptions(), equalTo(options));

MappedBatchLoader<String, Try<String>> mappedLoaderTry = (keys)
-> CompletableFuture.completedFuture(
keys.stream().collect(Collectors.toMap(k -> k, Try::succeeded))
);

DataLoader<String, Try<String>> dlTry2 = DataLoaderFactory.<String, Try<String>>builder()
.name("try2").mappedBatchLoader(mappedLoaderTry).options(options).build();

assertNotNull(dlTry2.getName());
assertThat(dlTry2.getName(), equalTo("try2"));
assertThat(dlTry2.getBatchLoadFunction(), equalTo(mappedLoaderTry));
assertThat(dlTry2.getOptions(), equalTo(options));
}
}
Loading