ct/rr: plumb components into redpanda #29658
Open
andrwng wants to merge 15 commits into redpanda-data:dev
Conversation
The shorthand constructor for cloud topics is convenient to use -- allow passing the test fixture configs to give cloud topic test authors control over which features are enabled.
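A minimal sketch of the pass-through this enables; the fixture and config names here are hypothetical, not the actual test API:

```cpp
// Hypothetical fixture config; the real tests would expose the cloud
// topics fixture's actual knobs here.
struct fixture_config {
    bool enable_read_replica_features = false;
};

struct cloud_topics_fixture {
    // Shorthand constructor: defaults keep existing tests unchanged,
    // while authors who need a feature can pass an explicit config.
    explicit cloud_topics_fixture(fixture_config cfg = {})
      : _cfg(cfg) {}

    fixture_config _cfg;
};
```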
Some upcoming ducktape tests will need to spin up an RRR configured with a non-existent bucket, and the expectation in those tests is that nothing in that cluster will access S3 for that cluster's bucket (the read replica topics will instead access the source cluster's bucket). To that end, we want to disable the metastore flush and L0 GC on the RRR cluster.
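A sketch of the test-only switches this implies; the names here are assumptions, not the actual flags added in src/v/config/configuration.*:

```cpp
// Hypothetical test-only switches; the real properties are registered
// through the configuration machinery in src/v/config/configuration.*.
struct cloud_topics_test_flags {
    // Suppress the periodic metastore flush so the RRR cluster never
    // writes to its (intentionally non-existent) bucket.
    bool disable_metastore_flush = false;
    // Suppress L0 garbage collection for the same reason.
    bool disable_l0_gc = false;
};
```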
Adds an STM specific to cloud topic read replicas that manages the metadata required to serve the synchronous part of the Kafka API (e.g. the non-futurized bits of partition_proxy_impl). In addition to Kafka-related metadata, it also keeps track of:
- the domain UUID that serves this partition, kept as an optimization to avoid rediscovering it every time we need to access the metastore
- the sequence number of the database at the time the Kafka-related metadata was collected, so that callers of the Kafka API can know to get a newer view of the database if needed

The implementation is fairly boilerplate. This STM isn't plugged in anywhere yet.
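A minimal sketch of the per-partition state this describes; all field names and types are assumptions for illustration, not the actual STM state:

```cpp
#include <array>
#include <cstdint>

// Hypothetical shape of the read_replica STM state.
struct read_replica_stm_state {
    // Kafka-facing metadata for the synchronous parts of the partition
    // proxy (the offsets and terms fetch/list-offsets paths consult).
    std::int64_t start_offset{0};
    std::int64_t high_watermark{0};
    std::int64_t last_term{0};
    // Cached domain UUID, so the metastore for this partition does not
    // have to be rediscovered on every access.
    std::array<std::uint8_t, 16> domain_uuid{};
    // Database sequence number at the time the Kafka metadata was
    // collected; callers can compare against it to request a newer view.
    std::int64_t db_seqno{0};
};
```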
Adds the snapshot_metastore, which wraps an lsm::snapshot to serve the l1::metastore interface. It uses the logic in l1::state_reader to perform queries, much like db_domain_manager, but with slightly different error handling since we're returning metastore::errc instead of RPC errors. The metastore owns a gate holder and an lsm::snapshot for lifetime safety, with the expectation that the gate holder ensures we don't destruct the database while we're still using the snapshot. All write operations, and certain read operations not required by the Kafka API, return an error.
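The lifetime pattern described here, as a self-contained sketch; gate_holder and lsm_snapshot are stand-ins for ss::gate::holder and lsm::snapshot, and the error code is assumed:

```cpp
#include <utility>

// Stand-ins for ss::gate::holder and lsm::snapshot, to keep the sketch
// self-contained.
struct gate_holder {};  // keeps the owning database's gate open
struct lsm_snapshot {}; // immutable point-in-time view of the database

enum class metastore_errc { success, not_supported };

// The metastore owns both members, so the database cannot be destructed
// while the snapshot is still being queried.
class snapshot_metastore {
public:
    snapshot_metastore(gate_holder holder, lsm_snapshot snap)
      : _holder(std::move(holder))
      , _snap(std::move(snap)) {}

    // Reads needed by the Kafka API would query _snap via the state
    // reader; writes (and reads the Kafka API doesn't need) are rejected.
    metastore_errc put() { return metastore_errc::not_supported; }

private:
    gate_holder _holder;
    lsm_snapshot _snap;
};
```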
Adds a partition_metadata structure to hold various metadata for a read replica partition. The expectation is that this will be cached per partition and used to query the metastore with some bounded staleness.
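A hypothetical shape for this structure, guessed from the descriptions here and in the metadata_manager commit (bucket name plus snapshot timing); field names are assumptions:

```cpp
#include <chrono>
#include <string>

// Hypothetical cached per-partition metadata for a read replica.
struct partition_metadata {
    std::string bucket; // the source cluster's bucket for this partition
    // Lower bound on snapshot freshness: snapshots served for this
    // partition should be taken at or after this point.
    std::chrono::system_clock::time_point min_snapshot_time;
};
```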
Adds the snapshot_manager, which implements the new snapshot_provider interface that returns snapshot_metastores with some bounded staleness. In the background, the snapshot manager maintains a database_refresher per domain, which periodically refreshes each database from object storage. A few staleness bounds are expected to be used:
- when a partition becomes managed on a shard, we want snapshots returned by the snapshot manager for this partition to be later than those used by previous shards
- when a leader becomes managed by a shard, we want snapshots returned by the snapshot manager for this partition to be later than those used by previous leaders
- callers can provide a sequence number and wait to ensure that the returned snapshot is at or above it, ensuring that e.g. we can see a later snapshot than the one used to replicate the read_replica STM state

The first two are expected to be implemented by passing in a timestamp from the partition_metadata; the last by passing in the sequence number from the STM state. This class is wrapped in an interface to allow plumbing through cluster::partition (similar to the state_accessors for vanilla cloud topics) with minimal dependencies.
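A sketch of what the snapshot_provider contract might look like given those bounds; the method name and parameters are assumptions, and the real interface is futurized:

```cpp
#include <chrono>
#include <cstdint>
#include <memory>

struct snapshot_metastore; // read-only metastore over a database snapshot

// Hypothetical snapshot_provider contract: both staleness bounds from
// the commit message appear as parameters.
struct snapshot_provider {
    virtual ~snapshot_provider() = default;

    // Returns a metastore whose snapshot was taken no earlier than
    // `not_before` (covering the shard/leader handoff cases) and reflects
    // at least database seqno `min_seqno` (covering the STM-state case),
    // waiting on the background refresher if necessary.
    virtual std::shared_ptr<snapshot_metastore> get_snapshot(
      std::chrono::system_clock::time_point not_before,
      std::int64_t min_seqno)
      = 0;
};
```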
Pull request overview
This PR integrates read replica components for cloud topics into the Redpanda system. It builds on PR #29657 to enable cloud topics to function as read replicas, allowing clusters to consume data from a source cluster's cloud storage using the cloud topics metastore (LSM-based) rather than traditional tiered storage manifests.
Changes:
- Enables cloud topics to be read replicas by removing the mutual exclusion between these two modes
- Adds read replica STM, state refresh loop, snapshot management, and metadata management components
- Integrates read replica subsystem into partition manager and application lifecycle
- Adds comprehensive test coverage for both tiered storage and cloud topics read replica modes
Reviewed changes
Copilot reviewed 54 out of 54 changed files in this pull request and generated no comments.
Summary per file:
| File | Description |
|---|---|
| tests/rptest/tests/*.py | Extended test coverage to support both TIERED_STORAGE and CLOUD_TOPICS read replica modes |
| tests/rptest/context/cloud_storage.py | Added ReadReplicaSourceMode enum and helper function |
| src/v/storage/ntp_config.h | Removed early return preventing cloud topics from being read replicas |
| src/v/model/record_batch_types.h | Added read_replica_stm batch type |
| src/v/cluster/partition*.* | Added read replica STM accessor and updated archiver/metrics logic |
| src/v/kafka/data/partition_proxy.cc | Added read replica partition proxy creation path |
| src/v/kafka/data/cloud_topic_read_replica.* | New implementation of partition_proxy for read replica cloud topics |
| src/v/config/configuration.* | Added test-only flags for disabling metastore flush and level zero GC |
| src/v/redpanda/application*.* | Wired up read replica components and configuration |
| src/v/cloud_topics/app.* | Constructed and started read replica services (snapshot manager, metadata manager, accessors) |
| src/v/cloud_topics/read_replica/* | Complete read replica implementation including STM, state refresh, snapshot management, metadata management |
| src/v/cloud_topics/tests/read_replica_test.cc | End-to-end tests for cloud topic read replicas |
Comments suppressed due to low confidence (1)
src/v/kafka/data/cloud_topic_read_replica.cc:347
- The format string has 4 placeholders but 5 arguments are provided, with the error in the 5th position. The format should be: "error getting snapshot for domain {} bucket {}, earliest refresh time: {}, min_seqno: {}, error: {}" with snapshot_res.error() as the 5th argument, or the extra argument should be removed.
Retry command for Build #80821 (please wait until all jobs are finished before running the slash command).
CI test results:
- test results on build #80821
- test results on build #80851
Adds state_refresh_loop, which runs on the leader for a given term and:
1. Discovers the domain UUID from the metastore manifest
2. Periodically syncs metadata from cloud database snapshots
3. Replicates updates through the STM

The loop queries the snapshot_metastore for partition offsets and terms, compares them with the current STM state, and replicates updates when changes are detected.
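One iteration of the sync step (step 2 above) as a self-contained sketch; the metastore query and STM replication are passed in as stand-in callables, and the real loop is futurized and handles errors and leadership changes:

```cpp
#include <cstdint>

// Simplified view of the STM state the loop compares against.
struct stm_view {
    std::int64_t high_watermark{0};
    std::int64_t last_term{0};
};

// Hypothetical single iteration of the refresh loop.
template <typename QueryFn, typename ReplicateFn>
void sync_once(const stm_view& current, QueryFn query, ReplicateFn replicate) {
    stm_view latest = query(); // offsets/terms from the snapshot_metastore
    // Replicate through the STM only when something actually changed, so
    // followers converge without redundant Raft traffic.
    if (latest.high_watermark != current.high_watermark
        || latest.last_term != current.last_term) {
        replicate(latest);
    }
}
```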
Add metadata_manager, which implements metadata_provider and:
- Listens for partition manage/unmanage events
- Starts/stops state_refresh_loops for leader partitions
- Tracks partition_metadata for all read replica partitions
- Provides metadata lookups by NTP

The metadata_provider interface allows consumers to query partition metadata, including the bucket name and snapshot timing information.
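A sketch of the bookkeeping behind those events; types and names are stand-ins, and the real class additionally starts and stops a state_refresh_loop when the managed partition is (or stops being) a leader:

```cpp
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Minimal stand-in for the metadata tracked per partition.
struct rr_partition_metadata {
    std::string bucket;
};

// Hypothetical manage/unmanage bookkeeping, keyed by NTP.
class metadata_manager {
public:
    void on_manage(const std::string& ntp, rr_partition_metadata md) {
        _by_ntp[ntp] = std::move(md);
    }
    void on_unmanage(const std::string& ntp) { _by_ntp.erase(ntp); }

    // Metadata lookup by NTP, as exposed through metadata_provider.
    std::optional<rr_partition_metadata>
    lookup(const std::string& ntp) const {
        auto it = _by_ntp.find(ntp);
        if (it == _by_ntp.end()) {
            return std::nullopt;
        }
        return it->second;
    }

private:
    std::unordered_map<std::string, rr_partition_metadata> _by_ntp;
};
```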
Add accessors helper that aggregates metadata_provider and snapshot_provider interfaces. This provides a lightweight way to pass read replica services through layers without circular dependencies.
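A hypothetical shape for this aggregation: one small object carrying both interfaces, so layers like cluster::partition depend only on it rather than on the concrete services:

```cpp
struct metadata_provider; // implemented by metadata_manager
struct snapshot_provider; // implemented by snapshot_manager

// Hypothetical lightweight bundle of the read replica interfaces.
struct read_replica_accessors {
    metadata_provider* metadata = nullptr;
    snapshot_provider* snapshots = nullptr;
};
```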
Implement partition_proxy for read replica cloud topic partitions. The partition_proxy:
- Returns synchronous state (offsets, terms) from the STM
- Creates readers using a snapshot_metastore obtained from the snapshot_manager
- Supports timequery using metastore extent lookups
- Returns errors for write operations (read-only)

This allows read replica partitions to serve Kafka fetch requests using data from the source cluster's cloud database.
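A sketch of the read-only split this describes; the class skeleton, state shape, and error code are assumptions:

```cpp
#include <cstdint>

enum class proxy_errc { success, not_supported };

// Stand-in for the STM-backed synchronous state.
struct rr_stm_state {
    std::int64_t start_offset{0};
    std::int64_t high_watermark{0};
};

// Hypothetical skeleton of the read replica partition proxy.
class read_replica_proxy {
public:
    explicit read_replica_proxy(rr_stm_state state)
      : _state(state) {}

    // Synchronous Kafka metadata, answered from replicated STM state.
    std::int64_t start_offset() const { return _state.start_offset; }
    std::int64_t high_watermark() const { return _state.high_watermark; }

    // Reads and timequeries would go through a snapshot_metastore taken
    // from the snapshot_manager (elided); writes always fail because the
    // partition is read-only.
    proxy_errc replicate() const { return proxy_errc::not_supported; }

private:
    rr_stm_state _state;
};
```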
Integrate the read replica system into the application:
- Add read_replica_app to cloud_topics with metadata_manager and snapshot_manager instances
- Register read_replica_stm in ctp_stm_factory
- Create a cloud_topic_read_replica partition_proxy for read replica partitions in partition_manager
- Mark read replica topics in remote_topic_configuration_source
- Wire up the read_replica_app lifecycle in application_services

This completes the integration, allowing read replica partitions to be created and to serve Kafka requests.
Add read_replica_test, which validates:
- STM state replication
- Metadata discovery and sync from cloud
- Snapshot management and refreshing
- Partition proxy operations (offsets, reads, timequery)
- Integration with Raft and the partition lifecycle

The test creates a mock cloud database, creates a read replica partition, and verifies that it can discover metadata and serve reads.
This will be useful for returning values from the partition_probe, which only has access to the cluster::partition.
The max_offset metric is meant to be the HWM. For a cloud_topic read_replica we need to pull this from the state machine.
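A minimal sketch of that source selection; the function and parameter names are hypothetical:

```cpp
#include <cstdint>
#include <optional>

// Hypothetical selection of the max_offset (HWM) metric source: for a
// cloud topic read replica the value comes from the STM; otherwise the
// partition's own high watermark is used.
std::int64_t max_offset_metric(
  std::optional<std::int64_t> rr_stm_hwm, std::int64_t partition_hwm) {
    return rr_stm_hwm.value_or(partition_hwm);
}
```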
Parameterizes the existing read replica ducktape tests to use cloud_topics as a source. For the most part, these tests depend only on the Kafka API, so little updating is needed beyond the parameterization.
TODO
Based on #29657
Backports Required
Release Notes