Skip to content

Commit f976ace

Browse files
DSET-3874: Add retry configuration similar to OTel (#17)
* DSET-3874: Add retry configuration similar to OTel * Remove client.worker - it's not needed * Update Release Notes
1 parent b501f4d commit f976ace

File tree

13 files changed

+400
-138
lines changed

13 files changed

+400
-138
lines changed

RELEASE_NOTES.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
# Release Notes
22

3-
## 0.0.6 Fix Concurency Issues
3+
## 0.0.7 Add more retry parameters
44

5-
* OpenTelemetry can call AddEvents multiple times in parallel. Lets use another Pub/Sub to publish events into topic and then consume them independently.
5+
* To make OpenTelemetry configuration more stable we have introduced more retry options. We have to propagate them to the library.
6+
* Fix another data race caused by `client.worker`.
7+
8+
## 0.0.6 Fix Concurrency Issues
9+
10+
* OpenTelemetry can call AddEvents multiple times in parallel. Let's use another Pub/Sub to publish events into topic and then consume them independently.
611

712
## 0.0.5 Quick Hack
813

examples/client/main.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"net/http"
2222
"time"
2323

24+
"github.com/scalyr/dataset-go/pkg/buffer_config"
25+
2426
"github.com/scalyr/dataset-go/pkg/api/add_events"
2527
"github.com/scalyr/dataset-go/pkg/client"
2628
"github.com/scalyr/dataset-go/pkg/config"
@@ -41,7 +43,14 @@ func main() {
4143
}
4244

4345
// manually adjust delay between sending buffers
44-
cfg, err = cfg.Update(config.WithMaxBufferDelay(3 * BatchDelay))
46+
bufCfg, err := cfg.BufferSettings.Update(buffer_config.WithMaxLifetime(3 * BatchDelay))
47+
if err != nil {
48+
panic(err)
49+
}
50+
cfg, err = cfg.Update(config.WithBufferSettings(*bufCfg))
51+
if err != nil {
52+
panic(err)
53+
}
4554

4655
// build client
4756
cl, err := client.NewClient(

pkg/buffer/buffer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,8 @@ func (buffer *Buffer) ShouldSendSize() bool {
360360
return buffer.countEvents.Load() > 0 && buffer.BufferLengths() > ShouldSentBufferSize
361361
}
362362

363-
func (buffer *Buffer) ShouldSendAge(delay time.Duration) bool {
364-
return buffer.countEvents.Load() > 0 && time.Since(time.Unix(0, buffer.createdAt.Load())) > delay
363+
func (buffer *Buffer) ShouldSendAge(lifetime time.Duration) bool {
364+
return buffer.countEvents.Load() > 0 && time.Since(time.Unix(0, buffer.createdAt.Load())) > lifetime
365365
}
366366

367367
func (buffer *Buffer) BufferLengths() int32 {

pkg/buffer_config/buffer_config.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2023 SentinelOne, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package buffer_config
18+
19+
import (
20+
"time"
21+
)
22+
23+
type DataSetBufferSettings struct {
24+
MaxLifetime time.Duration
25+
MaxSize int
26+
GroupBy []string
27+
RetryInitialInterval time.Duration
28+
RetryMaxInterval time.Duration
29+
RetryMaxElapsedTime time.Duration
30+
}
31+
32+
type DataSetBufferSettingsOption func(*DataSetBufferSettings) error
33+
34+
func WithMaxLifetime(maxLifetime time.Duration) DataSetBufferSettingsOption {
35+
return func(c *DataSetBufferSettings) error {
36+
c.MaxLifetime = maxLifetime
37+
return nil
38+
}
39+
}
40+
41+
func WithMaxSize(maxSize int) DataSetBufferSettingsOption {
42+
return func(c *DataSetBufferSettings) error {
43+
c.MaxSize = maxSize
44+
return nil
45+
}
46+
}
47+
48+
func WithGroupBy(groupBy []string) DataSetBufferSettingsOption {
49+
return func(c *DataSetBufferSettings) error {
50+
c.GroupBy = groupBy
51+
return nil
52+
}
53+
}
54+
55+
func WithRetryInitialInterval(retryInitialInterval time.Duration) DataSetBufferSettingsOption {
56+
return func(c *DataSetBufferSettings) error {
57+
c.RetryInitialInterval = retryInitialInterval
58+
return nil
59+
}
60+
}
61+
62+
func WithRetryMaxInterval(retryMaxInterval time.Duration) DataSetBufferSettingsOption {
63+
return func(c *DataSetBufferSettings) error {
64+
c.RetryMaxInterval = retryMaxInterval
65+
return nil
66+
}
67+
}
68+
69+
func WithRetryMaxElapsedTime(retryMaxElapsedTime time.Duration) DataSetBufferSettingsOption {
70+
return func(c *DataSetBufferSettings) error {
71+
c.RetryMaxElapsedTime = retryMaxElapsedTime
72+
return nil
73+
}
74+
}
75+
76+
func New(opts ...DataSetBufferSettingsOption) (*DataSetBufferSettings, error) {
77+
cfg := &DataSetBufferSettings{}
78+
for _, opt := range opts {
79+
if err := opt(cfg); err != nil {
80+
return nil, err
81+
}
82+
}
83+
return cfg, nil
84+
}
85+
86+
func (cfg *DataSetBufferSettings) Update(opts ...DataSetBufferSettingsOption) (*DataSetBufferSettings, error) {
87+
newCfg := *cfg
88+
for _, opt := range opts {
89+
if err := opt(&newCfg); err != nil {
90+
return &newCfg, err
91+
}
92+
}
93+
return &newCfg, nil
94+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2023 SentinelOne, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package buffer_config
17+
18+
import (
19+
"os"
20+
"testing"
21+
"time"
22+
23+
"github.com/maxatome/go-testdeep/helpers/tdsuite"
24+
"github.com/maxatome/go-testdeep/td"
25+
)
26+
27+
type SuiteBufferSettings struct{}
28+
29+
func (s *SuiteBufferSettings) PreTest(t *td.T, testName string) error {
30+
os.Clearenv()
31+
return nil
32+
}
33+
34+
func (s *SuiteBufferSettings) PostTest(t *td.T, testName string) error {
35+
os.Clearenv()
36+
return nil
37+
}
38+
39+
func (s *SuiteBufferSettings) Destroy(t *td.T) error {
40+
os.Clearenv()
41+
return nil
42+
}
43+
44+
func TestSuiteBufferSettings(t *testing.T) {
45+
td.NewT(t)
46+
tdsuite.Run(t, &SuiteBufferSettings{})
47+
}
48+
49+
func (s *SuiteBufferSettings) TestConfigWithOptions(assert, require *td.T) {
50+
bufCfg, errB := New(
51+
WithMaxLifetime(3*time.Second),
52+
WithMaxSize(12345),
53+
WithGroupBy([]string{"aaa", "bbb"}),
54+
WithRetryInitialInterval(8*time.Second),
55+
WithRetryMaxInterval(30*time.Second),
56+
WithRetryMaxElapsedTime(10*time.Minute),
57+
)
58+
59+
assert.CmpNoError(errB)
60+
61+
assert.Cmp(*bufCfg, DataSetBufferSettings{
62+
MaxLifetime: 3 * time.Second,
63+
MaxSize: 12345,
64+
GroupBy: []string{"aaa", "bbb"},
65+
RetryInitialInterval: 8 * time.Second,
66+
RetryMaxInterval: 30 * time.Second,
67+
RetryMaxElapsedTime: 10 * time.Minute,
68+
})
69+
}
70+
71+
func (s *SuiteBufferSettings) TestDataConfigUpdate(assert, require *td.T) {
72+
bufCfg, errB := New(
73+
WithMaxLifetime(3*time.Second),
74+
WithMaxSize(12345),
75+
WithGroupBy([]string{"aaa", "bbb"}),
76+
WithRetryInitialInterval(8*time.Second),
77+
WithRetryMaxInterval(30*time.Second),
78+
WithRetryMaxElapsedTime(10*time.Minute),
79+
)
80+
assert.CmpNoError(errB)
81+
82+
assert.Cmp(*bufCfg, DataSetBufferSettings{
83+
MaxLifetime: 3 * time.Second,
84+
MaxSize: 12345,
85+
GroupBy: []string{"aaa", "bbb"},
86+
RetryInitialInterval: 8 * time.Second,
87+
RetryMaxInterval: 30 * time.Second,
88+
RetryMaxElapsedTime: 10 * time.Minute,
89+
})
90+
91+
bufCfg2, err := bufCfg.Update(
92+
WithMaxLifetime(23*time.Second),
93+
WithMaxSize(212345),
94+
WithGroupBy([]string{"2aaa", "2bbb"}),
95+
WithRetryInitialInterval(28*time.Second),
96+
WithRetryMaxInterval(230*time.Second),
97+
WithRetryMaxElapsedTime(210*time.Minute),
98+
)
99+
assert.CmpNoError(err)
100+
101+
// original config is unchanged
102+
assert.Cmp(*bufCfg, DataSetBufferSettings{
103+
MaxLifetime: 3 * time.Second,
104+
MaxSize: 12345,
105+
GroupBy: []string{"aaa", "bbb"},
106+
RetryInitialInterval: 8 * time.Second,
107+
RetryMaxInterval: 30 * time.Second,
108+
RetryMaxElapsedTime: 10 * time.Minute,
109+
})
110+
111+
// new config is changed
112+
assert.Cmp(*bufCfg2, DataSetBufferSettings{
113+
MaxLifetime: 23 * time.Second,
114+
MaxSize: 212345,
115+
GroupBy: []string{"2aaa", "2bbb"},
116+
RetryInitialInterval: 28 * time.Second,
117+
RetryMaxInterval: 230 * time.Second,
118+
RetryMaxElapsedTime: 210 * time.Minute,
119+
})
120+
}

pkg/client/add_events.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
5353
// first, figure out which keys are part of the batch
5454
seenKeys := make(map[string]bool)
5555
for _, bundle := range bundles {
56-
key := bundle.Key(client.Config.GroupBy)
56+
key := bundle.Key(client.Config.BufferSettings.GroupBy)
5757
seenKeys[key] = true
5858
}
5959

@@ -73,7 +73,7 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
7373

7474
// and as last step - publish them
7575
for _, bundle := range bundles {
76-
key := bundle.Key(client.Config.GroupBy)
76+
key := bundle.Key(client.Config.BufferSettings.GroupBy)
7777
client.eventsEnqueued.Add(1)
7878
client.addEventsPubSub.Pub(bundle, key)
7979
}
@@ -172,13 +172,13 @@ func (client *DataSetClient) ListenAndSendBundlesForKey(key string, ch chan inte
172172
}
173173
}
174174

175-
// IsProcessingBuffers returns True if there are still some unprocessed data.
175+
// IsProcessingBuffers returns True if there are still some unprocessed buffers.
176176
// False otherwise.
177177
func (client *DataSetClient) IsProcessingBuffers() bool {
178178
return client.buffersEnqueued.Load() > client.buffersProcessed.Load()
179179
}
180180

181-
// IsProcessingBuffers returns True if there are still some unprocessed data.
181+
// IsProcessingEvents returns True if there are still some unprocessed events.
182182
// False otherwise.
183183
func (client *DataSetClient) IsProcessingEvents() bool {
184184
return client.eventsEnqueued.Load() > client.eventsProcessed.Load()
@@ -198,7 +198,7 @@ func (client *DataSetClient) Finish() {
198198
zap.Uint64("eventsEnqueued", client.eventsEnqueued.Load()),
199199
zap.Uint64("eventsProcessed", client.eventsProcessed.Load()),
200200
)
201-
time.Sleep(client.Config.RetryBase)
201+
time.Sleep(client.Config.BufferSettings.RetryInitialInterval)
202202
i++
203203
if i > 50 {
204204
break
@@ -216,13 +216,12 @@ func (client *DataSetClient) Finish() {
216216
zap.Uint64("buffersEnqueued", client.buffersEnqueued.Load()),
217217
zap.Uint64("buffersProcessed", client.buffersProcessed.Load()),
218218
)
219-
time.Sleep(client.Config.RetryBase)
219+
time.Sleep(client.Config.BufferSettings.RetryInitialInterval)
220220
j++
221221
if j > 50 {
222222
break
223223
}
224224
}
225-
client.workers.Wait()
226225

227226
client.Logger.Info("All buffers have been processed")
228227
}

pkg/client/add_events_long_running_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"testing"
2929
"time"
3030

31+
"github.com/scalyr/dataset-go/pkg/buffer_config"
32+
3133
"github.com/maxatome/go-testdeep/helpers/tdsuite"
3234

3335
"github.com/scalyr/dataset-go/pkg/api/add_events"
@@ -65,11 +67,13 @@ func TestSuiteAddEventsLongRunning(t *testing.T) {
6567
func (s *SuiteAddEventsLongRunning) TestAddEventsManyLogsShouldSucceed(assert, require *td.T) {
6668
const MaxDelayMs = 200
6769
config := &config.DataSetConfig{
68-
Endpoint: "https://example.com",
69-
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
70-
MaxPayloadB: 1000,
71-
MaxBufferDelay: time.Duration(MaxDelayMs) * time.Millisecond,
72-
RetryBase: RetryBase,
70+
Endpoint: "https://example.com",
71+
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
72+
BufferSettings: buffer_config.DataSetBufferSettings{
73+
MaxSize: 1000,
74+
MaxLifetime: time.Duration(MaxDelayMs) * time.Millisecond,
75+
RetryInitialInterval: RetryBase,
76+
},
7377
}
7478
sc, _ := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
7579

0 commit comments

Comments
 (0)