Skip to content

Commit aa1b379

Browse files
committed
fix: add some configs to ingestor
1 parent 57d08c0 commit aa1b379

File tree

7 files changed

+202
-186
lines changed

7 files changed

+202
-186
lines changed

Diff for: .gitignore

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
/target
22
.idea
33
.env
4-
db/
54
infra/db/password.txt
65

76
# Vscode settings

Diff for: pragma-ingestor/src/config.rs

+16
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,22 @@ pub(crate) struct Ingestor {
99
/// Number of consumers to run
1010
#[arg(long, env = "NUM_CONSUMERS", default_value = "10")]
1111
pub(crate) num_consumers: usize,
12+
13+
/// Channel capacity for message queues
14+
#[arg(long, env = "CHANNEL_CAPACITY", default_value = "1000000")]
15+
pub(crate) channel_capacity: usize,
16+
17+
/// Publisher name for entries
18+
#[arg(long, env = "PUBLISHER_NAME", default_value = "PRAGMA")]
19+
pub(crate) publisher_name: String,
20+
21+
/// Kafka consumer group ID
22+
#[arg(long, env = "KAFKA_GROUP_ID", default_value = "pragma-ingestor")]
23+
pub(crate) kafka_group_id: String,
24+
25+
/// OpenTelemetry endpoint for telemetry data
26+
#[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
27+
pub(crate) otel_endpoint: Option<String>,
1228
}
1329

