Skip to content

Commit e8c193a

Browse files
authored
Merge pull request #1593 from rabbitmq/mk-clear-recorded-bindings-with-deleted-source
AutorecoveringConnection: when an exchange is deleted, clean up the bindings where it is the source [in the recorded topology]
2 parents cc9863a + 8dff80c commit e8c193a

File tree

2 files changed

+34
-3
lines changed

2 files changed

+34
-3
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,8 +1095,12 @@ void recordExchange(String exchange, RecordedExchange x) {
10951095

10961096
void deleteRecordedExchange(String exchange) {
10971097
this.recordedExchanges.remove(exchange);
1098-
Set<RecordedBinding> xs = this.removeBindingsWithDestination(exchange);
1099-
for (RecordedBinding b : xs) {
1098+
Set<RecordedBinding> xs1 = this.removeBindingsWithDestination(exchange);
1099+
for (RecordedBinding b : xs1) {
1100+
this.maybeDeleteRecordedAutoDeleteExchange(b.getSource());
1101+
}
1102+
Set<RecordedBinding> xs2 = this.removeBindingsWithSource(exchange);
1103+
for (RecordedBinding b : xs2) {
11001104
this.maybeDeleteRecordedAutoDeleteExchange(b.getSource());
11011105
}
11021106
}
@@ -1160,11 +1164,19 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
11601164
}
11611165

11621166
Set<RecordedBinding> removeBindingsWithDestination(String s) {
1167+
return this.removeBindingsWithCondition(b -> b.getDestination().equals(s));
1168+
}
1169+
1170+
Set<RecordedBinding> removeBindingsWithSource(String s) {
1171+
return this.removeBindingsWithCondition(b -> b.getSource().equals(s));
1172+
}
1173+
1174+
private Set<RecordedBinding> removeBindingsWithCondition(Predicate<RecordedBinding> condition) {
11631175
final Set<RecordedBinding> result = new LinkedHashSet<>();
11641176
synchronized (this.recordedBindings) {
11651177
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
11661178
RecordedBinding b = it.next();
1167-
if(b.getDestination().equals(s)) {
1179+
if (condition.test(b)) {
11681180
it.remove();
11691181
result.add(b);
11701182
}

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,21 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
865865
}
866866
}
867867

868+
@Test public void thatBindingFromDeletedExchangeIsDeleted() throws IOException, InterruptedException {
869+
String q = generateQueueName();
870+
channel.queueDeclare(q, false, false, false, null);
871+
try {
872+
String x = generateExchangeName();
873+
channel.exchangeDeclare(x, "fanout");
874+
channel.queueBind(q, x, "");
875+
assertRecordedBinding(connection, 1);
876+
channel.exchangeDelete(x);
877+
assertRecordedBinding(connection, 0);
878+
} finally {
879+
channel.queueDelete(q);
880+
}
881+
}
882+
868883
private void assertConsumerCount(int exp, String q) throws IOException {
869884
assertThat(channel.queueDeclarePassive(q).getConsumerCount()).isEqualTo(exp);
870885
}
@@ -1017,4 +1032,8 @@ private static void assertRecordedQueues(Connection conn, int size) {
10171032
private static void assertRecordedExchanges(Connection conn, int size) {
10181033
assertThat(((AutorecoveringConnection)conn).getRecordedExchanges()).hasSize(size);
10191034
}
1035+
1036+
private static void assertRecordedBinding(Connection conn, int size) {
1037+
assertThat(((AutorecoveringConnection)conn).getRecordedBindings()).hasSize(size);
1038+
}
10201039
}

0 commit comments

Comments
 (0)