Skip to content

Commit e68c8ee

Browse files
authored
Merge pull request #136 from scalyr/dtin-6364
DTIN-6364: Rate limit by LRQ session rather than http request
2 parents 1ebe805 + bf2d09b commit e68c8ee

File tree

5 files changed

+143
-9
lines changed

5 files changed

+143
-9
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## 3.1.6
4+
5+
- Rate limit DataSet LRQ sessions rather than HTTP requests
6+
- Prevents LRQ sessions from being terminated if a LRQ ping is throttled
7+
38
## 3.1.5
49

510
- Updated grafana-plugin-sdk-go to 0.250.0

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "sentinelone-dataset-datasource",
3-
"version": "3.1.5",
3+
"version": "3.1.6",
44
"description": "Scalyr Observability Platform",
55
"scripts": {
66
"build": "webpack -c ./.config/webpack/webpack.config.ts --env production",

pkg/plugin/client.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func NewDataSetClient(dataSetUrl string, apiKey string) DataSetClient {
4646

4747
// TODO Are there alternate approaches to implementing rate limits via the Grafana SDK?
4848
// Consult with Grafana support about this, potentially there's a simplier option.
49-
rateLimiter := rate.NewLimiter(100*rate.Every(1*time.Minute), 100) // 100 requests / minute
49+
rateLimiter := rate.NewLimiter(100*rate.Every(1*time.Minute), 100) // 100 LRQ sessions / minute
5050

5151
return &dataSetClient{
5252
dataSetUrl: dataSetUrl,
@@ -57,11 +57,16 @@ func NewDataSetClient(dataSetUrl string, apiKey string) DataSetClient {
5757
}
5858

5959
func (d *dataSetClient) newRequest(method, url string, body io.Reader) (*http.Request, error) {
60-
const VERSION = "3.1.5"
61-
62-
if err := d.rateLimiter.Wait(context.Background()); err != nil {
63-
log.DefaultLogger.Error("error applying rate limiter", "err", err)
64-
return nil, err
60+
const VERSION = "3.1.6"
61+
62+
// Apply the rate limiter to the initial POST request of the LRQ api session.
63+
// This ensures that later LRQ api "pings" (ie GET requests) are not rate-limited;
64+
// timely pings are necessary to prevent the LRQ session from being terminated.
65+
if method == http.MethodPost {
66+
if err := d.rateLimiter.Wait(context.Background()); err != nil {
67+
log.DefaultLogger.Error("error applying rate limiter", "err", err)
68+
return nil, err
69+
}
6570
}
6671

6772
request, err := http.NewRequest(method, url, body)

pkg/plugin/client_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package plugin
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"net/http/httptest"
8+
"sort"
9+
"strconv"
10+
"sync"
11+
"testing"
12+
"time"
13+
14+
"golang.org/x/time/rate"
15+
)
16+
17+
func TestClientRateLimiter(t *testing.T) {
18+
mockedServerState := struct {
19+
requestTimesByMethod map[string][]time.Time
20+
nextSessionId int
21+
mutex sync.Mutex
22+
}{
23+
requestTimesByMethod: make(map[string][]time.Time),
24+
}
25+
26+
mockedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
27+
const totalSteps = 2
28+
29+
mockedServerState.mutex.Lock()
30+
defer mockedServerState.mutex.Unlock()
31+
32+
mockedServerState.requestTimesByMethod[r.Method] = append(mockedServerState.requestTimesByMethod[r.Method], time.Now())
33+
t.Logf("request %s %s %s", time.Now().Format("15:04:05"), r.Method, r.URL.String())
34+
35+
switch r.Method {
36+
case http.MethodPost:
37+
sessionId := mockedServerState.nextSessionId
38+
mockedServerState.nextSessionId++
39+
w.WriteHeader(http.StatusOK)
40+
_, err := fmt.Fprintf(w, "{\"id\":\"%d\",\"stepsCompleted\":0,\"totalSteps\":%d}", sessionId, totalSteps)
41+
if err != nil {
42+
t.Fatalf("failed to write response: %v", err)
43+
}
44+
45+
case http.MethodGet:
46+
sessionId := r.URL.Path
47+
lastStepSeen, err := strconv.Atoi(r.URL.Query().Get("lastStepSeen"))
48+
if err != nil {
49+
t.Fatalf("failed to parse path %s: %v", r.URL.String(), err)
50+
}
51+
w.WriteHeader(http.StatusOK)
52+
_, err = fmt.Fprintf(w, "{\"id\":\"%s\",\"stepsCompleted\":%d,\"totalSteps\":%d}", sessionId, lastStepSeen+1, totalSteps)
53+
if err != nil {
54+
t.Fatalf("failed to write response: %v", err)
55+
}
56+
57+
case http.MethodDelete:
58+
w.WriteHeader(http.StatusOK)
59+
}
60+
}))
61+
defer mockedServer.Close()
62+
63+
datasetClient := &dataSetClient{
64+
dataSetUrl: mockedServer.URL,
65+
apiKey: "<api-key>",
66+
netClient: &http.Client{},
67+
rateLimiter: rate.NewLimiter(1*rate.Every(1*time.Second), 1),
68+
}
69+
70+
request := LRQRequest{
71+
QueryType: PQ,
72+
StartTime: time.Now().Add(-4 * time.Hour).Unix(),
73+
EndTime: time.Now().Unix(),
74+
Pq: &PQOptions{
75+
Query: "message contains 'error'\n| columns timestamp,severity,message",
76+
ResultType: TABLE,
77+
},
78+
}
79+
80+
var waitGroup sync.WaitGroup
81+
var clientErr error
82+
for i := 0; i < 3; i++ {
83+
waitGroup.Add(1)
84+
go func() {
85+
defer waitGroup.Done()
86+
_, err := datasetClient.DoLRQRequest(context.Background(), request)
87+
if err != nil {
88+
clientErr = err
89+
}
90+
}()
91+
}
92+
waitGroup.Wait()
93+
if clientErr != nil {
94+
t.Fatalf("failed to execute (at least one) request: %v", clientErr)
95+
}
96+
97+
// Find the minimum time difference between consecutive elements.
98+
// The elements are expected to be already sorted ascendingly.
99+
minTimeDiff := func(times []time.Time) time.Duration {
100+
rv := times[1].Sub(times[0])
101+
for i := 2; i < len(times); i++ {
102+
if diff := times[i].Sub(times[i-1]); diff < rv {
103+
rv = diff
104+
}
105+
}
106+
return rv
107+
}
108+
109+
// The rate limiter is set to only allow one session (POST request) per second.
110+
// Verify that the minimum amount of time between consecutive POST requests is at least one second.
111+
if diff := minTimeDiff(mockedServerState.requestTimesByMethod[http.MethodPost]); diff.Round(time.Second) < time.Second {
112+
t.Errorf("expected >= 1s, actual = %v", diff)
113+
}
114+
115+
// There are no restrictions on GET or DELETE requests.
116+
// Verify that the minimum amount of time between non-POST requests is less than 500 milliseconds.
117+
var requestTimes []time.Time
118+
requestTimes = append(requestTimes, mockedServerState.requestTimesByMethod[http.MethodGet]...)
119+
requestTimes = append(requestTimes, mockedServerState.requestTimesByMethod[http.MethodDelete]...)
120+
sort.Slice(requestTimes, func(i, j int) bool { return requestTimes[i].Before(requestTimes[j]) })
121+
if diff := minTimeDiff(requestTimes); diff.Round(time.Millisecond) > 500*time.Millisecond {
122+
t.Errorf("expected < 500ms, actual = %v", diff)
123+
}
124+
}

src/plugin.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
"path": "img/DatasetConfig.png"
4444
}
4545
],
46-
"version": "3.1.5",
47-
"updated": "2025-07-24"
46+
"version": "3.1.6",
47+
"updated": "2025-09-19"
4848
},
4949
"dependencies": {
5050
"grafanaDependency": ">=8.2.0",

0 commit comments

Comments
 (0)