Skip to content

Commit fd3a118

Browse files
author
Loïc Hoguin
committed
CQ: Write large messages into their own files
1 parent 21054d4 commit fd3a118

File tree

1 file changed

+104
-41
lines changed

1 file changed

+104
-41
lines changed

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 104 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
899899
%% or the non-current files. If the message *is* in the
900900
%% current file then the cache entry will be removed by
901901
%% the normal logic for that in write_message/4 and
902-
%% maybe_roll_to_new_file/2.
902+
%% flush_or_roll_to_new_file/2.
903903
case index_lookup(MsgId, State) of
904904
[#msg_location { file = File }]
905905
when File == State #msstate.current_file ->
@@ -1208,26 +1208,102 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates,
12081208
gc_candidate(File, State = #msstate{ gc_candidates = Candidates }) ->
12091209
State#msstate{ gc_candidates = Candidates#{ File => true }}.
12101210

1211-
write_message(MsgId, Msg,
1212-
State0 = #msstate { current_file_handle = CurHdl,
1213-
current_file = CurFile,
1214-
current_file_offset = CurOffset,
1215-
file_summary_ets = FileSummaryEts }) ->
1216-
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, Msg),
1217-
State = case MaybeFlush of
1218-
flush -> internal_sync(State0);
1219-
ok -> State0
1220-
end,
1211+
%% Messages at or above this size get written into their own files
%% instead of being appended to the current file.
-define(LARGE_MESSAGE_THRESHOLD, 4194304). %% 4MB.

%% Serialise the message body and dispatch on its encoded size:
%% large payloads go to a dedicated file, everything else is
%% appended to the current file through the write buffer.
write_message(MsgId, MsgBody, State) ->
    MsgBodyBin = term_to_binary(MsgBody),
    %% Large messages get written to their own files.
    case byte_size(MsgBodyBin) of
        Size when Size >= ?LARGE_MESSAGE_THRESHOLD ->
            write_large_message(MsgId, MsgBodyBin, State);
        _ ->
            write_small_message(MsgId, MsgBodyBin, State)
    end.
1222+
1223+
%% Append a small (below-threshold) message to the current file via
%% the write buffer, record its location in the index, update the
%% per-file accounting in the file summary table, then either flush
%% the buffer or roll over to a new file as required.
write_small_message(MsgId, MsgBodyBin,
                    State = #msstate { current_file_handle = CurHdl,
                                       current_file = CurFile,
                                       current_file_offset = CurOffset,
                                       file_summary_ets = FileSummaryEts }) ->
    {MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, MsgBodyBin),
    %% The entry starts at the current offset of the current file.
    ok = index_insert(
           #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
                           offset = CurOffset, total_size = TotalSize }, State),
    %% Grow both the valid size and the physical size of this file.
    [_,_] = ets:update_counter(FileSummaryEts, CurFile,
                               [{#file_summary.valid_total_size, TotalSize},
                                {#file_summary.file_size, TotalSize}]),
    NewOffset = CurOffset + TotalSize,
    flush_or_roll_to_new_file(NewOffset, MaybeFlush,
                              State #msstate {
                                  current_file_offset = NewOffset }).
12301238

1239+
%% Post-append bookkeeping for the current file. Three cases, in
%% priority order: the file reached its size limit and we roll over
%% to a fresh one; the writer requested a buffer flush; nothing to do.
flush_or_roll_to_new_file(
        Offset, _MaybeFlush,
        State = #msstate { dir = Dir,
                           current_file_handle = CurHdl,
                           current_file = CurFile,
                           file_summary_ets = FileSummaryEts,
                           cur_file_cache_ets = CurFileCacheEts,
                           file_size_limit = FileSizeLimit })
        when Offset >= FileSizeLimit ->
    %% Make sure everything pending is on disk before switching files.
    SyncedState = internal_sync(State),
    ok = writer_close(CurHdl),
    NextFile = CurFile + 1,
    {ok, NextHdl} = writer_open(Dir, NextFile),
    %% Register the new, empty, unlocked file in the summary table.
    true = ets:insert_new(FileSummaryEts, #file_summary {
                              file = NextFile,
                              valid_total_size = 0,
                              file_size = 0,
                              locked = false }),
    %% Delete messages from the cache that were written to disk
    %% (entries whose third element is 0).
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    SyncedState #msstate { current_file_handle = NextHdl,
                           current_file = NextFile,
                           current_file_offset = 0 };
%% If we need to flush, do so here.
flush_or_roll_to_new_file(_Offset, flush, State) ->
    internal_sync(State);
flush_or_roll_to_new_file(_Offset, _MaybeFlush, State) ->
    State.
