Skip to content

Commit aa0035f

Browse files
committed
modify streaming update client
1 parent 9bef600 commit aa0035f

File tree

6 files changed

+149
-574
lines changed

6 files changed

+149
-574
lines changed

modules/service/src/elc.rs

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use crate::service::AppService;
22
use enclave_api::EnclaveProtoAPI;
3+
use lcp_proto::google::protobuf::Any;
4+
use lcp_proto::lcp::service::elc::v1::msg_update_client_stream_chunk::Chunk;
35
use lcp_proto::lcp::service::elc::v1::{
46
msg_server::Msg, query_server::Query, MsgAggregateMessages, MsgAggregateMessagesResponse,
57
MsgCreateClient, MsgCreateClientResponse, MsgUpdateClient, MsgUpdateClientResponse,
6-
MsgVerifyMembership, MsgVerifyMembershipResponse, MsgVerifyNonMembership,
7-
MsgVerifyNonMembershipResponse, QueryClientRequest, QueryClientResponse,
8+
MsgUpdateClientStreamChunk, MsgVerifyMembership, MsgVerifyMembershipResponse,
9+
MsgVerifyNonMembership, MsgVerifyNonMembershipResponse, QueryClientRequest,
10+
QueryClientResponse,
811
};
912
use store::transaction::CommitStore;
1013
use tonic::{Request, Response, Status, Streaming};
@@ -35,30 +38,63 @@ where
3538
}
3639
}
3740

