@@ -12,10 +12,17 @@ use crate::core::dedupe_engine::DeduplicationEngine;
1212use crate :: core:: subscription:: { FanoutMessage , SubscriptionService } ;
1313use nostr_sdk:: Kind ;
1414use nostr_sdk:: nips:: nip04;
15- use nostr_sdk:: prelude:: { Client , EventBuilder , Keys , PublicKey , Tag } ;
15+ use nostr_sdk:: prelude:: { Client , EventBuilder , Keys , PublicKey , Tag , Timestamp } ;
1616use serde_json:: Value ;
1717use std:: str:: FromStr ;
1818
19+ const KIND_TRADE_SIGNAL : u16 = 30931 ;
20+ const KIND_COPYTRADE_INTENT : u16 = 30932 ;
21+ const KIND_HEARTBEAT : u16 = 30933 ;
22+ const KIND_EXECUTION_REPORT : u16 = 30934 ;
23+ const KIND_AGENT_REGISTER : u16 = 30935 ;
24+ const STALE_AFTER : Duration = Duration :: from_secs ( 10 * 60 ) ;
25+
1926/// Wrapper for Event to enable sorting by timestamp
2027#[ derive( Clone ) ]
2128struct EventWrapper {
@@ -190,6 +197,17 @@ impl EventRouter {
190197
191198 // Send events to downstream in timestamp order
192199 for event in batch {
200+ if self . is_stale ( & event) {
201+ debug ! (
202+ "Skip stale event id={} kind={} age_secs={}" ,
203+ event. id. to_hex( ) ,
204+ event. kind. as_u16( ) ,
205+ Timestamp :: now( )
206+ . as_secs( )
207+ . saturating_sub( event. created_at. as_secs( ) )
208+ ) ;
209+ continue ;
210+ }
193211 self . maybe_update_last_seen ( & event) . await ;
194212 if let Err ( e) = self . handle_copytrade_fanout ( & event) . await {
195213 error ! ( "Fanout processing failed: {}" , e) ;
@@ -237,6 +255,72 @@ impl EventRouter {
237255 }
238256
239257 async fn handle_copytrade_fanout ( & self , event : & Event ) -> Result < ( ) > {
258+ // Short-circuit heartbeat-like events: no decrypt/fanout required
259+ if matches ! ( event. kind. as_u16( ) , KIND_HEARTBEAT | KIND_EXECUTION_REPORT ) {
260+ return Ok ( ( ) ) ;
261+ }
262+
263+ // Agent registration is plaintext and upserts the bot record
264+ if event. kind . as_u16 ( ) == KIND_AGENT_REGISTER {
265+ let subs = match & self . subscription_service {
266+ Some ( s) => s,
267+ None => return Ok ( ( ) ) ,
268+ } ;
269+
270+ let parsed: Value = match serde_json:: from_str ( & event. content ) {
271+ Ok ( v) => v,
272+ Err ( e) => {
273+ error ! (
274+ "Agent register decode failed for {}: {}" ,
275+ event. id. to_hex( ) ,
276+ e
277+ ) ;
278+ return Ok ( ( ) ) ;
279+ }
280+ } ;
281+
282+ let nostr_pubkey = parsed
283+ . get ( "nostr_pubkey" )
284+ . and_then ( |v| v. as_str ( ) )
285+ . map ( |s| s. to_string ( ) )
286+ . unwrap_or_else ( || event. pubkey . to_hex ( ) ) ;
287+ let bot_pubkey = parsed
288+ . get ( "bot_pubkey" )
289+ . and_then ( |v| v. as_str ( ) )
290+ . map ( |s| s. to_string ( ) )
291+ . unwrap_or_else ( || event. pubkey . to_hex ( ) ) ;
292+ let eth_address = parsed
293+ . get ( "eth_address" )
294+ . or_else ( || parsed. get ( "account" ) )
295+ . and_then ( |v| v. as_str ( ) )
296+ . unwrap_or ( "" )
297+ . to_string ( ) ;
298+ let name = parsed
299+ . get ( "name" )
300+ . and_then ( |v| v. as_str ( ) )
301+ . unwrap_or ( "agent" )
302+ . to_string ( ) ;
303+
304+ if eth_address. is_empty ( ) {
305+ error ! ( "Agent register missing eth_address for {}" , bot_pubkey) ;
306+ return Ok ( ( ) ) ;
307+ }
308+
309+ if let Err ( e) = subs
310+ . register_bot ( & bot_pubkey, & nostr_pubkey, & eth_address, & name)
311+ . await
312+ {
313+ error ! ( "Agent register upsert failed for {}: {}" , bot_pubkey, e) ;
314+ } else {
315+ info ! (
316+ "Registered bot via nostr: bot_pubkey={} eth={}" ,
317+ bot_pubkey, eth_address
318+ ) ;
319+ }
320+
321+ return Ok ( ( ) ) ;
322+ }
323+
240324 // Preconditions: need subscription service and platform nostr keys
241325 let subs = match & self . subscription_service {
242326 Some ( s) => s,
@@ -247,6 +331,12 @@ impl EventRouter {
247331 None => return Ok ( ( ) ) ,
248332 } ;
249333
334+ // Skip decrypting events we just published (self-sent fanout echoes)
335+ if event. pubkey == nostr_keys. public_key ( ) {
336+ debug ! ( "Skip self-published fanout event {}" , event. id. to_hex( ) ) ;
337+ return Ok ( ( ) ) ;
338+ }
339+
250340 // Decrypt content using platform key and sender pubkey
251341 let plaintext = match nip04:: decrypt ( nostr_keys. secret_key ( ) , & event. pubkey , & event. content )
252342 {
@@ -257,6 +347,19 @@ impl EventRouter {
257347 }
258348 } ;
259349
350+ let preview = if plaintext. len ( ) > 256 {
351+ format ! ( "{}..." , & plaintext[ ..256 ] )
352+ } else {
353+ plaintext. clone ( )
354+ } ;
355+ debug ! (
356+ "Decrypted nostr event id={} kind={} from={} preview={}" ,
357+ event. id. to_hex( ) ,
358+ event. kind. as_u16( ) ,
359+ event. pubkey. to_hex( ) ,
360+ preview,
361+ ) ;
362+
260363 // Extract agent eth address from JSON payload
261364 let agent_eth = extract_agent_eth ( & plaintext)
262365 . ok_or_else ( || anyhow:: anyhow!( "agent eth address missing" ) ) ?;
@@ -299,12 +402,13 @@ impl EventRouter {
299402 // Publish encrypted nostr events to followers if client exists
300403 if let Some ( client) = & self . nostr_client {
301404 for follower in followers {
302- let follower_pk = match PublicKey :: from_str ( & follower. follower_pubkey ) {
405+ let follower_pk_str = follower. shared_secret . as_str ( ) ;
406+ let follower_pk = match PublicKey :: from_str ( follower_pk_str) {
303407 Ok ( pk) => pk,
304408 Err ( e) => {
305409 error ! (
306- "Invalid follower pubkey {}: {}" ,
307- follower . follower_pubkey , e
410+ "Invalid follower shared_secret pubkey {}: {}" ,
411+ follower_pk_str , e
308412 ) ;
309413 continue ;
310414 }
@@ -314,10 +418,7 @@ impl EventRouter {
314418 match nip04:: encrypt ( nostr_keys. secret_key ( ) , & follower_pk, & plaintext) {
315419 Ok ( ct) => ct,
316420 Err ( e) => {
317- error ! (
318- "Encrypt for follower {} failed: {}" ,
319- follower. follower_pubkey, e
320- ) ;
421+ error ! ( "Encrypt for follower {} failed: {}" , follower_pk_str, e) ;
321422 continue ;
322423 }
323424 } ;
@@ -326,10 +427,7 @@ impl EventRouter {
326427 builder = builder. tag ( Tag :: public_key ( follower_pk) ) ;
327428
328429 if let Err ( e) = client. send_event_builder ( builder) . await {
329- error ! (
330- "Publish to follower {} failed: {}" ,
331- follower. follower_pubkey, e
332- ) ;
430+ error ! ( "Publish to follower {} failed: {}" , follower_pk_str, e) ;
333431 }
334432 }
335433 }
@@ -339,8 +437,14 @@ impl EventRouter {
339437}
340438
341439impl EventRouter {
440+ fn is_stale ( & self , event : & Event ) -> bool {
441+ let now = Timestamp :: now ( ) . as_secs ( ) ;
442+ let created = event. created_at . as_secs ( ) ;
443+ now. saturating_sub ( created) > STALE_AFTER . as_secs ( )
444+ }
445+
342446 async fn maybe_update_last_seen ( & self , event : & Event ) {
343- const HEARTBEAT_KIND : u16 = 30934 ;
447+ const HEARTBEAT_KIND : u16 = KIND_HEARTBEAT ;
344448 const MIN_INTERVAL : Duration = Duration :: from_secs ( 15 * 60 ) ;
345449
346450 if event. kind . as_u16 ( ) != HEARTBEAT_KIND {
@@ -384,6 +488,7 @@ fn extract_agent_eth(plaintext: &str) -> Option<String> {
384488 parsed
385489 . get ( "agent_eth_address" )
386490 . or_else ( || parsed. get ( "agent" ) )
491+ . or_else ( || parsed. get ( "account" ) )
387492 . or_else ( || parsed. get ( "eth_address" ) )
388493 . and_then ( |v| v. as_str ( ) )
389494 . map ( |s| s. to_string ( ) )
0 commit comments