From b8582b93f8509c846693215b6a23b15272b9d72c Mon Sep 17 00:00:00 2001 From: Oli Gillespie Date: Thu, 21 Nov 2024 16:20:48 +0000 Subject: [PATCH 1/5] Refactor MpscLinkedQueue.poll Handle empty queue first, then share most of the implementation for non-empty scenarios (spin and non-spin). --- .../internal/queue/MpscLinkedQueue.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java index d735ff43c0..3a0c93449f 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java @@ -87,23 +87,19 @@ public boolean offer(final T e) { public T poll() { LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright LinkedQueueNode nextNode = currConsumerNode.lvNext(); - if (nextNode != null) { - // we have to null out the value because we are going to hang on to the node - final T nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; + final T nextValue; + if (nextNode == null && currConsumerNode == lvProducerNode()) { + return null; } - else if (currConsumerNode != lvProducerNode()) { + if (nextNode == null) { // spin, we are no longer wait free while ((nextNode = currConsumerNode.lvNext()) == null) { } // NOPMD // got the next node... - - // we have to null out the value because we are going to hang on to the node - final T nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; } - return null; + // we have to null out the value because we are going to hang on to the node + nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; } @Override From 4c6c6fabd2ef04faae27964dcaf78bcd23c41a74 Mon Sep 17 00:00:00 2001 From: Oli Gillespie Date: Thu, 21 Nov 2024 16:31:42 +0000 Subject: [PATCH 2/5] Unlink dead nodes in MpscLinkedQueue Similar to https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java#L120, null out the next pointer in the discarded consumer node when polling from the queue. If not, we leave behind a (potentially long) chain of connected garbage nodes. If we're unlucky (for example one of the early nodes is promoted to old generation, triggering nepotism), this can cause GC issues as now we have a long linked list which must be marked by young collections. Reproducer: ``` import io.reactivex.rxjava3.internal.queue.MpscLinkedQueue; public class MpscLinkedQueueGC { public static void main(String[] args) { MpscLinkedQueue queue = new MpscLinkedQueue<>(); for (int i = 0; i < 10; i++) System.gc(); // tenure consumer node while (true) { queue.offer(123); queue.poll(); } } } ``` ``` Before fix: $ java -Xlog:gc -Xmx1G -cp build/classes/java/main MpscLinkedQueueGC.java ... [1.261s] GC(20) Pause Young (Normal) (G1 Preventive Collection) 115M->115M(204M) 209.335ms [1.385s] GC(23) Pause Young (Normal) (G1 Evacuation Pause) 148M->149M(204M) 31.491ms [1.417s] GC(24) Pause Young (Normal) (G1 Evacuation Pause) 157M->158M(204M) 19.333ms [1.453s] GC(25) Pause Young (Normal) (G1 Evacuation Pause) 166M->167M(599M) 22.678ms [1.966s] GC(26) Pause Young (Normal) (G1 Evacuation Pause) 249M->249M(497M) 305.238ms ... After fix: $ java -Xlog:gc -Xmx1G -cp build/classes/java/main MpscLinkedQueueGC.java ... [1.169s] GC(14) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.755ms [1.558s] GC(15) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.689ms [1.948s] GC(16) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.800ms [2.337s] GC(17) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.714ms ... ``` --- .../io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java index 3a0c93449f..008159fcbc 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java @@ -99,6 +99,8 @@ public T poll() { // we have to null out the value because we are going to hang on to the node nextValue = nextNode.getAndNullValue(); spConsumerNode(nextNode); + // unlink previous consumer to help gc + currConsumerNode.soNext(null); return nextValue; } From 43a7cf764206abb23604d7dbfa37025408cecf0a Mon Sep 17 00:00:00 2001 From: Oli Gillespie Date: Thu, 21 Nov 2024 17:28:08 +0000 Subject: [PATCH 3/5] Revert "Unlink dead nodes in MpscLinkedQueue" This reverts commit 4c6c6fabd2ef04faae27964dcaf78bcd23c41a74. --- .../io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java index 008159fcbc..3a0c93449f 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java @@ -99,8 +99,6 @@ public T poll() { // we have to null out the value because we are going to hang on to the node nextValue = nextNode.getAndNullValue(); spConsumerNode(nextNode); - // unlink previous consumer to help gc - currConsumerNode.soNext(null); return nextValue; } From d51d3ffa06b91642de5ac0e4195487b0fd815f93 Mon Sep 17 00:00:00 2001 From: Oli Gillespie Date: Thu, 21 Nov 2024 17:28:19 +0000 Subject: [PATCH 4/5] Revert "Refactor MpscLinkedQueue.poll" This reverts commit b8582b93f8509c846693215b6a23b15272b9d72c. --- .../internal/queue/MpscLinkedQueue.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java index 3a0c93449f..d735ff43c0 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java @@ -87,19 +87,23 @@ public boolean offer(final T e) { public T poll() { LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright LinkedQueueNode nextNode = currConsumerNode.lvNext(); - final T nextValue; - if (nextNode == null && currConsumerNode == lvProducerNode()) { - return null; + if (nextNode != null) { + // we have to null out the value because we are going to hang on to the node + final T nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; } - if (nextNode == null) { + else if (currConsumerNode != lvProducerNode()) { // spin, we are no longer wait free while ((nextNode = currConsumerNode.lvNext()) == null) { } // NOPMD // got the next node... + + // we have to null out the value because we are going to hang on to the node + final T nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; } - // we have to null out the value because we are going to hang on to the node - nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; + return null; } @Override From 898c19d2febb57f46eb2d16889e34547cb114d87 Mon Sep 17 00:00:00 2001 From: Oli Gillespie Date: Thu, 21 Nov 2024 17:29:19 +0000 Subject: [PATCH 5/5] Unlink dead nodes in MpscLinkedQueue Similar to https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java#L120, null out the next pointer in the discarded consumer node when polling from the queue. If not, we leave behind a (potentially long) chain of connected garbage nodes. If we're unlucky (for example one of the early nodes is promoted to old generation, triggering nepotism), this can cause GC issues as now we have a long linked list which must be marked by young collections. Reproducer: ``` import io.reactivex.rxjava3.internal.queue.MpscLinkedQueue; public class MpscLinkedQueueGC { public static void main(String[] args) { MpscLinkedQueue queue = new MpscLinkedQueue<>(); for (int i = 0; i < 10; i++) System.gc(); // tenure consumer node while (true) { queue.offer(123); queue.poll(); } } } ``` ``` Before fix: $ java -Xlog:gc -Xmx1G -cp build/classes/java/main MpscLinkedQueueGC.java ... [1.261s] GC(20) Pause Young (Normal) (G1 Preventive Collection) 115M->115M(204M) 209.335ms [1.385s] GC(23) Pause Young (Normal) (G1 Evacuation Pause) 148M->149M(204M) 31.491ms [1.417s] GC(24) Pause Young (Normal) (G1 Evacuation Pause) 157M->158M(204M) 19.333ms [1.453s] GC(25) Pause Young (Normal) (G1 Evacuation Pause) 166M->167M(599M) 22.678ms [1.966s] GC(26) Pause Young (Normal) (G1 Evacuation Pause) 249M->249M(497M) 305.238ms ... After fix: $ java -Xlog:gc -Xmx1G -cp build/classes/java/main MpscLinkedQueueGC.java ... [1.169s] GC(14) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.755ms [1.558s] GC(15) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.689ms [1.948s] GC(16) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.800ms [2.337s] GC(17) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.714ms ... ``` --- .../io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java index d735ff43c0..e8d19c633e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java @@ -91,6 +91,8 @@ public T poll() { // we have to null out the value because we are going to hang on to the node final T nextValue = nextNode.getAndNullValue(); spConsumerNode(nextNode); + // unlink previous consumer to help gc + currConsumerNode.soNext(null); return nextValue; } else if (currConsumerNode != lvProducerNode()) { @@ -101,6 +103,8 @@ else if (currConsumerNode != lvProducerNode()) { // we have to null out the value because we are going to hang on to the node final T nextValue = nextNode.getAndNullValue(); spConsumerNode(nextNode); + // unlink previous consumer to help gc + currConsumerNode.soNext(null); return nextValue; } return null;