Skip to content

Commit 974d2b7

Browse files
committed
fix
1 parent 179baa5 commit 974d2b7

File tree

4 files changed

+137
-48
lines changed

4 files changed

+137
-48
lines changed

Diff for: pkg/plugin/datasource.go

+89-13
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import (
44
"context"
55
"encoding/json"
66

7+
"github.com/google/uuid"
78
"github.com/grafana/grafana-plugin-sdk-go/backend"
89
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
910
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1011
"github.com/grafana/grafana-plugin-sdk-go/data"
12+
"github.com/grafana/grafana-plugin-sdk-go/live"
1113
"github.com/timeplus-io/proton-grafana-source/pkg/models"
1214
"github.com/timeplus-io/proton-grafana-source/pkg/timeplus"
1315
)
@@ -36,14 +38,87 @@ func NewDatasource(ctx context.Context, settings backend.DataSourceInstanceSetti
3638
logger.Debug("new timeplus source")
3739

3840
return &Datasource{
39-
engine: engine,
41+
engine: engine,
42+
queries: make(map[string]string),
4043
}, nil
4144
}
4245

4346
// Datasource is an example datasource which can respond to data queries, reports
4447
// its health and has streaming skills.
4548
type Datasource struct {
46-
engine timeplus.Engine
49+
engine timeplus.Engine
50+
queries map[string]string
51+
}
52+
53+
func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
54+
logger := log.DefaultLogger.FromContext(ctx)
55+
logger.Info("QueryData called")
56+
response := backend.NewQueryDataResponse()
57+
58+
for _, query := range req.Queries {
59+
q := queryModel{}
60+
if err := json.Unmarshal(query.JSON, &q); err != nil {
61+
return nil, err
62+
}
63+
64+
resp := backend.DataResponse{}
65+
frame := data.NewFrame("response")
66+
67+
isStreaming, err := d.engine.IsStreamingQuery(ctx, q.SQL)
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
if isStreaming {
73+
id := uuid.NewString()
74+
d.queries[id] = q.SQL
75+
channel := live.Channel{
76+
Scope: live.ScopeDatasource,
77+
Namespace: req.PluginContext.DataSourceInstanceSettings.UID,
78+
Path: id,
79+
}
80+
frame.SetMeta(&data.FrameMeta{Channel: channel.String()})
81+
resp.Frames = append(resp.Frames, frame)
82+
} else {
83+
columnTypes, ch, err := d.engine.RunQuery(ctx, q.SQL)
84+
if err != nil {
85+
return nil, err
86+
}
87+
88+
for _, col := range columnTypes {
89+
frame.Fields = append(frame.Fields, timeplus.NewDataFieldByType(col.Name(), col.DatabaseTypeName()))
90+
}
91+
92+
LOOP:
93+
for {
94+
select {
95+
case <-ctx.Done():
96+
logger.Info("RunStream ctx done")
97+
return nil, ctx.Err()
98+
case row, ok := <-ch:
99+
if !ok {
100+
logger.Info("Query finished")
101+
102+
resp.Frames = append(resp.Frames, frame)
103+
break LOOP
104+
}
105+
106+
fData := make([]any, len(columnTypes))
107+
for i, r := range row {
108+
col := columnTypes[i]
109+
fData[i] = timeplus.ParseValue(col.Name(), col.DatabaseTypeName(), nil, r, false)
110+
}
111+
112+
frame.AppendRow(fData...)
113+
}
114+
}
115+
116+
}
117+
118+
response.Responses[query.RefID] = resp
119+
}
120+
121+
return response, nil
47122
}
48123

49124
// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
@@ -58,17 +133,19 @@ func (d *Datasource) Dispose() {
58133
}
59134

60135
func (d *Datasource) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
61-
logger := log.DefaultLogger.FromContext(ctx)
62-
logger.Debug("SubscribeStream", "path", req.Path, "sql", req.Data)
136+
var status backend.SubscribeStreamStatus
137+
if _, ok := d.queries[req.Path]; ok {
138+
status = backend.SubscribeStreamStatusOK
139+
} else {
140+
status = backend.SubscribeStreamStatusNotFound
141+
}
63142

64143
return &backend.SubscribeStreamResponse{
65-
Status: backend.SubscribeStreamStatusOK,
144+
Status: status,
66145
}, nil
67146
}
68147

