Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions docs/reference/jetstream-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# JetStream Wire API Reference

JetStream exposes administrative and operational APIs through standard NATS subjects. All APIs respond with JSON and follow consistent patterns.

## API Subject Patterns

### General Format
```
$JS.API.<RESOURCE>.<ACTION>[.<PARAMETERS>]
```

### Domain Support
```
$JS.<DOMAIN>.API.<RESOURCE>.<ACTION>[.<PARAMETERS>]
```

## Core API Categories

### Account Information
- `$JS.API.INFO` - Account statistics and limits
- `$JS.API.ACCOUNT.INFO` - Detailed account information

### Stream Management
- `$JS.API.STREAM.CREATE.<stream>` - Create a stream
- `$JS.API.STREAM.UPDATE.<stream>` - Update stream configuration
- `$JS.API.STREAM.DELETE.<stream>` - Delete a stream
- `$JS.API.STREAM.INFO.<stream>` - Get stream information
- `$JS.API.STREAM.LIST` - List all streams
- `$JS.API.STREAM.NAMES` - List stream names only

### Stream Message Operations
- `$JS.API.STREAM.MSG.GET.<stream>` - Get message by sequence
- `$JS.API.STREAM.MSG.DELETE.<stream>` - Delete a message
- `$JS.API.DIRECT.GET.<stream>.<subject>` - Direct message access

### Consumer Management
- `$JS.API.CONSUMER.CREATE.<stream>` - Create ephemeral consumer
- `$JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer>` - Create durable consumer
- `$JS.API.CONSUMER.DELETE.<stream>.<consumer>` - Delete a consumer
- `$JS.API.CONSUMER.INFO.<stream>.<consumer>` - Get consumer info
- `$JS.API.CONSUMER.LIST.<stream>` - List stream consumers
- `$JS.API.CONSUMER.NAMES.<stream>` - List consumer names only

### Message Consumption
- `$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>` - Pull next message(s)
- `$JS.ACK.<stream>.<consumer>.<delivered>.<stream-seq>.<consumer-seq>.<timestamp>.<pending>` - Acknowledgment subject pattern

## Response Format

All JetStream API responses include a `type` field indicating the JSON schema:

```json
{
"type": "io.nats.jetstream.api.v1.stream_info_response",
"config": { ... },
"state": { ... }
}
```

### Error Responses
```json
{
"type": "io.nats.jetstream.api.v1.error_response",
"error": {
"code": 404,
"err_code": 10059,
"description": "stream not found"
}
}
```

## Flow Control

### Acknowledgments
Messages requiring acknowledgment have reply subjects like:
```
$JS.ACK.<stream>.<consumer>.<delivered>.<stream-seq>.<consumer-seq>.<timestamp>.<pending>
```

### Flow Control Messages
When flow control is enabled, messages may include the header:
```
NATS/1.0 100 FlowControl Request
```
These MUST be replied to, or the consumer may stall.

## Pull Consumer Protocol

### Request Next Messages
```
# Subject: $JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>
# Payload: number of messages to fetch (or empty for 1)
1
```

### Batch Request
```json
{
"batch": 10,
"expires": 30000000000
}
```

## Common Configuration Fields

### Stream Configuration
```json
{
"name": "ORDERS",
"subjects": ["orders.*"],
"retention": "limits",
"max_consumers": -1,
"max_msgs": -1,
"max_bytes": -1,
"max_age": 0,
"max_msg_size": -1,
"storage": "file",
"num_replicas": 1,
"duplicate_window": 120000000000
}
```

### Consumer Configuration
```json
{
"durable_name": "processor",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 30000000000,
"max_deliver": -1,
"filter_subject": "",
"replay_policy": "instant",
"max_ack_pending": 1000,
"flow_control": true
}
```

## Authentication and Authorization

JetStream operations respect NATS authentication and authorization:
- Account isolation
- Subject-based permissions
- Resource limits per account
- User/role-based access control

## Implementation Notes

- All durations are in nanoseconds
- Sequence numbers start at 1
- Stream and consumer names are case-sensitive
- API subjects are found as constants in nats-server source code
- Response types enable JSON schema validation
229 changes: 229 additions & 0 deletions docs/reference/jetstream-consumers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
# JetStream Consumers

A Consumer is a stateful view of a Stream that tracks message delivery and acknowledgment. Consumers define how messages are delivered to applications and manage the consumption state independently from message storage.

## Consumer Types

### Pull Consumers (Recommended)
- Client requests message batches on demand
- Better scalability and flow control
- Explicit batch size and timeout control
- Recommended for new projects

### Push Consumers
- Server automatically delivers messages to specified subjects
- Good for simple use cases and load balancing
- Supports flow control and acknowledgments
- Legacy pattern, pull consumers preferred

### Ordered Consumers
- Always ephemeral, no acknowledgments required
- Single-threaded message dispatching
- Automatic flow control and replay on errors
- Ideal for ordered processing requirements

## Consumer Configuration

### Core Settings
```json
{
"durable_name": "processor",
"description": "Order processing consumer",
"deliver_policy": "all",
"opt_start_seq": 0,
"opt_start_time": "2023-01-15T10:30:00Z",
"ack_policy": "explicit",
"ack_wait": 30000000000,
"max_deliver": 5,
"backoff": [1000000000, 5000000000, 10000000000],
"max_ack_pending": 1000,
"replay_policy": "instant"
}
```

