Skip to content

Commit 6fa44a6

Browse files
Add rate limiting, fix circuit breaker race condition, setup CI
- Fix TOCTOU race in CircuitBreaker by replacing RwLock with Mutex - Add token-bucket RateLimiter with configurable rate and burst - Sanitize error messages to avoid leaking token existence info - Add GitHub Actions CI pipeline (fmt, clippy, test, build) - Add cargo-deny config for dependency auditing - Optimize routing with Arc-wrapped snapshots and arena-based BFS - Add rustdoc to public APIs - Add integration tests for end-to-end flows
1 parent e69b276 commit 6fa44a6

File tree

9 files changed

+640
-49
lines changed

9 files changed

+640
-49
lines changed

.github/workflows/ci.yml

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
name: CI
2+
3+
on:
4+
push:
5+
branches: [main]
6+
pull_request:
7+
branches: [main]
8+
9+
env:
10+
CARGO_TERM_COLOR: always
11+
RUSTFLAGS: "-Dwarnings"
12+
13+
jobs:
14+
check:
15+
name: Check
16+
runs-on: ubuntu-latest
17+
steps:
18+
- uses: actions/checkout@v4
19+
- name: Install Rust
20+
run: |
21+
rustup update stable
22+
rustup default stable
23+
- uses: Swatinem/rust-cache@v2
24+
- run: cargo check --all-targets
25+
26+
fmt:
27+
name: Format
28+
runs-on: ubuntu-latest
29+
steps:
30+
- uses: actions/checkout@v4
31+
- name: Install Rust
32+
run: |
33+
rustup update stable
34+
rustup default stable
35+
rustup component add rustfmt
36+
- run: cargo fmt --all --check
37+
38+
clippy:
39+
name: Clippy
40+
runs-on: ubuntu-latest
41+
steps:
42+
- uses: actions/checkout@v4
43+
- name: Install Rust
44+
run: |
45+
rustup update stable
46+
rustup default stable
47+
rustup component add clippy
48+
- uses: Swatinem/rust-cache@v2
49+
- run: cargo clippy --all-targets -- -D warnings
50+
51+
test:
52+
name: Test
53+
runs-on: ubuntu-latest
54+
steps:
55+
- uses: actions/checkout@v4
56+
- name: Install Rust
57+
run: |
58+
rustup update stable
59+
rustup default stable
60+
- uses: Swatinem/rust-cache@v2
61+
- run: cargo test --all
62+
63+
build-release:
64+
name: Build Release
65+
runs-on: ubuntu-latest
66+
steps:
67+
- uses: actions/checkout@v4
68+
- name: Install Rust
69+
run: |
70+
rustup update stable
71+
rustup default stable
72+
- uses: Swatinem/rust-cache@v2
73+
- run: cargo build --release

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
/target
2-
.DS_Store
2+
.DS_Store
3+
/notes

deny.toml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# cargo-deny configuration
2+
# Run with: cargo deny check
3+
4+
[advisories]
5+
vulnerability = "deny"
6+
unmaintained = "warn"
7+
yanked = "deny"
8+
notice = "warn"
9+
10+
[licenses]
11+
unlicensed = "deny"
12+
allow = [
13+
"MIT",
14+
"Apache-2.0",
15+
"Apache-2.0 WITH LLVM-exception",
16+
"BSD-2-Clause",
17+
"BSD-3-Clause",
18+
"ISC",
19+
"Zlib",
20+
"Unicode-DFS-2016",
21+
]
22+
copyleft = "warn"
23+
24+
[bans]
25+
multiple-versions = "warn"
26+
wildcards = "deny"
27+
highlight = "all"
28+
29+
# Deny specific crates if needed
30+
deny = []
31+
32+
[sources]
33+
unknown-registry = "deny"
34+
unknown-git = "deny"
35+
allow-registry = ["https://github.com/rust-lang/crates.io-index"]

packages/aggregator/src/core/event_processor.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,33 @@ use crate::core::format_duration;
77
use crate::core::state::{AggregatorState, SharedState};
88
use crate::traits::EventProcessor;
99

10+
/// Errors that can occur during event processing.
1011
#[derive(Debug, thiserror::Error)]
1112
pub enum Error {
1213
#[error("invalid orderbook: {0}")]
1314
InvalidOrderbook(String),
1415
}
1516

