160 changes: 99 additions & 61 deletions design_docs/20260202-force_promote_failover.md
@@ -45,155 +45,193 @@ message UpdateReplicateConfigurationRequest {
}
```

Add `force_promote` field to the internal message header:
Add `force_promote` and `ignore` fields to the internal message header:

```protobuf
// In messages.proto
message AlterReplicateConfigMessageHeader {
common.ReplicateConfiguration replicate_configuration = 1;
bool force_promote = 2;
bool ignore = 3; // Skip processing of this message (used for incomplete broadcasts)
}
```

Add metadata fields to track force-promoted configurations:
Add metadata field to track force-promoted configurations:

```protobuf
// In streaming.proto
message ReplicateConfigurationMeta {
common.ReplicateConfiguration replicate_configuration = 1;
bool force_promoted = 2;
uint64 force_promote_timestamp = 3;
}
```

### Force Promote Constraints

For safety, force promote is restricted to failover scenarios only:
For safety, force promote requires an empty configuration and auto-constructs the standalone primary config from existing meta:

| Constraint | Validation | Rationale |
|---|---|---|
| Secondary cluster only | Attempt broadcast lock; reject if successful (we are primary) | Only secondary clusters need emergency promotion |
| Single cluster config | `len(config.Clusters) == 1` | Promoted cluster becomes standalone |
| Current cluster only | `config.Clusters[0].ClusterId == currentClusterID` | Cannot promote a different cluster |
| No topology | `len(config.CrossClusterTopology) == 0` | Standalone cluster has no replication edges |
| Secondary cluster only | Use `WithSecondaryClusterResourceKey()` API; returns error if primary | Only secondary clusters need emergency promotion |
| Empty clusters field | `len(config.Clusters) == 0` | Config is auto-constructed from existing meta |
| Empty topology field | `len(config.CrossClusterTopology) == 0` | Config is auto-constructed from existing meta |

The auto-constructed configuration:
- Contains a single cluster entry for the current cluster
- Uses existing pchannels from the cluster's meta
- Has no cross-cluster topology (standalone primary)
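
The empty-config check and the auto-construction step can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `Cluster`, `Edge`, and `ReplicateConfiguration` structs are simplified stand-ins for the protobuf types, and `validateForcePromoteRequest`, `buildStandaloneConfig`, and `ErrNonEmptyConfig` are hypothetical names chosen for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the protobuf messages (assumed shapes, not the real types).
type Cluster struct {
	ClusterID string
	Pchannels []string
}

type Edge struct{ Source, Target string }

type ReplicateConfiguration struct {
	Clusters             []Cluster
	CrossClusterTopology []Edge
}

// ErrNonEmptyConfig is a hypothetical error for requests that carry a config.
var ErrNonEmptyConfig = errors.New("force promote requires empty clusters and topology")

// validateForcePromoteRequest enforces the two "empty field" constraints.
func validateForcePromoteRequest(cfg ReplicateConfiguration) error {
	if len(cfg.Clusters) != 0 || len(cfg.CrossClusterTopology) != 0 {
		return ErrNonEmptyConfig
	}
	return nil
}

// buildStandaloneConfig derives the standalone primary config from existing meta:
// one cluster entry for the current cluster, its existing pchannels, no topology.
func buildStandaloneConfig(meta ReplicateConfiguration, currentClusterID string) (ReplicateConfiguration, error) {
	for _, c := range meta.Clusters {
		if c.ClusterID == currentClusterID {
			pchannels := append([]string(nil), c.Pchannels...) // copy, do not alias meta
			return ReplicateConfiguration{
				Clusters: []Cluster{{ClusterID: c.ClusterID, Pchannels: pchannels}},
			}, nil
		}
	}
	return ReplicateConfiguration{}, fmt.Errorf("cluster %q not found in existing meta", currentClusterID)
}

func main() {
	meta := ReplicateConfiguration{
		Clusters: []Cluster{
			{ClusterID: "primary", Pchannels: []string{"primary-p0"}},
			{ClusterID: "secondary", Pchannels: []string{"secondary-p0", "secondary-p1"}},
		},
		CrossClusterTopology: []Edge{{Source: "primary", Target: "secondary"}},
	}
	if err := validateForcePromoteRequest(ReplicateConfiguration{}); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	cfg, err := buildStandaloneConfig(meta, "secondary")
	fmt.Println(cfg, err)
}
```

Copying the pchannel slice keeps the constructed config independent of the stored meta, which matters if the meta is mutated while the broadcast is in flight.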

### Force Promote Flow

```
Client SDK
│ UpdateReplicateConfiguration(config, force_promote=true)
│ UpdateReplicateConfiguration(config={}, force_promote=true)
Proxy
│ Forward to StreamingCoord
StreamingCoord (Assignment Service)
│ 1. Validate constraints (secondary, single cluster, no topology)
│ 2. Bypass broadcaster lock (use Broadcast().Append() directly)
│ 3. Append AlterReplicateConfigMessage with ForcePromote=true
│ 1. Validate empty cluster/topology fields
│ 2. Use WithSecondaryClusterResourceKey() to acquire lock and verify secondary
│ 3. Auto-construct standalone primary config from existing meta
│ 4. Build message with AckSyncUp=true (disable fast DDL ack)
│ 5. Broadcast AlterReplicateConfigMessage with ForcePromote=true
StreamingNode (TxnBuffer)
│ 6. Detect ForcePromote && !Ignore in message
│ 7. Roll back all uncommitted transactions via RollbackAllUncommittedTxn()
StreamingNode (Replicate Interceptor)
│ 4. Detect ForcePromote in message header
│ 5. Roll back all in-flight transactions via TxnManager
│ 6. Switch replication mode
│ 8. Detect ForcePromote && !Ignore in message header
│ 9. Switch replication mode to primary
StreamingCoord (Broadcast Callback)
│ 7. Persist config with ForcePromoted=true flag
│ 8. Supplement incomplete broadcast messages to missing vchannels
│ 10. Skip if Ignore=true (incomplete old message)
│ 11. Fix incomplete broadcasts: mark with Ignore=true, supplement to remaining vchannels
│ 12. Persist config with ForcePromoted=true flag
Done — cluster is now standalone primary
```

### Handling Incomplete Messages

When force promote completes, incomplete messages from the old topology must be handled:
When force promote executes, incomplete messages from the old topology must be handled:

#### Transaction Rollback (Automatic per StreamingNode)
#### Transaction Rollback (via TxnBuffer)

When each StreamingNode processes the forced `AlterReplicateConfigMessage`:
When TxnBuffer processes the forced `AlterReplicateConfigMessage`:

1. Replicate interceptor detects `ForcePromote == true` in the message header
2. Calls `TxnManager.RollbackAllInFlightTransactions()`
3. All active transaction sessions are cleaned up
4. Rollback happens **after** the message is persisted in WAL (atomic)
1. TxnBuffer detects `ForcePromote == true && Ignore == false` in the message header
2. Calls `RollbackAllUncommittedTxn()` to clean up all pending transactions
3. All buffered transaction messages are discarded
4. Rollback happens before the message is passed to downstream consumers

```go
// TxnManager new method
func (m *txnManagerImpl) RollbackAllInFlightTransactions() {
m.mu.Lock()
defer m.mu.Unlock()
for txnID, session := range m.sessions {
session.Cleanup()
delete(m.sessions, txnID)
// TxnBuffer method
func (b *TxnBuffer) RollbackAllUncommittedTxn() {
for txnID := range b.builders {
b.rollbackTxn(txnID)
}
b.logger.Info("Rolled back all uncommitted transactions in TxnBuffer due to force promote")
}
```

No remote detection or coordinator intervention is needed — each node handles its own transactions.
No remote detection or coordinator intervention is needed — each vchannel's TxnBuffer handles its own transactions.
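
As a self-contained illustration of the `ForcePromote && !Ignore` guard described above, the sketch below models the buffer as a plain map. The `header` and `txnBuffer` types and the `handleAlterReplicateConfig` helper are assumptions made for this example; the real `TxnBuffer` tracks builders and metrics that are omitted here.

```go
package main

import "fmt"

// header carries the two flags added to AlterReplicateConfigMessageHeader.
type header struct {
	ForcePromote bool
	Ignore       bool
}

// txnBuffer is a hypothetical stand-in: txn ID -> buffered message bodies.
type txnBuffer struct {
	builders map[int64][]string
}

// rollbackAllUncommittedTxn discards every buffered transaction and
// reports how many were dropped.
func (b *txnBuffer) rollbackAllUncommittedTxn() int {
	n := len(b.builders)
	for txnID := range b.builders {
		delete(b.builders, txnID) // deleting while ranging over a map is safe in Go
	}
	return n
}

// handleAlterReplicateConfig rolls back only for a live force promote;
// ignored (stale) messages leave the buffer untouched.
func (b *txnBuffer) handleAlterReplicateConfig(h header) int {
	if h.ForcePromote && !h.Ignore {
		return b.rollbackAllUncommittedTxn()
	}
	return 0
}

func main() {
	buf := &txnBuffer{builders: map[int64][]string{
		1: {"insert-a"},
		2: {"insert-b", "delete-c"},
	}}
	stale := buf.handleAlterReplicateConfig(header{ForcePromote: true, Ignore: true})
	live := buf.handleAlterReplicateConfig(header{ForcePromote: true})
	fmt.Println(stale, live, len(buf.builders))
}
```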

#### Incomplete Broadcast Fixing (In Callback on StreamingCoord)

#### Broadcast Supplementation (In Callback on StreamingCoord)
During force promote, incomplete broadcasts from previous operations (e.g., failed switchover) must be handled to prevent their callbacks from overwriting the force promote configuration.

In the `alterReplicateConfiguration()` callback:

1. Detect force promote from the message header flag
2. Query broadcaster for pending broadcast messages (`GetPendingBroadcastMessages()`)
3. Re-append pending messages to their target vchannels via WAL
4. Ensures partially-broadcasted DDL operations complete
1. Skip processing if `Ignore == true` (this is an old incomplete message)
2. For force promote messages, call `FixIncompleteBroadcastsForForcePromote()`
3. Mark incomplete `AlterReplicateConfigMessage` broadcasts with `Ignore=true`
4. Supplement marked messages to their remaining vchannels
5. This ensures old callbacks don't overwrite the new force promote config

```go
// Broadcaster new interface method
type Broadcaster interface {
// ...existing methods...
GetPendingBroadcastMessages() []message.MutableMessage
// Broadcaster internal method
func (bm *broadcastTaskManager) FixIncompleteBroadcastsForForcePromote(ctx context.Context) error {
// 1. Find incomplete AlterReplicateConfig broadcasts
// 2. Update task messages with Ignore=true
// 3. Persist updated tasks to catalog
// 4. Supplement to remaining vchannels
}
```
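
The mark-then-supplement idea can be sketched in isolation as below. This is only an illustration under assumed data shapes: `broadcastTask`, its fields, and `markAndCollect` are hypothetical, and the catalog persistence and the actual WAL append from the steps above are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// broadcastTask is a hypothetical view of one AlterReplicateConfig broadcast.
type broadcastTask struct {
	Ignore  bool            // header flag set on stale, incomplete broadcasts
	Targets []string        // vchannels the message must reach
	Acked   map[string]bool // vchannels that already received it
}

// remaining lists the target vchannels that have not received the message yet.
func (t *broadcastTask) remaining() []string {
	var out []string
	for _, vc := range t.Targets {
		if !t.Acked[vc] {
			out = append(out, vc)
		}
	}
	sort.Strings(out)
	return out
}

// markAndCollect sets Ignore=true on every incomplete task and returns,
// keyed by task index, the vchannels that still need the marked message.
func markAndCollect(tasks []*broadcastTask) map[int][]string {
	pending := make(map[int][]string)
	for i, t := range tasks {
		rest := t.remaining()
		if len(rest) == 0 {
			continue // fully broadcast: nothing to fix
		}
		t.Ignore = true
		pending[i] = rest
	}
	return pending
}

func main() {
	tasks := []*broadcastTask{
		{Targets: []string{"v1", "v2"}, Acked: map[string]bool{"v1": true, "v2": true}},
		{Targets: []string{"v1", "v2", "v3"}, Acked: map[string]bool{"v2": true}},
	}
	fmt.Println(markAndCollect(tasks))
}
```

Marking before supplementing is the important ordering: once the stale message reaches its remaining vchannels, every consumer sees `Ignore=true` and skips it.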

#### The `ignore` Field

The `ignore` field in `AlterReplicateConfigMessageHeader` prevents processing of messages that were broadcast before force promote but completed after:

| Location | Behavior when `Ignore=true` |
|----------|----------------------------|
| TxnBuffer | Skip transaction rollback |
| Replicate Interceptor | Skip replication mode switch |
| DDL ACK Callback | Skip config update and DDL fixing |
| CDC Channel Replicator | Skip replication removal check |
| CDC Stream Client | Skip message handling |
| Replicate Service | Skip message overwrite |
| Recovery Storage | Skip checkpoint and config update |

## Files Modified

### Proto Changes
- `pkg/proto/messages.proto` — Add `force_promote` to `AlterReplicateConfigMessageHeader`
- `pkg/proto/streaming.proto` — Add `force_promoted`, `force_promote_timestamp` to `ReplicateConfigurationMeta`
- `pkg/proto/messages.proto` — Add `force_promote` and `ignore` fields to `AlterReplicateConfigMessageHeader`
- `pkg/proto/streaming.proto` — Add `force_promoted` to `ReplicateConfigurationMeta`

### Core Implementation
- `internal/streamingcoord/server/service/assignment.go` — Add `handleForcePromote()`, `validateForcePromoteConfiguration()`, `supplementIncompleteBroadcasts()`
- `internal/streamingcoord/server/service/assignment.go` — Add `handleForcePromote()`, ignore field checks in ACK callback
- `internal/streamingcoord/server/balancer/channel/manager.go` — Persist force promote flag in configuration meta
- `internal/streamingnode/server/wal/interceptors/replicate/replicate_interceptor.go` — Detect force promote and trigger transaction rollback
- `internal/streamingnode/server/wal/interceptors/txn/txn_manager.go` — Add `RollbackAllInFlightTransactions()`
- `internal/streamingcoord/server/broadcaster/broadcast_manager.go` — Add `GetPendingBroadcastMessages()`
- `internal/streamingcoord/server/broadcaster/broadcaster.go` — Add method to `Broadcaster` interface
- `internal/streamingcoord/server/broadcaster/broadcast_manager.go` — Add `WithSecondaryClusterResourceKey()`, `FixIncompleteBroadcastsForForcePromote()`
- `internal/streamingcoord/server/broadcaster/broadcaster.go` — Add methods to `Broadcaster` interface
- `internal/streamingnode/server/wal/utility/txn_buffer.go` — Add `RollbackAllUncommittedTxn()`, force promote detection in `HandleImmutableMessages()`
- `internal/streamingnode/server/wal/interceptors/replicate/replicate_interceptor.go` — Add ignore field check
- `internal/streamingnode/server/wal/recovery/recovery_storage_impl.go` — Add ignore field check

### CDC Integration
- `internal/cdc/replication/replicatemanager/channel_replicator.go` — Add ignore field check
- `internal/cdc/replication/replicatestream/replicate_stream_client_impl.go` — Add ignore field check
- `internal/cdc/util/util.go` — Add ignore field check in `IsReplicationRemovedByAlterReplicateConfigMessage()`

### Client & Proxy
- `internal/proxy/impl.go` — Pass through `force_promote` flag
- `client/milvusclient/replicate_builder.go` — Add `WithForcePromote()` builder method
- `internal/distributed/streaming/replicate_service.go` — Accept request object
- `internal/distributed/streaming/replicate_service.go` — Accept request object, add ignore field check
- `internal/distributed/streaming/streaming.go` — Update `ReplicateService` interface
- `pkg/util/replicateutil/util.go` — Add logging helper

### Tests
- `internal/streamingcoord/server/service/assignment_test.go` — Force promote validation tests
- `internal/streamingnode/server/wal/interceptors/replicate/replicate_interceptor_test.go` — Force promote rollback tests
- `internal/streamingnode/server/wal/interceptors/txn/session_test.go` — RollbackAllInFlightTransactions tests
- `internal/streamingcoord/server/service/assignment_test.go` — Force promote validation, ignore field, and DDL fixing tests
- `internal/streamingnode/server/wal/utility/txn_buffer_test.go` — TxnBuffer rollback and force promote tests
- `tests/integration/replication/force_promote_test.go` — Integration tests

## Edge Cases

1. **Primary cluster rejection** — Force promote explicitly rejected on primary clusters
2. **Concurrent force promotes** — Catalog atomic saves prevent corruption
3. **Idempotency** — `proto.Equal()` check skips duplicate updates
4. **Append failure** — No transaction rollback if message fails to persist (atomic guarantee)
5. **Nil TxnManager** — Gracefully handled (no-op) when TxnManager is not initialized
6. **Empty pending broadcasts** — Supplementation is a no-op when nothing is pending
1. **Primary cluster rejection** — Force promote rejected via `WithSecondaryClusterResourceKey()` returning `ErrNotSecondary`
2. **Non-empty config rejection** — Force promote requires empty clusters/topology fields; non-empty configs are rejected
3. **Concurrent force promotes** — `WithSecondaryClusterResourceKey()` acquires exclusive cluster-level lock
4. **Idempotency** — `proto.Equal()` check skips duplicate updates
5. **Incomplete switchover messages** — Marked with `ignore=true` before supplementing, preventing config overwrite
6. **Empty pending broadcasts** — DDL fixing is a no-op when no incomplete broadcasts exist
7. **Ignored messages** — All 7 locations check `ignore` field and skip processing

## Alternatives Considered

### 1. Append RollbackTxn messages to WAL for each transaction
Rejected: Requires enumerating all in-flight transactions at the coordinator level and appending individual rollback messages. The programmatic `TxnManager.RollbackAllInFlightTransactions()` approach is simpler and avoids remote detection complexity.
Rejected: Requires enumerating all in-flight transactions at the coordinator level and appending individual rollback messages. The `TxnBuffer.RollbackAllUncommittedTxn()` approach is simpler and handles transactions locally in each vchannel's buffer.

### 2. Handle transaction rollback during WAL recovery
Rejected: Force promote is not a WAL recovery event. The `AlterReplicateConfigMessage` propagates naturally through the WAL to each StreamingNode, making the interceptor the correct place to trigger rollback.
Rejected: Force promote is not a WAL recovery event. The `AlterReplicateConfigMessage` propagates naturally through the WAL, and TxnBuffer's message handling is the correct place to trigger rollback.

### 3. Separate API endpoint for force promote
Rejected: Force promote is a specialized mode of `UpdateReplicateConfiguration`. Adding a separate endpoint would duplicate validation logic and complicate the client SDK.

### 4. User-specified config for force promote
Rejected: Allowing user-specified clusters/topology creates opportunities for configuration mismatches. Auto-constructing the config from existing meta ensures consistency and simplifies the API.

### 5. Timestamp-based detection of incomplete messages
Rejected: Using a `force_promote_timestamp` field to detect stale messages is fragile and requires clock synchronization. The `ignore` field approach is explicit and doesn't depend on timing.

## Related Issues

- https://github.com/milvus-io/milvus/issues/47351