Skip to content

Commit bfbd1d3

Browse files
authored
Merge pull request #29304 from pgellert/fix/fetch-session-partition-include
CORE-14617 kafka: fix fetch session retry losing partition inclusion
2 parents a5c7a0d + b7c6970 commit bfbd1d3

File tree

3 files changed

+163
-29
lines changed

3 files changed

+163
-29
lines changed

src/v/kafka/server/handlers/fetch.cc

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1569,45 +1569,52 @@ void op_context::create_response_placeholders() {
15691569
}
15701570
}
15711571

1572-
bool update_fetch_partition(
1572+
// Determines if a partition should be included in an incremental fetch
1573+
// response per KIP-227.
1574+
bool partition_has_changes(
15731575
const fetch_response::partition_response& resp,
1574-
fetch_session_partition& partition) {
1575-
bool include = false;
1576+
const fetch_session_partition& session_partition) {
15761577
if (resp.records && resp.records->size_bytes() > 0) {
1577-
// Partitions with new data are always included in the response.
1578-
include = true;
1578+
return true;
15791579
}
1580-
if (partition.high_watermark != resp.high_watermark) {
1581-
include = true;
1582-
partition.high_watermark = model::offset(resp.high_watermark);
1580+
if (session_partition.high_watermark != resp.high_watermark) {
1581+
return true;
15831582
}
1584-
if (partition.last_stable_offset != resp.last_stable_offset) {
1585-
include = true;
1586-
partition.last_stable_offset = model::offset(resp.last_stable_offset);
1583+
if (session_partition.last_stable_offset != resp.last_stable_offset) {
1584+
return true;
15871585
}
1588-
if (partition.start_offset != resp.log_start_offset) {
1589-
include = true;
1590-
partition.start_offset = model::offset(resp.log_start_offset);
1586+
if (session_partition.start_offset != resp.log_start_offset) {
1587+
return true;
15911588
}
15921589
/**
15931590
* Always include partition in a response if it contains information about
15941591
* the preferred replica
15951592
*/
15961593
if (resp.preferred_read_replica != -1) {
1597-
include = true;
1598-
}
1599-
if (include) {
1600-
return include;
1594+
return true;
16011595
}
16021596
if (resp.error_code != error_code::none) {
16031597
// Partitions with errors are always included in the response.
1604-
// We also set the cached highWatermark to an invalid offset, -1.
1605-
// This ensures that when the error goes away, we re-send the
1606-
// partition.
1607-
partition.high_watermark = model::offset{-1};
1608-
include = true;
1598+
return true;
1599+
}
1600+
return false;
1601+
}
1602+
1603+
// Updates the fetch session's partition with the response. Called in
1604+
// send_response() when committing the response, not during fetch iteration (to
1605+
// avoid premature updates on retries).
1606+
void update_session_partition(
1607+
const fetch_response::partition_response& resp,
1608+
fetch_session_partition& session_partition) {
1609+
session_partition.high_watermark = model::offset(resp.high_watermark);
1610+
session_partition.last_stable_offset = model::offset(
1611+
resp.last_stable_offset);
1612+
session_partition.start_offset = model::offset(resp.log_start_offset);
1613+
if (resp.error_code != error_code::none) {
1614+
// Set high_watermark to -1 so we re-send this partition once the error
1615+
// clears.
1616+
session_partition.high_watermark = model::offset{-1};
16091617
}
1610-
return include;
16111618
}
16121619

16131620
ss::future<response_ptr> op_context::send_response() && {
@@ -1641,7 +1648,24 @@ ss::future<response_ptr> op_context::send_response() && {
16411648
}
16421649
// below we handle incremental fetches, set response session id
16431650
response.data.session_id = session_ctx.session()->id();
1651+
1652+
auto& session_partitions = session_ctx.session()->partitions();
1653+
auto update_session = [&session_partitions](const auto& resp_it) {
1654+
auto key = model::kitp_view(
1655+
resp_it->partition->topic_id,
1656+
resp_it->partition->topic,
1657+
resp_it->partition_response->partition_index);
1658+
if (auto sp_it = session_partitions.find(key);
1659+
sp_it != session_partitions.end()) {
1660+
update_session_partition(
1661+
*resp_it->partition_response, sp_it->second->partition);
1662+
}
1663+
};
1664+
16441665
if (session_ctx.is_full_fetch()) {
1666+
for (auto it = response.begin(false); it != response.end(); ++it) {
1667+
update_session(it);
1668+
}
16451669
return rctx.respond(std::move(response));
16461670
}
16471671

@@ -1651,6 +1675,8 @@ ss::future<response_ptr> op_context::send_response() && {
16511675
final_response.internal_topic_bytes = response.internal_topic_bytes;
16521676

16531677
for (auto it = response.begin(true); it != response.end(); ++it) {
1678+
update_session(it);
1679+
16541680
if (it->is_new_topic) {
16551681
final_response.data.responses.emplace_back(
16561682
fetchable_topic_response{
@@ -1762,7 +1788,7 @@ void op_context::response_placeholder::set(
17621788

17631789
if (auto it = session_partitions.find(key);
17641790
it != session_partitions.end()) {
1765-
auto has_to_be_included = update_fetch_partition(
1791+
auto has_to_be_included = partition_has_changes(
17661792
*_it->partition_response, it->second->partition);
17671793
/**
17681794
* From KIP-227

src/v/kafka/server/tests/fetch_test.cc

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,3 +1209,113 @@ FIXTURE_TEST(fetch_response_bytes_eq_units, redpanda_thread_fixture) {
12091209
BOOST_REQUIRE(octx.response_size > 0);
12101210
BOOST_REQUIRE(octx.response_size == octx.total_response_memory_units());
12111211
}
1212+
1213+
// Regression test for CORE-14617: When a fetch is retried internally (due to
1214+
// min_bytes not being satisfied), partitions with changed metadata (like
1215+
// log_start_offset) must still be included in the final response.
1216+
FIXTURE_TEST(
1217+
fetch_session_propagates_log_start_offset, redpanda_thread_fixture) {
1218+
model::topic topic("foo");
1219+
model::partition_id pid(0);
1220+
auto ntp = make_default_ntp(topic, pid);
1221+
1222+
wait_for_controller_leadership().get();
1223+
add_topic(model::topic_namespace_view(ntp)).get();
1224+
wait_for_partition_offset(ntp, model::offset(0)).get();
1225+
1226+
// Produce some data
1227+
auto shard = app.shard_table.local().shard_for(ntp);
1228+
app.partition_manager
1229+
.invoke_on(
1230+
*shard,
1231+
[ntp](cluster::partition_manager& mgr) {
1232+
return model::test::make_random_batches(model::offset(0), 20)
1233+
.then([ntp, &mgr](auto batches) {
1234+
auto partition = mgr.get(ntp);
1235+
return partition->raft()->replicate(
1236+
chunked_vector<model::record_batch>(
1237+
std::from_range,
1238+
std::move(batches) | std::views::as_rvalue),
1239+
raft::replicate_options(
1240+
raft::consistency_level::quorum_ack));
1241+
});
1242+
})
1243+
.get();
1244+
1245+
auto client = make_kafka_client().get();
1246+
client.connect().get();
1247+
1248+
// Full fetch to establish session (session_epoch=0, invalid session_id)
1249+
kafka::fetch_request req1;
1250+
req1.data.max_bytes = std::numeric_limits<int32_t>::max();
1251+
req1.data.min_bytes = 1;
1252+
req1.data.max_wait_ms = 1000ms;
1253+
req1.data.session_id = kafka::invalid_fetch_session_id;
1254+
req1.data.session_epoch = kafka::initial_fetch_session_epoch;
1255+
req1.data.topics.emplace_back(
1256+
kafka::fetch_topic{
1257+
.topic = topic,
1258+
.partitions = {{
1259+
.partition = pid,
1260+
.fetch_offset = model::offset(5),
1261+
}},
1262+
});
1263+
1264+
auto resp1 = client.dispatch(std::move(req1), kafka::api_version(12)).get();
1265+
BOOST_REQUIRE_EQUAL(resp1.data.responses.size(), 1);
1266+
BOOST_REQUIRE_EQUAL(
1267+
resp1.data.responses[0].partitions[0].error_code,
1268+
kafka::error_code::none);
1269+
BOOST_REQUIRE_NE(resp1.data.session_id, kafka::invalid_fetch_session_id);
1270+
1271+
auto session_id = resp1.data.session_id;
1272+
auto initial_log_start
1273+
= resp1.data.responses[0].partitions[0].log_start_offset;
1274+
1275+
// Prefix truncate to change log_start_offset
1276+
auto trunc_err = app.partition_manager
1277+
.invoke_on(
1278+
*shard,
1279+
[ntp](cluster::partition_manager& mgr) {
1280+
auto partition = mgr.get(ntp);
1281+
auto k_trunc_offset = kafka::offset(5);
1282+
auto rp_trunc_offset
1283+
= partition->log()->to_log_offset(
1284+
model::offset(k_trunc_offset));
1285+
return partition->prefix_truncate(
1286+
rp_trunc_offset,
1287+
k_trunc_offset,
1288+
ss::lowres_clock::time_point::max());
1289+
})
1290+
.get();
1291+
BOOST_REQUIRE(!trunc_err);
1292+
1293+
// Incremental fetch with min_bytes=1 - will retry internally waiting for
1294+
// data, but should still include the partition due to log_start_offset
1295+
// change even if no new data arrives during the retry window.
1296+
kafka::fetch_request req2;
1297+
req2.data.max_bytes = std::numeric_limits<int32_t>::max();
1298+
req2.data.min_bytes = 1;
1299+
req2.data.max_wait_ms = 1000ms;
1300+
req2.data.session_id = session_id;
1301+
req2.data.session_epoch = kafka::fetch_session_epoch(1);
1302+
req2.data.topics.emplace_back(
1303+
kafka::fetch_topic{
1304+
.topic = topic,
1305+
.partitions = {{
1306+
.partition = pid,
1307+
.fetch_offset = model::offset(20),
1308+
}},
1309+
});
1310+
1311+
auto resp2 = client.dispatch(std::move(req2), kafka::api_version(12)).get();
1312+
client.stop().then([&client] { client.shutdown(); }).get();
1313+
1314+
// The partition must be included even though no new data arrived, because
1315+
// log_start_offset changed. This is the regression test for CORE-14617.
1316+
BOOST_REQUIRE_EQUAL(resp2.data.responses.size(), 1);
1317+
BOOST_REQUIRE_EQUAL(resp2.data.responses[0].partitions.size(), 1);
1318+
auto new_log_start = resp2.data.responses[0].partitions[0].log_start_offset;
1319+
BOOST_REQUIRE_GT(new_log_start, initial_log_start);
1320+
BOOST_REQUIRE_EQUAL(new_log_start, model::offset(5));
1321+
}

tests/rptest/tests/cluster_linking_e2e_test.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1665,10 +1665,8 @@ def _check_partitions_match(
16651665
self.logger.debug(
16661666
f"Partition {partition_id}: source hwm={hwm}, shadow_hwm{p_info.source_high_watermark}, last_update={p_info.source_last_updated_timestamp}"
16671667
)
1668-
# TODO: Re-enable once CORE-14617 is addressed
1669-
# TODO: CORE-14653
1670-
# if p_info.source_high_watermark != hwm:
1671-
# return False
1668+
if p_info.source_high_watermark != hwm:
1669+
return False
16721670
return True
16731671

16741672
def _fetch_shadow_topic_and_compare_results(

0 commit comments

Comments (0)