Skip to content

Commit ba2208f

Browse files
authored
Broker Interface (#22)
1 parent d15890c commit ba2208f

39 files changed

Lines changed: 2101 additions & 564 deletions

.env.template

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DATABASE_URL=postgres://radish:turnip42@localhost:5432/radish_test?sslmode=disable

broker/broker.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package broker
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"time"
8+
9+
"go.rtnl.ai/radish/broker/mock"
10+
"go.rtnl.ai/radish/broker/postgres"
11+
"go.rtnl.ai/radish/broker/sqlite"
12+
"go.rtnl.ai/radish/models"
13+
"go.rtnl.ai/x/dsn"
14+
)
15+
16+
type Broker interface {
17+
io.Closer
18+
Info(ctx context.Context, id int64) (task *models.TaskMeta, err error)
19+
Enqueue(ctx context.Context, kind string, payload []byte) (id int64, err error)
20+
Schedule(ctx context.Context, kind string, payload []byte, executeAfter time.Time) (id int64, err error)
21+
Dequeue(ctx context.Context, ttl time.Duration) (task *models.TaskMeta, err error)
22+
Cancel(ctx context.Context, id int64) (err error)
23+
Fail(ctx context.Context, id int64, errors models.AttemptErrors) (err error)
24+
Retry(ctx context.Context, id int64, errors models.AttemptErrors, delay time.Duration) (err error)
25+
Success(ctx context.Context, id int64) (err error)
26+
Vacuum(ctx context.Context, retention time.Duration) (err error)
27+
}
28+
29+
func Connect(databaseURL string) (b Broker, err error) {
30+
var uri *dsn.DSN
31+
if uri, err = dsn.Parse(databaseURL); err != nil {
32+
return nil, fmt.Errorf("could not parse broker connection string: %w", err)
33+
}
34+
35+
switch uri.Provider {
36+
case dsn.Postgres:
37+
return postgres.Connect(uri)
38+
case dsn.SQLite3:
39+
return sqlite.Connect(uri)
40+
case "mock":
41+
return mock.Connect(uri)
42+
default:
43+
return nil, fmt.Errorf("unsupported broker provider: %s", uri.Provider)
44+
}
45+
}
Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,25 @@
1-
package db
1+
package errors
22

33
import (
4-
"database/sql"
54
"errors"
6-
"fmt"
7-
8-
"github.com/lib/pq"
95
)
106

117
var (
128
// Database constraint errors
9+
ErrBusy = errors.New("database is busy and cannot acquire lock")
1310
ErrAlreadyExists = errors.New("record already exists in the database")
1411
ErrConstraint = errors.New("a database constraint was violated")
1512
ErrDBReference = errors.New("missing id of foreign key reference")
1613
ErrDeleteRestricted = errors.New("cannot delete record because other records depend on it")
1714
ErrNotFound = errors.New("record not found in the database")
1815
ErrNotNull = errors.New("cannot set a required field to null")
1916
ErrReadOnly = errors.New("database or transaction is read-only")
20-
ErrAlreadyConnected = errors.New("database connection already established")
2117
ErrNotConnected = errors.New("database connection not established")
2218
ErrTaskNotCancelable = errors.New("task cannot be cancelled")
2319
)
2420

25-
func dbe(err error) error {
26-
if err == nil {
27-
return nil
28-
}
29-
30-
if errors.Is(err, sql.ErrNoRows) {
31-
return ErrNotFound
32-
}
33-
34-
var pgErr *pq.Error
35-
if errors.As(err, &pgErr) {
36-
switch pgErr.Code {
37-
case "23505":
38-
return ErrAlreadyExists
39-
case "23503":
40-
return ErrDBReference
41-
case "23502":
42-
return ErrNotNull
43-
case "23000", "23514":
44-
return ErrConstraint
45-
case "23001":
46-
return ErrDeleteRestricted
47-
case "25006":
48-
return ErrReadOnly
49-
}
50-
}
51-
52-
return fmt.Errorf("pgx error: %w", err)
53-
}
21+
var (
22+
Is = errors.Is
23+
As = errors.As
24+
Join = errors.Join
25+
)

