Overview

$mq9.AI.* is the protocol mq9 defines for asynchronous Agent communication. It lets Agents exchange messages without the sender and receiver being online at the same time.

mq9 handles only the communication problem: how messages are reliably delivered. Message content is a byte array; mq9 does not parse, validate, or restrict it. Whether the upper layer uses A2A, MCP, or another protocol is outside mq9's scope.


mail_address Format Specification

Character set: lowercase letters (a-z), digits (0-9), dots (.)

Length: 1 to 128 characters

Case: strictly lowercase; a mail_address containing uppercase characters will be rejected by the broker

Position rules: . may only appear in the middle — the first and last characters must be a lowercase letter or digit; consecutive . are not allowed

Semantics: mail_address is an opaque string; . does not participate in protocol routing or matching and serves only as a visual grouping aid

Encoding: URL percent-encoding is not allowed

| Valid examples | Invalid examples |
| --- | --- |
| task.001 | task-001 (contains hyphen) |
| agent.inbox | task_001 (contains underscore) |
| analytics.result | Task.001 (contains uppercase) |
| acme.org.task.queue | .task.001 (leading dot) |
| session.20260502 | task.001. (trailing dot) |
|  | task..001 (consecutive dots) |

Full address examples:

task.001
agent.inbox
analytics.result
acme.task.queue
session.20260502
order.processing.urgent
agent.001.inbox
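
The format rules above can be checked on the client before a request is sent. The following is a minimal sketch; the function name `is_valid_mail_address` is hypothetical and not part of mq9, and the broker remains the authority on what it accepts.

```python
import re

# One or more runs of [a-z0-9] separated by single dots. This encodes the
# character set, the "no leading/trailing dot" rule, and the
# "no consecutive dots" rule in one pattern.
_MAIL_ADDRESS_RE = re.compile(r"[a-z0-9]+(?:\.[a-z0-9]+)*")
MAX_LENGTH = 128


def is_valid_mail_address(address: str) -> bool:
    """Return True if `address` satisfies the format rules listed above."""
    if not 1 <= len(address) <= MAX_LENGTH:
        return False
    return _MAIL_ADDRESS_RE.fullmatch(address) is not None
```

Because `%` is outside the allowed character set, percent-encoded input is rejected by the same pattern.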

Core Concepts

mail_address: The communication address defined by the user when creating a mailbox via MAILBOX.CREATE. It is not bound to an Agent identity — a single Agent may request different mail_addresses for different tasks. No cleanup is needed after use; TTL handles automatic expiry.

The unguessability of a mail_address is the security boundary. Knowing a mail_address is sufficient to send messages to the mailbox or fetch messages from it; without the mail_address there is nothing to operate on. There are no tokens and no ACLs.

TTL: Declared at mailbox creation time; the mailbox is automatically destroyed on expiry and its messages are cleaned up along with it. Creating a mailbox with a duplicate name returns an error (mailbox xxx already exists); you can use QUERY to check whether a mailbox exists before creating it.

priority: Optional. If not specified, the default is normal priority; specifying urgent or critical raises the processing order. Same-priority messages follow FIFO; across priorities, higher priority is processed first. The storage layer guarantees ordering — consumers do not need to sort themselves.

msg_id: The unique identifier of each message (its offset in storage), used by clients for deduplication and deletion operations.

Consumption semantics: FETCH uses pull mode and supports both stateful consumption (pass group_name and the broker records the offset, resumable after reconnect) and stateless consumption (omit group_name, each call consumes independently according to the deliver policy with no offset recorded).

Message flow: Message arrives → written to storage → client actively calls FETCH to pull → ACK to confirm → broker advances the consumption offset for that group.


Protocol Overview

| Category | Subject | Description |
| --- | --- | --- |
| Mailbox management | $mq9.AI.MAILBOX.CREATE | Create a mailbox |
| Messaging | $mq9.AI.MSG.SEND.{mail_address} | Send a message (priority via mq9-priority header) |
| Messaging | $mq9.AI.MSG.FETCH.{mail_address} | Fetch mailbox messages |
| Messaging | $mq9.AI.MSG.ACK.{mail_address} | Acknowledge a message |
| Messaging | $mq9.AI.MSG.QUERY.{mail_address} | Query messages in a mailbox |
| Messaging | $mq9.AI.MSG.DELETE.{mail_address}.{msg_id} | Delete a specific message |
| Agent management | $mq9.AI.AGENT.REGISTER | Register an Agent |
| Agent management | $mq9.AI.AGENT.UNREGISTER | Unregister an Agent |
| Agent management | $mq9.AI.AGENT.REPORT | Report Agent status |
| Agent management | $mq9.AI.AGENT.DISCOVER | Discover Agents |

All commands use request/reply mode (nats request); the server always returns a response.


Response Format

Each command has its own response structure. All responses include an error field:

| Value | Meaning |
| --- | --- |
| "" | Success |
| Non-empty string | Failure; the value is the error description |
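
Since every reply carries an error field, a client can funnel all replies through one check. The sketch below assumes replies are JSON objects, as in the examples in this document; `Mq9Error` and `parse_reply` are hypothetical names, not part of mq9.

```python
import json


class Mq9Error(Exception):
    """Raised when a reply carries a non-empty `error` field (hypothetical helper)."""


def parse_reply(raw: bytes) -> dict:
    """Decode a JSON reply and raise Mq9Error if its `error` field is non-empty."""
    reply = json.loads(raw)
    error = reply.get("error", "")
    if error:
        raise Mq9Error(error)
    return reply
```

On success the decoded reply is returned unchanged, so command-specific fields such as mail_address or msg_id stay available to the caller.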

$mq9.AI.MAILBOX.CREATE

Create a mailbox with a user-defined mail_address.

Request Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | No | The mail_address for the mailbox; must conform to the format specification. Auto-generated by the broker if omitted. |
| ttl | u64? | No | Time-to-live in seconds; 0 means never expires |

Response Fields

| Field | Type | Description |
| --- | --- | --- |
| error | string | Empty on success; error message on failure |
| mail_address | string | The mailbox address after successful creation |

Example

nats request '$mq9.AI.MAILBOX.CREATE' '{"name": "agent.translator.inbox", "ttl": 0}'
# Response
{"error":"","mail_address":"agent.translator.inbox"}

# Duplicate creation → error
nats request '$mq9.AI.MAILBOX.CREATE' '{"name": "agent.translator.inbox"}'
# Response
{"error":"mailbox agent.translator.inbox already exists","mail_address":""}
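
Because CREATE is not idempotent, callers that only need the mailbox to exist may want to tolerate the duplicate error. The following is a sketch under stated assumptions: `request` stands in for any NATS request/reply call that takes (subject, payload) and returns the reply bytes, `ensure_mailbox` is a hypothetical helper, and matching on the error text relies on the wording shown in the example above, which may change.

```python
import json
from typing import Callable

# Any request/reply transport: (subject, payload) -> reply bytes.
Request = Callable[[str, bytes], bytes]


def ensure_mailbox(request: Request, name: str, ttl: int = 0) -> str:
    """Create `name`, treating an 'already exists' error as success.

    ttl=0 (never expires) is a default chosen for this sketch only.
    """
    payload = json.dumps({"name": name, "ttl": ttl}).encode()
    reply = json.loads(request("$mq9.AI.MAILBOX.CREATE", payload))
    error = reply["error"]
    if error == "":
        return reply["mail_address"]
    if error.endswith("already exists"):
        # Assumption: the duplicate error always ends with this phrase.
        return name
    raise RuntimeError(error)
```

Note that an existing mailbox keeps its original TTL; this helper does not verify that it matches the requested one.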

$mq9.AI.MSG.SEND.{mail_address}

Send a message to the specified mailbox. The payload is a byte array; mq9 does not parse the content.

Request Parameters

Subject: $mq9.AI.MSG.SEND.{mail_address}

Payload: Arbitrary byte array; mq9 does not parse the content.

Headers (all optional)

| Header | Description |
| --- | --- |
| mq9-key: {key} | Dedup/compaction key. For the same key, the storage layer retains only the latest message, overwriting older ones |
| mq9-delay: {seconds} | Delay delivery by this many seconds. The message is not visible in FETCH results until the delay expires. The SEND response for a delayed message carries msg_id: -1 |
| mq9-ttl: {seconds} | Message-level TTL in seconds. The message expires at send_time + ttl, independent of the mailbox TTL |
| mq9-tags: {tag1},{tag2} | Comma-separated user tags, e.g. billing,vip. Filterable via the tags field in QUERY |
| mq9-priority: {value} | Message priority: normal (default), urgent, or critical |
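
The subject and headers for a SEND request can be assembled mechanically from the table above. This is a minimal sketch; `build_send` is a hypothetical helper that only builds the subject and header map and leaves the actual publish call to whichever NATS client is in use.

```python
from typing import Dict, Optional, Sequence, Tuple

PRIORITIES = ("normal", "urgent", "critical")


def build_send(
    mail_address: str,
    key: Optional[str] = None,
    delay: Optional[int] = None,
    ttl: Optional[int] = None,
    tags: Sequence[str] = (),
    priority: Optional[str] = None,
) -> Tuple[str, Dict[str, str]]:
    """Return the (subject, headers) pair for a SEND request."""
    if priority is not None and priority not in PRIORITIES:
        raise ValueError(f"unknown priority: {priority}")
    headers: Dict[str, str] = {}
    if key is not None:
        headers["mq9-key"] = key
    if delay is not None:
        headers["mq9-delay"] = str(delay)  # seconds
    if ttl is not None:
        headers["mq9-ttl"] = str(ttl)  # seconds
    if tags:
        headers["mq9-tags"] = ",".join(tags)
    if priority is not None:
        headers["mq9-priority"] = priority
    return f"$mq9.AI.MSG.SEND.{mail_address}", headers
```

Omitted options produce no header at all, so the broker's defaults apply.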

SEND Response Fields

| Field | Type | Description |
| --- | --- | --- |
| error | string | Empty on success; error message on failure (e.g., mailbox does not exist) |
| msg_id | i64 | Storage offset assigned after write; -1 for delayed messages |

Priority Description

| Value | Typical use case |
| --- | --- |
| normal (default) | Task dispatch, result return, status reporting |
| urgent | Approval requests, important notifications |
| critical | Task interruption, emergency instructions |

Same-priority messages follow FIFO; across priorities: critical > urgent > normal.
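
The ordering rule can be modelled as a two-part sort key. The sketch below is only a client-side illustration of the order a consumer should expect; the storage layer does the real ordering. It assumes msg_id reflects arrival order within a mailbox, and `expected_order` is a hypothetical name.

```python
# Lower rank is delivered first.
PRIORITY_RANK = {"critical": 0, "urgent": 1, "normal": 2}


def expected_order(messages: list) -> list:
    """Sort by priority (critical first), then by msg_id for FIFO within a priority."""
    return sorted(
        messages,
        key=lambda m: (PRIORITY_RANK[m["priority"]], m["msg_id"]),
    )
```

For example, messages that arrive as normal, urgent, normal, critical, urgent come out as critical, urgent, urgent, normal, normal, with arrival order preserved inside each priority.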

SEND Example

# Normal message
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' '{"text":"hello"}'
# Response
{"error":"","msg_id":0}

# With key (storage retains only the latest message for the same key)
nats request '$mq9.AI.MSG.SEND.task.001.callback' \
  -H "mq9-key:status" \
  '{"status":"running"}'
# Response
{"error":"","msg_id":1}

# With tags (filterable via QUERY tags field)
nats request '$mq9.AI.MSG.SEND.agent.order.inbox' \
  -H "mq9-tags:billing,vip" \
  '{"order_id":"o-001"}'
# Response
{"error":"","msg_id":2}

# Delayed delivery — 60 seconds (msg_id -1 means delayed)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
  -H "mq9-delay:60" \
  '{"text":"delayed task"}'
# Response
{"error":"","msg_id":-1}

# Message-level TTL — expires 300 seconds after send (independent of mailbox TTL)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
  -H "mq9-ttl:300" \
  '{"text":"short-lived message"}'
# Response
{"error":"","msg_id":3}

# Urgent message (priority via header)
nats request '$mq9.AI.MSG.SEND.agent.translator.inbox' \
  -H "mq9-priority:urgent" \
  '{"alert":"please expedite"}'
# Response
{"error":"","msg_id":4}
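
The effect of mq9-key can be pictured as "last write wins" per key. The following is an illustrative model only, not the broker's storage code: it assumes messages without a key are never compacted and that the input list is in arrival order. `compact` and the `key` field on each entry are hypothetical.

```python
def compact(messages: list) -> list:
    """Keep every keyless message and only the newest message for each key."""
    latest_for_key = {}
    for msg in messages:
        if msg.get("key") is not None:
            # Later entries overwrite earlier ones for the same key.
            latest_for_key[msg["key"]] = msg["msg_id"]
    return [
        msg
        for msg in messages
        if msg.get("key") is None or latest_for_key[msg["key"]] == msg["msg_id"]
    ]
```

Under this model, sending {"status":"running"} and later {"status":"done"} with mq9-key:status leaves only the second message in the mailbox.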

$mq9.AI.MSG.FETCH.{mail_address}

Pull messages from a mailbox. Supports two consumption modes: stateful consumption (pass group_name and the server records the offset) and stateless consumption (omit group_name and each call starts independently according to the deliver policy).

FETCH Request Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| group_name | string? | No | Consumer group name. When provided, enables stateful consumption; members of the same group share an offset. When omitted, consumption is stateless: each call starts according to the deliver policy with no offset recorded. |
| deliver | string | No | Starting point policy; defaults to latest. For stateful consumption, only takes effect when there is no existing offset record or when force_deliver: true. |
| from_time | u64? | No | Effective when deliver: "from_time"; Unix timestamp in seconds |
| from_id | u64? | No | Effective when deliver: "from_id"; fetch starts from this msg_id (inclusive) |
| force_deliver | bool? | No | Only valid for stateful consumption. When true, the existing offset is ignored and consumption restarts according to deliver |
| config | object? | No | Fetch behavior configuration; see below |

deliver Policy

| Value | Description |
| --- | --- |
| latest (default) | Only pull new messages from this point forward |
| earliest | Start from the oldest message in the mailbox |
| from_time | Start from messages after the specified timestamp; requires the from_time field |
| from_id | Start from the specified msg_id (inclusive); requires the from_id field |

Offset Behavior for Stateful Consumption (group_name provided)

| Condition | Behavior |
| --- | --- |
| Offset record exists and force_deliver: false | Resume from the last checkpoint; deliver has no effect |
| Offset record exists and force_deliver: true | Ignore the offset and restart according to the deliver policy |
| No offset record | Start according to the deliver policy (first-time consumption) |

Stateless Consumption (group_name omitted)

The server generates a temporary random group, uses the deliver policy to locate the starting point, and does not commit an offset after consumption. Suitable for inspection, debugging, or one-off reads.
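
The stateful and stateless rules above reduce to a small decision function. This sketch models the decision only; `resolve_start` is a hypothetical name, and the string "resume" is a placeholder chosen here to mean "continue from the recorded offset".

```python
from typing import Optional


def resolve_start(
    group_name: Optional[str],
    has_offset: bool,
    deliver: str = "latest",
    force_deliver: bool = False,
) -> str:
    """Return "resume" or the deliver policy that decides the starting point."""
    if group_name is None:
        # Stateless: no offset is consulted, so the deliver policy always applies.
        return deliver
    if has_offset and not force_deliver:
        # Stateful with an existing checkpoint: deliver has no effect.
        return "resume"
    # First-time consumption, or an explicit reset via force_deliver.
    return deliver
```

The function mirrors the three table rows for stateful consumption plus the stateless case, in which `has_offset` is ignored.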

config Fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| num_msgs | u32? | 100 | Maximum number of messages to pull in a single call |
| max_wait_ms | u64? | 500 | How long the server waits when there is no data, in milliseconds. 0 means return immediately without waiting. When the wait expires, an empty list is returned; the server-side wait keeps clients from hammering the server in a tight poll loop. |

FETCH Response Fields

| Field | Type | Description |
| --- | --- | --- |
| error | string | Empty on success; error message on failure (e.g., mailbox does not exist) |
| messages | array | List of messages; each entry contains msg_id, payload, priority, and create_time |

FETCH Example

# Stateless consumption: no group_name, starts from the latest message each time (default deliver: latest)
nats request '$mq9.AI.MSG.FETCH.task.001.callback' '{}'

# Stateless consumption: full read from the earliest message each time
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"deliver": "earliest"}'

# Stateful consumption: resume from checkpoint if offset exists; first call pulls only new messages (default deliver: latest)
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"group_name": "worker-group-1"}'

# Stateful consumption: resume from checkpoint if offset exists; first call starts from earliest message
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"group_name": "worker-group-1", "deliver": "earliest"}'

# Stateful consumption: force reset offset and restart from earliest
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"group_name": "worker-group-1", "deliver": "earliest", "force_deliver": true}'

# Specify max messages per call (default is 100)
nats request '$mq9.AI.MSG.FETCH.task.001.callback' \
  '{"group_name": "worker-group-1", "config": {"num_msgs": 50}}'

$mq9.AI.MSG.ACK.{mail_address}

Confirm that a message has been processed; the broker advances the consumption offset for the consumer group.

ACK Request Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| group_name | string | Yes | Consumer group name, matching the one used in FETCH |
| mail_address | string | Yes | Mailbox address |
| msg_id | u64 | Yes | The ID of the message to acknowledge (from the FETCH response) |

ACK Response Fields

| Field | Type | Description |
| --- | --- | --- |
| error | string | Empty on success; error message on failure |

ACK Example

nats request '$mq9.AI.MSG.ACK.task.001.callback' \
  '{"group_name": "worker-group-1", "mail_address": "task.001.callback", "msg_id": 5}'
# Response
{"error":""}
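
Putting FETCH and ACK together gives the stateful consumption flow described under Core Concepts. The sketch below is one possible shape, not a reference client: `request` stands in for any NATS request/reply call taking (subject, payload) and returning reply bytes, `fetch_and_ack` and `handle` are hypothetical names, and acknowledging every message individually is a choice made here because the document does not say whether ACK is cumulative.

```python
import json
from typing import Callable

# Any request/reply transport: (subject, payload) -> reply bytes.
Request = Callable[[str, bytes], bytes]


def fetch_and_ack(
    request: Request,
    mail_address: str,
    group_name: str,
    handle: Callable[[dict], None],
    num_msgs: int = 100,
) -> int:
    """Fetch one batch, process each message, then ACK it. Returns the batch size."""
    fetch_body = {"group_name": group_name, "config": {"num_msgs": num_msgs}}
    reply = json.loads(
        request(f"$mq9.AI.MSG.FETCH.{mail_address}", json.dumps(fetch_body).encode())
    )
    if reply["error"]:
        raise RuntimeError(reply["error"])
    for msg in reply["messages"]:
        handle(msg)  # process first, so a crash here leaves the message un-ACKed
        ack_body = {
            "group_name": group_name,
            "mail_address": mail_address,
            "msg_id": msg["msg_id"],
        }
        ack = json.loads(
            request(f"$mq9.AI.MSG.ACK.{mail_address}", json.dumps(ack_body).encode())
        )
        if ack["error"]:
            raise RuntimeError(ack["error"])
    return len(reply["messages"])
```

Calling this in a loop gives a simple worker; because an un-ACKed message may be fetched again after a restart, `handle` should tolerate duplicates, for example by tracking processed msg_id values.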

$mq9.AI.MSG.QUERY.{mail_address}

Query messages currently stored in a mailbox. A query is read-only and does not affect delivery to consumers.

QUERY Request Fields

| Field | Type | Description |
| --- | --- | --- |
| key | string? | Query by key; returns the latest message for that key |
| limit | u64? | Maximum number of messages to return |
| since | u64? | Return messages after this timestamp |

QUERY Response Fields

| Field | Type | Description |
| --- | --- | --- |
| error | string | Empty on success |
| messages | array | List of messages |

QUERY Example

# Query all messages
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{}'

# Query the latest message with key=status
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"key": "status"}'

# Most recent 10 messages
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"limit": 10}'

# Messages after a specific timestamp
nats request '$mq9.AI.MSG.QUERY.task.001.callback' '{"since": 1234567890}'

$mq9.AI.MSG.DELETE.{mail_address}.{msg_id}

Delete a specific message from a mailbox.

DELETE Example

nats request '$mq9.AI.MSG.DELETE.task.001.callback.2' ''
# Response
{"error":"","deleted":true}
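
Because a mail_address may itself contain dots, the DELETE subject is easiest to read from the right: the final token is the msg_id and everything between the prefix and that token is the mail_address. The sketch below encodes that reading; it is an interpretation of the subject template, not confirmed broker parsing logic, and both function names are hypothetical.

```python
DELETE_PREFIX = "$mq9.AI.MSG.DELETE."


def delete_subject(mail_address: str, msg_id: int) -> str:
    """Build the DELETE subject for one message."""
    return f"{DELETE_PREFIX}{mail_address}.{msg_id}"


def split_delete_subject(subject: str) -> tuple:
    """Split a DELETE subject into (mail_address, msg_id); msg_id is the last token."""
    if not subject.startswith(DELETE_PREFIX):
        raise ValueError("not a DELETE subject")
    mail_address, _, msg_id = subject[len(DELETE_PREFIX):].rpartition(".")
    if not mail_address or not (msg_id.isascii() and msg_id.isdigit()):
        raise ValueError("malformed DELETE subject")
    return mail_address, int(msg_id)
```

Applied to the example above, the subject splits into the mailbox task.001.callback and msg_id 2.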

$mq9.AI.AGENT.REGISTER

Register an Agent. The body contains upper-layer protocol content (the current example uses an A2A AgentCard); mq9 does not interfere with the body and only requires that it carry a mailbox field as the routing identifier. The same applies if a different protocol is used in the future.

REGISTER Example

nats request '$mq9.AI.AGENT.REGISTER' \
  '{ ...AgentCard, mailbox = "mq9://broker/agent.translator.inbox"... }'
# Response
{"error":""}

$mq9.AI.AGENT.UNREGISTER

Unregister an Agent.

UNREGISTER Example

nats request '$mq9.AI.AGENT.UNREGISTER' \
  '{ ...mailbox = "mq9://broker/agent.translator.inbox"... }'
# Response
{"error":""}

$mq9.AI.AGENT.REPORT

Report Agent status. The body contains upper-layer protocol content; mq9 does not interfere.

REPORT Example

nats request '$mq9.AI.AGENT.REPORT' \
  '{ ...mailbox = "mq9://broker/agent.translator.inbox", status fields defined by the upper-layer protocol... }'
# Response
{"error":""}

$mq9.AI.AGENT.DISCOVER

Search for registered Agents by criteria. Returns the raw list of registered content; mq9 does not transform or wrap it.

DISCOVER Request Fields

| Field | Type | Description |
| --- | --- | --- |
| text | string? | Full-text search query (keyword-based) |
| semantic | string? | Semantic search query (natural language, vector-based). Takes priority over text when both are provided. |
| limit | number? | Maximum number of results per page (default 20) |
| page | number? | Page number, starting from 1 (default 1) |

When neither text nor semantic is provided, all registered Agents for the tenant are returned.

DISCOVER Example

# Full-text search
nats request '$mq9.AI.AGENT.DISCOVER' '{"text": "payment invoice"}'

# Semantic search (vector-based, takes priority over text)
nats request '$mq9.AI.AGENT.DISCOVER' '{"semantic": "process a payment and generate invoice"}'

# Pagination: page 2, 10 results per page
nats request '$mq9.AI.AGENT.DISCOVER' '{"text": "payment", "limit": 10, "page": 2}'

# List all
nats request '$mq9.AI.AGENT.DISCOVER' '{}'
# Returns: [{ ...raw registered content... }, ...]
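
Walking through all pages of a DISCOVER result can be sketched as a generator. This rests on several assumptions: that the reply decodes to a JSON array as the "Returns" comment above suggests, that a page shorter than limit marks the end, and that `request` is any NATS request/reply call taking (subject, payload) and returning reply bytes. `discover_all` is a hypothetical name.

```python
import json
from typing import Callable, Iterator, Optional

# Any request/reply transport: (subject, payload) -> reply bytes.
Request = Callable[[str, bytes], bytes]


def discover_all(
    request: Request,
    text: Optional[str] = None,
    semantic: Optional[str] = None,
    limit: int = 20,
) -> Iterator[dict]:
    """Yield every registered entry matching the query, page by page."""
    page = 1  # pages start from 1
    while True:
        body = {"limit": limit, "page": page}
        if semantic is not None:
            body["semantic"] = semantic  # semantic takes priority over text
        elif text is not None:
            body["text"] = text
        results = json.loads(
            request("$mq9.AI.AGENT.DISCOVER", json.dumps(body).encode())
        )
        yield from results
        if len(results) < limit:
            return  # a short (or empty) page is taken to be the last one
        page += 1
```

When the total number of matches is an exact multiple of limit, the loop issues one extra request that returns an empty page before stopping.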

Error Reference

| Scenario | Response example |
| --- | --- |
| Mailbox does not exist (SEND/FETCH/QUERY/DELETE) | {"error":"mailbox xxx does not exist"} |
| Mailbox already exists (CREATE is not idempotent) | {"error":"mailbox xxx already exists","mail_address":""} |
| msg_id does not exist (DELETE) | {"error":"message not found"} |