Detailed technical documentation for Ahenk's peer-to-peer synchronization system.
Ahenk uses a decentralized P2P architecture based on libp2p for synchronizing data across devices without requiring a central server.
┌────────────────────────────────────────────┐
│ Application Layer                          │
│   (OpLog sync, device authorization)       │
├────────────────────────────────────────────┤
│ Protocol Layer                             │
│   - SyncProtocol (request/response)        │
│   - Heartbeat (keep-alive)                 │
│   - Discovery (announce presence)          │
├────────────────────────────────────────────┤
│ Transport Layer                            │
│   - mDNS (local discovery)                 │
│   - Gossipsub (message propagation)        │
│   - Relay (NAT traversal)                  │
│   - DCUtR (hole punching)                  │
├────────────────────────────────────────────┤
│ Security Layer                             │
│   - Noise Protocol (encryption)            │
│   - Ed25519 (authentication)               │
└────────────────────────────────────────────┘
Network behavior that combines multiple libp2p protocols:
pub struct AhenkBehaviour {
pub mdns: mdns::tokio::Behaviour, // Local discovery
pub gossipsub: gossipsub::Behaviour, // Message propagation
pub relay_client: relay::client::Behaviour, // NAT traversal
pub dcutr: dcutr::Behaviour, // Hole punching
}

Local Network (mDNS):
- Broadcasts presence on local network
- Discovers peers on same WiFi/LAN
- Zero configuration required
- Fastest connection method
Global Network (Relay):
- Connects through relay servers
- Works across different networks
- Enables NAT traversal
- Serves as the fallback when a direct connection cannot be established
DCUtR (Direct Connection Upgrade):
- Attempts to establish direct connection
- Uses relay for coordination
- Reduces relay server load
- Better performance and privacy
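The three mechanisms above imply a preference order: a LAN peer found through mDNS, then a hole-punched direct connection, then a relayed one. A minimal sketch of that ranking follows. The `PathKind` type and `preferred_path` function are hypothetical names for illustration, not part of Ahenk or libp2p.

```rust
/// How a peer can currently be reached (hypothetical type for this sketch).
/// Variants are declared from least to most preferred so that the derived
/// `Ord` ranks them in that order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PathKind {
    /// Traffic is forwarded by a relay server.
    Relayed,
    /// Direct connection established through DCUtR hole punching.
    HolePunched,
    /// Peer discovered on the same LAN through mDNS.
    LocalMdns,
}

/// Returns the most preferred path among those available, if any.
pub fn preferred_path(available: &[PathKind]) -> Option<PathKind> {
    available.iter().copied().max()
}
```

Deriving `Ord` keeps the ranking in one place: reordering the variants changes the preference without touching the selection logic.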
Sync messages use JSON-encoded structures:
pub enum SyncMessage {
// Handshake
Hello {
peer_id: PeerId,
device_id: Uuid,
user_id: Uuid,
schema_version: i32,
},
// Request operations
RequestOps {
since: i64, // HLC timestamp
},
// Send operations
SendOps {
operations: Vec<OplogEntry>,
},
// Heartbeat
Ping { timestamp: i64 },
Pong { timestamp: i64 },
// Error
Error { message: String },
}

Device A                   Device B
|                                 |
|------ Hello (device_id, user) ->|
|<----- Hello (device_id, user) --|
|                                 |
|  [Verify both devices belong    |
|   to same user account]         |
|                                 |
|------ RequestOps (since: 0) --->|
|<----- SendOps (all ops) --------|
|                                 |
|<----- RequestOps (since: 0) ----|
|------ SendOps (all ops) ------->|
|                                 |
|  [Both devices now have         |
|   complete operation history]   |
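The verification step in the handshake above can be sketched as a pure function over the two `Hello` messages. This is an illustration under assumptions: `u128` stands in for `Uuid` so the block needs no external crates, and rejecting peers with a different `schema_version` is an assumed policy that the handshake description does not state.

```rust
/// Fields of the `Hello` message that matter for the check.
/// `u128` is a stand-in for `Uuid` in this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct Hello {
    pub device_id: u128,
    pub user_id: u128,
    pub schema_version: i32,
}

/// Reasons to refuse a sync session (hypothetical error type).
#[derive(Debug, PartialEq)]
pub enum HandshakeError {
    DifferentUser,
    SchemaMismatch { local: i32, remote: i32 },
}

/// Accepts the remote `Hello` only if it comes from the same user account.
/// The schema check is an assumption, not documented behavior.
pub fn check_hello(local: &Hello, remote: &Hello) -> Result<(), HandshakeError> {
    if local.user_id != remote.user_id {
        return Err(HandshakeError::DifferentUser);
    }
    if local.schema_version != remote.schema_version {
        return Err(HandshakeError::SchemaMismatch {
            local: local.schema_version,
            remote: remote.schema_version,
        });
    }
    Ok(())
}
```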
Device A                    Device B
|                                  |
| [Device A creates new operation] |
|                                  |
|------ SendOps (new ops) -------->|
|                                  |
| [Device B applies operations]    |
| [Device B sends back ACK]        |
|                                  |
|<----- Pong ----------------------|
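The receiving side of this exchange can be sketched as a handler that maps each incoming message to an optional reply. `Op`, `Msg`, and `handle` are simplified stand-ins for `OplogEntry`, `SyncMessage`, and whatever dispatch Ahenk actually uses. Putting the newest known timestamp into the acknowledging `Pong` is an assumption made for this example.

```rust
/// Simplified operation record (stand-in for `OplogEntry`).
#[derive(Debug, Clone, PartialEq)]
pub struct Op {
    pub id: u64,
    pub timestamp: i64,
}

/// Simplified subset of the sync messages.
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
    RequestOps { since: i64 },
    SendOps { operations: Vec<Op> },
    Ping { timestamp: i64 },
    Pong { timestamp: i64 },
}

/// Applies an incoming message to the local log and returns the reply, if any.
pub fn handle(log: &mut Vec<Op>, msg: Msg) -> Option<Msg> {
    match msg {
        // Answer with every operation newer than the requested timestamp.
        Msg::RequestOps { since } => Some(Msg::SendOps {
            operations: log.iter().filter(|op| op.timestamp > since).cloned().collect(),
        }),
        // Store unseen operations, then acknowledge with a Pong.
        Msg::SendOps { operations } => {
            for op in operations {
                if !log.iter().any(|known| known.id == op.id) {
                    log.push(op);
                }
            }
            log.sort_by_key(|op| op.timestamp);
            let newest = log.last().map_or(0, |op| op.timestamp);
            Some(Msg::Pong { timestamp: newest })
        }
        // Heartbeat: echo the timestamp back.
        Msg::Ping { timestamp } => Some(Msg::Pong { timestamp }),
        // A Pong ends the exchange.
        Msg::Pong { .. } => None,
    }
}
```

Skipping operations whose `id` is already known makes redelivery harmless, which matters when the same batch arrives through more than one peer.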
A hybrid logical clock (HLC) provides causal ordering without requiring synchronized clocks:
pub struct HybridLogicalClock {
physical: i64, // System time (milliseconds)
logical: u16, // Counter for simultaneous events
}

Properties:
- Monotonically increasing
- Captures causality
- Tolerates clock skew
- Deterministic ordering
Example:
Device A (time 1000): op1 → HLC{1000, 0}
Device B (time 999): op2 → HLC{999, 0}
After sync:
Both devices order: op2 (999,0) < op1 (1000,0)
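To make the properties above concrete, here is a sketch of the standard HLC update rules for local events and received timestamps. It is not Ahenk's implementation: the `Hlc` type and the `tick` and `receive` methods are illustrative names, the wall-clock reading is passed in explicitly so the logic stays deterministic, and overflow of the `u16` counter is ignored.

```rust
/// Hybrid logical clock value. Field order matters: the derived `Ord`
/// compares `physical` first and `logical` second.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Hlc {
    pub physical: i64, // wall-clock milliseconds
    pub logical: u16,  // tie-breaker within the same millisecond
}

impl Hlc {
    /// Timestamp for a new local event, given the current wall-clock reading.
    pub fn tick(self, wall_ms: i64) -> Hlc {
        if wall_ms > self.physical {
            Hlc { physical: wall_ms, logical: 0 }
        } else {
            // The clock did not advance (or went backwards): bump the counter.
            Hlc { physical: self.physical, logical: self.logical + 1 }
        }
    }

    /// Timestamp after receiving `remote`, given the current wall-clock reading.
    pub fn receive(self, remote: Hlc, wall_ms: i64) -> Hlc {
        let physical = wall_ms.max(self.physical).max(remote.physical);
        let logical = if physical == self.physical && physical == remote.physical {
            self.logical.max(remote.logical) + 1
        } else if physical == self.physical {
            self.logical + 1
        } else if physical == remote.physical {
            remote.logical + 1
        } else {
            0
        };
        Hlc { physical, logical }
    }
}
```

With these rules, a device whose wall clock lags still produces timestamps later than anything it has already seen, which is how the clock tolerates skew while staying monotonic.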
Conflict resolution strategy:
fn resolve_conflict(local: OplogEntry, remote: OplogEntry) -> OplogEntry {
if remote.timestamp > local.timestamp {
remote // Remote wins
} else if remote.timestamp == local.timestamp {
// Tie-breaker: higher device_id wins (deterministic)
if remote.device_id > local.device_id {
remote
} else {
local
}
} else {
local // Local wins
}
}

pub fn merge(conn: &mut Connection, remote_ops: &[OplogEntry]) -> Result<()> {
// Assuming `Connection` is rusqlite's: its transaction rolls back automatically
// if dropped before commit, so an error partway through cannot leave one open.
let tx = conn.transaction()?;
for remote_op in remote_ops {
// Skip operations already in the local oplog (keeps the merge idempotent)
let exists = check_oplog_exists(&tx, &remote_op.id)?;
if !exists {
// Insert new operation
insert_oplog(&tx, remote_op)?;
// Apply to tables
apply_oplog_entry(&tx, remote_op)?;
}
}
tx.commit()?;
Ok(())
}

Home Network (NAT):
Phone: 192.168.1.100 (private)
Router: 203.0.113.45 (public)
Office Network (NAT):
Laptop: 10.0.0.50 (private)
Router: 198.51.100.23 (public)
Problem: Devices can't directly connect
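The addresses in this scenario can be classified with the standard library alone. The sketch below uses `Ipv4Addr::is_private`, which covers the RFC 1918 ranges. `needs_nat_traversal` is a hypothetical helper name.

```rust
use std::net::Ipv4Addr;

/// True if `addr` lies in a private (RFC 1918) range, meaning a peer on
/// another network cannot dial it directly.
pub fn needs_nat_traversal(addr: Ipv4Addr) -> bool {
    addr.is_private()
}
```

Both device addresses (192.168.1.100 and 10.0.0.50) are private, while the router addresses are not, so neither device can dial the other without help from a relay or hole punching.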
Phone → Relay Server ← Laptop
(203.0.113.100)
- Always works
- Higher latency
- Uses relay bandwidth
- Privacy concern (the relay sees connection metadata, although payloads remain Noise-encrypted end to end)
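In libp2p, a peer behind a relay is addressed by appending `/p2p-circuit/p2p/<target-peer-id>` to the relay's own address. The sketch below builds such an address as a string. `circuit_addr` is a hypothetical helper, and the peer IDs in the usage line are placeholders rather than real identifiers.

```rust
/// Builds a circuit-relay address that reaches `target_peer` through a relay.
/// `relay_addr` is expected to already end in `/p2p/<relay-peer-id>`.
pub fn circuit_addr(relay_addr: &str, target_peer: &str) -> String {
    format!(
        "{}/p2p-circuit/p2p/{}",
        relay_addr.trim_end_matches('/'),
        target_peer
    )
}
```

For example, `circuit_addr("/ip4/203.0.113.100/tcp/4001/p2p/RELAY_ID", "LAPTOP_ID")` yields `/ip4/203.0.113.100/tcp/4001/p2p/RELAY_ID/p2p-circuit/p2p/LAPTOP_ID`.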
Step 1: Both connect to relay
Phone → Relay ← Laptop
Step 2: Relay coordinates
Step 3: Simultaneous connect
Phone ←────────→ Laptop
(direct P2P)
- Lower latency
- Better privacy
- Saves relay bandwidth
- May fail with strict NAT
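The trade-off above amounts to a small decision: keep a direct connection, try to upgrade a relayed one, or stay on the relay once hole punching has failed repeatedly. libp2p's DCUtR behaviour drives the actual upgrade, so the sketch below only illustrates the decision. `is_relayed`, `NextStep`, and `next_step` are hypothetical names, and the attempt limit is an assumed policy.

```rust
/// A connection is relayed when its address contains a `p2p-circuit` component.
pub fn is_relayed(addr: &str) -> bool {
    addr.split('/').any(|component| component == "p2p-circuit")
}

/// What to do with a connection (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
pub enum NextStep {
    KeepDirect,
    TryHolePunch,
    StayOnRelay,
}

/// Decides the next step from the connection address and the number of
/// hole-punch attempts that have already failed.
pub fn next_step(addr: &str, failed_attempts: u32, max_attempts: u32) -> NextStep {
    if !is_relayed(addr) {
        NextStep::KeepDirect
    } else if failed_attempts < max_attempts {
        NextStep::TryHolePunch
    } else {
        // Strict NATs can make hole punching fail every time.
        NextStep::StayOnRelay
    }
}
```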
// Create swarm with relay support
let config = P2PConfig {
enable_mdns: true,
enable_relay: true,
relay_servers: vec![
// Hostnames need /dns4 (or /dns6, /dns); /ip4 accepts only literal IPv4 addresses
"/dns4/relay1.example.com/tcp/4001/p2p/...".to_string(),
"/dns4/relay2.example.com/tcp/4001/p2p/...".to_string(),
],
..Default::default()
};
// Keep a copy of the relay list, because `config` is moved into create_swarm
let relay_servers = config.relay_servers.clone();
let mut swarm = create_swarm(keypair, config)?;
// Connect to relay servers
connect_to_relay_servers(&mut swarm, &relay_servers)?;
// Listen for incoming connections
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

Noise Protocol:
- XX handshake pattern
- ChaCha20-Poly1305 encryption
- Forward secrecy
- Mutual authentication
// Noise encryption is automatic in libp2p
transport
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&keypair)?)
.multiplex(yamux::Config::default())

Only authorized devices can sync:
// Check device authorization before syncing
fn verify_device_authorized(
conn: &Connection,
peer_device_id: Uuid,
local_user_id: Uuid,
) -> Result<bool> {
let device = get_device(conn, peer_device_id)?;
Ok(device.user_id == local_user_id)
}

Future enhancement for message authenticity:
// Sign operation with device key
let signature = device_key.sign(&operation_data);
let signed_op = SignedOplogEntry {
operation,
signature,
device_public_key,
};
// Verify on receiving end
if !verify_signature(&signed_op) {
return Err("Invalid signature");
}

Send multiple operations in one message:
const BATCH_SIZE: usize = 100;
let mut batch = Vec::new();
for op in pending_ops {
batch.push(op);
if batch.len() >= BATCH_SIZE {
send_ops(&mut swarm, &batch)?;
batch.clear();
}
}
// Send remaining
if !batch.is_empty() {
send_ops(&mut swarm, &batch)?;
}

Only send operations since last sync:
// Store last sync timestamp per peer
let last_sync = get_last_sync_timestamp(conn, peer_id)?;
// Request only new operations
let request = SyncMessage::RequestOps {
since: last_sync,
};
send_message(&mut swarm, peer_id, request)?;

Compress large operation payloads:
use std::io::Write; // brings write_all into scope
use flate2::{write::GzEncoder, Compression};
fn compress_ops(ops: &[OplogEntry]) -> Result<Vec<u8>> {
let json = serde_json::to_vec(ops)?;
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&json)?;
Ok(encoder.finish()?)
}

Enable detailed P2P logs:
RUST_LOG=ahenk::logic::sync=debug ahenk-cli start

Track sync statistics:
pub struct SyncStats {
pub connected_peers: usize,
pub total_ops_sent: u64,
pub total_ops_received: u64,
pub last_sync: DateTime<Utc>,
pub bytes_sent: u64,
pub bytes_received: u64,
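}

Monitor libp2p events:
// `next()` comes from `futures::StreamExt` and yields an Option;
// `select_next_some()` returns the event itself and cannot be matched with `Some`
while let Some(event) = swarm.next().await {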
match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {}", address);
}
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
println!("Connected to {}", peer_id);
}
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
println!("Disconnected from {}: {:?}", peer_id, cause);
}
// ... handle other events
}
}

Check:
- mDNS enabled: config.enable_mdns = true
- Same local network
- Firewall allows multicast
- Same user account on both devices
Solution:
# Try relay servers instead
ahenk-cli config set network.relay_servers "/dns4/relay.example.com/tcp/4001/p2p/..."

Possible causes:
- Using relay instead of direct connection
- Geographic distance between peers
- Network congestion
Check:
# See if using relay or direct
ahenk-cli peer list --verbose

Check operation logs:
SELECT * FROM oplog WHERE timestamp > ?
ORDER BY timestamp DESC;

Verify HLC ordering:
let ops = get_all_ops(conn)?;
for i in 1..ops.len() {
assert!(ops[i].timestamp >= ops[i-1].timestamp);
}

- Always verify device authorization before syncing
- Use batching for multiple operations
- Enable both mDNS and relay for reliability
- Monitor sync status in production
- Implement retry logic for failed syncs
- Log sync events for debugging
- Test with network partitions to ensure eventual consistency
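The retry recommendation above is commonly implemented with capped exponential backoff. The sketch below is one possible shape and not Ahenk's actual retry logic: `backoff_delay` and `sync_with_retry` are hypothetical names, and the blocking `std::thread::sleep` keeps the example dependency-free (async code would await a timer instead).

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`,
/// capped at `max`. Overflow falls back to the cap.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}

/// Calls `sync_once` until it succeeds or `max_attempts` calls have failed,
/// sleeping with exponential backoff between attempts.
pub fn sync_with_retry<E>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut sync_once: impl FnMut() -> Result<(), E>,
) -> Result<(), E> {
    let mut attempt = 0;
    loop {
        match sync_once() {
            Ok(()) => return Ok(()),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                std::thread::sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}
```

Capping the delay bounds the time to reconnect after a long partition, and returning the last error lets the caller log it.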
- AHENK_INFRASTRUCTURE_GUIDE.md - Complete architecture
- README.md - Getting started
- CLI_USAGE.md - CLI sync commands
- libp2p Documentation - libp2p details