Skip to content

Commit 0e3f799

Browse files
committed
chore: update Iceoryx and tungstenite.
1 parent 88cac10 commit 0e3f799

File tree

17 files changed

+70
-66
lines changed

17 files changed

+70
-66
lines changed

collector/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ serde = { version = "1.0.196", features = ["derive"] }
1111
tokio = { version = "1.35.1", features = ["full"] }
1212
serde_json = "1.0.113"
1313
futures-util = "0.3.30"
14-
tokio-tungstenite = { version = "0.24.0", features = ["native-tls"] }
14+
tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] }
1515
tracing = "0.1.40"
1616
tracing-subscriber = { version = "0.3.18", features = [] }
1717
anyhow = "1.0.79"

collector/src/binance/http.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures_util::{SinkExt, StreamExt};
1010
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
1111
use tokio_tungstenite::{
1212
connect_async,
13-
tungstenite::{client::IntoClientRequest, Message},
13+
tungstenite::{client::IntoClientRequest, Bytes, Message, Utf8Bytes},
1414
};
1515
use tracing::{error, warn};
1616

@@ -52,7 +52,7 @@ pub async fn fetch_depth_snapshot(symbol: &str) -> Result<String, reqwest::Error
5252

5353
pub async fn connect(
5454
url: &str,
55-
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
55+
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
5656
) -> Result<(), anyhow::Error> {
5757
let request = url.into_client_request()?;
5858
let (ws_stream, _) = connect_async(request).await?;
@@ -61,7 +61,7 @@ pub async fn connect(
6161

6262
tokio::spawn(async move {
6363
while rx.recv().await.is_some() {
64-
if write.send(Message::Pong(Vec::new())).await.is_err() {
64+
if write.send(Message::Pong(Bytes::default())).await.is_err() {
6565
return;
6666
}
6767
}
@@ -102,7 +102,7 @@ pub async fn connect(
102102
pub async fn keep_connection(
103103
streams: Vec<String>,
104104
symbol_list: Vec<String>,
105-
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
105+
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
106106
) {
107107
let mut error_count = 0;
108108
loop {

collector/src/binance/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::collections::HashMap;
55
use chrono::{DateTime, Utc};
66
pub use http::{fetch_depth_snapshot, keep_connection};
77
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
8+
use tokio_tungstenite::tungstenite::Utf8Bytes;
89
use tracing::{error, warn};
910

1011
use crate::{error::ConnectorError, throttler::Throttler};
@@ -13,10 +14,10 @@ fn handle(
1314
prev_u_map: &mut HashMap<String, i64>,
1415
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
1516
recv_time: DateTime<Utc>,
16-
data: String,
17+
data: Utf8Bytes,
1718
throttler: &Throttler,
1819
) -> Result<(), ConnectorError> {
19-
let j: serde_json::Value = serde_json::from_str(&data)?;
20+
let j: serde_json::Value = serde_json::from_str(data.as_str())?;
2021
if let Some(j_data) = j.get("data") {
2122
if let Some(j_symbol) = j_data
2223
.as_object()
@@ -69,7 +70,7 @@ fn handle(
6970
*prev_u_map.entry(symbol.to_string()).or_insert(0) = u;
7071
}
7172
}
72-
let _ = writer_tx.send((recv_time, symbol.to_string(), data));
73+
let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
7374
}
7475
}
7576
Ok(())

collector/src/binancefuturescm/http.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures_util::{SinkExt, StreamExt};
1010
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
1111
use tokio_tungstenite::{
1212
connect_async,
13-
tungstenite::{client::IntoClientRequest, Message},
13+
tungstenite::{client::IntoClientRequest, Bytes, Message, Utf8Bytes},
1414
};
1515
use tracing::{error, warn};
1616

@@ -53,7 +53,7 @@ pub async fn fetch_depth_snapshot(symbol: &str) -> Result<String, reqwest::Error
5353

5454
pub async fn connect(
5555
url: &str,
56-
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
56+
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
5757
) -> Result<(), anyhow::Error> {
5858
let request = url.into_client_request()?;
5959
let (ws_stream, _) = connect_async(request).await?;
@@ -62,7 +62,7 @@ pub async fn connect(
6262

6363
tokio::spawn(async move {
6464
while rx.recv().await.is_some() {
65-
if write.send(Message::Pong(Vec::new())).await.is_err() {
65+
if write.send(Message::Pong(Bytes::default())).await.is_err() {
6666
return;
6767
}
6868
}
@@ -103,7 +103,7 @@ pub async fn connect(
103103
pub async fn keep_connection(
104104
streams: Vec<String>,
105105
symbol_list: Vec<String>,
106-
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
106+
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
107107
) {
108108
let mut error_count = 0;
109109
loop {

collector/src/binancefuturescm/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::collections::HashMap;
55
use chrono::{DateTime, Utc};
66
pub use http::{fetch_depth_snapshot, keep_connection};
77
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
8+
use tokio_tungstenite::tungstenite::Utf8Bytes;
89
use tracing::{error, warn};
910

1011
use crate::{error::ConnectorError, throttler::Throttler};
@@ -13,10 +14,10 @@ fn handle(
1314
prev_u_map: &mut HashMap<String, i64>,
1415
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
1516
recv_time: DateTime<Utc>,
16-
data: String,
17+
data: Utf8Bytes,
1718
throttler: &Throttler,
1819
) -> Result<(), ConnectorError> {
19-
let j: serde_json::Value = serde_json::from_str(&data)?;
20+
let j: serde_json::Value = serde_json::from_str(data.as_str())?;
2021
if let Some(j_data) = j.get("data") {
2122
if let Some(j_symbol) = j_data
2223
.as_object()
@@ -70,7 +71,7 @@ fn handle(
7071
}
7172
*prev_u_map.entry(symbol.to_string()).or_insert(0) = u;
7273
}
73-
let _ = writer_tx.send((recv_time, symbol.to_string(), data));
74+
let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
7475
}
7576
}
7677
Ok(())

collector/src/binancefuturesum/http.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use futures_util::{SinkExt, StreamExt};
1010
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
1111
use tokio_tungstenite::{
1212
connect_async,
13-
tungstenite::{client::IntoClientRequest, Message},
13+
tungstenite::{client::IntoClientRequest, Bytes, Message, Utf8Bytes},
1414
};
1515
use tracing::{error, warn};
1616

@@ -53,7 +53,7 @@ pub async fn fetch_depth_snapshot(symbol: &str) -> Result<String, reqwest::Error
5353

5454
pub async fn connect(
5555
url: &str,
56-
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
56+
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
5757
) -> Result<(), anyhow::Error> {
5858
let request = url.into_client_request()?;
5959
let (ws_stream, _) = connect_async(request).await?;
@@ -62,7 +62,7 @@ pub async fn connect(
6262

6363
tokio::spawn(async move {
6464
while rx.recv().await.is_some() {
65-
if write.send(Message::Pong(Vec::new())).await.is_err() {
65+
if write.send(Message::Pong(Bytes::default())).await.is_err() {
6666
return;
6767
}
6868
}
@@ -103,7 +103,7 @@ pub async fn connect(
103103
pub async fn keep_connection(
104104
streams: Vec<String>,
105105
symbol_list: Vec<String>,
106-
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
106+
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
107107
) {
108108
let mut error_count = 0;
109109
loop {

collector/src/binancefuturesum/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::collections::HashMap;
55
use chrono::{DateTime, Utc};
66
pub use http::{fetch_depth_snapshot, keep_connection};
77
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
8+
use tokio_tungstenite::tungstenite::Utf8Bytes;
89
use tracing::{error, warn};
910

1011
use crate::{error::ConnectorError, throttler::Throttler};
@@ -13,10 +14,10 @@ fn handle(
1314
prev_u_map: &mut HashMap<String, i64>,
1415
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
1516
recv_time: DateTime<Utc>,
16-
data: String,
17+
data: Utf8Bytes,
1718
throttler: &Throttler,
1819
) -> Result<(), ConnectorError> {
19-
let j: serde_json::Value = serde_json::from_str(&data)?;
20+
let j: serde_json::Value = serde_json::from_str(data.as_str())?;
2021
if let Some(j_data) = j.get("data") {
2122
if let Some(j_symbol) = j_data
2223
.as_object()
@@ -70,7 +71,7 @@ fn handle(
7071
}
7172
*prev_u_map.entry(symbol.to_string()).or_insert(0) = u;
7273
}
73-
let _ = writer_tx.send((recv_time, symbol.to_string(), data));
74+
let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
7475
}
7576
}
7677
Ok(())

collector/src/bybit/http.rs

+16-13
Original file line numberDiff line numberDiff line change
@@ -13,29 +13,32 @@ use tokio::{
1313
};
1414
use tokio_tungstenite::{
1515
connect_async,
16-
tungstenite::{client::IntoClientRequest, Message},
16+
tungstenite::{client::IntoClientRequest, Bytes, Message, Utf8Bytes},
1717
};
1818
use tracing::{error, warn};
1919

2020
pub async fn connect(
2121
url: &str,
2222
topics: Vec<String>,
23-
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
23+
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
2424
) -> Result<(), anyhow::Error> {
2525
let request = url.into_client_request()?;
2626
let (ws_stream, _) = connect_async(request).await?;
2727
let (mut write, mut read) = ws_stream.split();
2828
let (tx, mut rx) = unbounded_channel::<()>();
2929

3030
write
31-
.send(Message::Text(format!(
32-
r#"{{"req_id": "subscribe", "op": "subscribe", "args": [{}]}}"#,
33-
topics
34-
.iter()
35-
.map(|s| format!("\"{s}\""))
36-
.collect::<Vec<_>>()
37-
.join(",")
38-
)))
31+
.send(Message::Text(
32+
format!(
33+
r#"{{"req_id": "subscribe", "op": "subscribe", "args": [{}]}}"#,
34+
topics
35+
.iter()
36+
.map(|s| format!("\"{s}\""))
37+
.collect::<Vec<_>>()
38+
.join(",")
39+
)
40+
.into(),
41+
))
3942
.await?;
4043

4144
tokio::spawn(async move {
@@ -45,7 +48,7 @@ pub async fn connect(
4548
result = rx.recv() => {
4649
match result {
4750
Some(_) => {
48-
if write.send(Message::Pong(Vec::new())).await.is_err() {
51+
if write.send(Message::Pong(Bytes::default())).await.is_err() {
4952
return;
5053
}
5154
}
@@ -56,7 +59,7 @@ pub async fn connect(
5659
}
5760
_ = ping_interval.tick() => {
5861
if write.send(
59-
Message::Text(r#"{"req_id": "ping", "op": "ping"}"#.to_string())
62+
Message::Text(r#"{"req_id": "ping", "op": "ping"}"#.into())
6063
).await.is_err() {
6164
return;
6265
}
@@ -100,7 +103,7 @@ pub async fn connect(
100103
pub async fn keep_connection(
101104
topics: Vec<String>,
102105
symbol_list: Vec<String>,
103-
ws_tx: UnboundedSender<(DateTime<Utc>, String)>,
106+
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
104107
) {
105108
let mut error_count = 0;
106109
loop {

collector/src/bybit/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use chrono::{DateTime, Utc};
22
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
3+
use tokio_tungstenite::tungstenite::Utf8Bytes;
34
use tracing::error;
45

56
use self::http::keep_connection;
@@ -10,13 +11,13 @@ mod http;
1011
fn handle(
1112
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
1213
recv_time: DateTime<Utc>,
13-
data: String,
14+
data: Utf8Bytes,
1415
) -> Result<(), ConnectorError> {
15-
let j: serde_json::Value = serde_json::from_str(&data)?;
16+
let j: serde_json::Value = serde_json::from_str(data.as_str())?;
1617
if let Some(j_topic) = j.get("topic") {
1718
let topic = j_topic.as_str().ok_or(ConnectorError::FormatError)?;
1819
let symbol = topic.split(".").last().ok_or(ConnectorError::FormatError)?;
19-
let _ = writer_tx.send((recv_time, symbol.to_string(), data));
20+
let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
2021
} else if let Some(j_success) = j.get("success") {
2122
let success = j_success.as_bool().ok_or(ConnectorError::FormatError)?;
2223
if !success {

connector/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ chrono = { version = "0.4.33" }
1818
serde = { version = "1.0.196", features = ["derive"] }
1919
serde_json = { version = "1.0.113" }
2020
tokio = { version = "1.35.1", features = ["full"] }
21-
tokio-tungstenite = { version = "0.24.0", features = ["rustls-tls-native-roots"] }
21+
tokio-tungstenite = { version = "0.26.1", features = ["rustls-tls-native-roots"] }
2222
reqwest = { version = "0.12.3", default-features = false, features = ["json", "rustls-tls-native-roots"] }
2323
futures-util = { version = "0.3.30" }
2424
sha2 = { version = "0.11.0-pre.3" }
2525
hmac = { version = "0.13.0-pre.3" }
26-
iceoryx2 = { version = "0.4.1", features = ["logger_tracing"] }
26+
iceoryx2 = { version = "0.5.0", features = ["logger_tracing"] }
2727
toml = "0.8.19"
2828
tracing-subscriber = "0.3.18"
2929
clap = { version = "4.5.15", features = ["derive"] }

connector/src/binancefutures/market_data_stream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ impl MarketDataStream {
275275
"{symbol}@depth@0ms"
276276
],
277277
"id": "{id}"
278-
}}"#))).await?;
278+
}}"#).into())).await?;
279279
}
280280
Err(RecvError::Closed) => {
281281
return Ok(());

connector/src/bybit/private_stream.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio::{
1414
};
1515
use tokio_tungstenite::{
1616
connect_async,
17-
tungstenite::{client::IntoClientRequest, Message},
17+
tungstenite::{client::IntoClientRequest, Bytes, Message},
1818
MaybeTlsStream,
1919
WebSocketStream,
2020
};
@@ -90,7 +90,7 @@ impl PrivateStream {
9090
],
9191
};
9292
let s = serde_json::to_string(&op).unwrap();
93-
write.send(Message::Text(s)).await?;
93+
write.send(Message::Text(s.into())).await?;
9494
} else {
9595
return Err(BybitError::AuthError {
9696
msg: resp.ret_msg.unwrap(),
@@ -258,7 +258,7 @@ impl PrivateStream {
258258
args: vec![self.api_key.clone(), expires.to_string(), signature],
259259
};
260260
let s = serde_json::to_string(&op).unwrap();
261-
write.send(Message::Text(s)).await?;
261+
write.send(Message::Text(s.into())).await?;
262262

263263
loop {
264264
select! {
@@ -269,7 +269,7 @@ impl PrivateStream {
269269
args: vec![]
270270
};
271271
let s = serde_json::to_string(&op).unwrap();
272-
write.send(Message::Text(s)).await?;
272+
write.send(Message::Text(s.into())).await?;
273273
}
274274
msg = self.symbol_rx.recv() => {
275275
match msg {
@@ -337,7 +337,7 @@ impl PrivateStream {
337337
}
338338
}
339339
Some(Ok(Message::Ping(_))) => {
340-
write.send(Message::Pong(Vec::new())).await?;
340+
write.send(Message::Pong(Bytes::default())).await?;
341341
}
342342
Some(Ok(Message::Close(close_frame))) => {
343343
return Err(BybitError::ConnectionAbort(

0 commit comments

Comments
 (0)