### Delivery Policies
- **all**: Deliver all available messages from the stream
- **last**: Deliver only the last message per subject
- **new**: Deliver only messages published after consumer creation
- **by_start_sequence**: Start from specific sequence number
- **by_start_time**: Start from specific timestamp
- **last_per_subject**: Last message for each subject

### Acknowledgment Policies
- **explicit**: Messages must be explicitly acknowledged
- **none**: No acknowledgment required (fire-and-forget)
- **all**: Acknowledging one message acknowledges all prior messages

### Replay Policies
- **instant**: Deliver messages as fast as possible (default)
- **original**: Replay at original publishing rate

## Pull Consumer Protocol

### Fetch Messages
```bash
# Subject: $JS.API.CONSUMER.MSG.NEXT.ORDERS.processor
# Payload: batch size (number) or request object (JSON)
```

#### Simple Batch Request
```
5
```

#### Advanced Request
```json
{
"batch": 10,
"expires": 30000000000,
"no_wait": false,
"max_bytes": 1048576
}
```

### Response Handling
Messages are delivered with standard NATS MSG protocol:
```
MSG orders.created 1 $JS.ACK.ORDERS.processor.1.2.2.1642249800.0 145
{"order_id": "12345", "amount": 99.99}
```

### Message Acknowledgment
```bash
# Reply to the acknowledgment subject from the message
# Empty payload for simple ACK
# JSON payload for advanced ACK options
```

#### Acknowledgment Types
- **+ACK**: Positive acknowledgment (message processed successfully)
- **-NAK**: Negative acknowledgment (redelivery requested)
- **+WPI**: Working/In-Progress (extend ack wait time)
- **+TERM**: Terminate redelivery (poison message handling)

## Push Consumer Configuration

### Push-Specific Settings
```json
{
"deliver_subject": "orders.processed",
"deliver_group": "processors",
"flow_control": true,
"idle_heartbeat": 30000000000,
"headers_only": false,
"max_waiting": 512
}
```

- **deliver_subject**: Subject to deliver messages to
- **deliver_group**: Queue group for load balancing
- **flow_control**: Enable flow control protocol
- **idle_heartbeat**: Send heartbeats when idle (nanoseconds)
- **headers_only**: Deliver only headers, not payload
- **max_waiting**: Maximum outstanding unacknowledged messages

## Consumer State

Consumers maintain detailed state information:

```json
{
"stream_name": "ORDERS",
"name": "processor",
"config": { ... },
"delivered": {
"consumer_seq": 1024,
"stream_seq": 2048,
"last_active": "2023-01-15T14:30:00Z"
},
"ack_floor": {
"consumer_seq": 1020,
"stream_seq": 2044,
"last_active": "2023-01-15T14:29:45Z"
},
"num_ack_pending": 4,
"num_redelivered": 2,
"num_waiting": 0,
"num_pending": 500
}
```

## Consumer Management

### Creating Consumers
```bash
# Ephemeral Consumer
# Subject: $JS.API.CONSUMER.CREATE.ORDERS
# Payload: Consumer configuration (no durable_name)

# Durable Consumer
# Subject: $JS.API.CONSUMER.DURABLE.CREATE.ORDERS.processor
# Payload: Consumer configuration (with durable_name)
```

### Consumer Information
```bash
# Subject: $JS.API.CONSUMER.INFO.ORDERS.processor
# Payload: (empty)
```

### Deleting Consumers
```bash
# Subject: $JS.API.CONSUMER.DELETE.ORDERS.processor
# Payload: (empty)
```

### Listing Consumers
```bash
# All consumer info
# Subject: $JS.API.CONSUMER.LIST.ORDERS

# Names only
# Subject: $JS.API.CONSUMER.NAMES.ORDERS
```

## Flow Control

### For Pull Consumers
- Built-in via request/response pattern
- Client controls message flow explicitly
- No additional flow control protocol needed

### For Push Consumers
When `flow_control: true`:
1. Consumer tracks pending messages
2. Sends flow control messages when approaching limits
3. Client must respond to flow control requests
4. Consumer resumes delivery after receiving response

### Flow Control Headers
```
NATS/1.0 100 FlowControl Request
```
Must be acknowledged by replying to the message.

## Error Handling

### Redelivery
- Failed messages are redelivered based on `max_deliver` setting
- `backoff` array defines increasing delays between attempts
- After max attempts, messages can be handled via:
- Dropping (default)
- Dead letter queue (custom implementation)
- Manual intervention

### Consumer Stalls
- Pull consumers: Handle via request timeouts
- Push consumers: Use heartbeats and flow control
- Monitor `num_waiting` and `last_active` timestamps

## Best Practices

1. **Use pull consumers** for new applications
2. **Set appropriate ack_wait** timeouts
3. **Configure max_deliver** for poison message handling
4. **Use durable consumers** for persistent processing
5. **Monitor consumer lag** via `num_pending`
6. **Handle flow control** messages in push consumers
7. **Design for idempotency** - messages may be redelivered
8. **Use appropriate batch sizes** for pull consumers (10-100 typical)
Loading