Skip to content

Commit c99d334

Browse files
authored
feat: propagate query error to frontend, default batch
2 parents b0a1ada + a5d40e6 commit c99d334

File tree

1 file changed

+50
-9
lines changed

1 file changed

+50
-9
lines changed

pkg/plugin/datasource.go

+50-9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package plugin
33
import (
44
"context"
55
"encoding/json"
6+
"time"
67

78
"github.com/google/uuid"
89
"github.com/grafana/grafana-plugin-sdk-go/backend"
@@ -14,6 +15,11 @@ import (
1415
"github.com/timeplus-io/proton-grafana-source/pkg/timeplus"
1516
)
1617

18+
const (
19+
batchSize = 1000
20+
batchIntervalMS = 100
21+
)
22+
1723
var (
1824
_ backend.CheckHealthHandler = (*Datasource)(nil)
1925
_ backend.StreamHandler = (*Datasource)(nil)
@@ -49,19 +55,26 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
4955
response := backend.NewQueryDataResponse()
5056

5157
for _, query := range req.Queries {
58+
resp := backend.DataResponse{}
59+
5260
q := queryModel{}
5361
if err := json.Unmarshal(query.JSON, &q); err != nil {
54-
return nil, err
62+
resp.Error = err
63+
resp.Status = backend.StatusBadRequest
64+
response.Responses[query.RefID] = resp
65+
continue
5566
}
5667

57-
resp := backend.DataResponse{}
58-
frame := data.NewFrame("response")
59-
6068
isStreaming, err := d.engine.IsStreamingQuery(ctx, q.SQL)
6169
if err != nil {
62-
return nil, err
70+
resp.Error = err
71+
resp.Status = backend.StatusBadRequest
72+
response.Responses[query.RefID] = resp
73+
continue
6374
}
6475

76+
frame := data.NewFrame("response")
77+
6578
if isStreaming {
6679
id := uuid.NewString()
6780
d.queries[id] = q.SQL
@@ -76,7 +89,10 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
7689
count := 0
7790
columnTypes, ch, err := d.engine.RunQuery(ctx, q.SQL)
7891
if err != nil {
79-
return nil, err
92+
resp.Error = err
93+
resp.Status = backend.StatusInternal
94+
response.Responses[query.RefID] = resp
95+
continue
8096
}
8197

8298
for _, col := range columnTypes {
@@ -159,6 +175,12 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
159175
return err
160176
}
161177

178+
ticker := time.NewTicker(batchIntervalMS * time.Millisecond)
179+
var (
180+
frame *data.Frame
181+
count int
182+
)
183+
162184
for {
163185
select {
164186
case <-ctx.Done():
@@ -168,10 +190,12 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
168190
logger.Warn("Streaming query terminated")
169191
return nil
170192
}
171-
frame := data.NewFrame("response")
193+
if frame == nil {
194+
frame = data.NewFrame("response")
172195

173-
for _, c := range columnTypes {
174-
frame.Fields = append(frame.Fields, timeplus.NewDataFieldByType(c.Name(), c.DatabaseTypeName()))
196+
for _, c := range columnTypes {
197+
frame.Fields = append(frame.Fields, timeplus.NewDataFieldByType(c.Name(), c.DatabaseTypeName()))
198+
}
175199
}
176200

177201
fData := make([]any, len(columnTypes))
@@ -181,9 +205,26 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
181205
}
182206

183207
frame.AppendRow(fData...)
208+
count++
209+
210+
if count >= batchSize {
211+
if err := sender.SendFrame(frame, data.IncludeAll); err != nil {
212+
logger.Error("Failed send frame", "error", err)
213+
}
214+
frame = nil
215+
count = 0
216+
}
217+
218+
case <-ticker.C:
219+
if frame == nil || count == 0 {
220+
continue
221+
}
222+
184223
if err := sender.SendFrame(frame, data.IncludeAll); err != nil {
185224
logger.Error("Failed send frame", "error", err)
186225
}
226+
frame = nil
227+
count = 0
187228
}
188229
}
189230
}

0 commit comments

Comments
 (0)