Skip to content

Commit 2032f14

Browse files
committed
init
1 parent f730ebc commit 2032f14

File tree

11 files changed

+51
-38
lines changed

11 files changed

+51
-38
lines changed

api/grpc/controller.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (c controller) Create(ctx context.Context, req *CreateRequest) (resp *Creat
2727
case "":
2828
err = status.Error(codes.InvalidArgument, "empty url")
2929
default:
30-
err = c.svc.Create(ctx, req.Url, req.Auth, req.GroupId, req.UserId, time.Now().UTC())
30+
err = c.svc.Create(ctx, req.Url, req.Req, req.GroupId, req.UserId, time.Now().UTC())
3131
err = translateError(err)
3232
}
3333
return
@@ -38,9 +38,10 @@ func (c controller) Read(ctx context.Context, req *ReadRequest) (resp *ReadRespo
3838
var str model.Stream
3939
str, err = c.svc.Read(ctx, req.Url)
4040
if err == nil {
41+
resp.CreatedAt = timestamppb.New(str.CreatedAt.UTC())
42+
resp.Req = str.Request
4143
resp.GroupId = str.GroupId
4244
resp.UserId = str.UserId
43-
resp.CreatedAt = timestamppb.New(str.CreatedAt.UTC())
4445
}
4546
err = translateError(err)
4647
return

api/grpc/service.proto

+5-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ service Service {
1515

1616
message CreateRequest {
1717
string url = 1;
18-
string auth = 2;
18+
string req = 2; // initial request, typically a json payload to subscribe
1919
string groupId = 3;
2020
string userId = 4;
2121
}
@@ -27,9 +27,10 @@ message ReadRequest {
2727
}
2828

2929
message ReadResponse {
30-
string groupId = 1;
31-
string userId = 2;
32-
google.protobuf.Timestamp createdAt = 3;
30+
google.protobuf.Timestamp createdAt = 1;
31+
string req = 2;
32+
string groupId = 3;
33+
string userId = 4;
3334
}
3435

3536
message DeleteRequest {

config/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type DbConfig struct {
4949
}
5050

5151
type WebsocketConfig struct {
52-
StreamTimeout time.Duration `envconfig:"WEBSOCKET_STREAM_TIMEOUT" default:"1m" required:"true"`
52+
StreamTimeout time.Duration `envconfig:"WEBSOCKET_STREAM_TIMEOUT" default:"15m" required:"true"`
5353
Type string `envconfig:"WEBSOCKET_TYPE" required:"true" default:"com_awakari_websocket_v1"`
5454
}
5555

helm/source-websocket/values.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ db:
107107
insecure: false
108108
websocket:
109109
stream:
110-
timeout: "1m"
110+
timeout: "15m"
111111
type: "com_awakari_websocket_v1"
112112
log:
113113
# https://pkg.go.dev/golang.org/x/exp/slog#Level

model/stream.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package model
33
import "time"
44

55
type Stream struct {
6-
Auth string
6+
CreatedAt time.Time
7+
Request string
78
GroupId string
89
UserId string
9-
CreatedAt time.Time
1010
Replica uint32
1111
}

scripts/cover.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22

33
COVERAGE=$(cat cover.tmp)
4-
THRESHOLD=47
4+
THRESHOLD=46
55
if [[ ${COVERAGE} -lt ${THRESHOLD} ]]; \
66
then \
77
echo "FAILED: test coverage ${COVERAGE}% < ${THRESHOLD}%"; \

service/handler/handler.go

+17-6
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package handler
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/awakari/source-websocket/config"
67
"github.com/awakari/source-websocket/model"
78
"github.com/awakari/source-websocket/service/interceptor"
89
"github.com/coder/websocket"
910
"github.com/coder/websocket/wsjson"
1011
"io"
12+
"net/http"
13+
"time"
1114
)
1215

1316
type Handler interface {
@@ -47,19 +50,27 @@ func (h *handler) Handle(ctx context.Context) {
4750
for {
4851
evtN, err := h.handleStream(ctx)
4952
if evtN == 0 && err != nil {
50-
panic(err)
53+
fmt.Println(err)
54+
time.Sleep(1 * time.Minute)
5155
}
5256
}
5357
}
5458

5559
func (h *handler) handleStream(ctx context.Context) (evtN uint64, err error) {
56-
h.conn, _, err = websocket.Dial(ctx, h.url, nil)
60+
var resp *http.Response
61+
h.conn, resp, err = websocket.Dial(ctx, h.url, nil)
5762
if err == nil {
5863
defer h.conn.CloseNow()
59-
for {
60-
err = h.handleStreamEvent(ctx, h.url)
61-
if err != nil {
62-
break
64+
fmt.Printf("%s response: %d\n", h.url, resp.StatusCode)
65+
if h.str.Request != "" {
66+
err = wsjson.Write(ctx, h.conn, h.str.Request)
67+
}
68+
if err == nil {
69+
for {
70+
err = h.handleStreamEvent(ctx, h.url)
71+
if err != nil {
72+
break
73+
}
6374
}
6475
}
6576
}

service/service.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
type Service interface {
15-
Create(ctx context.Context, url, auth, groupId, userId string, at time.Time) (err error)
15+
Create(ctx context.Context, url, sub, groupId, userId string, at time.Time) (err error)
1616
Read(ctx context.Context, url string) (str model.Stream, err error)
1717
Delete(ctx context.Context, url, groupId, userId string) (err error)
1818
List(ctx context.Context, limit uint32, filter model.Filter, order model.Order, cursor string) (urls []string, err error)
@@ -46,9 +46,9 @@ func NewService(
4646
}
4747
}
4848

49-
func (s svc) Create(ctx context.Context, url, auth, groupId, userId string, at time.Time) (err error) {
49+
func (s svc) Create(ctx context.Context, url, sub, groupId, userId string, at time.Time) (err error) {
5050
str := model.Stream{
51-
Auth: auth,
51+
Request: sub,
5252
GroupId: groupId,
5353
UserId: userId,
5454
CreatedAt: at,

service/service_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestService_Create(t *testing.T) {
1818
s = NewServiceLogging(s, slog.Default())
1919
cases := map[string]struct {
2020
url string
21-
auth string
21+
sub string
2222
groupId string
2323
userId string
2424
at time.Time
@@ -39,7 +39,7 @@ func TestService_Create(t *testing.T) {
3939
}
4040
for k, c := range cases {
4141
t.Run(k, func(t *testing.T) {
42-
err := s.Create(context.TODO(), c.url, c.auth, c.groupId, c.userId, c.at)
42+
err := s.Create(context.TODO(), c.url, c.sub, c.groupId, c.userId, c.at)
4343
assert.ErrorIs(t, err, c.err)
4444
assert.Equal(t, c.handlerCount, len(handlerByUrl))
4545
clear(handlerByUrl)

storage/mongo/storage.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ type storageMongo struct {
2222

2323
type record struct {
2424
Url string `bson:"url"`
25-
Auth string `bson:"auth"`
25+
Req string `bson:"req"`
2626
GroupId string `bson:"gid"`
2727
UserId string `bson:"uid"`
28-
ReplicaIndex uint32 `bson:"ridx"`
2928
CreatedAt time.Time `bson:"createdAt"`
29+
ReplicaIndex uint32 `bson:"ridx"`
3030
}
3131

3232
const attrUrl = "url"
33-
const attrAuth = "auth"
33+
const attrReq = "req"
3434
const attrGroupId = "gid"
3535
const attrUserId = "uid"
3636
const attrReplicaIndex = "ridx"
@@ -43,7 +43,7 @@ var optsGet = options.
4343
SetProjection(projRead)
4444
var projRead = bson.D{
4545
{
46-
Key: attrAuth,
46+
Key: attrReq,
4747
Value: 1,
4848
},
4949
{
@@ -137,7 +137,7 @@ func (sm storageMongo) Close() error {
137137
func (sm storageMongo) Create(ctx context.Context, url string, str model.Stream) (err error) {
138138
_, err = sm.coll.InsertOne(ctx, record{
139139
Url: url,
140-
Auth: str.Auth,
140+
Req: str.Request,
141141
GroupId: str.GroupId,
142142
UserId: str.UserId,
143143
ReplicaIndex: str.Replica,
@@ -159,7 +159,7 @@ func (sm storageMongo) Read(ctx context.Context, url string) (str model.Stream,
159159
err = result.Decode(&rec)
160160
}
161161
if err == nil {
162-
str.Auth = rec.Auth
162+
str.Request = rec.Req
163163
str.CreatedAt = rec.CreatedAt.UTC()
164164
str.GroupId = rec.GroupId
165165
str.UserId = rec.UserId

storage/mongo/storage_test.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestStorageMongo_Create(t *testing.T) {
6161
//
6262
_, err = sm.coll.InsertOne(ctx, record{
6363
Url: "url0",
64-
Auth: "token1",
64+
Req: "token1",
6565
GroupId: "group2",
6666
UserId: "user3",
6767
ReplicaIndex: 4,
@@ -71,7 +71,7 @@ func TestStorageMongo_Create(t *testing.T) {
7171
//
7272
cases := map[string]struct {
7373
url string
74-
auth string
74+
sub string
7575
groupId string
7676
userId string
7777
replicaIndex uint32
@@ -81,7 +81,7 @@ func TestStorageMongo_Create(t *testing.T) {
8181
"ok empty": {},
8282
"ok": {
8383
url: "url1",
84-
auth: "token1",
84+
sub: "sub1",
8585
groupId: "group2",
8686
userId: "user3",
8787
replicaIndex: 4,
@@ -96,7 +96,7 @@ func TestStorageMongo_Create(t *testing.T) {
9696
for k, c := range cases {
9797
t.Run(k, func(t *testing.T) {
9898
err = s.Create(ctx, c.url, model.Stream{
99-
Auth: c.auth,
99+
Request: c.sub,
100100
GroupId: c.groupId,
101101
UserId: c.userId,
102102
CreatedAt: c.at,
@@ -128,7 +128,7 @@ func TestStorageMongo_Read(t *testing.T) {
128128
//
129129
_, err = sm.coll.InsertOne(ctx, record{
130130
Url: "url0",
131-
Auth: "token1",
131+
Req: "sub1",
132132
GroupId: "group2",
133133
UserId: "user3",
134134
ReplicaIndex: 4,
@@ -144,7 +144,7 @@ func TestStorageMongo_Read(t *testing.T) {
144144
"ok": {
145145
url: "url0",
146146
out: model.Stream{
147-
Auth: "token1",
147+
Request: "sub1",
148148
GroupId: "group2",
149149
UserId: "user3",
150150
CreatedAt: time.Date(2024, 11, 4, 18, 49, 25, 0, time.UTC),
@@ -188,7 +188,7 @@ func TestStorageMongo_Delete(t *testing.T) {
188188
//
189189
_, err = sm.coll.InsertOne(ctx, record{
190190
Url: "url0",
191-
Auth: "token1",
191+
Req: "token1",
192192
GroupId: "group2",
193193
UserId: "user3",
194194
ReplicaIndex: 4,
@@ -197,7 +197,7 @@ func TestStorageMongo_Delete(t *testing.T) {
197197
require.Nil(t, err)
198198
_, err = sm.coll.InsertOne(ctx, record{
199199
Url: "url1",
200-
Auth: "token2",
200+
Req: "token2",
201201
GroupId: "group3",
202202
UserId: "user4",
203203
ReplicaIndex: 5,
@@ -258,7 +258,7 @@ func TestStorageMongo_List(t *testing.T) {
258258
//
259259
_, err = sm.coll.InsertOne(ctx, record{
260260
Url: "url0",
261-
Auth: "token1",
261+
Req: "token1",
262262
GroupId: "group2",
263263
UserId: "user3",
264264
ReplicaIndex: 4,
@@ -267,7 +267,7 @@ func TestStorageMongo_List(t *testing.T) {
267267
require.Nil(t, err)
268268
_, err = sm.coll.InsertOne(ctx, record{
269269
Url: "url1",
270-
Auth: "token2",
270+
Req: "token2",
271271
GroupId: "group3",
272272
UserId: "user4",
273273
ReplicaIndex: 5,

0 commit comments

Comments
 (0)