Skip to content

Commit ecc78a8

Browse files
committed
init
1 parent 2032f14 commit ecc78a8

File tree

14 files changed

+240
-126
lines changed

14 files changed

+240
-126
lines changed

config/config.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@ import (
66
)
77

88
type Config struct {
9-
Api ApiConfig
10-
Db DbConfig
11-
Event WebsocketConfig
12-
Log struct {
9+
Api ApiConfig
10+
Db DbConfig
11+
Log struct {
1312
Level int `envconfig:"LOG_LEVEL" default:"-4" required:"true"`
1413
}
1514
Replica ReplicaConfig
@@ -30,6 +29,7 @@ type ApiConfig struct {
3029

3130
type EventsConfig struct {
3231
Source string `envconfig:"API_EVENTS_SOURCE" default:"https://awakari.com/pub.html?srcType=ws" required:"true"`
32+
Type string `envconfig:"API_EVENTS_TYPE" required:"true" default:"com_awakari_websocket_v1"`
3333
}
3434

3535
type DbConfig struct {
@@ -48,11 +48,6 @@ type DbConfig struct {
4848
}
4949
}
5050

51-
type WebsocketConfig struct {
52-
StreamTimeout time.Duration `envconfig:"WEBSOCKET_STREAM_TIMEOUT" default:"15m" required:"true"`
53-
Type string `envconfig:"WEBSOCKET_TYPE" required:"true" default:"com_awakari_websocket_v1"`
54-
}
55-
5651
type ReplicaConfig struct {
5752
Count uint32 `envconfig:"REPLICA_COUNT" required:"true"`
5853
Name string `envconfig:"REPLICA_NAME" required:"true"`

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/coder/websocket v1.8.12
1010
github.com/hashicorp/golang-lru/v2 v2.0.7
1111
github.com/kelseyhightower/envconfig v1.4.0
12+
github.com/segmentio/ksuid v1.0.4
1213
github.com/stretchr/testify v1.9.0
1314
go.mongodb.org/mongo-driver v1.17.1
1415
google.golang.org/grpc v1.68.0

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ github.com/processout/grpc-go-pool v1.2.1 h1:hbp1BOA02CIxEAoRLHGpUhhPFv77nwfBLBe
3333
github.com/processout/grpc-go-pool v1.2.1/go.mod h1:F4hiNj96O6VQ87jv4rdz8R9tkHdelQQJ/J2B1a5VSt4=
3434
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
3535
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
36+
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
37+
github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
3638
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
3739
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
3840
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=

helm/source-websocket/templates/sts.yaml

+2-4
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,14 @@ spec:
7070
value: "{{ .Values.db.tls.insecure }}"
7171
- name: DB_TABLE_RETENTION
7272
value: "{{ .Values.db.table.retention }}"
73-
- name: WEBSOCKET_TYPE
74-
value: "{{ .Values.websocket.type }}"
73+
- name: API_EVENTS_TYPE
74+
value: "{{ .Values.api.events.type }}"
7575
- name: LOG_LEVEL
7676
value: "{{ .Values.log.level }}"
7777
- name: API_USER_AGENT
7878
value: "{{ .Values.api.userAgent }}"
7979
- name: API_GROUP_ID
8080
value: "{{ .Values.api.groupId }}"
81-
- name: WEBSOCKET_STREAM_TIMEOUT
82-
value: "{{ .Values.websocket.stream.timeout }}"
8381
- name: API_EVENTS_SOURCE
8482
value: "{{ .Values.api.events.source }}"
8583
- name: REPLICA_COUNT

helm/source-websocket/values.yaml

+1-4
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ api:
8888
groupId: "default"
8989
events:
9090
source: "https://awakari.com/pub.html?srcType=ws"
91+
type: "com_awakari_websocket_v1"
9192
db:
9293
# Database name to use.
9394
name: source
@@ -105,10 +106,6 @@ db:
105106
tls:
106107
enabled: false
107108
insecure: false
108-
websocket:
109-
stream:
110-
timeout: "15m"
111-
type: "com_awakari_websocket_v1"
112109
log:
113110
# https://pkg.go.dev/golang.org/x/exp/slog#Level
114111
level: -4

main.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"github.com/awakari/source-websocket/config"
99
"github.com/awakari/source-websocket/model"
1010
"github.com/awakari/source-websocket/service"
11+
"github.com/awakari/source-websocket/service/converter"
1112
"github.com/awakari/source-websocket/service/handler"
12-
"github.com/awakari/source-websocket/service/interceptor"
1313
"github.com/awakari/source-websocket/service/writer"
1414
"github.com/awakari/source-websocket/storage/mongo"
1515
"log/slog"
@@ -68,13 +68,12 @@ func main() {
6868
}
6969
defer stor.Close()
7070

71-
interceptors := []interceptor.Interceptor{
72-
interceptor.NewLogging(interceptor.NewDefault(svcWriter), log, "default"),
73-
}
71+
conv := converter.NewService(cfg.Api.Events.Type)
72+
conv = converter.NewLogging(conv, log)
7473

7574
handlersLock := &sync.Mutex{}
7675
handlerByUrl := make(map[string]handler.Handler)
77-
handlerFactory := handler.NewFactory(cfg.Api, cfg.Event, interceptors)
76+
handlerFactory := handler.NewFactory(cfg.Api, conv, svcWriter)
7877

7978
svc := service.NewService(stor, uint32(replicaIndex), handlersLock, handlerByUrl, handlerFactory)
8079
svc = service.NewServiceLogging(svc, log)

model/metadata.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ package model
22

33
const CeSpecVersion = "1.0"
44
const CeKeyAction = "action"
5-
const CeKeyLanguage = "language"
6-
const CeKeyLength = "length"
75
const CeKeyObjectUrl = "objecturl"
8-
const CeKeyRevision = "revision"
9-
const CeKeySchema = "schema"
106
const CeKeySubject = "subject"
117
const CeKeyTime = "time"
128
const CeKeyTitle = "title"
13-
14-
const CeKeyWikiNotifyUrl = "wikinotifyurl"
15-
const CeKeyWikiServerUrl = "wikiserverurl"
9+
const CeKeyElevation = "elevation"
10+
const CeKeyLocation = "location"
11+
const CeKeyLatitude = "latitude"
12+
const CeKeyLongitude = "longitude"
13+
const CeKeyMagnitude = "magnitude"
14+
const CeKeyMagnitudeType = "magnitudetype"
15+
const CeKeySourceCatalog = "sourcecatalog"
16+
const CeKeySourceId = "sourceid"

scripts/cover.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/bin/bash
22

33
COVERAGE=$(cat cover.tmp)
4-
THRESHOLD=46
4+
THRESHOLD=42
55
if [[ ${COVERAGE} -lt ${THRESHOLD} ]]; \
66
then \
77
echo "FAILED: test coverage ${COVERAGE}% < ${THRESHOLD}%"; \

service/converter/converter.go

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package converter
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"github.com/awakari/source-websocket/model"
7+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
8+
"github.com/segmentio/ksuid"
9+
"google.golang.org/protobuf/types/known/timestamppb"
10+
"reflect"
11+
"strconv"
12+
"time"
13+
)
14+
15+
type Service interface {
16+
Convert(src string, raw map[string]any) (evt *pb.CloudEvent, err error)
17+
}
18+
19+
type svc struct {
20+
et string
21+
}
22+
23+
type ConvertFunc func(evt *pb.CloudEvent, v any) (err error)
24+
25+
const seismicportalEuEventDetailsHtmlUnid = "https://www.seismicportal.eu/eventdetails.html?unid="
26+
27+
var convSchema = map[string]any{
28+
"action": toStringFunc(model.CeKeyAction),
29+
"data": map[string]any{
30+
"properties": map[string]any{
31+
"auth": toStringFunc(model.CeKeySubject),
32+
"depth": toStringWithPrefixFunc(model.CeKeyElevation, "-"),
33+
"flynn_region": toStringFunc(model.CeKeyLocation),
34+
"lat": toStringFunc(model.CeKeyLatitude),
35+
"lon": toStringFunc(model.CeKeyLongitude),
36+
"mag": toStringFunc(model.CeKeyMagnitude),
37+
"magtype": toStringFunc(model.CeKeyMagnitudeType),
38+
"sourcecatalog": toStringFunc(model.CeKeySourceCatalog),
39+
"sourceid": toStringFunc(model.CeKeySourceId),
40+
"time": toTimestampFunc(model.CeKeyTime),
41+
"unid": toStringWithPrefixFunc(model.CeKeyObjectUrl, seismicportalEuEventDetailsHtmlUnid),
42+
},
43+
},
44+
}
45+
46+
var ErrConversion = errors.New("conversion failure")
47+
48+
func NewService(et string) Service {
49+
return svc{
50+
et: et,
51+
}
52+
}
53+
54+
func (s svc) Convert(src string, raw map[string]any) (evt *pb.CloudEvent, err error) {
55+
evt = &pb.CloudEvent{
56+
Id: ksuid.New().String(),
57+
Source: src,
58+
SpecVersion: model.CeSpecVersion,
59+
Type: s.et,
60+
Attributes: make(map[string]*pb.CloudEventAttributeValue),
61+
Data: &pb.CloudEvent_TextData{},
62+
}
63+
err = convert(evt, raw, convSchema)
64+
return
65+
}
66+
67+
func convert(evt *pb.CloudEvent, node map[string]any, schema map[string]any) (err error) {
68+
for k, v := range node {
69+
schemaChild, schemaChildOk := schema[k]
70+
if schemaChildOk {
71+
switch schemaChildT := schemaChild.(type) {
72+
case ConvertFunc:
73+
err = errors.Join(err, schemaChildT(evt, v))
74+
case map[string]any:
75+
branch, branchOk := v.(map[string]any)
76+
if branchOk {
77+
err = errors.Join(convert(evt, branch, schemaChildT))
78+
}
79+
}
80+
}
81+
}
82+
return
83+
}
84+
85+
func toStringFunc(k string) ConvertFunc {
86+
return func(evt *pb.CloudEvent, v any) (err error) {
87+
var str string
88+
str, err = toString(k, v)
89+
if err == nil {
90+
evt.Attributes[k] = &pb.CloudEventAttributeValue{
91+
Attr: &pb.CloudEventAttributeValue_CeString{
92+
CeString: str,
93+
},
94+
}
95+
}
96+
return
97+
}
98+
}
99+
100+
func toString(k string, v any) (str string, err error) {
101+
switch vt := v.(type) {
102+
case bool:
103+
str = strconv.FormatBool(vt)
104+
case int:
105+
str = strconv.Itoa(vt)
106+
case int8:
107+
str = strconv.Itoa(int(vt))
108+
case int16:
109+
str = strconv.Itoa(int(vt))
110+
case int32:
111+
str = strconv.Itoa(int(vt))
112+
case int64:
113+
str = strconv.FormatInt(vt, 10)
114+
case float32, float64:
115+
str = fmt.Sprintf("%f", vt)
116+
case string:
117+
str = vt
118+
default:
119+
err = fmt.Errorf("%w: key: %s, value: %v, type: %s, expected: string/bool/int/float", ErrConversion, k, v, reflect.TypeOf(v))
120+
}
121+
return
122+
}
123+
124+
func toStringWithPrefixFunc(k, prefix string) ConvertFunc {
125+
return func(evt *pb.CloudEvent, v any) (err error) {
126+
var str string
127+
str, err = toString(k, v)
128+
if err == nil {
129+
evt.Attributes[k] = &pb.CloudEventAttributeValue{
130+
Attr: &pb.CloudEventAttributeValue_CeString{
131+
CeString: prefix + str,
132+
},
133+
}
134+
}
135+
return
136+
}
137+
}
138+
139+
func toTimestampFunc(k string) ConvertFunc {
140+
return func(evt *pb.CloudEvent, v any) (err error) {
141+
str, strOk := v.(string)
142+
switch strOk {
143+
case true:
144+
var t time.Time
145+
t, err = time.Parse(time.RFC3339, str)
146+
evt.Attributes[k] = &pb.CloudEventAttributeValue{
147+
Attr: &pb.CloudEventAttributeValue_CeTimestamp{
148+
CeTimestamp: timestamppb.New(t),
149+
},
150+
}
151+
default:
152+
err = fmt.Errorf("%w: key: %s, value %v, type: %s, expected timestamp in RFC3339 format", ErrConversion, k, v, reflect.TypeOf(k))
153+
}
154+
return
155+
}
156+
}

service/converter/logging.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package converter
2+
3+
import (
4+
"fmt"
5+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
6+
"log/slog"
7+
)
8+
9+
type logging struct {
10+
svc Service
11+
log *slog.Logger
12+
}
13+
14+
func NewLogging(svc Service, log *slog.Logger) Service {
15+
return logging{
16+
svc: svc,
17+
log: log,
18+
}
19+
}
20+
21+
func (l logging) Convert(src string, raw map[string]any) (evt *pb.CloudEvent, err error) {
22+
evt, err = l.svc.Convert(src, raw)
23+
switch err {
24+
case nil:
25+
l.log.Debug(fmt.Sprintf("converter.Convert(%s): evt.Id=%s", src, evt.Id))
26+
default:
27+
l.log.Warn(fmt.Sprintf("converter.Convert(%s): %s", src, err))
28+
}
29+
return
30+
}

0 commit comments

Comments
 (0)