Skip to content

Commit 85495b1

Browse files
committed
support streaming update_client
1 parent 0fa6397 commit 85495b1

File tree

3 files changed

+201
-3
lines changed

3 files changed

+201
-3
lines changed

modules/service/src/elc.rs

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use crate::service::AppService;
22
use enclave_api::EnclaveProtoAPI;
3+
use lcp_proto::google::protobuf::Any;
4+
use lcp_proto::lcp::service::elc::v1::msg_update_client_stream_chunk::Chunk;
35
use lcp_proto::lcp::service::elc::v1::{
46
msg_server::Msg, query_server::Query, MsgAggregateMessages, MsgAggregateMessagesResponse,
57
MsgCreateClient, MsgCreateClientResponse, MsgUpdateClient, MsgUpdateClientResponse,
6-
MsgVerifyMembership, MsgVerifyMembershipResponse, MsgVerifyNonMembership,
7-
MsgVerifyNonMembershipResponse, QueryClientRequest, QueryClientResponse,
8+
MsgUpdateClientStreamChunk, MsgVerifyMembership, MsgVerifyMembershipResponse,
9+
MsgVerifyNonMembership, MsgVerifyNonMembershipResponse, QueryClientRequest,
10+
QueryClientResponse,
811
};
912
use store::transaction::CommitStore;
10-
use tonic::{Request, Response, Status};
13+
use tonic::{Request, Response, Status, Streaming};
1114

1215
#[tonic::async_trait]
1316
impl<E, S> Msg for AppService<E, S>
@@ -35,6 +38,69 @@ where
3538
}
3639
}
3740

