Skip to content

Commit 56359da

Browse files
cbi42facebook-github-bot
authored and committed
Trigger memtable flush based on number of hidden entries scanned (facebook#13523)
Summary: Introduce a mutable CF option `memtable_op_scan_flush_trigger`. When a DB iterator scans this number of hidden entries (tombstones, overwritten puts) from the active memtable in a Seek() or Next() operation, it marks the memtable as eligible for flush. Subsequent write operations will schedule the marked memtable for flush. The main change is small and is in db_iter.cc. Some refactoring is done to consolidate and simplify creation of `ArenaWrappedDBIter` and `DBIter`. Pull Request resolved: facebook#13523 Test Plan: - new unit tests added. - added `memtable_op_scan_flush_trigger` in crash test - benchmark: The following benchmark was done with a previous version of the PR where the option was `memtable_tombstone_scan_limit` and it concerned tombstones only. The results should still be applicable for the case when there are no overwritten puts. Tests that when memtable has many tombstones, the option helps to improve scan performance: ``` TEST_TMPDIR=/dev/shm ./db_bench --benchmarks=seekrandomwhilewriting --expand_range_tombstones=true --writes_per_range_tombstone=1 --max_num_range_tombstones=10000000 --perf_level=2 --range_tombstone_width=100 --memtable_tombstone_scan_limit= memtable_tombstone_scan_limit = 10000 seekrandomwhilewriting : 18.527 micros/op 53973 ops/sec 18.527 seconds 1000000 operations; (7348 of 1000000 found) next_on_memtable_count = 122305248 grep "flush_started" /dev/shm/dbbench/LOG | wc 8 200 2417 memtable_tombstone_scan_limit=200 seekrandomwhilewriting : 4.918 micros/op 203315 ops/sec 4.918 seconds 1000000 operations; (4510 of 1000000 found) next_on_memtable_count = 1853167 grep "flush_started" /dev/shm/dbbench/LOG | wc 184 4600 54121 When memtable_tombstone_scan_limit=200, more flushes are triggered to drop tombstones sooner and improve scan performance. 
``` Tests that the new option does not introduce noticeable regression: ``` TEST_TMPDIR=/dev/shm ./db_bench --benchmarks=seekrandomwhilewriting[-X5] --expand_range_tombstones=true --writes_per_range_tombstone=1 --max_num_range_tombstones=10000000 --perf_level=2 --range_tombstone_width=100 --seed=123 Main: seekrandomwhilewriting [AVG 5 runs] : 46049 (± 4512) ops/sec PR: seekrandomwhilewriting [AVG 5 runs] : 46100 (± 4470) ops/sec The results are noisy with this PR performing better and worse in different runs, with no noticeable regression. ``` Reviewed By: pdillinger Differential Revision: D72596434 Pulled By: cbi42 fbshipit-source-id: 2d51a0221dc20dac844aeba2ad3999d075a4cf91
1 parent 46c37a6 commit 56359da

27 files changed

+489
-333
lines changed

db/arena_wrapped_db_iter.cc

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,24 @@ Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
4242
void ArenaWrappedDBIter::Init(
4343
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
4444
const MutableCFOptions& mutable_cf_options, const Version* version,
45-
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
46-
uint64_t version_number, ReadCallback* read_callback,
47-
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
45+
const SequenceNumber& sequence, uint64_t version_number,
46+
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
47+
bool expose_blob_index, bool allow_refresh, ReadOnlyMemTable* active_mem) {
4848
read_options_ = read_options;
4949
if (!CheckFSFeatureSupport(env->GetFileSystem().get(),
5050
FSSupportedOps::kAsyncIO)) {
5151
read_options_.async_io = false;
5252
}
5353
read_options_.total_order_seek |= ioptions.prefix_seek_opt_in_only;
5454

55-
auto mem = arena_.AllocateAligned(sizeof(DBIter));
56-
db_iter_ = new (mem) DBIter(env, read_options_, ioptions, mutable_cf_options,
57-
ioptions.user_comparator,
58-
/* iter */ nullptr, version, sequence, true,
59-
max_sequential_skip_in_iteration, read_callback,
60-
cfh, expose_blob_index);
55+
db_iter_ = DBIter::NewIter(
56+
env, read_options_, ioptions, mutable_cf_options,
57+
ioptions.user_comparator, /*internal_iter=*/nullptr, version, sequence,
58+
read_callback, cfh, expose_blob_index, active_mem, &arena_);
6159

6260
sv_number_ = version_number;
6361
allow_refresh_ = allow_refresh;
62+
allow_mark_memtable_for_flush_ = active_mem;
6463
memtable_range_tombstone_iter_ = nullptr;
6564
}
6665

@@ -166,9 +165,8 @@ void ArenaWrappedDBIter::DoRefresh(const Snapshot* snapshot,
166165
read_callback_->Refresh(read_seq);
167166
}
168167
Init(env, read_options_, cfd->ioptions(), sv->mutable_cf_options, sv->current,
169-
read_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
170-
sv->version_number, read_callback_, cfh_, expose_blob_index_,
171-
allow_refresh_);
168+
read_seq, sv->version_number, read_callback_, cfh_, expose_blob_index_,
169+
allow_refresh_, allow_mark_memtable_for_flush_ ? sv->mem : nullptr);
172170

173171
InternalIterator* internal_iter = db_impl->NewInternalIterator(
174172
read_options_, cfd, sv, &arena_, read_seq,
@@ -253,20 +251,26 @@ Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
253251
}
254252

255253
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
256-
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
257-
const MutableCFOptions& mutable_cf_options, const Version* version,
258-
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
259-
uint64_t version_number, ReadCallback* read_callback,
260-
ColumnFamilyHandleImpl* cfh, bool expose_blob_index, bool allow_refresh) {
261-
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
262-
iter->Init(env, read_options, ioptions, mutable_cf_options, version, sequence,
263-
max_sequential_skip_in_iterations, version_number, read_callback,
264-
cfh, expose_blob_index, allow_refresh);
254+
Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
255+
SuperVersion* sv, const SequenceNumber& sequence,
256+
ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index,
257+
bool allow_refresh, bool allow_mark_memtable_for_flush) {
258+
ArenaWrappedDBIter* db_iter = new ArenaWrappedDBIter();
259+
db_iter->Init(env, read_options, cfh->cfd()->ioptions(),
260+
sv->mutable_cf_options, sv->current, sequence,
261+
sv->version_number, read_callback, cfh, expose_blob_index,
262+
allow_refresh,
263+
allow_mark_memtable_for_flush ? sv->mem : nullptr);
265264
if (cfh != nullptr && allow_refresh) {
266-
iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index);
265+
db_iter->StoreRefreshInfo(cfh, read_callback, expose_blob_index);
267266
}
268267

269-
return iter;
268+
InternalIterator* internal_iter = db_impl->NewInternalIterator(
269+
db_iter->GetReadOptions(), cfh->cfd(), sv, db_iter->GetArena(), sequence,
270+
/*allow_unprepared_value=*/true, db_iter);
271+
db_iter->SetIterUnderDBIter(internal_iter);
272+
273+
return db_iter;
270274
}
271275

272276
} // namespace ROCKSDB_NAMESPACE

db/arena_wrapped_db_iter.h

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include "options/cf_options.h"
2020
#include "rocksdb/db.h"
2121
#include "rocksdb/iterator.h"
22-
#include "util/autovector.h"
2322

2423
namespace ROCKSDB_NAMESPACE {
2524

@@ -103,13 +102,15 @@ class ArenaWrappedDBIter : public Iterator {
103102
db_iter_->Prepare(scan_opts);
104103
}
105104

105+
// FIXME: we could just pass SV in for mutable cf option, version and version
106+
// number, but this is used by SstFileReader which does not have a SV.
106107
void Init(Env* env, const ReadOptions& read_options,
107108
const ImmutableOptions& ioptions,
108109
const MutableCFOptions& mutable_cf_options, const Version* version,
109-
const SequenceNumber& sequence,
110-
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
110+
const SequenceNumber& sequence, uint64_t version_number,
111111
ReadCallback* read_callback, ColumnFamilyHandleImpl* cfh,
112-
bool expose_blob_index, bool allow_refresh);
112+
bool expose_blob_index, bool allow_refresh,
113+
ReadOnlyMemTable* active_mem);
113114

114115
// Store some parameters so we can refresh the iterator at a later point
115116
// with these same params
@@ -132,20 +133,16 @@ class ArenaWrappedDBIter : public Iterator {
132133
ReadCallback* read_callback_;
133134
bool expose_blob_index_ = false;
134135
bool allow_refresh_ = true;
136+
bool allow_mark_memtable_for_flush_ = true;
135137
// If this is nullptr, it means the mutable memtable does not contain range
136138
// tombstone when added under this DBIter.
137139
std::unique_ptr<TruncatedRangeDelIterator>* memtable_range_tombstone_iter_ =
138140
nullptr;
139141
};
140142

141-
// Generate the arena wrapped iterator class.
142-
// `cfh` is used for reneweal. If left null, renewal will not
143-
// be supported.
144143
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
145-
Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
146-
const MutableCFOptions& mutable_cf_options, const Version* version,
147-
const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
148-
uint64_t version_number, ReadCallback* read_callback,
149-
ColumnFamilyHandleImpl* cfh = nullptr, bool expose_blob_index = false,
150-
bool allow_refresh = true);
144+
Env* env, const ReadOptions& read_options, ColumnFamilyHandleImpl* cfh,
145+
SuperVersion* sv, const SequenceNumber& sequence,
146+
ReadCallback* read_callback, DBImpl* db_impl, bool expose_blob_index,
147+
bool allow_refresh, bool allow_mark_memtable_for_flush);
151148
} // namespace ROCKSDB_NAMESPACE

db/column_family.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,13 @@ ColumnFamilyOptions SanitizeCfOptions(const ImmutableDBOptions& db_options,
448448
result.preclude_last_level_data_seconds = 0;
449449
}
450450

451+
if (read_only && result.memtable_op_scan_flush_trigger != 0) {
452+
ROCKS_LOG_WARN(db_options.info_log.get(),
453+
"option memtable_op_scan_flush_trigger is sanitized to "
454+
"0(disabled) for read only DB.");
455+
result.memtable_op_scan_flush_trigger = 0;
456+
}
457+
451458
return result;
452459
}
453460

db/db_impl/db_impl.cc

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3857,11 +3857,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& _read_options,
38573857

38583858
auto iter = new ForwardIterator(this, read_options, cfd, sv,
38593859
/* allow_unprepared_value */ true);
3860-
result = NewDBIterator(
3861-
env_, read_options, cfd->ioptions(), sv->mutable_cf_options,
3862-
cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
3863-
sv->mutable_cf_options.max_sequential_skip_in_iterations,
3864-
nullptr /* read_callback */, cfh);
3860+
result = DBIter::NewIter(env_, read_options, cfd->ioptions(),
3861+
sv->mutable_cf_options, cfd->user_comparator(),
3862+
iter, sv->current, kMaxSequenceNumber,
3863+
/*read_callback=*/nullptr, cfh,
3864+
/*expose_blob_index=*/false,
3865+
/*active_mem=*/sv->mem);
38653866
} else {
38663867
// Note: no need to consider the special case of
38673868
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
@@ -3939,18 +3940,9 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
39393940
// Laying out the iterators in the order of being accessed makes it more
39403941
// likely that any iterator pointer is close to the iterator it points to so
39413942
// that they are likely to be in the same cache line and/or page.
3942-
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
3943-
env_, read_options, cfh->cfd()->ioptions(), sv->mutable_cf_options,
3944-
sv->current, snapshot,
3945-
sv->mutable_cf_options.max_sequential_skip_in_iterations,
3946-
sv->version_number, read_callback, cfh, expose_blob_index, allow_refresh);
3947-
3948-
InternalIterator* internal_iter = NewInternalIterator(
3949-
db_iter->GetReadOptions(), cfh->cfd(), sv, db_iter->GetArena(), snapshot,
3950-
/* allow_unprepared_value */ true, db_iter);
3951-
db_iter->SetIterUnderDBIter(internal_iter);
3952-
3953-
return db_iter;
3943+
return NewArenaWrappedDbIterator(
3944+
env_, read_options, cfh, sv, snapshot, read_callback, this,
3945+
expose_blob_index, allow_refresh, /*allow_mark_memtable_for_flush=*/true);
39543946
}
39553947

39563948
std::unique_ptr<Iterator> DBImpl::NewCoalescingIterator(
@@ -4075,13 +4067,11 @@ Status DBImpl::NewIterators(
40754067
cf_sv_pair.super_version,
40764068
/* allow_unprepared_value */ true);
40774069
iterators->push_back(
4078-
NewDBIterator(env_, read_options, cf_sv_pair.cfd->ioptions(),
4079-
cf_sv_pair.super_version->mutable_cf_options,
4080-
cf_sv_pair.cfd->user_comparator(), iter,
4081-
cf_sv_pair.super_version->current, kMaxSequenceNumber,
4082-
cf_sv_pair.super_version->mutable_cf_options
4083-
.max_sequential_skip_in_iterations,
4084-
nullptr /*read_callback*/, cf_sv_pair.cfh));
4070+
DBIter::NewIter(env_, read_options, cf_sv_pair.cfd->ioptions(),
4071+
cf_sv_pair.super_version->mutable_cf_options,
4072+
cf_sv_pair.cfd->user_comparator(), iter,
4073+
cf_sv_pair.super_version->current, kMaxSequenceNumber,
4074+
nullptr /*read_callback*/, cf_sv_pair.cfh));
40854075
}
40864076
} else {
40874077
for (const auto& cf_sv_pair : cf_sv_pairs) {

db/db_impl/db_impl_readonly.cc

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -185,16 +185,10 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& _read_options,
185185
? static_cast<const SnapshotImpl*>(read_options.snapshot)->number_
186186
: latest_snapshot;
187187
ReadCallback* read_callback = nullptr; // No read callback provided.
188-
auto db_iter = NewArenaWrappedDbIterator(
189-
env_, read_options, cfd->ioptions(), super_version->mutable_cf_options,
190-
super_version->current, read_seq,
191-
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
192-
super_version->version_number, read_callback);
193-
auto internal_iter = NewInternalIterator(
194-
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
195-
read_seq, /* allow_unprepared_value */ true, db_iter);
196-
db_iter->SetIterUnderDBIter(internal_iter);
197-
return db_iter;
188+
return NewArenaWrappedDbIterator(
189+
env_, read_options, cfh, super_version, read_seq, read_callback, this,
190+
/*expose_blob_index=*/false, /*allow_refresh=*/false,
191+
/*allow_mark_memtable_for_flush=*/false);
198192
}
199193

200194
Status DBImplReadOnly::NewIterators(
@@ -231,36 +225,32 @@ Status DBImplReadOnly::NewIterators(
231225
? static_cast<const SnapshotImpl*>(read_options.snapshot)->number_
232226
: latest_snapshot;
233227

234-
autovector<std::tuple<ColumnFamilyData*, SuperVersion*>> cfd_to_sv;
228+
autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
235229

236230
const bool check_read_ts =
237231
read_options.timestamp && read_options.timestamp->size() > 0;
238232
for (auto cfh : column_families) {
239233
auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
240234
auto* sv = cfd->GetSuperVersion()->Ref();
241-
cfd_to_sv.emplace_back(cfd, sv);
235+
cfh_to_sv.emplace_back(static_cast_with_check<ColumnFamilyHandleImpl>(cfh),
236+
sv);
242237
if (check_read_ts) {
243238
const Status s =
244239
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
245240
if (!s.ok()) {
246-
for (auto prev_entry : cfd_to_sv) {
241+
for (auto prev_entry : cfh_to_sv) {
247242
std::get<1>(prev_entry)->Unref();
248243
}
249244
return s;
250245
}
251246
}
252247
}
253-
assert(cfd_to_sv.size() == column_families.size());
254-
for (auto [cfd, sv] : cfd_to_sv) {
248+
assert(cfh_to_sv.size() == column_families.size());
249+
for (auto [cfh, sv] : cfh_to_sv) {
255250
auto* db_iter = NewArenaWrappedDbIterator(
256-
env_, read_options, cfd->ioptions(), sv->mutable_cf_options,
257-
sv->current, read_seq,
258-
sv->mutable_cf_options.max_sequential_skip_in_iterations,
259-
sv->version_number, read_callback);
260-
auto* internal_iter = NewInternalIterator(
261-
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(), read_seq,
262-
/* allow_unprepared_value */ true, db_iter);
263-
db_iter->SetIterUnderDBIter(internal_iter);
251+
env_, read_options, cfh, sv, read_seq, read_callback, this,
252+
/*expose_blob_index=*/false, /*allow_refresh=*/false,
253+
/*allow_mark_memtable_for_flush=*/false);
264254
iterators->push_back(db_iter);
265255
}
266256

db/db_impl/db_impl_secondary.cc

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -566,17 +566,10 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
566566
assert(snapshot == kMaxSequenceNumber);
567567
snapshot = versions_->LastSequence();
568568
assert(snapshot != kMaxSequenceNumber);
569-
auto db_iter = NewArenaWrappedDbIterator(
570-
env_, read_options, cfh->cfd()->ioptions(),
571-
super_version->mutable_cf_options, super_version->current, snapshot,
572-
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
573-
super_version->version_number, read_callback, cfh, expose_blob_index,
574-
allow_refresh);
575-
auto internal_iter = NewInternalIterator(
576-
db_iter->GetReadOptions(), cfh->cfd(), super_version, db_iter->GetArena(),
577-
snapshot, /* allow_unprepared_value */ true, db_iter);
578-
db_iter->SetIterUnderDBIter(internal_iter);
579-
return db_iter;
569+
return NewArenaWrappedDbIterator(env_, read_options, cfh, super_version,
570+
snapshot, read_callback, this,
571+
expose_blob_index, allow_refresh,
572+
/*allow_mark_memtable_for_flush=*/false);
580573
}
581574

582575
Status DBImplSecondary::NewIterators(

0 commit comments

Comments
 (0)