Skip to content

Commit e05c1bc

Browse files
0xrinegadeclaude
andcommitted
feat(research): Add horizontal pipeline renderer for wallet flow analysis
Added comprehensive horizontal LEFT→RIGHT pipeline visualization for tracking token flows through wallet networks with rich metadata. **New Features:** - **Depth-based columnar layout** - Wallets organized by BFS depth from origin - **Transfer aggregation** - Groups transfers by destination and token type - **Rich metadata display** - Shows amounts, tx counts, date ranges on arrows - **Convergence detection** - Highlights wallets where multiple paths merge - **Visual wallet icons** - Context-aware icons (🏦 exchanges, 🌀 mixers, etc.) - **Multi-token support** - Handles wallets receiving multiple token types **Implementation:** - `compute_depths()` - BFS-based depth calculation from origin wallet - `get_aggregated_transfers()` - Groups and aggregates transfers by destination - `get_wallet_icon()` - Context-aware icon selection based on wallet labels - `render_horizontal_pipeline()` - Main rendering with columnar layout **Example Output:** ``` ╔═══════════════ DEPTH 0 ═══════════════╗ 🏦 [Binance Exchange] ┌──────────────────────────────┐ │ ABC123...XYZ │ └──────────────────────────────┘ │ ├─→ [$10.5M USDC] ──→ DEF456...UVW [DEPTH 1] │ 150 txs │ 2024-01-15 → 2024-01-20 │ └─→ MULTI-TOKEN → GHI789...RST [DEPTH 1] [$5.2M USDC] (75 txs) [$3.8M SOL] (42 txs) ╔═══════════════ DEPTH 1 ═══════════════╗ ... ``` **Use Cases:** - Money laundering investigation (track flow through mixer networks) - Exchange flow analysis (identify withdrawal patterns) - Token distribution tracking (airdrops, vesting schedules) - Convergence point detection (consolidation wallets) Complements existing vertical tree renderer with alternative horizontal layout optimized for pipeline-style flow visualization. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 6ef8044 commit e05c1bc

File tree

1 file changed

+334
-0
lines changed

1 file changed

+334
-0
lines changed

src/services/research_agent.rs

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,22 @@ pub struct GraphNode {
119119
pub outgoing: Vec<Transfer>,
120120
}
121121

