Skip to content

Commit 210c4c2

Browse files
committed
chore: update dependencies and refactor message handling
- Update `github.com/golang-queue/queue` dependency from v0.2.1 to v0.3.0 - Remove indirect dependencies `github.com/vmihailenco/msgpack/v5` and `github.com/vmihailenco/tagparser/v2` - Change function signatures to use `core.TaskMessage` instead of `core.QueuedMessage` - Modify test cases to use `core.TaskMessage` and update payload handling accordingly - Rename `Payload` field to `Body` in `job.Message` struct Signed-off-by: appleboy <[email protected]>
1 parent ad78b30 commit 210c4c2

File tree

5 files changed

+20
-26
lines changed

5 files changed

+20
-26
lines changed

go.mod

+1-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/golang-queue/nats
33
go 1.22
44

55
require (
6-
github.com/golang-queue/queue v0.2.1
6+
github.com/golang-queue/queue v0.3.0
77
github.com/nats-io/nats.go v1.38.0
88
github.com/stretchr/testify v1.10.0
99
github.com/testcontainers/testcontainers-go v0.35.0
@@ -52,8 +52,6 @@ require (
5252
github.com/sirupsen/logrus v1.9.3 // indirect
5353
github.com/tklauser/go-sysconf v0.3.12 // indirect
5454
github.com/tklauser/numcpus v0.6.1 // indirect
55-
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
56-
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
5755
github.com/yusufpapurcu/wmi v1.2.3 // indirect
5856
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
5957
go.opentelemetry.io/otel v1.24.0 // indirect

go.sum

+2-6
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
4242
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
4343
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
4444
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
45-
github.com/golang-queue/queue v0.2.1 h1:W0JeHniILD4oxvs+E/hDuV0zlnQh0qeTy73BAjrigCw=
46-
github.com/golang-queue/queue v0.2.1/go.mod h1:eUZ3HH9GbhoEKQSlxCBQ4pPXeadbJ7QKBMZ0kIZNDHI=
45+
github.com/golang-queue/queue v0.3.0 h1:gyBLNT9EDOsChazYScp8iLiwLfG0SdnCDmNUybcHig4=
46+
github.com/golang-queue/queue v0.3.0/go.mod h1:SkjMwz1TjxZOrF7kABvbar1CagcMxwRtXt5Tx00wb4g=
4747
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
4848
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
4949
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -121,10 +121,6 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
121121
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
122122
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
123123
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
124-
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
125-
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
126-
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
127-
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
128124
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
129125
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
130126
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=

nats.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (w *Worker) startConsumer() (err error) {
7373
}
7474

7575
// Run start the worker
76-
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
76+
func (w *Worker) Run(ctx context.Context, task core.TaskMessage) error {
7777
return w.opts.runFunc(ctx, task)
7878
}
7979

@@ -101,7 +101,7 @@ func (w *Worker) Shutdown() error {
101101
}
102102

103103
// Queue send notification to queue
104-
func (w *Worker) Queue(job core.QueuedMessage) error {
104+
func (w *Worker) Queue(job core.TaskMessage) error {
105105
if atomic.LoadInt32(&w.stopFlag) == 1 {
106106
return queue.ErrQueueShutdown
107107
}
@@ -115,7 +115,7 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
115115
}
116116

117117
// Request a new task
118-
func (w *Worker) Request() (core.QueuedMessage, error) {
118+
func (w *Worker) Request() (core.TaskMessage, error) {
119119
_ = w.startConsumer()
120120
clock := 0
121121
loop:

nats_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ func TestCustomFuncAndWait(t *testing.T) {
133133
WithAddr(endpoint),
134134
WithSubj("test"),
135135
WithQueue("test"),
136-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
137-
log.Println("show message: " + string(m.Bytes()))
136+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
137+
log.Println("show message: " + string(m.Payload()))
138138
time.Sleep(500 * time.Millisecond)
139139
return nil
140140
}),
@@ -192,11 +192,11 @@ func TestJobReachTimeout(t *testing.T) {
192192
WithAddr(endpoint),
193193
WithSubj("JobReachTimeout"),
194194
WithQueue("test"),
195-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
195+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
196196
for {
197197
select {
198198
case <-ctx.Done():
199-
log.Println("get data:", string(m.Bytes()))
199+
log.Println("get data:", string(m.Payload()))
200200
if errors.Is(ctx.Err(), context.Canceled) {
201201
log.Println("queue has been shutdown and cancel the job")
202202
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
@@ -236,11 +236,11 @@ func TestCancelJobAfterShutdown(t *testing.T) {
236236
WithSubj("CancelJob"),
237237
WithQueue("test"),
238238
WithLogger(queue.NewLogger()),
239-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
239+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
240240
for {
241241
select {
242242
case <-ctx.Done():
243-
log.Println("get data:", string(m.Bytes()))
243+
log.Println("get data:", string(m.Payload()))
244244
if errors.Is(ctx.Err(), context.Canceled) {
245245
log.Println("queue has been shutdown and cancel the job")
246246
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
@@ -280,19 +280,19 @@ func TestGoroutineLeak(t *testing.T) {
280280
WithSubj("GoroutineLeak"),
281281
WithQueue("test"),
282282
WithLogger(queue.NewEmptyLogger()),
283-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
283+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
284284
for {
285285
select {
286286
case <-ctx.Done():
287-
log.Println("get data:", string(m.Bytes()))
287+
log.Println("get data:", string(m.Payload()))
288288
if errors.Is(ctx.Err(), context.Canceled) {
289289
log.Println("queue has been shutdown and cancel the job")
290290
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
291291
log.Println("job deadline exceeded")
292292
}
293293
return nil
294294
default:
295-
log.Println("get data:", string(m.Bytes()))
295+
log.Println("get data:", string(m.Payload()))
296296
time.Sleep(50 * time.Millisecond)
297297
return nil
298298
}
@@ -327,7 +327,7 @@ func TestGoroutinePanic(t *testing.T) {
327327
w := NewWorker(
328328
WithAddr(endpoint),
329329
WithSubj("GoroutinePanic"),
330-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
330+
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
331331
panic("missing something")
332332
}),
333333
)
@@ -351,7 +351,7 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
351351
natsC, endpoint := setupNatsContainer(ctx, t)
352352
defer testcontainers.CleanupContainer(t, natsC)
353353
job := &job.Message{
354-
Payload: []byte("foo"),
354+
Body: []byte("foo"),
355355
}
356356
w := NewWorker(
357357
WithAddr(endpoint),

options.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
type Option func(*options)
1515

1616
type options struct {
17-
runFunc func(context.Context, core.QueuedMessage) error
17+
runFunc func(context.Context, core.TaskMessage) error
1818
logger queue.Logger
1919
addr string
2020
subj string
@@ -45,7 +45,7 @@ func WithQueue(queue string) Option {
4545
}
4646

4747
// WithRunFunc setup the run func of queue
48-
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
48+
func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option {
4949
return func(w *options) {
5050
w.runFunc = fn
5151
}
@@ -64,7 +64,7 @@ func newOptions(opts ...Option) options {
6464
subj: "foobar",
6565
queue: "foobar",
6666
logger: queue.NewLogger(),
67-
runFunc: func(context.Context, core.QueuedMessage) error {
67+
runFunc: func(context.Context, core.TaskMessage) error {
6868
return nil
6969
},
7070
}

0 commit comments

Comments
 (0)