41+
async fn update_client_stream(
42+
&self,
43+
request: Request<Streaming<MsgUpdateClientStreamChunk>>,
44+
) -> Result<Response<MsgUpdateClientResponse>, Status> {
45+
let mut stream = request.into_inner();
46+
47+
// read the first message (must be Init)
48+
let init = match stream.message().await? {
49+
Some(chunk) => match chunk.chunk {
50+
Some(Chunk::Init(init)) => init,
51+
_ => {
52+
return Err(Status::invalid_argument(
53+
"first message must be of type Init",
54+
))
55+
}
56+
},
57+
None => {
58+
return Err(Status::invalid_argument(
59+
"expected Init message as the first message",
60+
))
61+
}
62+
};
63+
64+
// accumulate header chunks
65+
let mut header_bytes = Vec::new();
66+
67+
while let Some(chunk_msg) = stream.message().await? {
68+
match chunk_msg.chunk {
69+
Some(Chunk::HeaderChunk(header_chunk)) => {
70+
header_bytes.extend(header_chunk.data);
71+
}
72+
Some(Chunk::Init(_)) => {
73+
return Err(Status::invalid_argument(
74+
"Init must only appear as the first message",
75+
));
76+
}
77+
None => {
78+
return Err(Status::invalid_argument("received empty chunk message"));
79+
}
80+
}
81+
}
82+
83+
if header_bytes.is_empty() {
84+
return Err(Status::invalid_argument("no header data received"));
85+
}
86+
87+
// create MsgUpdateClient from Init and collected header data
88+
let msg = MsgUpdateClient {
89+
client_id: init.client_id,
90+
include_state: init.include_state,
91+
signer: init.signer,
92+
header: Some(Any {
93+
type_url: init.type_url,
94+
value: header_bytes,
95+
}),
96+
};
97+
98+
match self.enclave.proto_update_client(msg) {
99+
Ok(res) => Ok(Response::new(res)),
100+
Err(e) => Err(Status::aborted(e.to_string())),
101+
}
102+
}
103+
38104
async fn aggregate_messages(
39105
&self,
40106
request: Request<MsgAggregateMessages>,

proto/definitions/lcp/service/elc/v1/tx.proto

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ service Msg {
1616
// UpdateClient defines a rpc handler method for MsgUpdateClient.
1717
rpc UpdateClient(MsgUpdateClient) returns (MsgUpdateClientResponse);
1818

19+
// UpdateClientStream defines a rpc handler method for MsgUpdateClient.
20+
rpc UpdateClientStream(stream MsgUpdateClientStreamChunk) returns (MsgUpdateClientResponse);
21+
1922
// AggregateMessages defines a rpc handler method for MsgAggregateMessages
2023
rpc AggregateMessages(MsgAggregateMessages) returns (MsgAggregateMessagesResponse);
2124

@@ -134,3 +137,21 @@ message MsgVerifyNonMembershipResponse {
134137
bytes message = 1;
135138
bytes signature = 2;
136139
}
140+
141+
message MsgUpdateClientStreamChunk {
142+
oneof chunk {
143+
UpdateClientStreamInit init = 1;
144+
UpdateClientStreamHeaderChunk header_chunk = 2;
145+
}
146+
}
147+
148+
message UpdateClientStreamInit {
149+
string client_id = 1;
150+
bool include_state = 2;
151+
bytes signer = 3;
152+
string type_url = 4;
153+
}
154+
155+
message UpdateClientStreamHeaderChunk {
156+
bytes data = 1;
157+
}

proto/src/prost/lcp.service.elc.v1.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,45 @@ pub struct MsgVerifyNonMembershipResponse {
407407
#[prost(bytes = "vec", tag = "2")]
408408
pub signature: ::prost::alloc::vec::Vec<u8>,
409409
}
410+
#[derive(::serde::Serialize, ::serde::Deserialize)]
411+
#[allow(clippy::derive_partial_eq_without_eq)]
412+
#[derive(Clone, PartialEq, ::prost::Message)]
413+
pub struct MsgUpdateClientStreamChunk {
414+
#[prost(oneof = "msg_update_client_stream_chunk::Chunk", tags = "1, 2")]
415+
pub chunk: ::core::option::Option<msg_update_client_stream_chunk::Chunk>,
416+
}
417+
/// Nested message and enum types in `MsgUpdateClientStreamChunk`.
418+
pub mod msg_update_client_stream_chunk {
419+
#[derive(::serde::Serialize, ::serde::Deserialize)]
420+
#[allow(clippy::derive_partial_eq_without_eq)]
421+
#[derive(Clone, PartialEq, ::prost::Oneof)]
422+
pub enum Chunk {
423+
#[prost(message, tag = "1")]
424+
Init(super::UpdateClientStreamInit),
425+
#[prost(message, tag = "2")]
426+
HeaderChunk(super::UpdateClientStreamHeaderChunk),
427+
}
428+
}
429+
#[derive(::serde::Serialize, ::serde::Deserialize)]
430+
#[allow(clippy::derive_partial_eq_without_eq)]
431+
#[derive(Clone, PartialEq, ::prost::Message)]
432+
pub struct UpdateClientStreamInit {
433+
#[prost(string, tag = "1")]
434+
pub client_id: ::prost::alloc::string::String,
435+
#[prost(bool, tag = "2")]
436+
pub include_state: bool,
437+
#[prost(bytes = "vec", tag = "3")]
438+
pub signer: ::prost::alloc::vec::Vec<u8>,
439+
#[prost(string, tag = "4")]
440+
pub type_url: ::prost::alloc::string::String,
441+
}
442+
#[derive(::serde::Serialize, ::serde::Deserialize)]
443+
#[allow(clippy::derive_partial_eq_without_eq)]
444+
#[derive(Clone, PartialEq, ::prost::Message)]
445+
pub struct UpdateClientStreamHeaderChunk {
446+
#[prost(bytes = "vec", tag = "1")]
447+
pub data: ::prost::alloc::vec::Vec<u8>,
448+
}
410449
/// Generated client implementations.
411450
#[cfg(feature = "client")]
412451
pub mod msg_client {
@@ -518,6 +557,30 @@ pub mod msg_client {
518557
);
519558
self.inner.unary(request.into_request(), path, codec).await
520559
}
560+
/// UpdateClientStream defines a rpc handler method for MsgUpdateClient.
561+
pub async fn update_client_stream(
562+
&mut self,
563+
request: impl tonic::IntoStreamingRequest<
564+
Message = super::MsgUpdateClientStreamChunk,
565+
>,
566+
) -> Result<tonic::Response<super::MsgUpdateClientResponse>, tonic::Status> {
567+
self.inner
568+
.ready()
569+
.await
570+
.map_err(|e| {
571+
tonic::Status::new(
572+
tonic::Code::Unknown,
573+
format!("Service was not ready: {}", e.into()),
574+
)
575+
})?;
576+
let codec = tonic::codec::ProstCodec::default();
577+
let path = http::uri::PathAndQuery::from_static(
578+
"/lcp.service.elc.v1.Msg/UpdateClientStream",
579+
);
580+
self.inner
581+
.client_streaming(request.into_streaming_request(), path, codec)
582+
.await
583+
}
521584
/// AggregateMessages defines a rpc handler method for MsgAggregateMessages
522585
pub async fn aggregate_messages(
523586
&mut self,
@@ -604,6 +667,11 @@ pub mod msg_server {
604667
&self,
605668
request: tonic::Request<super::MsgUpdateClient>,
606669
) -> Result<tonic::Response<super::MsgUpdateClientResponse>, tonic::Status>;
670+
/// UpdateClientStream defines a rpc handler method for MsgUpdateClient.
671+
async fn update_client_stream(
672+
&self,
673+
request: tonic::Request<tonic::Streaming<super::MsgUpdateClientStreamChunk>>,
674+
) -> Result<tonic::Response<super::MsgUpdateClientResponse>, tonic::Status>;
607675
/// AggregateMessages defines a rpc handler method for MsgAggregateMessages
608676
async fn aggregate_messages(
609677
&self,
@@ -759,6 +827,49 @@ pub mod msg_server {
759827
};
760828
Box::pin(fut)
761829
}
830+
"/lcp.service.elc.v1.Msg/UpdateClientStream" => {
831+
#[allow(non_camel_case_types)]
832+
struct UpdateClientStreamSvc<T: Msg>(pub Arc<T>);
833+
impl<
834+
T: Msg,
835+
> tonic::server::ClientStreamingService<
836+
super::MsgUpdateClientStreamChunk,
837+
> for UpdateClientStreamSvc<T> {
838+
type Response = super::MsgUpdateClientResponse;
839+
type Future = BoxFuture<
840+
tonic::Response<Self::Response>,
841+
tonic::Status,
842+
>;
843+
fn call(
844+
&mut self,
845+
request: tonic::Request<
846+
tonic::Streaming<super::MsgUpdateClientStreamChunk>,
847+
>,
848+
) -> Self::Future {
849+
let inner = self.0.clone();
850+
let fut = async move {
851+
(*inner).update_client_stream(request).await
852+
};
853+
Box::pin(fut)
854+
}
855+
}
856+
let accept_compression_encodings = self.accept_compression_encodings;
857+
let send_compression_encodings = self.send_compression_encodings;
858+
let inner = self.inner.clone();
859+
let fut = async move {
860+
let inner = inner.0;
861+
let method = UpdateClientStreamSvc(inner);
862+
let codec = tonic::codec::ProstCodec::default();
863+
let mut grpc = tonic::server::Grpc::new(codec)
864+
.apply_compression_config(
865+
accept_compression_encodings,
866+
send_compression_encodings,
867+
);
868+
let res = grpc.client_streaming(method, req).await;
869+
Ok(res)
870+
};
871+
Box::pin(fut)
872+
}
762873
"/lcp.service.elc.v1.Msg/AggregateMessages" => {
763874
#[allow(non_camel_case_types)]
764875
struct AggregateMessagesSvc<T: Msg>(pub Arc<T>);

0 commit comments

Comments
 (0)