@@ -113,6 +113,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
113
113
// flag indicating if a dirty operation has occurred since the last refresh
114
114
private volatile boolean dirty = false ;
115
115
116
+ private volatile boolean possibleMergeNeeded = false ;
117
+
116
118
private volatile int disableFlushCounter = 0 ;
117
119
118
120
// indexing searcher is initialized
@@ -228,6 +230,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
228
230
}
229
231
innerCreate (create , writer );
230
232
dirty = true ;
233
+ possibleMergeNeeded = true ;
231
234
if (create .refresh ()) {
232
235
refresh (new Refresh (false ));
233
236
}
@@ -318,6 +321,7 @@ private void innerCreate(Create create, IndexWriter writer) throws IOException {
318
321
319
322
innerIndex (index , writer );
320
323
dirty = true ;
324
+ possibleMergeNeeded = true ;
321
325
if (index .refresh ()) {
322
326
refresh (new Refresh (false ));
323
327
}
@@ -402,6 +406,7 @@ private void innerIndex(Index index, IndexWriter writer) throws IOException {
402
406
}
403
407
innerDelete (delete , writer );
404
408
dirty = true ;
409
+ possibleMergeNeeded = true ;
405
410
if (delete .refresh ()) {
406
411
refresh (new Refresh (false ));
407
412
}
@@ -485,6 +490,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
485
490
writer .deleteDocuments (delete .query ());
486
491
translog .add (new Translog .DeleteByQuery (delete ));
487
492
dirty = true ;
493
+ possibleMergeNeeded = true ;
488
494
} catch (IOException e ) {
489
495
throw new DeleteByQueryFailedEngineException (shardId , delete , e );
490
496
} finally {
@@ -521,6 +527,10 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
521
527
return dirty ;
522
528
}
523
529
530
+ @ Override public boolean possibleMergeNeeded () {
531
+ return this .possibleMergeNeeded ;
532
+ }
533
+
524
534
@ Override public void refresh (Refresh refresh ) throws EngineException {
525
535
if (indexWriter == null ) {
526
536
throw new EngineClosedException (shardId );
@@ -535,7 +545,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
535
545
throw new EngineClosedException (shardId );
536
546
}
537
547
try {
538
- if (dirty ) {
548
+ if (dirty || refresh . force () ) {
539
549
// we eagerly set dirty to false so we won't miss refresh requests
540
550
dirty = false ;
541
551
AcquirableResource <ReaderSearcherHolder > current = nrtResource ;
@@ -586,6 +596,10 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
586
596
if (disableFlushCounter > 0 ) {
587
597
throw new FlushNotAllowedEngineException (shardId , "Recovery is in progress, flush is not allowed" );
588
598
}
599
+ if (indexingSearcher .get () != null ) {
600
+ indexingSearcher .get ().release ();
601
+ indexingSearcher .set (null );
602
+ }
589
603
if (flush .full ()) {
590
604
// disable refreshing, not dirty
591
605
dirty = false ;
@@ -613,11 +627,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
613
627
versionMap .clear ();
614
628
dirty = true ; // force a refresh
615
629
// we need to do a refresh here so we sync versioning support
616
- refresh (new Refresh (true ));
617
- if (indexingSearcher .get () != null ) {
618
- indexingSearcher .get ().release ();
619
- indexingSearcher .set (null );
620
- }
630
+ refresh (new Refresh (true ).force (true ));
621
631
} finally {
622
632
rwl .writeLock ().unlock ();
623
633
flushing .set (false );
@@ -628,6 +638,30 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
628
638
// }
629
639
}
630
640
641
+ @ Override public void maybeMerge () throws EngineException {
642
+ if (!possibleMergeNeeded ) {
643
+ return ;
644
+ }
645
+ possibleMergeNeeded = false ;
646
+ rwl .readLock ().lock ();
647
+ try {
648
+ if (indexWriter == null ) {
649
+ throw new EngineClosedException (shardId );
650
+ }
651
+ if (indexWriter .getMergePolicy () instanceof EnableMergePolicy ) {
652
+ ((EnableMergePolicy ) indexWriter .getMergePolicy ()).enableMerge ();
653
+ }
654
+ indexWriter .maybeMerge ();
655
+ } catch (Exception e ) {
656
+ throw new OptimizeFailedEngineException (shardId , e );
657
+ } finally {
658
+ rwl .readLock ().unlock ();
659
+ if (indexWriter != null && indexWriter .getMergePolicy () instanceof EnableMergePolicy ) {
660
+ ((EnableMergePolicy ) indexWriter .getMergePolicy ()).disableMerge ();
661
+ }
662
+ }
663
+ }
664
+
631
665
@ Override public void optimize (Optimize optimize ) throws EngineException {
632
666
if (optimizeMutex .compareAndSet (false , true )) {
633
667
rwl .readLock ().lock ();
@@ -642,6 +676,7 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
642
676
indexWriter .expungeDeletes (false );
643
677
} else if (optimize .maxNumSegments () <= 0 ) {
644
678
indexWriter .maybeMerge ();
679
+ possibleMergeNeeded = false ;
645
680
} else {
646
681
indexWriter .optimize (optimize .maxNumSegments (), false );
647
682
}
@@ -659,14 +694,11 @@ private void innerDelete(Delete delete, IndexWriter writer) throws IOException {
659
694
if (optimize .waitForMerge ()) {
660
695
indexWriter .waitForMerges ();
661
696
}
662
- // once we did the optimization, we are "dirty" since we removed deletes potentially which
663
- // affects TermEnum
664
- dirty = true ;
665
697
if (optimize .flush ()) {
666
698
flush (new Flush ());
667
699
}
668
700
if (optimize .refresh ()) {
669
- refresh (new Refresh (false ));
701
+ refresh (new Refresh (false ). force ( true ) );
670
702
}
671
703
}
672
704
0 commit comments