Skip to content

Commit 613e1a9

Browse files
hx235facebook-github-bot
authored andcommitted
Verify flush output file record count + minor clean up (facebook#13556)
Summary: **Context/Summary:** Similar to facebook@0a43d8a, this is to verify flush output file contains the exact number of keys (represented by its `TableProperties::num_entries`) as added to table builder for block-based and plain table format. The implementation reuses a temporary compaction stats to record output record and existing input record (with some refactoring) **Bonus:** following facebook@0a43d8a#r154313564, limit compaction output record count check within block based table and plain table format as well as removing extra test setting; fix some typo Pull Request resolved: facebook#13556 Test Plan: New test Reviewed By: jaykorean Differential Revision: D73229644 Pulled By: hx235 fbshipit-source-id: 2a7796450048b3bcb2d5c38f2b5fc6b53e4aae37
1 parent bcda3bd commit 613e1a9

File tree

14 files changed

+188
-72
lines changed

14 files changed

+188
-72
lines changed

db/builder.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ Status BuildTable(
7474
EventLogger* event_logger, int job_id, TableProperties* table_properties,
7575
Env::WriteLifeTimeHint write_hint, const std::string* full_history_ts_low,
7676
BlobFileCompletionCallback* blob_callback, Version* version,
77-
uint64_t* num_input_entries, uint64_t* memtable_payload_bytes,
78-
uint64_t* memtable_garbage_bytes) {
77+
uint64_t* memtable_payload_bytes, uint64_t* memtable_garbage_bytes,
78+
InternalStats::CompactionStats* flush_stats) {
7979
assert((tboptions.column_family_id ==
8080
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
8181
tboptions.column_family_name.empty());
@@ -253,6 +253,10 @@ Status BuildTable(
253253
}
254254
builder->Add(key_after_flush, value_after_flush);
255255

256+
if (flush_stats) {
257+
flush_stats->num_output_records++;
258+
}
259+
256260
s = meta->UpdateBoundaries(key_after_flush, value_after_flush,
257261
ikey.sequence, ikey.type);
258262
if (!s.ok()) {
@@ -284,6 +288,9 @@ Status BuildTable(
284288
auto tombstone = range_del_it->Tombstone();
285289
std::pair<InternalKey, Slice> kv = tombstone.Serialize();
286290
builder->Add(kv.first.Encode(), kv.second);
291+
if (flush_stats) {
292+
flush_stats->num_output_records++;
293+
}
287294
InternalKey tombstone_end = tombstone.SerializeEndKey();
288295
meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_,
289296
tboptions.internal_comparator);
@@ -305,9 +312,9 @@ Status BuildTable(
305312

306313
TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable");
307314
const bool empty = builder->IsEmpty();
308-
if (num_input_entries != nullptr) {
315+
if (flush_stats) {
309316
assert(c_iter.HasNumInputEntryScanned());
310-
*num_input_entries =
317+
flush_stats->num_input_records =
311318
c_iter.NumInputEntryScanned() + num_unfragmented_tombstones;
312319
}
313320
if (!s.ok() || empty) {

db/builder.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <utility>
1111
#include <vector>
1212

13+
#include "db/internal_stats.h"
1314
#include "db/range_tombstone_fragmenter.h"
1415
#include "db/seqno_to_time_mapping.h"
1516
#include "db/table_properties_collector.h"
@@ -34,7 +35,6 @@ class SnapshotChecker;
3435
class TableCache;
3536
class TableBuilder;
3637
class WritableFileWriter;
37-
class InternalStats;
3838
class BlobFileCompletionCallback;
3939

4040
// Convenience function for NewTableBuilder on the embedded table_factory.
@@ -49,6 +49,7 @@ TableBuilder* NewTableBuilder(const TableBuilderOptions& tboptions,
4949
//
5050
// @param column_family_name Name of the column family that is also identified
5151
// by column_family_id, or empty string if unknown.
52+
// @param flush_stats treat flush as level 0 compaction in internal stats
5253
Status BuildTable(
5354
const std::string& dbname, VersionSet* versions,
5455
const ImmutableDBOptions& db_options, const TableBuilderOptions& tboptions,
@@ -69,8 +70,8 @@ Status BuildTable(
6970
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
7071
const std::string* full_history_ts_low = nullptr,
7172
BlobFileCompletionCallback* blob_callback = nullptr,
72-
Version* version = nullptr, uint64_t* num_input_entries = nullptr,
73-
uint64_t* memtable_payload_bytes = nullptr,
74-
uint64_t* memtable_garbage_bytes = nullptr);
73+
Version* version = nullptr, uint64_t* memtable_payload_bytes = nullptr,
74+
uint64_t* memtable_garbage_bytes = nullptr,
75+
InternalStats::CompactionStats* flush_stats = nullptr);
7576

7677
} // namespace ROCKSDB_NAMESPACE

db/compaction/compaction_job.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -878,7 +878,14 @@ Status CompactionJob::Run() {
878878
UpdateCompactionJobOutputStats(internal_stats_);
879879

880880
// Verify number of output records
881-
if (status.ok() && db_options_.compaction_verify_record_count) {
881+
// Only verify on table with format collects table properties
882+
const auto& mutable_cf_options = compact_->compaction->mutable_cf_options();
883+
if (status.ok() &&
884+
(mutable_cf_options.table_factory->IsInstanceOf(
885+
TableFactory::kBlockBasedTableName()) ||
886+
mutable_cf_options.table_factory->IsInstanceOf(
887+
TableFactory::kPlainTableName())) &&
888+
db_options_.compaction_verify_record_count) {
882889
uint64_t total_output_num = 0;
883890
for (const auto& state : compact_->sub_compact_states) {
884891
for (const auto& output : state.GetOutputs()) {

db/compaction/compaction_job_test.cc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,6 @@ class CompactionJobTestBase : public testing::Test {
232232
// set default for the tests
233233
mutable_cf_options_.target_file_size_base = 1024 * 1024;
234234
mutable_cf_options_.max_compaction_bytes = 10 * 1024 * 1024;
235-
236-
// Turn off compaction_verify_record_count MockTables
237-
if (table_type == TableTypeForTest::kMockTable) {
238-
db_options_.compaction_verify_record_count = false;
239-
}
240235
}
241236

242237
void SetUp() override {

db/compaction/tiered_compaction_test.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageUniversal) {
225225
flush_stats.micros = 1;
226226
flush_stats.bytes_written = bytes_per_file;
227227
flush_stats.num_output_files = 1;
228+
flush_stats.num_input_records = kNumKeys;
229+
flush_stats.num_output_records = kNumKeys;
228230
expect_stats[0].Add(flush_stats);
229231
}
230232
ASSERT_OK(dbfull()->TEST_WaitForCompact());
@@ -1080,6 +1082,8 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
10801082
flush_stats.micros = 1;
10811083
flush_stats.bytes_written = bytes_per_file;
10821084
flush_stats.num_output_files = 1;
1085+
flush_stats.num_input_records = kNumKeys;
1086+
flush_stats.num_output_records = kNumKeys;
10831087
expect_stats[0].Add(flush_stats);
10841088
}
10851089
ASSERT_OK(dbfull()->TEST_WaitForCompact());

db/corruption_test.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -851,9 +851,6 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
851851
options.env = env_.get();
852852
options.paranoid_file_checks = true;
853853
options.create_if_missing = true;
854-
// Skip verifying record count against TableProperties for
855-
// MockTables
856-
options.compaction_verify_record_count = false;
857854
Status s;
858855
for (const auto& mode : corruption_modes) {
859856
delete db_;

db/db_flush_test.cc

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3504,6 +3504,63 @@ TEST_F(DBFlushTest, DBStuckAfterAtomicFlushError) {
35043504
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
35053505
ASSERT_EQ(1, NumTableFilesAtLevel(0));
35063506
}
3507+
3508+
TEST_F(DBFlushTest, VerifyOutputRecordCount) {
3509+
for (bool use_plain_table : {false, true}) {
3510+
Options options = CurrentOptions();
3511+
options.flush_verify_memtable_count = true;
3512+
options.merge_operator = MergeOperators::CreateStringAppendOperator();
3513+
DestroyAndReopen(options);
3514+
// Verify flush output record count verification in different table
3515+
// formats
3516+
if (use_plain_table) {
3517+
options.table_factory.reset(NewPlainTableFactory());
3518+
}
3519+
3520+
// Verify that flush output record count verification does not produce false
3521+
// positives.
3522+
ASSERT_OK(Merge("k0", "v1"));
3523+
ASSERT_OK(Put("k1", "v1"));
3524+
ASSERT_OK(Put("k2", "v1"));
3525+
ASSERT_OK(SingleDelete("k2"));
3526+
ASSERT_OK(Delete("k2"));
3527+
ASSERT_OK(Delete("k3"));
3528+
ASSERT_OK(db_->DeleteRange(WriteOptions(), "k1", "k3"));
3529+
ASSERT_OK(Flush());
3530+
3531+
// Verify that flush output record count verification catch corruption
3532+
DestroyAndReopen(options);
3533+
if (use_plain_table) {
3534+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3535+
"PlainTableBuilder::Add::skip",
3536+
[&](void* skip) { *(bool*)skip = true; });
3537+
3538+
} else {
3539+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3540+
"BlockBasedTableBuilder::Add::skip",
3541+
[&](void* skip) { *(bool*)skip = true; });
3542+
}
3543+
SyncPoint::GetInstance()->EnableProcessing();
3544+
const char* expect =
3545+
"Number of keys in flush output SST files does not match";
3546+
3547+
// 1. During DB open flush
3548+
ASSERT_OK(Put("k1", "v1"));
3549+
ASSERT_OK(Put("k2", "v1"));
3550+
Status s = TryReopen(options);
3551+
ASSERT_TRUE(s.IsCorruption());
3552+
ASSERT_TRUE(std::strstr(s.getState(), expect));
3553+
3554+
// 2. During regular flush
3555+
DestroyAndReopen(options);
3556+
ASSERT_OK(Put("k1", "v1"));
3557+
ASSERT_OK(Put("k2", "v1"));
3558+
s = Flush();
3559+
ASSERT_TRUE(s.IsCorruption());
3560+
ASSERT_TRUE(std::strstr(s.getState(), expect));
3561+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3562+
}
3563+
}
35073564
} // namespace ROCKSDB_NAMESPACE
35083565

35093566
int main(int argc, char** argv) {

db/db_impl/db_impl_open.cc

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,6 +2000,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
20002000
const size_t ts_sz = ucmp->timestamp_size();
20012001
const bool logical_strip_timestamp =
20022002
ts_sz > 0 && !cfd->ioptions().persist_user_defined_timestamps;
2003+
// Note that here we treat flush as level 0 compaction in internal stats
2004+
InternalStats::CompactionStats flush_stats(CompactionReason::kFlush,
2005+
1 /* count */);
20032006
{
20042007
ScopedArenaPtr<InternalIterator> iter(
20052008
logical_strip_timestamp
@@ -2072,19 +2075,20 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
20722075
kMaxSequenceNumber);
20732076
Version* version = cfd->current();
20742077
version->Ref();
2075-
uint64_t num_input_entries = 0;
2076-
s = BuildTable(dbname_, versions_.get(), immutable_db_options_, tboptions,
2077-
file_options_for_compaction_, cfd->table_cache(),
2078-
iter.get(), std::move(range_del_iters), &meta,
2079-
&blob_file_additions, snapshot_seqs, earliest_snapshot,
2080-
earliest_write_conflict_snapshot, kMaxSequenceNumber,
2081-
snapshot_checker, paranoid_file_checks,
2082-
cfd->internal_stats(), &io_s, io_tracer_,
2083-
BlobFileCreationReason::kRecovery,
2084-
nullptr /* seqno_to_time_mapping */, &event_logger_,
2085-
job_id, nullptr /* table_properties */, write_hint,
2086-
nullptr /*full_history_ts_low*/, &blob_callback_, version,
2087-
&num_input_entries);
2078+
TableProperties temp_table_proerties;
2079+
s = BuildTable(
2080+
dbname_, versions_.get(), immutable_db_options_, tboptions,
2081+
file_options_for_compaction_, cfd->table_cache(), iter.get(),
2082+
std::move(range_del_iters), &meta, &blob_file_additions,
2083+
snapshot_seqs, earliest_snapshot, earliest_write_conflict_snapshot,
2084+
kMaxSequenceNumber, snapshot_checker, paranoid_file_checks,
2085+
cfd->internal_stats(), &io_s, io_tracer_,
2086+
BlobFileCreationReason::kRecovery,
2087+
nullptr /* seqno_to_time_mapping */, &event_logger_, job_id,
2088+
&temp_table_proerties /* table_properties */, write_hint,
2089+
nullptr /*full_history_ts_low*/, &blob_callback_, version,
2090+
nullptr /* memtable_payload_bytes */,
2091+
nullptr /* memtable_garbage_bytes */, &flush_stats);
20882092
version->Unref();
20892093
LogFlush(immutable_db_options_.info_log);
20902094
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
@@ -2100,10 +2104,31 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
21002104
}
21012105

21022106
uint64_t total_num_entries = mem->NumEntries();
2103-
if (s.ok() && total_num_entries != num_input_entries) {
2107+
if (s.ok() && total_num_entries != flush_stats.num_input_records) {
21042108
std::string msg = "Expected " + std::to_string(total_num_entries) +
21052109
" entries in memtable, but read " +
2106-
std::to_string(num_input_entries);
2110+
std::to_string(flush_stats.num_input_records);
2111+
ROCKS_LOG_WARN(immutable_db_options_.info_log,
2112+
"[%s] [JOB %d] Level-0 flush during recover: %s",
2113+
cfd->GetName().c_str(), job_id, msg.c_str());
2114+
if (immutable_db_options_.flush_verify_memtable_count) {
2115+
s = Status::Corruption(msg);
2116+
}
2117+
}
2118+
// Only verify on table with format collects table properties
2119+
const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
2120+
if (s.ok() &&
2121+
(mutable_cf_options.table_factory->IsInstanceOf(
2122+
TableFactory::kBlockBasedTableName()) ||
2123+
mutable_cf_options.table_factory->IsInstanceOf(
2124+
TableFactory::kPlainTableName())) &&
2125+
flush_stats.num_output_records != temp_table_proerties.num_entries) {
2126+
std::string msg =
2127+
"Number of keys in flush output SST files does not match "
2128+
"number of keys added to the table. Expected " +
2129+
std::to_string(flush_stats.num_output_records) + " but there are " +
2130+
std::to_string(temp_table_proerties.num_entries) +
2131+
" in output SST files";
21072132
ROCKS_LOG_WARN(immutable_db_options_.info_log,
21082133
"[%s] [JOB %d] Level-0 flush during recover: %s",
21092134
cfd->GetName().c_str(), job_id, msg.c_str());
@@ -2151,25 +2176,25 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
21512176
}
21522177
}
21532178

2154-
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
2155-
stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
2179+
flush_stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
21562180

21572181
if (has_output) {
2158-
stats.bytes_written = meta.fd.GetFileSize();
2159-
stats.num_output_files = 1;
2182+
flush_stats.bytes_written = meta.fd.GetFileSize();
2183+
flush_stats.num_output_files = 1;
21602184
}
21612185

21622186
const auto& blobs = edit->GetBlobFileAdditions();
21632187
for (const auto& blob : blobs) {
2164-
stats.bytes_written_blob += blob.GetTotalBlobBytes();
2188+
flush_stats.bytes_written_blob += blob.GetTotalBlobBytes();
21652189
}
21662190

2167-
stats.num_output_files_blob = static_cast<int>(blobs.size());
2191+
flush_stats.num_output_files_blob = static_cast<int>(blobs.size());
21682192

2169-
cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
2193+
cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER,
2194+
flush_stats);
21702195
cfd->internal_stats()->AddCFStats(
21712196
InternalStats::BYTES_FLUSHED,
2172-
stats.bytes_written + stats.bytes_written_blob);
2197+
flush_stats.bytes_written + flush_stats.bytes_written_blob);
21732198
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
21742199
return s;
21752200
}

db/db_test.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5463,7 +5463,6 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
54635463
options.max_bytes_for_level_multiplier = 8;
54645464
options.max_background_compactions = 1;
54655465
options.num_levels = 5;
5466-
options.compaction_verify_record_count = false;
54675466
std::shared_ptr<mock::MockTableFactory> mtf(new mock::MockTableFactory);
54685467
options.table_factory = mtf;
54695468

0 commit comments

Comments
 (0)