Skip to content

Commit 5fa9520

Browse files
committed
Reduce cluster update reroutes with async fetch
When using async fetch, we can end up with a separate cluster update and reroute for each shard whose fetch completes, so the number of reroutes grows with the number of shards. While not disastrous, we can optimize this, since a single reroute is enough to apply all the async fetch results that arrived during that time.
1 parent dedbe52 commit 5fa9520

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

src/main/java/org/elasticsearch/gateway/local/LocalGatewayAllocator.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959

6060
import java.util.*;
6161
import java.util.concurrent.ConcurrentMap;
62+
import java.util.concurrent.atomic.AtomicBoolean;
6263

6364
/**
6465
*
@@ -172,7 +173,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
172173

173174
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards> fetch = asyncFetchStarted.get(shard.shardId());
174175
if (fetch == null) {
175-
fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction, clusterService, allocationService);
176+
fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction);
176177
asyncFetchStarted.put(shard.shardId(), fetch);
177178
}
178179
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards> shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
@@ -408,7 +409,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
408409

409410
AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch = asyncFetchStore.get(shard.shardId());
410411
if (fetch == null) {
411-
fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction, clusterService, allocationService);
412+
fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction);
412413
asyncFetchStore.put(shard.shardId(), fetch);
413414
}
414415
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
@@ -519,23 +520,24 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
519520
return changed;
520521
}
521522

522-
static class InternalAsyncFetch<T extends NodeOperationResponse> extends AsyncShardFetch<T> {
523+
private final AtomicBoolean rerouting = new AtomicBoolean();
523524

524-
private final ClusterService clusterService;
525-
private final AllocationService allocationService;
525+
class InternalAsyncFetch<T extends NodeOperationResponse> extends AsyncShardFetch<T> {
526526

527-
public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action,
528-
ClusterService clusterService, AllocationService allocationService) {
527+
public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action) {
529528
super(logger, type, shardId, action);
530-
this.clusterService = clusterService;
531-
this.allocationService = allocationService;
532529
}
533530

534531
@Override
535532
protected void reroute(ShardId shardId, String reason) {
536-
clusterService.submitStateUpdateTask("async_shard_fetch(" + type + ") " + shardId + ", reasons (" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() {
533+
if (rerouting.compareAndSet(false, true) == false) {
534+
logger.trace("{} already has pending reroute, ignoring {}", shardId, reason);
535+
return;
536+
}
537+
clusterService.submitStateUpdateTask("async_shard_fetch", Priority.HIGH, new ClusterStateUpdateTask() {
537538
@Override
538539
public ClusterState execute(ClusterState currentState) throws Exception {
540+
rerouting.set(false);
539541
if (currentState.nodes().masterNode() == null) {
540542
return currentState;
541543
}

0 commit comments

Comments
 (0)