@@ -3,6 +3,7 @@ package plugin
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
+ "time"
6
7
7
8
"github.com/google/uuid"
8
9
"github.com/grafana/grafana-plugin-sdk-go/backend"
@@ -14,6 +15,11 @@ import (
14
15
"github.com/timeplus-io/proton-grafana-source/pkg/timeplus"
15
16
)
16
17
18
+ const (
19
+ batchSize = 1000
20
+ batchIntervalMS = 100
21
+ )
22
+
17
23
var (
18
24
_ backend.CheckHealthHandler = (* Datasource )(nil )
19
25
_ backend.StreamHandler = (* Datasource )(nil )
@@ -49,19 +55,26 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
49
55
response := backend .NewQueryDataResponse ()
50
56
51
57
for _ , query := range req .Queries {
58
+ resp := backend.DataResponse {}
59
+
52
60
q := queryModel {}
53
61
if err := json .Unmarshal (query .JSON , & q ); err != nil {
54
- return nil , err
62
+ resp .Error = err
63
+ resp .Status = backend .StatusBadRequest
64
+ response .Responses [query .RefID ] = resp
65
+ continue
55
66
}
56
67
57
- resp := backend.DataResponse {}
58
- frame := data .NewFrame ("response" )
59
-
60
68
isStreaming , err := d .engine .IsStreamingQuery (ctx , q .SQL )
61
69
if err != nil {
62
- return nil , err
70
+ resp .Error = err
71
+ resp .Status = backend .StatusBadRequest
72
+ response .Responses [query .RefID ] = resp
73
+ continue
63
74
}
64
75
76
+ frame := data .NewFrame ("response" )
77
+
65
78
if isStreaming {
66
79
id := uuid .NewString ()
67
80
d .queries [id ] = q .SQL
@@ -76,7 +89,10 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
76
89
count := 0
77
90
columnTypes , ch , err := d .engine .RunQuery (ctx , q .SQL )
78
91
if err != nil {
79
- return nil , err
92
+ resp .Error = err
93
+ resp .Status = backend .StatusInternal
94
+ response .Responses [query .RefID ] = resp
95
+ continue
80
96
}
81
97
82
98
for _ , col := range columnTypes {
@@ -159,6 +175,12 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
159
175
return err
160
176
}
161
177
178
+ ticker := time .NewTicker (batchIntervalMS * time .Millisecond )
179
+ var (
180
+ frame * data.Frame
181
+ count int
182
+ )
183
+
162
184
for {
163
185
select {
164
186
case <- ctx .Done ():
@@ -168,10 +190,12 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
168
190
logger .Warn ("Streaming query terminated" )
169
191
return nil
170
192
}
171
- frame := data .NewFrame ("response" )
193
+ if frame == nil {
194
+ frame = data .NewFrame ("response" )
172
195
173
- for _ , c := range columnTypes {
174
- frame .Fields = append (frame .Fields , timeplus .NewDataFieldByType (c .Name (), c .DatabaseTypeName ()))
196
+ for _ , c := range columnTypes {
197
+ frame .Fields = append (frame .Fields , timeplus .NewDataFieldByType (c .Name (), c .DatabaseTypeName ()))
198
+ }
175
199
}
176
200
177
201
fData := make ([]any , len (columnTypes ))
@@ -181,9 +205,26 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
181
205
}
182
206
183
207
frame .AppendRow (fData ... )
208
+ count ++
209
+
210
+ if count >= batchSize {
211
+ if err := sender .SendFrame (frame , data .IncludeAll ); err != nil {
212
+ logger .Error ("Failed send frame" , "error" , err )
213
+ }
214
+ frame = nil
215
+ count = 0
216
+ }
217
+
218
+ case <- ticker .C :
219
+ if frame == nil || count == 0 {
220
+ continue
221
+ }
222
+
184
223
if err := sender .SendFrame (frame , data .IncludeAll ); err != nil {
185
224
logger .Error ("Failed send frame" , "error" , err )
186
225
}
226
+ frame = nil
227
+ count = 0
187
228
}
188
229
}
189
230
}
0 commit comments