@@ -37,7 +37,7 @@ namespace storage {
37
37
using stop_parser = batch_consumer::stop_parser;
38
38
// make sure that `msg` parameter is a static string or it is not removed before
39
39
// this function finishes
40
- static ss::future<result<iobuf>> verify_read_iobuf (
40
+ static ss::future<result<iobuf, parser_errc >> verify_read_iobuf (
41
41
ss::input_stream<char >& in,
42
42
size_t expected,
43
43
const char * msg,
@@ -204,15 +204,18 @@ ss::future<result<stop_parser>> continuous_batch_parser::consume_records() {
204
204
auto sz = _header->size_bytes - model::packed_record_batch_header_size;
205
205
return verify_read_iobuf (
206
206
get_stream (), sz, " parser::consume_records" , _recovery)
207
- .then ([this ](result<iobuf> record) -> ss::future<result<stop_parser>> {
208
- if (!record) {
209
- return ss::make_ready_future<result<stop_parser>>(record.error ());
210
- }
211
- _consumer->consume_records (std::move (record.value ()));
212
- return _consumer->consume_batch_end ().then ([](stop_parser sp) {
213
- return ss::make_ready_future<result<stop_parser>>(sp);
214
- });
215
- });
207
+ .then (
208
+ [this ](result<iobuf, parser_errc> record)
209
+ -> ss::future<result<stop_parser>> {
210
+ if (!record) {
211
+ return ss::make_ready_future<result<stop_parser>>(
212
+ record.error ());
213
+ }
214
+ _consumer->consume_records (std::move (record.value ()));
215
+ return _consumer->consume_batch_end ().then ([](stop_parser sp) {
216
+ return ss::make_ready_future<result<stop_parser>>(sp);
217
+ });
218
+ });
216
219
}
217
220
218
221
static constexpr std::array<parser_errc, 3 > benign_error_codes{
0 commit comments