Skip to content

Commit 6413d2d

Browse files
committed
Fix channel reuse bug
This commit fixes the following test flake that occurred in CI: ``` make -C deps/rabbit ct-amqp_dotnet t=cluster_size_1:redelivery ``` After receiving the end frame, the server session proc replies with the end frame. Usually when the test case succeeds, the server connection process receives a DOWN for the session proc and untracks its channel number such that a subsequent begin frame for the same channel number will create a new session proc in the server. In the flake however, the client receives the end, and pipelines new begin, attach, and flow frames. These frames are received in the server connection's mailbox before the monitor for the old session proc fires. That's why these new frames are sent to the old session proc causing the test case to fail. This reveals a bug in the server. This commit fixes this bug similarly as done in the AMQP 0.9.1 channel in https://github.com/rabbitmq/rabbitmq-server/blob/94b4a6aafdfac6b6cae102f50b188e5ea4a32c0e/deps/rabbit/src/rabbit_channel.erl#L1146-L1155 Channel reuse by the client is valid and actually common, e.g. if channel-max is 0.
1 parent 94b4a6a commit 6413d2d

File tree

2 files changed

+15
-4
lines changed

2 files changed

+15
-4
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
-export([init/1,
1818
info/2,
1919
mainloop/2,
20-
set_credential/2]).
20+
set_credential/2,
21+
notify_session_ending/3]).
2122

2223
-export([system_continue/3,
2324
system_terminate/4,
@@ -79,6 +80,11 @@ set_credential(Pid, Credential) ->
7980
Pid ! {set_credential, Credential},
8081
ok.
8182

83+
-spec notify_session_ending(pid(), pid(), non_neg_integer()) -> ok.
84+
notify_session_ending(ConnPid, SessionPid, ChannelNum) ->
85+
ConnPid ! {session_ending, SessionPid, ChannelNum},
86+
ok.
87+
8288
%%--------------------------------------------------------------------------
8389

8490
recvloop(Deb, State = #v1{pending_recv = true}) ->
@@ -233,6 +239,8 @@ handle_other({set_credential, Cred}, State) ->
233239
handle_other(credential_expired, State) ->
234240
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []),
235241
handle_exception(State, 0, Error);
242+
handle_other({session_ending, SessionPid, ChannelNum}, State) ->
243+
untrack_channel(ChannelNum, SessionPid, State);
236244
handle_other(Other, _State) ->
237245
%% internal error -> something worth dying for
238246
exit({unexpected_message, Other}).

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -638,10 +638,11 @@ log_error_and_close_session(
638638
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
639639
writer_pid = WriterPid,
640640
channel_num = Ch}}) ->
641-
End = #'v1_0.end'{error = Error},
642641
?LOG_WARNING("Closing session for connection ~p: ~tp",
643642
[ReaderPid, Error]),
644-
ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End),
643+
rabbit_amqp_reader:notify_session_ending(ReaderPid, self(), Ch),
644+
ok = rabbit_amqp_writer:send_command_sync(
645+
WriterPid, Ch, #'v1_0.end'{error = Error}),
645646
{stop, {shutdown, Error}, State}.
646647

647648
%% Batch confirms / rejects to publishers.
@@ -1178,9 +1179,11 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
11781179
reply_frames(Reply, State);
11791180

11801181
handle_frame(#'v1_0.end'{},
1181-
State0 = #state{cfg = #cfg{writer_pid = WriterPid,
1182+
State0 = #state{cfg = #cfg{reader_pid = ReaderPid,
1183+
writer_pid = WriterPid,
11821184
channel_num = Ch}}) ->
11831185
State = send_delivery_state_changes(State0),
1186+
rabbit_amqp_reader:notify_session_ending(ReaderPid, self(), Ch),
11841187
ok = try rabbit_amqp_writer:send_command_sync(WriterPid, Ch, #'v1_0.end'{})
11851188
catch exit:{Reason, {gen_server, call, _ArgList}}
11861189
when Reason =:= shutdown orelse

0 commit comments

Comments
 (0)