Skip to content

Commit 784b76c

Browse files
authored
feat: funding rates table ingested from kafka (#226)
* feat: refacto pragma-ingestor * feat: funding rates table migratin * feat: funding rates ingestor tasks * fix: clippy
1 parent 76ba9d6 commit 784b76c

File tree

15 files changed

+453
-269
lines changed

15 files changed

+453
-269
lines changed

Cargo.lock

+4-21
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,9 @@ nonzero_ext = { version = "0.3.0" }
9898
serde_json = { version = "1.0.122", features = ["arbitrary_precision"] }
9999
starknet = "0.14.0"
100100
starknet-crypto = "0.7.4"
101+
clap = { version = "4.4", features = ["derive", "env"] }
101102
reqwest = { version = "0.12.5", features = ["blocking"] }
102-
rdkafka = "0.36.2"
103+
rdkafka = "0.37.0"
103104
thiserror = "2.0.12"
104105
strum = { version = "0.26", features = ["derive"] }
105106
url = "2.5.0"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- This file should undo anything in `up.sql`
2+
3+
-- Remove compression policy first
4+
SELECT remove_compression_policy('funding_rates');
5+
6+
-- Disable compression
7+
ALTER TABLE funding_rates SET (timescaledb.compress = false);
8+
9+
-- Drop the hypertable
10+
DROP TABLE funding_rates CASCADE;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-- Your SQL goes here
2+
3+
CREATE TABLE funding_rates (
4+
id uuid DEFAULT uuid_generate_v4(),
5+
source VARCHAR NOT NULL,
6+
pair VARCHAR NOT NULL,
7+
annualized_rate DOUBLE PRECISION NOT NULL,
8+
timestamp TIMESTAMPTZ NOT NULL,
9+
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
10+
PRIMARY KEY (id, timestamp)
11+
);
12+
13+
-- Convert the table to a hypertable
14+
SELECT create_hypertable('funding_rates', 'timestamp', chunk_time_interval => INTERVAL '1 day');
15+
16+
-- Create an index for efficient querying by pair
17+
CREATE INDEX idx_funding_rates_pair ON funding_rates(pair);
18+
19+
-- Enable compression
20+
ALTER TABLE funding_rates SET (
21+
timescaledb.compress,
22+
timescaledb.compress_segmentby = 'source,pair'
23+
);
24+
25+
-- Add compression policy to compress chunks older than 7 days
26+
SELECT add_compression_policy('funding_rates', INTERVAL '7 days');

pragma-entities/src/dto/future_entry.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl From<crate::FutureEntry> for FutureEntry {
2727
source: future_entry.source,
2828
timestamp: future_entry.timestamp.and_utc().timestamp_millis() as u64,
2929
expiration_timestamp,
30-
publisher_signature: future_entry.publisher_signature,
30+
publisher_signature: future_entry.publisher_signature.unwrap_or_default(),
3131
price: future_entry.price.to_u128().unwrap_or(0), // change default value ?
3232
}
3333
}

