# Writing API Programs

**Complete guide to building robust ExaBGP API programs**

---

## Table of Contents

- [Introduction](#introduction)
- [Language-Agnostic Patterns](#language-agnostic-patterns)
- [Choosing Text vs JSON API](#choosing-text-vs-json-api)
- [Program Structure](#program-structure)
- [STDIN/STDOUT Communication](#stdinstdout-communication)
- [Error Handling Strategies](#error-handling-strategies)
- [State Management](#state-management)
- [Production Patterns](#production-patterns)
- [Complete Examples](#complete-examples)
- [Testing Your Programs](#testing-your-programs)
- [Common Pitfalls](#common-pitfalls)
- [Best Practices](#best-practices)

---

## Introduction

ExaBGP API programs are external processes that communicate with ExaBGP via **STDIN/STDOUT pipes**. This design is language-agnostic, simple, and powerful.

### What Makes a Good API Program?

**Essential characteristics:**

- ✅ **Robust error handling** - Don't crash on unexpected input
- ✅ **Proper buffering** - Always flush STDOUT
- ✅ **State tracking** - Avoid redundant announcements
- ✅ **Logging** - Use STDERR for diagnostics
- ✅ **Signal handling** - Graceful shutdown on SIGTERM
- ✅ **Health awareness** - React to service state changes

**Important principle:**

> 🔴 **ExaBGP does NOT manipulate RIB/FIB** - Your program controls WHEN routes are announced. ExaBGP handles HOW they're sent via BGP. Route installation happens on the router, not in ExaBGP.

---

## Language-Agnostic Patterns

ExaBGP works with **any language** that can read from STDIN and write to STDOUT.

### Universal Pattern

**All languages follow this pattern:**

```
1. Wait for ExaBGP to be ready (sleep 2-5 seconds)
2. Initialize state (track what's announced)
3. Enter main loop:
   a. Check service health
   b. Compare with current state
   c. Send commands if state changed
   d. FLUSH output (critical!)
   e. Sleep/wait for next check
```

---

### Python

**Advantages:**

- ✅ Standard library (no dependencies needed)
- ✅ Easy JSON parsing
- ✅ Good string handling
- ✅ Rich libraries (socket, subprocess, etc.)

**Basic template:**

```python
#!/usr/bin/env python3
import sys
import time

# Wait for ExaBGP
time.sleep(2)

# Main loop
while True:
    # Your logic here
    command = "announce route 100.10.0.0/24 next-hop self"
    sys.stdout.write(command + "\n")
    sys.stdout.flush()  # CRITICAL
    time.sleep(5)
```

---

### Bash

**Advantages:**

- ✅ No dependencies (available everywhere)
- ✅ Simple for basic tasks
- ✅ Easy to wrap existing tools

**Disadvantages:**

- ❌ Limited JSON parsing
- ❌ Harder to manage state
- ❌ Less robust error handling

**Basic template:**

```bash
#!/bin/bash

# Wait for ExaBGP
sleep 2

# Main loop
while true; do
    # Your logic here
    # bash flushes after each echo builtin, so no explicit flush is needed
    echo "announce route 100.10.0.0/24 next-hop self"
    sleep 5
done
```

---

### Go

**Advantages:**

- ✅ Fast and efficient
- ✅ Static binary (no dependencies)
- ✅ Excellent concurrency
- ✅ Strong typing

**Basic template:**

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Wait for ExaBGP
	time.Sleep(2 * time.Second)

	// Main loop
	for {
		// Your logic here
		// os.Stdout is unbuffered in Go, so Println reaches ExaBGP immediately
		fmt.Println("announce route 100.10.0.0/24 next-hop self")
		time.Sleep(5 * time.Second)
	}
}
```

---

### Ruby

**Basic template:**

```ruby
#!/usr/bin/env ruby

# Wait for ExaBGP
sleep 2

# Main loop
loop do
  # Your logic here
  puts "announce route 100.10.0.0/24 next-hop self"
  STDOUT.flush  # CRITICAL
  sleep 5
end
```

---

### Node.js

**Basic template:**

```javascript
#!/usr/bin/env node

// Wait for ExaBGP
setTimeout(() => {
  // Main loop
  setInterval(() => {
    // Your logic here
    console.log("announce route 100.10.0.0/24 next-hop self");
  }, 5000);
}, 2000);
```

---

## Choosing Text vs JSON API

### Text API

**Use text encoder when:**

- ✅ Only **sending** commands (announce/withdraw)
- ✅ Simple use cases (static routes, basic health checks)
- ✅ Want human-readable format
- ✅ Testing manually
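
Every text-encoder command is a single plain line, so it can help to build those lines in one place instead of scattering f-strings through a program. The sketch below shows one way to do that. It assumes the `announce route <prefix> next-hop <address>` and `withdraw route <prefix>` forms used throughout this page. `announce_cmd()`, `withdraw_cmd()` and `send()` are hypothetical helper names and are not part of ExaBGP. Prefixes are checked with Python's standard `ipaddress` module, so a typo such as `100.10.0.1/24` (host bits set) raises `ValueError` in your program instead of being sent to ExaBGP. Typical use is `send(announce_cmd("100.10.0.0/24"))`.

```python
import ipaddress
import sys


def _canonical_prefix(prefix: str) -> str:
    """Return the prefix in canonical form, or raise ValueError.

    ip_network() is strict by default: it rejects malformed input and
    prefixes with host bits set (e.g. "100.10.0.1/24").
    """
    return str(ipaddress.ip_network(prefix))


def announce_cmd(prefix: str, next_hop: str = "self") -> str:
    """Build a text-encoder announce command (hypothetical helper)."""
    if next_hop != "self":
        # Validate an explicit next-hop address (IPv4 or IPv6)
        next_hop = str(ipaddress.ip_address(next_hop))
    return f"announce route {_canonical_prefix(prefix)} next-hop {next_hop}"


def withdraw_cmd(prefix: str) -> str:
    """Build a text-encoder withdraw command (hypothetical helper)."""
    return f"withdraw route {_canonical_prefix(prefix)}"


def send(command: str) -> None:
    """Write one command line to ExaBGP and flush it immediately."""
    sys.stdout.write(command + "\n")
    sys.stdout.flush()
```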

**Configuration:**

```ini
process announce {
    run /etc/exabgp/api/announce.py;
    encoder text;
}
```

**Example:**

```python
import sys

print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()
```

---

### JSON API

**Use JSON encoder when:**

- ✅ **Receiving** BGP updates from router
- ✅ Need structured data parsing
- ✅ Building complex integrations
- ✅ Processing routes from peers

**Configuration:**

```ini
process receive {
    run /etc/exabgp/api/receive.py;
    encoder json;
    receive {
        parsed;
        updates;
        neighbor-changes;
    }
}
```

**Example:**

```python
import json
import sys

for line in sys.stdin:
    msg = json.loads(line)
    if msg['type'] == 'update':
        # Process BGP updates
        pass
```

---

### Using Both

**Best practice for complex scenarios:**

```ini
# Process 1: Send commands (text)
process announce {
    run /etc/exabgp/api/announce.py;
    encoder text;
}

# Process 2: Receive updates (JSON)
process receive {
    run /etc/exabgp/api/receive.py;
    encoder json;
    receive {
        parsed;
        updates;
    }
}

neighbor 192.168.1.1 {
    router-id 192.168.1.2;
    local-address 192.168.1.2;
    local-as 65001;
    peer-as 65000;

    api {
        processes [ announce, receive ];
    }
}
```

---

## Program Structure

### Basic Structure

**Every API program should have:**

```python
#!/usr/bin/env python3
"""
program.py - Description of what this does
"""

import sys
import time
import signal

# Configuration
CONFIG = {
    'check_interval': 5,
    'service_ip': '100.10.0.100',
    'service_port': 80
}

# Global state
announced = False

def log(message):
    """Log to STDERR (goes to ExaBGP log)"""
    sys.stderr.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}\n")
    sys.stderr.flush()

def signal_handler(signum, frame):
    """Handle SIGTERM gracefully"""
    log(f"Received signal {signum}, shutting down")
    sys.exit(0)

def check_health():
    """Check if service is healthy"""
    # Your health check logic
    return True

def announce_route(prefix):
    """Announce route to ExaBGP"""
    global announced
    sys.stdout.write(f"announce route {prefix} next-hop self\n")
    sys.stdout.flush()
    announced = True
    log(f"Announced {prefix}")

def withdraw_route(prefix):
    """Withdraw route from ExaBGP"""
    global announced
    sys.stdout.write(f"withdraw route {prefix}\n")
    sys.stdout.flush()
    announced = False
    log(f"Withdrawn {prefix}")

def main():
    """Main program loop"""
    # Register signal handler
    signal.signal(signal.SIGTERM, signal_handler)

    # Wait for ExaBGP
    log("Starting, waiting for ExaBGP...")
    time.sleep(2)
    log("Ready")

    # Main loop
    prefix = f"{CONFIG['service_ip']}/32"

    while True:
        try:
            healthy = check_health()

            if healthy and not announced:
                announce_route(prefix)
            elif not healthy and announced:
                withdraw_route(prefix)

            time.sleep(CONFIG['check_interval'])

        except Exception as e:
            log(f"Error in main loop: {e}")
            time.sleep(CONFIG['check_interval'])

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        log("Interrupted by user")
        sys.exit(0)
    except Exception as e:
        log(f"Fatal error: {e}")
        sys.exit(1)
```

---

### Receiving BGP Updates Structure

**For programs that process BGP updates:**

```python
#!/usr/bin/env python3
"""
receive.py - Process incoming BGP updates
"""

import sys
import json

def log(message):
    """Log to STDERR"""
    sys.stderr.write(f"{message}\n")
    sys.stderr.flush()

def handle_update(msg):
    """Process UPDATE messages"""
    try:
        update = msg['neighbor']['message']['update']

        # Process announcements
        if 'announce' in update:
            if 'ipv4 unicast' in update['announce']:
                routes = update['announce']['ipv4 unicast']
                for prefix, attrs_list in routes.items():
                    for attrs in attrs_list:
                        nexthop = attrs.get('next-hop', 'unknown')
                        log(f"[ANNOUNCE] {prefix} via {nexthop}")

        # Process withdrawals
        if 'withdraw' in update:
            if 'ipv4 unicast' in update['withdraw']:
                for prefix in update['withdraw']['ipv4 unicast'].keys():
                    log(f"[WITHDRAW] {prefix}")

    except Exception as e:
        log(f"[ERROR] Failed to process update: {e}")

def handle_state(msg):
    """Process STATE messages"""
    try:
        peer = msg['neighbor']['address']['peer']
        state = msg['neighbor']['state']
        log(f"[STATE] BGP session with {peer}: {state}")
    except Exception as e:
        log(f"[ERROR] Failed to process state: {e}")

def handle_notification(msg):
    """Process NOTIFICATION messages"""
    try:
        peer = msg['neighbor']['address']['peer']
        notification = msg['neighbor']['message']['notification']
        code = notification.get('code', 'unknown')
        log(f"[NOTIFICATION] From {peer}: code={code}")
    except Exception as e:
        log(f"[ERROR] Failed to process notification: {e}")

def main():
    """Main message processing loop"""
    log("[INFO] Starting BGP message processor")

    while True:
        line = sys.stdin.readline()
        if not line:
            break  # EOF

        try:
            msg = json.loads(line.strip())
            msg_type = msg.get('type', 'unknown')

            if msg_type == 'update':
                handle_update(msg)
            elif msg_type == 'state':
                handle_state(msg)
            elif msg_type == 'notification':
                handle_notification(msg)
            elif msg_type == 'keepalive':
                # Usually don't need to process
                pass
            else:
                log(f"[WARN] Unknown message type: {msg_type}")

        except json.JSONDecodeError as e:
            log(f"[ERROR] JSON parse error: {e}")
        except Exception as e:
            log(f"[ERROR] Processing error: {e}")

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        log("[INFO] Interrupted")
        sys.exit(0)
```

---

## STDIN/STDOUT Communication

### Critical Rules

**ALWAYS flush STDOUT:**

```python
import sys

# WRONG - buffered, ExaBGP won't see it immediately
print("announce route 100.10.0.0/24 next-hop self")

# CORRECT - flushed
print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()

# ALSO CORRECT - flush parameter
print("announce route 100.10.0.0/24 next-hop self", flush=True)

# ALSO CORRECT - write + flush
sys.stdout.write("announce route 100.10.0.0/24 next-hop self\n")
sys.stdout.flush()
```

---

### Unbuffered Mode

**Python unbuffered mode (recommended):**

```python
#!/usr/bin/python3 -u
# The -u flag disables output buffering.
# Note: "#!/usr/bin/env python3 -u" fails on Linux, because the kernel passes
# "python3 -u" to env as a single argument. Use an absolute interpreter path
# as above, or "#!/usr/bin/env -S python3 -u" where env supports -S.
```

**Or in configuration:**

```ini
process announce {
    run python3 -u /etc/exabgp/api/announce.py;
    encoder text;
}
```

**Environment variable:**

```bash
export PYTHONUNBUFFERED=1
python3 /etc/exabgp/api/announce.py
```

---

### Reading from STDIN

**Blocking read (simple):**

```python
import sys

while True:
    line = sys.stdin.readline()
    if not line:
        break  # EOF
    process(line)
```

**Non-blocking read with select() (advanced):**

```python
import select
import sys

# Check if data available with timeout
ready, _, _ = select.select([sys.stdin], [], [], 1.0)
if ready:
    line = sys.stdin.readline()
    process(line)
```

---

### Using Robust ACK Handling

**ExaBGP 4.x and 5.x support ACK responses (enabled by default):**

```python
import json
import select
import sys
import time

def wait_for_ack(expected_count=1, timeout=30):
    """
    Wait for ACK responses, polling STDIN with select().

    ExaBGP may not respond immediately, so we poll in short intervals.
    Handles both text and JSON encoder formats:
    - Text: "done", "error", "shutdown"
    - JSON: {"answer": "done|error|shutdown", "message": "..."}

    Note: select() only sees data still in the OS pipe. If several lines
    arrive at once, readline() may pull the extras into Python's buffer,
    where select() cannot see them, so wait for one ACK at a time
    (as send_with_ack() does).
    """
    received = 0
    start_time = time.time()

    while received < expected_count:
        if time.time() - start_time >= timeout:
            return False

        # Wait up to 0.1s for input
        ready, _, _ = select.select([sys.stdin], [], [], 0.1)
        if not ready:
            continue

        raw = sys.stdin.readline()
        if not raw:
            raise SystemExit(0)  # EOF: ExaBGP closed the pipe
        line = raw.strip()

        # Parse response (could be text or JSON)
        answer = None
        if line.startswith('{'):
            try:
                answer = json.loads(line).get('answer')
            except ValueError:  # json.JSONDecodeError is a ValueError
                pass
        else:
            answer = line

        if answer == "done":
            received += 1
        elif answer == "error":
            return False
        elif answer == "shutdown":
            raise SystemExit(0)

    return True

def send_with_ack(command):
    """Send command and wait for ACK"""
    sys.stdout.write(command + "\n")
    sys.stdout.flush()
    return wait_for_ack(expected_count=1)

# Use it
if send_with_ack("announce route 100.10.0.0/24 next-hop self"):
    # Command succeeded
    pass
else:
    # Command failed - handle error
    sys.exit(1)
```

---

## Error Handling Strategies

### Defensive Programming

**Always use try/except:**

```python
import socket

def check_health():
    """Check service health with error handling"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)
        result = sock.connect_ex(('100.10.0.100', 80))
        sock.close()
        return result == 0
    except socket.error as e:
        log(f"[ERROR] Socket error: {e}")
        return False
    except Exception as e:
        log(f"[ERROR] Unexpected error: {e}")
        return False
```

---

### Retry Logic

**Retry with exponential backoff:**

```python
def retry_with_backoff(func, max_retries=3, initial_delay=1):
    """Retry function with exponential backoff"""
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                log(f"[ERROR] Failed after {max_retries} attempts: {e}")
                raise
            log(f"[WARN] Attempt {attempt + 1} failed: {e}, retrying in {delay}s")
            time.sleep(delay)
            delay *= 2  # Exponential backoff

# Use it
def announce():
    sys.stdout.write("announce route 100.10.0.0/24 next-hop self\n")
    sys.stdout.flush()

retry_with_backoff(announce, max_retries=3)
```

---

### Circuit Breaker Pattern

**Prevent cascade failures:**

```python
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func):
        """Call function with circuit breaker protection"""
        if self.state == 'open':
            # Check if timeout expired
            if time.time() - self.last_failure_time >= self.timeout:
                self.state = 'half-open'
                log("[CIRCUIT] Half-open, trying again")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func()
            # Success - close the circuit if half-open, and reset the count
            # so that occasional failures don't accumulate forever
            if self.state == 'half-open':
                self.state = 'closed'
                log("[CIRCUIT] Closed, service recovered")
            self.failure_count = 0
            return result

        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = 'open'
                log(f"[CIRCUIT] OPEN after {self.failure_count} failures")

            raise

# Use it: the breaker only counts a failure when the wrapped function
# raises, so turn a False health result into an exception
def require_healthy():
    if not check_health():
        raise RuntimeError("health check failed")
    return True

breaker = CircuitBreaker(failure_threshold=5, timeout=60)
prefix = '100.10.0.100/32'

while True:
    try:
        breaker.call(require_healthy)
        if not announced:
            announce_route(prefix)
    except Exception as e:
        log(f"[ERROR] Circuit breaker: {e}")
        if announced:
            withdraw_route(prefix)

    time.sleep(5)
```

---

## State Management

### Track What's Announced

**Avoid redundant announcements:**

```python
# Global state tracking
announced_routes = set()

def announce_if_needed(prefix):
    """Only announce if not already announced"""
    if prefix not in announced_routes:
        sys.stdout.write(f"announce route {prefix} next-hop self\n")
        sys.stdout.flush()
        announced_routes.add(prefix)
        log(f"[ANNOUNCE] {prefix}")
    else:
        log(f"[SKIP] {prefix} already announced")

def withdraw_if_needed(prefix):
    """Only withdraw if currently announced"""
    if prefix in announced_routes:
        sys.stdout.write(f"withdraw route {prefix}\n")
        sys.stdout.flush()
        announced_routes.remove(prefix)
        log(f"[WITHDRAW] {prefix}")
    else:
        log(f"[SKIP] {prefix} not announced")
```

---

### State Machine Pattern

**Formal state management:**

```python
class ServiceState:
    def __init__(self):
        self.current_state = 'unknown'
        self.route_announced = False

    def transition(self, new_state):
        """Handle state transitions"""
        if new_state == self.current_state:
            return  # No change

        log(f"[STATE] {self.current_state} -> {new_state}")
        self.current_state = new_state

        # Handle transitions
        if new_state == 'healthy' and not self.route_announced:
            self.announce()
        elif new_state == 'unhealthy' and self.route_announced:
            self.withdraw()

    def announce(self):
        """Announce route"""
        sys.stdout.write("announce route 100.10.0.100/32 next-hop self\n")
        sys.stdout.flush()
        self.route_announced = True
        log("[ACTION] Route announced")

    def withdraw(self):
        """Withdraw route"""
        sys.stdout.write("withdraw route 100.10.0.100/32\n")
        sys.stdout.flush()
        self.route_announced = False
        log("[ACTION] Route withdrawn")

# Use it
state = ServiceState()

while True:
    healthy = check_health()
    state.transition('healthy' if healthy else 'unhealthy')
    time.sleep(5)
```

---

## Production Patterns

### Health Check Patterns

**HTTP health check:**

```python
import urllib.request

def http_health_check(url, timeout=2):
    """HTTP GET health check"""
    try:
        req = urllib.request.Request(url)
        # urlopen() raises HTTPError for 4xx/5xx; `with` closes the response
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return response.status == 200
    except Exception as e:
        log(f"[HEALTH] HTTP check failed: {e}")
        return False

# Use it
if http_health_check('http://127.0.0.1:8080/health'):
    announce_route('100.10.0.100/32')
```

**TCP port check:**

```python
import socket

def tcp_port_check(host, port, timeout=2):
    """TCP connection check"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except Exception as e:
        log(f"[HEALTH] TCP check failed: {e}")
        return False

# Use it
if tcp_port_check('127.0.0.1', 80):
    announce_route('100.10.0.100/32')
```

**Command execution check:**

```python
import subprocess

def command_health_check(command, timeout=5):
    """Execute command, check exit code"""
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            timeout=timeout
        )
        return result.returncode == 0
    except Exception as e:
        log(f"[HEALTH] Command check failed: {e}")
        return False

# Use it
if command_health_check('curl -sf http://localhost/health'):
    announce_route('100.10.0.100/32')
```

---

### Hysteresis Pattern

**Prevent route flapping:**

```python
class HealthTracker:
    def __init__(self, threshold_up=3, threshold_down=2):
        self.threshold_up = threshold_up
        self.threshold_down = threshold_down
        self.consecutive_up = 0
        self.consecutive_down = 0
        self.current_state = 'down'

    def update(self, healthy):
        """Update health state with hysteresis"""
        if healthy:
            self.consecutive_up += 1
            self.consecutive_down = 0

            if self.consecutive_up >= self.threshold_up:
                if self.current_state != 'up':
                    log(f"[HEALTH] Service UP (after {self.consecutive_up} checks)")
                    self.current_state = 'up'
        else:
            self.consecutive_down += 1
            self.consecutive_up = 0

            if self.consecutive_down >= self.threshold_down:
                if self.current_state != 'down':
                    log(f"[HEALTH] Service DOWN (after {self.consecutive_down} checks)")
                    self.current_state = 'down'

        return self.current_state

# Use it (announce_route/withdraw_route maintain the `announced` flag)
health = HealthTracker(threshold_up=3, threshold_down=2)

while True:
    check_result = check_health()
    state = health.update(check_result)

    # Act only on transitions, to avoid redundant announcements
    if state == 'up' and not announced:
        announce_route('100.10.0.100/32')
    elif state == 'down' and announced:
        withdraw_route('100.10.0.100/32')

    time.sleep(5)
```

---

### Graceful Shutdown

**Handle SIGTERM properly:**

```python
import signal
import sys

def shutdown_handler(signum, frame):
    """Handle shutdown signal"""
    log(f"[SHUTDOWN] Received signal {signum}, cleaning up...")

    # Withdraw all announced routes
    for prefix in announced_routes:
        sys.stdout.write(f"withdraw route {prefix}\n")
        sys.stdout.flush()
        log(f"[CLEANUP] Withdrawn {prefix}")

    log("[SHUTDOWN] Complete")
    sys.exit(0)

# Register handler
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)
```

---

## Complete Examples

### Example 1: Production Health Check (Python)

```python
#!/usr/bin/env python3
"""
healthcheck.py - Production-grade health check for ExaBGP
"""

import sys
import time
import socket
import signal
import urllib.request

# Configuration
SERVICE_IP = "100.10.0.100"
SERVICE_PORT = 80
CHECK_INTERVAL = 5
HEALTH_THRESHOLD_UP = 3
HEALTH_THRESHOLD_DOWN = 2
TIMEOUT = 2

# State
announced = False
consecutive_healthy = 0
consecutive_unhealthy = 0

def log(message):
    """Log to STDERR"""
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    sys.stderr.write(f"[{timestamp}] {message}\n")
    sys.stderr.flush()

def signal_handler(signum, frame):
    """Handle shutdown gracefully"""
    log(f"Received signal {signum}, withdrawing routes and exiting")
    if announced:
        sys.stdout.write(f"withdraw route {SERVICE_IP}/32\n")
        sys.stdout.flush()
    sys.exit(0)

def check_tcp_port(host, port, timeout):
    """Check if TCP port is open"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        sock.close()
        return result == 0
    except Exception as e:
        log(f"TCP check error: {e}")
        return False

def check_http(url, timeout):
    """Check HTTP endpoint"""
    try:
        req = urllib.request.Request(url)
        response = urllib.request.urlopen(req, timeout=timeout)
        return response.status == 200
    except Exception as e:
        log(f"HTTP check error: {e}")
        return False

def is_healthy():
    """Perform health checks"""
    # TCP port check
    if not check_tcp_port('127.0.0.1', SERVICE_PORT, TIMEOUT):
        return False

    # HTTP health endpoint check
    if not check_http(f'http://127.0.0.1:{SERVICE_PORT}/health', TIMEOUT):
        return False

    return True

def announce_route():
    """Announce route"""
    global announced
    sys.stdout.write(f"announce route {SERVICE_IP}/32 next-hop self\n")
    sys.stdout.flush()
    announced = True
    log(f"ANNOUNCED {SERVICE_IP}/32")

def withdraw_route():
    """Withdraw route"""
    global announced
    sys.stdout.write(f"withdraw route {SERVICE_IP}/32\n")
    sys.stdout.flush()
    announced = False
    log(f"WITHDRAWN {SERVICE_IP}/32")

def main():
    """Main loop with hysteresis"""
    global consecutive_healthy, consecutive_unhealthy

    # Register signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    log("Starting health check daemon")
    time.sleep(2)  # Wait for ExaBGP
    log("Ready")

    while True:
        try:
            healthy = is_healthy()

            if healthy:
                consecutive_healthy += 1
                consecutive_unhealthy = 0

                # Announce after N consecutive healthy checks
                if consecutive_healthy >= HEALTH_THRESHOLD_UP and not announced:
                    announce_route()
            else:
                consecutive_unhealthy += 1
                consecutive_healthy = 0

                # Withdraw after N consecutive unhealthy checks
                if consecutive_unhealthy >= HEALTH_THRESHOLD_DOWN and announced:
                    withdraw_route()

            time.sleep(CHECK_INTERVAL)

        except Exception as e:
            log(f"Error in main loop: {e}")
            time.sleep(CHECK_INTERVAL)

if __name__ == '__main__':
    main()
```

---

### Example 2: BGP Route Monitor (Python)

```python
#!/usr/bin/env python3
"""
route_monitor.py - Monitor BGP routes and trigger actions
"""

import sys
import json
import subprocess

# Track routes
routes = {}

def log(message):
    """Log to STDERR"""
    sys.stderr.write(f"{message}\n")
    sys.stderr.flush()

def trigger_script(action, prefix, nexthop):
    """Trigger external script on route change"""
    script = f"/etc/exabgp/scripts/on_{action}.sh"
    try:
        subprocess.run([script, prefix, nexthop], timeout=5)
        log(f"[TRIGGER] Executed {script} for {prefix}")
    except Exception as e:
        log(f"[ERROR] Script execution failed: {e}")

def handle_announcement(prefix, attrs):
    """Handle route announcement"""
    nexthop = attrs.get('next-hop', 'unknown')

    if prefix not in routes:
        log(f"[NEW] {prefix} via {nexthop}")
        routes[prefix] = attrs
        trigger_script('announce', prefix, nexthop)
    else:
        log(f"[UPDATE] {prefix} via {nexthop}")
        routes[prefix] = attrs

def handle_withdrawal(prefix):
    """Handle route withdrawal"""
    if prefix in routes:
        nexthop = routes[prefix].get('next-hop', 'unknown')
        log(f"[WITHDRAWN] {prefix}")
        del routes[prefix]
        trigger_script('withdraw', prefix, nexthop)

def main():
    """Main message processing loop"""
    log("[START] BGP route monitor")

    while True:
        line = sys.stdin.readline()
        if not line:
            break

        try:
            msg = json.loads(line.strip())

            if msg['type'] == 'update':
                update = msg['neighbor']['message']['update']

                # Process announcements
                if 'announce' in update:
                    if 'ipv4 unicast' in update['announce']:
                        for prefix, attrs_list in update['announce']['ipv4 unicast'].items():
                            handle_announcement(prefix, attrs_list[0])

                # Process withdrawals
                if 'withdraw' in update:
                    if 'ipv4 unicast' in update['withdraw']:
                        for prefix in update['withdraw']['ipv4 unicast'].keys():
                            handle_withdrawal(prefix)

            elif msg['type'] == 'state':
                peer = msg['neighbor']['address']['peer']
                state = msg['neighbor']['state']
                log(f"[STATE] BGP session {peer}: {state}")

        except json.JSONDecodeError as e:
            log(f"[ERROR] JSON parse error: {e}")
        except Exception as e:
            log(f"[ERROR] Processing error: {e}")

if __name__ == '__main__':
    main()
```

---

### Example 3: Health Check (Go)

```go
package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

const (
	serviceIP     = "100.10.0.100"
	servicePort   = 80
	checkInterval = 5 * time.Second
)

var (
	mu        sync.Mutex // guards announced: the main loop and the signal goroutine both use it
	announced = false
)

func log(message string) {
	timestamp := time.Now().Format("2006-01-02 15:04:05")
	fmt.Fprintf(os.Stderr, "[%s] %s\n", timestamp, message)
}

func checkHealth() bool {
	conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", servicePort), 2*time.Second)
	if err != nil {
		return false
	}
	conn.Close()
	return true
}

// announceRoute and withdrawRoute must be called with mu held.
// os.Stdout is unbuffered in Go, so Printf reaches ExaBGP immediately.
func announceRoute() {
	fmt.Printf("announce route %s/32 next-hop self\n", serviceIP)
	announced = true
	log(fmt.Sprintf("ANNOUNCED %s/32", serviceIP))
}

func withdrawRoute() {
	fmt.Printf("withdraw route %s/32\n", serviceIP)
	announced = false
	log(fmt.Sprintf("WITHDRAWN %s/32", serviceIP))
}

func main() {
	// Handle signals
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)

	go func() {
		sig := <-sigChan
		log(fmt.Sprintf("Received signal %v, exiting", sig))
		mu.Lock()
		if announced {
			withdrawRoute()
		}
		os.Exit(0)
	}()

	log("Starting health check daemon")
	time.Sleep(2 * time.Second) // Wait for ExaBGP
	log("Ready")

	for {
		healthy := checkHealth()

		mu.Lock()
		if healthy && !announced {
			announceRoute()
		} else if !healthy && announced {
			withdrawRoute()
		}
		mu.Unlock()

		time.Sleep(checkInterval)
	}
}
```

---

### Example 4: Health Check (Bash)

```bash
#!/bin/bash
# healthcheck.sh - Simple health check in bash

SERVICE_IP="100.10.0.100"
SERVICE_PORT=80
CHECK_INTERVAL=5
ANNOUNCED=0

log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

check_health() {
    # Use nc (netcat) or curl for health check
    timeout 2 nc -z 127.0.0.1 $SERVICE_PORT 2>/dev/null
    return $?
}

announce_route() {
    echo "announce route ${SERVICE_IP}/32 next-hop self"
    ANNOUNCED=1
    log "ANNOUNCED ${SERVICE_IP}/32"
}

withdraw_route() {
    echo "withdraw route ${SERVICE_IP}/32"
    ANNOUNCED=0
    log "WITHDRAWN ${SERVICE_IP}/32"
}

cleanup() {
    log "Received SIGTERM, cleaning up"
    if [ $ANNOUNCED -eq 1 ]; then
        withdraw_route
    fi
    exit 0
}

trap cleanup SIGTERM SIGINT

log "Starting health check daemon"
sleep 2  # Wait for ExaBGP
log "Ready"

while true; do
    if check_health; then
        if [ $ANNOUNCED -eq 0 ]; then
            announce_route
        fi
    else
        if [ $ANNOUNCED -eq 1 ]; then
            withdraw_route
        fi
    fi

    sleep $CHECK_INTERVAL
done
```

---

## Testing Your Programs

### Manual Testing

**Test commands without ExaBGP:**

```bash
# Test your program's output
./announce.py | head -5

# Expected output:
# announce route 100.10.0.0/24 next-hop self
```

**Test with mock stdin:**

```bash
# Send test JSON to your receive program
# (handle_state() in receive.py reads neighbor.address.peer, so include it)
echo '{"type":"state","neighbor":{"address":{"peer":"192.168.1.1"},"state":"up"}}' | ./receive.py
```

---

### Unit Testing

**Python unittest example** (assuming your functions live in an importable module):

```python
import io
import unittest
from contextlib import redirect_stdout

# Hypothetical module name: import from wherever you defined these functions
from my_api_program import announce_route, tcp_port_check

class TestHealthCheck(unittest.TestCase):
    def test_tcp_check(self):
        """Test TCP health check"""
        result = tcp_port_check('127.0.0.1', 80, timeout=2)
        self.assertIsInstance(result, bool)

    def test_announce_format(self):
        """Test announce command format"""
        output = io.StringIO()
        # redirect_stdout restores sys.stdout even if the call raises
        with redirect_stdout(output):
            announce_route('100.10.0.0/24')
        self.assertIn('announce route 100.10.0.0/24', output.getvalue())

if __name__ == '__main__':
    unittest.main()
```

---

### Integration Testing

**Test with ExaBGP:**

```bash
# 1. Start ExaBGP in one terminal
exabgp /etc/exabgp/exabgp.conf

# 2. In another terminal, watch the ExaBGP log for your program's output
tail -f /var/log/exabgp.log

# 3. Trigger health changes
#    - Stop service -> Should withdraw route
#    - Start service -> Should announce route
```

---

## Common Pitfalls

### 1. Forgot to Flush

**Problem:**

```python
print("announce route 100.10.0.0/24 next-hop self")
# Route never announced - stuck in buffer
```

**Solution:**

```python
print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()  # Always flush!
```

---

### 2. Process Exits Immediately

**Problem:**

```python
print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()
# Script exits, ExaBGP restarts it repeatedly
```

**Solution:**

```python
import sys
import time

print("announce route 100.10.0.0/24 next-hop self")
sys.stdout.flush()

# Keep running
while True:
    time.sleep(60)
```

---

### 3. No Error Handling

**Problem:**

```python
# Crashes on any error
result = check_health()
```

**Solution:**

```python
try:
    result = check_health()
except Exception as e:
    log(f"Health check failed: {e}")
    result = False
```

---

### 4. Route Flapping

**Problem:**

```python
# Announces and withdraws on every tiny change
if check_health():
    announce()
else:
    withdraw()
```

**Solution:**

```python
# Use hysteresis (require N consecutive checks)
if consecutive_healthy >= 3:
    announce()
elif consecutive_unhealthy >= 2:
    withdraw()
```

---

### 5. Redundant Announcements

**Problem:**

```python
# Announces same route repeatedly
while True:
    announce_route('100.10.0.0/24')  # Wastes bandwidth
    time.sleep(5)
```

**Solution:**

```python
# Track state
if healthy and not announced:
    announce_route('100.10.0.0/24')
    announced = True
```

---

## Best Practices

### 1. Always Flush Output

```python
sys.stdout.flush()  # After every command
```

### 2. Use STDERR for Logging

```python
# STDOUT = commands to ExaBGP
# STDERR = logging (goes to ExaBGP log)
sys.stderr.write("[INFO] Service healthy\n")
```

### 3. Track State

```python
# Avoid redundant operations
if new_state != old_state:
    take_action()
```

### 4. Handle Signals

```python
signal.signal(signal.SIGTERM, shutdown_handler)
```

### 5. Use Hysteresis

```python
# Require N consecutive checks before changing state
if consecutive_healthy >= THRESHOLD:
    announce()
```

### 6. Add Timeouts

```python
# All I/O operations should time out
sock.settimeout(2)
urllib.request.urlopen(req, timeout=2)
subprocess.run(cmd, timeout=5)
```

### 7. Validate Input

```python
# When receiving JSON
try:
    msg = json.loads(line)
    # Validate structure
    if 'type' not in msg:
        raise ValueError("Missing type field")
except Exception as e:
    log(f"Invalid message: {e}")
```

### 8. Log Everything Important

```python
log(f"[STATE] Service: {state}")
log(f"[ACTION] Announced {prefix}")
log(f"[ERROR] Health check failed: {error}")
```

### 9. Use Configuration

```python
import os

# Don't hardcode - use config file or environment
SERVICE_IP = os.getenv('SERVICE_IP', '100.10.0.100')
CHECK_INTERVAL = int(os.getenv('CHECK_INTERVAL', '5'))
```

### 10. Test Without ExaBGP

```python
# Make your program testable independently
if __name__ == '__main__':
    # Can run without ExaBGP for testing
    main()
```

---

## See Also

- **[API Overview](API-Overview)** - API architecture and concepts
- **[Text API Reference](Text-API-Reference)** - Command syntax reference
- **[JSON API Reference](JSON-API-Reference)** - JSON message format
- **[Error Handling](Error-Handling)** - Comprehensive error handling guide
- **[Production Best Practices](Production-Best-Practices)** - Production deployment
- **[Service High Availability](Service-High-Availability)** - HA patterns
- **[Configuration Syntax](Configuration-Syntax)** - Process configuration

---

**Ready for production?** See [Production Best Practices](Production-Best-Practices) →

---