fix: set ref id for data frame #20

Closed
wants to merge 14 commits into from
feat: propagate query error to frontend, default batch
ye11ow committed Jan 21, 2025
commit a5d40e64fd434084c0106d0d728d915a94560696
59 changes: 50 additions & 9 deletions pkg/plugin/datasource.go
@@ -3,6 +3,7 @@ package plugin
 import (
 	"context"
 	"encoding/json"
+	"time"
 
 	"github.com/google/uuid"
 	"github.com/grafana/grafana-plugin-sdk-go/backend"
@@ -14,6 +15,11 @@ import (
"github.com/timeplus-io/proton-grafana-source/pkg/timeplus"
)

const (
batchSize = 1000
batchIntervalMS = 100
)

var (
_ backend.CheckHealthHandler = (*Datasource)(nil)
_ backend.StreamHandler = (*Datasource)(nil)
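
A side note on the interval constant: `batchIntervalMS` is declared untyped, so multiplying it by `time.Millisecond` in `RunStream` below yields a `time.Duration` without an explicit conversion. A minimal illustration:

```go
package main

import (
	"fmt"
	"time"
)

const batchIntervalMS = 100 // untyped integer constant, as in the diff

func main() {
	// An untyped constant times time.Millisecond is already a time.Duration,
	// so it can feed time.NewTicker directly.
	interval := batchIntervalMS * time.Millisecond
	fmt.Println(interval) // prints "100ms"
}
```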
@@ -49,19 +55,26 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
 	response := backend.NewQueryDataResponse()
 
 	for _, query := range req.Queries {
+		resp := backend.DataResponse{}
+
 		q := queryModel{}
 		if err := json.Unmarshal(query.JSON, &q); err != nil {
-			return nil, err
+			resp.Error = err
+			resp.Status = backend.StatusBadRequest
+			response.Responses[query.RefID] = resp
+			continue
 		}
 
-		resp := backend.DataResponse{}
-		frame := data.NewFrame("response")
-
 		isStreaming, err := d.engine.IsStreamingQuery(ctx, q.SQL)
 		if err != nil {
-			return nil, err
+			resp.Error = err
+			resp.Status = backend.StatusBadRequest
+			response.Responses[query.RefID] = resp
+			continue
 		}
 
+		frame := data.NewFrame("response")
+
 		if isStreaming {
 			id := uuid.NewString()
 			d.queries[id] = q.SQL
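
This hunk changes the error handling from failing the whole request (`return nil, err`) to recording the error on the individual query's `DataResponse`, keyed by `RefID`, so sibling queries in the same request still run and the frontend sees a per-query error. A self-contained sketch of the same pattern, with a hypothetical `runOne` callback standing in for the per-query work (unmarshal, `IsStreamingQuery`, `RunQuery`):

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data"
)

// queryData attaches a failure to the failing query's response instead of
// aborting the whole request. runOne is illustrative, not part of the plugin.
func queryData(ctx context.Context, req *backend.QueryDataRequest,
	runOne func(ctx context.Context, q backend.DataQuery) (data.Frames, error)) (*backend.QueryDataResponse, error) {

	response := backend.NewQueryDataResponse()
	for _, q := range req.Queries {
		resp := backend.DataResponse{}
		frames, err := runOne(ctx, q)
		if err != nil {
			resp.Error = err
			resp.Status = backend.StatusBadRequest
		} else {
			resp.Frames = frames
		}
		response.Responses[q.RefID] = resp
	}
	// Returning a non-nil error here would instead fail every query in the request.
	return response, nil
}

func main() {
	req := &backend.QueryDataRequest{Queries: []backend.DataQuery{{RefID: "A"}, {RefID: "B"}}}
	resp, _ := queryData(context.Background(), req, func(ctx context.Context, q backend.DataQuery) (data.Frames, error) {
		if q.RefID == "B" {
			return nil, fmt.Errorf("syntax error in query %s", q.RefID)
		}
		return data.Frames{data.NewFrame("response")}, nil
	})
	fmt.Println(resp.Responses["A"].Error, resp.Responses["B"].Error)
}
```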
@@ -76,7 +89,10 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
 			count := 0
 			columnTypes, ch, err := d.engine.RunQuery(ctx, q.SQL)
 			if err != nil {
-				return nil, err
+				resp.Error = err
+				resp.Status = backend.StatusInternal
+				response.Responses[query.RefID] = resp
+				continue
 			}
 
 			for _, col := range columnTypes {
@@ -159,6 +175,12 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
 		return err
 	}
 
+	ticker := time.NewTicker(batchIntervalMS * time.Millisecond)
+	var (
+		frame *data.Frame
+		count int
+	)
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -168,10 +190,12 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
 				logger.Warn("Streaming query terminated")
 				return nil
 			}
-			frame := data.NewFrame("response")
+			if frame == nil {
+				frame = data.NewFrame("response")
 
-			for _, c := range columnTypes {
-				frame.Fields = append(frame.Fields, timeplus.NewDataFieldByType(c.Name(), c.DatabaseTypeName()))
+				for _, c := range columnTypes {
+					frame.Fields = append(frame.Fields, timeplus.NewDataFieldByType(c.Name(), c.DatabaseTypeName()))
+				}
 			}
 
 			fData := make([]any, len(columnTypes))
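
For readers less familiar with the SDK's data package: the hunk above rebuilds the frame schema (one field per result column) whenever a new frame is started lazily, and `AppendRow` later expects one value per field in the same order. A minimal sketch using plain `data.NewField` in place of the plugin's `timeplus.NewDataFieldByType` helper:

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

func main() {
	// Build the frame schema first: one field per result column, each backed
	// by an empty slice of the appropriate type.
	frame := data.NewFrame("response",
		data.NewField("time", nil, []time.Time{}),
		data.NewField("value", nil, []float64{}),
	)

	// AppendRow then takes one value per field, in schema order.
	frame.AppendRow(time.Now(), 42.0)
	fmt.Println(frame.Rows()) // 1
}
```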
@@ -181,9 +205,26 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques
 			}
 
 			frame.AppendRow(fData...)
+			count++
+
+			if count >= batchSize {
+				if err := sender.SendFrame(frame, data.IncludeAll); err != nil {
+					logger.Error("Failed send frame", "error", err)
+				}
+				frame = nil
+				count = 0
+			}
+
+		case <-ticker.C:
+			if frame == nil || count == 0 {
+				continue
+			}
+
 			if err := sender.SendFrame(frame, data.IncludeAll); err != nil {
 				logger.Error("Failed send frame", "error", err)
 			}
+			frame = nil
+			count = 0
 		}
 	}
 }
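
Taken together, the `RunStream` changes implement a flush-on-size-or-interval batching policy: rows accumulate in the current frame and are sent either once `batchSize` rows have been collected or when the 100 ms ticker fires with a partial batch pending. Below is a self-contained sketch of that policy, with a plain `int` channel and a callback standing in for the Timeplus rows and `sender.SendFrame`; shutdown handling is simplified relative to the actual plugin code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	batchSize       = 1000
	batchIntervalMS = 100
)

// batchLoop drains rows and hands them to send in batches, flushing either
// when a batch is full or when the ticker fires with rows pending.
func batchLoop(ctx context.Context, rows <-chan int, send func(batch []int)) {
	ticker := time.NewTicker(batchIntervalMS * time.Millisecond)
	defer ticker.Stop()

	var batch []int
	flush := func() {
		if len(batch) == 0 {
			return
		}
		send(batch)
		batch = nil
	}

	for {
		select {
		case <-ctx.Done():
			return
		case r, ok := <-rows:
			if !ok {
				flush() // simplified: flush the remainder when the source closes
				return
			}
			batch = append(batch, r)
			if len(batch) >= batchSize {
				flush() // size-triggered flush
			}
		case <-ticker.C:
			flush() // interval-triggered flush of a partial batch
		}
	}
}

func main() {
	rows := make(chan int)
	go func() {
		for i := 0; i < 2500; i++ {
			rows <- i
		}
		close(rows)
	}()
	batchLoop(context.Background(), rows, func(b []int) {
		fmt.Printf("flushed %d rows\n", len(b))
	})
}
```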