Skip to content

Commit e5ecedc

Browse files
committed
fix
1 parent 974d2b7 commit e5ecedc

File tree

6 files changed

+104
-47
lines changed

6 files changed

+104
-47
lines changed

Diff for: pkg/models/settings.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import (
99

1010
type PluginSettings struct {
1111
Host string `json:"host"`
12-
Port int `json:"port"`
12+
TCPPort int `json:"tcpPort"`
13+
HTTPPort int `json:"httpPort"`
1314
Username string `json:"username"`
1415
Secrets *SecretPluginSettings `json:"-"`
1516
}

Diff for: pkg/plugin/datasource.go

+6-26
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,22 @@ import (
1414
"github.com/timeplus-io/proton-grafana-source/pkg/timeplus"
1515
)
1616

17-
// Make sure Datasource implements required interfaces. This is important to do
18-
// since otherwise we will only get a not implemented error response from plugin in
19-
// runtime. In this example datasource instance implements backend.QueryDataHandler,
20-
// backend.CheckHealthHandler interfaces. Plugin should not implement all these
21-
// interfaces - only those which are required for a particular task.
2217
var (
2318
_ backend.CheckHealthHandler = (*Datasource)(nil)
2419
_ backend.StreamHandler = (*Datasource)(nil)
2520
_ instancemgmt.InstanceDisposer = (*Datasource)(nil)
2621
)
2722

28-
// NewDatasource creates a new datasource instance.
2923
func NewDatasource(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
3024
logger := log.DefaultLogger.FromContext(ctx)
3125
conf, err := models.LoadPluginSettings(settings)
3226
if err != nil {
3327
return nil, err
3428
}
3529

36-
engine := timeplus.NewEngine(logger, conf.Host, conf.Port, conf.Username, conf.Secrets.Password)
30+
engine := timeplus.NewEngine(logger, conf.Host, conf.TCPPort, conf.HTTPPort, conf.Username, conf.Secrets.Password)
3731

38-
logger.Debug("new timeplus source")
32+
logger.Debug("new timeplus source created")
3933

4034
return &Datasource{
4135
engine: engine,
@@ -52,7 +46,6 @@ type Datasource struct {
5246

5347
func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
5448
logger := log.DefaultLogger.FromContext(ctx)
55-
logger.Info("QueryData called")
5649
response := backend.NewQueryDataResponse()
5750

5851
for _, query := range req.Queries {
@@ -93,7 +86,6 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
9386
for {
9487
select {
9588
case <-ctx.Done():
96-
logger.Info("RunStream ctx done")
9789
return nil, ctx.Err()
9890
case row, ok := <-ch:
9991
if !ok {
@@ -168,11 +160,10 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
168160
for {
169161
select {
170162
case <-ctx.Done():
171-
logger.Info("RunStream ctx done")
172163
return ctx.Err()
173164
case row, ok := <-ch:
174165
if !ok {
175-
logger.Info("Query finished")
166+
logger.Warn("Streaming query terminated")
176167
return nil
177168
}
178169
frame := data.NewFrame("response")
@@ -188,28 +179,17 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
188179
}
189180

190181
frame.AppendRow(fData...)
191-
192-
err := sender.SendFrame(
193-
frame,
194-
data.IncludeAll,
195-
)
196-
197-
if err != nil {
182+
if err := sender.SendFrame(frame, data.IncludeAll); err != nil {
198183
logger.Error("Failed send frame", "error", err)
199184
}
200185
}
201186
}
202-
203187
}
204188

205189
type queryModel struct {
206190
SQL string `json:"sql"`
207191
}
208192

209-
// CheckHealth handles health checks sent from Grafana to the plugin.
210-
// The main use case for these health checks is the test button on the
211-
// datasource configuration page which allows users to verify that
212-
// a datasource is working as expected.
213193
func (d *Datasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
214194
logger := log.DefaultLogger.FromContext(ctx)
215195
res := &backend.CheckHealthResult{}
@@ -227,9 +207,9 @@ func (d *Datasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRe
227207
return res, nil
228208
}
229209

230-
engine := timeplus.NewEngine(logger, config.Host, config.Port, config.Username, config.Secrets.Password)
210+
engine := timeplus.NewEngine(logger, config.Host, config.TCPPort, config.HTTPPort, config.Username, config.Secrets.Password)
231211

232-
if err := engine.Ping(); err != nil {
212+
if err := engine.Ping(ctx); err != nil {
233213
res.Status = backend.HealthStatusError
234214
res.Message = "failed to ping timeplusd: " + err.Error()
235215
return res, nil

Diff for: pkg/timeplus/engine.go

+64-11
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"bytes"
55
"context"
66
"database/sql"
7+
"encoding/base64"
78
"encoding/json"
89
"fmt"
910
"io"
11+
"net"
1012
"net/http"
1113
"time"
1214

@@ -15,6 +17,11 @@ import (
1517
protonDriver "github.com/timeplus-io/proton-go-driver/v2"
1618
)
1719

20+
const (
21+
bufferSize = 1000
22+
defaultTimeout = 10 * time.Second
23+
)
24+
1825
type Column struct {
1926
Name string
2027
Type string
@@ -24,28 +31,54 @@ type TimeplusEngine struct {
2431
connection *sql.DB
2532
logger log.Logger
2633
analyzeURL string
34+
pingURL string
35+
client *http.Client
36+
header http.Header
2737
}
2838

29-
func NewEngine(logger log.Logger, host string, port int, username, password string) *TimeplusEngine {
39+
func NewEngine(logger log.Logger, host string, tcpPort, httpPort int, username, password string) *TimeplusEngine {
3040
connection := protonDriver.OpenDB(&protonDriver.Options{
31-
Addr: []string{fmt.Sprintf("%s:%d", host, port)},
41+
Addr: []string{fmt.Sprintf("%s:%d", host, tcpPort)},
3242
Auth: protonDriver.Auth{
3343
Username: username,
3444
Password: password,
3545
},
36-
DialTimeout: 10 * time.Second,
46+
DialTimeout: defaultTimeout,
3747
Debug: false,
3848
})
3949

50+
header := http.Header{}
51+
header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password))))
52+
header.Set("Content-Type", "application/json")
53+
4054
return &TimeplusEngine{
4155
connection: connection,
4256
logger: logger,
43-
analyzeURL: fmt.Sprintf("http://%s:%d/proton/v1/sqlanalyzer", host, 3218),
57+
analyzeURL: fmt.Sprintf("http://%s:%d/proton/v1/sqlanalyzer", host, httpPort),
58+
pingURL: fmt.Sprintf("http://%s:%d/proton/ping", host, httpPort),
59+
header: header,
60+
client: &http.Client{
61+
Timeout: defaultTimeout,
62+
Transport: &http.Transport{
63+
Dial: (&net.Dialer{
64+
Timeout: defaultTimeout,
65+
}).Dial,
66+
TLSHandshakeTimeout: defaultTimeout,
67+
},
68+
},
4469
}
4570
}
4671

47-
func (e *TimeplusEngine) Ping() error {
48-
return e.connection.Ping()
72+
func (e *TimeplusEngine) Ping(ctx context.Context) error {
73+
if err := e.pingHttp(ctx); err != nil {
74+
return fmt.Errorf("failed to ping via http: %w", err)
75+
}
76+
77+
if err := e.connection.Ping(); err != nil {
78+
return fmt.Errorf("failed to ping via tcp: %w", err)
79+
}
80+
81+
return nil
4982
}
5083

5184
func (e *TimeplusEngine) RunQuery(ctx context.Context, sql string) ([]*sql.ColumnType, chan []any, error) {
@@ -61,7 +94,7 @@ func (e *TimeplusEngine) RunQuery(ctx context.Context, sql string) ([]*sql.Colum
6194
return nil, nil, err
6295
}
6396

64-
ch := make(chan []any, 1000)
97+
ch := make(chan []any, bufferSize)
6598

6699
go func() {
67100
defer func() {
@@ -109,15 +142,16 @@ func (e *TimeplusEngine) IsStreamingQuery(ctx context.Context, query string) (bo
109142
if err != nil {
110143
return false, err
111144
}
112-
req.Header.Set("Content-Type", "application/json")
145+
req.Header = e.header
113146

114-
client := &http.Client{}
115-
resp, err := client.Do(req)
147+
resp, err := e.client.Do(req)
116148
if err != nil {
117149
return false, err
118150
}
119-
120151
defer resp.Body.Close()
152+
if resp.StatusCode < 200 || resp.StatusCode > 399 {
153+
return false, fmt.Errorf("failed to analyze %d", resp.StatusCode)
154+
}
121155

122156
body, err := io.ReadAll(resp.Body)
123157
if err != nil {
@@ -136,3 +170,22 @@ func (e *TimeplusEngine) IsStreamingQuery(ctx context.Context, query string) (bo
136170

137171
return isStreaming, nil
138172
}
173+
174+
func (e *TimeplusEngine) pingHttp(ctx context.Context) error {
175+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, e.pingURL, nil)
176+
if err != nil {
177+
return err
178+
}
179+
req.Header = e.header
180+
181+
resp, err := e.client.Do(req)
182+
if err != nil {
183+
return err
184+
}
185+
186+
if resp.StatusCode != http.StatusOK {
187+
return fmt.Errorf("failed to ping, got %d", resp.StatusCode)
188+
}
189+
190+
return nil
191+
}

Diff for: pkg/timeplus/interface.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ type Engine interface {
99
RunQuery(ctx context.Context, query string) ([]*sql.ColumnType, chan []any, error)
1010
IsStreamingQuery(ctx context.Context, query string) (bool, error)
1111

12-
Ping() error
12+
Ping(ctx context.Context) error
1313
Dispose() error
1414
}

Diff for: src/components/ConfigEditor.tsx

+29-7
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,26 @@ export function ConfigEditor(props: Props) {
2929
});
3030
};
3131

32-
const onPortChange = (event: ChangeEvent<HTMLInputElement>) => {
32+
const onTCPPortChange = (event: ChangeEvent<HTMLInputElement>) => {
3333
onOptionsChange({
3434
...options,
3535
jsonData: {
3636
...jsonData,
37-
port: parseInt(event.target.value, 10),
37+
tcpPort: parseInt(event.target.value, 10) ,
3838
},
3939
});
4040
};
41+
4142

43+
const onHTTPPortChange = (event: ChangeEvent<HTMLInputElement>) => {
44+
onOptionsChange({
45+
...options,
46+
jsonData: {
47+
...jsonData,
48+
httpPort: parseInt(event.target.value, 10),
49+
},
50+
});
51+
};
4252

4353
// Secure field (only sent to the backend)
4454
const onPasswordChange = (event: ChangeEvent<HTMLInputElement>) => {
@@ -66,7 +76,7 @@ export function ConfigEditor(props: Props) {
6676

6777
return (
6878
<>
69-
<InlineField label="Host" labelWidth={14} interactive tooltip={'Hostname and port'}>
79+
<InlineField label="Host" labelWidth={14} interactive tooltip={'Hostname'}>
7080
<Input
7181
id="config-editor-host"
7282
onChange={onHostChange}
@@ -75,16 +85,28 @@ export function ConfigEditor(props: Props) {
7585
width={40}
7686
/>
7787
</InlineField>
78-
<InlineField label="Port" labelWidth={14} interactive tooltip={'Port'}>
88+
<InlineField label="TCP Port" labelWidth={14} interactive tooltip={'TCP Port'}>
7989
<Input
80-
id="config-editor-port"
90+
id="config-editor-tcp-port"
8191
type='number'
82-
onChange={onPortChange}
83-
value={jsonData.port}
92+
onChange={onTCPPortChange}
93+
value={jsonData.tcpPort}
94+
defaultValue="8463"
8495
placeholder="8463"
8596
width={40}
8697
/>
8798
</InlineField>
99+
<InlineField label="HTTP Port" labelWidth={14} interactive tooltip={'HTTP Port'}>
100+
<Input
101+
id="config-editor-http-port"
102+
type='number'
103+
onChange={onHTTPPortChange}
104+
value={jsonData.httpPort}
105+
defaultValue="3218"
106+
placeholder="3218"
107+
width={40}
108+
/>
109+
</InlineField>
88110
<InlineField label="Username" labelWidth={14} interactive tooltip={'Username'}>
89111
<Input
90112
id="config-editor-username"

Diff for: src/types.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ export interface TpQuery extends DataQuery {
1010
*/
1111
export interface TpDataSourceOptions extends DataSourceJsonData {
1212
host?: string;
13-
port?: number;
13+
tcpPort?: number;
14+
httpPort?: number;
1415
username?: string
1516
}
1617

0 commit comments

Comments
 (0)