Skip to content

Commit 0aef0bb

Browse files
committed
Add example compose file for dev testing
1 parent 5193453 commit 0aef0bb

File tree

5 files changed

+82
-20
lines changed

5 files changed

+82
-20
lines changed

Dockerfile-taskqueue-manager

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ COPY --from=backend-builder /app/backend/taskqueue-manager .
4646
COPY --from=frontend-builder /app/frontend/dist/spa/ ./frontend/
4747

4848
# Expose the service port
49-
EXPOSE 8080
49+
EXPOSE 8050
5050

5151
ENV WEB_STATIC_DIR='/app/frontend/'
5252
# Command to run the service

docker-compose.yml

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,63 @@
11
services:
2-
redis:
2+
taskqueue-redis:
33
image: 'redis:latest'
4-
ports:
5-
- "6379:6379"
4+
init: true
5+
healthcheck:
6+
test: ["CMD", "redis-cli", "ping"]
7+
interval: 5s
8+
timeout: 5s
9+
retries: 5
10+
start_period: 3s
611

712
taskqueue-manager:
8-
image: 'oshank/taskqueue-manager:latest'
13+
image: 'oshank/taskqueue-manager:main'
14+
init: true
915
ports:
1016
- '8050:8050'
1117
environment:
12-
REDIS_QUEUE_ADDR: 'redis:6379'
13-
REDIS_HEARTBEAT_ADDR: 'redis:6379'
18+
REDIS_QUEUE_ADDR: 'taskqueue-redis:6379'
19+
REDIS_HEARTBEAT_ADDR: 'taskqueue-redis:6379'
20+
REDIS_METRICS_BACKEND_ADDR: 'taskqueue-redis:6379'
21+
depends_on:
22+
taskqueue-redis:
23+
condition: service_healthy
24+
25+
example-worker:
26+
image: golang:1.23.0
27+
init: true
28+
deploy:
29+
replicas: 5
30+
working_dir: /workspace/taskqueue-go
31+
command: >
32+
bash -c "go build -o /app ./examples/basic-worker && /app --redis-addr taskqueue-redis:6379"
33+
volumes:
34+
- ${PWD}:/workspace/taskqueue-go:ro
35+
- go-build-cache:/root/.cache/go-build
36+
- go-mod-cache:/go/pkg/mod
1437
depends_on:
15-
- redis
38+
taskqueue-redis:
39+
condition: service_healthy
40+
41+
example-enqueuer:
42+
image: golang:1.23.0
43+
init: true
44+
deploy:
45+
replicas: 5
46+
working_dir: /workspace/taskqueue-go
47+
command: >
48+
bash -c "go build -o /app ./examples/enqueuer && /app --redis-addr taskqueue-redis:6379"
49+
volumes:
50+
- ${PWD}:/workspace/taskqueue-go:ro
51+
- go-build-cache:/root/.cache/go-build
52+
- go-mod-cache:/go/pkg/mod
53+
depends_on:
54+
taskqueue-redis:
55+
condition: service_healthy
56+
57+
networks:
58+
default:
59+
name: taskqueue
60+
61+
volumes:
62+
go-mod-cache:
63+
go-build-cache:

examples/basic-worker/main.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ import (
2020

2121
const ns = "taskqueue"
2222

23-
var id = flag.String("id", "", "worker id")
23+
var (
24+
id = flag.String("id", "", "worker id")
25+
redisAddr = flag.String("redis-addr", ":6379", "redis address")
26+
)
2427

2528
func main() {
2629
flag.Parse()
2730

28-
rc := redis.NewClient(&redis.Options{Addr: ":6379"})
31+
rc := redis.NewClient(&redis.Options{Addr: *redisAddr})
2932

3033
worker := taskqueue.NewWorker(&taskqueue.WorkerOptions{
3134
ID: *id,
@@ -95,7 +98,7 @@ func main() {
9598

9699
worker.Stop()
97100

98-
fmt.Printf("taskqueue: shutting down. job processed email = %d payment = %d notification = %d\n",
101+
fmt.Printf("taskqueue: shutting down. job processed email=%d payment=%d notification=%d\n",
99102
emailProcessed.Load(), paymentProcessed.Load(), notifyProcessed.Load(),
100103
)
101104
}

examples/enqueuer/main.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"flag"
56
"fmt"
67
"log"
78
"math/rand"
@@ -15,16 +16,20 @@ import (
1516

1617
const ns = "taskqueue"
1718

19+
var redisAddr = flag.String("redis-addr", ":6379", "Redis address")
20+
1821
func main() {
19-
rc := redis.NewClient(&redis.Options{Addr: ":6379"})
22+
flag.Parse()
23+
24+
rc := redis.NewClient(&redis.Options{Addr: *redisAddr})
2025

2126
enq := redisq.NewQueue(rc, redisq.WithNamespace(ns))
2227

23-
n1 := queuePaymentJob(enq)
24-
n2 := queueEmailJob(enq)
28+
n1 := queueEmailJob(enq)
29+
n2 := queuePaymentJob(enq)
2530
n3 := queueNotificationJob(enq)
2631

27-
fmt.Println("Jobs Enqueued", "payment", n1, "email", n2, "notification", n3)
32+
fmt.Println("Jobs Enqueued", "email", n1, "payment", n2, "notification", n3, "total", n1+n2+n3)
2833
}
2934

3035
func queueNotificationJob(enq taskqueue.Enqueuer) int {
@@ -55,15 +60,21 @@ func queueNotificationJob(enq taskqueue.Enqueuer) int {
5560
return count
5661
}
5762

63+
type paymentPayload struct {
64+
Gateway string `json:"gateway"`
65+
Amount int `json:"amount"`
66+
WalletID int `json:"wallet_id"`
67+
}
68+
5869
func queuePaymentJob(enq taskqueue.Enqueuer) int {
5970
count := rand.Intn(100) + 100
6071

6172
for i := range count {
6273
paymentJob := taskqueue.NewJob()
63-
_ = paymentJob.JSONMarshalPayload(map[string]interface{}{
64-
"gateway": "razorpay",
65-
"amount": 500 + i,
66-
"wallet_id": "1",
74+
_ = paymentJob.JSONMarshalPayload(paymentPayload{
75+
Gateway: "razorpay",
76+
Amount: rand.Intn(1000) + 10000,
77+
WalletID: i,
6778
})
6879
if err := enq.Enqueue(context.Background(), paymentJob, &taskqueue.EnqueueOptions{
6980
QueueName: "payment_queue",

worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (w *Worker) Start(ctx context.Context) {
174174
}()
175175

176176
for _, h := range w.queueHandlers {
177-
go w.handleQueue(ctx, h)
177+
w.handleQueue(ctx, h)
178178
}
179179
}
180180

0 commit comments

Comments
 (0)