broker/mock/broker.go

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
package mock
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"go.rtnl.ai/radish/models"
12+
"go.rtnl.ai/x/dsn"
13+
)
14+
15+
const (
16+
Close = "Close"
17+
Info = "Info"
18+
Enqueue = "Enqueue"
19+
Schedule = "Schedule"
20+
Dequeue = "Dequeue"
21+
Cancel = "Cancel"
22+
Fail = "Fail"
23+
Retry = "Retry"
24+
Success = "Success"
25+
Vacuum = "Vacuum"
26+
)
27+
28+
var ErrNoMockFunction = errors.New("no mock function provided")
29+
30+
func ErrNoMock(name string) error {
31+
return fmt.Errorf("no mock function provided for %s: %w", name, ErrNoMockFunction)
32+
}
33+
34+
type Broker struct {
35+
OnClose func() error
36+
OnInfo func(ctx context.Context, id int64) (task *models.TaskMeta, err error)
37+
OnEnqueue func(ctx context.Context, kind string, payload []byte) (id int64, err error)
38+
OnSchedule func(ctx context.Context, kind string, payload []byte, executeAfter time.Time) (id int64, err error)
39+
OnDequeue func(ctx context.Context, ttl time.Duration) (task *models.TaskMeta, err error)
40+
OnCancel func(ctx context.Context, id int64) (err error)
41+
OnFail func(ctx context.Context, id int64, errors models.AttemptErrors) (err error)
42+
OnRetry func(ctx context.Context, id int64, errors models.AttemptErrors, delay time.Duration) (err error)
43+
OnSuccess func(ctx context.Context, id int64) (err error)
44+
OnVacuum func(ctx context.Context, retention time.Duration) (err error)
45+
46+
mu sync.Mutex
47+
calls map[string]int
48+
}
49+
50+
func Connect(uri *dsn.DSN) (b *Broker, err error) {
51+
return &Broker{}, nil
52+
}
53+
54+
func (b *Broker) Reset() {
55+
b.mu.Lock()
56+
b.OnClose = nil
57+
b.OnInfo = nil
58+
b.OnEnqueue = nil
59+
b.OnSchedule = nil
60+
b.OnDequeue = nil
61+
b.OnCancel = nil
62+
b.OnFail = nil
63+
b.OnRetry = nil
64+
b.OnSuccess = nil
65+
b.OnVacuum = nil
66+
b.calls = nil
67+
b.mu.Unlock()
68+
}
69+
70+
func (b *Broker) ErrorOn(name string, err error) {
71+
b.mu.Lock()
72+
defer b.mu.Unlock()
73+
74+
switch name {
75+
case Close:
76+
b.OnClose = func() error {
77+
return err
78+
}
79+
case Info:
80+
b.OnInfo = func(ctx context.Context, id int64) (task *models.TaskMeta, err error) {
81+
return nil, err
82+
}
83+
case Enqueue:
84+
b.OnEnqueue = func(ctx context.Context, kind string, payload []byte) (id int64, err error) {
85+
return 0, err
86+
}
87+
case Schedule:
88+
b.OnSchedule = func(ctx context.Context, kind string, payload []byte, executeAfter time.Time) (id int64, err error) {
89+
return 0, err
90+
}
91+
case Dequeue:
92+
b.OnDequeue = func(ctx context.Context, ttl time.Duration) (task *models.TaskMeta, err error) {
93+
return nil, err
94+
}
95+
case Cancel:
96+
b.OnCancel = func(ctx context.Context, id int64) (err error) {
97+
return err
98+
}
99+
case Fail:
100+
b.OnFail = func(ctx context.Context, id int64, errors models.AttemptErrors) (err error) {
101+
return err
102+
}
103+
case Retry:
104+
b.OnRetry = func(ctx context.Context, id int64, errors models.AttemptErrors, delay time.Duration) (err error) {
105+
return err
106+
}
107+
case Success:
108+
b.OnSuccess = func(ctx context.Context, id int64) (err error) {
109+
return err
110+
}
111+
case Vacuum:
112+
b.OnVacuum = func(ctx context.Context, retention time.Duration) (err error) {
113+
return err
114+
}
115+
default:
116+
panic(fmt.Sprintf("unknown broker method: %s", name))
117+
}
118+
}
119+
120+
func (b *Broker) AssertNCalls(t *testing.T, name string, n int) {
121+
t.Helper()
122+
b.mu.Lock()
123+
defer b.mu.Unlock()
124+
125+
if b.calls == nil {
126+
t.Fatalf("no calls were made to broker.%s()", name)
127+
}
128+
129+
if count, ok := b.calls[name]; ok && count != n {
130+
t.Fatalf("broker.%s() was called %d times, expected %d", name, count, n)
131+
}
132+
}
133+
134+
func (b *Broker) AssertCalled(t *testing.T, name string) {
135+
t.Helper()
136+
b.mu.Lock()
137+
defer b.mu.Unlock()
138+
139+
if b.calls == nil {
140+
t.Fatalf("no calls were made to broker.%s()", name)
141+
}
142+
143+
if count, ok := b.calls[name]; !ok || count == 0 {
144+
t.Fatalf("broker.%s() was not called", name)
145+
}
146+
}
147+
148+
func (b *Broker) AssertNotCalled(t *testing.T, name string) {
149+
t.Helper()
150+
b.mu.Lock()
151+
defer b.mu.Unlock()
152+
153+
if b.calls == nil {
154+
return
155+
}
156+
157+
if count, ok := b.calls[name]; ok && count > 0 {
158+
t.Fatalf("broker.%s() was called %d times", name, count)
159+
}
160+
}
161+
162+
func (b *Broker) Close() error {
163+
b.incr(Close)
164+
if b.OnClose != nil {
165+
return b.OnClose()
166+
}
167+
return ErrNoMock(Close)
168+
}
169+
170+
func (b *Broker) Info(ctx context.Context, id int64) (task *models.TaskMeta, err error) {
171+
b.incr(Info)
172+
if b.OnInfo != nil {
173+
return b.OnInfo(ctx, id)
174+
}
175+
return nil, ErrNoMock(Info)
176+
}
177+
178+
func (b *Broker) Enqueue(ctx context.Context, kind string, payload []byte) (id int64, err error) {
179+
b.incr(Enqueue)
180+
if b.OnEnqueue != nil {
181+
return b.OnEnqueue(ctx, kind, payload)
182+
}
183+
return 0, ErrNoMock(Enqueue)
184+
}
185+
186+
func (b *Broker) Schedule(ctx context.Context, kind string, payload []byte, executeAfter time.Time) (id int64, err error) {
187+
b.incr(Schedule)
188+
if b.OnSchedule != nil {
189+
return b.OnSchedule(ctx, kind, payload, executeAfter)
190+
}
191+
return 0, ErrNoMock(Schedule)
192+
}
193+
194+
func (b *Broker) Dequeue(ctx context.Context, ttl time.Duration) (task *models.TaskMeta, err error) {
195+
b.incr(Dequeue)
196+
if b.OnDequeue != nil {
197+
return b.OnDequeue(ctx, ttl)
198+
}
199+
return nil, ErrNoMock(Dequeue)
200+
}
201+
202+
func (b *Broker) Cancel(ctx context.Context, id int64) (err error) {
203+
b.incr(Cancel)
204+
if b.OnCancel != nil {
205+
return b.OnCancel(ctx, id)
206+
}
207+
return ErrNoMock(Cancel)
208+
}
209+
210+
func (b *Broker) Fail(ctx context.Context, id int64, errors models.AttemptErrors) (err error) {
211+
b.incr(Fail)
212+
if b.OnFail != nil {
213+
return b.OnFail(ctx, id, errors)
214+
}
215+
return ErrNoMock(Fail)
216+
}
217+
218+
func (b *Broker) Retry(ctx context.Context, id int64, errors models.AttemptErrors, delay time.Duration) (err error) {
219+
b.incr(Retry)
220+
if b.OnRetry != nil {
221+
return b.OnRetry(ctx, id, errors, delay)
222+
}
223+
return ErrNoMock(Retry)
224+
}
225+
226+
func (b *Broker) Success(ctx context.Context, id int64) (err error) {
227+
b.incr(Success)
228+
if b.OnSuccess != nil {
229+
return b.OnSuccess(ctx, id)
230+
}
231+
return ErrNoMock(Success)
232+
}
233+
234+
func (b *Broker) Vacuum(ctx context.Context, retention time.Duration) (err error) {
235+
b.incr(Vacuum)
236+
if b.OnVacuum != nil {
237+
return b.OnVacuum(ctx, retention)
238+
}
239+
return ErrNoMock(Vacuum)
240+
}
241+
242+
func (b *Broker) incr(name string) {
243+
b.mu.Lock()
244+
if b.calls == nil {
245+
b.calls = make(map[string]int)
246+
}
247+
b.calls[name]++
248+
b.mu.Unlock()
249+
}

0 commit comments

Comments
 (0)