Skip to content

Commit e61a5ce

Browse files
Unified Event Streaming API (#80)
Replace the dual MessageHub/EventBus system with a unified EventRouter and introduce a new EventService API for streaming events to clients. - Add EventService proto with Subscribe RPC supporting glob pattern filtering (e.g., `task.*`, `message.created`), task scope filtering, and message replay - Implement EventRouter with pattern-based subscription management, replacing both MessageHub (for streaming) and EventBus (for internal coordination) - Add ent hooks to automatically publish CRUD events for agents, models, model providers, and tasks - Update CLI to use EventService.Subscribe for real-time streaming of messages, tool calls, and task updates - Support replaying full conversation history when resuming tasks Co-authored-by: construct-agent <noreply@construct.sh>
1 parent 17170fa commit e61a5ce

48 files changed

Lines changed: 5076 additions & 2423 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ jobs:
3636
3737
- name: Run tests
3838
run: |
39-
(cd backend && go test ./...)
40-
(cd frontend/cli && go test ./...)
39+
(cd backend && go test -p=1 ./...)
40+
(cd frontend/cli && go test -p=1 ./...)
4141
(cd shared && go test ./...)
4242
(cd api/go && go test ./...)
4343
env:

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ construct.db
3636
construct.db-shm
3737
construct.db-wal
3838

39-
.DS_Store
39+
.DS_Store
40+
41+
.plan

api/def/construct/v1/event.proto

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Event API provides real-time event streaming for Construct.
2+
// Events notify clients of changes to tasks, messages, agents, models, and other resources.
3+
// Clients can subscribe with pattern-based filtering and task scope filtering.
4+
syntax = "proto3";
5+
6+
package construct.v1;
7+
8+
import "buf/validate/validate.proto";
9+
import "construct/v1/agent.proto";
10+
import "construct/v1/message.proto";
11+
import "construct/v1/model.proto";
12+
import "construct/v1/modelprovider.proto";
13+
import "construct/v1/task.proto";
14+
import "google/protobuf/timestamp.proto";
15+
16+
option go_package = "github.com/furisto/construct/api/go/v1";
17+
18+
// EventService provides real-time event streaming.
19+
// Clients can subscribe to events with optional filtering by event type patterns and task scope.
20+
service EventService {
21+
// Subscribe streams events matching the filter criteria.
22+
// Supports pattern-based filtering (e.g., "task.*", "*.created") and task scope filtering.
23+
rpc Subscribe(EventSubscribeRequest) returns (stream EventSubscribeResponse) {}
24+
}
25+
26+
// EventSubscribeRequest specifies the subscription filter criteria.
27+
message EventSubscribeRequest {
28+
// event_types specifies which event types to receive using glob patterns.
29+
// Supports: "*" (all), "entity.*" (e.g., "task.*"), "*.action" (e.g., "*.created"), or exact match.
30+
// Empty list subscribes to all events.
31+
repeated string event_types = 1;
32+
33+
// task_id filters events to only those related to a specific task (UUID format, optional).
34+
// When set, only task-scoped events (task.*, message.*, tool.*) for this task are delivered.
35+
optional string task_id = 2 [(buf.validate.field).string.uuid = true];
36+
37+
// replay_after_message_id enables replay of message.created events after this message ID.
38+
// Only applicable when task_id is also set. Only message.created events are replayed.
39+
optional string replay_after_message_id = 3 [(buf.validate.field).string.uuid = true];
40+
}
41+
42+
// EventSubscribeResponse wraps an event in the stream.
43+
message EventSubscribeResponse {
44+
// event is the streamed event.
45+
Event event = 1 [(buf.validate.field).required = true];
46+
}
47+
48+
// EventAction represents the type of change that occurred to a resource.
49+
enum EventAction {
50+
// EVENT_ACTION_UNSPECIFIED is used for non-CRUD events like message.chunk or tool events.
51+
EVENT_ACTION_UNSPECIFIED = 0;
52+
53+
// EVENT_ACTION_CREATED indicates the resource was created.
54+
EVENT_ACTION_CREATED = 1;
55+
56+
// EVENT_ACTION_UPDATED indicates the resource was modified.
57+
EVENT_ACTION_UPDATED = 2;
58+
59+
// EVENT_ACTION_DELETED indicates the resource was deleted.
60+
EVENT_ACTION_DELETED = 3;
61+
}
62+
63+
// Event represents a single event in the stream.
64+
message Event {
65+
// type is the event type string for filtering (e.g., "task.created", "message.chunk", "tool.called").
66+
string type = 1 [(buf.validate.field).required = true];
67+
68+
// action is the action that occurred: created, updated, deleted, or unspecified (for non-CRUD events).
69+
EventAction action = 2 [(buf.validate.field).enum.defined_only = true];
70+
71+
// timestamp is when the change occurred (entity timestamp, e.g., created_at, updated_at).
72+
google.protobuf.Timestamp timestamp = 3 [(buf.validate.field).required = true];
73+
74+
// payload contains the event-specific data.
75+
oneof payload {
76+
TaskEvent task = 10;
77+
MessageEvent message = 11;
78+
MessageChunkEvent message_chunk = 12;
79+
AgentEvent agent = 13;
80+
ModelEvent model = 14;
81+
ModelProviderEvent model_provider = 15;
82+
ToolCalledEvent tool_called = 16;
83+
ToolResultEvent tool_result = 17;
84+
}
85+
}
86+
87+
// TaskEvent contains task event data.
88+
message TaskEvent {
89+
// task is the task entity. For delete events, may only have ID populated.
90+
Task task = 1 [(buf.validate.field).required = true];
91+
92+
// previous_phase is populated on phase change updates.
93+
optional string previous_phase = 2;
94+
}
95+
96+
// MessageEvent contains message event data (created, updated, deleted).
97+
message MessageEvent {
98+
// message is the message entity. For delete events, may only have ID populated.
99+
Message message = 1 [(buf.validate.field).required = true];
100+
}
101+
102+
// MessageChunkEvent contains streaming message chunk data.
103+
// Note: Chunk events are not replayed. After streaming completes, a message.created event is emitted.
104+
message MessageChunkEvent {
105+
// task_id is the task this message belongs to.
106+
string task_id = 1 [(buf.validate.field).string.uuid = true];
107+
108+
// message_id is the message being streamed.
109+
string message_id = 2 [(buf.validate.field).string.uuid = true];
110+
111+
// chunk is the delta content (incremental text to append), not cumulative.
112+
string chunk = 3;
113+
114+
// chunk_index is 0-based, per-message, monotonically increasing.
115+
int32 chunk_index = 4;
116+
}
117+
118+
// AgentEvent contains agent event data.
119+
message AgentEvent {
120+
// agent is the agent entity. For delete events, may only have ID populated.
121+
Agent agent = 1 [(buf.validate.field).required = true];
122+
}
123+
124+
// ModelEvent contains model event data.
125+
message ModelEvent {
126+
// model is the model entity. For delete events, may only have ID populated.
127+
Model model = 1 [(buf.validate.field).required = true];
128+
}
129+
130+
// ModelProviderEvent contains model provider event data.
131+
// Note: Must never include credentials/API keys in the payload.
132+
message ModelProviderEvent {
133+
// model_provider is the model provider entity. For delete events, may only have ID populated.
134+
ModelProvider model_provider = 1 [(buf.validate.field).required = true];
135+
}
136+
137+
// ToolCalledEvent contains tool call event data.
138+
// Emitted before tool execution begins.
139+
message ToolCalledEvent {
140+
// task_id is the task where this tool was called.
141+
string task_id = 1 [(buf.validate.field).string.uuid = true];
142+
143+
// tool_call contains the tool name and input.
144+
ToolCall tool_call = 2 [(buf.validate.field).required = true];
145+
}
146+
147+
// ToolResultEvent contains tool result event data.
148+
// Emitted after tool execution completes.
149+
message ToolResultEvent {
150+
// task_id is the task where this tool was executed.
151+
string task_id = 1 [(buf.validate.field).string.uuid = true];
152+
153+
// tool_result contains the tool name and output.
154+
ToolResult tool_result = 2 [(buf.validate.field).required = true];
155+
}

api/def/construct/v1/message.proto

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,11 @@ message MessageSpec {
7777
repeated MessagePart content = 1;
7878
}
7979

80-
enum ContentStatus {
81-
CONTENT_STATUS_UNSPECIFIED = 0;
82-
CONTENT_STATUS_PARTIAL = 1;
83-
CONTENT_STATUS_COMPLETE = 2;
84-
}
85-
8680
// MessageStatus contains the observed state and usage information of the message.
8781
message MessageStatus {
8882
// usage tracks resource consumption and costs associated with generating this message.
8983
MessageUsage usage = 1;
9084

91-
// content_state indicates the status of the message content.
92-
ContentStatus content_state = 2;
93-
9485
// is_final_response indicates whether this message is the final response to the user's request.
9586
bool is_final_response = 3;
9687
}

api/def/construct/v1/task.proto

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package construct.v1;
77

88
import "buf/validate/validate.proto";
99
import "construct/v1/common.proto";
10-
import "construct/v1/message.proto";
1110
import "google/protobuf/timestamp.proto";
1211

1312
option go_package = "github.com/furisto/construct/api/go/v1";
@@ -35,9 +34,6 @@ service TaskService {
3534
// DeleteTask removes a task from the system.
3635
rpc DeleteTask(DeleteTaskRequest) returns (DeleteTaskResponse) {}
3736

38-
// Subscribe to task events.
39-
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse) {}
40-
4137
// SuspendTask suspends a task.
4238
rpc SuspendTask(SuspendTaskRequest) returns (SuspendTaskResponse) {}
4339
}
@@ -231,26 +227,6 @@ message DeleteTaskRequest {
231227
// DeleteTaskResponse confirms the task deletion (empty response).
232228
message DeleteTaskResponse {}
233229

234-
message SubscribeRequest {
235-
string task_id = 1 [(buf.validate.field).string.uuid = true];
236-
}
237-
238-
// TaskEvent represents a task state change notification
239-
message TaskEvent {
240-
// task_id is the ID of the task that changed
241-
string task_id = 1 [(buf.validate.field).string.uuid = true];
242-
243-
// timestamp when the event occurred
244-
google.protobuf.Timestamp timestamp = 2 [(buf.validate.field).required = true];
245-
}
246-
247-
message SubscribeResponse {
248-
oneof event {
249-
Message message = 1;
250-
TaskEvent task_event = 2;
251-
}
252-
}
253-
254230
message SuspendTaskRequest {
255231
string task_id = 1 [(buf.validate.field).string.uuid = true];
256232
}

api/go/client/client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Client struct {
2323
message v1connect.MessageServiceClient
2424
auth v1connect.AuthServiceClient
2525
skill v1connect.SkillServiceClient
26+
event v1connect.EventServiceClient
2627
}
2728

2829
type ClientOptions struct {
@@ -108,6 +109,7 @@ func NewClient(endpointContext EndpointContext, options ...ClientOption) (*Clien
108109
message: v1connect.NewMessageServiceClient(opts.HTTPClient, baseURL, opts.ConnectOptions...),
109110
auth: v1connect.NewAuthServiceClient(opts.HTTPClient, baseURL, opts.ConnectOptions...),
110111
skill: v1connect.NewSkillServiceClient(opts.HTTPClient, baseURL, opts.ConnectOptions...),
112+
event: v1connect.NewEventServiceClient(opts.HTTPClient, baseURL, opts.ConnectOptions...),
111113
}, nil
112114
}
113115

@@ -139,6 +141,10 @@ func (c *Client) Skill() v1connect.SkillServiceClient {
139141
return c.skill
140142
}
141143

144+
func (c *Client) Event() v1connect.EventServiceClient {
145+
return c.event
146+
}
147+
142148
type MockClient struct {
143149
ModelProvider *mocks.MockModelProviderServiceClient
144150
Model *mocks.MockModelServiceClient
@@ -147,6 +153,7 @@ type MockClient struct {
147153
Message *mocks.MockMessageServiceClient
148154
Auth *mocks.MockAuthServiceClient
149155
Skill *mocks.MockSkillServiceClient
156+
Event *mocks.MockEventServiceClient
150157
}
151158

152159
func NewMockClient(ctrl *gomock.Controller) *MockClient {
@@ -158,6 +165,7 @@ func NewMockClient(ctrl *gomock.Controller) *MockClient {
158165
Message: mocks.NewMockMessageServiceClient(ctrl),
159166
Auth: mocks.NewMockAuthServiceClient(ctrl),
160167
Skill: mocks.NewMockSkillServiceClient(ctrl),
168+
Event: mocks.NewMockEventServiceClient(ctrl),
161169
}
162170
}
163171

@@ -170,6 +178,7 @@ func (c *MockClient) Client() *Client {
170178
message: c.Message,
171179
auth: c.Auth,
172180
skill: c.Skill,
181+
event: c.Event,
173182
}
174183
}
175184

api/go/client/mocks/event.connect_mock.go

Lines changed: 96 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)