Skip to content

Commit 3df56db

Browse files
committed
Made the LinkedHashMap loader queue thread safe
1 parent b11316e commit 3df56db

File tree

2 files changed

+48
-41
lines changed

2 files changed

+48
-41
lines changed

src/main/java/org/dataloader/DataLoader.java

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
7676
this.loaderOptions = options == null ? new DataLoaderOptions() : options;
7777
this.futureCache = determineCacheMap(loaderOptions);
7878
// order of keys matter in data loader
79-
this.loaderQueue = Collections.synchronizedMap(new LinkedHashMap<>());
79+
this.loaderQueue = new LinkedHashMap<>();
8080
}
8181

8282
@SuppressWarnings("unchecked")
@@ -103,7 +103,9 @@ public CompletableFuture<V> load(K key) {
103103

104104
CompletableFuture<V> future = new CompletableFuture<>();
105105
if (loaderOptions.batchingEnabled()) {
106-
loaderQueue.put(key, future);
106+
synchronized (loaderQueue) {
107+
loaderQueue.put(key, future);
108+
}
107109
} else {
108110
PromisedValues<V> combinedFutures = batchLoadFunction.load(Collections.singletonList(key));
109111
if (combinedFutures.succeeded()) {
@@ -131,9 +133,11 @@ public CompletableFuture<V> load(K key) {
131133
* @return the composite future of the list of values
132134
*/
133135
public PromisedValues<V> loadMany(List<K> keys) {
134-
return PromisedValues.allOf(keys.stream()
135-
.map(this::load)
136-
.collect(Collectors.toList()));
136+
synchronized (loaderQueue) {
137+
return PromisedValues.allOf(keys.stream()
138+
.map(this::load)
139+
.collect(Collectors.toList()));
140+
}
137141
}
138142

139143
/**
@@ -144,45 +148,48 @@ public PromisedValues<V> loadMany(List<K> keys) {
144148
* @return the composite future of the queued load requests
145149
*/
146150
public PromisedValues<V> dispatch() {
151+
//
152+
// we copy the pre-loaded set of futures ready for dispatch
153+
final List<K> keys = new ArrayList<>();
154+
final List<CompletableFuture<V>> futureValues = new ArrayList<>();
147155
synchronized (loaderQueue) {
148-
if (!loaderOptions.batchingEnabled() || loaderQueue.size() == 0) {
149-
return PromisedValues.allOf(Collections.emptyList());
150-
}
151-
//
152-
// order of keys -> values matter in data loader hence the use of linked hash map
153-
//
154-
// See https://github.com/facebook/dataloader/blob/master/README.md for more details
155-
//
156-
List<K> keys = new ArrayList<>(loaderQueue.keySet());
157-
List<CompletableFuture<V>> futureList = keys.stream()
158-
.map(loaderQueue::get)
159-
.collect(Collectors.toList());
156+
loaderQueue.forEach((key, future) -> {
157+
keys.add(key);
158+
futureValues.add(future);
159+
});
160+
loaderQueue.clear();
161+
}
162+
if (!loaderOptions.batchingEnabled() || keys.size() == 0) {
163+
return PromisedValues.allOf(Collections.emptyList());
164+
}
165+
//
166+
// order of keys -> values matter in data loader hence the use of linked hash map
167+
//
168+
// See https://github.com/facebook/dataloader/blob/master/README.md for more details
169+
//
160170

161-
PromisedValues<V> batchOfPromisedValues = batchLoadFunction.load(keys);
171+
PromisedValues<V> batchOfPromisedValues = batchLoadFunction.load(keys);
162172

163-
assertState(keys.size() == batchOfPromisedValues.size(), "The size of the promised values MUST be the same size as the key list");
173+
assertState(keys.size() == batchOfPromisedValues.size(), "The size of the promised values MUST be the same size as the key list");
164174

165-
//
166-
// when the promised list of values completes, we transfer the values into
167-
// the previously cached future objects that client already has been given
168-
// via calls to load("foo") and loadMany("foo")
169-
//
170-
batchOfPromisedValues.thenAccept(promisedValues -> {
171-
for (int idx = 0; idx < futureList.size(); idx++) {
172-
CompletableFuture<V> future = futureList.get(idx);
173-
if (promisedValues.succeeded(idx)) {
174-
V value = promisedValues.get(idx);
175-
future.complete(value);
176-
} else {
177-
Throwable cause = promisedValues.cause(idx);
178-
future.completeExceptionally(cause);
179-
}
175+
//
176+
// when the promised list of values completes, we transfer the values into
177+
// the previously cached future objects that the client already has been given
178+
// via calls to load("foo") and loadMany("foo")
179+
//
180+
batchOfPromisedValues.thenAccept(promisedValues -> {
181+
for (int idx = 0; idx < futureValues.size(); idx++) {
182+
CompletableFuture<V> future = futureValues.get(idx);
183+
if (promisedValues.succeeded(idx)) {
184+
V value = promisedValues.get(idx);
185+
future.complete(value);
186+
} else {
187+
Throwable cause = promisedValues.cause(idx);
188+
future.completeExceptionally(cause);
180189
}
181-
});
182-
183-
loaderQueue.clear();
184-
return batchOfPromisedValues;
185-
}
190+
}
191+
});
192+
return batchOfPromisedValues;
186193
}
187194

188195
/**

src/test/java/org/dataloader/DataLoaderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.dataloader;
1818

19-
import org.dataloader.impl.CompletableFutureKit;
2019
import org.junit.Before;
2120
import org.junit.Test;
2221

@@ -38,7 +37,8 @@
3837
import static java.util.Collections.emptyList;
3938
import static java.util.Collections.singletonList;
4039
import static org.awaitility.Awaitility.await;
41-
import static org.dataloader.impl.CompletableFutureKit.*;
40+
import static org.dataloader.impl.CompletableFutureKit.cause;
41+
import static org.dataloader.impl.CompletableFutureKit.failedFuture;
4242
import static org.hamcrest.Matchers.empty;
4343
import static org.hamcrest.Matchers.equalTo;
4444
import static org.hamcrest.Matchers.instanceOf;

0 commit comments

Comments
 (0)