Skip to content

Commit a6b4d6b

Browse files
authored
Merge pull request #8 from Zondax/ezequiel/new_connections
Wrappers for connection types
2 parents 8276711 + 5feb35f commit a6b4d6b

11 files changed

Lines changed: 329 additions & 54 deletions

File tree

Makefile

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
1-
2-
# Constants to override at build time
3-
PACKAGE := github.com/zondax/zindexer
4-
REVISION := $(shell git rev-parse --short HEAD)
5-
APPNAME := zindexer
6-
71
build:
8-
go build -ldflags "-X $(PACKAGE).GitRevision=$(REVISION)" -o $(APPNAME) ./...
2+
go build ./...
93

104
clean:
115
go clean

cli.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type DefaultConfigHandler func()
1616
var defaultConfigHandler DefaultConfigHandler
1717

1818
func SetupCloseHandler(handler CleanUpHandler) {
19-
c := make(chan os.Signal)
19+
c := make(chan os.Signal, 1)
2020
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
2121
go func() {
2222
<-c

config.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,17 @@ type DBConnectionParams struct {
1313
Port string
1414
}
1515

16+
type GraphqlClientParams struct {
17+
Host string
18+
Token string
19+
}
20+
1621
type DBConnectionConfig struct {
17-
gorm *gorm.Config
22+
Gorm *gorm.Config
1823
}
1924

2025
func (p *DBConnectionParams) GetDSN() (string, error) {
2126
return fmt.Sprintf(
2227
"user=%s password=%s dbname=%s host=%s port=%s sslmode=disable",
2328
p.User, p.Password, p.Name, p.Host, p.Port), nil
2429
}
25-
26-
func ConnectDB(params DBConnectionParams, config DBConnectionConfig) (*gorm.DB, error) {
27-
conn, err := NewPostgresConnection(&params, &config)
28-
if err != nil {
29-
return nil, err
30-
}
31-
32-
return conn.GetDB(), nil
33-
}

connection.go

Lines changed: 0 additions & 32 deletions
This file was deleted.

connections/graphql.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package connections
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/hasura/go-graphql-client"
8+
"go.uber.org/zap"
9+
"golang.org/x/oauth2"
10+
"time"
11+
)
12+
13+
const ConnectTimeout = 10 * time.Second
14+
15+
type GraphqlClient struct {
16+
Host string
17+
Client *graphql.Client
18+
}
19+
20+
type GraphqlSubscriptionClient struct {
21+
Client *graphql.SubscriptionClient
22+
Id string
23+
}
24+
25+
func NewGraphqlQueryClient(host string, token string) GraphqlClient {
26+
src := oauth2.StaticTokenSource(
27+
&oauth2.Token{AccessToken: token},
28+
)
29+
httpClient := oauth2.NewClient(context.Background(), src)
30+
return GraphqlClient{
31+
Host: host,
32+
Client: graphql.NewClient(host, httpClient),
33+
}
34+
}
35+
36+
func (c GraphqlClient) Connect() error {
37+
return nil
38+
}
39+
40+
func NewGraphqlSubscriptionClient(host string, token string) (error, GraphqlSubscriptionClient) {
41+
client := graphql.NewSubscriptionClient(host).
42+
WithConnectionParams(map[string]interface{}{
43+
"headers": map[string]string{
44+
"x-hasura-admin-secret": token,
45+
},
46+
}).OnError(onClientError)
47+
48+
return nil, GraphqlSubscriptionClient{Client: client}
49+
}
50+
51+
func onClientError(sc *graphql.SubscriptionClient, err error) error {
52+
zap.S().Fatalf("Connection error on subscription client: %s", err.Error())
53+
return err
54+
}
55+
56+
func (c GraphqlSubscriptionClient) Subscribe(query interface{}, handler func(message *json.RawMessage, err error) error) error {
57+
id, err := c.Client.Subscribe(query, nil, handler)
58+
if err != nil {
59+
return err
60+
}
61+
c.Id = id
62+
return nil
63+
}
64+
65+
func (c GraphqlSubscriptionClient) Unsubscribe() error {
66+
err := c.Client.Unsubscribe(c.Id)
67+
if err != nil {
68+
return err
69+
}
70+
return nil
71+
}
72+
73+
func (c GraphqlSubscriptionClient) Start() error {
74+
errCh := make(chan error)
75+
readyCh := make(chan bool)
76+
77+
c.Client.OnConnected(func() {
78+
readyCh <- true
79+
})
80+
81+
go func() {
82+
err := c.Client.Run()
83+
if err != nil {
84+
errCh <- err
85+
}
86+
}()
87+
88+
for {
89+
select {
90+
case err := <-errCh:
91+
close(errCh)
92+
close(readyCh)
93+
return err
94+
case <-readyCh:
95+
close(readyCh)
96+
return nil
97+
case <-time.After(ConnectTimeout):
98+
return fmt.Errorf("timeout while waiting subscriber client to connect to host")
99+
}
100+
}
101+
}
102+
103+
func (c GraphqlSubscriptionClient) Stop() error {
104+
err := c.Client.Close()
105+
if err != nil {
106+
return err
107+
}
108+
return nil
109+
}

connections/postgres.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package connections
2+
3+
import (
4+
"fmt"
5+
"github.com/Zondax/zindexer"
6+
"gorm.io/driver/postgres"
7+
"gorm.io/gorm"
8+
)
9+
10+
func NewPostgresConnection(params *zindexer.DBConnectionParams, config *zindexer.DBConnectionConfig) (*GormConnection, error) {
11+
dsn, err := params.GetDSN()
12+
if err != nil {
13+
return nil, fmt.Errorf("failed to retrieve dsn")
14+
}
15+
16+
conn, err := gorm.Open(postgres.Open(dsn), config.Gorm)
17+
18+
if err != nil {
19+
return nil, fmt.Errorf("failed to dial connect to db '%s@%s:%s': %v", params.Name, params.Host, params.Port, err)
20+
}
21+
22+
return &GormConnection{
23+
db: conn,
24+
}, nil
25+
}
26+
27+
func (c *GormConnection) GetDB() *gorm.DB {
28+
return c.db
29+
}
30+
31+
func ConnectDB(params zindexer.DBConnectionParams, config zindexer.DBConnectionConfig) (*gorm.DB, error) {
32+
conn, err := NewPostgresConnection(&params, &config)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
return conn.GetDB(), nil
38+
}

connections/pubsub_goChannel.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package connections
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/ThreeDotsLabs/watermill"
7+
"github.com/ThreeDotsLabs/watermill/message"
8+
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
9+
)
10+
11+
var ChannelPubSubDefaultConfig = gochannel.Config{
12+
OutputChannelBuffer: 10,
13+
Persistent: true,
14+
BlockPublishUntilSubscriberAck: false,
15+
}
16+
17+
type ChannelPubSub struct {
18+
Client *gochannel.GoChannel
19+
}
20+
21+
// NewPubSubHandlerChannel this handler uses Watermill's go channels
22+
// implementation as message engine
23+
func NewPubSubHandlerChannel(config gochannel.Config) ChannelPubSub {
24+
if config == (gochannel.Config{}) {
25+
// Use default config
26+
config = ChannelPubSubDefaultConfig
27+
}
28+
29+
ps := gochannel.NewGoChannel(
30+
config,
31+
watermill.NewStdLogger(false, false),
32+
)
33+
34+
return ChannelPubSub{
35+
Client: ps,
36+
}
37+
}
38+
39+
func (p ChannelPubSub) Subscribe(topic string, cb func(messages <-chan *message.Message)) error {
40+
messages, err := p.Client.Subscribe(context.Background(), topic)
41+
if err != nil {
42+
fmt.Printf("Could not subscribe to topic %s", topic)
43+
return err
44+
}
45+
46+
go cb(messages)
47+
return nil
48+
}
49+
50+
func (p ChannelPubSub) Publish(topic string, msg *message.Message) error {
51+
err := p.Client.Publish(topic, msg)
52+
if err != nil {
53+
fmt.Printf("Could not publish to topic %s", topic)
54+
return err
55+
}
56+
57+
return nil
58+
}

connections/types.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package connections
2+
3+
import (
4+
"github.com/Zondax/zindexer"
5+
"gorm.io/gorm"
6+
)
7+
8+
type GormConnection struct {
9+
db *gorm.DB
10+
}
11+
12+
type DBQueryClient struct {
13+
Client zindexer.IDBQueryClient
14+
}
15+
16+
type DBSubscriptionClient struct {
17+
Client zindexer.IDBSubscriptionClient
18+
}
19+
20+
type TopicPubSubClient struct {
21+
Client zindexer.ITopicPubSubClient
22+
}

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ module github.com/Zondax/zindexer
33
go 1.15
44

55
require (
6+
github.com/ThreeDotsLabs/watermill v1.1.1
7+
github.com/hasura/go-graphql-client v0.2.0
68
github.com/spf13/cobra v1.1.1
79
github.com/spf13/viper v1.7.1
810
go.uber.org/zap v1.16.0
11+
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
912
gorm.io/driver/postgres v1.0.6
1013
gorm.io/gorm v1.20.11
1114
)

0 commit comments

Comments
 (0)