pragma-entities/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub use error::InfraError;
1515
pub use models::{
1616
checkpoint_error::CheckpointError,
1717
entry::{Entry, NewEntry},
18+
funding_rate::{FundingRate, NewFundingRate},
1819
future_entry::{FutureEntry, NewFutureEntry},
1920
publisher::{NewPublisher, Publishers},
2021
publisher_error::PublisherError,

pragma-entities/src/models/entries/future_entry.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub struct FutureEntry {
2525
// If expiration_timestamp is None, it means the entry is a perpetual future
2626
// else it is a regular future entry that will expire at the expiration_timestamp.
2727
pub expiration_timestamp: Option<NaiveDateTime>,
28-
pub publisher_signature: String,
28+
pub publisher_signature: Option<String>,
2929
pub price: BigDecimal,
3030
}
3131

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use chrono::NaiveDateTime;
2+
use diesel::prelude::*;
3+
use serde::{Deserialize, Serialize};
4+
use uuid::Uuid;
5+
6+
use crate::schema::funding_rates;
7+
8+
#[derive(Debug, Clone, Queryable, Serialize, Deserialize)]
9+
#[diesel(table_name = funding_rates)]
10+
#[diesel(check_for_backend(diesel::pg::Pg))]
11+
pub struct FundingRate {
12+
pub id: Uuid,
13+
pub source: String,
14+
pub pair: String,
15+
pub annualized_rate: f64,
16+
pub timestamp: NaiveDateTime,
17+
pub created_at: NaiveDateTime,
18+
}
19+
20+
#[derive(Debug, Clone, Serialize, Deserialize, Insertable, AsChangeset)]
21+
#[diesel(table_name = funding_rates)]
22+
pub struct NewFundingRate {
23+
pub source: String,
24+
pub pair: String,
25+
pub annualized_rate: f64,
26+
pub timestamp: NaiveDateTime,
27+
}
28+
29+
impl FundingRate {
30+
pub fn create_many(
31+
conn: &mut PgConnection,
32+
new_entries: Vec<NewFundingRate>,
33+
) -> Result<Vec<Self>, diesel::result::Error> {
34+
diesel::insert_into(funding_rates::table)
35+
.values(&new_entries)
36+
.get_results(conn)
37+
}
38+
}

pragma-entities/src/models/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod checkpoint_error;
22
pub mod entries;
3+
pub mod funding_rate;
34
pub mod publisher;
45
pub mod publisher_error;
56

pragma-entities/src/schema.rs

+17-6
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,22 @@ diesel::table! {
44
entries (id, timestamp) {
55
id -> Uuid,
66
pair_id -> Varchar,
7-
publisher -> Text,
8-
timestamp -> Timestamptz,
97
price -> Numeric,
8+
timestamp -> Timestamptz,
9+
publisher -> Text,
10+
publisher_signature -> Nullable<Text>,
1011
source -> Varchar,
11-
publisher_signature -> Nullable<Varchar>,
12+
}
13+
}
14+
15+
diesel::table! {
16+
funding_rates (id, timestamp) {
17+
id -> Uuid,
18+
source -> Varchar,
19+
pair -> Varchar,
20+
annualized_rate -> Float8,
21+
timestamp -> Timestamptz,
22+
created_at -> Timestamptz,
1223
}
1324
}
1425

@@ -20,7 +31,7 @@ diesel::table! {
2031
timestamp -> Timestamptz,
2132
expiration_timestamp -> Nullable<Timestamptz>,
2233
publisher -> Text,
23-
publisher_signature -> Text,
34+
publisher_signature -> Nullable<Text>,
2435
source -> Varchar,
2536
}
2637
}
@@ -31,9 +42,9 @@ diesel::table! {
3142
name -> Varchar,
3243
master_key -> Varchar,
3344
active_key -> Varchar,
34-
active -> Bool,
3545
account_address -> Varchar,
46+
active -> Bool,
3647
}
3748
}
3849

39-
diesel::allow_tables_to_appear_in_same_query!(entries, future_entries, publishers,);
50+
diesel::allow_tables_to_appear_in_same_query!(entries, funding_rates, future_entries, publishers,);

pragma-ingestor/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ serde_json = { workspace = true }
2323
thiserror = { workspace = true }
2424
tokio = { workspace = true, features = ["full"] }
2525
tracing = { workspace = true }
26+
clap = { workspace = true, features = ["derive"] }

pragma-ingestor/src/config.rs

+10-15
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,16 @@
1-
use serde::{Deserialize, Serialize};
1+
use clap::Parser;
22
use std::sync::LazyLock;
33

4-
use crate::error::PragmaConsumerError;
4+
pub(crate) static CONFIG: LazyLock<Ingestor> = LazyLock::new(load_configuration);
55

6-
pub static CONFIG: LazyLock<Ingestor> = LazyLock::new(load_configuration);
7-
8-
#[derive(Debug, Serialize, Deserialize)]
9-
pub struct Ingestor {
10-
pub num_consumers: usize,
11-
}
12-
13-
impl Ingestor {
14-
pub fn from_env() -> Result<Self, PragmaConsumerError> {
15-
envy::from_env::<Self>().map_err(PragmaConsumerError::LoadConfig)
16-
}
6+
#[derive(Parser, Debug)]
7+
#[command(author, version, about, long_about = None)]
8+
pub(crate) struct Ingestor {
9+
/// Number of consumers to run
10+
#[arg(long, env = "NUM_CONSUMERS", default_value = "10")]
11+
pub(crate) num_consumers: usize,
1712
}
1813

19-
pub fn load_configuration() -> Ingestor {
20-
Ingestor::from_env().expect("cannot load configuration env")
14+
pub(crate) fn load_configuration() -> Ingestor {
15+
Ingestor::parse()
2116
}

0 commit comments

Comments
 (0)