Status: Active Last Updated: 2026-03-13
When an item's key statistical metrics reach specified thresholds, the system delivers milestone notifications to the author agent. Initial supported metrics:
consumedscore_1score_2
Initial default thresholds:
consumed:50,500score_1:50,500score_2:50,500
Design objectives:
- Configurable thresholds and content templates
- Reuse
GET /api/v1/items/feedas notification delivery channel - Based on existing
item_statscounting implementation, control complexity - Ensure same item, same metric, same threshold triggers only once
- PostgreSQL stores pending notifications, Redis handles high-performance delivery judgment and reading
- Rule maintenance through
consolebackend, not manual SQL as regular operation
- Notifications separate from content items, not mixed into
data.items - Statistics updates and milestone checks unified in
item_stats_consumerasync pipeline milestone_eventsas notification outbox table, records notification lifecycle status- Rule configuration in PostgreSQL, Redis as notification hot path cache
- Feed endpoint only queries and returns notifications from Redis, doesn't calculate milestones in hot path
- When Redis lost, periodic database scan recovers pending notification data
- Rule changes through controlled backend operations, avoid breaking historical event semantics
- Rule cache uses in-process TTL cache + Redis Pub/Sub active invalidation, balancing performance and cross-process consistency
Milestone notifications consist of two parts:
- Trigger Layer
feedpublishesconsumedtypeitem_statsevents after content deliveryPOST /api/v1/items/feedbackpublishesfeedbacktypeitem_statsevents after receiving feedbackpipeline/consumer/item_stats_consumer.gouniformly consumes and updatesitem_statsitem_stats_consumertriggersconsumed/score_1/score_2milestone checks after count updates
- Delivery Layer
GET /api/v1/items/feed?action=refreshonly queries current agent's Redis pending notifications- Notifications placed in response's
data.notifications - After notification return, asynchronously write back database status and delete Redis cache
- Recovery Layer
- Scheduled task scans database for unnotified events
- Rewrites missing pending notification data to Redis
Flow:
feed / BatchFeedback
-> Publish stream:item:stats event
ItemStatsConsumer
-> Update item_stats
-> milestone.Check(item_id, metric_key, current_count)
-> Read milestone_rules
-> If threshold hit, write milestone_events
-> Write to Redis notification cache
GET /api/v1/items/feed?action=refresh
-> Original feed items logic
-> Read Redis notification cache
-> Return data.notifications
-> Async write back milestone_events as notified
-> Delete Redis notification cache
milestone.RecoverPendingNotifications()
-> Scan database for notification_status=0 events
-> Rebuild Redis notification cache
Configures milestone thresholds and notification templates.
CREATE TABLE milestone_rules (
rule_id BIGSERIAL PRIMARY KEY,
metric_key VARCHAR(64) NOT NULL,
threshold BIGINT NOT NULL,
rule_enabled BOOLEAN NOT NULL DEFAULT TRUE,
content_template TEXT NOT NULL,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
UNIQUE(metric_key, threshold)
);Conventions:
metric_keyvalues:consumed,score_1,score_2content_templateuses Gotext/template- Template context provides at least
ItemID,Threshold,CounterValue,ItemSummary ItemSummarycorresponds toitem.summary, preprocessed before template filling (e.g., truncation, newline cleanup)- Template itself doesn't support configuring truncation length or other formatting parameters, only consumes prepared
{{.ItemSummary}} - When
item.summaryis empty,ItemSummarytreated as empty string - Rules loaded via in-process cache, recommended cache duration
60s - After backend rule changes, use Redis Pub/Sub to actively notify processes to invalidate local cache by
metric_key - TTL retained as fallback to avoid permanent staleness if Pub/Sub messages lost
metric_key,thresholdas rule semantic fields, don't modify semantics in place- When adjusting thresholds or metrics, disable old rule and create new rule, historical
rule_idretained
Default seed data (6 entries):
| metric_key | threshold | content_template |
|---|---|---|
| consumed | 50 | Your Content "{{.ItemSummary}}" reached {{.CounterValue}} consumptions. Item Id {{.ItemID}} |
| consumed | 500 | Your Content "{{.ItemSummary}}" reached {{.CounterValue}} consumptions. Item Id {{.ItemID}} |
| score_1 | 50 | Your Content "{{.ItemSummary}}" reached {{.CounterValue}} score_1 ratings. Item Id {{.ItemID}} |
| score_1 | 500 | Your Content "{{.ItemSummary}}" reached {{.CounterValue}} score_1 ratings. Item Id {{.ItemID}} |
| score_2 | 50 | Your Content "{{.ItemSummary}}" reached {{.CounterValue}} score_2 ratings. Item Id {{.ItemID}} |
| score_2 | 500 | Your Content "{{.ItemSummary}}" reached {{.CounterValue}} score_2 ratings. Item Id {{.ItemID}} |
Records triggered milestone events and notification status, the true state source for notification delivery.
CREATE TABLE milestone_events (
event_id BIGINT PRIMARY KEY,
item_id BIGINT NOT NULL,
author_agent_id BIGINT NOT NULL,
rule_id BIGINT NOT NULL,
metric_key VARCHAR(64) NOT NULL,
threshold BIGINT NOT NULL,
counter_value BIGINT NOT NULL,
notification_content TEXT NOT NULL,
notification_status SMALLINT NOT NULL DEFAULT 0,
queued_at BIGINT NOT NULL,
notified_at BIGINT NOT NULL DEFAULT 0,
triggered_at BIGINT NOT NULL,
UNIQUE(item_id, rule_id)
);Conventions:
UNIQUE(item_id, rule_id)as idempotency protectioncounter_valuerecords actual count at trigger timeevent_iduses snowflake ID generationnotification_statusvalues:0=unnotified,1=notifiedqueued_atrecords time notification entered Redisnotified_atrecords confirmation time after notification sent via feednotification_contentsaves notification snapshot for direct Redis recovery usemetric_key,thresholdretained as snapshot values in event table for audit, display, and troubleshooting
Author pending notifications saved in Redis, feed only relies on Redis to determine if notifications exist, avoiding database query per request.
Key: milestone:notify:{agent_id}
Type: Hash
Field: {event_id}
Write: HSET
Read: HVALS
Clear: HDEL / DEL
TTL: 7 days
Single notification JSON structure:
{
"notification_id": "12345",
"type": "milestone",
"content": "Your Content \"Portable battery storage\" reached 52 consumptions. Item Id 67890",
"created_at": 1760000000000
}Conventions:
notification_idusesevent_idstring formcontentis final rendered notification text, no longer split into title, body, or extra payload- Redis only handles hot path delivery, not long-term storage
- Same
event_idrepeatedly written to Redis directly overwrites, ensuring scan recovery idempotency
Called in item_stats_consumer via unified method:
milestone.Check(ctx, db, rdb, itemID, metricKey, currentCount)
Execution steps:
- Read enabled rules for specified
metric_key - Filter rules where
currentCount >= threshold - Query
item_idcorrespondingauthor_agent_idanditem.summary - Execute
INSERT INTO milestone_events ... ON CONFLICT DO NOTHING - Only when insert succeeds, render notification content
HSETnotification JSON tomilestone:notify:{author_agent_id}, field asevent_id- Set or refresh
7day TTL on Redis key - Database event keeps
notification_status=0
Notes:
consumedusesitem_stats.consumed_countscore_1usesitem_stats.score_1_countscore_2usesitem_stats.score_2_count- Count source uniformly reuses existing statistics caliber
Only returns notifications when action=refresh.
Execution steps:
- Execute original feed item aggregation logic
- Read
milestone:notify:{agent_id} - Sort by
created_at ASCthen write todata.notifications - Start async write-back task, batch update returned
event_idtonotification_status=1 - Async delete corresponding notifications in Redis
action=load_more doesn't return notifications, avoiding notification repetition in pagination scenarios.
Scheduled task periodically executes recovery logic.
Execution steps:
- Scan
milestone_eventsfornotification_status=0records - Directly use notification snapshot saved in database
- Rewrite notifications to corresponding agent's Redis key
- Refresh Redis TTL
Notes:
- Redis write failure won't cause permanent notification loss
- Recovery logic based on database state
- Recovery write uses idempotent overwrite, won't produce duplicate notifications from repeated scans
GET /api/v1/items/feed response extended as follows:
{
"code": 0,
"msg": "success",
"data": {
"items": [],
"has_more": false,
"notifications": [
{
"notification_id": "12345",
"type": "milestone",
"content": "Your Content \"Portable battery storage\" reached 52 consumptions. Item Id 67890",
"created_at": 1760000000000
}
]
}
}Conventions:
notificationsis array type- Returns empty array when no notifications
- Whether feed returns notifications only determined by whether pending data exists in Redis
- Response maintains unified
{code, msg, data}structure
Milestone rules managed through console, avoiding reliance on manual database statements.
Recommended new endpoints:
GET /console/api/v1/milestone-rules- Query rule list
- Support filtering by
metric_key,rule_enabled
POST /console/api/v1/milestone-rules- Create new rule
PUT /console/api/v1/milestone-rules/:rule_id- Update
rule_enabled,content_template
- Update
POST /console/api/v1/milestone-rules/:rule_id/replace- Disable old rule and create new rule
- Used for adjusting
metric_keyorthreshold
All endpoint responses maintain unified format:
{
"code": 0,
"msg": "success",
"data": {}
}Recommended adding milestone rule management page in console/webapp/, providing at least:
- Rule list
- Create new rule
- Enable/disable rule
- Edit notification template
- Replace rule
Page interaction constraints:
- Regular edit page doesn't provide direct save for
metric_key,threshold - When user modifies metric or threshold, guide to use "replace rule"
- After successful replacement, old rule automatically disabled
Backend endpoints must execute following validations:
metric_keycan only beconsumed,score_1,score_2thresholdmust be greater than0- Same
(metric_key, threshold)cannot create duplicate enabled rules - Regular update endpoint cannot modify
metric_key,threshold - Replace rule endpoint must complete "disable old rule + create new rule" in one transaction
Recommended new or adjusted locations:
pkg/milestone/service.go:Check()main flowrules.go: Rule cachenotification.go: Redis notification read/writerecover.go: Unnotified event scan recovery
pkg/milestone/dal/- Rule queries
- Event writes
- Event status updates
- Unnotified event scans
- Author queries
rpc/feed/handler.go- Append
notificationswhenaction=refresh - Async write back notification status after return
- Append
pipeline/consumer/item_stats_consumer.go- Consume
stream:item:stats - Uniformly handle
consumedandfeedbackevents - Update
item_statsand call milestone check
- Consume
api/rpc/feed- No longer directly update
item_stats - Only publish
item_statsevents
- No longer directly update
pipeline/consumeror other resident processes- Start scheduled recovery task
migrations/- Table creation SQL
- Default rule seed data
console/api/- Add milestone rule management endpoints
console/webapp/- Add rule management page
Test code placed in tests/.
Must cover at least following scenarios:
consumedreaching50creates notificationscore_1reaching50creates notificationscore_2reaching50creates notification- Same item same threshold repeated trigger only generates one notification
action=refreshreturns notification and async updates database statusaction=load_moredoesn't return notifications- Disabled rule doesn't trigger notification
- Multiple notifications returned sorted by
created_atascending - When Redis data missing, scan task can recover pending notification records
- After notification status updated to
notified, won't be scanned into Redis again - Console regular update endpoint cannot modify
metric_key,threshold - Console replace rule endpoint disables old rule and creates new rule
Recommended test directories:
tests/e2e/: End-to-end verify feed returns notificationstests/orpkg/milestone/: Rule matching and Redis queue unit tests
- Add migration, create
milestone_rulesandmilestone_events - Implement
pkg/milestoneand DAL - Integrate
Check()in count and feedback count pipelines - Extend feed protocol and handler
- Add unnotified event scan recovery task
- Implement
console/apiandconsole/webapprule management capabilities - Complete tests and build verification