diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 70b6e7609c..6be3fdc726 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -47,7 +47,9 @@ use restate_types::logs::metadata::{ }; use restate_types::logs::{LogId, LogletId, Lsn}; use restate_types::net::node::NodeState; -use restate_types::net::partition_processor_manager::{CreateSnapshotRequest, Snapshot}; +use restate_types::net::partition_processor_manager::{ + CreateSnapshotRequest, CreateSnapshotStatus, Snapshot, +}; use restate_types::nodes_config::{NodesConfiguration, StorageState}; use restate_types::partition_table::{ self, PartitionReplication, PartitionTable, PartitionTableBuilder, @@ -712,7 +714,8 @@ where partition_id: PartitionId, min_target_lsn: Option, ) -> anyhow::Result { - self.network_sender + let response = self + .network_sender .call_rpc( node_id, Swimlane::default(), @@ -723,9 +726,13 @@ where Some(partition_id.into()), None, ) - .await? - .result - .map_err(|e| anyhow!("Failed to create snapshot: {:?}", e)) + .await?; + + match response.status { + CreateSnapshotStatus::Ok => Ok(response.snapshot.expect("to be set")), + CreateSnapshotStatus::Error(e) => Err(anyhow!("Failed to create snapshot: {}", e)), + CreateSnapshotStatus::Unknown => Err(anyhow!("Unknown error creating snapshot")), + } } } diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index ecbbad62e7..60dd45ed0e 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -18,6 +18,9 @@ enum ProtocolVersion { // V1 = 1; // [Native RPC] Released in >= v1.3.3 V2 = 2; + // Released in v1.6.0 + // Some messages has switched to bilrost + V3 = 3; } message NodeId { @@ -42,7 +45,7 @@ message Version { uint32 value = 1; } // The handle name or type tag of the message. For every service there must be // exactly one message handler implementation. enum ServiceTag { - reserved 1 to 25, 40 to 43, 50 to 53, 60, 61, 80 to 85 ; + reserved 1 to 25, 40 to 43, 50 to 53, 60, 61, 80 to 85; ServiceTag_UNKNOWN = 0; // LogServer LOG_SERVER_DATA_SERVICE = 26; diff --git a/crates/types/src/net/codec.rs b/crates/types/src/net/codec.rs index 39ef7b1c6e..1139a0d1d2 100644 --- a/crates/types/src/net/codec.rs +++ b/crates/types/src/net/codec.rs @@ -51,28 +51,6 @@ pub trait WireDecode { Self: Sized; } -impl WireEncode for Box -where - T: WireEncode, -{ - fn encode_to_bytes(&self, protocol_version: ProtocolVersion) -> Result { - self.as_ref().encode_to_bytes(protocol_version) - } -} - -impl WireDecode for Box -where - T: WireDecode, -{ - type Error = T::Error; - fn try_decode(buf: impl Buf, protocol_version: ProtocolVersion) -> Result - where - Self: Sized, - { - Ok(Box::new(T::try_decode(buf, protocol_version)?)) - } -} - impl WireDecode for Arc where T: WireDecode, @@ -129,3 +107,47 @@ pub fn decode_as_bilrost( T::decode(buf).context("failed decoding (bilrost) network message") } + +/// Trait for types that need to support migration from flexbuffers to bilrost encoding +/// based on protocol version. Implement this trait for message types that are transitioning +/// from flexbuffers to bilrost serialization. +/// +/// - If the protocol version is **less than or equal to** `BILROST_VERSION`, the type will be +/// encoded/decoded using flexbuffers (via serde). +/// - If the protocol version is **greater than** `BILROST_VERSION`, the type will be +/// encoded/decoded using bilrost. +/// +/// Types implementing this trait must be serializable and deserializable with both serde and bilrost. +pub trait MigrationCodec: + Serialize + DeserializeOwned + bilrost::Message + bilrost::OwnedMessage +{ + /// The protocol version at which bilrost encoding becomes active. + const BILROST_VERSION: ProtocolVersion; +} + +impl WireEncode for T +where + T: MigrationCodec, +{ + fn encode_to_bytes(&self, protocol_version: ProtocolVersion) -> Result { + if protocol_version <= T::BILROST_VERSION { + Ok(Bytes::from(encode_as_flexbuffers(self))) + } else { + Ok(encode_as_bilrost(self)) + } + } +} + +impl WireDecode for T +where + T: MigrationCodec, +{ + type Error = anyhow::Error; + fn try_decode(buf: impl Buf, protocol_version: ProtocolVersion) -> Result { + if protocol_version <= T::BILROST_VERSION { + Ok(decode_as_flexbuffers(buf, protocol_version)?) + } else { + Ok(decode_as_bilrost(buf, protocol_version)?) + } + } +} diff --git a/crates/types/src/net/mod.rs b/crates/types/src/net/mod.rs index a0f88b2b93..f7fd62127d 100644 --- a/crates/types/src/net/mod.rs +++ b/crates/types/src/net/mod.rs @@ -30,7 +30,7 @@ pub use crate::protobuf::common::ProtocolVersion; pub use crate::protobuf::common::ServiceTag; pub static MIN_SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V2; -pub static CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V2; +pub static CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V3; #[derive( Debug, diff --git a/crates/types/src/net/partition_processor_manager.rs b/crates/types/src/net/partition_processor_manager.rs index 3cd3195934..c6c5fa3a23 100644 --- a/crates/types/src/net/partition_processor_manager.rs +++ b/crates/types/src/net/partition_processor_manager.rs @@ -8,13 +8,18 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use bytes::{Buf, Bytes}; use serde::{Deserialize, Serialize}; use crate::Version; use crate::identifiers::{PartitionId, SnapshotId}; use crate::logs::{LogId, Lsn}; -use crate::net::{ServiceTag, define_service, define_unary_message}; -use crate::net::{default_wire_codec, define_rpc}; +use crate::net::codec::{ + EncodeError, MigrationCodec, WireDecode, WireEncode, decode_as_bilrost, decode_as_flexbuffers, + encode_as_bilrost, encode_as_flexbuffers, +}; +use crate::net::define_rpc; +use crate::net::{ProtocolVersion, ServiceTag, define_service, define_unary_message}; pub struct PartitionManagerService; @@ -28,37 +33,50 @@ define_unary_message! { @service = PartitionManagerService, } -default_wire_codec!(ControlProcessors); +impl MigrationCodec for ControlProcessors { + const BILROST_VERSION: ProtocolVersion = ProtocolVersion::V2; +} -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, bilrost::Message)] pub struct ControlProcessors { + #[bilrost(1)] pub min_partition_table_version: Version, + #[bilrost(2)] pub min_logs_table_version: Version, + #[bilrost(3)] pub commands: Vec, } -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, bilrost::Message)] pub struct ControlProcessor { + #[bilrost(1)] pub partition_id: PartitionId, + #[bilrost(oneof(2-3))] pub command: ProcessorCommand, // Version of the current partition configuration used for creating the command for selecting // the leader. Restate <= 1.3.2 does not set the current version attribute. + #[bilrost(4)] #[serde(default = "Version::invalid")] pub current_version: Version, } -#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display)] +#[derive( + Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display, bilrost::Oneof, +)] pub enum ProcessorCommand { // #[deprecated( // since = "1.3.3", // note = "Stopping should happen based on the PartitionReplicaSetStates" // )] + #[bilrost(empty)] Stop, // #[deprecated( // since = "1.3.3", // note = "Starting followers should happen based on the PartitionReplicaSetStates" // )] + #[bilrost(tag(2), message)] Follower, + #[bilrost(tag(3), message)] Leader, } @@ -68,24 +86,43 @@ define_rpc! { @service = PartitionManagerService, } -default_wire_codec!(CreateSnapshotRequest); -default_wire_codec!(CreateSnapshotResponse); +impl MigrationCodec for CreateSnapshotRequest { + const BILROST_VERSION: ProtocolVersion = ProtocolVersion::V2; +} -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, bilrost::Message)] pub struct CreateSnapshotRequest { + #[bilrost(1)] pub partition_id: PartitionId, + #[bilrost(2)] pub min_target_lsn: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, bilrost::Oneof)] +pub enum CreateSnapshotStatus { + #[bilrost(empty)] + Unknown, + #[bilrost(tag(1), message)] + Ok, + #[bilrost(tag(2), message)] + Error(String), +} + +#[derive(Debug, Clone, bilrost::Message)] pub struct CreateSnapshotResponse { - pub result: Result, + #[bilrost(oneof(1-2))] + pub status: CreateSnapshotStatus, + #[bilrost(3)] + pub snapshot: Option, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, bilrost::Message)] pub struct Snapshot { + #[bilrost(1)] pub snapshot_id: SnapshotId, + #[bilrost(2)] pub log_id: LogId, + #[bilrost(3)] pub min_applied_lsn: Lsn, } @@ -93,3 +130,87 @@ pub struct Snapshot { pub enum SnapshotError { SnapshotCreationFailed(String), } + +impl WireEncode for CreateSnapshotResponse { + fn encode_to_bytes(&self, protocol_version: ProtocolVersion) -> Result { + match protocol_version { + ProtocolVersion::V2 => { + let message_v2 = compat::CreateSnapshotResponse::from(self.clone()); + Ok(Bytes::from(encode_as_flexbuffers(&message_v2))) + } + ProtocolVersion::V3 => Ok(encode_as_bilrost(self)), + _ => Err(EncodeError::IncompatibleVersion { + type_tag: stringify!(CreateSnapshotResponse), + min_required: ProtocolVersion::V2, + actual: protocol_version, + }), + } + } +} + +impl WireDecode for CreateSnapshotResponse { + type Error = anyhow::Error; + + fn try_decode(buf: impl Buf, protocol_version: ProtocolVersion) -> Result { + match protocol_version { + ProtocolVersion::V2 => { + let message_v2: compat::CreateSnapshotResponse = + decode_as_flexbuffers(buf, protocol_version)?; + Ok(message_v2.into()) + } + ProtocolVersion::V3 => decode_as_bilrost(buf, protocol_version), + _ => Err(anyhow::anyhow!( + "invalid protocol version: {}", + protocol_version.as_str_name() + )), + } + } +} + +mod compat { + use serde::{Deserialize, Serialize}; + + use crate::net::partition_processor_manager::Snapshot; + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct CreateSnapshotResponse { + pub result: Result, + } + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum SnapshotError { + SnapshotCreationFailed(String), + } + + impl From for CreateSnapshotResponse { + fn from(value: super::CreateSnapshotResponse) -> Self { + match value.status { + super::CreateSnapshotStatus::Unknown | super::CreateSnapshotStatus::Ok => Self { + result: value.snapshot.ok_or_else(|| { + SnapshotError::SnapshotCreationFailed( + "invalid snapshot response".to_string(), + ) + }), + }, + super::CreateSnapshotStatus::Error(e) => Self { + result: Err(SnapshotError::SnapshotCreationFailed(e)), + }, + } + } + } + + impl From for super::CreateSnapshotResponse { + fn from(value: CreateSnapshotResponse) -> Self { + match value.result { + Ok(snapshot) => Self { + status: super::CreateSnapshotStatus::Ok, + snapshot: Some(snapshot), + }, + Err(SnapshotError::SnapshotCreationFailed(e)) => Self { + status: super::CreateSnapshotStatus::Error(e), + snapshot: None, + }, + } + } + } +} diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index cd289f8f3d..1c8bb1d0d2 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -67,7 +67,7 @@ use restate_types::net::metadata::MetadataKind; use restate_types::net::partition_processor::PartitionLeaderService; use restate_types::net::partition_processor_manager::{ ControlProcessor, ControlProcessors, CreateSnapshotRequest, CreateSnapshotResponse, - PartitionManagerService, ProcessorCommand, Snapshot, SnapshotError as NetSnapshotError, + CreateSnapshotStatus, PartitionManagerService, ProcessorCommand, Snapshot, }; use restate_types::net::{RpcRequest as _, UnaryMessage}; use restate_types::nodes_config::{NodesConfigError, NodesConfiguration, WorkerState}; @@ -1209,14 +1209,16 @@ impl PartitionProcessorManager { }; match result { Ok(snapshot) => reciprocal.send(CreateSnapshotResponse { - result: Ok(Snapshot { + status: CreateSnapshotStatus::Ok, + snapshot: Some(Snapshot { snapshot_id: snapshot.snapshot_id, log_id: snapshot.log_id, min_applied_lsn: snapshot.min_applied_lsn, }), }), Err(err) => reciprocal.send(CreateSnapshotResponse { - result: Err(NetSnapshotError::SnapshotCreationFailed(err.to_string())), + status: CreateSnapshotStatus::Error(err.to_string()), + snapshot: None, }), }; });