Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ use restate_types::logs::metadata::{
};
use restate_types::logs::{LogId, LogletId, Lsn};
use restate_types::net::node::NodeState;
use restate_types::net::partition_processor_manager::{CreateSnapshotRequest, Snapshot};
use restate_types::net::partition_processor_manager::{
CreateSnapshotRequest, CreateSnapshotStatus, Snapshot,
};
use restate_types::nodes_config::{NodesConfiguration, StorageState};
use restate_types::partition_table::{
self, PartitionReplication, PartitionTable, PartitionTableBuilder,
Expand Down Expand Up @@ -712,7 +714,8 @@ where
partition_id: PartitionId,
min_target_lsn: Option<Lsn>,
) -> anyhow::Result<Snapshot> {
self.network_sender
let response = self
.network_sender
.call_rpc(
node_id,
Swimlane::default(),
Expand All @@ -723,9 +726,13 @@ where
Some(partition_id.into()),
None,
)
.await?
.result
.map_err(|e| anyhow!("Failed to create snapshot: {:?}", e))
.await?;

match response.status {
CreateSnapshotStatus::Ok => Ok(response.snapshot.expect("to be set")),
CreateSnapshotStatus::Error(e) => Err(anyhow!("Failed to create snapshot: {}", e)),
CreateSnapshotStatus::Unknown => Err(anyhow!("Unknown error creating snapshot")),
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ enum ProtocolVersion {
// V1 = 1;
// [Native RPC] Released in >= v1.3.3
V2 = 2;
// Released in v1.6.0
// Some messages has switched to bilrost
V3 = 3;
}

message NodeId {
Expand All @@ -42,7 +45,7 @@ message Version { uint32 value = 1; }
// The handle name or type tag of the message. For every service there must be
// exactly one message handler implementation.
enum ServiceTag {
reserved 1 to 25, 40 to 43, 50 to 53, 60, 61, 80 to 85 ;
reserved 1 to 25, 40 to 43, 50 to 53, 60, 61, 80 to 85;
ServiceTag_UNKNOWN = 0;
// LogServer
LOG_SERVER_DATA_SERVICE = 26;
Expand Down
66 changes: 44 additions & 22 deletions crates/types/src/net/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,6 @@ pub trait WireDecode {
Self: Sized;
}

impl<T> WireEncode for Box<T>
where
T: WireEncode,
{
fn encode_to_bytes(&self, protocol_version: ProtocolVersion) -> Result<Bytes, EncodeError> {
self.as_ref().encode_to_bytes(protocol_version)
}
}

impl<T> WireDecode for Box<T>
where
T: WireDecode,
{
type Error = T::Error;
fn try_decode(buf: impl Buf, protocol_version: ProtocolVersion) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Box::new(T::try_decode(buf, protocol_version)?))
}
}

impl<T> WireDecode for Arc<T>
where
T: WireDecode,
Expand Down Expand Up @@ -129,3 +107,47 @@ pub fn decode_as_bilrost<T: OwnedMessage>(

T::decode(buf).context("failed decoding (bilrost) network message")
}

/// Trait for types that need to support migration from flexbuffers to bilrost encoding
/// based on protocol version. Implement this trait for message types that are transitioning
/// from flexbuffers to bilrost serialization.
///
/// - If the protocol version is **less than or equal to** `BILROST_VERSION`, the type will be
/// encoded/decoded using flexbuffers (via serde).
/// - If the protocol version is **greater than** `BILROST_VERSION`, the type will be
/// encoded/decoded using bilrost.
///
/// Types implementing this trait must be serializable and deserializable with both serde and bilrost.
pub trait MigrationCodec:
Serialize + DeserializeOwned + bilrost::Message + bilrost::OwnedMessage
{
/// The protocol version at which bilrost encoding becomes active.
const BILROST_VERSION: ProtocolVersion;
}

impl<T> WireEncode for T
where
T: MigrationCodec,
{
fn encode_to_bytes(&self, protocol_version: ProtocolVersion) -> Result<Bytes, EncodeError> {
if protocol_version <= T::BILROST_VERSION {
Ok(Bytes::from(encode_as_flexbuffers(self)))
} else {
Ok(encode_as_bilrost(self))
}
}
}

impl<T> WireDecode for T
where
T: MigrationCodec,
{
type Error = anyhow::Error;
fn try_decode(buf: impl Buf, protocol_version: ProtocolVersion) -> Result<Self, Self::Error> {
if protocol_version <= T::BILROST_VERSION {
Ok(decode_as_flexbuffers(buf, protocol_version)?)
} else {
Ok(decode_as_bilrost(buf, protocol_version)?)
}
}
}
2 changes: 1 addition & 1 deletion crates/types/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use crate::protobuf::common::ProtocolVersion;
pub use crate::protobuf::common::ServiceTag;

pub static MIN_SUPPORTED_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V2;
pub static CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V2;
pub static CURRENT_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V3;

#[derive(
Debug,
Expand Down
145 changes: 133 additions & 12 deletions crates/types/src/net/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use bytes::{Buf, Bytes};
use serde::{Deserialize, Serialize};

use crate::Version;
use crate::identifiers::{PartitionId, SnapshotId};
use crate::logs::{LogId, Lsn};
use crate::net::{ServiceTag, define_service, define_unary_message};
use crate::net::{default_wire_codec, define_rpc};
use crate::net::codec::{
EncodeError, MigrationCodec, WireDecode, WireEncode, decode_as_bilrost, decode_as_flexbuffers,
encode_as_bilrost, encode_as_flexbuffers,
};
use crate::net::define_rpc;
use crate::net::{ProtocolVersion, ServiceTag, define_service, define_unary_message};

pub struct PartitionManagerService;

Expand All @@ -28,37 +33,50 @@ define_unary_message! {
@service = PartitionManagerService,
}

default_wire_codec!(ControlProcessors);
impl MigrationCodec for ControlProcessors {
const BILROST_VERSION: ProtocolVersion = ProtocolVersion::V2;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, bilrost::Message)]
pub struct ControlProcessors {
#[bilrost(1)]
pub min_partition_table_version: Version,
#[bilrost(2)]
pub min_logs_table_version: Version,
#[bilrost(3)]
pub commands: Vec<ControlProcessor>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, bilrost::Message)]
pub struct ControlProcessor {
#[bilrost(1)]
pub partition_id: PartitionId,
#[bilrost(oneof(2-3))]
pub command: ProcessorCommand,
// Version of the current partition configuration used for creating the command for selecting
// the leader. Restate <= 1.3.2 does not set the current version attribute.
#[bilrost(4)]
#[serde(default = "Version::invalid")]
pub current_version: Version,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display)]
#[derive(
Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, derive_more::Display, bilrost::Oneof,
)]
pub enum ProcessorCommand {
// #[deprecated(
// since = "1.3.3",
// note = "Stopping should happen based on the PartitionReplicaSetStates"
// )]
#[bilrost(empty)]
Stop,
// #[deprecated(
// since = "1.3.3",
// note = "Starting followers should happen based on the PartitionReplicaSetStates"
// )]
#[bilrost(tag(2), message)]
Follower,
#[bilrost(tag(3), message)]
Leader,
}

