Skip to content

Commit 7b6b7af

Browse files
committed
Fix and re-enable hanging test
The test could hang because a connection was in flow control and a queue purge operation was stuck. The test now uses different connections to publish and purge and use publisher confirms. Publisher confirms seems necessary because the test was expecting the exact number of published messages was purged from the queue. It's maybe expecting too much considering publishing is asynchronous. Publisher confirms should make the test more reliable.
1 parent 1652485 commit 7b6b7af

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

src/test/java/com/rabbitmq/client/test/server/EffectVisibilityCrossNodeTest.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
import static org.junit.Assert.assertEquals;
1919

2020
import java.io.IOException;
21+
import java.util.concurrent.TimeoutException;
2122

23+
import com.rabbitmq.client.Channel;
24+
import com.rabbitmq.client.Connection;
25+
import com.rabbitmq.client.test.TestUtils;
2226
import org.junit.Test;
2327

2428
import com.rabbitmq.client.test.functional.ClusteredTestBase;
@@ -29,19 +33,25 @@
2933
*/
3034
public class EffectVisibilityCrossNodeTest extends ClusteredTestBase {
3135
private final String[] queues = new String[QUEUES];
36+
private Connection purgeConnection;
3237

3338
@Override
34-
protected void createResources() throws IOException {
39+
protected void createResources() throws IOException, TimeoutException {
3540
for (int i = 0; i < queues.length ; i++) {
3641
queues[i] = alternateChannel.queueDeclare("", false, false, true, null).getQueue();
3742
alternateChannel.queueBind(queues[i], "amq.fanout", "");
3843
}
44+
this.purgeConnection = TestUtils.connectionFactory().newConnection();
3945
}
4046

4147
@Override
4248
protected void releaseResources() throws IOException {
43-
for (int i = 0; i < queues.length ; i++) {
44-
alternateChannel.queueDelete(queues[i]);
49+
try {
50+
for (int i = 0; i < queues.length ; i++) {
51+
alternateChannel.queueDelete(queues[i]);
52+
}
53+
} finally {
54+
TestUtils.close(this.purgeConnection);
4555
}
4656
}
4757

@@ -52,13 +62,15 @@ protected void releaseResources() throws IOException {
5262
private static final byte[] msg = "".getBytes();
5363

5464
@Test public void effectVisibility() throws Exception {
55-
65+
Channel purgeChannel = this.purgeConnection.createChannel();
66+
channel.confirmSelect();
5667
for (int i = 0; i < BATCHES; i++) {
5768
for (int j = 0; j < MESSAGES_PER_BATCH; j++) {
5869
channel.basicPublish("amq.fanout", "", null, msg);
5970
}
71+
channel.waitForConfirmsOrDie(10_000);
6072
for (int j = 0; j < queues.length ; j++) {
61-
assertEquals(MESSAGES_PER_BATCH, channel.queuePurge(queues[j]).getMessageCount());
73+
assertEquals(MESSAGES_PER_BATCH, purgeChannel.queuePurge(queues[j]).getMessageCount());
6274
}
6375
}
6476
}

src/test/java/com/rabbitmq/client/test/server/ServerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
Permissions.class,
2727
DurableBindingLifecycle.class,
2828
DeadLetterExchangeDurable.class,
29-
//EffectVisibilityCrossNodeTest.class,
29+
EffectVisibilityCrossNodeTest.class,
3030
ExclusiveQueueDurability.class,
3131
AbsentQueue.class,
3232
AlternateExchangeEquivalence.class,

0 commit comments

Comments
 (0)