Skip to content

Commit 00ece22

Browse files
authored
Merge pull request #68 from xephonhq/client/grpc/copy-libtsdb-go
[client][grpc] Copy client code from libtsdb-go
2 parents aab24d3 + 4686570 commit 00ece22

5 files changed

Lines changed: 168 additions & 0 deletions

File tree

doc/design/protocol.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Protocol
2+
3+
Framework independent
4+
5+
- TCP
6+
- HTTP/JSON
7+
- HTTP/Line
8+
9+
Framework
10+
11+
- GRPC

xk/client/grpcclient/client.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Package grpcclient is grpc client of Xephon-K, it's in separated package to avoid import grpc
2+
// for clients that don't use grpc
3+
package grpcclient
4+
5+
import (
6+
"context"
7+
"net/url"
8+
9+
"github.com/dyweb/gommon/errors"
10+
"google.golang.org/grpc"
11+
12+
"github.com/libtsdb/libtsdb-go/libtsdb"
13+
pb "github.com/libtsdb/libtsdb-go/libtsdb/libtsdbpb"
14+
15+
xkc "github.com/xephonhq/xephon-k/xk/client"
16+
"github.com/xephonhq/xephon-k/xk/config"
17+
rpc "github.com/xephonhq/xephon-k/xk/transport/grpc"
18+
)
19+
20+
var _ libtsdb.WriteClient = (*Client)(nil)
21+
22+
// TODO: support prepare and columnar format
23+
type Client struct {
24+
cfg config.ClientConfig
25+
client rpc.XephonkClient
26+
conn *grpc.ClientConn
27+
28+
pointsInt []pb.PointIntTagged
29+
pointsDouble []pb.PointDoubleTagged
30+
seriesInt []pb.SeriesIntTagged
31+
seriesDouble []pb.SeriesDoubleTagged
32+
}
33+
34+
func New(cfg config.ClientConfig) (*Client, error) {
35+
_, err := url.Parse(cfg.Addr)
36+
if err != nil {
37+
return nil, errors.Wrap(err, "can't parse server address")
38+
}
39+
conn, err := grpc.Dial(cfg.Addr, grpc.WithInsecure())
40+
if err != nil {
41+
return nil, errors.Wrapf(err, "grpc dial failed %s", cfg.Addr)
42+
}
43+
client := rpc.NewClient(conn)
44+
return &Client{
45+
cfg: cfg,
46+
client: client,
47+
conn: conn,
48+
}, nil
49+
}
50+
51+
func (c *Client) Meta() libtsdb.Meta {
52+
return xkc.Meta()
53+
}
54+
55+
func (c *Client) Close() error {
56+
if err := c.conn.Close(); err != nil {
57+
return errors.Wrap(err, "can't close grpc client connection")
58+
}
59+
return nil
60+
}
61+
62+
func (c *Client) WriteIntPoint(p *pb.PointIntTagged) {
63+
// TODO: deal with prepare and columnar
64+
c.pointsInt = append(c.pointsInt, *p)
65+
}
66+
67+
func (c *Client) WriteDoublePoint(p *pb.PointDoubleTagged) {
68+
// TODO: deal with prepare and columnar
69+
c.pointsDouble = append(c.pointsDouble, *p)
70+
}
71+
72+
func (c *Client) WriteSeriesIntTagged(p *pb.SeriesIntTagged) {
73+
// TODO: deal with prepare and columnar
74+
c.seriesInt = append(c.seriesInt, *p)
75+
}
76+
77+
func (c *Client) WriteSeriesDoubleTagged(p *pb.SeriesDoubleTagged) {
78+
// TODO: deal with prepare and columnar
79+
c.seriesDouble = append(c.seriesDouble, *p)
80+
}
81+
82+
func (c *Client) Flush() error {
83+
return c.send()
84+
}
85+
86+
func (c *Client) send() error {
87+
merr := errors.NewMultiErr()
88+
// NOTE: normally we assume user only use one methods, so we just use one go routine
89+
if len(c.pointsInt) != 0 || len(c.pointsDouble) != 0 {
90+
req := rpc.WritePointsReq{
91+
Int: c.pointsInt,
92+
Double: c.pointsDouble,
93+
}
94+
_, err := c.client.WritePoints(context.Background(), &req)
95+
if err != nil {
96+
merr.Append(err)
97+
}
98+
c.pointsInt = c.pointsInt[:0]
99+
c.pointsDouble = c.pointsDouble[:0]
100+
}
101+
if len(c.seriesInt) != 0 || len(c.seriesDouble) != 0 {
102+
req := rpc.WriteSeriesReq{
103+
Int: c.seriesInt,
104+
Double: c.seriesDouble,
105+
}
106+
_, err := c.client.WriteSeries(context.Background(), &req)
107+
if err != nil {
108+
merr.Append(err)
109+
}
110+
c.seriesInt = c.seriesInt[:0]
111+
c.seriesDouble = c.seriesDouble[:0]
112+
}
113+
return merr.ErrorOrNil()
114+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package grpcclient
2+
3+
import "testing"
4+
5+
func TestClient_WriteIntPoint(t *testing.T) {
6+
t.Skipf("TODO: wait until server is implemented")
7+
}

xk/client/meta.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package client
2+
3+
import (
4+
"time"
5+
6+
"github.com/libtsdb/libtsdb-go/libtsdb"
7+
)
8+
9+
const (
10+
name = "xephonk"
11+
precision = time.Nanosecond
12+
)
13+
14+
var meta = libtsdb.Meta{
15+
Name: name,
16+
TimePrecision: precision,
17+
SupportTag: true,
18+
SupportInt: true,
19+
SupportDouble: true,
20+
SupportBatchSeries: true,
21+
SupportBatchPoints: true,
22+
}
23+
24+
func Meta() libtsdb.Meta {
25+
return meta
26+
}
27+
28+
func init() {
29+
libtsdb.RegisterMeta(name, meta)
30+
}

xk/config/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
11
package config
2+
3+
type ClientConfig struct {
4+
Addr string `yaml:"addr"`
5+
Prepare bool `yaml:"prepare"`
6+
Columnar bool `yaml:"columnar"`
7+
}

0 commit comments

Comments
 (0)