1267+
1268+
%% Write a large (threshold-or-bigger) message into its own dedicated
%% file: flush and close the current file, write the message directly
%% (unbuffered) into a brand new file, close that file, then open a
%% fresh current file for subsequent small messages.
write_large_message(MsgId, MsgBodyBin,
                    State0 = #msstate { dir = Dir,
                                        current_file_handle = CurHdl,
                                        current_file = CurFile,
                                        file_summary_ets = FileSummaryEts,
                                        cur_file_cache_ets = CurFileCacheEts }) ->
    %% Flush the current file and close it.
    ok = writer_flush(CurHdl),
    ok = writer_close(CurHdl),
    %% Open a new file, write the message directly and close it.
    LargeMsgFile = CurFile + 1,
    {ok, LargeMsgHdl} = writer_open(Dir, LargeMsgFile),
    TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin),
    %% Close the large message's own handle — NOT CurHdl, which was
    %% already closed above; closing CurHdl twice here would leak
    %% LargeMsgHdl's file descriptor.
    ok = writer_close(LargeMsgHdl),
    %% Update ets with the new information directly.
    ok = index_insert(
           #msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
                           offset = 0, total_size = TotalSize }, State0),
    %% The dedicated file is fully valid by construction.
    true = ets:insert_new(FileSummaryEts, #file_summary {
                              file = LargeMsgFile,
                              valid_total_size = TotalSize,
                              file_size = TotalSize,
                              locked = false }),
    %% Roll over to the next file.
    NextFile = LargeMsgFile + 1,
    {ok, NextHdl} = writer_open(Dir, NextFile),
    true = ets:insert_new(FileSummaryEts, #file_summary {
                              file = NextFile,
                              valid_total_size = 0,
                              file_size = 0,
                              locked = false }),
    %% Delete messages from the cache that were written to disk.
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    %% Process confirms (this won't flush; we already did) and continue.
    State = internal_sync(State0),
    State #msstate { current_file_handle = NextHdl,
                     current_file = NextFile,
                     current_file_offset = 0 }.
1306+
12311307
contains_message(MsgId, From, State) ->
12321308
MsgLocation = index_lookup_positive_ref_count(MsgId, State),
12331309
gen_server2:reply(From, MsgLocation =/= not_found),
@@ -1325,8 +1401,7 @@ writer_recover(Dir, Num, Offset) ->
13251401
ok = file:truncate(Fd),
13261402
{ok, #writer{fd = Fd, buffer = prim_buffer:new()}}.
13271403

1328-
writer_append(#writer{buffer = Buffer}, MsgId, MsgBody) ->
1329-
MsgBodyBin = term_to_binary(MsgBody),
1404+
writer_append(#writer{buffer = Buffer}, MsgId, MsgBodyBin) ->
13301405
MsgBodyBinSize = byte_size(MsgBodyBin),
13311406
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
13321407
%% We send an iovec to the buffer instead of building a binary.
@@ -1354,6 +1429,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
13541429
file:write(Fd, prim_buffer:read_iovec(Buffer, Size))
13551430
end.
13561431

1432+
%% For large messages we don't buffer anything. Large messages
%% are kept within their own files.
%%
%% This is basically the same as writer_append except no buffering.
%% Returns the total on-disk size of the entry: 8-byte size prefix
%% + 16-byte MsgId + body + 1-byte OK marker.
writer_direct_write(#writer{fd = Fd}, MsgId, MsgBodyBin) ->
    MsgBodyBinSize = byte_size(MsgBodyBin),
    EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
    %% Assert on the write result: the original ignored it, which
    %% would silently drop a failed large-message write and corrupt
    %% the store. Crash immediately on error instead, consistent
    %% with the assertive style used elsewhere in this module.
    ok = file:write(Fd, [
        <<EntrySize:64>>,
        MsgId,
        MsgBodyBin,
        <<255>> %% OK marker.
    ]),
    %% +8 for the size prefix, +1 for the OK marker.
    EntrySize + 9.
1446+
13571447
%% Close the writer's underlying file descriptor.
writer_close(#writer{fd = Fd}) ->
    file:close(Fd).
13591449

@@ -1700,33 +1790,6 @@ rebuild_index(Gatherer, Files, State) ->
17001790
%% garbage collection / compaction / aggregation -- internal
17011791
%%----------------------------------------------------------------------------
17021792

1703-
maybe_roll_to_new_file(
1704-
Offset,
1705-
State = #msstate { dir = Dir,
1706-
current_file_handle = CurHdl,
1707-
current_file = CurFile,
1708-
file_summary_ets = FileSummaryEts,
1709-
cur_file_cache_ets = CurFileCacheEts,
1710-
file_size_limit = FileSizeLimit })
1711-
when Offset >= FileSizeLimit ->
1712-
State1 = internal_sync(State),
1713-
ok = writer_close(CurHdl),
1714-
NextFile = CurFile + 1,
1715-
{ok, NextHdl} = writer_open(Dir, NextFile),
1716-
true = ets:insert_new(FileSummaryEts, #file_summary {
1717-
file = NextFile,
1718-
valid_total_size = 0,
1719-
file_size = 0,
1720-
locked = false }),
1721-
%% We only delete messages from the cache that were written to disk
1722-
%% in the previous file.
1723-
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
1724-
State1 #msstate { current_file_handle = NextHdl,
1725-
current_file = NextFile,
1726-
current_file_offset = 0 };
1727-
maybe_roll_to_new_file(_, State) ->
1728-
State.
1729-
17301793
%% We keep track of files that have seen removes and
17311794
%% check those periodically for compaction. We only
17321795
%% compact files that have less than half valid data.

0 commit comments

Comments
 (0)