Skip to content

Commit ee43d7a

Browse files
committed
feat: add pubsub subscribe function
1 parent 3d6a2dd commit ee43d7a

File tree

8 files changed

+331
-10
lines changed

8 files changed

+331
-10
lines changed

app.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
)
1414

1515
type AppConfig struct {
16+
Name string
17+
Version string
1618
LogLevel zerolog.Level
1719
HealthCheck *HealthCheck
1820
WebApp *WebApp
@@ -43,6 +45,10 @@ func NewApp(config *AppConfig) *App {
4345
}
4446
}
4547

48+
func (app *App) AddWorker(worker *Worker) {
49+
app.Worker = worker
50+
}
51+
4652
func (app *App) Run() error {
4753
zerolog.SetGlobalLevel(app.LogLevel)
4854

@@ -100,6 +106,13 @@ func (app *App) Run() error {
100106
}()
101107
}
102108

109+
if app.Worker != nil {
110+
go func() {
111+
log.Info().Msg("starting worker")
112+
app.Worker.Start()
113+
}()
114+
}
115+
103116
<-shutdown
104117
log.Info().Msg("shutting down")
105118

examples/worker/worker

5.55 MB
Binary file not shown.

examples/worker_run_forever_task/main.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/fandujar/golaze"
78
)
@@ -14,23 +15,38 @@ func main() {
1415
task := golaze.NewTask(
1516
&golaze.TaskConfig{
1617
Name: "task run forever",
17-
Repeat: -1,
18+
Repeat: 1,
1819
RepeatDelay: 10,
20+
Timeout: 15 * time.Second,
1921
Exec: func(state *golaze.State) error {
2022
if state.Get("counter") == nil {
2123
state.Set("counter", 0)
2224
}
2325

2426
state.Set("counter", state.Get("counter").(int)+1)
2527
fmt.Printf("running task example: %d\n", state.Get("counter"))
26-
// time.Sleep(10 * time.Second)
28+
time.Sleep(10 * time.Second)
2729
return nil
2830
},
2931
})
3032

3133
worker.AddTask(task)
3234
go worker.Start()
3335

36+
// print if task is running
37+
for {
38+
if task.IsRunning() {
39+
fmt.Println("task is running")
40+
} else {
41+
fmt.Println("task is not running")
42+
}
43+
44+
if <-task.Done {
45+
fmt.Println("task finished")
46+
break
47+
}
48+
}
49+
3450
// wait for the worker to finish
3551
<-worker.Shutdown
3652
fmt.Println("worker finished")

gcp/pubsub.go

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,73 @@
11
package gcp
22

3+
import (
4+
"context"
5+
"fmt"
6+
7+
"cloud.google.com/go/pubsub"
8+
)
9+
310
type PubSubClientConfig struct {
4-
ProjectID string
5-
TopicID string
11+
ProjectID string
12+
TopicID string
13+
SubscriptionID string
614
}
715

816
type PubSubClient struct {
917
*PubSubClientConfig
18+
client *pubsub.Client
1019
}
1120

12-
func NewPubSubClient(config *PubSubClientConfig) *PubSubClient {
21+
func NewPubSubClient(config *PubSubClientConfig) (*PubSubClient, error) {
22+
ctx := context.Background()
23+
client, err := pubsub.NewClient(ctx, config.ProjectID)
24+
if err != nil {
25+
return nil, fmt.Errorf("pubsub.NewClient: %v", err)
26+
}
27+
1328
return &PubSubClient{
1429
config,
15-
}
30+
client,
31+
}, nil
1632
}
1733

1834
func (client *PubSubClient) Publish(message string) error {
1935
return nil
2036
}
2137

22-
func (client *PubSubClient) Subscribe() (chan string, error) {
23-
return nil, nil
38+
func (client *PubSubClient) Subscribe(ctx context.Context) (chan string, error) {
39+
sub := client.client.Subscription(client.SubscriptionID)
40+
41+
// Create the subscription if it doesn't exist.
42+
exists, err := sub.Exists(ctx)
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
if !exists {
48+
_, err := client.client.CreateSubscription(ctx, client.SubscriptionID, pubsub.SubscriptionConfig{
49+
Topic: client.client.Topic(client.TopicID),
50+
})
51+
if err != nil {
52+
return nil, fmt.Errorf("failed to create subscription: %v", err)
53+
}
54+
}
55+
56+
// Create a channel to pass messages received from Pub/Sub.
57+
msgCh := make(chan string)
58+
59+
// Start a goroutine to receive messages.
60+
go func() {
61+
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
62+
// Forward message data to channel and then acknowledge the message.
63+
msgCh <- string(msg.Data)
64+
msg.Ack()
65+
})
66+
if err != nil {
67+
// Handling error, for example logging it. Not closing channel here to avoid panic in case of send on closed channel.
68+
fmt.Printf("Error receiving messages: %v\n", err)
69+
}
70+
}()
71+
72+
return msgCh, nil
2473
}

go.mod

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,41 @@ module github.com/fandujar/golaze
33
go 1.22.1
44

55
require (
6+
cloud.google.com/go v0.112.1 // indirect
7+
cloud.google.com/go/compute v1.24.0 // indirect
8+
cloud.google.com/go/compute/metadata v0.2.3 // indirect
9+
cloud.google.com/go/iam v1.1.6 // indirect
10+
cloud.google.com/go/pubsub v1.37.0 // indirect
11+
github.com/felixge/httpsnoop v1.0.4 // indirect
612
github.com/go-chi/chi/v5 v5.0.12 // indirect
13+
github.com/go-logr/logr v1.4.1 // indirect
14+
github.com/go-logr/stdr v1.2.2 // indirect
15+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
16+
github.com/golang/protobuf v1.5.3 // indirect
17+
github.com/google/s2a-go v0.1.7 // indirect
18+
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
19+
github.com/googleapis/gax-go/v2 v2.12.2 // indirect
720
github.com/mattn/go-colorable v0.1.13 // indirect
821
github.com/mattn/go-isatty v0.0.19 // indirect
922
github.com/rs/zerolog v1.32.0 // indirect
10-
golang.org/x/sys v0.12.0 // indirect
23+
go.opencensus.io v0.24.0 // indirect
24+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 // indirect
25+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.48.0 // indirect
26+
go.opentelemetry.io/otel v1.23.0 // indirect
27+
go.opentelemetry.io/otel/metric v1.23.0 // indirect
28+
go.opentelemetry.io/otel/trace v1.23.0 // indirect
29+
golang.org/x/crypto v0.19.0 // indirect
30+
golang.org/x/net v0.21.0 // indirect
31+
golang.org/x/oauth2 v0.17.0 // indirect
32+
golang.org/x/sync v0.6.0 // indirect
33+
golang.org/x/sys v0.17.0 // indirect
34+
golang.org/x/text v0.14.0 // indirect
35+
golang.org/x/time v0.5.0 // indirect
36+
google.golang.org/api v0.167.0 // indirect
37+
google.golang.org/appengine v1.6.8 // indirect
38+
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
39+
google.golang.org/genproto/googleapis/api v0.0.0-20240304161311-37d4d3c04a78 // indirect
40+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240228224816-df926f6c8641 // indirect
41+
google.golang.org/grpc v1.62.0 // indirect
42+
google.golang.org/protobuf v1.32.0 // indirect
1143
)

0 commit comments

Comments
 (0)