Skip to content

Commit 659a490

Browse files
jaykoreanfacebook-github-bot
authored andcommitted
Add reverse order support to MultiScan (#14876)
Summary: Add reverse scan support to `MultiScanArgs`/`MultiScan` so callers can iterate bounded scan ranges in descending order. Plumb reverse scan state through `DBIter`, `BlockBasedTableIterator`, and `MultiScanIndexIterator`, including reverse prefetch windowing and async I/O support. Update `db_bench` `multiscanrandom` to honor `--reverse_iterator` and report rows scanned so reverse MultiScan and iterator baselines can be compared on row throughput. Add `db_stress --multiscan_reverse` coverage so the prepared MultiScan path can be stressed with `SeekForPrev()`/`Prev()` against a control iterator. During review I fixed the table child iterator path to accept reverse `SeekForPrev()` targets above a file's prepared ranges. `MergingIterator` and `LevelIterator` can legally seek every child to the current range limit; a child file may only have lower prepared ranges, so treating that as invalid caused intermittent `SeekForPrev target is outside prepared ranges` exceptions in the benchmark. Differential Revision: D109357730
1 parent 30f42e0 commit 659a490

13 files changed

Lines changed: 707 additions & 98 deletions

‎db/db_iter.cc‎

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1921,6 +1921,9 @@ Status DBIter::ValidateScanOptions(const MultiScanArgs& multiscan_opts) const {
19211921

19221922
const std::vector<ScanOptions>& scan_opts = multiscan_opts.GetScanRanges();
19231923
const bool has_limit = scan_opts.front().range.limit.has_value();
1924+
if (multiscan_opts.reverse && !has_limit) {
1925+
return Status::InvalidArgument("Reverse MultiScan requires upper bounds");
1926+
}
19241927
if (!has_limit && scan_opts.size() > 1) {
19251928
return Status::InvalidArgument("Scan has no upper bound");
19261929
}
@@ -1969,7 +1972,7 @@ Status DBIter::SetScanOptionsForPrepare(const MultiScanArgs& scan_opts) {
19691972
std::optional<MultiScanArgs> new_scan_opts;
19701973
new_scan_opts.emplace(scan_opts);
19711974
scan_opts_.swap(new_scan_opts);
1972-
scan_index_ = 0;
1975+
scan_index_ = scan_opts.reverse ? scan_opts.size() - 1 : 0;
19731976

19741977
// Create a shared IODispatcher if not provided. This allows all
19751978
// BlockBasedTableIterators in this scan to share a single dispatcher,
@@ -2004,6 +2007,12 @@ void DBIter::Seek(const Slice& target) {
20042007
StopWatch sw(clock_, statistics_, DB_SEEK);
20052008

20062009
if (scan_opts_.has_value()) {
2010+
if (scan_opts_->reverse) {
2011+
status_ = Status::InvalidArgument("Seek called on reverse MultiScan");
2012+
valid_ = false;
2013+
return;
2014+
}
2015+
20072016
// Validate the seek target is as expected in the previously prepared range
20082017
auto const& scan_ranges = scan_opts_.value().GetScanRanges();
20092018
if (scan_index_ >= scan_ranges.size()) {
@@ -2110,6 +2119,65 @@ void DBIter::SeekForPrev(const Slice& target) {
21102119
PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, clock_);
21112120
StopWatch sw(clock_, statistics_, DB_SEEK);
21122121

2122+
if (scan_opts_.has_value()) {
2123+
if (!scan_opts_->reverse) {
2124+
status_ =
2125+
Status::InvalidArgument("SeekForPrev called on forward MultiScan");
2126+
valid_ = false;
2127+
return;
2128+
}
2129+
2130+
auto const& scan_ranges = scan_opts_.value().GetScanRanges();
2131+
if (scan_index_ >= scan_ranges.size()) {
2132+
status_ = Status::InvalidArgument(
2133+
"SeekForPrev called after exhausting all of the scan ranges");
2134+
valid_ = false;
2135+
return;
2136+
}
2137+
2138+
auto const& range = scan_ranges[scan_index_];
2139+
auto const& limit = range.range.limit;
2140+
assert(limit.has_value());
2141+
if (!limit.has_value() ||
2142+
user_comparator_.CompareWithoutTimestamp(target, *limit) != 0) {
2143+
status_ = Status::InvalidArgument(
2144+
"SeekForPrev target does not match the limit of the next prepared "
2145+
"range at index " +
2146+
std::to_string(scan_index_));
2147+
valid_ = false;
2148+
return;
2149+
}
2150+
2151+
auto const& start = range.range.start;
2152+
assert(start.has_value());
2153+
if (iterate_lower_bound_ == nullptr ||
2154+
user_comparator_.CompareWithoutTimestamp(start.value(),
2155+
*iterate_lower_bound_) != 0) {
2156+
status_ = Status::InvalidArgument(
2157+
"Lower bound is not set to the same start value of the next "
2158+
"prepared range at index " +
2159+
std::to_string(scan_index_));
2160+
valid_ = false;
2161+
return;
2162+
}
2163+
if (iterate_upper_bound_ == nullptr ||
2164+
user_comparator_.CompareWithoutTimestamp(limit.value(),
2165+
*iterate_upper_bound_) != 0) {
2166+
status_ = Status::InvalidArgument(
2167+
"Upper bound is not set to the same limit value of the next "
2168+
"prepared range at index " +
2169+
std::to_string(scan_index_));
2170+
valid_ = false;
2171+
return;
2172+
}
2173+
2174+
if (scan_index_ == 0) {
2175+
scan_index_ = scan_ranges.size();
2176+
} else {
2177+
--scan_index_;
2178+
}
2179+
}
2180+
21132181
if (has_trace_state_) {
21142182
// TODO: What do we do if this returns an error?
21152183
Slice lower_bound, upper_bound;

‎db/db_iterator_test.cc‎

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5418,6 +5418,91 @@ TEST_P(DBMultiScanIteratorTest, AsyncPrefetchAcrossMultipleFiles) {
54185418
iter.reset();
54195419
}
54205420

5421+
TEST_P(DBMultiScanIteratorTest, ReversePrefetchAcrossMultipleRanges) {
5422+
auto options = CurrentOptions();
5423+
options.target_file_size_base = 1 << 15; // 32KiB
5424+
options.compaction_style = kCompactionStyleUniversal;
5425+
options.num_levels = 50;
5426+
options.compression = kNoCompression;
5427+
DestroyAndReopen(options);
5428+
5429+
Random rnd(303);
5430+
for (int i = 0; i < 1000; ++i) {
5431+
ASSERT_OK(Put(Key(i), rnd.RandomString(1 << 10)));
5432+
}
5433+
ASSERT_OK(Flush());
5434+
ASSERT_OK(db_->CompactRange({}, nullptr, nullptr));
5435+
ASSERT_GT(NumTableFilesAtLevel(49), 3);
5436+
5437+
ReadOptions ro;
5438+
ro.fill_cache = GetParam();
5439+
ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
5440+
auto tracking_dispatcher = std::make_shared<TrackingIODispatcher>();
5441+
5442+
MultiScanArgs scan_options(BytewiseComparator());
5443+
scan_options.use_async_io = false;
5444+
scan_options.reverse = true;
5445+
scan_options.io_dispatcher = tracking_dispatcher;
5446+
std::vector<std::string> key_ranges(
5447+
{Key(100), Key(200), Key(500), Key(600), Key(800), Key(900)});
5448+
scan_options.insert(key_ranges[0], key_ranges[1]);
5449+
scan_options.insert(key_ranges[2], key_ranges[3]);
5450+
scan_options.insert(key_ranges[4], key_ranges[5]);
5451+
5452+
std::unique_ptr<MultiScan> iter =
5453+
dbfull()->NewMultiScan(ro, cfh, scan_options);
5454+
ASSERT_NE(iter, nullptr);
5455+
5456+
std::vector<std::string> actual_keys;
5457+
try {
5458+
for (auto range : *iter) {
5459+
for (auto it : range) {
5460+
actual_keys.push_back(it.first.ToString());
5461+
}
5462+
}
5463+
} catch (MultiScanException& ex) {
5464+
FAIL() << "Iterator returned status " << ex.what();
5465+
} catch (std::logic_error& ex) {
5466+
FAIL() << "Iterator returned logic error " << ex.what();
5467+
}
5468+
5469+
std::vector<std::string> expected_keys;
5470+
for (int i = 899; i >= 800; --i) {
5471+
expected_keys.push_back(Key(i));
5472+
}
5473+
for (int i = 599; i >= 500; --i) {
5474+
expected_keys.push_back(Key(i));
5475+
}
5476+
for (int i = 199; i >= 100; --i) {
5477+
expected_keys.push_back(Key(i));
5478+
}
5479+
ASSERT_EQ(actual_keys, expected_keys);
5480+
ASSERT_GT(tracking_dispatcher->GetReadSets().size(), 0);
5481+
}
5482+
5483+
TEST_P(DBMultiScanIteratorTest, ReverseRequiresUpperBounds) {
5484+
auto options = CurrentOptions();
5485+
options.compression = kNoCompression;
5486+
DestroyAndReopen(options);
5487+
5488+
ASSERT_OK(Put("a", "va"));
5489+
ASSERT_OK(Put("b", "vb"));
5490+
ASSERT_OK(Flush());
5491+
5492+
ReadOptions ro;
5493+
ro.fill_cache = GetParam();
5494+
ColumnFamilyHandle* cfh = dbfull()->DefaultColumnFamily();
5495+
5496+
MultiScanArgs scan_options(BytewiseComparator());
5497+
scan_options.reverse = true;
5498+
scan_options.insert("a");
5499+
5500+
std::unique_ptr<MultiScan> iter =
5501+
dbfull()->NewMultiScan(ro, cfh, scan_options);
5502+
ASSERT_NE(iter, nullptr);
5503+
ASSERT_THROW({ (void)iter->begin(); }, MultiScanException);
5504+
}
5505+
54215506
// Wrapper filesystem that does not support async IO.
54225507
// Used to verify that MultiScan gracefully falls back to sync IO.
54235508
class NoAsyncIOFS : public FileSystemWrapper {

‎db/multi_scan.cc‎

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,27 @@ MultiScan::MultiScan(const ReadOptions& read_options,
2929
db_(db),
3030
cfh_(cfh) {
3131
bool slow_path = false;
32-
// Setup read_options with iterate_uuper_bound based on the first scan.
33-
// Subsequent scans will update and allocate a new DB iterator as necessary
34-
if (scan_opts_.GetScanRanges()[0].range.limit) {
35-
upper_bound_ = *scan_opts_.GetScanRanges()[0].range.limit;
36-
read_options_.iterate_upper_bound = &upper_bound_;
37-
} else {
38-
read_options_.iterate_upper_bound = nullptr;
32+
// Setup read_options bounds based on the first scan. Subsequent scans will
33+
// update the bounds and allocate a new DB iterator as necessary.
34+
if (!scan_opts_.GetScanRanges().empty()) {
35+
const size_t first_scan_idx =
36+
scan_opts_.reverse ? scan_opts_.GetScanRanges().size() - 1 : 0;
37+
const ScanOptions& first_scan = scan_opts_.GetScanRanges()[first_scan_idx];
38+
if (scan_opts_.reverse) {
39+
lower_bound_ = *first_scan.range.start;
40+
read_options_.iterate_lower_bound = &lower_bound_;
41+
if (first_scan.range.limit) {
42+
upper_bound_ = *first_scan.range.limit;
43+
read_options_.iterate_upper_bound = &upper_bound_;
44+
} else {
45+
read_options_.iterate_upper_bound = nullptr;
46+
}
47+
} else if (first_scan.range.limit) {
48+
upper_bound_ = *first_scan.range.limit;
49+
read_options_.iterate_upper_bound = &upper_bound_;
50+
} else {
51+
read_options_.iterate_upper_bound = nullptr;
52+
}
3953
}
4054
for (const auto& opts : scan_opts_.GetScanRanges()) {
4155
// Check that all the ScanOptions either specify an upper bound or not. If
@@ -54,17 +68,49 @@ MultiScan::MultiScan(const ReadOptions& read_options,
5468
}
5569
}
5670

71+
void MultiScanIterator::SeekCurrentRange() {
72+
const ScanOptions& scan_opt = scan_opts_[idx_];
73+
if (scan_opt.range.limit) {
74+
*upper_bound_ = *scan_opt.range.limit;
75+
read_options_.iterate_upper_bound = upper_bound_;
76+
} else {
77+
read_options_.iterate_upper_bound = nullptr;
78+
}
79+
80+
if (reverse_) {
81+
*lower_bound_ = *scan_opt.range.start;
82+
read_options_.iterate_lower_bound = lower_bound_;
83+
assert(scan_opt.range.limit.has_value());
84+
db_iter_->SeekForPrev(*scan_opt.range.limit);
85+
} else {
86+
db_iter_->Seek(*scan_opt.range.start);
87+
}
88+
}
89+
5790
MultiScanIterator& MultiScanIterator::operator++() {
5891
status_ = db_iter_->status();
5992
if (!status_.ok()) {
6093
throw MultiScanException(status_);
6194
}
6295

63-
if (idx_ >= scan_opts_.size()) {
96+
if (!valid_) {
6497
throw std::logic_error("Index out of range");
6598
}
66-
idx_++;
67-
if (idx_ < scan_opts_.size()) {
99+
if (reverse_) {
100+
if (idx_ == 0) {
101+
valid_ = false;
102+
return *this;
103+
}
104+
--idx_;
105+
} else {
106+
++idx_;
107+
if (idx_ >= scan_opts_.size()) {
108+
valid_ = false;
109+
return *this;
110+
}
111+
}
112+
113+
if (valid_) {
68114
// Check if we need to update read_options_
69115
if (scan_opts_[idx_].range.limit.has_value() !=
70116
(read_options_.iterate_upper_bound != nullptr)) {
@@ -75,11 +121,11 @@ MultiScanIterator& MultiScanIterator::operator++() {
75121
read_options_.iterate_upper_bound = nullptr;
76122
}
77123
db_iter_.reset(db_->NewIterator(read_options_, cfh_));
78-
scan_.Reset(db_iter_.get());
124+
scan_.Reset(db_iter_.get(), reverse_);
79125
} else if (scan_opts_[idx_].range.limit) {
80126
*upper_bound_ = *scan_opts_[idx_].range.limit;
81127
}
82-
db_iter_->Seek(*scan_opts_[idx_].range.start);
128+
SeekCurrentRange();
83129
status_ = db_iter_->status();
84130
if (!status_.ok()) {
85131
throw MultiScanException(status_);

‎db_stress_tool/db_stress_common.h‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ DECLARE_uint32(min_tombstones_for_range_conversion);
468468
DECLARE_uint32(ingest_wbwi_one_in);
469469
DECLARE_bool(universal_reduce_file_locking);
470470
DECLARE_bool(use_multiscan);
471+
DECLARE_bool(multiscan_reverse);
471472
DECLARE_bool(multiscan_use_async_io);
472473
DECLARE_bool(read_scoped_block_buffer_provider);
473474
DECLARE_uint64(multiscan_max_prefetch_memory_bytes);

‎db_stress_tool/db_stress_gflags.cc‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1707,6 +1707,10 @@ DEFINE_bool(
17071707
DEFINE_bool(use_multiscan, false,
17081708
"If set, use the batched MultiScan API for scans.");
17091709

1710+
DEFINE_bool(multiscan_reverse, false,
1711+
"If set with use_multiscan, run MultiScan ranges in reverse "
1712+
"order using SeekForPrev and Prev.");
1713+
17101714
DEFINE_bool(multiscan_use_async_io, false,
17111715
"If set, enable async_io for MultiScan operations.");
17121716

0 commit comments

Comments
 (0)