69148
func (d *Datasource) PublishStream(ctx context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
70-
logger := log.DefaultLogger.FromContext(ctx)
71-
logger.Debug("PublishStream")
72149
return &backend.PublishStreamResponse{
73150
Status: backend.PublishStreamStatusPermissionDenied,
74151
}, nil
@@ -77,14 +154,13 @@ func (d *Datasource) PublishStream(ctx context.Context, _ *backend.PublishStream
77154
func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
78155
logger := log.DefaultLogger.FromContext(ctx)
79156

80-
logger.Debug("RunStream", "sql", req.Data, "path", req.Path)
81-
82-
q := queryModel{}
83-
if err := json.Unmarshal(req.Data, &q); err != nil {
84-
return err
157+
path := req.Path
158+
sql, ok := d.queries[path]
159+
if !ok {
160+
return nil
85161
}
86162

87-
columnTypes, ch, err := d.engine.RunQuery(ctx, q.SQL)
163+
columnTypes, ch, err := d.engine.RunQuery(ctx, sql)
88164
if err != nil {
89165
return err
90166
}

Diff for: pkg/timeplus/engine.go

+45-10
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package timeplus
22

33
import (
4+
"bytes"
45
"context"
56
"database/sql"
7+
"encoding/json"
68
"fmt"
9+
"io"
10+
"net/http"
711
"time"
812

913
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1014

11-
"github.com/reactivex/rxgo/v2"
1215
protonDriver "github.com/timeplus-io/proton-go-driver/v2"
1316
)
1417

@@ -20,14 +23,7 @@ type Column struct {
2023
type TimeplusEngine struct {
2124
connection *sql.DB
2225
logger log.Logger
23-
}
24-
25-
type TimeplusQueryState struct {
26-
Query string
27-
AddNow bool
28-
Stream chan rxgo.Item
29-
ColumnArray []Column
30-
Cancel context.CancelFunc
26+
analyzeURL string
3127
}
3228

3329
func NewEngine(logger log.Logger, host string, port int, username, password string) *TimeplusEngine {
@@ -44,6 +40,7 @@ func NewEngine(logger log.Logger, host string, port int, username, password stri
4440
return &TimeplusEngine{
4541
connection: connection,
4642
logger: logger,
43+
analyzeURL: fmt.Sprintf("http://%s:%d/proton/v1/sqlanalyzer", host, 3218),
4744
}
4845
}
4946

@@ -98,6 +95,44 @@ func (e *TimeplusEngine) RunQuery(ctx context.Context, sql string) ([]*sql.Colum
9895
}
9996

10097
func (e *TimeplusEngine) Dispose() error {
101-
e.logger.Info("Dispose!!!!!")
10298
return e.connection.Close()
10399
}
100+
101+
func (e *TimeplusEngine) IsStreamingQuery(ctx context.Context, query string) (bool, error) {
102+
queryMap := map[string]string{"query": query}
103+
jsonData, err := json.Marshal(queryMap)
104+
if err != nil {
105+
return false, err
106+
}
107+
108+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.analyzeURL, bytes.NewBuffer(jsonData))
109+
if err != nil {
110+
return false, err
111+
}
112+
req.Header.Set("Content-Type", "application/json")
113+
114+
client := &http.Client{}
115+
resp, err := client.Do(req)
116+
if err != nil {
117+
return false, err
118+
}
119+
120+
defer resp.Body.Close()
121+
122+
body, err := io.ReadAll(resp.Body)
123+
if err != nil {
124+
return false, err
125+
}
126+
127+
var response map[string]interface{}
128+
if err = json.Unmarshal(body, &response); err != nil {
129+
return false, err
130+
}
131+
132+
isStreaming, ok := response["is_streaming"].(bool)
133+
if !ok {
134+
return false, fmt.Errorf("invalid response %s", response)
135+
}
136+
137+
return isStreaming, nil
138+
}

Diff for: pkg/timeplus/interface.go

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
type Engine interface {
99
RunQuery(ctx context.Context, query string) ([]*sql.ColumnType, chan []any, error)
10+
IsStreamingQuery(ctx context.Context, query string) (bool, error)
1011

1112
Ping() error
1213
Dispose() error

Diff for: src/datasource.ts

+2-25
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
import { DataSourceInstanceSettings, ScopedVars, DataQueryRequest, DataQueryResponse, LiveChannelScope } from '@grafana/data';
2-
import { DataSourceWithBackend, getGrafanaLiveSrv, getTemplateSrv } from '@grafana/runtime';
1+
import { DataSourceInstanceSettings, ScopedVars } from '@grafana/data';
2+
import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime';
33

44
import { TpQuery, TpDataSourceOptions } from './types';
5-
import { merge, Observable } from 'rxjs';
65

76
export class DataSource extends DataSourceWithBackend<TpQuery, TpDataSourceOptions> {
87
constructor(instanceSettings: DataSourceInstanceSettings<TpDataSourceOptions>) {
@@ -22,27 +21,5 @@ export class DataSource extends DataSourceWithBackend<TpQuery, TpDataSourceOptio
2221
return !!query.sql;
2322
}
2423

25-
query(request: DataQueryRequest<TpQuery>): Observable<DataQueryResponse> {
26-
const observables = request.targets.map((query, index) => {
27-
28-
return getGrafanaLiveSrv().getDataStream({
29-
addr: {
30-
scope: LiveChannelScope.DataSource,
31-
namespace: this.uid,
32-
path: `timeplus/${this.uid}/${uuidv4()}`, // this will allow each new query to create a new connection
33-
data: {
34-
...query,
35-
},
36-
},
37-
});
38-
});
39-
40-
return merge(...observables);
41-
}
4224
}
4325

44-
function uuidv4() {
45-
return "10000000-1000-4000-8000-100000000000".replace(/[018]/g, c =>
46-
(+c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> +c / 4).toString(16)
47-
);
48-
}

0 commit comments

Comments
 (0)