Skip to content

Commit 5df7ce3

Browse files
committed
feat: enhance event handling and signal processing in the relay system
1 parent 132500b commit 5df7ce3

File tree

8 files changed

+315
-39
lines changed

8 files changed

+315
-39
lines changed

relayer/examples/downstream.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ use tracing_subscriber::EnvFilter;
66

77
async fn handle_event(Json(payload): Json<Value>) -> impl IntoResponse {
88
info!("=== Received Nostr Event ===");
9-
info!(
10-
"{}",
11-
serde_json::to_string_pretty(&payload).unwrap()
12-
);
9+
info!("{}", serde_json::to_string_pretty(&payload).unwrap());
1310
info!("============================");
1411

1512
(StatusCode::OK, "Event received successfully")

relayer/src/api/metrics.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use prometheus::{register_gauge, register_histogram, register_int_counter, Gauge, Histogram, IntCounter};
1+
use prometheus::{
2+
Gauge, Histogram, IntCounter, register_gauge, register_histogram, register_int_counter,
3+
};
24

35
/// Metrics for monitoring the relay system
46
pub struct Metrics {
@@ -26,10 +28,7 @@ impl Metrics {
2628
"processing_latency_seconds",
2729
"Event processing latency in seconds"
2830
)?,
29-
memory_usage: register_gauge!(
30-
"memory_usage_mb",
31-
"Memory usage in Million Bytes"
32-
)?,
31+
memory_usage: register_gauge!("memory_usage_mb", "Memory usage in Million Bytes")?,
3332
active_connections: register_gauge!(
3433
"active_connections",
3534
"Number of active relay connections"

relayer/src/api/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
pub mod metrics;
22
pub mod rest_api;
33
pub mod websocket;
4-

relayer/src/api/websocket.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@ pub struct WsState {
2626
// use crate::core::relay_pool::RelayPool;
2727

2828
/// WebSocket handler for streaming events to downstream systems
29-
async fn websocket_handler(
30-
ws: WebSocketUpgrade,
31-
State(state): State<WsState>,
32-
) -> Response {
29+
async fn websocket_handler(ws: WebSocketUpgrade, State(state): State<WsState>) -> Response {
3330
let rx = state.event_rx.clone();
3431
ws.on_upgrade(|socket| handle_socket(socket, rx))
3532
}

relayer/src/core/event_router.rs

Lines changed: 209 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use tracing::{debug, error, info};
99

1010
use crate::api::metrics::Metrics;
1111
use crate::core::dedupe_engine::DeduplicationEngine;
12-
use crate::core::subscription::{FanoutMessage, SubscriptionService};
12+
use crate::core::subscription::{FanoutMessage, SignalInsert, SubscriptionService};
13+
use chrono::{DateTime, TimeZone, Utc};
1314
use nostr_sdk::Kind;
1415
use nostr_sdk::nips::nip04;
1516
use nostr_sdk::prelude::{Client, EventBuilder, Keys, PublicKey, Tag, Timestamp};
@@ -347,6 +348,12 @@ impl EventRouter {
347348
}
348349
};
349350

351+
if event.kind.as_u16() == KIND_TRADE_SIGNAL {
352+
return self
353+
.process_trade_signal(event, &plaintext, subs, nostr_keys)
354+
.await;
355+
}
356+
350357
let preview = if plaintext.len() > 256 {
351358
format!("{}...", &plaintext[..256])
352359
} else {
@@ -436,6 +443,123 @@ impl EventRouter {
436443
}
437444
}
438445

446+
impl EventRouter {
447+
async fn process_trade_signal(
448+
&self,
449+
event: &Event,
450+
plaintext: &str,
451+
subs: &SubscriptionService,
452+
nostr_keys: &Keys,
453+
) -> Result<()> {
454+
let meta = extract_signal_meta(plaintext);
455+
let agent_eth = meta.agent_eth_address.clone();
456+
let event_created_at = to_event_datetime(event);
457+
let leader_pubkey = event.pubkey.to_hex();
458+
459+
let bot = match agent_eth.as_deref() {
460+
Some(eth) => match subs.find_bot_by_eth(eth).await? {
461+
Some(b) => Some(b),
462+
None => {
463+
error!("No bot registered for eth address {}", eth);
464+
None
465+
}
466+
},
467+
None => {
468+
error!(
469+
"agent eth address missing in trade signal {}",
470+
event.id.to_hex()
471+
);
472+
None
473+
}
474+
};
475+
476+
let signal_insert = SignalInsert {
477+
event_id: event.id.to_hex(),
478+
kind: event.kind.as_u16(),
479+
bot_pubkey: bot.as_ref().map(|b| b.bot_pubkey.clone()),
480+
leader_pubkey: leader_pubkey.clone(),
481+
follower_pubkey: meta.follower_pubkey.clone(),
482+
agent_eth_address: agent_eth.clone(),
483+
role: meta.role.clone(),
484+
symbol: meta.symbol.clone(),
485+
side: meta.side.clone(),
486+
size: meta.size,
487+
price: meta.price,
488+
status: meta.status.clone(),
489+
tx_hash: meta.tx_hash.clone(),
490+
pnl: meta.pnl,
491+
pnl_usd: meta.pnl_usd,
492+
raw_content: plaintext.to_string(),
493+
event_created_at,
494+
};
495+
496+
if let Err(e) = subs.record_signal(signal_insert).await {
497+
error!("Failed to record trade signal {}: {}", event.id.to_hex(), e);
498+
}
499+
500+
let bot = match bot {
501+
Some(b) => b,
502+
None => return Ok(()),
503+
};
504+
505+
self.maybe_record_trade(subs, &bot.bot_pubkey, plaintext)
506+
.await;
507+
let followers = subs.list_subscriptions(&bot.bot_pubkey).await?;
508+
if followers.is_empty() {
509+
return Ok(());
510+
}
511+
512+
if let Some(fanout_tx) = &self.fanout_tx {
513+
for follower in &followers {
514+
let msg = FanoutMessage {
515+
target_pubkey: follower.follower_pubkey.clone(),
516+
bot_pubkey: bot.bot_pubkey.clone(),
517+
kind: event.kind.as_u16(),
518+
original_event_id: event.id.to_hex(),
519+
payload: plaintext.to_string(),
520+
};
521+
if let Err(e) = fanout_tx.send_async(msg).await {
522+
error!("Failed to send fanout ws payload: {}", e);
523+
}
524+
}
525+
}
526+
527+
if let Some(client) = &self.nostr_client {
528+
for follower in followers {
529+
let follower_pk_str = follower.shared_secret.as_str();
530+
let follower_pk = match PublicKey::from_str(follower_pk_str) {
531+
Ok(pk) => pk,
532+
Err(e) => {
533+
error!(
534+
"Invalid follower shared_secret pubkey {}: {}",
535+
follower_pk_str, e
536+
);
537+
continue;
538+
}
539+
};
540+
541+
let encrypted =
542+
match nip04::encrypt(nostr_keys.secret_key(), &follower_pk, plaintext) {
543+
Ok(ct) => ct,
544+
Err(e) => {
545+
error!("Encrypt for follower {} failed: {}", follower_pk_str, e);
546+
continue;
547+
}
548+
};
549+
550+
let mut builder = EventBuilder::new(Kind::Custom(event.kind.as_u16()), encrypted);
551+
builder = builder.tag(Tag::public_key(follower_pk));
552+
553+
if let Err(e) = client.send_event_builder(builder).await {
554+
error!("Publish to follower {} failed: {}", follower_pk_str, e);
555+
}
556+
}
557+
}
558+
559+
Ok(())
560+
}
561+
}
562+
439563
impl EventRouter {
440564
fn is_stale(&self, event: &Event) -> bool {
441565
let now = Timestamp::now().as_secs();
@@ -483,17 +607,6 @@ impl EventRouter {
483607
}
484608
}
485609

486-
fn extract_agent_eth(plaintext: &str) -> Option<String> {
487-
let parsed: serde_json::Value = serde_json::from_str(plaintext).ok()?;
488-
parsed
489-
.get("agent_eth_address")
490-
.or_else(|| parsed.get("agent"))
491-
.or_else(|| parsed.get("account"))
492-
.or_else(|| parsed.get("eth_address"))
493-
.and_then(|v| v.as_str())
494-
.map(|s| s.to_string())
495-
}
496-
497610
#[derive(Debug)]
498611
struct TradeMeta {
499612
tx_hash: String,
@@ -508,6 +621,21 @@ struct TradeMeta {
508621
role: String,
509622
}
510623

624+
#[derive(Debug, Default)]
625+
struct SignalMeta {
626+
agent_eth_address: Option<String>,
627+
follower_pubkey: Option<String>,
628+
role: Option<String>,
629+
symbol: Option<String>,
630+
side: Option<String>,
631+
size: Option<f64>,
632+
price: Option<f64>,
633+
status: Option<String>,
634+
tx_hash: Option<String>,
635+
pnl: Option<f64>,
636+
pnl_usd: Option<f64>,
637+
}
638+
511639
impl EventRouter {
512640
async fn maybe_record_trade(
513641
&self,
@@ -590,3 +718,72 @@ fn extract_trade_meta(plaintext: &str) -> Option<TradeMeta> {
590718
role,
591719
})
592720
}
721+
722+
fn extract_signal_meta(plaintext: &str) -> SignalMeta {
723+
let parsed: Value = match serde_json::from_str(plaintext) {
724+
Ok(v) => v,
725+
Err(_) => return SignalMeta::default(),
726+
};
727+
728+
let agent_eth_address = parsed
729+
.get("agent_eth_address")
730+
.or_else(|| parsed.get("agent"))
731+
.or_else(|| parsed.get("account"))
732+
.or_else(|| parsed.get("eth_address"))
733+
.and_then(|v| v.as_str())
734+
.map(|s| s.to_string());
735+
736+
let follower_pubkey = parsed
737+
.get("follower_pubkey")
738+
.or_else(|| parsed.get("follower"))
739+
.and_then(|v| v.as_str())
740+
.map(|s| s.to_string());
741+
742+
let role = parsed
743+
.get("role")
744+
.and_then(|v| v.as_str())
745+
.map(|s| s.to_string());
746+
let symbol = parsed
747+
.get("symbol")
748+
.and_then(|v| v.as_str())
749+
.map(|s| s.to_string());
750+
let side = parsed
751+
.get("side")
752+
.and_then(|v| v.as_str())
753+
.map(|s| s.to_string());
754+
let size = parsed.get("size").and_then(|v| v.as_f64());
755+
let price = parsed.get("price").and_then(|v| v.as_f64());
756+
let status = parsed
757+
.get("status")
758+
.and_then(|v| v.as_str())
759+
.map(|s| s.to_string());
760+
let tx_hash = parsed
761+
.get("tx_hash")
762+
.and_then(|v| v.as_str())
763+
.map(|s| s.to_string());
764+
let pnl = parsed.get("pnl").and_then(|v| v.as_f64());
765+
let pnl_usd = parsed.get("pnl_usd").and_then(|v| v.as_f64());
766+
767+
SignalMeta {
768+
agent_eth_address,
769+
follower_pubkey,
770+
role,
771+
symbol,
772+
side,
773+
size,
774+
price,
775+
status,
776+
tx_hash,
777+
pnl,
778+
pnl_usd,
779+
}
780+
}
781+
782+
fn extract_agent_eth(plaintext: &str) -> Option<String> {
783+
extract_signal_meta(plaintext).agent_eth_address
784+
}
785+
786+
fn to_event_datetime(event: &Event) -> DateTime<Utc> {
787+
let secs = event.created_at.as_secs() as i64;
788+
Utc.timestamp_opt(secs, 0).single().unwrap_or_else(Utc::now)
789+
}

relayer/src/core/settlement_worker.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use reqwest::StatusCode;
66
use tokio::time::sleep;
77
use tracing::{debug, error, info, warn};
88

9-
use crate::core::subscription::SubscriptionService;
109
use crate::config::SettlementCreditConfig;
10+
use crate::core::subscription::SubscriptionService;
1111

1212
#[derive(Clone, Debug)]
1313
pub struct SettlementWorker {
@@ -47,10 +47,7 @@ impl SettlementWorker {
4747
}
4848

4949
async fn tick(&self) -> Result<()> {
50-
let trades = self
51-
.svc
52-
.list_pending_trades(self.batch_limit)
53-
.await?;
50+
let trades = self.svc.list_pending_trades(self.batch_limit).await?;
5451

5552
if trades.is_empty() {
5653
debug!("settlement: no pending trades");
@@ -64,12 +61,8 @@ impl SettlementWorker {
6461
.update_trade_settlement(&t.tx_hash, "confirmed", None, None)
6562
.await?;
6663
if let Some(credit) = self.compute_credit(&t) {
67-
let recipient = t
68-
.follower_pubkey
69-
.as_deref()
70-
.unwrap_or(&t.bot_pubkey);
71-
self
72-
.svc
64+
let recipient = t.follower_pubkey.as_deref().unwrap_or(&t.bot_pubkey);
65+
self.svc
7366
.award_credits(&t.bot_pubkey, recipient, credit)
7467
.await?;
7568
}

0 commit comments

Comments
 (0)