Commit de2203d

improve async merge process, don't spawn a thread unless a merge is really needed, and add an optimized "maybeMerge" operation
1 parent b75c689 commit de2203d

3 files changed: 89 additions & 30 deletions
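At a high level, the commit replaces the old "always schedule an optimize on the merge thread pool" flow with a cheap flag check that only hands work to the merge pool when a write has actually marked a possible merge. Below is a minimal, self-contained sketch of that pattern; it uses plain java.util.concurrent classes and illustrative names (MergeGateSketch, onWrite, mergePool), not the Elasticsearch API touched by this commit.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the "don't spawn a merge thread unless a merge is really needed" idea.
class MergeGateSketch {

    // set by every write operation, cleared once a merge check actually runs
    private final AtomicBoolean possibleMergeNeeded = new AtomicBoolean(false);

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService mergePool = Executors.newFixedThreadPool(1);

    void onWrite() {
        possibleMergeNeeded.set(true); // a write may have created new segments
    }

    void start(long intervalMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            // cheap flag check on the scheduler thread; no handoff if nothing to do
            if (possibleMergeNeeded.get()) {
                mergePool.execute(this::maybeMerge);
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void maybeMerge() {
        if (!possibleMergeNeeded.getAndSet(false)) {
            return; // another check already handled it
        }
        // stand-in for asking the IndexWriter to merge if its policy wants to
        System.out.println("maybeMerge: asking the writer to merge if worthwhile");
    }
}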

modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 18 additions & 0 deletions
@@ -69,6 +69,13 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
      */
     boolean refreshNeeded();
 
+    /**
+     * Returns <tt>true</tt> if a possible merge is really needed.
+     */
+    boolean possibleMergeNeeded();
+
+    void maybeMerge() throws EngineException;
+
     /**
      * Refreshes the engine for new search operations to reflect the latest
      * changes. Pass <tt>true</tt> if the refresh operation should include
@@ -131,10 +138,21 @@ static class Refresh {
 
         private final boolean waitForOperations;
 
+        private boolean force = false;
+
         public Refresh(boolean waitForOperations) {
             this.waitForOperations = waitForOperations;
         }
 
+        public Refresh force(boolean force) {
+            this.force = force;
+            return this;
+        }
+
+        public boolean force() {
+            return this.force;
+        }
+
         public boolean waitForOperations() {
             return waitForOperations;
         }
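For context only, the small class below shows how a caller might drive the two new interface methods together with the new Refresh.force(...) flag. The MergeCheck class and the scheduling around it are hypothetical; only possibleMergeNeeded(), maybeMerge(), and Refresh.force(boolean) come from the interface change above.

import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;

// Hypothetical helper, not part of this commit: a periodic task could call runOnce().
public class MergeCheck {

    private final Engine engine;

    public MergeCheck(Engine engine) {
        this.engine = engine;
    }

    public void runOnce() {
        if (!engine.possibleMergeNeeded()) {
            return; // cheap volatile read, nothing to hand off to a merge thread
        }
        try {
            // lets the IndexWriter's merge policy decide whether a merge is worthwhile
            engine.maybeMerge();
            // force(true) refreshes the searcher even if no write marked the engine dirty
            engine.refresh(new Engine.Refresh(false).force(true));
        } catch (EngineException e) {
            // best-effort scheduled work: ignore and try again on the next tick
        }
    }
}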

modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java

Lines changed: 42 additions & 10 deletions
@@ -113,6 +113,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
     // flag indicating if a dirty operation has occurred since the last refresh
     private volatile boolean dirty = false;
 
+    private volatile boolean possibleMergeNeeded = false;
+
     private volatile int disableFlushCounter = 0;
 
     // indexing searcher is initialized
@@ -228,6 +230,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
             }
             innerCreate(create, writer);
             dirty = true;
+            possibleMergeNeeded = true;
             if (create.refresh()) {
                 refresh(new Refresh(false));
             }
@@ -318,6 +321,7 @@ private void innerCreate(Create create, IndexWriter writer) throws IOException {
 
             innerIndex(index, writer);
             dirty = true;
+            possibleMergeNeeded = true;
             if (index.refresh()) {
                 refresh(new Refresh(false));
             }
@@ -402,6 +406,7 @@ private void innerIndex(Index index, IndexWriter writer) throws IOException {
             }
             innerDelete(delete, writer);
             dirty = true;
+            possibleMergeNeeded = true;
             if (delete.refresh()) {
                 refresh(new Refresh(false));
             }
@@ -485,6 +490,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
             writer.deleteDocuments(delete.query());
             translog.add(new Translog.DeleteByQuery(delete));
             dirty = true;
+            possibleMergeNeeded = true;
         } catch (IOException e) {
             throw new DeleteByQueryFailedEngineException(shardId, delete, e);
         } finally {
@@ -521,6 +527,10 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
         return dirty;
     }
 
+    @Override public boolean possibleMergeNeeded() {
+        return this.possibleMergeNeeded;
+    }
+
     @Override public void refresh(Refresh refresh) throws EngineException {
         if (indexWriter == null) {
             throw new EngineClosedException(shardId);
@@ -535,7 +545,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
                     throw new EngineClosedException(shardId);
                 }
                 try {
-                    if (dirty) {
+                    if (dirty || refresh.force()) {
                         // we eagerly set dirty to false so we won't miss refresh requests
                         dirty = false;
                         AcquirableResource<ReaderSearcherHolder> current = nrtResource;
@@ -586,6 +596,10 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
             if (disableFlushCounter > 0) {
                 throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
             }
+            if (indexingSearcher.get() != null) {
+                indexingSearcher.get().release();
+                indexingSearcher.set(null);
+            }
             if (flush.full()) {
                 // disable refreshing, not dirty
                 dirty = false;
@@ -613,11 +627,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
             versionMap.clear();
             dirty = true; // force a refresh
             // we need to do a refresh here so we sync versioning support
-            refresh(new Refresh(true));
-            if (indexingSearcher.get() != null) {
-                indexingSearcher.get().release();
-                indexingSearcher.set(null);
-            }
+            refresh(new Refresh(true).force(true));
         } finally {
             rwl.writeLock().unlock();
             flushing.set(false);
@@ -628,6 +638,30 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
 //        }
     }
 
+    @Override public void maybeMerge() throws EngineException {
+        if (!possibleMergeNeeded) {
+            return;
+        }
+        possibleMergeNeeded = false;
+        rwl.readLock().lock();
+        try {
+            if (indexWriter == null) {
+                throw new EngineClosedException(shardId);
+            }
+            if (indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
+                ((EnableMergePolicy) indexWriter.getMergePolicy()).enableMerge();
+            }
+            indexWriter.maybeMerge();
+        } catch (Exception e) {
+            throw new OptimizeFailedEngineException(shardId, e);
+        } finally {
+            rwl.readLock().unlock();
+            if (indexWriter != null && indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
+                ((EnableMergePolicy) indexWriter.getMergePolicy()).disableMerge();
+            }
+        }
+    }
+
     @Override public void optimize(Optimize optimize) throws EngineException {
         if (optimizeMutex.compareAndSet(false, true)) {
             rwl.readLock().lock();
@@ -642,6 +676,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
                     indexWriter.expungeDeletes(false);
                 } else if (optimize.maxNumSegments() <= 0) {
                     indexWriter.maybeMerge();
+                    possibleMergeNeeded = false;
                 } else {
                     indexWriter.optimize(optimize.maxNumSegments(), false);
                 }
@@ -659,14 +694,11 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
         if (optimize.waitForMerge()) {
             indexWriter.waitForMerges();
         }
-        // once we did the optimization, we are "dirty" since we removed deletes potentially which
-        // affects TermEnum
-        dirty = true;
         if (optimize.flush()) {
             flush(new Flush());
         }
         if (optimize.refresh()) {
-            refresh(new Refresh(false));
+            refresh(new Refresh(false).force(true));
         }
     }
 
modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java

Lines changed: 29 additions & 20 deletions
@@ -554,7 +554,7 @@ private void startScheduledTasksIfNeeded() {
         // so, make sure we periodically call it, this need to be a small enough value so mergine will actually
         // happen and reduce the number of segments
         if (mergeInterval.millis() > 0) {
-            mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, new EngineMerger());
+            mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
             logger.debug("scheduling optimizer / merger every {}", mergeInterval);
         } else {
             logger.debug("scheduled optimizer / merger disabled");
@@ -608,27 +608,36 @@ private class EngineRefresher implements Runnable {
 
     private class EngineMerger implements Runnable {
         @Override public void run() {
-            try {
-                // -1 means maybe merge
-                engine.optimize(new Engine.Optimize().maxNumSegments(-1).waitForMerge(false).flush(false).refresh(false));
-            } catch (EngineClosedException e) {
-                // we are being closed, ignore
-            } catch (OptimizeFailedEngineException e) {
-                if (e.getCause() instanceof InterruptedException) {
-                    // ignore, we are being shutdown
-                } else if (e.getCause() instanceof ClosedByInterruptException) {
-                    // ignore, we are being shutdown
-                } else if (e.getCause() instanceof ThreadInterruptedException) {
-                    // ignore, we are being shutdown
-                } else {
-                    logger.warn("Failed to perform scheduled engine optimize/merge", e);
+            if (!engine().possibleMergeNeeded()) {
+                if (state != IndexShardState.CLOSED) {
+                    mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);
                 }
-            } catch (Exception e) {
-                logger.warn("Failed to perform scheduled engine optimize/merge", e);
-            }
-            if (state != IndexShardState.CLOSED) {
-                mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, this);
+                return;
             }
+            threadPool.executor(ThreadPool.Names.MERGE).execute(new Runnable() {
+                @Override public void run() {
+                    try {
+                        engine.maybeMerge();
+                    } catch (EngineClosedException e) {
+                        // we are being closed, ignore
+                    } catch (OptimizeFailedEngineException e) {
+                        if (e.getCause() instanceof InterruptedException) {
+                            // ignore, we are being shutdown
+                        } else if (e.getCause() instanceof ClosedByInterruptException) {
+                            // ignore, we are being shutdown
+                        } else if (e.getCause() instanceof ThreadInterruptedException) {
+                            // ignore, we are being shutdown
+                        } else {
+                            logger.warn("Failed to perform scheduled engine optimize/merge", e);
+                        }
+                    } catch (Exception e) {
+                        logger.warn("Failed to perform scheduled engine optimize/merge", e);
+                    }
+                    if (state != IndexShardState.CLOSED) {
+                        mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, EngineMerger.this);
+                    }
+                }
+            });
         }
     }
 