38-
async fn streaming_update_client(
41+
async fn update_client_stream(
3942
&self,
40-
request: Request<Streaming<MsgUpdateClient>>,
43+
request: Request<Streaming<MsgUpdateClientStreamChunk>>,
4144
) -> Result<Response<MsgUpdateClientResponse>, Status> {
42-
let mut complete = MsgUpdateClient {
43-
signer: vec![],
44-
client_id: "".to_string(),
45-
include_state: false,
46-
header: None,
45+
let mut stream = request.into_inner();
46+
// read the first message (must be Init)
47+
let init = match stream.message().await? {
48+
Some(chunk) => match chunk.chunk {
49+
Some(Chunk::Init(init)) => init,
50+
_ => {
51+
return Err(Status::invalid_argument(
52+
"first message must be of type Init",
53+
))
54+
}
55+
},
56+
None => {
57+
return Err(Status::invalid_argument(
58+
"expected Init message as the first message",
59+
))
60+
}
4761
};
4862

49-
let mut stream = request.into_inner();
50-
while let Some(chunk) = stream.message().await? {
51-
if let Some(header) = &mut complete.header {
52-
let any_header = chunk
53-
.header
54-
.ok_or(Status::invalid_argument("header value is required"))?;
55-
header.value.extend(any_header.value);
56-
} else {
57-
complete = chunk;
63+
// accumulate header chunks
64+
let mut header_bytes = Vec::new();
65+
66+
while let Some(chunk_msg) = stream.message().await? {
67+
match chunk_msg.chunk {
68+
Some(Chunk::HeaderChunk(header_chunk)) => {
69+
header_bytes.extend(header_chunk.data);
70+
}
71+
Some(Chunk::Init(_)) => {
72+
return Err(Status::invalid_argument(
73+
"Init must only appear as the first message",
74+
));
75+
}
76+
None => {
77+
return Err(Status::invalid_argument("received empty chunk message"));
78+
}
5879
}
5980
}
6081

61-
match self.enclave.proto_update_client(complete) {
82+
if header_bytes.is_empty() {
83+
return Err(Status::invalid_argument("no header data received"));
84+
}
85+
86+
// set the header with collected data
87+
let msg = MsgUpdateClient {
88+
client_id: init.client_id,
89+
include_state: init.include_state,
90+
signer: init.signer,
91+
header: Some(Any {
92+
type_url: init.type_url,
93+
value: header_bytes,
94+
}),
95+
};
96+
97+
match self.enclave.proto_update_client(msg) {
6298
Ok(res) => Ok(Response::new(res)),
6399
Err(e) => Err(Status::aborted(e.to_string())),
64100
}

proto/definitions/lcp/service/elc/v1/tx.proto

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ service Msg {
1616
// UpdateClient defines a rpc handler method for MsgUpdateClient.
1717
rpc UpdateClient(MsgUpdateClient) returns (MsgUpdateClientResponse);
1818

19-
// StreamingUpdateClient defines a rpc handler method for MsgUpdateClient.
20-
rpc StreamingUpdateClient(stream MsgUpdateClient) returns (MsgUpdateClientResponse);
19+
// UpdateClientStream defines a rpc handler method for MsgUpdateClient.
20+
rpc UpdateClientStream(stream MsgUpdateClientStreamChunk) returns (MsgUpdateClientResponse);
2121

2222
// AggregateMessages defines a rpc handler method for MsgAggregateMessages
2323
rpc AggregateMessages(MsgAggregateMessages) returns (MsgAggregateMessagesResponse);
@@ -137,3 +137,21 @@ message MsgVerifyNonMembershipResponse {
137137
bytes message = 1;
138138
bytes signature = 2;
139139
}
140+
141+
message MsgUpdateClientStreamChunk {
142+
oneof chunk {
143+
UpdateClientStreamInit init = 1;
144+
UpdateClientStreamHeaderChunk header_chunk = 2;
145+
}
146+
}
147+
148+
message UpdateClientStreamInit {
149+
string client_id = 1;
150+
bool include_state = 2;
151+
bytes signer = 3;
152+
string type_url = 4;
153+
}
154+
155+
message UpdateClientStreamHeaderChunk {
156+
bytes data = 1;
157+
}

proto/src/descriptor.bin

-627 Bytes
Binary file not shown.

proto/src/prost/cosmos.upgrade.v1beta1.rs

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@ pub struct Plan {
1111
/// reached and the software will exit.
1212
#[prost(string, tag = "1")]
1313
pub name: ::prost::alloc::string::String,
14-
/// Deprecated: Time based upgrades have been deprecated. Time based upgrade logic
15-
/// has been removed from the SDK.
16-
/// If this field is not empty, an error will be thrown.
17-
#[deprecated]
14+
/// The time after which the upgrade must be performed.
15+
/// Leave set to its zero value to use a pre-defined Height instead.
1816
#[prost(message, optional, tag = "2")]
1917
pub time: ::core::option::Option<super::super::super::google::protobuf::Timestamp>,
2018
/// The height at which the upgrade must be performed.
@@ -25,14 +23,6 @@ pub struct Plan {
2523
/// such as a git commit that validators could automatically upgrade to
2624
#[prost(string, tag = "4")]
2725
pub info: ::prost::alloc::string::String,
28-
/// Deprecated: UpgradedClientState field has been deprecated. IBC upgrade logic has been
29-
/// moved to the IBC module in the sub module 02-client.
30-
/// If this field is not empty, an error will be thrown.
31-
#[deprecated]
32-
#[prost(message, optional, tag = "5")]
33-
pub upgraded_client_state: ::core::option::Option<
34-
super::super::super::google::protobuf::Any,
35-
>,
3626
}
3727
/// SoftwareUpgradeProposal is a gov Content type for initiating a software
3828
/// upgrade.
@@ -56,16 +46,3 @@ pub struct CancelSoftwareUpgradeProposal {
5646
#[prost(string, tag = "2")]
5747
pub description: ::prost::alloc::string::String,
5848
}
59-
/// ModuleVersion specifies a module and its consensus version.
60-
///
61-
/// Since: cosmos-sdk 0.43
62-
#[allow(clippy::derive_partial_eq_without_eq)]
63-
#[derive(Clone, PartialEq, ::prost::Message)]
64-
pub struct ModuleVersion {
65-
/// name of the app module
66-
#[prost(string, tag = "1")]
67-
pub name: ::prost::alloc::string::String,
68-
/// consensus version of the app module
69-
#[prost(uint64, tag = "2")]
70-
pub version: u64,
71-
}

0 commit comments

Comments
 (0)