17+
/// Processes incoming orderbook events and updates the shared aggregator state.
18+
///
19+
/// The event processor validates each orderbook update (checking for crossed spreads
20+
/// and empty books) before ingesting it into the state. Invalid orderbooks are
21+
/// rejected and tracked in metrics.
22+
///
23+
/// # Example
24+
///
25+
/// ```ignore
26+
/// let state = create_shared_state();
27+
/// let processor = DexEventProcessor::new(state);
28+
/// processor.process_orderbook(orderbook)?;
29+
/// ```
1630
#[derive(Debug, Clone)]
1731
pub struct DexEventProcessor {
1832
state: SharedState,
1933
}
2034

2135
impl DexEventProcessor {
36+
/// Create a new event processor with the given shared state.
2237
pub fn new(state: SharedState) -> Self {
2338
Self { state }
2439
}

packages/aggregator/src/core/request_processor.rs

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,40 @@ use crate::core::routing::find_best_route;
66
use crate::core::state::SharedState;
77
use crate::traits::RequestProcessor;
88

9+
/// Errors that can occur during request processing.
910
#[derive(Debug, thiserror::Error)]
1011
pub enum Error {
1112
#[error("routing failed: {0}")]
1213
RoutingFailed(String),
1314
}
1415

16+
/// Processes swap requests by finding optimal multi-hop routes.
17+
///
18+
/// The request processor enforces rate limiting, circuit breaker protection,
19+
/// and slippage tolerance before executing swaps. It uses BFS-based routing
20+
/// to find the best path across multiple orderbooks.
21+
///
22+
/// # Features
23+
///
24+
/// - Rate limiting to prevent abuse
25+
/// - Circuit breaker for graceful degradation under failures
26+
/// - Slippage protection based on user-specified minimum output
27+
/// - Multi-hop routing (up to 3 hops) for optimal execution
28+
///
29+
/// # Example
30+
///
31+
/// ```ignore
32+
/// let state = create_shared_state();
33+
/// let processor = DexRequestProcessor::new(state);
34+
/// let response = processor.process_request(swap_request).await?;
35+
/// ```
1536
#[derive(Debug, Clone)]
1637
pub struct DexRequestProcessor {
1738
state: SharedState,
1839
}
1940

2041
impl DexRequestProcessor {
42+
/// Create a new request processor with the given shared state.
2143
pub fn new(state: SharedState) -> Self {
2244
Self { state }
2345
}
@@ -33,6 +55,16 @@ impl RequestProcessor for DexRequestProcessor {
3355
.requests_total
3456
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
3557

58+
// Rate limit check: reject if too many requests
59+
if !self.state.rate_limiter.try_acquire() {
60+
warn!("swap rejected: rate limited");
61+
self.state
62+
.metrics
63+
.requests_rate_limited
64+
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
65+
return Ok(SwapResponse::Failure("rate limited".to_string()));
66+
}
67+
3668
// Circuit breaker check: reject early if system is degraded
3769
if !self.state.circuit_breaker.allow_request() {
3870
warn!(
@@ -62,29 +94,32 @@ impl RequestProcessor for DexRequestProcessor {
6294
return Ok(SwapResponse::Failure("zero amount".to_string()));
6395
}
6496

65-
// Check tokens exist before searching (better error messages)
97+
// Check tokens exist before searching
6698
let input_known = self.state.contains_token(request.input_token);
6799
let output_known = self.state.contains_token(request.output_token);
68100

69101
// Reject swaps with unknown tokens (infrastructure issue, affects circuit breaker)
102+
// Return generic error to client but log details server-side
70103
if !input_known || !output_known {
71-
let reason = match (input_known, output_known) {
72-
(false, false) => "unknown tokens",
73-
(false, true) => "unknown input token",
74-
(true, false) => "unknown output token",
104+
let detail = match (input_known, output_known) {
105+
(false, false) => "both tokens unknown",
106+
(false, true) => "input token unknown",
107+
(true, false) => "output token unknown",
75108
_ => unreachable!(),
76109
};
77110
warn!(
78111
input_token = %request.input_token,
79112
output_token = %request.output_token,
80-
"swap rejected: {reason}"
113+
detail,
114+
"swap rejected: invalid request"
81115
);
82116
self.state
83117
.metrics
84118
.requests_failed
85119
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
86120
self.state.circuit_breaker.record_failure();
87-
return Ok(SwapResponse::Failure(reason.to_string()));
121+
// Generic error to avoid leaking token existence info
122+
return Ok(SwapResponse::Failure("invalid request".to_string()));
88123
}
89124

90125
// Find best route (infrastructure issue if fails, affects circuit breaker)

0 commit comments

Comments
 (0)