Skip to content

Commit d959c8a

Browse files
authored
Merge pull request #11112 from rabbitmq/loic-faster-cq-shared-store-gc
4.x: Additional CQv2 message store optimisations
2 parents d2c7739 + cc73b86 commit d959c8a

File tree

11 files changed

+319
-409
lines changed

11 files changed

+319
-409
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ _APP_ENV = """[
3939
{vm_memory_calculation_strategy, rss},
4040
{memory_monitor_interval, 2500},
4141
{disk_free_limit, 50000000}, %% 50MB
42-
{msg_store_index_module, rabbit_msg_store_ets_index},
4342
{backing_queue_module, rabbit_variable_queue},
4443
%% 0 ("no limit") would make a better default, but that
4544
%% breaks the QPid Java client

deps/rabbit/Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ define PROJECT_ENV
1919
{vm_memory_calculation_strategy, rss},
2020
{memory_monitor_interval, 2500},
2121
{disk_free_limit, 50000000}, %% 50MB
22-
{msg_store_index_module, rabbit_msg_store_ets_index},
2322
{backing_queue_module, rabbit_variable_queue},
2423
%% 0 ("no limit") would make a better default, but that
2524
%% breaks the QPid Java client

deps/rabbit/app.bzl

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,6 @@ def all_beam_files(name = "all_beam_files"):
170170
"src/rabbit_mirror_queue_misc.erl",
171171
"src/rabbit_mnesia.erl",
172172
"src/rabbit_msg_store.erl",
173-
"src/rabbit_msg_store_ets_index.erl",
174173
"src/rabbit_msg_store_gc.erl",
175174
"src/rabbit_networking.erl",
176175
"src/rabbit_networking_store.erl",
@@ -431,7 +430,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
431430
"src/rabbit_mirror_queue_misc.erl",
432431
"src/rabbit_mnesia.erl",
433432
"src/rabbit_msg_store.erl",
434-
"src/rabbit_msg_store_ets_index.erl",
435433
"src/rabbit_msg_store_gc.erl",
436434
"src/rabbit_networking.erl",
437435
"src/rabbit_networking_store.erl",
@@ -711,7 +709,6 @@ def all_srcs(name = "all_srcs"):
711709
"src/rabbit_mirror_queue_misc.erl",
712710
"src/rabbit_mnesia.erl",
713711
"src/rabbit_msg_store.erl",
714-
"src/rabbit_msg_store_ets_index.erl",
715712
"src/rabbit_msg_store_gc.erl",
716713
"src/rabbit_networking.erl",
717714
"src/rabbit_networking_store.erl",

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,8 +1125,11 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
11251125
empty ->
11261126
ok = gatherer:stop(Gatherer),
11271127
finished;
1128+
%% From v1 index walker. @todo Remove when no longer possible to convert from v1.
11281129
{value, {MsgId, Count}} ->
1129-
{MsgId, Count, {next, Gatherer}}
1130+
{MsgId, Count, {next, Gatherer}};
1131+
{value, MsgIds} ->
1132+
{MsgIds, {next, Gatherer}}
11301133
end.
11311134

11321135
queue_index_walker_reader(#resource{ virtual_host = VHost } = Name, Gatherer) ->
@@ -1153,27 +1156,30 @@ queue_index_walker_segment(F, Gatherer) ->
11531156
{ok, <<?MAGIC:32,?VERSION:8,
11541157
FromSeqId:64/unsigned,ToSeqId:64/unsigned,
11551158
_/bits>>} ->
1156-
queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId);
1159+
queue_index_walker_segment(Fd, Gatherer, 0, ToSeqId - FromSeqId, []);
11571160
_ ->
11581161
%% Invalid segment file. Skip.
11591162
ok
11601163
end,
11611164
ok = file:close(Fd).
11621165

1163-
queue_index_walker_segment(_, _, N, N) ->
1166+
queue_index_walker_segment(_, Gatherer, N, N, Acc) ->
11641167
%% We reached the end of the segment file.
1168+
gatherer:sync_in(Gatherer, Acc),
11651169
ok;
1166-
queue_index_walker_segment(Fd, Gatherer, N, Total) ->
1170+
queue_index_walker_segment(Fd, Gatherer, N, Total, Acc) ->
11671171
case file:read(Fd, ?ENTRY_SIZE) of
11681172
%% We found a non-ack persistent entry. Gather it.
11691173
{ok, <<1,_:7,1:1,_,1,Id:16/binary,_/bits>>} ->
1170-
gatherer:sync_in(Gatherer, {Id, 1}),
1171-
queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
1174+
queue_index_walker_segment(Fd, Gatherer, N + 1, Total, [Id|Acc]);
11721175
%% We found an ack, a transient entry or a non-entry. Skip it.
11731176
{ok, _} ->
1174-
queue_index_walker_segment(Fd, Gatherer, N + 1, Total);
1177+
queue_index_walker_segment(Fd, Gatherer, N + 1, Total, Acc);
11751178
%% We reached the end of a partial segment file.
1179+
eof when Acc =:= [] ->
1180+
ok;
11761181
eof ->
1182+
gatherer:sync_in(Gatherer, Acc),
11771183
ok
11781184
end.
11791185

0 commit comments

Comments
 (0)