Expand All @@ -68,28 +86,131 @@ define_rpc! {
@service = PartitionManagerService,
}

default_wire_codec!(CreateSnapshotRequest);
default_wire_codec!(CreateSnapshotResponse);
impl MigrationCodec for CreateSnapshotRequest {
const BILROST_VERSION: ProtocolVersion = ProtocolVersion::V2;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, bilrost::Message)]
pub struct CreateSnapshotRequest {
#[bilrost(1)]
pub partition_id: PartitionId,
#[bilrost(2)]
pub min_target_lsn: Option<Lsn>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, bilrost::Oneof)]
pub enum CreateSnapshotStatus {
#[bilrost(empty)]
Unknown,
#[bilrost(tag(1), message)]
Ok,
#[bilrost(tag(2), message)]
Error(String),
}

#[derive(Debug, Clone, bilrost::Message)]
pub struct CreateSnapshotResponse {
pub result: Result<Snapshot, SnapshotError>,
#[bilrost(oneof(1-2))]
pub status: CreateSnapshotStatus,
#[bilrost(3)]
pub snapshot: Option<Snapshot>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, bilrost::Message)]
pub struct Snapshot {
#[bilrost(1)]
pub snapshot_id: SnapshotId,
#[bilrost(2)]
pub log_id: LogId,
#[bilrost(3)]
pub min_applied_lsn: Lsn,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SnapshotError {
SnapshotCreationFailed(String),
}

impl WireEncode for CreateSnapshotResponse {
fn encode_to_bytes(&self, protocol_version: ProtocolVersion) -> Result<Bytes, EncodeError> {
match protocol_version {
ProtocolVersion::V2 => {
let message_v2 = compat::CreateSnapshotResponse::from(self.clone());
Ok(Bytes::from(encode_as_flexbuffers(&message_v2)))
}
ProtocolVersion::V3 => Ok(encode_as_bilrost(self)),
_ => Err(EncodeError::IncompatibleVersion {
type_tag: stringify!(CreateSnapshotResponse),
min_required: ProtocolVersion::V2,
actual: protocol_version,
}),
}
}
}

impl WireDecode for CreateSnapshotResponse {
type Error = anyhow::Error;

fn try_decode(buf: impl Buf, protocol_version: ProtocolVersion) -> Result<Self, Self::Error> {
match protocol_version {
ProtocolVersion::V2 => {
let message_v2: compat::CreateSnapshotResponse =
decode_as_flexbuffers(buf, protocol_version)?;
Ok(message_v2.into())
}
ProtocolVersion::V3 => decode_as_bilrost(buf, protocol_version),
_ => Err(anyhow::anyhow!(
"invalid protocol version: {}",
protocol_version.as_str_name()
)),
}
}
}

mod compat {
use serde::{Deserialize, Serialize};

use crate::net::partition_processor_manager::Snapshot;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateSnapshotResponse {
pub result: Result<Snapshot, SnapshotError>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SnapshotError {
SnapshotCreationFailed(String),
}

impl From<super::CreateSnapshotResponse> for CreateSnapshotResponse {
fn from(value: super::CreateSnapshotResponse) -> Self {
match value.status {
super::CreateSnapshotStatus::Unknown | super::CreateSnapshotStatus::Ok => Self {
result: value.snapshot.ok_or_else(|| {
SnapshotError::SnapshotCreationFailed(
"invalid snapshot response".to_string(),
)
}),
},
super::CreateSnapshotStatus::Error(e) => Self {
result: Err(SnapshotError::SnapshotCreationFailed(e)),
},
}
}
}

impl From<CreateSnapshotResponse> for super::CreateSnapshotResponse {
fn from(value: CreateSnapshotResponse) -> Self {
match value.result {
Ok(snapshot) => Self {
status: super::CreateSnapshotStatus::Ok,
snapshot: Some(snapshot),
},
Err(SnapshotError::SnapshotCreationFailed(e)) => Self {
status: super::CreateSnapshotStatus::Error(e),
snapshot: None,
},
}
}
}
}
8 changes: 5 additions & 3 deletions crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor::PartitionLeaderService;
use restate_types::net::partition_processor_manager::{
ControlProcessor, ControlProcessors, CreateSnapshotRequest, CreateSnapshotResponse,
PartitionManagerService, ProcessorCommand, Snapshot, SnapshotError as NetSnapshotError,
CreateSnapshotStatus, PartitionManagerService, ProcessorCommand, Snapshot,
};
use restate_types::net::{RpcRequest as _, UnaryMessage};
use restate_types::nodes_config::{NodesConfigError, NodesConfiguration, WorkerState};
Expand Down Expand Up @@ -1209,14 +1209,16 @@ impl PartitionProcessorManager {
};
match result {
Ok(snapshot) => reciprocal.send(CreateSnapshotResponse {
result: Ok(Snapshot {
status: CreateSnapshotStatus::Ok,
snapshot: Some(Snapshot {
snapshot_id: snapshot.snapshot_id,
log_id: snapshot.log_id,
min_applied_lsn: snapshot.min_applied_lsn,
}),
}),
Err(err) => reciprocal.send(CreateSnapshotResponse {
result: Err(NetSnapshotError::SnapshotCreationFailed(err.to_string())),
status: CreateSnapshotStatus::Error(err.to_string()),
snapshot: None,
}),
};
});
Expand Down
Loading