-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.go
More file actions
294 lines (241 loc) · 12 KB
/
Copy pathclient.go
File metadata and controls
294 lines (241 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
package catbird
import (
"context"
"time"
)
// Client is a facade for interacting with Catbird
type Client struct {
Conn Conn
}
// New creates a new Client with the given database connection.
//
// The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
func New(conn Conn) *Client {
return &Client{Conn: conn}
}
// CreateQueue creates a queue with the given name and optional options.
func (c *Client) CreateQueue(ctx context.Context, queueName string, opts ...QueueOpts) error {
return CreateQueue(ctx, c.Conn, queueName, opts...)
}
// GetQueue retrieves queue metadata by name.
func (c *Client) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error) {
return GetQueue(ctx, c.Conn, queueName)
}
// ListQueues returns all queues
func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error) {
return ListQueues(ctx, c.Conn)
}
// DeleteQueue deletes a queue and all its messages.
// Returns true if the queue existed.
func (c *Client) DeleteQueue(ctx context.Context, queueName string) (bool, error) {
return DeleteQueue(ctx, c.Conn, queueName)
}
// Send enqueues a message to the specified queue.
func (c *Client) Send(ctx context.Context, queueName string, body any, opts ...SendOpts) error {
return Send(ctx, c.Conn, queueName, body, opts...)
}
// SendMany enqueues multiple messages to the specified queue.
func (c *Client) SendMany(ctx context.Context, queueName string, bodies []any, opts ...SendManyOpts) error {
return SendMany(ctx, c.Conn, queueName, bodies, opts...)
}
// Bind subscribes a queue to a topic pattern.
// Pattern supports exact topics and wildcards: * (single token), # (multi-token tail).
// Examples: "foo.bar", "foo.*.bar", "foo.bar.#"
func (c *Client) Bind(ctx context.Context, queueName string, pattern string) error {
return Bind(ctx, c.Conn, queueName, pattern)
}
// Unbind unsubscribes a queue from a topic pattern.
func (c *Client) Unbind(ctx context.Context, queueName string, pattern string) (bool, error) {
return Unbind(ctx, c.Conn, queueName, pattern)
}
// Publish sends a message to all queues subscribed to the specified topic.
func (c *Client) Publish(ctx context.Context, topic string, body any, opts ...PublishOpts) (int, error) {
return Publish(ctx, c.Conn, topic, body, opts...)
}
// PublishMany sends multiple messages to all queues subscribed to the specified topic.
func (c *Client) PublishMany(ctx context.Context, topic string, bodies []any, opts ...PublishManyOpts) (int, error) {
return PublishMany(ctx, c.Conn, topic, bodies, opts...)
}
// Read reads up to quantity messages from the queue, hiding them from other
// readers for the specified duration.
func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error) {
return Read(ctx, c.Conn, queueName, quantity, hideFor)
}
// ReadPoll reads messages from a queue with polling support.
// It polls repeatedly at the specified interval until messages are available
// or the pollFor timeout is reached.
func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error) {
return ReadPoll(ctx, c.Conn, queueName, quantity, hideFor, opts...)
}
// Hide hides a single message from being read for the specified duration.
// Returns true if the message existed.
func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error) {
return Hide(ctx, c.Conn, queueName, id, hideFor)
}
// HideMany hides multiple messages from being read for the specified duration.
func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) ([]int64, error) {
return HideMany(ctx, c.Conn, queueName, ids, hideFor)
}
// Delete deletes a single message from the queue.
// Returns true if the message existed.
func (c *Client) Delete(ctx context.Context, queueName string, id int64) (bool, error) {
return Delete(ctx, c.Conn, queueName, id)
}
// DeleteMany deletes multiple messages from the queue.
func (c *Client) DeleteMany(ctx context.Context, queueName string, ids []int64) ([]int64, error) {
return DeleteMany(ctx, c.Conn, queueName, ids)
}
// CreateTask creates a task definition.
func (c *Client) CreateTask(ctx context.Context, task *Task) error {
return CreateTask(ctx, c.Conn, task)
}
// GetTask retrieves task metadata by name.
func (c *Client) GetTask(ctx context.Context, taskName string) (*TaskInfo, error) {
return GetTask(ctx, c.Conn, taskName)
}
// ListTasks returns all tasks
func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error) {
return ListTasks(ctx, c.Conn)
}
// RunTask enqueues a task execution and returns a handle for monitoring
// progress and retrieving output.
func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error) {
return RunTask(ctx, c.Conn, taskName, input, opts...)
}
// CancelTaskRun cancels a task run.
func (c *Client) CancelTaskRun(ctx context.Context, taskName string, runID int64, opts ...CancelOpts) (bool, error) {
return CancelTaskRun(ctx, c.Conn, taskName, runID, opts...)
}
// GetTaskRun retrieves a specific task run result by ID.
func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error) {
return GetTaskRun(ctx, c.Conn, taskName, taskRunID)
}
// ListTaskRuns returns recent task runs for the specified task.
func (c *Client) ListTaskRuns(ctx context.Context, taskName string) ([]*TaskRunInfo, error) {
return ListTaskRuns(ctx, c.Conn, taskName)
}
// CreateFlow creates a flow definition.
func (c *Client) CreateFlow(ctx context.Context, flow *Flow) error {
return CreateFlow(ctx, c.Conn, flow)
}
// GetFlow retrieves flow metadata by name.
func (c *Client) GetFlow(ctx context.Context, flowName string) (*FlowInfo, error) {
return GetFlow(ctx, c.Conn, flowName)
}
// ListFlows returns all flows
func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error) {
return ListFlows(ctx, c.Conn)
}
// RunFlow enqueues a flow execution and returns a handle for monitoring.
func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error) {
return RunFlow(ctx, c.Conn, flowName, input, opts...)
}
// CancelFlowRun cancels a flow run.
func (c *Client) CancelFlowRun(ctx context.Context, flowName string, runID int64, opts ...CancelOpts) (bool, error) {
return CancelFlowRun(ctx, c.Conn, flowName, runID, opts...)
}
// GetFlowRun retrieves a specific flow run result by ID.
func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error) {
return GetFlowRun(ctx, c.Conn, flowName, flowRunID)
}
// ListFlowRuns returns recent flow runs for the specified flow.
func (c *Client) ListFlowRuns(ctx context.Context, flowName string) ([]*FlowRunInfo, error) {
return ListFlowRuns(ctx, c.Conn, flowName)
}
// GetFlowRunSteps retrieves all step runs for a specific flow run.
func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error) {
return GetFlowRunSteps(ctx, c.Conn, flowName, flowRunID)
}
// SignalFlow delivers a signal to a waiting step in a flow run.
// The step must have been defined with a signal variant (e.g., NewStepWithSignal).
// Returns an error if the signal was already delivered or the step doesn't require a signal.
func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error {
return SignalFlow(ctx, c.Conn, flowName, flowRunID, stepName, input)
}
// PurgeTaskRuns deletes terminal task runs older than the given duration.
// See PurgeTaskRuns for details.
func (c *Client) PurgeTaskRuns(ctx context.Context, taskName string, olderThan time.Duration) (int, error) {
return PurgeTaskRuns(ctx, c.Conn, taskName, olderThan)
}
// PurgeFlowRuns deletes terminal flow runs older than the given duration.
// See PurgeFlowRuns for details.
func (c *Client) PurgeFlowRuns(ctx context.Context, flowName string, olderThan time.Duration) (int, error) {
return PurgeFlowRuns(ctx, c.Conn, flowName, olderThan)
}
// ClearTaskRuns deletes all runs for the given task regardless of status.
// See ClearTaskRuns for details.
func (c *Client) ClearTaskRuns(ctx context.Context, taskName string) (int, error) {
return ClearTaskRuns(ctx, c.Conn, taskName)
}
// ClearFlowRuns deletes all runs for the given flow regardless of status.
// See ClearFlowRuns for details.
func (c *Client) ClearFlowRuns(ctx context.Context, flowName string) (int, error) {
return ClearFlowRuns(ctx, c.Conn, flowName)
}
// BindTask subscribes a task to a topic pattern.
// When a message is published to a matching topic, a task run is created
// with the message body as input.
func (c *Client) BindTask(ctx context.Context, taskName string, pattern string) error {
return BindTask(ctx, c.Conn, taskName, pattern)
}
// UnbindTask removes a task trigger binding.
func (c *Client) UnbindTask(ctx context.Context, taskName string, pattern string) (bool, error) {
return UnbindTask(ctx, c.Conn, taskName, pattern)
}
// BindFlow subscribes a flow to a topic pattern.
// When a message is published to a matching topic, a flow run is created
// with the message body as input.
func (c *Client) BindFlow(ctx context.Context, flowName string, pattern string) error {
return BindFlow(ctx, c.Conn, flowName, pattern)
}
// UnbindFlow removes a flow trigger binding.
func (c *Client) UnbindFlow(ctx context.Context, flowName string, pattern string) (bool, error) {
return UnbindFlow(ctx, c.Conn, flowName, pattern)
}
// Notify sends an ephemeral notification via pg NOTIFY.
func (c *Client) Notify(ctx context.Context, topic, message string, opts ...NotifyOpts) error {
return Notify(ctx, c.Conn, topic, message, opts...)
}
// Reader continuously reads messages from a queue and processes them.
// Blocks until ctx is cancelled.
func (c *Client) Reader(ctx context.Context, queueName string, quantity int, hideFor time.Duration, handler ReaderHandler, opts ...ReadPollOpts) error {
return Reader(ctx, c.Conn, queueName, quantity, hideFor, handler, opts...)
}
// GC runs garbage collection and returns a summary report.
func (c *Client) GC(ctx context.Context) (*GCInfo, error) {
return GC(ctx, c.Conn)
}
// ListWorkers returns all registered workers.
func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error) {
return ListWorkers(ctx, c.Conn)
}
// CreateTaskSchedule creates a cron-based schedule for a task.
// cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily.
// opts are optional ScheduleOpt values configuring the schedule.
func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpt) error {
return CreateTaskSchedule(ctx, c.Conn, taskName, cronSpec, opts...)
}
// CreateFlowSchedule creates a cron-based schedule for a flow.
// cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily.
// opts are optional ScheduleOpt values configuring the schedule.
func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpt) error {
return CreateFlowSchedule(ctx, c.Conn, flowName, cronSpec, opts...)
}
// DeleteTaskSchedule removes the cron schedule for a task.
// It reports whether a schedule existed; deleting a missing schedule is a no-op.
func (c *Client) DeleteTaskSchedule(ctx context.Context, taskName string) (bool, error) {
return DeleteTaskSchedule(ctx, c.Conn, taskName)
}
// DeleteFlowSchedule removes the cron schedule for a flow.
// It reports whether a schedule existed; deleting a missing schedule is a no-op.
func (c *Client) DeleteFlowSchedule(ctx context.Context, flowName string) (bool, error) {
return DeleteFlowSchedule(ctx, c.Conn, flowName)
}
// ListTaskSchedules returns all task schedules ordered by next_run_at.
func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error) {
return ListTaskSchedules(ctx, c.Conn)
}
// ListFlowSchedules returns all flow schedules ordered by next_run_at.
func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error) {
return ListFlowSchedules(ctx, c.Conn)
}