diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index 7080357..0e52b87 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -3,6 +3,7 @@ package plugin import ( "context" "encoding/json" + "time" "github.com/google/uuid" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -14,6 +15,11 @@ import ( "github.com/timeplus-io/proton-grafana-source/pkg/timeplus" ) +const ( + batchSize = 1000 + batchIntervalMS = 100 +) + var ( _ backend.CheckHealthHandler = (*Datasource)(nil) _ backend.StreamHandler = (*Datasource)(nil) @@ -49,19 +55,26 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques response := backend.NewQueryDataResponse() for _, query := range req.Queries { + resp := backend.DataResponse{} + q := queryModel{} if err := json.Unmarshal(query.JSON, &q); err != nil { - return nil, err + resp.Error = err + resp.Status = backend.StatusBadRequest + response.Responses[query.RefID] = resp + continue } - resp := backend.DataResponse{} - frame := data.NewFrame("response") - isStreaming, err := d.engine.IsStreamingQuery(ctx, q.SQL) if err != nil { - return nil, err + resp.Error = err + resp.Status = backend.StatusBadRequest + response.Responses[query.RefID] = resp + continue } + frame := data.NewFrame("response") + if isStreaming { id := uuid.NewString() d.queries[id] = q.SQL @@ -76,7 +89,10 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques count := 0 columnTypes, ch, err := d.engine.RunQuery(ctx, q.SQL) if err != nil { - return nil, err + resp.Error = err + resp.Status = backend.StatusInternal + response.Responses[query.RefID] = resp + continue } for _, col := range columnTypes { @@ -159,6 +175,12 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques return err } + ticker := time.NewTicker(batchIntervalMS * time.Millisecond) + var ( + frame *data.Frame + count int + ) + for { select { case <-ctx.Done(): @@ -168,10 +190,12 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques logger.Warn("Streaming query terminated") return nil } - frame := data.NewFrame("response") + if frame == nil { + frame = data.NewFrame("response") - for _, c := range columnTypes { - frame.Fields = append(frame.Fields, timeplus.NewDataFieldByType(c.Name(), c.DatabaseTypeName())) + for _, c := range columnTypes { + frame.Fields = append(frame.Fields, timeplus.NewDataFieldByType(c.Name(), c.DatabaseTypeName())) + } } fData := make([]any, len(columnTypes)) @@ -181,9 +205,26 @@ func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamReques } frame.AppendRow(fData...) + count++ + + if count >= batchSize { + if err := sender.SendFrame(frame, data.IncludeAll); err != nil { + logger.Error("Failed send frame", "error", err) + } + frame = nil + count = 0 + } + + case <-ticker.C: + if frame == nil || count == 0 { + continue + } + if err := sender.SendFrame(frame, data.IncludeAll); err != nil { logger.Error("Failed send frame", "error", err) } + frame = nil + count = 0 } } }