Skip to content

Commit 28c59aa

Browse files
authored
fix: handle failed event for in http listeners (#313)
* fix: handle failed event for in http listeners - for input-proof, user-decrypt and public-decrypt.
1 parent 1060e58 commit 28c59aa

File tree

3 files changed

+255
-134
lines changed

3 files changed

+255
-134
lines changed

apps/relayer/fhevm-relayer/src/http/input_http_listener.rs

Lines changed: 89 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,32 @@ impl<D: EventDispatcher<RelayerEvent> + HandlerRegistry<RelayerEvent>> InputProo
8686
info!("Validated and assigned request id: {}", request_id);
8787

8888
// Register once handlers for receiving the decryption response from the gateway
89-
let (handler, rx): (OnceHandler<RelayerEvent>, oneshot::Receiver<RelayerEvent>) =
90-
OnceHandler::new();
91-
let handler = Arc::new(handler);
89+
let (gateway_response_handler, gateway_response_rx): (
90+
OnceHandler<RelayerEvent>,
91+
oneshot::Receiver<RelayerEvent>,
92+
) = OnceHandler::new();
93+
let gateway_response_handler = Arc::new(gateway_response_handler);
9294

9395
self.orchestrator.register_once_handler(
9496
InputProofEventId::RespRcvdFromGw.into(),
9597
request_id,
96-
handler,
98+
gateway_response_handler,
9799
);
98-
info!("Registered once handler");
100+
info!("Registered once handler for handling input proof gateway response");
101+
102+
// Register once handlers for receiving the decryption response from the gateway
103+
let (error_handler, error_rx): (
104+
OnceHandler<RelayerEvent>,
105+
oneshot::Receiver<RelayerEvent>,
106+
) = OnceHandler::new();
107+
let error_handler = Arc::new(error_handler);
108+
109+
self.orchestrator.register_once_handler(
110+
InputProofEventId::Failed.into(),
111+
request_id,
112+
error_handler,
113+
);
114+
info!("Registered once handler for handling input proof failure");
99115

100116
let request_data: InputProofRequest = match payload.try_into() {
101117
Ok(event_data) => event_data,
@@ -115,51 +131,77 @@ impl<D: EventDispatcher<RelayerEvent> + HandlerRegistry<RelayerEvent>> InputProo
115131
);
116132
let _ = self.orchestrator.dispatch_event(event).await;
117133
info!("dispatched event to orchestrator to initiate processing");
118-
let event = {
119-
let _waiting_for_response_span =
120-
span!(Level::INFO, "waiting-for-response", request_id = %request_id);
121-
info!("waiting for reponse event");
122-
123-
// Wait for response on the rx of Onshot channel.
124-
match rx.await {
125-
Ok(event) => {
126-
info!("received response event");
127-
event
128-
}
129-
Err(_) => {
130-
info!("received errror while waiting for response event");
131-
let error_response = InputProofErrorResponseJson {
132-
message: "Failed to receive response from the gateway.".to_string(),
133-
};
134-
return (StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
135-
.into_response();
136-
}
137-
}
138-
};
139134

140-
info!("Response event type {:?}", event.data);
141-
match event.data {
142-
RelayerEventData::InputProof(InputProofEventData::RespRcvdFromGw {
143-
input_proof_response,
144-
}) => match InputProofResponseJson::try_from(input_proof_response) {
145-
Ok(response_json) => {
146-
info!("Sending success reponse to user");
147-
(StatusCode::OK, Json(response_json)).into_response()
135+
let _waiting_for_response_span =
136+
span!(Level::INFO, "waiting-for-response", request_id = %request_id);
137+
info!("waiting for reponse event");
138+
139+
// Wait for response or error on the rx of Oneshot channels concurrently.
140+
use futures::pin_mut;
141+
pin_mut!(gateway_response_rx);
142+
pin_mut!(error_rx);
143+
144+
tokio::select! {
145+
res = &mut gateway_response_rx => {
146+
match res {
147+
Ok(event) => {
148+
info!("Response event type {:?}", event.data);
149+
match event.data {
150+
RelayerEventData::InputProof(InputProofEventData::RespRcvdFromGw {
151+
input_proof_response,
152+
}) => {
153+
match InputProofResponseJson::try_from(input_proof_response) {
154+
Ok(response_json) => {
155+
info!("Sending success response to user");
156+
(StatusCode::OK, Json(response_json)).into_response()
157+
}
158+
Err(_) => {
159+
info!("sending error reponse to user as response event cannot be decoded");
160+
let error_response = InputProofErrorResponseJson {
161+
message: "request could not be completed 2".to_string(),
162+
};
163+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response))
164+
.into_response()
165+
}
166+
}
167+
}
168+
_ => {
169+
info!(
170+
"sending error reponse to user as response event is not expected type"
171+
);
172+
let error_response = InputProofErrorResponseJson {
173+
message: "request could not be completed 3".to_string(),
174+
};
175+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
176+
}
177+
}
178+
}
179+
Err(_) => {
180+
info!("received errror while waiting for response event");
181+
let error_response = InputProofErrorResponseJson {
182+
message: "Failed to receive response from the gateway.".to_string(),
183+
};
184+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
185+
}
148186
}
149-
Err(_) => {
150-
info!("sending error reponse to user as response event cannot be decoded");
151-
let error_response = InputProofErrorResponseJson {
152-
message: "request could not be completed 2".to_string(),
153-
};
154-
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
187+
}
188+
res = &mut error_rx => {
189+
match res {
190+
Ok(_event) => {
191+
info!("received error event on error_rx");
192+
let error_response = InputProofErrorResponseJson {
193+
message: "REQUEST FAILED RESPONSE".to_string(),
194+
};
195+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
196+
}
197+
Err(_) => {
198+
info!("received error while waiting for error event on error_rx");
199+
let error_response = InputProofErrorResponseJson {
200+
message: "Failed to receive error response from the gateway.".to_string(),
201+
};
202+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
203+
}
155204
}
156-
},
157-
_ => {
158-
info!("sending error reponse to user as response event is not expected type");
159-
let error_response = InputProofErrorResponseJson {
160-
message: "request could not be completed 3".to_string(),
161-
};
162-
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
163205
}
164206
}
165207
}

apps/relayer/fhevm-relayer/src/http/public_decrypt_http_listener.rs

Lines changed: 83 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,32 @@ impl<D: EventDispatcher<RelayerEvent> + HandlerRegistry<RelayerEvent>> PublicDec
9393
info!("Validated and assigned request id: {}", request_id);
9494

9595
// Register once handlers for receiving the decryption response from the gateway
96-
let (handler, rx): (OnceHandler<RelayerEvent>, oneshot::Receiver<RelayerEvent>) =
97-
OnceHandler::new();
98-
let handler = Arc::new(handler);
96+
let (response_handler, response_rx): (
97+
OnceHandler<RelayerEvent>,
98+
oneshot::Receiver<RelayerEvent>,
99+
) = OnceHandler::new();
100+
let response_handler = Arc::new(response_handler);
99101

100102
self.orchestrator.register_once_handler(
101103
PublicDecryptEventId::RespRcvdFromGw.into(),
102104
request_id,
103-
handler,
105+
response_handler,
104106
);
105-
info!("Registered once handler");
107+
info!("Registered once handler for response");
108+
109+
// Register once handler for error/failure event
110+
let (error_handler, error_rx): (
111+
OnceHandler<RelayerEvent>,
112+
oneshot::Receiver<RelayerEvent>,
113+
) = OnceHandler::new();
114+
let error_handler = Arc::new(error_handler);
115+
116+
self.orchestrator.register_once_handler(
117+
PublicDecryptEventId::Failed.into(),
118+
request_id,
119+
error_handler,
120+
);
121+
info!("Registered once handler for error");
106122

107123
let request_data = PublicDecryptEventData::ReqRcvdFromFhevm {
108124
decrypt_request: public_decrypt_request,
@@ -115,48 +131,71 @@ impl<D: EventDispatcher<RelayerEvent> + HandlerRegistry<RelayerEvent>> PublicDec
115131
let _ = self.orchestrator.dispatch_event(event).await;
116132
info!("Dispatched event to orchestrator to initiate processing");
117133

118-
info!("Waiting for public decrypt reponse event");
119-
// TODO(Mano): Handle failed event as well.
120-
// Wait for response on the rx of Onshot channel.
121-
let event = match rx.await {
122-
Ok(event) => {
123-
info!("Received public decrypt response event");
124-
event
125-
}
126-
Err(_) => {
127-
info!("Received errror while waiting for response event");
128-
let error_response = PublicDecryptErrorResponseJson {
129-
message: "Failed to receive response from the gateway.".to_string(),
130-
};
131-
return (StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response();
132-
}
133-
};
134+
info!("Waiting for public decrypt response or error event");
135+
136+
use futures::pin_mut;
137+
pin_mut!(response_rx);
138+
pin_mut!(error_rx);
134139

135-
info!("Response event type {:?}", event.data);
136-
match event.data {
137-
RelayerEventData::PublicDecrypt(PublicDecryptEventData::RespRcvdFromGw {
138-
decrypt_response,
139-
}) => match PublicDecryptResponseJson::try_from(decrypt_response) {
140-
Ok(response_json) => {
141-
info!("Sending success reponse to public");
142-
(StatusCode::OK, Json(response_json)).into_response()
140+
tokio::select! {
141+
res = &mut response_rx => {
142+
match res {
143+
Ok(event) => {
144+
info!("Received public decrypt response event");
145+
info!("Response event type {:?}", event.data);
146+
match event.data {
147+
RelayerEventData::PublicDecrypt(PublicDecryptEventData::RespRcvdFromGw {
148+
decrypt_response,
149+
}) => match PublicDecryptResponseJson::try_from(decrypt_response) {
150+
Ok(response_json) => {
151+
info!("Sending success reponse to public");
152+
(StatusCode::OK, Json(response_json)).into_response()
153+
}
154+
Err(error) => {
155+
info!(
156+
"sending error reponse to public as response event cannot be decoded: {}",
157+
error
158+
);
159+
let error_response = PublicDecryptErrorResponseJson {
160+
message: "request could not be completed".to_string(),
161+
};
162+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
163+
}
164+
},
165+
_ => {
166+
let error_response = PublicDecryptErrorResponseJson {
167+
message: "unexpected error".to_string(),
168+
};
169+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
170+
}
171+
}
172+
}
173+
Err(_) => {
174+
info!("Received error while waiting for response event");
175+
let error_response = PublicDecryptErrorResponseJson {
176+
message: "Failed to receive response from the gateway.".to_string(),
177+
};
178+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
179+
}
143180
}
144-
Err(error) => {
145-
info!(
146-
"sending error reponse to public as response event cannot be decoded: {}",
147-
error
148-
);
149-
let error_response = PublicDecryptErrorResponseJson {
150-
message: "request could not be completed".to_string(),
151-
};
152-
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
181+
}
182+
res = &mut error_rx => {
183+
match res {
184+
Ok(_event) => {
185+
info!("Received error event on error_rx");
186+
let error_response = PublicDecryptErrorResponseJson {
187+
message: "REQUEST FAILED RESPONSE".to_string(),
188+
};
189+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
190+
}
191+
Err(_) => {
192+
info!("Received error while waiting for error event on error_rx");
193+
let error_response = PublicDecryptErrorResponseJson {
194+
message: "Failed to receive error response from the gateway.".to_string(),
195+
};
196+
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
197+
}
153198
}
154-
},
155-
_ => {
156-
let error_response = PublicDecryptErrorResponseJson {
157-
message: "unexpected error".to_string(),
158-
};
159-
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
160199
}
161200
}
162201
}

0 commit comments

Comments
 (0)