Skip to content

Commit 470fa46

Browse files
committed
feat: redirect bluesky events to int-bluesky
1 parent 35dcca5 commit 470fa46

27 files changed

+691
-530
lines changed

Makefile

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ proto:
99
go install github.com/golang/protobuf/protoc-gen-go@latest
1010
go install google.golang.org/grpc/cmd/[email protected]
1111
PATH=${PATH}:~/go/bin protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative \
12-
api/grpc/*.proto
12+
api/grpc/*.proto \
13+
api/grpc/events/*.proto
1314

1415
vet: proto
1516
go vet

api/grpc/events/client_mock.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"google.golang.org/grpc"
6+
"google.golang.org/grpc/codes"
7+
"google.golang.org/grpc/status"
8+
)
9+
10+
type clientMock struct {
11+
}
12+
13+
func NewClientMock() ServiceClient {
14+
return clientMock{}
15+
}
16+
17+
func (cm clientMock) SetStream(ctx context.Context, req *SetStreamRequest, opts ...grpc.CallOption) (resp *SetStreamResponse, err error) {
18+
switch req.Topic {
19+
case "":
20+
err = status.Error(codes.InvalidArgument, "empty topic")
21+
case "fail":
22+
err = status.Error(codes.Internal, "internal failure")
23+
default:
24+
resp = &SetStreamResponse{}
25+
}
26+
return
27+
}
28+
29+
func (cm clientMock) Publish(ctx context.Context, opts ...grpc.CallOption) (Service_PublishClient, error) {
30+
return newPublishStreamMock(), nil
31+
}

api/grpc/events/client_pool.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package events
2+
3+
import (
4+
"context"
5+
grpcpool "github.com/processout/grpc-go-pool"
6+
"google.golang.org/grpc"
7+
)
8+
9+
type clientPool struct {
10+
connPool *grpcpool.Pool
11+
}
12+
13+
func NewClientPool(connPool *grpcpool.Pool) ServiceClient {
14+
return clientPool{
15+
connPool: connPool,
16+
}
17+
}
18+
19+
func (cp clientPool) SetStream(ctx context.Context, req *SetStreamRequest, opts ...grpc.CallOption) (resp *SetStreamResponse, err error) {
20+
var conn *grpcpool.ClientConn
21+
conn, err = cp.connPool.Get(ctx)
22+
if err == nil {
23+
defer conn.Close()
24+
}
25+
var client ServiceClient
26+
if err == nil {
27+
client = NewServiceClient(conn)
28+
resp, err = client.SetStream(ctx, req, opts...)
29+
}
30+
return
31+
}
32+
33+
func (cp clientPool) Publish(ctx context.Context, opts ...grpc.CallOption) (stream Service_PublishClient, err error) {
34+
var conn *grpcpool.ClientConn
35+
conn, err = cp.connPool.Get(ctx)
36+
var c *grpc.ClientConn
37+
if err == nil {
38+
c = conn.ClientConn
39+
conn.Close() // return back to the conn pool immediately
40+
}
41+
var client ServiceClient
42+
if err == nil {
43+
client = NewServiceClient(c)
44+
stream, err = client.Publish(ctx, opts...)
45+
}
46+
return
47+
}

api/grpc/events/events_writer.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
6+
"io"
7+
)
8+
9+
type Writer interface {
10+
io.Closer
11+
12+
// Write writes the specified messages and returns the accepted count preserving the order.
13+
// Returns io.EOF if the destination file/connection/whatever is closed.
14+
Write(ctx context.Context, msgs []*pb.CloudEvent) (ackCount uint32, err error)
15+
}

api/grpc/events/logging.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
)
8+
9+
type loggingMiddleware struct {
10+
svc Service
11+
log *slog.Logger
12+
}
13+
14+
func NewLoggingMiddleware(svc Service, log *slog.Logger) Service {
15+
return loggingMiddleware{
16+
svc: svc,
17+
log: log,
18+
}
19+
}
20+
21+
func (lm loggingMiddleware) SetStream(ctx context.Context, topic string, limit uint32) (err error) {
22+
err = lm.svc.SetStream(ctx, topic, limit)
23+
lm.log.Debug(fmt.Sprintf("events.SetStream(topic=%s, limit=%d): err=%s", topic, limit, err))
24+
return
25+
}
26+
27+
func (lm loggingMiddleware) NewPublisher(ctx context.Context, topic string) (p Writer, err error) {
28+
p, err = lm.svc.NewPublisher(ctx, topic)
29+
lm.log.Debug(fmt.Sprintf("events.Publish(topic=%s): err=%s", topic, err))
30+
return
31+
}
+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"google.golang.org/grpc/codes"
6+
"google.golang.org/grpc/metadata"
7+
"google.golang.org/grpc/status"
8+
"io"
9+
)
10+
11+
type publishStreamMock struct {
12+
lastReq *PublishRequest
13+
lastErr error
14+
}
15+
16+
func newPublishStreamMock() Service_PublishClient {
17+
return &publishStreamMock{}
18+
}
19+
20+
func (sm *publishStreamMock) Send(req *PublishRequest) (err error) {
21+
switch sm.lastErr {
22+
case nil:
23+
switch req.Topic {
24+
case "recv_fail":
25+
sm.lastErr = status.Error(codes.Internal, "send failure")
26+
case "send_eof":
27+
sm.lastErr = io.EOF
28+
err = io.EOF
29+
case "recv_eof":
30+
sm.lastErr = io.EOF
31+
case "missing":
32+
sm.lastErr = status.Error(codes.NotFound, "queue missing")
33+
default:
34+
sm.lastReq = req
35+
}
36+
default:
37+
err = io.EOF
38+
}
39+
return
40+
}
41+
42+
func (sm *publishStreamMock) Recv() (resp *PublishResponse, err error) {
43+
resp = &PublishResponse{}
44+
switch sm.lastErr {
45+
case nil:
46+
resp.AckCount = uint32(len(sm.lastReq.Evts))
47+
if resp.AckCount > 2 {
48+
resp.AckCount = 2
49+
}
50+
default:
51+
err = sm.lastErr
52+
}
53+
return
54+
}
55+
56+
func (sm *publishStreamMock) Header() (metadata.MD, error) {
57+
//TODO implement me
58+
panic("implement me")
59+
}
60+
61+
func (sm *publishStreamMock) Trailer() metadata.MD {
62+
//TODO implement me
63+
panic("implement me")
64+
}
65+
66+
func (sm *publishStreamMock) CloseSend() error {
67+
return nil
68+
}
69+
70+
func (sm *publishStreamMock) Context() context.Context {
71+
//TODO implement me
72+
panic("implement me")
73+
}
74+
75+
func (sm *publishStreamMock) SendMsg(m interface{}) error {
76+
//TODO implement me
77+
panic("implement me")
78+
}
79+
80+
func (sm *publishStreamMock) RecvMsg(m interface{}) error {
81+
//TODO implement me
82+
panic("implement me")
83+
}

api/grpc/events/publisher.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
6+
"io"
7+
)
8+
9+
type publisher struct {
10+
stream Service_PublishClient
11+
queue string
12+
}
13+
14+
func newPublisher(stream Service_PublishClient, queue string) Writer {
15+
return publisher{
16+
stream: stream,
17+
queue: queue,
18+
}
19+
}
20+
21+
func (mw publisher) Close() (err error) {
22+
err = mw.stream.CloseSend()
23+
if err != nil {
24+
err = decodeError(err)
25+
}
26+
return
27+
}
28+
29+
func (mw publisher) Write(ctx context.Context, msgs []*pb.CloudEvent) (ackCount uint32, err error) {
30+
req := PublishRequest{
31+
Topic: mw.queue,
32+
Evts: msgs,
33+
}
34+
err = mw.stream.Send(&req)
35+
var resp *PublishResponse
36+
if err == nil || err == io.EOF {
37+
resp, err = mw.stream.Recv()
38+
}
39+
if err != nil {
40+
err = decodeError(err)
41+
}
42+
if resp != nil {
43+
ackCount = resp.AckCount
44+
}
45+
return
46+
}

api/grpc/events/publisher_mock.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
6+
)
7+
8+
type publisherMock struct{}
9+
10+
func NewPublisherMock() Writer {
11+
return publisherMock{}
12+
}
13+
14+
func (mw publisherMock) Close() error {
15+
return nil
16+
}
17+
18+
func (mw publisherMock) Write(ctx context.Context, msgs []*pb.CloudEvent) (ackCount uint32, err error) {
19+
for _, msg := range msgs {
20+
switch msg.Id {
21+
case "queue_fail":
22+
err = ErrInternal
23+
default:
24+
ackCount++
25+
}
26+
if err != nil {
27+
break
28+
}
29+
}
30+
return
31+
}

api/grpc/events/publisher_test.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
6+
"github.com/stretchr/testify/assert"
7+
"io"
8+
"testing"
9+
)
10+
11+
func TestMessagesWriter_Close(t *testing.T) {
12+
mw := newPublisher(newPublishStreamMock(), "queue0")
13+
err := mw.Close()
14+
assert.Nil(t, err)
15+
}
16+
17+
func TestMessagesWriter_Write(t *testing.T) {
18+
cases := map[string]struct {
19+
queue string
20+
msgs []*pb.CloudEvent
21+
ackCount uint32
22+
err error
23+
}{
24+
"1 => ack 1": {
25+
queue: "queue0",
26+
msgs: []*pb.CloudEvent{
27+
{
28+
Id: "msg0",
29+
},
30+
},
31+
ackCount: 1,
32+
},
33+
"3 => ack 2": {
34+
queue: "queue0",
35+
msgs: []*pb.CloudEvent{
36+
{
37+
Id: "msg0",
38+
},
39+
{
40+
Id: "msg1",
41+
},
42+
{
43+
Id: "msg2",
44+
},
45+
},
46+
ackCount: 2,
47+
},
48+
"send eof": {
49+
queue: "send_eof",
50+
msgs: []*pb.CloudEvent{
51+
{
52+
Id: "msg0",
53+
},
54+
},
55+
err: io.EOF,
56+
},
57+
"recv fail": {
58+
queue: "recv_fail",
59+
msgs: []*pb.CloudEvent{
60+
{
61+
Id: "msg0",
62+
},
63+
},
64+
err: ErrInternal,
65+
},
66+
"recv eof": {
67+
queue: "recv_eof",
68+
msgs: []*pb.CloudEvent{
69+
{
70+
Id: "msg0",
71+
},
72+
},
73+
err: io.EOF,
74+
},
75+
}
76+
for k, c := range cases {
77+
t.Run(k, func(t *testing.T) {
78+
mw := newPublisher(newPublishStreamMock(), c.queue)
79+
ackCount, err := mw.Write(context.TODO(), c.msgs)
80+
assert.Equal(t, c.ackCount, ackCount)
81+
assert.ErrorIs(t, err, c.err)
82+
})
83+
}
84+
}

0 commit comments

Comments
 (0)