122+
/// Aggregated transfer data for horizontal pipeline rendering
123+
#[derive(Debug, Clone)]
124+
struct AggregatedTransfer {
125+
to: String,
126+
tokens: HashMap<String, TokenAggregate>,
127+
}
128+
129+
/// Aggregated token transfer statistics
130+
#[derive(Debug, Clone)]
131+
struct TokenAggregate {
132+
total_amount: f64,
133+
tx_count: usize,
134+
first_timestamp: Option<String>,
135+
last_timestamp: Option<String>,
136+
}
137+
122138
/// Configuration for rendering ASCII output
123139
#[derive(Debug, Clone)]
124140
pub struct RenderConfig {
@@ -489,6 +505,252 @@ impl TransferGraph {
489505
format!("{}.{}", result, decimal_part)
490506
}
491507

508+
// ═══════════════════════════════════════════════════════════════════════════
509+
// HORIZONTAL PIPELINE RENDERER - LEFT→RIGHT Columnar Layout
510+
// ═══════════════════════════════════════════════════════════════════════════
511+
512+
/// Compute depth of each wallet using BFS from origin
513+
fn compute_depths(&self, origin: &str) -> HashMap<String, usize> {
514+
use std::collections::VecDeque;
515+
516+
let mut depths = HashMap::new();
517+
let mut queue = VecDeque::new();
518+
519+
depths.insert(origin.to_string(), 0);
520+
queue.push_back((origin.to_string(), 0));
521+
522+
while let Some((addr, depth)) = queue.pop_front() {
523+
if let Some(node) = self.nodes.get(&addr) {
524+
for transfer in &node.outgoing {
525+
if !depths.contains_key(&transfer.to) {
526+
depths.insert(transfer.to.clone(), depth + 1);
527+
queue.push_back((transfer.to.clone(), depth + 1));
528+
}
529+
}
530+
}
531+
}
532+
533+
depths
534+
}
535+
536+
/// Get aggregated transfers from a wallet
537+
fn get_aggregated_transfers(&self, from_addr: &str) -> Vec<AggregatedTransfer> {
538+
use std::collections::HashMap;
539+
540+
let node = match self.nodes.get(from_addr) {
541+
Some(n) => n,
542+
None => return Vec::new(),
543+
};
544+
545+
// Group by destination address
546+
let mut by_destination: HashMap<String, HashMap<String, Vec<&Transfer>>> = HashMap::new();
547+
548+
for transfer in &node.outgoing {
549+
by_destination
550+
.entry(transfer.to.clone())
551+
.or_insert_with(HashMap::new)
552+
.entry(transfer.token_symbol.clone())
553+
.or_insert_with(Vec::new)
554+
.push(transfer);
555+
}
556+
557+
// Aggregate each (destination, token) pair
558+
let mut result = Vec::new();
559+
for (to_addr, tokens_map) in by_destination {
560+
let mut token_aggregates = HashMap::new();
561+
562+
for (token_symbol, transfers) in tokens_map {
563+
let total_amount: f64 = transfers.iter().map(|t| t.amount).sum();
564+
let tx_count = transfers.len();
565+
566+
// Find first and last timestamps
567+
let mut timestamps: Vec<&Option<String>> = transfers.iter().map(|t| &t.timestamp).collect();
568+
timestamps.sort();
569+
let first_timestamp = timestamps.first().and_then(|t| t.as_ref()).cloned();
570+
let last_timestamp = timestamps.last().and_then(|t| t.as_ref()).cloned();
571+
572+
token_aggregates.insert(
573+
token_symbol,
574+
TokenAggregate {
575+
total_amount,
576+
tx_count,
577+
first_timestamp,
578+
last_timestamp,
579+
},
580+
);
581+
}
582+
583+
result.push(AggregatedTransfer {
584+
to: to_addr,
585+
tokens: token_aggregates,
586+
});
587+
}
588+
589+
result
590+
}
591+
592+
/// Get visual icon based on wallet label
593+
fn get_wallet_icon(&self, addr: &str) -> &str {
594+
if let Some(node) = self.nodes.get(addr) {
595+
if let Some(label) = &node.label {
596+
let label_lower = label.to_lowercase();
597+
if label_lower.contains("exchange") || label_lower.contains("binance") || label_lower.contains("coinbase") {
598+
return "🏦";
599+
} else if label_lower.contains("mixer") || label_lower.contains("tornado") || label_lower.contains("cyclone") {
600+
return "🌀";
601+
} else if label_lower.contains("burner") || label_lower.contains("temp") {
602+
return "🔥";
603+
} else if label_lower.contains("cold") || label_lower.contains("vault") || label_lower.contains("storage") {
604+
return "💎";
605+
} else if label_lower.contains("dex") || label_lower.contains("swap") {
606+
return "🔄";
607+
} else if label_lower.contains("distrib") {
608+
return "📊";
609+
} else if label_lower.contains("consolidat") {
610+
return "🔄";
611+
}
612+
}
613+
}
614+
"○"
615+
}
616+
617+
/// Render horizontal pipeline with LEFT→RIGHT flow
618+
pub fn render_horizontal_pipeline(&self) -> String {
619+
let mut output = String::new();
620+
let cfg = &self.render_config;
621+
622+
// Header
623+
if cfg.show_header {
624+
let title = "HORIZONTAL PIPELINE - WALLET FLOW ANALYSIS";
625+
let title_padded = self.center_text(title, 74);
626+
output.push_str("╔══════════════════════════════════════════════════════════════════════════╗\n");
627+
output.push_str(&format!("║{}║\n", title_padded));
628+
output.push_str("╚══════════════════════════════════════════════════════════════════════════╝\n\n");
629+
}
630+
631+
if let Some(token) = &self.token_name {
632+
output.push_str(&format!("TOKEN: {}\n", token));
633+
}
634+
635+
output.push_str("LAYOUT: Each depth level = one column, wallets flow LEFT→RIGHT\n");
636+
output.push_str("METADATA: Amounts, tx counts, date ranges shown on arrows\n\n");
637+
output.push_str("═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════\n\n");
638+
639+
// Compute depths if we have an origin
640+
let origin_addr = match &self.origin {
641+
Some(addr) => addr,
642+
None => {
643+
output.push_str("ERROR: No origin wallet set for pipeline rendering\n");
644+
return output;
645+
}
646+
};
647+
648+
let depths = self.compute_depths(origin_addr);
649+
let max_depth = *depths.values().max().unwrap_or(&0);
650+
651+
// Group wallets by depth
652+
let mut by_depth: HashMap<usize, Vec<String>> = HashMap::new();
653+
for (addr, depth) in &depths {
654+
by_depth.entry(*depth).or_insert_with(Vec::new).push(addr.clone());
655+
}
656+
657+
// Render depth by depth
658+
for depth_level in 0..=max_depth {
659+
output.push_str(&format!("\n╔═══════════════ DEPTH {} ═══════════════╗\n", depth_level));
660+
661+
let wallets_at_depth = by_depth.get(&depth_level).cloned().unwrap_or_default();
662+
663+
for wallet_addr in &wallets_at_depth {
664+
let icon = self.get_wallet_icon(wallet_addr);
665+
let label = self.nodes.get(wallet_addr)
666+
.and_then(|n| n.label.as_ref())
667+
.map(|l| format!(" [{}]", l))
668+
.unwrap_or_default();
669+
670+
// Count incoming transfers (convergence detection)
671+
let incoming_count = self.nodes.values()
672+
.flat_map(|n| &n.outgoing)
673+
.filter(|t| &t.to == wallet_addr)
674+
.count();
675+
676+
let convergence = if incoming_count > 1 {
677+
format!(" [×{} PATHS CONVERGE]", incoming_count)
678+
} else {
679+
String::new()
680+
};
681+
682+
// Draw wallet box
683+
output.push_str(&format!("\n{}{}{}\n", icon, label, convergence));
684+
let box_top = format!("┌{}┐", "─".repeat(wallet_addr.len() + 2));
685+
let box_mid = format!("│ {} │", wallet_addr);
686+
let box_bot = format!("└{}┘", "─".repeat(wallet_addr.len() + 2));
687+
output.push_str(&format!("{}\n", box_top));
688+
output.push_str(&format!("{}\n", box_mid));
689+
output.push_str(&format!("{}\n", box_bot));
690+
691+
// Show outgoing transfers with rich metadata
692+
let aggregated = self.get_aggregated_transfers(wallet_addr);
693+
if !aggregated.is_empty() {
694+
output.push_str(" │\n");
695+
696+
for (idx, agg_transfer) in aggregated.iter().enumerate() {
697+
let is_last = idx == aggregated.len() - 1;
698+
let branch_char = if is_last { "└─" } else { "├─" };
699+
700+
// Show each token separately
701+
let token_count = agg_transfer.tokens.len();
702+
if token_count == 1 {
703+
// Single token - compact format
704+
let (token_symbol, agg) = agg_transfer.tokens.iter().next().unwrap();
705+
output.push_str(&format!(" {}→ [${}M {}] ────────────────→ {} [DEPTH {}]\n",
706+
branch_char,
707+
(agg.total_amount / 1_000_000.0),
708+
token_symbol,
709+
&agg_transfer.to,
710+
depths.get(&agg_transfer.to).unwrap_or(&0)
711+
));
712+
output.push_str(&format!(" {} txs\n", agg.tx_count));
713+
if let (Some(first), Some(last)) = (&agg.first_timestamp, &agg.last_timestamp) {
714+
output.push_str(&format!(" {} → {}\n", first, last));
715+
}
716+
} else {
717+
// Multi-token - stacked format
718+
output.push_str(&format!(" {}→ MULTI-TOKEN → {} [DEPTH {}]\n",
719+
branch_char,
720+
&agg_transfer.to,
721+
depths.get(&agg_transfer.to).unwrap_or(&0)
722+
));
723+
for (token_symbol, agg) in &agg_transfer.tokens {
724+
output.push_str(&format!(" [${}M {}] ({} txs)\n",
725+
(agg.total_amount / 1_000_000.0),
726+
token_symbol,
727+
agg.tx_count
728+
));
729+
}
730+
}
731+
732+
if !is_last {
733+
output.push_str(" │\n");
734+
}
735+
}
736+
}
737+
}
738+
}
739+
740+
output.push_str("\n\n═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════\n");
741+
742+
// Summary
743+
if cfg.show_stats_summary {
744+
output.push_str("\n┌─────────────────────────────────────────────────────────────────────────┐\n");
745+
output.push_str(&format!("│ Max Depth: {:>62} │\n", max_depth));
746+
output.push_str(&format!("│ Total Wallets: {:>58} │\n", self.nodes.len()));
747+
output.push_str(&format!("│ Total Depth Levels: {:>55} │\n", max_depth + 1));
748+
output.push_str("└─────────────────────────────────────────────────────────────────────────┘\n");
749+
}
750+
751+
output
752+
}
753+
492754
/// Launch interactive TUI viewer for this graph
493755
pub fn launch_tui(self) -> anyhow::Result<()> {
494756
use crate::services::graph_tui::launch_graph_viewer;
@@ -3023,3 +3285,75 @@ mod tests {
30233285
assert!(output.contains("Burner Wallet"));
30243286
assert!(output.contains("Cold Storage Vault"));
30253287
}
3288+
3289+
#[test]
3290+
fn test_horizontal_pipeline_renderer() {
3291+
let mut graph = TransferGraph::new();
3292+
3293+
// Origin: Exchange
3294+
let origin = "ExchangeWallet_Binance_HotWallet_MainUSDT_2025_ABC123XYZ456".to_string();
3295+
graph.origin = Some(origin.clone());
3296+
graph.set_node_label(&origin, "Binance Exchange Wallet".to_string());
3297+
3298+
// Depth 1: Mixer
3299+
let mixer = "MixerHub_TornadoCash_PoolAlpha_USDT_2025_DEF789GHI012".to_string();
3300+
graph.set_node_label(&mixer, "Tornado Cash Mixer A".to_string());
3301+
3302+
graph.add_transfer(Transfer {
3303+
from: origin.clone(),
3304+
to: mixer.clone(),
3305+
amount: 5_000_000.0,
3306+
token_symbol: "USDT".to_string(),
3307+
timestamp: Some("2025-01-01T00:00:00Z".to_string()),
3308+
note: Some("Exchange withdrawal".to_string()),
3309+
});
3310+
3311+
// Depth 2: Burner (with multi-token)
3312+
let burner = "BurnerWallet_Temporary_L2_Keychain_001_JKL345MNO678".to_string();
3313+
graph.set_node_label(&burner, "Burner Wallet 001".to_string());
3314+
3315+
graph.add_transfer(Transfer {
3316+
from: mixer.clone(),
3317+
to: burner.clone(),
3318+
amount: 2_500_000.0,
3319+
token_symbol: "USDT".to_string(),
3320+
timestamp: Some("2025-01-02T00:00:00Z".to_string()),
3321+
note: None,
3322+
});
3323+
3324+
graph.add_transfer(Transfer {
3325+
from: mixer.clone(),
3326+
to: burner.clone(),
3327+
amount: 500_000.0,
3328+
token_symbol: "SOL".to_string(),
3329+
timestamp: Some("2025-01-02T01:00:00Z".to_string()),
3330+
note: None,
3331+
});
3332+
3333+
// Depth 3: Cold Storage
3334+
let cold_storage = "ColdStorage_HardwareWallet_Ledger_Final_PQR901STU234".to_string();
3335+
graph.set_node_label(&cold_storage, "Cold Storage Vault - Final Destination".to_string());
3336+
3337+
graph.add_transfer(Transfer {
3338+
from: burner.clone(),
3339+
to: cold_storage.clone(),
3340+
amount: 3_000_000.0,
3341+
token_symbol: "MULTI".to_string(),
3342+
timestamp: Some("2025-01-03T00:00:00Z".to_string()),
3343+
note: Some("Final deposit".to_string()),
3344+
});
3345+
3346+
// Render horizontal pipeline
3347+
let output = graph.render_horizontal_pipeline();
3348+
3349+
println!("\n{}", output);
3350+
3351+
// Assertions
3352+
assert!(output.contains("DEPTH 0"));
3353+
assert!(output.contains("DEPTH 3"));
3354+
assert!(output.contains("ExchangeWallet_Binance")); // Full address
3355+
assert!(output.contains("ColdStorage_HardwareWallet")); // Full address
3356+
assert!(output.contains("[$")); // Dollar amounts
3357+
assert!(output.contains("txs")); // Transaction counts
3358+
assert!(output.contains("🏦") || output.contains("○")); // Icons
3359+
}

0 commit comments

Comments
 (0)