Skip to content

Commit ff9333f

Browse files
authored
Merge pull request #26006 from WillemKauf/manual-backport-25932-v24.3.x-5
[v24.3.x] `storage`: decompress batch during `batch_timequery()` (Manual backport)
2 parents b49e2dc + 6705abb commit ff9333f

File tree

6 files changed: +67 additions, -69 deletions

src/v/cloud_storage/remote_partition.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,16 +1280,17 @@ remote_partition::timequery(storage::timequery_config cfg) {
12801280
auto translating_reader = co_await make_reader(config);
12811281

12821282
// Read one batch from the reader to learn the offset
1283-
model::record_batch_reader::storage_t data
1284-
= co_await model::consume_reader_to_memory(
1285-
std::move(translating_reader.reader), model::no_timeout);
1283+
auto batches = co_await model::consume_reader_to_memory(
1284+
std::move(translating_reader.reader), model::no_timeout);
12861285

1287-
auto& batches = std::get<model::record_batch_reader::data_t>(data);
12881286
vlog(_ctxlog.debug, "timequery: {} batches", batches.size());
12891287

12901288
if (batches.size()) {
1291-
co_return storage::batch_timequery(
1292-
*(batches.begin()), cfg.min_offset, cfg.time, cfg.max_offset);
1289+
co_return co_await storage::batch_timequery(
1290+
std::move(*(batches.begin())),
1291+
cfg.min_offset,
1292+
cfg.time,
1293+
cfg.max_offset);
12931294
} else {
12941295
co_return std::nullopt;
12951296
}

src/v/kafka/server/tests/list_offsets_test.cc

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,8 @@ FIXTURE_TEST(list_offsets_by_time, redpanda_thread_fixture) {
222222
// Arbitrary synthetic timestamp for start of produce
223223
auto base_timestamp = 100000;
224224

225-
for (long i = 0; i < batch_count; ++i) {
226-
// Mixture of compressed and uncompressed, they have distinct offset
227-
// lookup behavior when searching by timequery, which will be
228-
// validated
225+
for (size_t i = 0; i < batch_count; ++i) {
226+
// Mixture of compressed and uncompressed batches.
229227
bool compressed = i % 3 == 0;
230228
batches.push_back(make_random_batch(model::test::record_batch_spec{
231229
// after queries below.
@@ -279,30 +277,17 @@ FIXTURE_TEST(list_offsets_by_time, redpanda_thread_fixture) {
279277
}},
280278
});
281279

282-
const auto& batch = batches[i];
283280
auto resp_midbatch
284281
= client.dispatch(std::move(req2), kafka::api_version(1)).get();
285282
BOOST_REQUIRE_EQUAL(resp_midbatch.data.topics.size(), 1);
286283
BOOST_REQUIRE_EQUAL(resp_midbatch.data.topics[0].partitions.size(), 1);
287-
if (batch.compressed()) {
288-
// Compressed batch: result will point to start of batch, slightly
289-
// earlier than the query timestamp
290-
BOOST_CHECK(
291-
resp_midbatch.data.topics[0].partitions[0].timestamp
292-
== model::timestamp(base_timestamp + i * record_count));
293-
BOOST_CHECK(
294-
resp_midbatch.data.topics[0].partitions[0].offset
295-
== model::offset(i * record_count));
296-
} else {
297-
// Uncompressed batch: result should have seeked to correct record
298-
BOOST_CHECK(
299-
resp_midbatch.data.topics[0].partitions[0].timestamp
300-
== model::timestamp(
301-
base_timestamp + i * record_count + record_offset));
302-
BOOST_CHECK(
303-
resp_midbatch.data.topics[0].partitions[0].offset
304-
== model::offset(i * record_count + record_offset));
305-
}
284+
BOOST_CHECK(
285+
resp_midbatch.data.topics[0].partitions[0].timestamp
286+
== model::timestamp(
287+
base_timestamp + i * record_count + record_offset));
288+
BOOST_CHECK(
289+
resp_midbatch.data.topics[0].partitions[0].offset
290+
== model::offset(i * record_count + record_offset));
306291
}
307292

308293
client.stop().then([&client] { client.shutdown(); }).get();

src/v/model/record.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "serde/rw/iobuf.h"
2626
#include "serde/rw/rw.h"
2727

28+
#include <seastar/core/loop.hh>
2829
#include <seastar/core/smp.hh>
2930
#include <seastar/util/optimized_optional.hh>
3031

@@ -871,7 +872,18 @@ class record_batch
871872
ss::future<> for_each_record_async(Func f) const {
872873
auto it = record_batch_iterator::create(*this);
873874
while (it.has_next()) {
874-
co_await ss::futurize_invoke(f, it.next());
875+
if constexpr (std::is_same_v<
876+
ss::futurize_t<
877+
std::invoke_result_t<Func, model::record>>,
878+
ss::future<ss::stop_iteration>>) {
879+
ss::stop_iteration s = co_await ss::futurize_invoke(
880+
f, it.next());
881+
if (s == ss::stop_iteration::yes) {
882+
co_return;
883+
}
884+
} else {
885+
co_await ss::futurize_invoke(f, it.next());
886+
}
875887
}
876888
}
877889

src/v/storage/disk_log_impl.cc

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2572,23 +2572,20 @@ ss::future<std::optional<timequery_result>>
25722572
disk_log_impl::timequery(timequery_config cfg) {
25732573
vassert(!_closed, "timequery on closed log - {}", *this);
25742574
if (_segs.empty()) {
2575-
return ss::make_ready_future<std::optional<timequery_result>>();
2576-
}
2577-
return make_reader(cfg).then([cfg](model::record_batch_reader reader) {
2578-
return model::consume_reader_to_memory(
2579-
std::move(reader), model::no_timeout)
2580-
.then([cfg](model::record_batch_reader::storage_t st) {
2581-
using ret_t = std::optional<timequery_result>;
2582-
auto& batches = std::get<model::record_batch_reader::data_t>(st);
2583-
if (
2584-
!batches.empty()
2585-
&& batches.front().header().max_timestamp >= cfg.time) {
2586-
return ret_t(batch_timequery(
2587-
batches.front(), cfg.min_offset, cfg.time, cfg.max_offset));
2588-
}
2589-
return ret_t();
2590-
});
2591-
});
2575+
co_return std::nullopt;
2576+
}
2577+
2578+
auto reader = co_await make_reader(cfg);
2579+
auto batches = co_await model::consume_reader_to_memory(
2580+
std::move(reader), model::no_timeout);
2581+
2582+
if (
2583+
!batches.empty() && batches.front().header().max_timestamp >= cfg.time) {
2584+
co_return co_await batch_timequery(
2585+
std::move(batches.front()), cfg.min_offset, cfg.time, cfg.max_offset);
2586+
}
2587+
2588+
co_return std::nullopt;
25922589
}
25932590

25942591
ss::future<> disk_log_impl::remove_segment_permanently(

src/v/storage/log_reader.cc

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "storage/logger.h"
1919
#include "storage/offset_translator_state.h"
2020
#include "storage/parser_errc.h"
21+
#include "storage/parser_utils.h"
2122
#include "storage/segment_set.h"
2223
#include "storage/types.h"
2324

@@ -581,8 +582,8 @@ bool log_reader::is_done() {
581582
|| is_finished_offset(_lease->range, _config.start_offset);
582583
}
583584

584-
timequery_result batch_timequery(
585-
const model::record_batch& b,
585+
ss::future<timequery_result> batch_timequery(
586+
model::record_batch b,
586587
model::offset min_offset,
587588
model::timestamp t,
588589
model::offset max_offset) {
@@ -595,23 +596,25 @@ timequery_result batch_timequery(
595596
// records in the batch have different timestamps.
596597
model::offset result_o = b.base_offset();
597598
model::timestamp result_t = b.header().first_timestamp;
598-
if (!b.compressed()) {
599-
b.for_each_record(
600-
[&result_o, &result_t, &b, query_interval, t](
601-
const model::record& r) -> ss::stop_iteration {
602-
auto record_o = model::offset{r.offset_delta()} + b.base_offset();
603-
auto record_t = model::timestamp(
604-
b.header().first_timestamp() + r.timestamp_delta());
605-
if (record_t >= t && query_interval.contains(record_o)) {
606-
result_o = record_o;
607-
result_t = record_t;
608-
return ss::stop_iteration::yes;
609-
} else {
610-
return ss::stop_iteration::no;
611-
}
612-
});
613-
}
614-
return {result_o, result_t};
599+
auto batch = co_await internal::decompress_batch(std::move(b));
600+
co_await batch.for_each_record_async(
601+
[&result_o, &result_t, &batch, query_interval, t](
602+
const model::record& r) -> ss::future<ss::stop_iteration> {
603+
auto record_o = model::offset{r.offset_delta()} + batch.base_offset();
604+
auto record_t = model::timestamp(
605+
batch.header().first_timestamp() + r.timestamp_delta());
606+
if (record_t >= t && query_interval.contains(record_o)) {
607+
result_o = record_o;
608+
result_t = record_t;
609+
return ss::make_ready_future<ss::stop_iteration>(
610+
ss::stop_iteration::yes);
611+
} else {
612+
return ss::make_ready_future<ss::stop_iteration>(
613+
ss::stop_iteration::no);
614+
}
615+
});
616+
617+
co_return timequery_result{result_o, result_t};
615618
}
616619

617620
} // namespace storage

src/v/storage/log_reader.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,8 @@ class log_reader final : public model::record_batch_reader::impl {
297297
* \param t The timestamp to search for
298298
* \param max_offset The maximum offset to consider
299299
*/
300-
timequery_result batch_timequery(
301-
const model::record_batch& b,
300+
ss::future<timequery_result> batch_timequery(
301+
model::record_batch b,
302302
model::offset min_offset,
303303
model::timestamp t,
304304
model::offset max_offset);

0 commit comments

Comments
 (0)