@@ -899,7 +899,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
899
899
% % or the non-current files. If the message *is* in the
900
900
% % current file then the cache entry will be removed by
901
901
% % the normal logic for that in write_message/4 and
902
- % % maybe_roll_to_new_file /2.
902
+ % % flush_or_roll_to_new_file /3.
903
903
case index_lookup (MsgId , State ) of
904
904
[# msg_location { file = File }]
905
905
when File == State # msstate .current_file ->
@@ -1208,26 +1208,102 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates,
1208
1208
gc_candidate (File , State = # msstate { gc_candidates = Candidates }) ->
1209
1209
State # msstate { gc_candidates = Candidates #{ File => true }}.
1210
1210
1211
- write_message (MsgId , Msg ,
1212
- State0 = # msstate { current_file_handle = CurHdl ,
1213
- current_file = CurFile ,
1214
- current_file_offset = CurOffset ,
1215
- file_summary_ets = FileSummaryEts }) ->
1216
- {MaybeFlush , TotalSize } = writer_append (CurHdl , MsgId , Msg ),
1217
- State = case MaybeFlush of
1218
- flush -> internal_sync (State0 );
1219
- ok -> State0
1220
- end ,
1211
+ -define (LARGE_MESSAGE_THRESHOLD , 4194304 ). % % 4MB.
1212
+
1213
%% Serialise the message body once with term_to_binary/1, then choose a
%% write path from the size of the resulting binary: bodies below
%% ?LARGE_MESSAGE_THRESHOLD bytes go through write_small_message/3,
%% everything else goes through write_large_message/3 (large messages
%% get written to their own files).
%%
%% Returns whatever the chosen write function returns.
write_message(MsgId, MsgBody, State) ->
    MsgBodyBin = term_to_binary(MsgBody),
    case byte_size(MsgBodyBin) < ?LARGE_MESSAGE_THRESHOLD of
        true  -> write_small_message(MsgId, MsgBodyBin, State);
        false -> write_large_message(MsgId, MsgBodyBin, State)
    end.
1222
+
1223
%% Append an already-serialised message body to the current file.
%%
%% The entry is handed to writer_append/3, which reports the entry's
%% total size and whether a flush is wanted. The message location is
%% then recorded in the index at the current offset, the current file's
%% summary counters (valid_total_size and file_size) are both bumped by
%% the entry's total size, and flush_or_roll_to_new_file/3 is called with
%% the advanced offset.
%%
%% Returns the state produced by flush_or_roll_to_new_file/3.
write_small_message(MsgId, MsgBodyBin,
        State = #msstate{ current_file_handle = CurHdl,
                          current_file        = CurFile,
                          current_file_offset = CurOffset,
                          file_summary_ets    = FileSummaryEts }) ->
    {MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, MsgBodyBin),
    Location = #msg_location{ msg_id = MsgId, ref_count = 1, file = CurFile,
                              offset = CurOffset, total_size = TotalSize },
    ok = index_insert(Location, State),
    SummaryIncrements = [{#file_summary.valid_total_size, TotalSize},
                         {#file_summary.file_size,        TotalSize}],
    [_, _] = ets:update_counter(FileSummaryEts, CurFile, SummaryIncrements),
    NewOffset = CurOffset + TotalSize,
    flush_or_roll_to_new_file(
      NewOffset, MaybeFlush,
      State#msstate{ current_file_offset = NewOffset }).
1230
1238
1239
%% Decide what to do after an append, given the new write offset and the
%% flush hint returned by the writer.
%%
%% - Offset has reached file_size_limit: sync, close the current file,
%%   open file CurFile + 1, register an empty summary for it and return
%%   the state pointing at the new file at offset 0. The flush hint is
%%   ignored here because internal_sync/1 is called anyway.
%% - Otherwise, if the hint is `flush': return internal_sync(State).
%% - Otherwise: return the state unchanged.
flush_or_roll_to_new_file(
        Offset, _MaybeFlush,
        State = #msstate{ dir                 = Dir,
                          current_file_handle = CurHdl,
                          current_file        = CurFile,
                          file_summary_ets    = FileSummaryEts,
                          cur_file_cache_ets  = CurFileCacheEts,
                          file_size_limit     = FileSizeLimit })
        when Offset >= FileSizeLimit ->
    SyncedState = internal_sync(State),
    ok = writer_close(CurHdl),
    NextFile = CurFile + 1,
    {ok, NextHdl} = writer_open(Dir, NextFile),
    EmptySummary = #file_summary{ file             = NextFile,
                                  valid_total_size = 0,
                                  file_size        = 0,
                                  locked           = false },
    true = ets:insert_new(FileSummaryEts, EmptySummary),
    %% Delete messages from the cache that were written to disk.
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    SyncedState#msstate{ current_file_handle = NextHdl,
                         current_file        = NextFile,
                         current_file_offset = 0 };
%% Below the size limit, but the writer asked for a flush.
flush_or_roll_to_new_file(_Offset, flush, State) ->
    internal_sync(State);
%% Below the size limit and no flush requested: nothing to do.
flush_or_roll_to_new_file(_Offset, _NoFlush, State) ->
    State.
1267
+
1268
%% Write an already-serialised large message into a data file of its own,
%% then roll over to a fresh current file.
%%
%% Steps, in order:
%%   1. flush and close the current file;
%%   2. open file CurFile + 1, write the entry unbuffered, close it;
%%   3. record the message location (offset 0 in that file) in the index
%%      and insert a summary for the file;
%%   4. open file CurFile + 2 and insert an empty summary for it;
%%   5. clear written entries from the current-file cache and run
%%      internal_sync/1.
%%
%% Returns the state pointing at file CurFile + 2 at offset 0.
write_large_message(MsgId, MsgBodyBin,
        State0 = #msstate{ dir                 = Dir,
                           current_file_handle = CurHdl,
                           current_file        = CurFile,
                           file_summary_ets    = FileSummaryEts,
                           cur_file_cache_ets  = CurFileCacheEts }) ->
    %% Flush the current file and close it.
    ok = writer_flush(CurHdl),
    ok = writer_close(CurHdl),
    %% Open a new file, write the message directly and close it.
    LargeMsgFile = CurFile + 1,
    {ok, LargeMsgHdl} = writer_open(Dir, LargeMsgFile),
    TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin),
    %% Close the large message's own handle. CurHdl was already closed
    %% above; closing it again would leave this descriptor open.
    ok = writer_close(LargeMsgHdl),
    %% Update ets with the new information directly.
    ok = index_insert(
           #msg_location{ msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
                          offset = 0, total_size = TotalSize }, State0),
    true = ets:insert_new(FileSummaryEts, #file_summary{
                                             file             = LargeMsgFile,
                                             valid_total_size = TotalSize,
                                             file_size        = TotalSize,
                                             locked           = false }),
    %% Roll over to the next file.
    NextFile = LargeMsgFile + 1,
    {ok, NextHdl} = writer_open(Dir, NextFile),
    true = ets:insert_new(FileSummaryEts, #file_summary{
                                             file             = NextFile,
                                             valid_total_size = 0,
                                             file_size        = 0,
                                             locked           = false }),
    %% Delete messages from the cache that were written to disk.
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    %% Process confirms (this won't flush; we already did) and continue.
    State = internal_sync(State0),
    State#msstate{ current_file_handle = NextHdl,
                   current_file        = NextFile,
                   current_file_offset = 0 }.
1306
+
1231
1307
contains_message (MsgId , From , State ) ->
1232
1308
MsgLocation = index_lookup_positive_ref_count (MsgId , State ),
1233
1309
gen_server2 :reply (From , MsgLocation =/= not_found ),
@@ -1325,8 +1401,7 @@ writer_recover(Dir, Num, Offset) ->
1325
1401
ok = file :truncate (Fd ),
1326
1402
{ok , # writer {fd = Fd , buffer = prim_buffer :new ()}}.
1327
1403
1328
- writer_append (# writer {buffer = Buffer }, MsgId , MsgBody ) ->
1329
- MsgBodyBin = term_to_binary (MsgBody ),
1404
+ writer_append (# writer {buffer = Buffer }, MsgId , MsgBodyBin ) ->
1330
1405
MsgBodyBinSize = byte_size (MsgBodyBin ),
1331
1406
EntrySize = MsgBodyBinSize + 16 , % % Size of MsgId + MsgBodyBin.
1332
1407
% % We send an iovec to the buffer instead of building a binary.
@@ -1354,6 +1429,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
1354
1429
file :write (Fd , prim_buffer :read_iovec (Buffer , Size ))
1355
1430
end .
1356
1431
1432
%% For large messages we don't buffer anything. Large messages
%% are kept within their own files.
%%
%% This is basically the same as writer_append except no buffering.
%%
%% The entry is written as an iolist: a 64-bit size field, the MsgId,
%% the message body and a single 255 byte as the OK marker.
%%
%% Returns the total number of bytes the entry occupies in the file.
%% Crashes with badmatch if file:write/2 does not return ok, so that a
%% failed write is never reported to the caller as a stored message.
writer_direct_write(#writer{fd = Fd}, MsgId, MsgBodyBin) ->
    MsgBodyBinSize = byte_size(MsgBodyBin),
    EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
    ok = file:write(Fd, [
        <<EntrySize:64>>,
        MsgId,
        MsgBodyBin,
        <<255>> %% OK marker.
    ]),
    %% 8 bytes of size field plus 1 marker byte on top of the entry.
    EntrySize + 9.
1446
+
1357
1447
%% Close the file descriptor held by the writer record.
%% Returns the result of file:close/1.
writer_close(#writer{fd = Descriptor}) ->
    file:close(Descriptor).
1359
1449
@@ -1700,33 +1790,6 @@ rebuild_index(Gatherer, Files, State) ->
1700
1790
% % garbage collection / compaction / aggregation -- internal
1701
1791
% %----------------------------------------------------------------------------
1702
1792
1703
- maybe_roll_to_new_file (
1704
- Offset ,
1705
- State = # msstate { dir = Dir ,
1706
- current_file_handle = CurHdl ,
1707
- current_file = CurFile ,
1708
- file_summary_ets = FileSummaryEts ,
1709
- cur_file_cache_ets = CurFileCacheEts ,
1710
- file_size_limit = FileSizeLimit })
1711
- when Offset >= FileSizeLimit ->
1712
- State1 = internal_sync (State ),
1713
- ok = writer_close (CurHdl ),
1714
- NextFile = CurFile + 1 ,
1715
- {ok , NextHdl } = writer_open (Dir , NextFile ),
1716
- true = ets :insert_new (FileSummaryEts , # file_summary {
1717
- file = NextFile ,
1718
- valid_total_size = 0 ,
1719
- file_size = 0 ,
1720
- locked = false }),
1721
- % % We only delete messages from the cache that were written to disk
1722
- % % in the previous file.
1723
- true = ets :match_delete (CurFileCacheEts , {'_' , '_' , 0 }),
1724
- State1 # msstate { current_file_handle = NextHdl ,
1725
- current_file = NextFile ,
1726
- current_file_offset = 0 };
1727
- maybe_roll_to_new_file (_ , State ) ->
1728
- State .
1729
-
1730
1793
% % We keep track of files that have seen removes and
1731
1794
% % check those periodically for compaction. We only
1732
1795
% % compact files that have less than half valid data.
0 commit comments