Skip to content

Commit 67279e3

Browse files
authored
fix: The quilc RPCQ client is now safe to share between threads. (#389)
* fix: The quilc RPCQ client now creates a new socket per request so it is safe to use in multi-threaded contexts * clean up
1 parent 56dacea commit 67279e3

File tree

1 file changed

+31
-24
lines changed

1 file changed

+31
-24
lines changed

crates/lib/src/compiler/rpcq.rs

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::collections::HashMap;
44
use std::str::FromStr;
5-
use std::sync::{Arc, Mutex};
65

76
use quil_rs::Program;
87
use rmp_serde::Serializer;
@@ -19,7 +18,6 @@ pub(crate) const DEFAULT_CLIENT_TIMEOUT: f64 = 30.0;
1918
#[derive(Clone)]
2019
pub struct Client {
2120
pub(crate) endpoint: String,
22-
socket: Arc<Mutex<Socket>>,
2321
}
2422

2523
impl std::fmt::Debug for Client {
@@ -31,12 +29,7 @@ impl std::fmt::Debug for Client {
3129
impl Client {
3230
/// Construct a new [`Client`] with no authentication configured.
3331
pub fn new(endpoint: &str) -> Result<Self, Error> {
34-
let socket = Context::new()
35-
.socket(SocketType::DEALER)
36-
.map_err(Error::SocketCreation)?;
37-
socket.connect(endpoint).map_err(Error::Communication)?;
3832
Ok(Self {
39-
socket: Arc::new(Mutex::new(socket)),
4033
endpoint: endpoint.to_owned(),
4134
})
4235
}
@@ -50,37 +43,55 @@ impl Client {
5043
&self,
5144
request: &RPCRequest<'_, Request>,
5245
) -> Result<Response, Error> {
53-
self.send(request)?;
54-
self.receive::<Response>(&request.id)
46+
let socket = self.create_socket()?;
47+
Self::send(request, &socket)?;
48+
Self::receive::<Response>(&request.id, &socket)
5549
}
5650

5751
/// Send an RPC request.
5852
///
5953
/// # Arguments
6054
///
6155
/// * `request`: An [`RPCRequest`] containing some params.
62-
pub(crate) fn send<Request: Serialize>(
63-
&self,
56+
/// * `socket`: The ZMQ socket to send the request on.
57+
fn send<Request: Serialize>(
6458
request: &RPCRequest<'_, Request>,
59+
socket: &Socket,
6560
) -> Result<(), Error> {
6661
let mut data = vec![];
6762
request
6863
.serialize(&mut Serializer::new(&mut data).with_struct_map())
6964
.map_err(Error::Serialization)?;
7065

71-
self.socket
72-
.lock()
73-
.map_err(|e| Error::ZmqSocketLock(e.to_string()))?
74-
.send(data, 0)
75-
.map_err(Error::Communication)
66+
socket.send(data, 0).map_err(Error::Communication)
67+
}
68+
69+
/// Creates a new ZMQ socket and connects it to the endpoint.
70+
///
71+
/// [`SocketType::DEALER`] for compatiblity with the quilc servers
72+
/// [`SocketType::ROUTER`]. These sockets are _not_ thread safe, even
73+
/// with a mutex, so a new socket should be created for each request,
74+
/// and the socket should not be shared between threads.
75+
fn create_socket(&self) -> Result<Socket, Error> {
76+
let socket = Context::new()
77+
.socket(SocketType::DEALER)
78+
.map_err(Error::SocketCreation)?;
79+
socket
80+
.connect(&self.endpoint.clone())
81+
.map_err(Error::Communication)?;
82+
socket.set_linger(0).map_err(Error::Communication)?;
83+
Ok(socket)
7684
}
7785

7886
/// Retrieve and decode a response
7987
///
8088
/// returns: Result<Response, Error> where Response is a generic type that implements
8189
/// [`DeserializeOwned`] (meaning [`Deserialize`] with no lifetimes).
82-
fn receive<Response: DeserializeOwned>(&self, request_id: &str) -> Result<Response, Error> {
83-
let data = self.receive_raw()?;
90+
fn receive<Response: DeserializeOwned>(
91+
request_id: &str,
92+
socket: &Socket,
93+
) -> Result<Response, Error> {
94+
let data = Self::receive_raw(socket)?;
8495

8596
let reply: RPCResponse<Response> =
8697
rmp_serde::from_read(data.as_slice()).map_err(Error::Deserialization)?;
@@ -97,12 +108,8 @@ impl Client {
97108
}
98109

99110
/// Retrieve the raw bytes of a response
100-
pub(crate) fn receive_raw(&self) -> Result<Vec<u8>, Error> {
101-
self.socket
102-
.lock()
103-
.map_err(|e| Error::ZmqSocketLock(e.to_string()))?
104-
.recv_bytes(0)
105-
.map_err(Error::Communication)
111+
fn receive_raw(socket: &Socket) -> Result<Vec<u8>, Error> {
112+
socket.recv_bytes(0).map_err(Error::Communication)
106113
}
107114
}
108115

0 commit comments

Comments
 (0)