diff --git a/docs/reference/jetstream-api.md b/docs/reference/jetstream-api.md
new file mode 100644
index 0000000..c13df3d
--- /dev/null
+++ b/docs/reference/jetstream-api.md
@@ -0,0 +1,152 @@
+# JetStream Wire API Reference
+
+JetStream exposes administrative and operational APIs through standard NATS subjects. All APIs respond with JSON and follow consistent patterns.
+
+## API Subject Patterns
+
+### General Format
+```
+$JS.API.<category>.<operation>[.<name>]
+```
+
+### Domain Support
+```
+$JS.<domain>.API.<category>.<operation>[.<name>]
+```
+
+## Core API Categories
+
+### Account Information
+- `$JS.API.INFO` - Account statistics, limits, and API usage
+
+### Stream Management
+- `$JS.API.STREAM.CREATE.<stream>` - Create a stream
+- `$JS.API.STREAM.UPDATE.<stream>` - Update stream configuration
+- `$JS.API.STREAM.DELETE.<stream>` - Delete a stream
+- `$JS.API.STREAM.INFO.<stream>` - Get stream information
+- `$JS.API.STREAM.LIST` - List all streams
+- `$JS.API.STREAM.NAMES` - List stream names only
+
+### Stream Message Operations
+- `$JS.API.STREAM.MSG.GET.<stream>` - Get message by sequence
+- `$JS.API.STREAM.MSG.DELETE.<stream>` - Delete a message
+- `$JS.API.DIRECT.GET.<stream>.<subject>` - Direct access to the last message on a subject
+
+### Consumer Management
+- `$JS.API.CONSUMER.CREATE.<stream>` - Create ephemeral consumer
+- `$JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer>` - Create durable consumer
+- `$JS.API.CONSUMER.DELETE.<stream>.<consumer>` - Delete a consumer
+- `$JS.API.CONSUMER.INFO.<stream>.<consumer>` - Get consumer info
+- `$JS.API.CONSUMER.LIST.<stream>` - List stream consumers
+- `$JS.API.CONSUMER.NAMES.<stream>` - List consumer names only
+
+### Message Consumption
+- `$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>` - Pull next message(s)
+- `$JS.ACK.<stream>.<consumer>.<delivered>.<stream-seq>.<consumer-seq>.<timestamp>.<pending>` - Acknowledgment subject pattern
+
+## Response Format
+
+All JetStream API responses include a `type` field indicating the JSON schema:
+
+```json
+{
+  "type": "io.nats.jetstream.api.v1.stream_info_response",
+  "config": { ... },
+  "state": { ... }
+}
+```
+
+### Error Responses
+```json
+{
+  "type": "io.nats.jetstream.api.v1.error_response",
+  "error": {
+    "code": 404,
+    "err_code": 10059,
+    "description": "stream not found"
+  }
+}
+```
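+
+Because every endpoint is a plain NATS subject, the whole API can be driven with core request/reply. The following is a minimal Go sketch using the `nats.go` client; the stream name `ORDERS` and a local server are assumptions for illustration:
+
+```go
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/nats-io/nats.go"
+)
+
+// apiError mirrors the error object shown above.
+type apiError struct {
+	Code        int    `json:"code"`
+	ErrCode     int    `json:"err_code"`
+	Description string `json:"description"`
+}
+
+// apiResponse captures the envelope fields common to every API reply.
+type apiResponse struct {
+	Type  string    `json:"type"`
+	Error *apiError `json:"error,omitempty"`
+}
+
+func main() {
+	nc, err := nats.Connect(nats.DefaultURL)
+	if err != nil {
+		panic(err)
+	}
+	defer nc.Close()
+
+	// Any JetStream API call is a request on a $JS.API.> subject.
+	msg, err := nc.Request("$JS.API.STREAM.INFO.ORDERS", nil, 2*time.Second)
+	if err != nil {
+		panic(err)
+	}
+
+	var resp apiResponse
+	if err := json.Unmarshal(msg.Data, &resp); err != nil {
+		panic(err)
+	}
+	if resp.Error != nil {
+		fmt.Printf("API error %d: %s\n", resp.Error.ErrCode, resp.Error.Description)
+		return
+	}
+	fmt.Println("response type:", resp.Type)
+}
+```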
+
+## Flow Control
+
+### Acknowledgments
+Messages requiring acknowledgment have reply subjects of the form:
+```
+$JS.ACK.<stream>.<consumer>.<delivered>.<stream-seq>.<consumer-seq>.<timestamp>.<pending>
+```
+
+### Flow Control Messages
+When flow control is enabled, messages may include the header:
+```
+NATS/1.0 100 FlowControl Request
+```
+These MUST be replied to, or the consumer may stall.
+
+## Pull Consumer Protocol
+
+### Request Next Messages
+```
+# Subject: $JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>
+# Payload: number of messages to fetch (an empty payload requests one message)
+1
+```
+
+### Batch Request
+```json
+{
+  "batch": 10,
+  "expires": 30000000000
+}
+```
+
+## Common Configuration Fields
+
+### Stream Configuration
+```json
+{
+  "name": "ORDERS",
+  "subjects": ["orders.*"],
+  "retention": "limits",
+  "max_consumers": -1,
+  "max_msgs": -1,
+  "max_bytes": -1,
+  "max_age": 0,
+  "max_msg_size": -1,
+  "storage": "file",
+  "num_replicas": 1,
+  "duplicate_window": 120000000000
+}
+```
+
+### Consumer Configuration
+```json
+{
+  "durable_name": "processor",
+  "deliver_policy": "all",
+  "ack_policy": "explicit",
+  "ack_wait": 30000000000,
+  "max_deliver": -1,
+  "filter_subject": "",
+  "replay_policy": "instant",
+  "max_ack_pending": 1000,
+  "flow_control": true
+}
+```
+
+## Authentication and Authorization
+
+JetStream operations respect NATS authentication and authorization:
+- Account isolation
+- Subject-based permissions
+- Resource limits per account
+- User/role-based access control
+
+## Implementation Notes
+
+- All durations are expressed in nanoseconds
+- Sequence numbers start at 1
+- Stream and consumer names are case-sensitive
+- API subjects are defined as constants in the nats-server source code
+- Response types enable JSON schema validation
\ No newline at end of file
diff --git a/docs/reference/jetstream-consumers.md b/docs/reference/jetstream-consumers.md
new file mode 100644
index 0000000..f44b0c1
--- /dev/null
+++ b/docs/reference/jetstream-consumers.md
@@ -0,0 +1,229 @@
+# JetStream Consumers
+
+A Consumer is a stateful view of a Stream that tracks message delivery and acknowledgment. Consumers define how messages are delivered to applications and manage consumption state independently of message storage.
+
+## Consumer Types
+
+### Pull Consumers (Recommended)
+- Client requests message batches on demand
+- Better scalability and flow control
+- Explicit batch size and timeout control
+- Recommended for new projects
+
+### Push Consumers
+- Server automatically delivers messages to specified subjects
+- Good for simple use cases and load balancing
+- Supports flow control and acknowledgments
+- Legacy pattern; pull consumers are preferred
+
+### Ordered Consumers
+- Always ephemeral, no acknowledgments required
+- Single-threaded message dispatching
+- Automatic flow control and replay on errors
+- Ideal for ordered processing requirements
+
+## Consumer Configuration
+
+### Core Settings
+```json
+{
+  "durable_name": "processor",
+  "description": "Order processing consumer",
+  "deliver_policy": "all",
+  "opt_start_seq": 0,
+  "opt_start_time": "2023-01-15T10:30:00Z",
+  "ack_policy": "explicit",
+  "ack_wait": 30000000000,
+  "max_deliver": 5,
+  "backoff": [1000000000, 5000000000, 10000000000],
+  "max_ack_pending": 1000,
+  "replay_policy": "instant"
+}
+```
+
+### Delivery Policies
+- **all**: Deliver all available messages from the stream
+- **last**: Deliver only the most recent message in the stream
+- **new**: Deliver only messages published after consumer creation
+- **by_start_sequence**: Start from a specific sequence number
+- **by_start_time**: Start from a specific timestamp
+- **last_per_subject**: Deliver the last message for each subject
+
+### Acknowledgment Policies
+- **explicit**: Messages must be explicitly acknowledged
+- **none**: No acknowledgment required (fire-and-forget)
+- **all**: Acknowledging one message acknowledges all prior messages
+
+### Replay Policies
+- **instant**: Deliver messages as fast as possible (default)
+- **original**: Replay at the original publishing rate
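+
+The configuration above maps directly onto the wire API: creating a durable consumer means sending the configuration, wrapped with its stream name, to the durable-create subject. A minimal Go sketch (`nats.go` client; stream `ORDERS` and durable `processor` are assumptions):
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/nats-io/nats.go"
+)
+
+func main() {
+	nc, _ := nats.Connect(nats.DefaultURL) // sketch: error handling elided
+	defer nc.Close()
+
+	// The request payload wraps the consumer config with its stream name.
+	req := []byte(`{
+	  "stream_name": "ORDERS",
+	  "config": {
+	    "durable_name": "processor",
+	    "deliver_policy": "all",
+	    "ack_policy": "explicit",
+	    "ack_wait": 30000000000,
+	    "max_deliver": 5,
+	    "max_ack_pending": 1000
+	  }
+	}`)
+
+	msg, err := nc.Request("$JS.API.CONSUMER.DURABLE.CREATE.ORDERS.processor", req, 2*time.Second)
+	if err != nil {
+		panic(err)
+	}
+	fmt.Printf("%s\n", msg.Data) // io.nats.jetstream.api.v1.consumer_create_response
+}
+```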
+
+## Pull Consumer Protocol
+
+### Fetch Messages
+```bash
+# Subject: $JS.API.CONSUMER.MSG.NEXT.ORDERS.processor
+# Payload: batch size (number) or request object (JSON)
+```
+
+#### Simple Batch Request
+```
+5
+```
+
+#### Advanced Request
+```json
+{
+  "batch": 10,
+  "expires": 30000000000,
+  "no_wait": false,
+  "max_bytes": 1048576
+}
+```
+
+### Response Handling
+Messages are delivered with the standard NATS MSG protocol:
+```
+MSG orders.created 1 $JS.ACK.ORDERS.processor.1.2.2.1642249800000000000.0 38
+{"order_id": "12345", "amount": 99.99}
+```
+
+### Message Acknowledgment
+```bash
+# Reply to the acknowledgment subject from the message
+# Empty payload for a simple ACK
+# Acknowledgment token payload (e.g. -NAK) for the other types
+```
+
+#### Acknowledgment Types
+- **+ACK**: Positive acknowledgment (message processed successfully)
+- **-NAK**: Negative acknowledgment (redelivery requested)
+- **+WPI**: Working/In-Progress (extend ack wait time)
+- **+TERM**: Terminate redelivery (poison message handling)
+
+## Push Consumer Configuration
+
+### Push-Specific Settings
+```json
+{
+  "deliver_subject": "orders.processed",
+  "deliver_group": "processors",
+  "flow_control": true,
+  "idle_heartbeat": 30000000000,
+  "headers_only": false
+}
+```
+
+- **deliver_subject**: Subject to deliver messages to
+- **deliver_group**: Queue group for load balancing
+- **flow_control**: Enable the flow control protocol
+- **idle_heartbeat**: Send heartbeats when idle (nanoseconds)
+- **headers_only**: Deliver only headers, not payloads
+
+## Consumer State
+
+Consumers maintain detailed state information:
+
+```json
+{
+  "stream_name": "ORDERS",
+  "name": "processor",
+  "config": { ... },
+  "delivered": {
+    "consumer_seq": 1024,
+    "stream_seq": 2048,
+    "last_active": "2023-01-15T14:30:00Z"
+  },
+  "ack_floor": {
+    "consumer_seq": 1020,
+    "stream_seq": 2044,
+    "last_active": "2023-01-15T14:29:45Z"
+  },
+  "num_ack_pending": 4,
+  "num_redelivered": 2,
+  "num_waiting": 0,
+  "num_pending": 500
+}
+```
+
+## Consumer Management
+
+### Creating Consumers
+```bash
+# Ephemeral Consumer
+# Subject: $JS.API.CONSUMER.CREATE.ORDERS
+# Payload: Consumer configuration (no durable_name)
+
+# Durable Consumer
+# Subject: $JS.API.CONSUMER.DURABLE.CREATE.ORDERS.processor
+# Payload: Consumer configuration (with durable_name)
+```
+
+### Consumer Information
+```bash
+# Subject: $JS.API.CONSUMER.INFO.ORDERS.processor
+# Payload: (empty)
+```
+
+### Deleting Consumers
+```bash
+# Subject: $JS.API.CONSUMER.DELETE.ORDERS.processor
+# Payload: (empty)
+```
+
+### Listing Consumers
+```bash
+# All consumer info
+# Subject: $JS.API.CONSUMER.LIST.ORDERS
+
+# Names only
+# Subject: $JS.API.CONSUMER.NAMES.ORDERS
+```
+
+## Flow Control
+
+### For Pull Consumers
+- Built-in via the request/response pattern
+- Client controls message flow explicitly
+- No additional flow control protocol needed
+
+### For Push Consumers
+When `flow_control: true`:
+1. The server tracks pending messages for the consumer
+2. It sends flow control messages when approaching limits
+3. The client must respond to flow control requests
+4. Delivery resumes after the response is received
+
+### Flow Control Headers
+```
+NATS/1.0 100 FlowControl Request
+```
+Must be acknowledged by replying to the message.
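+
+Putting the pull protocol, batching, and acknowledgments together: at the wire level a fetch is a published request whose reply subject is an inbox the client owns, and each delivered message is acknowledged by replying to its `$JS.ACK` subject. A Go sketch (stream `ORDERS`, durable `processor`, and a local server assumed; server status messages such as a 408 request timeout may also arrive on the inbox):
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/nats-io/nats.go"
+)
+
+func main() {
+	nc, _ := nats.Connect(nats.DefaultURL) // sketch: error handling elided
+	defer nc.Close()
+
+	// Deliveries for this pull request arrive on an inbox we own.
+	inbox := nats.NewInbox()
+	sub, _ := nc.SubscribeSync(inbox)
+
+	// Ask for up to 10 messages; the request expires after 30s (nanoseconds).
+	req := []byte(`{"batch": 10, "expires": 30000000000}`)
+	nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.ORDERS.processor", inbox, req)
+
+	for {
+		msg, err := sub.NextMsg(5 * time.Second)
+		if err != nil {
+			break // timed out: batch drained or request expired
+		}
+		fmt.Printf("%s -> %q\n", msg.Subject, msg.Data)
+		msg.Respond(nil) // empty reply on the $JS.ACK subject = +ACK
+	}
+}
+```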
+
+## Error Handling
+
+### Redelivery
+- Failed messages are redelivered based on the `max_deliver` setting
+- The `backoff` array defines increasing delays between attempts
+- After the maximum number of attempts, messages can be handled via:
+  - Dropping (default)
+  - Dead letter queue (custom implementation)
+  - Manual intervention
+
+### Consumer Stalls
+- Pull consumers: Handle via request timeouts
+- Push consumers: Use heartbeats and flow control
+- Monitor `num_waiting` and `last_active` timestamps
+
+## Best Practices
+
+1. **Use pull consumers** for new applications
+2. **Set appropriate ack_wait** timeouts
+3. **Configure max_deliver** for poison message handling
+4. **Use durable consumers** for persistent processing
+5. **Monitor consumer lag** via `num_pending`
+6. **Handle flow control** messages in push consumers
+7. **Design for idempotency** - messages may be redelivered
+8. **Use appropriate batch sizes** for pull consumers (10-100 typical)
\ No newline at end of file
diff --git a/docs/reference/jetstream-kv.md b/docs/reference/jetstream-kv.md
new file mode 100644
index 0000000..3e8c5f2
--- /dev/null
+++ b/docs/reference/jetstream-kv.md
@@ -0,0 +1,275 @@
+# JetStream Key/Value Store
+
+The JetStream Key/Value (KV) Store provides an immediately consistent, persistent key-value storage system built on top of JetStream streams. It offers a familiar map-like interface with NATS-native features like clustering, replication, and security.
+
+## Architecture
+
+### Implementation Details
+- Each KV bucket is backed by a dedicated JetStream stream
+- Stream names use the prefix: `KV_<bucket>`
+- Message subjects follow the pattern: `$KV.<bucket>.<key>`
+- Key operations are stored as stream messages
+- Values are message payloads; metadata lives in headers
+
+### Consistency Model
+- **Immediately consistent** within a single NATS server
+- **Eventually consistent** across cluster replicas
+- **No read-after-write guarantee across replicas** - reads may see different values on different replicas
+- **Monotonic writes** - values don't go backward in time
+
+## Key Constraints
+
+### Valid Key Format
+Keys must match the regex: `\A[-/_=\.a-zA-Z0-9]+\z`
+
+**Allowed characters:**
+- Letters: `a-z`, `A-Z`
+- Numbers: `0-9`
+- Special: `-`, `/`, `_`, `=`, `.`
+
+**Restrictions:**
+- Cannot start or end with `.`
+- Keys starting with `_kv` are reserved for internal use
+- Keys are case-sensitive
+
+### Bucket Names
+Bucket names must match the regex: `\A[a-zA-Z0-9_-]+\z`
+- Alphanumeric characters, underscores, and hyphens
+- No dots or other special characters
+
+## Bucket Configuration
+
+### Basic Configuration
+```json
+{
+  "bucket": "user-sessions",
+  "description": "User session data",
+  "max_value_size": 1048576,
+  "history": 5,
+  "ttl": 3600000000000,
+  "max_bucket_size": -1,
+  "storage": "file",
+  "num_replicas": 1
+}
+```
+
+### Advanced Configuration
+```json
+{
+  "bucket": "config-data",
+  "compression": "s2",
+  "metadata": {
+    "owner": "platform-team",
+    "environment": "production"
+  },
+  "republish": {
+    "src": ">",
+    "dest": "kv.config.{{wildcard(1)}}"
+  },
+  "placement": {
+    "cluster": "us-west",
+    "tags": ["ssd"]
+  }
+}
+```
+
+## Key Operations
+
+### Put Operation
+Store a value for a key:
+```bash
+# Subject: $KV.<bucket>.<key>
+# Payload: value data
+# Headers: optional metadata
+```
+
+#### Put with Revision Check
+Publish with the expected current revision in a header; the write is rejected if another write got there first:
+```bash
+# Subject: $KV.<bucket>.<key>
+# Payload: value data
+# Header: Nats-Expected-Last-Subject-Sequence: 5
+```
+
+### Get Operation
+Retrieve the current value for a key - the last message on the key's subject - via the backing stream's direct get API:
+```bash
+# Subject: $JS.API.DIRECT.GET.KV_<bucket>.$KV.<bucket>.<key>
+# Payload: (empty)
+```
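+
+Concretely, the raw reply carries the value as its payload and the entry metadata as headers, which clients map into the entry shown next. A Go sketch (bucket `user-sessions` and key `user.12345` are assumed to exist):
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/nats-io/nats.go"
+)
+
+func main() {
+	nc, _ := nats.Connect(nats.DefaultURL) // sketch: error handling elided
+	defer nc.Close()
+
+	// The last message on the key's subject is the key's current value.
+	subj := "$JS.API.DIRECT.GET.KV_user-sessions.$KV.user-sessions.user.12345"
+	msg, err := nc.Request(subj, nil, 2*time.Second)
+	if err != nil {
+		panic(err)
+	}
+
+	// Value is the payload; entry metadata arrives as direct get headers.
+	fmt.Printf("value: %q\n", msg.Data)
+	fmt.Println("revision:", msg.Header.Get("Nats-Sequence"))
+	fmt.Println("created:", msg.Header.Get("Nats-Time-Stamp"))
+}
+```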
+
+#### Get Response
+```json
+{
+  "bucket": "user-sessions",
+  "key": "user.12345",
+  "value": "base64-encoded-data",
+  "revision": 6,
+  "created": "2023-01-15T10:30:00Z",
+  "delta": 0
+}
+```
+
+### Create Operation
+Add a key only if it does not already exist, by requiring that the key has no prior revision:
+```bash
+# Subject: $KV.<bucket>.<key>
+# Payload: initial value
+# Header: Nats-Expected-Last-Subject-Sequence: 0
+```
+
+### Update Operation
+Modify a key only if the revision matches:
+```bash
+# Subject: $KV.<bucket>.<key>
+# Payload: new value
+# Header: Nats-Expected-Last-Subject-Sequence: 3
+```
+
+### Delete Operation
+Mark a key as deleted (preserves history):
+```bash
+# Subject: $KV.<bucket>.<key>
+# Payload: (empty)
+# Headers: KV-Operation: DEL
+```
+
+### Purge Operation
+Remove all history for a key:
+```bash
+# Subject: $KV.<bucket>.<key>
+# Payload: (empty)
+# Headers: KV-Operation: PURGE, Nats-Rollup: sub
+```
+
+## Advanced Operations
+
+### Watch Operations
+Monitor changes to keys in real-time:
+
+#### Watch Single Key
+```bash
+# Subject: $KV.<bucket>.specific-key
+# Subscribe for real-time updates
+```
+
+#### Watch All Keys
+```bash
+# Subject: $KV.<bucket>.>
+# Subscribe to all key changes in the bucket
+```
+
+#### Watch with Filters
+```bash
+# Subject: $KV.<bucket>.users.*
+# Watch all keys matching the pattern
+```
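+
+Since keys are ordinary subjects on the backing stream, a plain core subscription sketches a live watcher in Go (bucket `user-sessions` is an assumption). Note this sees only new changes; replaying the current values first is done with a consumer using `deliver_policy: last_per_subject`:
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/nats-io/nats.go"
+)
+
+func main() {
+	nc, _ := nats.Connect(nats.DefaultURL) // sketch: error handling elided
+	defer nc.Close()
+
+	// Live changes only: each put or delete is a message on $KV.<bucket>.>
+	nc.Subscribe("$KV.user-sessions.>", func(m *nats.Msg) {
+		op := m.Header.Get("KV-Operation") // empty for puts, DEL/PURGE for deletes
+		fmt.Printf("%-5s %s = %q\n", op, m.Subject, m.Data)
+	})
+
+	select {} // block forever
+}
+```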
+
+### History Operations
+Retrieve historical values for a key. History is read from the backing stream: clients create an ephemeral consumer filtered to the key's subject that delivers every stored revision:
+```bash
+# Subject: $JS.API.CONSUMER.CREATE.KV_<bucket>
+# Config: filter_subject = $KV.<bucket>.<key>, deliver_policy = all
+# Deleted revisions appear as messages carrying a KV-Operation header
+```
+
+### List Keys
+Enumerate the keys in a bucket with a headers-only consumer over the latest revision of each subject (narrow the filter, e.g. `$KV.<bucket>.users.*`, to list a subset):
+```bash
+# Subject: $JS.API.CONSUMER.CREATE.KV_<bucket>
+# Config: filter_subject = $KV.<bucket>.>, deliver_policy = last_per_subject, headers_only = true
+```
+
+## Bucket Management
+
+### Create Bucket
+```bash
+# Subject: $JS.API.STREAM.CREATE.KV_<bucket>
+# Payload: Stream configuration with KV-specific settings
+```
+
+### Get Bucket Status
+```bash
+# Subject: $JS.API.STREAM.INFO.KV_<bucket>
+# Payload: (empty)
+```
+
+### Delete Bucket
+```bash
+# Subject: $JS.API.STREAM.DELETE.KV_<bucket>
+# Payload: (empty)
+```
+
+## Message Format
+
+### KV Message Headers
+Regular puts carry no KV-specific headers; a key's revision is the stream sequence of its message, and its timestamp comes from the message metadata. Delete and purge markers are flagged via headers:
+```
+NATS/1.0
+KV-Operation: DEL
+```
+
+### Special Operations
+- `KV-Operation: DEL` - Soft delete (preserves history)
+- `KV-Operation: PURGE` - Hard delete (removes history)
+- `Nats-Expected-Last-Subject-Sequence` - Used for optimistic concurrency control
+
+## Error Handling
+
+### Common Error Codes
+- **10037**: No message found (key not found)
+- **10071**: Wrong last sequence (revision mismatch)
+- **10048**: Key already exists (on CREATE operation)
+
+### Error Response Format
+```json
+{
+  "type": "io.nats.jetstream.api.v1.error_response",
+  "error": {
+    "code": 404,
+    "err_code": 10037,
+    "description": "key not found"
+  }
+}
+```
+
+## Performance Considerations
+
+### Optimization Tips
+1. **Batch operations** when possible
+2. **Use appropriate history** limits (default 1, maximum 64)
+3. **Enable compression** for large values
+4. **Monitor bucket size** and set limits
+5. **Use TTL** for automatic cleanup
+6. **Choose storage backend** based on performance needs
+
+### Monitoring
+- Track bucket size and key count
+- Monitor operation latencies
+- Watch for revision conflicts
+- Alert on storage limits
+
+## Best Practices
+
+1. **Design key hierarchies** for efficient watching and filtering
+2. **Use meaningful key names** with consistent naming conventions
+3. **Handle revision conflicts** gracefully in concurrent scenarios
+4. **Set appropriate TTLs** for automatic cleanup
+5. **Use compression** for large values to save space
+6. **Implement retry logic** for temporary failures
+7. **Monitor bucket usage** and set appropriate limits
+8. **Choose replication level** based on availability requirements
\ No newline at end of file
diff --git a/docs/reference/jetstream-objectstore.md b/docs/reference/jetstream-objectstore.md
new file mode 100644
index 0000000..97dd86c
--- /dev/null
+++ b/docs/reference/jetstream-objectstore.md
@@ -0,0 +1,273 @@
+# JetStream Object Store
+
+The JetStream Object Store provides scalable storage for large objects by automatically chunking them into smaller pieces stored across JetStream streams. It supports metadata management, object versioning, and integrates with NATS security and clustering features.
+
+## Architecture
+
+### Design Principles
+- Objects stored as chunked messages in JetStream streams
+- Each object store is backed by a dedicated stream
+- Object metadata stored separately from object data
+- Configurable chunk size for optimal performance
+- Built on JetStream's persistence and replication
+
+### Storage Structure
+- **Object Info**: JSON metadata stored in stream messages
+- **Object Chunks**: Binary data stored as message payloads
+- **Chunk Subjects**: Follow a fixed pattern for ordered retrieval
+- **Stream Names**: Use predictable naming conventions
+
+## Object Metadata
+
+### Object Information Structure
+```json
+{
+  "name": "document.pdf",
+  "description": "Project specification document",
+  "headers": {
+    "Content-Type": "application/pdf",
+    "Author": "platform-team"
+  },
+  "options": {
+    "max_chunk_size": 131072,
+    "ttl": 86400000000000,
+    "compression": "s2"
+  }
+}
+```
+
+### Generated Fields
+```json
+{
+  "nuid": "ABCDEF123456789",
+  "size": 2048576,
+  "chunks": 16,
+  "digest": "SHA-256:a1b2c3d4...",
+  "deleted": false,
+  "revision": 3,
+  "created": "2023-01-15T10:30:00Z",
+  "modified": "2023-01-15T14:30:00Z"
+}
+```
+
+## Object Operations
+
+### Put Object
+Store a new object:
+```json
+{
+  "name": "report.json",
+  "description": "Monthly analytics report",
+  "data": "base64-encoded-object-data",
+  "headers": {
+    "Content-Type": "application/json",
+    "Generated": "2023-01-15T10:30:00Z"
+  },
+  "options": {
+    "max_chunk_size": 65536
+  }
+}
+```
+
+### Get Object
+Retrieve a complete object:
+```json
+{
+  "name": "report.json"
+}
+```
+
+The response includes the reconstructed object data and metadata.
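+
+Chunking and reassembly are implemented by clients, so the most practical illustration is the Go client's object store API rather than raw subjects. A minimal sketch (bucket `documents` and the stored payload are assumptions):
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/nats-io/nats.go"
+)
+
+func main() {
+	nc, _ := nats.Connect(nats.DefaultURL) // sketch: error handling elided
+	defer nc.Close()
+
+	js, _ := nc.JetStream()
+	obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "documents"})
+	if err != nil {
+		panic(err)
+	}
+
+	// Put chunks the payload transparently; Get reassembles it.
+	if _, err := obs.PutBytes("report.json", []byte(`{"month": "january"}`)); err != nil {
+		panic(err)
+	}
+	data, _ := obs.GetBytes("report.json")
+	info, _ := obs.GetInfo("report.json")
+	fmt.Printf("%d bytes in %d chunk(s): %s\n", info.Size, info.Chunks, data)
+}
+```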
+
+### Get Object Info
+Retrieve only metadata, without object data:
+```json
+{
+  "name": "report.json",
+  "info_only": true
+}
+```
+
+### Update Metadata
+Modify object metadata without changing data:
+```json
+{
+  "name": "report.json",
+  "description": "Updated monthly analytics report",
+  "headers": {
+    "Content-Type": "application/json",
+    "Modified": "2023-01-15T15:00:00Z"
+  }
+}
+```
+
+### Delete Object
+Mark an object as deleted:
+```json
+{
+  "name": "report.json",
+  "purge": false
+}
+```
+
+### List Objects
+Enumerate objects in a store:
+```json
+{
+  "filter": "*.json",
+  "include_deleted": false
+}
+```
+
+## Object Store Configuration
+
+### Basic Configuration
+```json
+{
+  "bucket": "documents",
+  "description": "Project document storage",
+  "max_object_size": 10485760,
+  "storage": "file",
+  "num_replicas": 1,
+  "ttl": 0
+}
+```
+
+### Advanced Configuration
+```json
+{
+  "bucket": "media-assets",
+  "description": "Media file storage",
+  "max_object_size": 104857600,
+  "storage": "file",
+  "num_replicas": 3,
+  "compression": "s2",
+  "placement": {
+    "cluster": "storage-tier",
+    "tags": ["high-capacity", "redundant"]
+  },
+  "metadata": {
+    "department": "media",
+    "retention_policy": "7-years"
+  }
+}
+```
+
+## Chunking Strategy
+
+### Default Chunk Size
+- **Standard**: 128KB chunks
+- **Configurable** per object store or individual object
+- **Performance trade-offs**: larger chunks mean fewer messages; smaller chunks allow more parallel processing
+
+### Chunk Organization
+- Chunks stored as sequential stream messages
+- Chunk order preserved via message sequence
+- The final chunk may be smaller than the configured size
+- Chunk subjects enable efficient retrieval
+
+### Chunk Subjects Pattern
+```
+$OBJ.<bucket>.<object-nuid>.C.<chunk-seq>
+```
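+
+The chunk count for an object is the ceiling of its size divided by the chunk size. A worked Go example matching the generated-fields sample above (2,048,576 bytes at 131,072-byte chunks):
+
+```go
+package main
+
+import "fmt"
+
+func main() {
+	size, chunkSize := 2048576, 131072
+	chunks := (size + chunkSize - 1) / chunkSize // ceiling division
+	last := size - (chunks-1)*chunkSize          // final chunk may be smaller
+	fmt.Println(chunks, last)                    // 16 82496
+}
+```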
+
+## API Operations
+
+### Object Store Management
+- Create object store: Configure the backing stream
+- Delete object store: Remove the stream and all objects
+- Get store status: Stream information and statistics
+- List object stores: Enumerate available stores
+
+### Object Management
+- Put: Store a new object or update an existing one
+- Get: Retrieve complete object data
+- GetInfo: Retrieve metadata only
+- Delete: Mark an object as deleted
+- List: Enumerate objects with filtering
+- Watch: Monitor store changes
+- UpdateMeta: Modify object metadata
+
+## Advanced Features
+
+### Compression
+- **S2 compression**: Automatic compression of object chunks
+- **Per-object**: Configurable compression settings
+- **Storage efficiency**: Reduces storage requirements for text/JSON objects
+
+### Linking
+- **Object linking**: Reference relationships between objects
+- **Bucket linking**: Cross-store references
+- **Metadata links**: Store references in object headers
+
+### Versioning
+- **Revision tracking**: Automatic version numbers
+- **Historical access**: Retrieve previous object versions
+- **Version metadata**: Track changes and timestamps
+
+### Security
+- **Subject-based ACLs**: Control access to objects
+- **Account isolation**: Objects scoped to NATS accounts
+- **Encryption**: Application-level encryption supported
+- **Audit trails**: Track object access and modifications
+
+## Error Handling
+
+### Common Error Codes
+- **10404**: Object not found
+- **10413**: Object too large
+- **10409**: Object already exists
+- **10507**: Storage limit exceeded
+
+### Error Response Format
+```json
+{
+  "error": {
+    "code": 404,
+    "err_code": 10404,
+    "description": "object not found"
+  }
+}
+```
+
+## Performance Optimization
+
+### Chunk Size Tuning
+- **Small objects** (< 128KB): a single chunk is optimal
+- **Large objects** (> 10MB): consider larger chunks (512KB-1MB)
+- **Network considerations**: balance chunk size against network latency
+- **Storage considerations**: align with storage backend block sizes
+
+### Concurrent Operations
+- **Parallel chunk retrieval**: Reconstruct objects faster
+- **Streaming**: Stream chunks during reconstruction
+- **Caching**: Cache frequently accessed objects
+- **Compression**: Enable for text-based objects
+
+## Monitoring and Observability
+
+### Key Metrics
+- Object count and total storage usage
+- Average object size and chunk count
+- Put/Get operation rates and latencies
+- Storage efficiency (compression ratios)
+- Error rates by operation type
+
+### Stream Statistics
+- Message count (total chunks + metadata)
+- Storage bytes used
+- Consumer activity
+- Replication status across replicas
+
+## Best Practices
+
+1. **Choose appropriate chunk sizes** based on object types and network characteristics
+2. **Enable compression** for text-based objects to save storage
+3. **Use meaningful object names** with consistent naming conventions
+4. **Set appropriate TTLs** for automatic cleanup of temporary objects
+5. **Monitor storage usage** and set limits to prevent runaway growth
+6. **Use replication** for critical objects requiring high availability
+7. **Implement retry logic** for temporary failures during chunked operations
+8. **Consider object lifecycle** - use metadata to track object stages
+9. **Leverage NATS security** features for access control and audit trails
+10. **Test with realistic object sizes** to validate performance characteristics
\ No newline at end of file
diff --git a/docs/reference/jetstream-overview.md b/docs/reference/jetstream-overview.md
new file mode 100644
index 0000000..f3fb688
--- /dev/null
+++ b/docs/reference/jetstream-overview.md
@@ -0,0 +1,68 @@
+# JetStream Overview
+
+JetStream is a distributed streaming platform built on top of NATS Core that provides:
+
+- **Persistent message storage** with configurable retention policies
+- **At-least-once delivery guarantees** with optional exactly-once semantics
+- **Horizontal scalability** with clustering and replication
+- **Stream and consumer abstractions** for flexible message processing
+- **Built-in Key/Value and Object stores** for data persistence
+
+## Core Architecture
+
+JetStream separates **storage** (Streams) from **consumption** (Consumers):
+
+- **Stream**: Defines how messages are stored, retained, and replicated
+- **Consumer**: Defines how messages are delivered and acknowledged
+
+This separation allows multiple consumers with different processing requirements to consume from the same stream independently.
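+
+For example, with the Go client one stream can back several independent consumers; a sketch (stream `ORDERS` and the durable names are assumptions):
+
+```go
+package main
+
+import "github.com/nats-io/nats.go"
+
+func main() {
+	nc, _ := nats.Connect(nats.DefaultURL) // sketch: error handling elided
+	defer nc.Close()
+	js, _ := nc.JetStream()
+
+	// One stream owns storage...
+	js.AddStream(&nats.StreamConfig{
+		Name:     "ORDERS",
+		Subjects: []string{"orders.>"},
+		Storage:  nats.FileStorage,
+	})
+
+	// ...while any number of consumers track their own delivery state over it.
+	js.AddConsumer("ORDERS", &nats.ConsumerConfig{
+		Durable:   "billing",
+		AckPolicy: nats.AckExplicitPolicy,
+	})
+	js.AddConsumer("ORDERS", &nats.ConsumerConfig{
+		Durable:       "audit",
+		AckPolicy:     nats.AckExplicitPolicy,
+		DeliverPolicy: nats.DeliverAllPolicy,
+	})
+}
+```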
+
+## Quality of Service
+
+Unlike Core NATS, which provides at-most-once delivery, JetStream provides:
+
+- **At-least-once delivery** by default, through message acknowledgments
+- **Exactly-once semantics** when publish deduplication is combined with double acknowledgment
+- **Persistent storage** that survives server restarts and failures
+
+## Key Features
+
+### Streams
+- Configurable retention: by limits, interest, or workqueue policy
+- Storage backends: memory or file-based
+- Replication: 1-5 replicas across cluster nodes
+- Compression: S2 compression support
+- Placement: geographical/resource constraints
+
+### Consumers
+- **Push consumers**: Server pushes messages to client subjects
+- **Pull consumers**: Client requests message batches (recommended)
+- **Ordered consumers**: Single-threaded with automatic flow control
+- Acknowledgment policies: explicit, none, or all
+- Delivery policies: all, last, new, by sequence/time
+
+### Advanced Features
+- Subject transformation and filtering
+- Message deduplication with configurable time windows
+- Direct message access without consumers
+- Account-scoped isolation and resource limits
+- Flow control and back-pressure handling
+
+## Wire Protocol
+
+JetStream uses NATS Core's text-based protocol with:
+
+- **JSON-based APIs** on subjects under `$JS.API.>`
+- **Standardized responses** with type indicators for JSON schema validation
+- **Domain support** for multi-tenant deployments
+- **Advisory messages** for system events and monitoring
+
+## Getting Started
+
+1. **Enable JetStream** on your NATS server
+2. **Create a Stream** to define message storage
+3. **Create a Consumer** to define message processing
+4. **Publish messages** to stream subjects
+5. **Consume messages** using pull or push patterns
+
+For detailed protocol specifications, see the individual reference documents in this directory.
\ No newline at end of file
diff --git a/docs/reference/jetstream-streams.md b/docs/reference/jetstream-streams.md
new file mode 100644
index 0000000..b8f9e32
--- /dev/null
+++ b/docs/reference/jetstream-streams.md
@@ -0,0 +1,172 @@
+# JetStream Streams
+
+A Stream defines how messages are stored, retained, and replicated in JetStream. Streams are the fundamental storage abstraction that captures and persists messages published to configured subjects.
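+
+As a concrete starting point, creating a stream is a single wire API request whose payload is the configuration itself. A Go sketch (`nats.go` client; stream `ORDERS` and a local server are assumptions):
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/nats-io/nats.go"
+)
+
+func main() {
+	nc, _ := nats.Connect(nats.DefaultURL) // sketch: error handling elided
+	defer nc.Close()
+
+	// The request payload is the stream configuration JSON.
+	cfg := []byte(`{
+	  "name": "ORDERS",
+	  "subjects": ["orders.>"],
+	  "retention": "limits",
+	  "storage": "file",
+	  "num_replicas": 1
+	}`)
+	msg, err := nc.Request("$JS.API.STREAM.CREATE.ORDERS", cfg, 2*time.Second)
+	if err != nil {
+		panic(err)
+	}
+	fmt.Printf("%s\n", msg.Data) // io.nats.jetstream.api.v1.stream_create_response
+}
+```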
+
+## Stream Configuration
+
+### Required Fields
+- **name**: Stream name (alphanumeric, dashes, underscores)
+- **subjects**: List of subjects to capture (supports wildcards)
+
+### Retention Policies
+- **limits**: Retain until limits are reached (default)
+- **interest**: Retain while consumers exist
+- **workqueue**: Retain each message until it is consumed and acknowledged, then remove it
+
+### Storage Configuration
+- **storage**: `"file"` (persistent) or `"memory"` (fast, volatile)
+- **max_msgs**: Maximum number of messages (-1 = unlimited)
+- **max_bytes**: Maximum storage bytes (-1 = unlimited)
+- **max_age**: Maximum message age in nanoseconds (0 = unlimited)
+- **max_msg_size**: Maximum individual message size (-1 = unlimited)
+
+### Replication and Placement
+- **num_replicas**: Number of replicas (1-5)
+- **placement**: Cluster placement constraints
+  ```json
+  {
+    "cluster": "us-east",
+    "tags": ["ssd", "fast"]
+  }
+  ```
+
+### Advanced Features
+- **duplicate_window**: Deduplication time window (nanoseconds)
+- **compression**: `"s2"` or `"none"`
+- **sealed**: Seal the stream, making it immutable
+- **deny_delete**: Prevent message deletion
+- **deny_purge**: Prevent stream purging
+- **allow_rollup_hdrs**: Enable message rollup headers
+
+### Mirroring and Sourcing
+```json
+{
+  "mirror": {
+    "name": "source-stream",
+    "opt_start_seq": 1000,
+    "filter_subject": "orders.us.*"
+  },
+  "sources": [
+    {
+      "name": "stream1",
+      "filter_subject": "region.east.*"
+    },
+    {
+      "name": "stream2",
+      "filter_subject": "region.west.*"
+    }
+  ]
+}
+```
+
+### Subject Transformation
+Transform subjects when storing messages:
+```json
+{
+  "subject_transform": {
+    "src": "orders.>",
+    "dest": "transformed.orders.{{wildcard(1)}}"
+  }
+}
+```
+
+## Stream State
+
+Streams maintain operational state information:
+
+```json
+{
+  "messages": 150432,
+  "bytes": 248832000,
+  "first_seq": 1,
+  "first_ts": "2023-01-15T10:30:00Z",
+  "last_seq": 150432,
+  "last_ts": "2023-01-15T14:30:00Z",
+  "consumer_count": 3,
+  "deleted": [1001, 1002, 1150]
+}
+```
+
+### State Fields
+- **messages**: Total message count
+- **bytes**: Total storage bytes used
+- **first_seq/last_seq**: Message sequence range
+- **first_ts/last_ts**: Timestamp range (RFC3339)
+- **consumer_count**: Number of consumers
+- **deleted**: List of deleted message sequences
+
+## Stream Operations
+
+### Creating a Stream
+```bash
+# Subject: $JS.API.STREAM.CREATE.ORDERS
+# Payload: Stream configuration JSON
+```
+
+### Updating a Stream
+```bash
+# Subject: $JS.API.STREAM.UPDATE.ORDERS
+# Payload: Updated stream configuration
+```
+
+### Getting Stream Info
+```bash
+# Subject: $JS.API.STREAM.INFO.ORDERS
+# Payload: (optional) deleted_details and subjects_filter flags
+```
+
+### Deleting a Stream
+```bash
+# Subject: $JS.API.STREAM.DELETE.ORDERS
+# Payload: (empty)
+```
+
+### Purging Messages
+```bash
+# Subject: $JS.API.STREAM.PURGE.ORDERS
+# Payload: (optional) purge filters
+```
+
+## Message Management
+
+### Direct Message Access
+```bash
+# Subject: $JS.API.DIRECT.GET.ORDERS.<subject>
+# Gets the last message on the subject from the stream
+```
+
+### Get Message by Sequence
+```bash
+# Subject: $JS.API.STREAM.MSG.GET.ORDERS
+# Payload: {"seq": 12345}
+```
+
+### Delete Message
+```bash
+# Subject: $JS.API.STREAM.MSG.DELETE.ORDERS
+# Payload: {"seq": 12345, "no_erase": false}
+```
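+
+These message-level operations are plain requests as well. A Go sketch fetching a stored message by sequence (stream `ORDERS` and sequence 12345 are assumptions):
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/nats-io/nats.go"
+)
+
+func main() {
+	nc, _ := nats.Connect(nats.DefaultURL) // sketch: error handling elided
+	defer nc.Close()
+
+	// Fetch a stored message by stream sequence.
+	msg, err := nc.Request("$JS.API.STREAM.MSG.GET.ORDERS", []byte(`{"seq": 12345}`), 2*time.Second)
+	if err != nil {
+		panic(err)
+	}
+	fmt.Printf("%s\n", msg.Data) // message payload is base64-encoded inside the JSON reply
+}
+```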
+
+## Subject Wildcards
+
+Streams support NATS wildcards in subject configurations:
+- `*` matches a single token: `orders.*.created`
+- `>` matches one or more trailing tokens: `orders.>`
+
+## Stream Limits and Quotas
+
+Streams respect account-level limits:
+- Maximum number of streams per account
+- Maximum storage per account
+- Maximum message size
+- Maximum number of consumers
+
+## Best Practices
+
+1. **Use meaningful names** - Stream names should reflect their purpose
+2. **Configure retention** - Set appropriate limits for your use case
+3. **Plan for growth** - Consider message rates and storage requirements
+4. **Use replication** - For high availability requirements
+5. **Monitor state** - Track message counts and storage usage
+6. **Subject design** - Design subject hierarchies for efficient filtering
+7. **Compression** - Use S2 compression for large messages when appropriate
\ No newline at end of file