
# Fault Tolerance

Snakepit provides fault tolerance mechanisms to build resilient applications that gracefully handle worker failures and transient errors.

## Overview

| Component | Purpose |
| --- | --- |
| `CircuitBreaker` | Prevents cascading failures by stopping calls to failing services |
| `RetryPolicy` | Configurable retry with exponential backoff and jitter |
| `HealthMonitor` | Legacy optional compatibility monitor for crash windows |
| `Executor` | Legacy optional convenience wrapper for retry/timeout patterns |

`Snakepit.HealthMonitor` and `Snakepit.Executor` remain available for compatibility, but Snakepit does not call them internally. For new code, prefer direct use of `Snakepit.CircuitBreaker`, `Snakepit.RetryPolicy`, worker lifecycle telemetry, and host-managed health policy logic.

When either legacy module is used, Snakepit emits one telemetry event per module per VM: `[:snakepit, :deprecated, :module_used]`.
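
For example, a host application could surface these events in its logs. A minimal sketch, using the standard `:telemetry.attach/4` API; the handler id is arbitrary, and since the event's metadata shape is not documented here, the whole map is inspected:

```elixir
require Logger

:telemetry.attach(
  "myapp-snakepit-deprecations",           # arbitrary handler id
  [:snakepit, :deprecated, :module_used],  # event described above
  fn _event, _measurements, metadata, _config ->
    Logger.warning("Deprecated Snakepit module used: #{inspect(metadata)}")
  end,
  nil
)
```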

## Circuit Breaker

The `Snakepit.CircuitBreaker` implements the circuit breaker pattern to prevent cascading failures.

### States

| State | Description |
| --- | --- |
| `:closed` | Normal operation. All calls allowed. |
| `:open` | Threshold exceeded. Calls rejected with `{:error, :circuit_open}`. |
| `:half_open` | Testing recovery. Limited probe calls allowed. |

### Configuration

| Option | Default | Description |
| --- | --- | --- |
| `:name` | `nil` | GenServer registration name |
| `:failure_threshold` | `5` | Failures before opening |
| `:reset_timeout_ms` | `30000` | Time before half-open |
| `:half_open_max_calls` | `1` | Probe calls in half-open |

### API

```elixir
# Start circuit breaker
{:ok, cb} = Snakepit.CircuitBreaker.start_link(
  name: :my_cb, failure_threshold: 5, reset_timeout_ms: 30_000
)

# Execute through circuit breaker
case Snakepit.CircuitBreaker.call(cb, fn -> risky_operation() end) do
  {:ok, result} -> handle_success(result)
  {:error, :circuit_open} -> handle_circuit_open()
  {:error, reason} -> handle_error(reason)
end

# Check state and stats
state = Snakepit.CircuitBreaker.state(cb)  # => :closed | :open | :half_open
stats = Snakepit.CircuitBreaker.stats(cb)
# => %{state: :closed, failure_count: 2, success_count: 150, failure_threshold: 5}

# Manual reset
Snakepit.CircuitBreaker.reset(cb)
```

### Example

```elixir
defmodule MyApp.ExternalService do
  alias Snakepit.CircuitBreaker

  def start_link do
    CircuitBreaker.start_link(name: :api_cb, failure_threshold: 5, reset_timeout_ms: 30_000)
  end

  def call_api(params) do
    case CircuitBreaker.call(:api_cb, fn -> do_api_call(params) end) do
      {:ok, result} -> {:ok, result}
      {:error, :circuit_open} -> {:ok, get_cached_result(params)}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

## Retry Policies

The `Snakepit.RetryPolicy` provides configurable retry behavior with exponential backoff.

### RetryPolicy.new/1

| Option | Default | Description |
| --- | --- | --- |
| `:max_attempts` | `3` | Maximum retry attempts |
| `:backoff_ms` | `[100, 200, 400, 800, 1600]` | Backoff delays per attempt |
| `:base_backoff_ms` | `100` | Base delay for exponential calculation |
| `:backoff_multiplier` | `2.0` | Multiplier for exponential backoff |
| `:max_backoff_ms` | `30000` | Maximum backoff cap |
| `:jitter` | `false` | Enable random jitter |
| `:jitter_factor` | `0.25` | Jitter range as fraction of delay |
| `:retriable_errors` | `[:timeout, :unavailable, :connection_refused, :worker_crash]` | Errors to retry |

### API

```elixir
policy = Snakepit.RetryPolicy.new(
  max_attempts: 3, backoff_ms: [100, 200, 400], jitter: true,
  retriable_errors: [:timeout, :unavailable]
)

Snakepit.RetryPolicy.should_retry?(policy, attempt)        # More retries available?
Snakepit.RetryPolicy.retry_for_error?(policy, error)       # Is the error retriable?
Snakepit.RetryPolicy.backoff_for_attempt(policy, attempt)  # Get delay for attempt
```

### Exponential Backoff with Jitter

With jitter enabled: `delay = base_delay ± (base_delay * jitter_factor)`

For a 100 ms delay with a jitter factor of 0.25, the actual delay falls between 75 and 125 ms. This prevents "thundering herd" problems, where many clients that failed together all retry at exactly the same moment.
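
The jitter is applied internally by `backoff_for_attempt/2` when `jitter: true`; the standalone sketch below only illustrates the arithmetic for the 100 ms / 0.25 case:

```elixir
# Illustrative arithmetic only (not Snakepit's internal implementation):
# derive a jittered delay for base_delay = 100 and jitter_factor = 0.25.
base_delay = 100
jitter_factor = 0.25

# :rand.uniform/0 returns a float in [0.0, 1.0), so the offset falls in
# roughly [-25, +25) milliseconds.
offset = (2.0 * :rand.uniform() - 1.0) * base_delay * jitter_factor
delay = round(base_delay + offset)
# => an integer in the documented 75..125 range
```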

### Example

```elixir
defmodule MyApp.ResilientClient do
  alias Snakepit.RetryPolicy

  def fetch_with_retry(url) do
    policy = RetryPolicy.new(max_attempts: 3, backoff_ms: [100, 200, 400], jitter: true)
    do_fetch(url, policy, 1)
  end

  defp do_fetch(url, policy, attempt) do
    case HTTPoison.get(url) do
      {:ok, response} ->
        {:ok, response}

      {:error, %HTTPoison.Error{reason: reason}} = error ->
        # Retry only while attempts remain and the underlying reason
        # (e.g. :timeout) is in the policy's retriable_errors list.
        if RetryPolicy.should_retry?(policy, attempt) and RetryPolicy.retry_for_error?(policy, reason) do
          Process.sleep(RetryPolicy.backoff_for_attempt(policy, attempt))
          do_fetch(url, policy, attempt + 1)
        else
          error
        end
    end
  end
end
```

## Health Monitoring

The `Snakepit.HealthMonitor` tracks crashes within a rolling window.

### Configuration

| Option | Default | Description |
| --- | --- | --- |
| `:name` | required | GenServer registration name |
| `:pool` | required | Pool name to monitor |
| `:max_crashes` | `10` | Max crashes before unhealthy |
| `:crash_window_ms` | `60000` | Rolling window duration |
| `:check_interval_ms` | `30000` | Cleanup interval |

### API

```elixir
{:ok, hm} = Snakepit.HealthMonitor.start_link(
  name: :pool_health, pool: :default, max_crashes: 10, crash_window_ms: 60_000
)

Snakepit.HealthMonitor.record_crash(hm, "worker_1", %{reason: :segfault})
Snakepit.HealthMonitor.healthy?(hm)  # => true | false

stats = Snakepit.HealthMonitor.stats(hm)
# => %{pool: :default, total_crashes: 15, crashes_in_window: 3, is_healthy: true}

health = Snakepit.HealthMonitor.worker_health(hm, "worker_1")
# => %{healthy: false, crash_count: 5, last_crash_time: 1703836800000}
```

### Example

```elixir
defmodule MyApp.HealthAwarePool do
  alias Snakepit.HealthMonitor
  require Logger

  def execute_with_health_check(pool_name, fun) do
    hm = :"#{pool_name}_health"
    unless HealthMonitor.healthy?(hm), do: Logger.warning("Pool #{pool_name} unhealthy")

    case fun.() do
      {:error, {:worker_crash, info}} = error ->
        HealthMonitor.record_crash(hm, info.worker_id, %{reason: info.reason})
        error

      result ->
        result
    end
  end
end
```
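
As noted at the top of this page, new code should prefer host-managed health policy built on worker lifecycle telemetry rather than `Snakepit.HealthMonitor`. A minimal sketch of that approach, assuming only the `[:snakepit, :worker, :crash]` event listed under Telemetry Events below; the module name, window, and threshold here are illustrative, not part of Snakepit:

```elixir
defmodule MyApp.CrashWindow do
  @moduledoc "Counts recent worker crashes reported via telemetry."
  use GenServer

  @window_ms 60_000
  @max_crashes 10

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def healthy?, do: GenServer.call(__MODULE__, :healthy?)

  @impl true
  def init(_opts) do
    # Forward each crash event to this process as a timestamped message.
    :telemetry.attach(
      "myapp-crash-window",
      [:snakepit, :worker, :crash],
      &__MODULE__.handle_event/4,
      self()
    )

    {:ok, []}
  end

  def handle_event(_event, _measurements, _metadata, pid) do
    send(pid, {:crash, System.monotonic_time(:millisecond)})
  end

  @impl true
  def handle_info({:crash, at}, crashes), do: {:noreply, [at | crashes]}

  @impl true
  def handle_call(:healthy?, _from, crashes) do
    # Prune entries older than the window, then compare against the threshold.
    cutoff = System.monotonic_time(:millisecond) - @window_ms
    recent = Enum.filter(crashes, &(&1 >= cutoff))
    {:reply, length(recent) < @max_crashes, recent}
  end
end
```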

## Executor Helpers

The `Snakepit.Executor` provides convenience wrappers.

### execute_with_timeout/2

```elixir
result = Snakepit.Executor.execute_with_timeout(fn -> slow_op() end, timeout_ms: 5000)
# => {:ok, value} | {:error, :timeout}
```

### execute_with_retry/2

```elixir
result = Snakepit.Executor.execute_with_retry(fn -> api_call() end,
  max_attempts: 3, backoff_ms: [100, 200, 400], jitter: true
)
```

### execute_with_protection/3

Combines retry with a circuit breaker for defense in depth.

```elixir
{:ok, cb} = Snakepit.CircuitBreaker.start_link(failure_threshold: 5)

result = Snakepit.Executor.execute_with_protection(cb, fn -> risky_op() end,
  max_attempts: 3, backoff_ms: [100, 200, 400]
)
```

### execute_batch/2

Executes multiple functions in parallel.

```elixir
functions = [fn -> fetch(1) end, fn -> fetch(2) end, fn -> fetch(3) end]
results = Snakepit.Executor.execute_batch(functions, timeout_ms: 10_000, max_concurrency: 5)
# => [{:ok, r1}, {:ok, r2}, {:error, :not_found}]
```

## Combined Protection Example

```elixir
defmodule MyApp.ResilientService do
  alias Snakepit.{CircuitBreaker, Executor}

  def start_link do
    CircuitBreaker.start_link(name: :service_cb, failure_threshold: 5, reset_timeout_ms: 30_000)
  end

  def call_service(params) do
    Executor.execute_with_protection(:service_cb,
      fn -> Snakepit.execute("service", params, timeout: 5000) end,
      max_attempts: 3, backoff_ms: [100, 500, 1000], jitter: true
    )
  end

  def health_status do
    %{
      state: CircuitBreaker.state(:service_cb),
      stats: CircuitBreaker.stats(:service_cb)
    }
  end
end
```

## Telemetry Events

| Event | Measurements | Metadata |
| --- | --- | --- |
| `[:snakepit, :circuit_breaker, :opened]` | `%{failure_count: n}` | `%{pool: name}` |
| `[:snakepit, :circuit_breaker, :closed]` | `%{}` | `%{pool: name}` |
| `[:snakepit, :retry, :attempt]` | `%{attempt: n, delay_ms: ms}` | `%{}` |
| `[:snakepit, :retry, :exhausted]` | `%{attempts: n}` | `%{last_error: error}` |
| `[:snakepit, :worker, :crash]` | `%{}` | `%{pool: name, worker_id: id}` |
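
For example, a host application might log circuit breaker transitions by attaching to the two breaker events with `:telemetry.attach_many/4` (a sketch; the handler id is arbitrary):

```elixir
require Logger

:telemetry.attach_many(
  "myapp-cb-log",
  [
    [:snakepit, :circuit_breaker, :opened],
    [:snakepit, :circuit_breaker, :closed]
  ],
  fn [:snakepit, :circuit_breaker, transition], measurements, metadata, _config ->
    Logger.info(
      "Circuit breaker #{transition} for pool #{inspect(metadata[:pool])}: #{inspect(measurements)}"
    )
  end,
  nil
)
```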