Skip to content
This repository was archived by the owner on Mar 15, 2022. It is now read-only.

Commit f485b57

Browse files
committed
Added Google PubSub sink
Signed-off-by: live-wire <[email protected]>
1 parent edc1251 commit f485b57

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

sinks/interfaces.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package sinks
1818

1919
import (
20+
"context"
2021
"errors"
2122

2223
"github.com/golang/glog"
@@ -217,6 +218,27 @@ func ManufactureSink() (e EventSinkInterface) {
217218
}
218219
go eh.Run(make(chan bool))
219220
return eh
221+
case "pubsub":
222+
projectID := viper.GetString("pubSubProjectID")
223+
if projectID == "" {
224+
panic("pubsub sink specified but pubSubProjectID not specified")
225+
}
226+
topic := viper.GetString("pubSubTopic")
227+
if topic == "" {
228+
panic("pubsub sink specified but pubSubTopic not specified")
229+
}
230+
deadLetterTopic := viper.GetString("pubSubDeadLetterTopic")
231+
var pubSubSink *PubSubSink
232+
var err error
233+
if deadLetterTopic == "" {
234+
pubSubSink, err = NewPubSubSink(context.TODO(), projectID, topic)
235+
} else {
236+
pubSubSink, err = NewPubSubSinkWithDeadLetter(context.TODO(), projectID, topic, deadLetterTopic)
237+
}
238+
if err != nil {
239+
panic(err.Error())
240+
}
241+
return pubSubSink
220242
// case "logfile"
221243
default:
222244
err := errors.New("Invalid Sink Specified")

sinks/pubsubsink.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package sinks
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
"cloud.google.com/go/pubsub"
9+
v1 "k8s.io/api/core/v1"
10+
)
11+
12+
type PubSubSink struct {
13+
ProjectId string
14+
client *pubsub.Client
15+
topic *pubsub.Topic
16+
deadLetter *pubsub.Topic
17+
}
18+
19+
func NewPubSubSink(ctx context.Context, projectId string, topic string) (*PubSubSink, error) {
20+
client, err := pubsub.NewClient(ctx, projectId)
21+
if err != nil {
22+
return nil, err
23+
}
24+
clientTopic := client.Topic(topic)
25+
ok, err := clientTopic.Exists(ctx)
26+
if err != nil {
27+
return nil, err
28+
}
29+
if !ok {
30+
return nil, fmt.Errorf("topic %s does not exist", topic)
31+
}
32+
33+
return &PubSubSink{
34+
ProjectId: projectId,
35+
topic: clientTopic,
36+
deadLetter: nil,
37+
}, nil
38+
}
39+
40+
func NewPubSubSinkWithDeadLetter(ctx context.Context, projectId string, topic string, deadLetterTopic string) (*PubSubSink, error) {
41+
ps, err := NewPubSubSink(ctx, projectId, topic)
42+
if err != nil {
43+
return nil, err
44+
}
45+
clientDeadLetterTopic := ps.client.Topic(deadLetterTopic)
46+
ok, err := clientDeadLetterTopic.Exists(ctx)
47+
if err != nil {
48+
return nil, err
49+
}
50+
if !ok {
51+
return nil, fmt.Errorf("dead-letter topic %s does not exist", deadLetterTopic)
52+
}
53+
ps.deadLetter = clientDeadLetterTopic
54+
return ps, nil
55+
}
56+
57+
func (x *PubSubSink) publishAsync(ctx context.Context, message []byte, topic *pubsub.Topic) *pubsub.PublishResult {
58+
return topic.Publish(ctx, &pubsub.Message{
59+
Data: message,
60+
})
61+
}
62+
63+
func (x *PubSubSink) publishSync(ctx context.Context, message []byte, topic *pubsub.Topic) (string, error) {
64+
res := x.publishAsync(ctx, message, topic)
65+
msgID, err := res.Get(ctx) // blocks until published
66+
if err != nil {
67+
return "", err
68+
}
69+
return msgID, nil
70+
}
71+
72+
func (x *PubSubSink) Cleanup() {
73+
x.topic.Stop()
74+
if x.deadLetter != nil {
75+
x.deadLetter.Stop()
76+
}
77+
}
78+
79+
// UpdateEvents implements EventSinkInterface.UpdateEvents
80+
func (x *PubSubSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) {
81+
eData := NewEventData(eNew, eOld)
82+
83+
if eJSONBytes, err := json.Marshal(eData); err == nil {
84+
x.publishAsync(context.Background(), eJSONBytes, x.topic)
85+
} else {
86+
if x.deadLetter != nil {
87+
x.publishAsync(context.Background(), eJSONBytes, x.deadLetter)
88+
}
89+
}
90+
}

0 commit comments

Comments
 (0)