|
59 | 59 |
|
60 | 60 | import java.util.*;
|
61 | 61 | import java.util.concurrent.ConcurrentMap;
|
| 62 | +import java.util.concurrent.atomic.AtomicBoolean; |
62 | 63 |
|
63 | 64 | /**
|
64 | 65 | *
|
@@ -172,7 +173,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
|
172 | 173 |
|
173 | 174 | AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards> fetch = asyncFetchStarted.get(shard.shardId());
|
174 | 175 | if (fetch == null) {
|
175 |
| - fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction, clusterService, allocationService); |
| 176 | + fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction); |
176 | 177 | asyncFetchStarted.put(shard.shardId(), fetch);
|
177 | 178 | }
|
178 | 179 | AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards> shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
|
@@ -408,7 +409,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
|
408 | 409 |
|
409 | 410 | AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch = asyncFetchStore.get(shard.shardId());
|
410 | 411 | if (fetch == null) {
|
411 |
| - fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction, clusterService, allocationService); |
| 412 | + fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction); |
412 | 413 | asyncFetchStore.put(shard.shardId(), fetch);
|
413 | 414 | }
|
414 | 415 | AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
|
@@ -519,23 +520,24 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
|
519 | 520 | return changed;
|
520 | 521 | }
|
521 | 522 |
|
522 |
| - static class InternalAsyncFetch<T extends NodeOperationResponse> extends AsyncShardFetch<T> { |
| 523 | + private final AtomicBoolean rerouting = new AtomicBoolean(); |
523 | 524 |
|
524 |
| - private final ClusterService clusterService; |
525 |
| - private final AllocationService allocationService; |
| 525 | + class InternalAsyncFetch<T extends NodeOperationResponse> extends AsyncShardFetch<T> { |
526 | 526 |
|
527 |
| - public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action, |
528 |
| - ClusterService clusterService, AllocationService allocationService) { |
| 527 | + public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends NodesOperationResponse<T>, T> action) { |
529 | 528 | super(logger, type, shardId, action);
|
530 |
| - this.clusterService = clusterService; |
531 |
| - this.allocationService = allocationService; |
532 | 529 | }
|
533 | 530 |
|
534 | 531 | @Override
|
535 | 532 | protected void reroute(ShardId shardId, String reason) {
|
536 |
| - clusterService.submitStateUpdateTask("async_shard_fetch(" + type + ") " + shardId + ", reasons (" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() { |
| 533 | + if (rerouting.compareAndSet(false, true) == false) { |
| 534 | + logger.trace("{} already has pending reroute, ignoring {}", shardId, reason); |
| 535 | + return; |
| 536 | + } |
| 537 | + clusterService.submitStateUpdateTask("async_shard_fetch", Priority.HIGH, new ClusterStateUpdateTask() { |
537 | 538 | @Override
|
538 | 539 | public ClusterState execute(ClusterState currentState) throws Exception {
|
| 540 | + rerouting.set(false); |
539 | 541 | if (currentState.nodes().masterNode() == null) {
|
540 | 542 | return currentState;
|
541 | 543 | }
|
|
0 commit comments