Skip to content

Commit b6e4dba

Browse files
committed
Cancel running Trino queries when a new request arrives
Track active queries per panel and cancel in-flight queries when a new QueryData request arrives for the same panel (e.g. user changes filters or presses cancel). The context cancellation propagates through database/sql to the Trino driver, which sends a DELETE request to terminate the query on the Trino server. Fixes #310
1 parent b50c439 commit b6e4dba

File tree

1 file changed

+58
-1
lines changed

1 file changed

+58
-1
lines changed

pkg/trino/datasource-context.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ import (
44
"context"
55
"fmt"
66
"strings"
7+
"sync"
8+
"sync/atomic"
79

810
"github.com/grafana/grafana-plugin-sdk-go/backend"
911
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
12+
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1013
"github.com/grafana/sqlds/v2"
1114
"github.com/trinodb/grafana-trino/pkg/trino/models"
1215
)
@@ -18,8 +21,17 @@ const (
1821
bearerPrefix = "Bearer "
1922
)
2023

24+
type activeQuery struct {
25+
id uint64
26+
cancel context.CancelFunc
27+
}
28+
2129
type SQLDatasourceWithTrinoUserContext struct {
2230
sqlds.SQLDatasource
31+
32+
mu sync.Mutex
33+
activeQueries map[string]activeQuery
34+
queryCounter atomic.Uint64
2335
}
2436

2537
func (ds *SQLDatasourceWithTrinoUserContext) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
@@ -45,9 +57,51 @@ func (ds *SQLDatasourceWithTrinoUserContext) QueryData(ctx context.Context, req
4557
ctx = context.WithValue(ctx, trinoClientTagsKey, settings.ClientTags)
4658
}
4759

60+
// Create a cancellable context so we can cancel running queries
61+
// when a new request arrives for the same panel (e.g. user changes
62+
// filters or presses cancel). The cancel propagates through
63+
// database/sql to the Trino driver, which sends a DELETE request
64+
// to terminate the query on the Trino server.
65+
ctx, cancel := context.WithCancel(ctx)
66+
defer cancel()
67+
68+
queryID := ds.queryCounter.Add(1)
69+
70+
// Cancel any in-flight query for the same panel/refID.
71+
for _, q := range req.Queries {
72+
key := panelKey(req, q.RefID)
73+
ds.mu.Lock()
74+
if prev, ok := ds.activeQueries[key]; ok {
75+
log.DefaultLogger.Debug("Cancelling previous query", "refId", q.RefID)
76+
prev.cancel()
77+
}
78+
ds.activeQueries[key] = activeQuery{id: queryID, cancel: cancel}
79+
ds.mu.Unlock()
80+
}
81+
82+
defer func() {
83+
for _, q := range req.Queries {
84+
key := panelKey(req, q.RefID)
85+
ds.mu.Lock()
86+
if aq, ok := ds.activeQueries[key]; ok && aq.id == queryID {
87+
delete(ds.activeQueries, key)
88+
}
89+
ds.mu.Unlock()
90+
}
91+
}()
92+
4893
return ds.SQLDatasource.QueryData(ctx, req)
4994
}
5095

96+
// panelKey builds a unique key for a query within a datasource.
97+
func panelKey(req *backend.QueryDataRequest, refID string) string {
98+
dsUID := ""
99+
if req.PluginContext.DataSourceInstanceSettings != nil {
100+
dsUID = req.PluginContext.DataSourceInstanceSettings.UID
101+
}
102+
return dsUID + "/" + refID
103+
}
104+
51105
func (ds *SQLDatasourceWithTrinoUserContext) NewDatasource(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
52106
_, err := ds.SQLDatasource.NewDatasource(settings)
53107
if err != nil {
@@ -58,7 +112,10 @@ func (ds *SQLDatasourceWithTrinoUserContext) NewDatasource(settings backend.Data
58112

59113
func NewDatasource(c sqlds.Driver) *SQLDatasourceWithTrinoUserContext {
60114
base := sqlds.NewDatasource(c)
61-
return &SQLDatasourceWithTrinoUserContext{*base}
115+
return &SQLDatasourceWithTrinoUserContext{
116+
SQLDatasource: *base,
117+
activeQueries: make(map[string]activeQuery),
118+
}
62119
}
63120

64121
func injectAccessToken(ctx context.Context, req *backend.QueryDataRequest) context.Context {

0 commit comments

Comments
 (0)