Merged
1 change: 1 addition & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ spark_locals_without_parens = [
around: 2,
around: 3,
async?: 1,
backoff: 1,
batch_size: 1,
before_all: 1,
collect: 1,
5 changes: 3 additions & 2 deletions documentation/dsls/DSL-Reactor.md
@@ -2305,8 +2305,9 @@ end
|------|------|---------|------|
| [`description`](#reactor-step-description){: #reactor-step-description } | `String.t` | | An optional description for the step. |
| [`run`](#reactor-step-run){: #reactor-step-run } | `(any -> any) \| mfa \| (any, any -> any) \| mfa` | | Provide an anonymous function which implements a `run/1-2` callback. Cannot be provided at the same time as the `impl` argument. |
| [`undo`](#reactor-step-undo){: #reactor-step-undo } | `(any -> any) \| mfa \| (any, any -> any) \| mfa \| (any, any, any -> any) \| mfa` | | Provide an anonymous function which implements a `undo/1-3` callback. Cannot be provided at the same time as the `impl` argument. |
| [`compensate`](#reactor-step-compensate){: #reactor-step-compensate } | `(any -> any) \| mfa \| (any, any -> any) \| mfa \| (any, any, any -> any) \| mfa` | | Provide an anonymous function which implements a `compensate/1-3` callback. Cannot be provided at the same time as the `impl` argument. |
| [`undo`](#reactor-step-undo){: #reactor-step-undo } | `(any -> any) \| mfa \| (any, any -> any) \| mfa \| (any, any, any -> any) \| mfa` | | Provide an anonymous function which implements an `undo/1..3` callback. Cannot be provided at the same time as the `impl` argument. |
| [`compensate`](#reactor-step-compensate){: #reactor-step-compensate } | `(any -> any) \| mfa \| (any, any -> any) \| mfa \| (any, any, any -> any) \| mfa` | | Provide an anonymous function which implements a `compensate/1..3` callback. Cannot be provided at the same time as the `impl` argument. |
| [`backoff`](#reactor-step-backoff){: #reactor-step-backoff } | `(any -> any) \| mfa \| (any, any -> any) \| mfa \| (any, any, any -> any) \| mfa` | | Provide an anonymous function which implements a `backoff/1..3` callback. Cannot be provided at the same time as the `impl` argument. |
| [`max_retries`](#reactor-step-max_retries){: #reactor-step-max_retries } | `:infinity \| non_neg_integer` | `:infinity` | The maximum number of times that the step can be retried before failing. Only used when the result of the `compensate` callback is `:retry`. |
| [`async?`](#reactor-step-async?){: #reactor-step-async? } | `boolean` | `true` | When set to true the step will be executed asynchronously via Reactor's `TaskSupervisor`. |
| [`transform`](#reactor-step-transform){: #reactor-step-transform } | `(any -> any) \| module \| nil` | | An optional transformation function which can be used to modify the entire argument map before it is passed to the step. |
225 changes: 197 additions & 28 deletions documentation/how-to/performance-optimization.md
@@ -556,6 +556,200 @@ end
```


## Backoff Strategies for External Services

### 1. Choosing the Right Backoff Strategy

When dealing with external services, proper backoff strategies can significantly improve both reliability and performance:

```elixir
defmodule SmartAPIStep do
  use Reactor.Step

  @impl true
  def run(args, _context, _options) do
    case HTTPoison.get(args.url) do
      {:ok, %{status_code: 200} = response} ->
        {:ok, response.body}

      {:ok, %{status_code: 429} = response} ->
        {:error, %{type: :rate_limit, retry_after: get_retry_after(response)}}

      {:ok, %{status_code: 503}} ->
        {:error, %{type: :service_unavailable}}

      {:error, %{reason: :timeout}} ->
        {:error, %{type: :network_timeout}}

      other ->
        {:error, other}
    end
  end

  @impl true
  def compensate(error, _args, _context, _options) do
    case error do
      %{type: type} when type in [:rate_limit, :service_unavailable, :network_timeout] ->
        :retry

      _other ->
        # Don't retry permanent errors
        :ok
    end
  end

  # Respect the server's Retry-After header when present
  @impl true
  def backoff(%{type: :rate_limit, retry_after: seconds}, _args, _context, _step)
      when is_integer(seconds),
      do: (seconds + 1) * 1000

  # No Retry-After header - use a conservative fixed delay
  def backoff(%{type: :rate_limit}, _args, _context, _step), do: 30_000

  # Service temporarily down - exponential backoff with jitter
  def backoff(%{type: :service_unavailable}, _args, context, _step) do
    retry_count = Map.get(context, :current_try, 0)
    base_delay = round(:math.pow(2, retry_count) * 1000)
    # 1-1000ms jitter
    jitter = :rand.uniform(1000)
    # Cap at 1 minute
    min(base_delay + jitter, 60_000)
  end

  def backoff(_error, _args, _context, _step), do: :now

  defp get_retry_after(response) do
    # Header names are matched case-insensitively; non-integer values
    # (the HTTP-date form of Retry-After) are ignored here for brevity.
    Enum.find_value(response.headers, fn {name, value} ->
      if String.downcase(name) == "retry-after" do
        case Integer.parse(value) do
          {seconds, _rest} -> seconds
          :error -> nil
        end
      end
    end)
  end
end
```
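A step like this is wired into a reactor with the step DSL: `max_retries` bounds the retry loop while the backoff callback controls the delay between attempts. A minimal sketch (the reactor, step, and input names here are illustrative, not part of the example above):

```elixir
defmodule MyApp.FetchReactor do
  use Reactor

  input :url

  step :fetch_data, SmartAPIStep do
    argument :url, input(:url)
    # Bound the retry loop; backoff only controls the delay between attempts.
    max_retries 5
  end

  return :fetch_data
end
```

Running it with `Reactor.run(MyApp.FetchReactor, %{url: "https://api.example.com/data"})` would retry transient failures with the delays computed above and give up after five retries.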

### 2. Backoff Impact on Throughput

Understanding how backoff affects system throughput is crucial for performance tuning:

```elixir
# Simulated API that fails 30% of the time
defmodule FlakeyAPI do
  use Reactor.Step

  @impl true
  def run(_args, _context, _options) do
    if :rand.uniform() < 0.3 do
      {:error, %{type: :transient_failure}}
    else
      {:ok, "success"}
    end
  end

  @impl true
  def compensate(_error, _args, _context, _options), do: :retry
end

# No backoff between retries
defmodule NoBackoffAPI do
  use Reactor.Step
  defdelegate run(args, context, options), to: FlakeyAPI
  defdelegate compensate(error, args, context, options), to: FlakeyAPI

  @impl true
  def backoff(_error, _args, _context, _step), do: :now
end

# Fixed 100ms backoff between retries
defmodule FixedBackoffAPI do
  use Reactor.Step
  defdelegate run(args, context, options), to: FlakeyAPI
  defdelegate compensate(error, args, context, options), to: FlakeyAPI

  @impl true
  def backoff(_error, _args, _context, _step), do: 100
end

defmodule BackoffBenchmark do
  # Defining the step modules above at the top level (rather than inside the
  # benchmark function) avoids module-redefinition warnings on repeated runs.
  def compare_backoff_strategies do
    data = Enum.to_list(1..100)

    Benchee.run(
      %{
        "no_backoff" => fn ->
          reactor = build_reactor(NoBackoffAPI, data)
          Reactor.run(reactor, %{items: data}, %{}, max_concurrency: 10)
        end,
        "fixed_backoff" => fn ->
          reactor = build_reactor(FixedBackoffAPI, data)
          Reactor.run(reactor, %{items: data}, %{}, max_concurrency: 10)
        end
      },
      warmup: 1,
      time: 3
    )
  end

  # build_reactor/2 is assumed to build a reactor that maps the given step
  # module over each item.
end
```
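The trade-off these benchmarks measure can also be estimated on paper. With an independent failure probability `p` per attempt and unlimited retries, each item needs `1 / (1 - p)` expected attempts, so a fixed backoff of `d` milliseconds adds `d * p / (1 - p)` expected delay per item. A small helper for sanity-checking benchmark results (the module name is ours):

```elixir
defmodule BackoffMath do
  # Expected attempts per item given independent per-attempt failure
  # probability p (geometric distribution): 1 / (1 - p).
  def expected_attempts(p) when p >= 0 and p < 1, do: 1 / (1 - p)

  # Expected extra latency per item from a fixed backoff of delay_ms:
  # one backoff per failed attempt, i.e. delay_ms * p / (1 - p).
  def expected_backoff_ms(p, delay_ms) when p >= 0 and p < 1 do
    delay_ms * p / (1 - p)
  end
end

# With the 30% failure rate simulated above and a 100ms fixed backoff:
# BackoffMath.expected_attempts(0.3)        #=> ~1.43 attempts per item
# BackoffMath.expected_backoff_ms(0.3, 100) #=> ~42.9ms added latency per item
```

This is why the `fixed_backoff` scenario trades some raw throughput for a far gentler load profile on the flaky service.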

### 3. Adaptive Backoff for Complex Scenarios

For sophisticated systems, implement adaptive backoff that responds to system conditions:

```elixir
defmodule AdaptiveBackoffStep do
  use Reactor.Step
  use GenServer

  # Error rates are tracked in a named GenServer; it must be started
  # (e.g. under your supervision tree) before the reactor runs.
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @impl GenServer
  def init(_arg) do
    # window_start supports periodically resetting the counters; the reset
    # logic is omitted here for brevity.
    {:ok, %{error_count: 0, success_count: 0, window_start: System.monotonic_time()}}
  end

  @impl Reactor.Step
  def run(args, _context, _options) do
    case call_external_service(args) do
      {:ok, data} ->
        GenServer.cast(__MODULE__, :record_success)
        {:ok, data}

      {:error, reason} ->
        GenServer.cast(__MODULE__, :record_error)
        {:error, reason}
    end
  end

  @impl Reactor.Step
  def compensate(_error, _args, _context, _options), do: :retry

  @impl Reactor.Step
  def backoff(_error, _args, _context, _step) do
    error_rate = GenServer.call(__MODULE__, :get_error_rate)

    base_delay =
      case error_rate do
        # High error rate - long delays
        rate when rate > 0.5 -> 10_000
        # Moderate error rate - medium delays
        rate when rate > 0.2 -> 5_000
        # Low error rate - short delays
        rate when rate > 0.1 -> 2_000
        # Very low error rate - minimal delays
        _ -> 500
      end

    # Add jitter to prevent thundering herd
    base_delay + :rand.uniform(1000)
  end

  # GenServer callbacks for tracking error rates

  @impl GenServer
  def handle_cast(:record_success, state) do
    {:noreply, %{state | success_count: state.success_count + 1}}
  end

  def handle_cast(:record_error, state) do
    {:noreply, %{state | error_count: state.error_count + 1}}
  end

  @impl GenServer
  def handle_call(:get_error_rate, _from, state) do
    total = state.error_count + state.success_count
    rate = if total > 0, do: state.error_count / total, else: 0.0
    {:reply, rate, state}
  end
end
```
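Because the step doubles as a named GenServer, that process must be running before the reactor executes any of this step's work; otherwise the `GenServer.call/2` inside `backoff/4` will exit with `:noproc`. One way to sketch this, assuming a conventional application supervisor (placement of the child is illustrative):

```elixir
# `use GenServer` gives AdaptiveBackoffStep a child_spec/1, so the module
# can be listed directly as a supervised child.
children = [
  AdaptiveBackoffStep
  # ... other children, then whatever process runs the reactor
]

Supervisor.start_link(children, strategy: :one_for_one)
```

Keeping the counters in one named process is simple but serialises all error-rate reads through a single GenServer; an ETS table or `:counters` would scale better under heavy concurrency.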

## Performance Benchmarking

### 1. Using Benchee for Reactor Performance Testing
@@ -754,33 +948,8 @@ end
- Map steps with batch sizes that are too large
- Streams not being processed lazily

**Problem**: External service rate limiting errors
**Solution**: Use compensation with exponential backoff and reduce `max_concurrency`:

```elixir
step :api_call do
run fn args, _context ->
call_external_api(args)
end

compensate fn _args, context ->
# Use current_try for exponential backoff
retry_count = context.current_try
delay_ms = :math.pow(2, retry_count) * 1000 |> round()

Process.sleep(delay_ms)
:retry
end
end

# Also reduce concurrency to respect service limits
{:ok, result} = Reactor.run(
APIReactor,
inputs,
%{},
max_concurrency: 5 # Lower concurrency for rate-limited APIs
)
```
**Problem**: External service rate limiting errors
**Solution**: Use backoff strategies and reduce `max_concurrency`.
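A sketch of the concurrency half of that fix (the reactor name, inputs, and the limit of 5 are illustrative; derive the limit from the provider's documented rate):

```elixir
# Lower concurrency so simultaneous requests stay under the service's limit,
# while the backoff callback spaces out individual retries.
{:ok, result} =
  Reactor.run(
    APIReactor,
    inputs,
    %{},
    max_concurrency: 5
  )
```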

**Problem**: Inconsistent performance
**Solution**: Ensure proper resource isolation and monitoring:
@@ -826,4 +995,4 @@ Optimizing Reactor performance requires understanding your workload characterist
- [Building Async Workflows](../tutorials/03-async-workflows.md) - Understanding Reactor's concurrency model
- [Data Processing Pipelines](data-pipelines.md) - Efficient batch processing patterns
- [Testing Strategies](testing-strategies.md) - Performance testing approaches
- [Debugging Workflows](debugging-workflows.md) - Performance monitoring and profiling
- [Debugging Workflows](debugging-workflows.md) - Performance monitoring and profiling
14 changes: 11 additions & 3 deletions documentation/reference/glossary.md
@@ -94,17 +94,25 @@ This glossary defines key terms, concepts, and technical vocabulary used through

## Error Handling Concepts

**Backoff** - Delay mechanism that adds intelligent waiting periods between retry attempts to prevent overwhelming external services and improve system stability. Backoff delays are minimum delays - actual retry timing may be longer as the executor prioritises processing ready steps before checking for expired backoffs.

**Backoff Callback** - Optional `c:Reactor.Step.backoff/4` callback that determines retry delay based on arguments, context, step metadata, and error reason.

**Compensation Logic** - Step-level error handling that decides whether to retry, continue with alternative values, or fail.

**Error Classification** - Categorisation of errors using splode library into Invalid, Internal, Unknown, and Validation types.

**Exponential Backoff** - Retry strategy where delay increases exponentially with each retry attempt.
**Exponential Backoff** - Retry strategy where delay increases exponentially with each retry attempt (1s, 2s, 4s, 8s...), commonly used for network issues and service overload.

**Fixed Backoff** - Retry strategy using consistent delays between attempts, often used for rate limiting when reset intervals are known.

**Jittered Backoff** - Backoff strategy that adds randomness to delays to prevent thundering herd problems when multiple clients retry simultaneously.

**Max Retries** - Configuration limiting how many times a step can be retried through compensation.

**Retry Logic** - Automatic re-execution of failed steps when compensation returns `:retry`.
**Retry Logic** - Automatic re-execution of failed steps when compensation returns `:retry`, now enhanced with optional backoff delays.

**Three-Tier Error Handling** - Reactor's approach using compensation (retry), undo (rollback), and global rollback levels.
**Three-Tier Error Handling** - Reactor's approach using compensation (retry), backoff (delay), undo (rollback), and global rollback levels.

**Undo Stack** - Last-in-first-out collection of successfully completed undoable steps for rollback purposes.
