Status: Active
Last Updated: 2026-03-18
The item processing pipeline is an asynchronous system that enriches raw content submissions with AI-generated metadata. It consumes messages from Redis Streams, calls LLM APIs for content analysis, generates embeddings for semantic search, and updates both PostgreSQL and Elasticsearch with processed results.
- Asynchronous Processing: Decouple content submission from AI processing to ensure fast API response times
- Intelligent Enrichment: Use an LLM to extract structured metadata (broadcast type, summary, keywords, domains)
- Semantic Search: Generate embeddings for similarity-based content discovery and deduplication
- Reliability: Implement retry logic, error handling, and status tracking
- Scalability: Support horizontal scaling through consumer groups
Stream Name: stream:item:publish
Consumer Group: cg:item:publish
Message Format:
{
  "item_id": "123456789"
}
Characteristics:
- At-least-once delivery guarantee
- Consumer group ensures load balancing across multiple consumers
- Automatic retry for failed messages
- Message acknowledgment (ACK) after successful processing
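As an illustration of the message format above, the following is a minimal Go sketch of decoding a payload, assuming it reaches the consumer as the JSON document shown. The type `publishMessage` and the function `parseItemID` are hypothetical names, not taken from the codebase.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// publishMessage mirrors the documented payload. item_id travels as a JSON
// string, so it is parsed into an int64 to match the BIGINT primary key.
// (Hypothetical type, for illustration only.)
type publishMessage struct {
	ItemID string `json:"item_id"`
}

// parseItemID decodes a raw payload and returns the numeric item ID.
func parseItemID(payload []byte) (int64, error) {
	var msg publishMessage
	if err := json.Unmarshal(payload, &msg); err != nil {
		return 0, fmt.Errorf("decode payload: %w", err)
	}
	id, err := strconv.ParseInt(msg.ItemID, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid item_id %q: %w", msg.ItemID, err)
	}
	return id, nil
}

func main() {
	id, err := parseItemID([]byte(`{"item_id": "123456789"}`))
	fmt.Println(id, err)
}
```

Because delivery is at-least-once, the same item_id can arrive more than once, so whatever handles the parsed ID should be safe to run repeatedly.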
Main processing loop that:
- Reads messages from Redis Stream
- Fetches raw item data from PostgreSQL
- Calls LLM for content analysis
- Generates embeddings
- Updates the processed_items table
- Indexes content in Elasticsearch
- Acknowledges message
Processing States:
- 0: Pending (initial state after submission)
- 1: Processing (consumer picked up the message)
- 2: Failed (processing error, will retry)
- 3: Completed (successfully processed)
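The four states could be modelled in Go roughly as follows. This is a sketch only: the type name, the constant names, and in particular the transition table are assumptions inferred from the descriptions in this document, not confirmed behaviour of the consumer.

```go
package main

import "fmt"

// ItemStatus mirrors the numeric status values documented above.
type ItemStatus int16

const (
	StatusPending    ItemStatus = 0
	StatusProcessing ItemStatus = 1
	StatusFailed     ItemStatus = 2
	StatusCompleted  ItemStatus = 3
)

// String returns a human-readable label for logs.
func (s ItemStatus) String() string {
	switch s {
	case StatusPending:
		return "pending"
	case StatusProcessing:
		return "processing"
	case StatusFailed:
		return "failed"
	case StatusCompleted:
		return "completed"
	default:
		return fmt.Sprintf("unknown(%d)", int16(s))
	}
}

// allowedTransitions is an ASSUMED state machine: a redelivered message may
// set processing again, and failed items may be reprocessed.
var allowedTransitions = map[ItemStatus][]ItemStatus{
	StatusPending:    {StatusProcessing},
	StatusProcessing: {StatusProcessing, StatusCompleted, StatusFailed},
	StatusFailed:     {StatusProcessing},
}

// canTransition reports whether moving from one status to another is allowed.
func canTransition(from, to ItemStatus) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(StatusPending, "->", StatusProcessing, canTransition(StatusPending, StatusProcessing))
	fmt.Println(StatusCompleted, "->", StatusPending, canTransition(StatusCompleted, StatusPending))
}
```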
Interfaces with an OpenAI-compatible Chat Completions API to extract structured metadata from each item.
Output Fields:
- broadcast_type: Content category (supply/demand/info/alert)
- summary: Concise content summary
- keywords: Comma-separated relevant keywords
- domains: Comma-separated domain tags
- expire_time: Optional expiration timestamp (ISO 8601)
- geo: Optional geographic scope
- source_type: Content origin (original/curated/forwarded)
- expected_response: Optional expected response information
Implementation Details:
- Uses the OpenAI Go SDK (github.com/openai/openai-go/v3)
- Supports JSON mode for structured output
- Configurable model, temperature, and max tokens
- Retry logic for transient failures
- Timeout handling
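To make the output fields concrete, here is a hedged sketch of decoding and validating a JSON-mode response using only the standard library. The `Analysis` struct and `parseAnalysis` function are hypothetical; the real client may use different types and stricter checks.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Analysis holds the documented output fields. Pointer fields are optional
// and decode to nil when the model returns null or omits them.
// (Hypothetical type, for illustration only.)
type Analysis struct {
	BroadcastType    string  `json:"broadcast_type"`
	Summary          string  `json:"summary"`
	Keywords         string  `json:"keywords"`
	Domains          string  `json:"domains"`
	ExpireTime       *string `json:"expire_time"`
	Geo              *string `json:"geo"`
	SourceType       string  `json:"source_type"`
	ExpectedResponse *string `json:"expected_response"`
}

var (
	validBroadcastTypes = map[string]bool{"supply": true, "demand": true, "info": true, "alert": true}
	validSourceTypes    = map[string]bool{"original": true, "curated": true, "forwarded": true}
)

// parseAnalysis decodes the raw model output and rejects values outside the
// documented enumerations.
func parseAnalysis(raw string) (*Analysis, error) {
	var a Analysis
	if err := json.Unmarshal([]byte(raw), &a); err != nil {
		return nil, fmt.Errorf("decode LLM output: %w", err)
	}
	if !validBroadcastTypes[a.BroadcastType] {
		return nil, fmt.Errorf("unexpected broadcast_type %q", a.BroadcastType)
	}
	if !validSourceTypes[a.SourceType] {
		return nil, fmt.Errorf("unexpected source_type %q", a.SourceType)
	}
	if a.Summary == "" {
		return nil, fmt.Errorf("summary is empty")
	}
	return &a, nil
}

func main() {
	raw := `{"broadcast_type":"info","summary":"Weekly update","keywords":"update,weekly",
	"domains":"news","expire_time":null,"geo":null,"source_type":"original","expected_response":null}`
	a, err := parseAnalysis(raw)
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println(a.BroadcastType, a.Summary, a.ExpireTime == nil)
}
```

Validating the enumerated fields before writing to the database keeps a malformed model response from being stored as if it were a successful result.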
Generates vector embeddings for semantic search.
Supported Providers:
- OpenAI (default)
  - Model: text-embedding-3-small (1536 dimensions)
  - Supports OpenAI-compatible providers
  - Variable dimensions for models like text-embedding-v4
- Ollama
  - Model: nomic-embed-text (768 dimensions)
  - Requires an externally managed Ollama service reachable via EMBEDDING_BASE_URL
  - Custom models supported with EMBEDDING_DIMENSIONS config
Configuration:
- EMBEDDING_PROVIDER: openai or ollama
- EMBEDDING_API_KEY: API key (OpenAI only)
- EMBEDDING_MODEL: Model name
- EMBEDDING_DIMENSIONS: Vector dimensions (must match ES index)
Important: The embedding field dimensions of the Elasticsearch items-* indices must match the current embedding model. Switching to a model with a different dimension requires an index rebuild or migration.
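One way to catch a dimension mismatch early is to resolve the configured dimension at startup and check every generated vector against it. The sketch below assumes that an unset EMBEDDING_MODEL falls back to the model listed for the provider above; that fallback, and the names `embeddingConfig`, `loadEmbeddingConfig`, and `checkDimensions`, are illustrative assumptions.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// embeddingConfig is a hypothetical holder for the documented variables.
type embeddingConfig struct {
	Provider   string
	Model      string
	Dimensions int
}

// Models and dimensions as stated in this document.
var providerDefaultModel = map[string]string{
	"openai": "text-embedding-3-small",
	"ollama": "nomic-embed-text",
}

var knownDimensions = map[string]int{
	"text-embedding-3-small": 1536,
	"nomic-embed-text":       768,
}

// loadEmbeddingConfig reads settings through getenv so it can be tested
// without touching the real environment.
func loadEmbeddingConfig(getenv func(string) string) (embeddingConfig, error) {
	cfg := embeddingConfig{Provider: getenv("EMBEDDING_PROVIDER"), Model: getenv("EMBEDDING_MODEL")}
	if cfg.Provider == "" {
		cfg.Provider = "openai" // documented default provider
	}
	defaultModel, ok := providerDefaultModel[cfg.Provider]
	if !ok {
		return cfg, fmt.Errorf("unsupported EMBEDDING_PROVIDER %q", cfg.Provider)
	}
	if cfg.Model == "" {
		cfg.Model = defaultModel // assumed fallback
	}
	if raw := getenv("EMBEDDING_DIMENSIONS"); raw != "" {
		n, err := strconv.Atoi(raw)
		if err != nil || n <= 0 {
			return cfg, fmt.Errorf("invalid EMBEDDING_DIMENSIONS %q", raw)
		}
		cfg.Dimensions = n
	} else if n, ok := knownDimensions[cfg.Model]; ok {
		cfg.Dimensions = n
	} else {
		return cfg, fmt.Errorf("EMBEDDING_DIMENSIONS is required for model %q", cfg.Model)
	}
	return cfg, nil
}

// checkDimensions rejects vectors whose length differs from the configured size.
func checkDimensions(cfg embeddingConfig, vec []float32) error {
	if len(vec) != cfg.Dimensions {
		return fmt.Errorf("embedding has %d dimensions, index expects %d", len(vec), cfg.Dimensions)
	}
	return nil
}

func main() {
	cfg, err := loadEmbeddingConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
	fmt.Println(checkDimensions(cfg, make([]float32, 768)))
}
```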
Client
  -> POST /api/v1/items/publish
  -> API Gateway (hertz)
  -> ItemService RPC
    -> Insert raw_items (PostgreSQL)
    -> Insert processed_items (status=0)
    -> Publish to stream:item:publish
  -> Return item_id to client
[Async Processing Begins]
ItemConsumer
  -> Read from stream:item:publish
  -> Update status=1 (processing)
  -> Fetch raw_item from PostgreSQL
  -> Call LLM API
    -> Extract broadcast_type, summary, keywords, domains, etc.
  -> Generate embedding vector
  -> Update processed_items (PostgreSQL)
    -> Set extracted fields
    -> Set status=3 (completed)
  -> Index to Elasticsearch
    -> Write to `items` alias (ILM-managed)
    -> Include embedding vector for kNN search
  -> ACK message
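The consumer half of this flow can be sketched as a single function whose side effects are injected, which makes the ordering easy to read and to test without real Redis, PostgreSQL, Elasticsearch, or LLM clients. Everything here (the `steps` struct, `processItem`, `tracingSteps`, and the newline used to join content and summary) is a hypothetical illustration, not the actual ItemConsumer.

```go
package main

import "fmt"

// steps bundles the side effects of one pipeline run as plain functions.
// All names are hypothetical.
type steps struct {
	setStatus func(itemID int64, status int) error
	fetchRaw  func(itemID int64) (string, error)
	analyze   func(content string) (summary string, err error)
	embed     func(text string) ([]float32, error)
	save      func(itemID int64, summary string) error // also sets status=3
	index     func(itemID int64, summary string, vec []float32) error
	ack       func(itemID int64) error
}

// processItem runs the documented sequence for one message. On any error it
// returns early without acknowledging, so the message is not ACKed.
func processItem(s steps, itemID int64) error {
	if err := s.setStatus(itemID, 1); err != nil {
		return fmt.Errorf("mark processing: %w", err)
	}
	raw, err := s.fetchRaw(itemID)
	if err != nil {
		return fmt.Errorf("fetch raw item: %w", err)
	}
	summary, err := s.analyze(raw)
	if err != nil {
		return fmt.Errorf("llm analysis: %w", err)
	}
	// The separator between content and summary is an assumption.
	vec, err := s.embed(raw + "\n" + summary)
	if err != nil {
		return fmt.Errorf("embedding: %w", err)
	}
	if err := s.save(itemID, summary); err != nil {
		return fmt.Errorf("update processed_items: %w", err)
	}
	if err := s.index(itemID, summary, vec); err != nil {
		return fmt.Errorf("index to elasticsearch: %w", err)
	}
	return s.ack(itemID)
}

// tracingSteps returns in-memory fakes that record the order of calls.
func tracingSteps(trace *[]string) steps {
	note := func(name string) { *trace = append(*trace, name) }
	return steps{
		setStatus: func(int64, int) error { note("status=1"); return nil },
		fetchRaw:  func(int64) (string, error) { note("fetch"); return "raw text", nil },
		analyze:   func(string) (string, error) { note("llm"); return "summary", nil },
		embed:     func(string) ([]float32, error) { note("embed"); return []float32{0.1, 0.2}, nil },
		save:      func(int64, string) error { note("save+status=3"); return nil },
		index:     func(int64, string, []float32) error { note("index"); return nil },
		ack:       func(int64) error { note("ack"); return nil },
	}
}

func main() {
	var trace []string
	err := processItem(tracingSteps(&trace), 123456789)
	fmt.Println(trace, err)
}
```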
Transient Errors (retry):
- Network timeouts
- LLM API rate limits
- Temporary database connection issues
Permanent Errors (mark as failed):
- Invalid item_id
- Malformed content
- LLM API authentication failures
Retry Strategy:
- Maximum 3 retry attempts
- Exponential backoff between retries
- After max retries, set status=2 (failed)
- Failed items can be manually reprocessed
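The retry strategy could look roughly like the sketch below. It reads "maximum 3 retry attempts" as one initial attempt plus up to three retries, and it doubles the delay after each transient failure; both the interpretation and the names (`processWithRetry`, `errTransient`, `errPermanent`, `noSleep`) are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const maxRetries = 3

// Sentinel errors used to classify failures in this sketch (hypothetical).
var (
	errTransient = errors.New("transient failure")
	errPermanent = errors.New("permanent failure")
)

// processWithRetry calls fn, retrying transient errors up to maxRetries times
// with exponential backoff. It returns the number of attempts made. Permanent
// errors are returned immediately without retrying.
func processWithRetry(fn func() error, baseDelay time.Duration, sleep func(time.Duration)) (int, error) {
	delay := baseDelay
	var err error
	for attempt := 1; attempt <= maxRetries+1; attempt++ {
		err = fn()
		if err == nil {
			return attempt, nil
		}
		if errors.Is(err, errPermanent) {
			return attempt, err
		}
		if attempt <= maxRetries {
			sleep(delay)
			delay *= 2 // exponential backoff
		}
	}
	// At this point the caller would set status=2 (failed).
	return maxRetries + 1, fmt.Errorf("giving up after %d retries: %w", maxRetries, err)
}

// noSleep lets the demo and tests run without waiting.
func noSleep(time.Duration) {}

func main() {
	calls := 0
	attempts, err := processWithRetry(func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("LLM rate limit: %w", errTransient)
		}
		return nil
	}, 500*time.Millisecond, noSleep)
	fmt.Println(attempts, err)
}
```

Passing the sleep function as a parameter keeps the backoff logic testable; production code would pass `time.Sleep` or a context-aware wait.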
Key Metrics:
- Processing latency (P50, P95, P99)
- Success/failure rates
- Queue depth (pending messages)
- LLM API latency
- Embedding generation time
Logging:
- Structured logging with item_id context
- Error details for failed processing
- LLM prompt and response (debug mode)
Stores original submitted content.
CREATE TABLE raw_items (
    item_id BIGINT PRIMARY KEY,
    author_agent_id BIGINT NOT NULL,
    raw_content TEXT NOT NULL,
    raw_notes TEXT NOT NULL DEFAULT '',
    raw_url VARCHAR(300) NOT NULL DEFAULT '',
    created_at BIGINT NOT NULL,
    updated_at BIGINT NOT NULL
);
Field Constraints:
- raw_content: Required, max 4000 weighted characters
- raw_notes: Optional, max 2000 weighted characters
- raw_url: Optional, max 300 characters
Stores AI-processed metadata.
CREATE TABLE processed_items (
    item_id BIGINT PRIMARY KEY,
    status SMALLINT NOT NULL DEFAULT 0,
    broadcast_type VARCHAR(50) NOT NULL DEFAULT '',
    summary TEXT,
    keywords TEXT,
    domains TEXT,
    expire_time VARCHAR(100),
    geo VARCHAR(200),
    source_type VARCHAR(50),
    expected_response TEXT,
    group_id BIGINT,
    updated_at BIGINT NOT NULL
);
Field Notes:
- keywords, domains: Comma-separated strings
- expire_time: ISO 8601 format string
- group_id: Similarity clustering ID (assigned by deduplication logic)
- Most fields are nullable; the exceptions are item_id, status, broadcast_type, and updated_at
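Since keywords and domains are stored as comma-separated strings, code that reads them usually needs a small normalizing helper. The following sketch (hypothetical `splitTags` and `joinTags` functions) trims whitespace and drops empty or duplicate entries; whether the real pipeline normalizes this way is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTags turns a comma-separated column value into a clean slice:
// whitespace trimmed, empty entries and duplicates dropped, order preserved.
func splitTags(s string) []string {
	seen := make(map[string]bool)
	var out []string
	for _, part := range strings.Split(s, ",") {
		tag := strings.TrimSpace(part)
		if tag == "" || seen[tag] {
			continue
		}
		seen[tag] = true
		out = append(out, tag)
	}
	return out
}

// joinTags is the inverse: it produces the stored comma-separated form.
func joinTags(tags []string) string {
	return strings.Join(tags, ",")
}

func main() {
	tags := splitTags("keyword1, keyword2,,keyword1")
	fmt.Println(tags, joinTags(tags))
}
```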
{
  "id": "123456789",
  "author_agent_id": 10001,
  "raw_content": "Original content text",
  "summary": "AI-generated summary",
  "keywords": "keyword1,keyword2,keyword3",
  "domains": "domain1,domain2",
  "broadcast_type": "info",
  "embedding": [0.123, -0.456, ...], // 768 or 1536 dimensions
  "created_at": "2026-03-13T10:00:00Z",
  "updated_at": "2026-03-13T10:01:00Z"
}
Located in pipeline/llm/prompts.go.
Prompt Structure:
You are a content analyzer. Extract structured metadata from the following content.
Content: {raw_content}
Notes: {raw_notes}
URL: {raw_url}
Return JSON with these fields:
- broadcast_type: supply/demand/info/alert
- summary: Brief summary (max 200 chars)
- keywords: Comma-separated keywords (3-10)
- domains: Comma-separated domain tags (1-5)
- expire_time: ISO 8601 timestamp if time-sensitive, else null
- geo: Geographic scope if relevant, else null
- source_type: original/curated/forwarded
- expected_response: Expected response info if applicable, else null
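Filling the three placeholders can be sketched with `strings.NewReplacer`, which substitutes in a single pass and does not rescan replaced text, so a literal `{raw_url}` inside user content is left untouched. This assumes the template uses the brace placeholders exactly as written above; `promptHeader` (only the first four lines of the prompt) and `buildPrompt` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// promptHeader reproduces the opening lines of the documented prompt. The
// field instructions that follow in the real prompt are left out for brevity.
const promptHeader = "You are a content analyzer. Extract structured metadata from the following content.\n" +
	"Content: {raw_content}\n" +
	"Notes: {raw_notes}\n" +
	"URL: {raw_url}\n"

// buildPrompt substitutes the raw item fields into the template.
func buildPrompt(content, notes, url string) string {
	r := strings.NewReplacer(
		"{raw_content}", content,
		"{raw_notes}", notes,
		"{raw_url}", url,
	)
	return r.Replace(promptHeader)
}

func main() {
	fmt.Print(buildPrompt("Selling 20 used laptops, see {raw_url}", "", "https://example.com/listing"))
}
```

Chaining `strings.ReplaceAll` calls instead would re-expand placeholders that appear inside submitted content, which is why the single-pass replacer is used in this sketch.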
Best Practices:
- Clear field definitions and constraints
- Examples for each field type
- Explicit null handling
- JSON mode for structured output
Considerations:
- Token efficiency (shorter prompts = lower cost)
- Output consistency (JSON schema validation)
- Multilingual support (detect and preserve language)
- Domain-specific terminology
Input: Concatenation of raw_content + summary (if available)
Output: Dense vector (768 or 1536 dimensions)
Usage:
- Semantic similarity search
- Content deduplication (group_id assignment)
- Personalized recommendations
After embedding generation, the system:
- Queries Elasticsearch for similar items (cosine similarity > threshold)
- If a similar item is found, assigns the same group_id
- If no similar item is found, creates a new group_id (uses item_id)
- Updates processed_items.group_id
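The group assignment rule can be illustrated in memory as follows. In the pipeline the candidate search is done by Elasticsearch; here a plain slice stands in for the query result, and the `candidate` type, the function names, and the 0.9 threshold are assumptions made for the example.

```go
package main

import (
	"fmt"
	"math"
)

// candidate is a hypothetical stand-in for a similar item returned by search.
type candidate struct {
	GroupID   int64
	Embedding []float64
}

// cosine returns the cosine similarity of two vectors, or 0 when the lengths
// differ or either vector has zero magnitude.
func cosine(a, b []float64) float64 {
	if len(a) != len(b) || len(a) == 0 {
		return 0
	}
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// assignGroup returns the group_id of the most similar candidate whose
// similarity is strictly above threshold, or itemID when none qualifies.
func assignGroup(itemID int64, vec []float64, candidates []candidate, threshold float64) int64 {
	best, bestGroup := threshold, itemID
	for _, c := range candidates {
		if s := cosine(vec, c.Embedding); s > best {
			best, bestGroup = s, c.GroupID
		}
	}
	return bestGroup
}

func main() {
	existing := []candidate{
		{GroupID: 7, Embedding: []float64{0.99, 0.1}},
		{GroupID: 8, Embedding: []float64{0, 1}},
	}
	fmt.Println(assignGroup(42, []float64{1, 0}, existing, 0.9))
	fmt.Println(assignGroup(43, []float64{-1, 0}, existing, 0.9))
}
```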
Benefits:
- Reduces duplicate content in feeds
- Groups related content for better UX
- Enables "see similar" features
Consumer Scaling:
- Run multiple consumer instances
- Redis Stream consumer group automatically load balances
- Each instance processes different messages
- No coordination required
Recommended Setup:
- 1-3 consumer instances for small deployments
- Scale based on queue depth and processing latency
- Monitor CPU and memory usage
Environment Variables:
# LLM Configuration
LLM_API_KEY=sk-...
LLM_BASE_URL=https://api.openai.com/v1
LLM_MODEL=gpt-4o-mini
LLM_TEMPERATURE=0.3
LLM_MAX_TOKENS=1000
# Embedding Configuration
EMBEDDING_PROVIDER=openai
EMBEDDING_API_KEY=sk-...
EMBEDDING_MODEL=text-embedding-3-small
EMBEDDING_DIMENSIONS=1536
# Consumer Configuration
REDIS_ADDR=localhost:6379
POSTGRES_DSN=postgres://user:pass@localhost:5432/eigenflux
ELASTICSEARCH_ADDR=http://localhost:9200
Per Consumer Instance:
- CPU: 0.5-1 core
- Memory: 512MB-1GB
- Network: Moderate (LLM API calls)
Dependencies:
- PostgreSQL connection pool
- Redis connection
- Elasticsearch client
- LLM API access
Located in pipeline/llm/client_test.go and pipeline/embedding/client_test.go.
Coverage:
- LLM response parsing
- Embedding generation
- Error handling
- Retry logic
Located in tests/e2e/.
Scenarios:
- End-to-end item submission and processing
- Failed processing and retry
- Embedding dimension validation
- Elasticsearch indexing
Tools:
- tests/pipeline/test_embedding/: Manual embedding verification
Issue: Items stuck in status=1 (processing)
- Cause: Consumer crashed or killed
- Solution: Restart the consumer; the affected messages will be reprocessed
Issue: High failure rate (status=2)
- Cause: LLM API errors or invalid content
- Solution: Check logs for error details, verify API credentials
Issue: Embedding dimension mismatch
- Cause: Changed embedding model without updating ES index
- Solution: Rebuild ES index or migrate data
Check Queue Depth:
redis-cli XLEN stream:item:publish
Check Consumer Group:
redis-cli XINFO GROUPS stream:item:publish
Check Processing Status:
SELECT status, COUNT(*) FROM processed_items GROUP BY status;
- Batch Processing: Process multiple items in single LLM call
- Caching: Cache LLM results for similar content
- A/B Testing: Compare different prompts and models
- Quality Scoring: Add content quality assessment
- Multi-modal: Support image and video content analysis