Skip to content

Commit 49ec1a8

Browse files
committed
fix: add more telemetry to ingestor
1 parent 784b76c commit 49ec1a8

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

Diff for: pragma-ingestor/src/db.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub async fn process_spot_entries(pool: Pool, mut rx: mpsc::Receiver<NewEntry>)
5050
#[tracing::instrument(skip(pool, rx))]
5151
pub async fn process_future_entries(pool: Pool, mut rx: mpsc::Receiver<NewFutureEntry>) {
5252
const BUFFER_CAPACITY: usize = 100;
53-
const FLUSH_TIMEOUT: Duration = Duration::from_secs(30);
53+
const FLUSH_TIMEOUT: Duration = Duration::from_millis(50);
5454

5555
let mut buffer = Vec::with_capacity(BUFFER_CAPACITY);
5656
let mut flush_interval = interval(FLUSH_TIMEOUT);
@@ -129,7 +129,7 @@ pub async fn process_funding_rate_entries(pool: Pool, mut rx: mpsc::Receiver<New
129129
}
130130
}
131131

132-
#[tracing::instrument(skip(pool))]
132+
#[tracing::instrument(skip_all, fields(num_entries = new_entries.len()))]
133133
async fn insert_spot_entries(pool: &Pool, new_entries: Vec<NewEntry>) -> Result<(), InfraError> {
134134
let conn = pool.get().await.map_err(InfraError::DbPoolError)?;
135135
let entries = conn
@@ -148,7 +148,7 @@ async fn insert_spot_entries(pool: &Pool, new_entries: Vec<NewEntry>) -> Result<
148148
Ok(())
149149
}
150150

151-
#[tracing::instrument(skip(pool))]
151+
#[tracing::instrument(skip_all, fields(num_entries = new_entries.len()))]
152152
async fn insert_future_entries(
153153
pool: &Pool,
154154
new_entries: Vec<NewFutureEntry>,
@@ -189,7 +189,7 @@ async fn insert_future_entries(
189189
Ok(())
190190
}
191191

192-
#[tracing::instrument(skip(pool))]
192+
#[tracing::instrument(skip_all, fields(num_entries = new_entries.len()))]
193193
async fn insert_funding_rate_entries(
194194
pool: &Pool,
195195
new_entries: Vec<NewFundingRate>,

Diff for: pragma-ingestor/src/main.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ mod config;
2323
mod db;
2424
mod error;
2525

26-
const CHANNEL_CAPACITY: usize = 10_000;
26+
const CHANNEL_CAPACITY: usize = 1_000_000;
2727
const PUBLISHER_NAME: &str = "PRAGMA";
28-
const KAFKA_GROUP_ID: &str = "pragma-ingestor";
28+
const KAFKA_GROUP_ID: &str = "pragma-ingestor-local";
2929

3030
#[tokio::main]
3131
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -93,6 +93,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9393
}
9494

9595
/// Runs a Kafka consumer for price entries
96+
#[tracing::instrument(skip_all)]
9697
async fn run_price_consumer(
9798
config: FauConfig,
9899
group_id: String,
@@ -162,6 +163,7 @@ async fn run_price_consumer(
162163
}
163164

164165
/// Runs a Kafka consumer for funding rate entries
166+
#[tracing::instrument(skip_all)]
165167
async fn run_funding_rate_consumer(
166168
config: FauConfig,
167169
group_id: String,

0 commit comments

Comments
 (0)