Skip to content
Open
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ Available configuration fields are as follows:
Database (Optional) | Specifies the default database to use once connected.
Schema (Optional) | Specifies the default schema to use for the specified database once connected.
Extra Options (Optional) | Specifies a series of one or more parameters, in the form of `<param>=<value>`, with each parameter separated by the ampersand character (&), and no spaces anywhere in the connection string.
max. open Connections | How many connections to Snowflake are opened at a time. If the limit of open connections is exceeded, newer queries are held in the queue. [default: 100]
max. queued Queries | Queue size of the internal query queue. If this limit is exceeded, the query is dropped and an error is thrown. Should always be higher than `max. open Connections`. 0 to disable. [default: 400]
Connection lifetime | Time in minutes until unused connections are recycled. [default: 60min]

#### Supported Macros

Expand Down
22 changes: 7 additions & 15 deletions pkg/check_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"database/sql"
"fmt"

"github.com/grafana/grafana-plugin-sdk-go/backend"
Expand All @@ -14,25 +13,18 @@ import (
// datasource configuration page which allows users to verify that
// a datasource is working as expected.
func (td *SnowflakeDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {

connectionString, result := createAndValidationConnectionString(req)
_, result := createAndValidationConnectionString(req)
if result != nil {
return result, nil
}
// Use the existing db field instead of opening a new connection
if td.db == nil {
var err error
td.db, err = sql.Open("snowflake", connectionString)
if err != nil {
return &backend.CheckHealthResult{
Status: backend.HealthStatusError,
Message: fmt.Sprintf("Connection issue : %s", err),
}, nil
}
i, err := td.im.Get(ctx, req.PluginContext)
if err != nil {
return nil, err
}
defer td.db.Close()
instance := i.(*instanceSettings)
db := instance.db

row, err := td.db.QueryContext(ctx, "SELECT 1")
row, err := db.QueryContext(ctx, "SELECT 1")
if err != nil {
return &backend.CheckHealthResult{
Status: backend.HealthStatusError,
Expand Down
36 changes: 29 additions & 7 deletions pkg/check_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"database/sql"
"fmt"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/stretchr/testify/require"
"testing"
)

func TestCheckHealthWithValidConnection(t *testing.T) {
Expand All @@ -16,7 +18,6 @@ func TestCheckHealthWithValidConnection(t *testing.T) {
defer db.Close()

mock.ExpectQuery("SELECT 1").WillReturnRows(sqlmock.NewRows([]string{"1"}).AddRow(1))

req := &backend.CheckHealthRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
Expand All @@ -26,8 +27,10 @@ func TestCheckHealthWithValidConnection(t *testing.T) {
},
}
ctx := context.Background()
td := &SnowflakeDatasource{db: db}
result, err := td.CheckHealth(ctx, req)

service := GetMockService(db)
service.im.Get(ctx, backend.PluginContext{})
result, err := service.CheckHealth(ctx, req)
require.NoError(t, err)
require.Equal(t, backend.HealthStatusOk, result.Status)
require.Equal(t, "Data source is working", result.Message)
Expand All @@ -39,7 +42,6 @@ func TestCheckHealthWithInvalidConnection(t *testing.T) {
defer db.Close()

mock.ExpectQuery("SELECT 1").WillReturnError(sql.ErrConnDone)

req := &backend.CheckHealthRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{
Expand All @@ -49,8 +51,9 @@ func TestCheckHealthWithInvalidConnection(t *testing.T) {
},
}
ctx := context.Background()
td := &SnowflakeDatasource{db: db}
result, err := td.CheckHealth(ctx, req)
service := GetMockService(db)
service.im.Get(ctx, backend.PluginContext{})
result, err := service.CheckHealth(ctx, req)
require.NoError(t, err)
require.Equal(t, backend.HealthStatusError, result.Status)
require.Contains(t, result.Message, "Validation query error")
Expand Down Expand Up @@ -174,3 +177,22 @@ func TestCreateAndValidationConnectionString(t *testing.T) {
})
}
}

// FakeInstanceManager is a test double for the plugin SDK's
// instancemgmt.InstanceManager. It hands out instanceSettings backed by the
// injected *sql.DB (typically an sqlmock connection) so health-check tests
// can run without real Snowflake connectivity.
type FakeInstanceManager struct {
	db *sql.DB
}

// Get implements instancemgmt.InstanceManager. It always returns a fresh
// instanceSettings wrapping the fake's database handle together with a
// zero-value pluginConfig, and never fails.
func (fakeInstanceManager *FakeInstanceManager) Get(ctx context.Context, setting backend.PluginContext) (instancemgmt.Instance, error) {
	// NOTE(review): production code presumably derives the config from the
	// plugin context (getConfig(&setting)); the zero value suffices in tests.
	config := pluginConfig{}
	return &instanceSettings{db: fakeInstanceManager.db, config: &config}, nil
}

// Do implements instancemgmt.InstanceManager as a no-op; the health-check
// tests never exercise the callback path.
func (*FakeInstanceManager) Do(_ context.Context, _ backend.PluginContext, _ instancemgmt.InstanceCallbackFunc) error {
	return nil
}

// GetMockService builds a SnowflakeDatasource whose instance manager is a
// FakeInstanceManager serving the supplied database handle, for use in tests.
func GetMockService(db *sql.DB) *SnowflakeDatasource {
	fake := &FakeInstanceManager{db: db}
	return &SnowflakeDatasource{im: fake}
}
121 changes: 91 additions & 30 deletions pkg/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"math/big"
"reflect"
"runtime/debug"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
Expand All @@ -25,6 +29,20 @@ func (qc *queryConfigStruct) isTimeSeriesType() bool {
return qc.QueryType == timeSeriesType
}

// queryCounter is a concurrency-safe counter of in-flight queries. All
// operations use atomic primitives, so a single shared instance may be
// updated from many goroutines without additional locking.
type queryCounter int32

// inc atomically adds one and reports the updated count.
func (c *queryCounter) inc() int32 {
	addr := (*int32)(c)
	return atomic.AddInt32(addr, 1)
}

// dec atomically subtracts one and reports the updated count.
func (c *queryCounter) dec() int32 {
	addr := (*int32)(c)
	return atomic.AddInt32(addr, -1)
}

// get atomically reads the current count without modifying it.
func (c *queryCounter) get() int32 {
	return atomic.LoadInt32((*int32)(c))
}

type queryConfigStruct struct {
FinalQuery string
QueryType string
Expand All @@ -35,6 +53,9 @@ type queryConfigStruct struct {
MaxDataPoints int64
FillMode string
FillValue float64
db *sql.DB
config *pluginConfig
actQueryCount *queryCounter
}

// type
Expand All @@ -58,22 +79,26 @@ type queryModel struct {
FillMode string `json:"fillMode"`
}

func (qc *queryConfigStruct) fetchData(ctx context.Context, config *pluginConfig, password string, privateKey string) (result DataQueryResult, err error) {
func (qc *queryConfigStruct) fetchData(ctx context.Context) (result DataQueryResult, err error) {
qc.actQueryCount.inc()
// Custom configuration to reduce memory footprint
sf.MaxChunkDownloadWorkers = 2
sf.CustomJSONDecoderEnabled = true

connectionString := getConnectionString(config, password, privateKey)

db, err := sql.Open("snowflake", connectionString)
if err != nil {
log.DefaultLogger.Error("Could not open database", "err", err)
start := time.Now()
stats := qc.db.Stats()
defer func() {
qc.actQueryCount.dec()
duration := time.Since(start)
log.DefaultLogger.Info(fmt.Sprintf("%+v - %s - %d", stats, duration, int(qc.actQueryCount.get())))

}()
if int(qc.config.IntMaxQueuedQueries) > 0 && int(qc.actQueryCount.get()) >= (int(qc.config.IntMaxQueuedQueries)) {
err := errors.New("too many queries in queue. Check Snowflake connectivity or increase MaxQueuedQeries count")
log.DefaultLogger.Error("Poolsize exceeded", "query", qc.FinalQuery, "err", err)
return result, err
}
defer db.Close()

log.DefaultLogger.Info("Query", "finalQuery", qc.FinalQuery)
rows, err := db.QueryContext(ctx, qc.FinalQuery)
rows, err := qc.db.QueryContext(ctx, qc.FinalQuery)
if err != nil {
if strings.Contains(err.Error(), "000605") {
log.DefaultLogger.Info("Query got cancelled", "query", qc.FinalQuery, "err", err)
Expand All @@ -83,7 +108,11 @@ func (qc *queryConfigStruct) fetchData(ctx context.Context, config *pluginConfig
log.DefaultLogger.Error("Could not execute query", "query", qc.FinalQuery, "err", err)
return result, err
}
defer rows.Close()
defer func() {
if err := rows.Close(); err != nil {
log.DefaultLogger.Warn("Failed to close rows", "err", err)
}
}()

columnTypes, err := rows.ColumnTypes()
if err != nil {
Expand Down Expand Up @@ -185,19 +214,39 @@ func (qc *queryConfigStruct) transformQueryResult(columnTypes []*sql.ColumnType,
return values, nil
}

func (td *SnowflakeDatasource) query(ctx context.Context, dataQuery backend.DataQuery, config pluginConfig, password string, privateKey string) (response backend.DataResponse) {
func (td *SnowflakeDatasource) query(ctx context.Context, wg *sync.WaitGroup, ch chan DBDataResponse, instance *instanceSettings, dataQuery backend.DataQuery) {
defer wg.Done()
queryResult := DBDataResponse{
dataResponse: backend.DataResponse{},
refID: dataQuery.RefID,
}

defer func() {
if r := recover(); r != nil {
log.DefaultLogger.Error("ExecuteQuery panic", "error", r, "stack", string(debug.Stack()))
if theErr, ok := r.(error); ok {
queryResult.dataResponse.Error = theErr
} else if theErrString, ok := r.(string); ok {
queryResult.dataResponse.Error = fmt.Errorf(theErrString)
} else {
//queryResult.dataResponse.Error = fmt.Errorf("unexpected error - %s", td.userError)
}
ch <- queryResult
}
}()

var qm queryModel
err := json.Unmarshal(dataQuery.JSON, &qm)
if err != nil {
log.DefaultLogger.Error("Could not unmarshal query", "err", err)
response.Error = err
return response
//log.DefaultLogger.Error("Could not unmarshal query", "err", err)
//queryResult.dataResponse.Error = err
panic("Could not unmarshal query")
}

if qm.QueryText == "" {
log.DefaultLogger.Error("SQL query must no be empty")
response.Error = fmt.Errorf("SQL query must no be empty")
return response
//log.DefaultLogger.Error("SQL query must no be empty")
//queryResult.dataResponse.Error = fmt.Errorf("SQL query must no be empty")
panic("Query model property rawSql should not be empty at this point")
}

queryConfig := queryConfigStruct{
Expand All @@ -209,32 +258,44 @@ func (td *SnowflakeDatasource) query(ctx context.Context, dataQuery backend.Data
Interval: dataQuery.Interval,
TimeRange: dataQuery.TimeRange,
MaxDataPoints: dataQuery.MaxDataPoints,
db: instance.db,
config: instance.config,
actQueryCount: &instance.actQueryCount,
}

log.DefaultLogger.Info("Query config", "config", qm)
errAppendDebug := func(frameErr string, err error, query string) {
var emptyFrame data.Frame
emptyFrame.SetMeta(&data.FrameMeta{
ExecutedQueryString: query,
})
queryResult.dataResponse.Error = fmt.Errorf("%s: %w", frameErr, err)
queryResult.dataResponse.Frames = data.Frames{&emptyFrame}
ch <- queryResult
}

// Apply macros
queryConfig.FinalQuery, err = Interpolate(&queryConfig)
if err != nil {
response.Error = err
return response
errAppendDebug("interpolation failed", err, queryConfig.FinalQuery)
return
}

// Remove final semi column
queryConfig.FinalQuery = strings.TrimSuffix(strings.TrimSpace(queryConfig.FinalQuery), ";")

frame := data.NewFrame("")
dataResponse, err := queryConfig.fetchData(ctx, &config, password, privateKey)
dataResponse, err := queryConfig.fetchData(ctx)
if err != nil {
response.Error = err
return response
errAppendDebug("db query error", err, queryConfig.FinalQuery)
return
}
log.DefaultLogger.Debug("Response", "data", dataResponse)
for _, table := range dataResponse.Tables {
timeColumnIndex := -1
for i, column := range table.Columns {
if err != nil {
return backend.DataResponse{}
errAppendDebug("db query error", err, queryConfig.FinalQuery)
return
}
// Check time column
if queryConfig.isTimeSeriesType() && equalsIgnoreCase(queryConfig.TimeColumns, column.Name()) {
Expand Down Expand Up @@ -287,8 +348,9 @@ func (td *SnowflakeDatasource) query(ctx context.Context, dataQuery backend.Data
if queryConfig.isTimeSeriesType() {
frame, err = td.longToWide(frame, queryConfig, dataResponse)
if err != nil {
response.Error = err
return response
queryResult.dataResponse.Error = fmt.Errorf("%w", err)
queryResult.dataResponse.Frames = data.Frames{frame}
ch <- queryResult
}
}
log.DefaultLogger.Debug("Converted wide time Frame is:", frame)
Expand All @@ -298,9 +360,8 @@ func (td *SnowflakeDatasource) query(ctx context.Context, dataQuery backend.Data
ExecutedQueryString: queryConfig.FinalQuery,
}

response.Frames = append(response.Frames, frame)

return response
queryResult.dataResponse.Frames = data.Frames{frame}
ch <- queryResult
}

func (td *SnowflakeDatasource) longToWide(frame *data.Frame, queryConfig queryConfigStruct, dataResponse DataQueryResult) (*data.Frame, error) {
Expand Down
Loading