1430
pub(crate) fn load_configuration() -> Ingestor {

Diff for: pragma-ingestor/src/db.rs

-168
This file was deleted.

Diff for: pragma-ingestor/src/db/insert.rs

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
use deadpool_diesel::postgres::Pool;
2+
use pragma_entities::{
3+
Entry, FundingRate, FutureEntry, InfraError, NewEntry, NewFundingRate, NewFutureEntry,
4+
};
5+
use tracing::debug;
6+
7+
#[tracing::instrument(skip_all, fields(num_entries = new_entries.len()))]
8+
pub(crate) async fn insert_spot_entries(
9+
pool: &Pool,
10+
new_entries: Vec<NewEntry>,
11+
) -> Result<(), InfraError> {
12+
let conn = pool.get().await.map_err(InfraError::DbPoolError)?;
13+
conn.interact(move |conn| Entry::create_many(conn, new_entries))
14+
.await
15+
.map_err(InfraError::DbInteractionError)?
16+
.map_err(InfraError::DbResultError)?;
17+
18+
Ok(())
19+
}
20+
21+
#[tracing::instrument(skip_all, fields(num_entries = new_entries.len()))]
22+
pub(crate) async fn insert_future_entries(
23+
pool: &Pool,
24+
new_entries: Vec<NewFutureEntry>,
25+
) -> Result<(), InfraError> {
26+
let conn = pool.get().await.map_err(InfraError::DbPoolError)?;
27+
28+
let new_entries = new_entries
29+
.into_iter()
30+
.map(|mut entry| {
31+
if let Some(expiration_timestamp) = entry.expiration_timestamp {
32+
if expiration_timestamp.and_utc().timestamp() == 0 {
33+
entry.expiration_timestamp = None;
34+
}
35+
}
36+
entry
37+
})
38+
.collect::<Vec<_>>();
39+
40+
debug!(
41+
"[PERP] {} new entries available",
42+
new_entries
43+
.iter()
44+
.filter(|entry| entry.expiration_timestamp.is_none())
45+
.count()
46+
);
47+
48+
conn.interact(move |conn| FutureEntry::create_many(conn, new_entries))
49+
.await
50+
.map_err(InfraError::DbInteractionError)?
51+
.map_err(InfraError::DbResultError)?;
52+
Ok(())
53+
}
54+
55+
#[tracing::instrument(skip_all, fields(num_entries = new_entries.len()))]
56+
pub(crate) async fn insert_funding_rate_entries(
57+
pool: &Pool,
58+
new_entries: Vec<NewFundingRate>,
59+
) -> Result<(), InfraError> {
60+
let conn = pool.get().await.map_err(InfraError::DbPoolError)?;
61+
let entries = conn
62+
.interact(move |conn| FundingRate::create_many(conn, new_entries))
63+
.await
64+
.map_err(InfraError::DbInteractionError)?
65+
.map_err(InfraError::DbResultError)?;
66+
67+
for entry in &entries {
68+
debug!(
69+
"new funding rate entry created {} - {}({}) - {}",
70+
entry.source, entry.pair, entry.annualized_rate, entry.timestamp
71+
);
72+
}
73+
74+
Ok(())
75+
}

Diff for: pragma-ingestor/src/db/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub(crate) mod insert;
2+
pub(crate) mod process;

Diff for: pragma-ingestor/src/db/process.rs

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
use deadpool_diesel::postgres::Pool;
2+
use pragma_entities::{NewEntry, NewFundingRate, NewFutureEntry};
3+
use tokio::sync::mpsc;
4+
5+
use crate::db::insert::{insert_funding_rate_entries, insert_future_entries, insert_spot_entries};
6+
7+
const BATCH_SIZE: usize = 100;
8+
9+
#[tracing::instrument(skip(pool, rx))]
10+
pub async fn process_spot_entries(pool: Pool, mut rx: mpsc::Receiver<NewEntry>) {
11+
let mut buffer = Vec::with_capacity(BATCH_SIZE);
12+
13+
loop {
14+
tokio::select! {
15+
Some(entry) = rx.recv() => {
16+
buffer.push(entry);
17+
18+
if buffer.len() >= BATCH_SIZE {
19+
if let Err(e) = insert_spot_entries(&pool, std::mem::take(&mut buffer)).await {
20+
tracing::error!("❌ Failed to insert spot entries: {}", e);
21+
}
22+
buffer = Vec::with_capacity(BATCH_SIZE);
23+
}
24+
}
25+
else => {
26+
// Channel closed, flush remaining entries
27+
if !buffer.is_empty() {
28+
if let Err(e) = insert_spot_entries(&pool, buffer).await {
29+
tracing::error!("❌ Failed to flush final spot entries: {}", e);
30+
}
31+
}
32+
break;
33+
}
34+
}
35+
}
36+
}
37+
38+
#[tracing::instrument(skip(pool, rx))]
39+
pub async fn process_future_entries(pool: Pool, mut rx: mpsc::Receiver<NewFutureEntry>) {
40+
let mut buffer = Vec::with_capacity(BATCH_SIZE);
41+
42+
loop {
43+
tokio::select! {
44+
Some(entry) = rx.recv() => {
45+
buffer.push(entry);
46+
47+
if buffer.len() >= BATCH_SIZE {
48+
if let Err(e) = insert_future_entries(&pool, std::mem::take(&mut buffer)).await {
49+
tracing::error!("❌ Failed to insert future entries: {}", e);
50+
}
51+
buffer = Vec::with_capacity(BATCH_SIZE);
52+
}
53+
}
54+
else => {
55+
// Channel closed, flush remaining entries
56+
if !buffer.is_empty() {
57+
if let Err(e) = insert_future_entries(&pool, buffer).await {
58+
tracing::error!("❌ Failed to flush final future entries: {}", e);
59+
}
60+
}
61+
break;
62+
}
63+
}
64+
}
65+
}
66+
67+
#[tracing::instrument(skip(pool, rx))]
68+
pub async fn process_funding_rate_entries(pool: Pool, mut rx: mpsc::Receiver<NewFundingRate>) {
69+
let mut buffer = Vec::with_capacity(BATCH_SIZE);
70+
71+
loop {
72+
tokio::select! {
73+
Some(entry) = rx.recv() => {
74+
buffer.push(entry);
75+
76+
if buffer.len() >= BATCH_SIZE {
77+
if let Err(e) = insert_funding_rate_entries(&pool, std::mem::take(&mut buffer)).await {
78+
tracing::error!("❌ Failed to insert funding rate entries: {}", e);
79+
}
80+
buffer = Vec::with_capacity(BATCH_SIZE);
81+
}
82+
}
83+
else => {
84+
// Channel closed, flush remaining entries
85+
if !buffer.is_empty() {
86+
if let Err(e) = insert_funding_rate_entries(&pool, buffer).await {
87+
tracing::error!("❌ Failed to flush final funding rate entries: {}", e);
88+
}
89+
}
90+
break;
91+
}
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)