Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 3.1.6

- Rate limit DataSet LRQ sessions rather than HTTP requests
- Prevents LRQ sessions from being terminated if a LRQ ping is throttled

## 3.1.5

- Updated grafana-plugin-sdk-go to 0.250.0
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "sentinelone-dataset-datasource",
"version": "3.1.5",
"version": "3.1.6",
"description": "Scalyr Observability Platform",
"scripts": {
"build": "webpack -c ./.config/webpack/webpack.config.ts --env production",
Expand Down
17 changes: 11 additions & 6 deletions pkg/plugin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewDataSetClient(dataSetUrl string, apiKey string) DataSetClient {

// TODO Are there alternate approaches to implementing rate limits via the Grafana SDK?
// Consult with Grafana support about this, potentially there's a simplier option.
rateLimiter := rate.NewLimiter(100*rate.Every(1*time.Minute), 100) // 100 requests / minute
rateLimiter := rate.NewLimiter(100*rate.Every(1*time.Minute), 100) // 100 LRQ sessions / minute

return &dataSetClient{
dataSetUrl: dataSetUrl,
Expand All @@ -57,11 +57,16 @@ func NewDataSetClient(dataSetUrl string, apiKey string) DataSetClient {
}

func (d *dataSetClient) newRequest(method, url string, body io.Reader) (*http.Request, error) {
const VERSION = "3.1.5"

if err := d.rateLimiter.Wait(context.Background()); err != nil {
log.DefaultLogger.Error("error applying rate limiter", "err", err)
return nil, err
const VERSION = "3.1.6"

// Apply the rate limiter to the initial POST request of the LRQ api session.
// This ensures that later LRQ api "pings" (ie GET requests) are not rate-limited;
// timely pings are necessary to prevent the LRQ session from being terminated.
if method == http.MethodPost {
if err := d.rateLimiter.Wait(context.Background()); err != nil {
log.DefaultLogger.Error("error applying rate limiter", "err", err)
return nil, err
}
}

request, err := http.NewRequest(method, url, body)
Expand Down
124 changes: 124 additions & 0 deletions pkg/plugin/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package plugin

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"sort"
"strconv"
"sync"
"testing"
"time"

"golang.org/x/time/rate"
)

func TestClientRateLimiter(t *testing.T) {
mockedServerState := struct {
requestTimesByMethod map[string][]time.Time
nextSessionId int
mutex sync.Mutex
}{
requestTimesByMethod: make(map[string][]time.Time),
}

mockedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
const totalSteps = 2

mockedServerState.mutex.Lock()
defer mockedServerState.mutex.Unlock()

mockedServerState.requestTimesByMethod[r.Method] = append(mockedServerState.requestTimesByMethod[r.Method], time.Now())
t.Logf("request %s %s %s", time.Now().Format("15:04:05"), r.Method, r.URL.String())

switch r.Method {
case http.MethodPost:
sessionId := mockedServerState.nextSessionId
mockedServerState.nextSessionId++
w.WriteHeader(http.StatusOK)
_, err := fmt.Fprintf(w, "{\"id\":\"%d\",\"stepsCompleted\":0,\"totalSteps\":%d}", sessionId, totalSteps)
if err != nil {
t.Fatalf("failed to write response: %v", err)
}

case http.MethodGet:
sessionId := r.URL.Path
lastStepSeen, err := strconv.Atoi(r.URL.Query().Get("lastStepSeen"))
if err != nil {
t.Fatalf("failed to parse path %s: %v", r.URL.String(), err)
}
w.WriteHeader(http.StatusOK)
_, err = fmt.Fprintf(w, "{\"id\":\"%s\",\"stepsCompleted\":%d,\"totalSteps\":%d}", sessionId, lastStepSeen+1, totalSteps)
if err != nil {
t.Fatalf("failed to write response: %v", err)
}

case http.MethodDelete:
w.WriteHeader(http.StatusOK)
}
}))
defer mockedServer.Close()

datasetClient := &dataSetClient{
dataSetUrl: mockedServer.URL,
apiKey: "<api-key>",
netClient: &http.Client{},
rateLimiter: rate.NewLimiter(1*rate.Every(1*time.Second), 1),
}

request := LRQRequest{
QueryType: PQ,
StartTime: time.Now().Add(-4 * time.Hour).Unix(),
EndTime: time.Now().Unix(),
Pq: &PQOptions{
Query: "message contains 'error'\n| columns timestamp,severity,message",
ResultType: TABLE,
},
}

var waitGroup sync.WaitGroup
var clientErr error
for i := 0; i < 3; i++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
_, err := datasetClient.DoLRQRequest(context.Background(), request)
if err != nil {
clientErr = err
}
}()
}
waitGroup.Wait()
if clientErr != nil {
t.Fatalf("failed to execute (at least one) request: %v", clientErr)
}

// Find the minimum time difference between consecutive elements.
// The elements are expected to be already sorted ascendingly.
minTimeDiff := func(times []time.Time) time.Duration {
rv := times[1].Sub(times[0])
for i := 2; i < len(times); i++ {
if diff := times[i].Sub(times[i-1]); diff < rv {
rv = diff
}
}
return rv
}

// The rate limiter is set to only allow one session (POST request) per second.
// Verify that the minimum amount of time between consecutive POST requests is at least one second.
if diff := minTimeDiff(mockedServerState.requestTimesByMethod[http.MethodPost]); diff.Round(time.Second) < time.Second {
t.Errorf("expected >= 1s, actual = %v", diff)
}

// There are no restrictions on GET or DELETE requests.
// Verify that the minimum amount of time between non-POST requests is less than 500 milliseconds.
var requestTimes []time.Time
requestTimes = append(requestTimes, mockedServerState.requestTimesByMethod[http.MethodGet]...)
requestTimes = append(requestTimes, mockedServerState.requestTimesByMethod[http.MethodDelete]...)
sort.Slice(requestTimes, func(i, j int) bool { return requestTimes[i].Before(requestTimes[j]) })
if diff := minTimeDiff(requestTimes); diff.Round(time.Millisecond) > 500*time.Millisecond {
t.Errorf("expected < 500ms, actual = %v", diff)
}
}
4 changes: 2 additions & 2 deletions src/plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
"path": "img/DatasetConfig.png"
}
],
"version": "3.1.5",
"updated": "2025-07-24"
"version": "3.1.6",
"updated": "2025-09-19"
},
"dependencies": {
"grafanaDependency": ">=8.2.0",
Expand Down
Loading