Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added bitmart futures data collection #188

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions collector/src/bitmart/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use std::{
io,
io::ErrorKind,
time::{Duration, Instant},
};

use anyhow::Error;
use chrono::{DateTime, Utc};
use futures_util::{SinkExt, StreamExt};
use tokio::{
select,
sync::mpsc::{UnboundedSender, unbounded_channel},
};
use tokio_tungstenite::{
connect_async,
tungstenite::{Bytes, Message, Utf8Bytes, client::IntoClientRequest},
};
use tracing::{error, warn};

pub async fn fetch_depth_snapshot(symbol: &str) -> Result<String, reqwest::Error> {
reqwest::Client::new()
.get(format!(
"https://api-cloud-v2.bitmart.com/contract/public/depth?symbol={symbol}"
))
.header("Accept", "application/json")
.send()
.await?
.text()
.await
}

pub async fn connect(
url: &str,
topics: Vec<String>,
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
) -> Result<(), anyhow::Error> {
let request = url.into_client_request()?;
let (ws_stream, _) = connect_async(request).await?;
let (mut write, mut read) = ws_stream.split();
let (tx, mut rx) = unbounded_channel::<()>();

let s = format!(
r#"{{"action":"subscribe","args":[{}]}}"#,
topics
.iter()
.map(|s| format!("\"{s}\""))
.collect::<Vec<_>>()
.join(",")
);

write
.send(Message::Text(
format!(
// r#"{{"req_id": "subscribe", "op": "subscribe", "args": [{}]}}"#,
r#"{{"action":"subscribe","args":[{}]}}"#,
topics
.iter()
.map(|s| format!("\"{s}\""))
.collect::<Vec<_>>()
.join(",")
)
.into(),
))
.await?;

tokio::spawn(async move {
let mut ping_interval = tokio::time::interval(Duration::from_secs(30));
loop {
select! {
result = rx.recv() => {
match result {
Some(_) => {
if write.send(Message::Pong(Bytes::default())).await.is_err() {
return;
}
}
None => {
break;
}
}
}
_ = ping_interval.tick() => {
if write.send(
Message::Text(r#"{"action":"ping"}"#.into())
).await.is_err() {
return;
}
}
}
}
});

loop {
match read.next().await {
Some(Ok(Message::Text(text))) => {
let recv_time = Utc::now();
if ws_tx.send((recv_time, text)).is_err() {
break;
}
}
Some(Ok(Message::Binary(_))) => {}
Some(Ok(Message::Ping(_))) => {
tx.send(()).unwrap();
}
Some(Ok(Message::Pong(_))) => {}
Some(Ok(Message::Close(close_frame))) => {
warn!(?close_frame, "closed");
return Err(Error::from(io::Error::new(
ErrorKind::ConnectionAborted,
"closed",
)));
}
Some(Ok(Message::Frame(_))) => {}
Some(Err(e)) => {
return Err(Error::from(e));
}
None => {
break;
}
}
}
Ok(())
}

pub async fn keep_connection(
topics: Vec<String>,
symbol_list: Vec<String>,
ws_tx: UnboundedSender<(DateTime<Utc>, Utf8Bytes)>,
) {
let mut error_count = 0;
loop {
let connect_time = Instant::now();
let topics_ = symbol_list
.iter()
.flat_map(|pair| {
topics
.iter()
.cloned()
.map(|stream| {
stream
.replace("$symbol", pair.to_uppercase().as_str())
.to_string()
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
if let Err(error) = connect(
"wss://openapi-ws-v2.bitmart.com/api?protocol=1.1",
topics_,
ws_tx.clone(),
)
.await
{
error!(?error, "websocket error");
error_count += 1;
if connect_time.elapsed() > Duration::from_secs(30) {
error_count = 0;
}
if error_count > 3 {
tokio::time::sleep(Duration::from_secs(1)).await;
} else if error_count > 10 {
tokio::time::sleep(Duration::from_secs(5)).await;
} else if error_count > 20 {
tokio::time::sleep(Duration::from_secs(10)).await;
}
} else {
break;
}
}
}
52 changes: 52 additions & 0 deletions collector/src/bitmart/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
mod http;

use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use chrono::{DateTime, Utc};
pub use http::{fetch_depth_snapshot, keep_connection};
use tracing::{error, warn};
use tokio_tungstenite::tungstenite::Utf8Bytes;

use crate::{error::ConnectorError, throttler::Throttler};
use std::collections::HashMap;

fn handle(
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
recv_time: DateTime<Utc>,
data: Utf8Bytes
) -> Result<(), ConnectorError> {
let j: serde_json::Value = serde_json::from_str(data.as_str())?;
let group = j.get("group").ok_or(ConnectorError::FormatError)?.as_str().ok_or(ConnectorError::FormatError)?;
// If the group string starts with "futures/trade"
if group.starts_with("futures/trade") {
let symbol = group.split("/trade:").last().ok_or(ConnectorError::FormatError)?;
let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
} else if group.starts_with("futures/depthIncrease50") {
if let Some(j_data) = j.get("data") {
if let Some(j_symbol) = j_data
.as_object()
.ok_or(ConnectorError::FormatError)?
.get("symbol")
{
let symbol = j_symbol.as_str().ok_or(ConnectorError::FormatError)?;
let _ = writer_tx.send((recv_time, symbol.to_string(), data.to_string()));
}
}
}
Ok(())
}

pub async fn run_collection(
topics: Vec<String>,
symbols: Vec<String>,
writer_tx: UnboundedSender<(DateTime<Utc>, String, String)>,
) -> Result<(), anyhow::Error> {
let (ws_tx, mut ws_rx) = unbounded_channel();
let h = tokio::spawn(keep_connection(topics, symbols, ws_tx.clone()));
while let Some((recv_time, data)) = ws_rx.recv().await {
if let Err(error) = handle(&writer_tx, recv_time, data) {
error!(?error, "couldn't handle the received data.");
}
}
let _ = h.await;
Ok(())
}
12 changes: 12 additions & 0 deletions collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod bybit;
mod error;
mod file;
mod throttler;
mod bitmart;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
Expand Down Expand Up @@ -90,6 +91,17 @@ async fn main() -> Result<(), anyhow::Error> {

tokio::spawn(bybit::run_collection(topics, args.symbols, writer_tx))
}
"bitmart" => {
let topics = [
"futures/depthIncrease50:$symbol@100ms",
"futures/trade:$symbol",
]
.iter()
.map(|topic| topic.to_string())
.collect();

tokio::spawn(bitmart::run_collection(topics, args.symbols, writer_tx))
}
exchange => {
return Err(anyhow!("{exchange} is not supported."));
}
Expand Down
Loading