Skip to content

Commit 1df84c7

Browse files
authored
Merge pull request #317
* make `result.Wait()` configurable * Merge remote-tracking branch 'origin/master' into configurable-wait
1 parent 1d412f7 commit 1df84c7

File tree

3 files changed

+233
-14
lines changed

3 files changed

+233
-14
lines changed

azkustoingest/internal/status/status_table_client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,18 @@ const (
1616
fullMetadata = aztables.MetadataFormatFull
1717
)
1818

19+
type TableClientReader interface {
20+
Read(ctx context.Context, ingestionSourceID string) (map[string]interface{}, error)
21+
}
22+
1923
// TableClient allows reading and writing to azure tables.
2024
type TableClient struct {
2125
tableURI resources.URI
2226
client *aztables.Client
2327
}
2428

29+
var _ TableClientReader = (*TableClient)(nil)
30+
2531
// NewTableClient Creates an azure table client.
2632
func NewTableClient(client policy.Transporter, uri resources.URI) (*TableClient, error) {
2733
tableClient, err := aztables.NewClientWithNoCredential(uri.String(), &aztables.ClientOptions{

azkustoingest/result.go

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
// Result provides a way for users track the state of ingestion jobs.
1515
type Result struct {
1616
record statusRecord
17-
tableClient *status.TableClient
17+
tableClient status.TableClientReader
1818
reportToTable bool
1919
}
2020

@@ -78,9 +78,58 @@ func (r *Result) putQueued(ctx context.Context, i *Ingestion) {
7878
r.tableClient = client
7979
}
8080

81+
type waitConfig struct {
82+
interval time.Duration
83+
immediateFirst bool
84+
retryBackoffDelay []time.Duration
85+
retryBackoffJitter time.Duration
86+
}
87+
88+
type WaitOption func(o *waitConfig)
89+
90+
func WithImmediateFirst() WaitOption {
91+
return func(o *waitConfig) {
92+
o.immediateFirst = true
93+
}
94+
}
95+
96+
func WithInterval(interval time.Duration) WaitOption {
97+
return func(o *waitConfig) {
98+
o.interval = interval
99+
}
100+
}
101+
102+
func WithRetryBackoffDelay(delay ...time.Duration) WaitOption {
103+
return func(o *waitConfig) {
104+
o.retryBackoffDelay = delay
105+
}
106+
}
107+
108+
func WithRetryBackoffJitter(jitter time.Duration) WaitOption {
109+
return func(o *waitConfig) {
110+
o.retryBackoffJitter = jitter
111+
}
112+
}
113+
114+
var (
115+
DefaultWaitPollInterval = 10 * time.Second
116+
DefaultWaitPollRetryBackoffDelay = []time.Duration{10 * time.Second, 60 * time.Second, 120 * time.Second}
117+
DefaultWaitPollRetryBackoffJitter = 5 * time.Second
118+
)
119+
81120
// Wait returns a channel that can be checked for ingestion results.
82121
// In order to check actual status please use the ReportResultToTable option when ingesting data.
83-
func (r *Result) Wait(ctx context.Context) <-chan error {
122+
func (r *Result) Wait(ctx context.Context, options ...WaitOption) <-chan error {
123+
cfg := waitConfig{
124+
interval: DefaultWaitPollInterval,
125+
retryBackoffDelay: DefaultWaitPollRetryBackoffDelay,
126+
retryBackoffJitter: DefaultWaitPollRetryBackoffJitter,
127+
}
128+
129+
for _, o := range options {
130+
o(&cfg)
131+
}
132+
84133
ch := make(chan error, 1)
85134

86135
if r.record.Status.IsFinal() {
@@ -106,7 +155,7 @@ func (r *Result) Wait(ctx context.Context) <-chan error {
106155
go func() {
107156
defer close(ch)
108157

109-
r.poll(ctx)
158+
r.poll(ctx, &cfg)
110159
if !r.record.Status.IsSuccess() {
111160
ch <- r.record
112161
}
@@ -115,13 +164,13 @@ func (r *Result) Wait(ctx context.Context) <-chan error {
115164
return ch
116165
}
117166

118-
func (r *Result) poll(ctx context.Context) {
119-
const pollInterval = 10 * time.Second
120-
attempts := 3
121-
delay := [3]int{120, 60, 10} // attempts are counted backwards
122-
123-
// Create a ticker to poll the table in 10 second intervals.
124-
timer := time.NewTimer(pollInterval)
167+
func (r *Result) poll(ctx context.Context, cfg *waitConfig) {
168+
initialInterval := cfg.interval
169+
if cfg.immediateFirst {
170+
initialInterval = 0
171+
}
172+
attempts := cfg.retryBackoffDelay[:]
173+
timer := time.NewTimer(initialInterval)
125174
defer timer.Stop()
126175

127176
for {
@@ -133,24 +182,28 @@ func (r *Result) poll(ctx context.Context) {
133182

134183
case <-timer.C:
135184
smap, err := r.tableClient.Read(ctx, r.record.IngestionSourceID.String())
185+
sleepTime := cfg.interval
136186
if err != nil {
137-
if attempts == 0 {
187+
if len(attempts) == 0 {
138188
r.record.Status = StatusRetrievalFailed
139189
r.record.FailureStatus = Transient
140190
r.record.Details = "Failed reading from Status Table: " + err.Error()
141191
return
142192
}
143193

144-
attempts = attempts - 1
145-
time.Sleep(time.Duration(delay[attempts]+rand.Intn(5)) * time.Second)
194+
sleepTime += attempts[0]
195+
attempts = attempts[1:]
196+
if cfg.retryBackoffJitter > 0 {
197+
sleepTime += time.Duration(rand.Intn(int(cfg.retryBackoffJitter)))
198+
}
146199
} else {
147200
r.record.FromMap(smap)
148201
if r.record.Status.IsFinal() {
149202
return
150203
}
151204
}
152205

153-
timer.Reset(pollInterval)
206+
timer.Reset(sleepTime)
154207
}
155208
}
156209
}

azkustoingest/result_test.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package azkustoingest
2+
3+
import (
4+
"context"
5+
"testing"
6+
"testing/synctest"
7+
"time"
8+
9+
"github.com/Azure/azure-kusto-go/azkustoingest/internal/status"
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
type TableClientReaderFunc func(ctx context.Context, ingestionSourceID string) (map[string]interface{}, error)
14+
15+
func (t TableClientReaderFunc) Read(ctx context.Context, ingestionSourceID string) (map[string]interface{}, error) {
16+
return t(ctx, ingestionSourceID)
17+
}
18+
19+
var _ status.TableClientReader = (*TableClientReaderFunc)(nil)
20+
21+
func TestWait(t *testing.T) {
22+
synctest.Test(t, func(t *testing.T) {
23+
res := &Result{
24+
reportToTable: true,
25+
tableClient: TableClientReaderFunc(func(ctx context.Context, ingestionSourceID string) (map[string]interface{}, error) {
26+
assert.FailNow(t, "Expected table client to not be called")
27+
return nil, nil
28+
}),
29+
record: statusRecord{
30+
Status: Pending,
31+
},
32+
}
33+
34+
ch := res.Wait(t.Context())
35+
synctest.Wait()
36+
37+
select {
38+
case <-ch:
39+
assert.FailNow(t, "Expected nothing to be sent on channel")
40+
default:
41+
}
42+
})
43+
}
44+
45+
func TestWait_ImmediateFirst(t *testing.T) {
46+
synctest.Test(t, func(t *testing.T) {
47+
res := &Result{
48+
reportToTable: true,
49+
tableClient: TableClientReaderFunc(func(ctx context.Context, ingestionSourceID string) (map[string]any, error) {
50+
return map[string]any{"Status": string(Succeeded)}, nil
51+
}),
52+
record: statusRecord{
53+
Status: Pending,
54+
},
55+
}
56+
57+
ch := res.Wait(t.Context(), WithImmediateFirst())
58+
synctest.Wait()
59+
60+
select {
61+
case <-ch:
62+
default:
63+
assert.FailNow(t, "Expected something to be sent on channel")
64+
}
65+
})
66+
}
67+
68+
func TestWait_WithInterval(t *testing.T) {
69+
synctest.Test(t, func(t *testing.T) {
70+
startTime := time.Now()
71+
var calledTimes []time.Duration
72+
73+
res := &Result{
74+
reportToTable: true,
75+
tableClient: TableClientReaderFunc(func(ctx context.Context, ingestionSourceID string) (map[string]any, error) {
76+
calledTimes = append(calledTimes, time.Now().Sub(startTime))
77+
ret := map[string]any{"Status": string(Pending)}
78+
if len(calledTimes) >= 3 {
79+
ret["Status"] = string(Failed)
80+
}
81+
return ret, nil
82+
}),
83+
record: statusRecord{
84+
Status: Pending,
85+
},
86+
}
87+
88+
ch := res.Wait(t.Context(), WithInterval(5*time.Second))
89+
synctest.Wait()
90+
assert.Empty(t, calledTimes, "Expected no calls to table client")
91+
92+
time.Sleep(5 * time.Second)
93+
synctest.Wait()
94+
assert.Len(t, calledTimes, 1)
95+
assert.Equal(t, 5*time.Second, calledTimes[0])
96+
97+
time.Sleep(5 * time.Second)
98+
synctest.Wait()
99+
assert.Len(t, calledTimes, 2)
100+
assert.Equal(t, 10*time.Second, calledTimes[1])
101+
102+
time.Sleep(5 * time.Second)
103+
synctest.Wait()
104+
assert.Len(t, calledTimes, 3)
105+
assert.Equal(t, 15*time.Second, calledTimes[2])
106+
107+
select {
108+
case <-ch:
109+
default:
110+
assert.FailNow(t, "Expected something to be sent on channel")
111+
}
112+
})
113+
}
114+
115+
func TestWait_WithRetryBackoffDelay(t *testing.T) {
116+
synctest.Test(t, func(t *testing.T) {
117+
startTime := time.Now()
118+
var calledTimes []time.Duration
119+
120+
res := &Result{
121+
reportToTable: true,
122+
tableClient: TableClientReaderFunc(func(ctx context.Context, ingestionSourceID string) (map[string]any, error) {
123+
calledTimes = append(calledTimes, time.Now().Sub(startTime))
124+
return nil, assert.AnError
125+
}),
126+
record: statusRecord{
127+
Status: Pending,
128+
},
129+
}
130+
131+
ch := res.Wait(t.Context(), WithRetryBackoffDelay(1*time.Second, 3*time.Second), WithRetryBackoffJitter(0))
132+
synctest.Wait()
133+
assert.Empty(t, calledTimes, "Expected no calls to table client")
134+
135+
// First call after DefaultWaitPollInterval (10s)
136+
time.Sleep(10 * time.Second)
137+
synctest.Wait()
138+
assert.Len(t, calledTimes, 1)
139+
assert.Equal(t, 10*time.Second, calledTimes[0])
140+
141+
// Second call after first backoff delay (1s) + poll interval (10s)
142+
time.Sleep(11 * time.Second)
143+
synctest.Wait()
144+
assert.Len(t, calledTimes, 2)
145+
assert.Equal(t, 21*time.Second, calledTimes[1])
146+
147+
// Third call after second backoff delay (3s) + poll interval (10s)
148+
time.Sleep(13 * time.Second)
149+
synctest.Wait()
150+
assert.Len(t, calledTimes, 3)
151+
assert.Equal(t, 34*time.Second, calledTimes[2])
152+
153+
select {
154+
case err := <-ch:
155+
assert.NotNil(t, err)
156+
default:
157+
assert.FailNow(t, "Expected something to be sent on channel")
158+
}
159+
})
160+
}

0 commit comments

Comments
 (0)