Skip to content

Commit f76aed8

Browse files
feat: implement retries with exponential backoff (#55)
* feat: implement retries with exponential backoff * refactor: restructure query runners and optimize backoff * style: remove redundant comment line * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor: extract retry invocation logic * fix: handle overflow when calculating delay * test: add case for Mgmt error * doc: adjust log statements, comments * test: fix expected error --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 3b02d05 commit f76aed8

8 files changed

Lines changed: 547 additions & 27 deletions

File tree

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,23 @@ Where `./testdata/commands.kql` contains Kusto management commands, e.g.:
3939
- `--auth-azcli` or other authentication options (see below)
4040
- `--kusto-endpoint` (required)
4141
- `--kusto-database` (required)
42+
- `--max-retries=3` (optional)
43+
- `--max-timeout=60` (optional)
4244

45+
#### Retry Options
46+
47+
Both file ingestion and management commands support automatic retry with exponential backoff for transient errors.
48+
Default values can be overridden by specifying the arguments as below:
49+
50+
```
51+
$ kusto-ingest file ./testdata/logs.multijson \
52+
--max-retries=5 \
53+
--max-timeout=120 \
54+
# ... other options
55+
```
56+
57+
- `--max-retries=3` - Maximum number of retry attempts (default: 3)
58+
- `--max-timeout=60` - Maximum total time in seconds for all retries (default: 60)
4359

4460
### Authentication
4561

internal/kusto/file.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ func (f FileIngestOptions) Run(cli cli.Provider) error {
4646
"target.table", f.KustoTarget.Table,
4747
"auth.tenant", f.Auth.TenantID,
4848
"auth.clientID", f.Auth.ClientID,
49+
"maxRetries", f.MaxRetries,
50+
"maxTimeout", f.MaxTimeout,
4951
)
5052

5153
fileOptions, err := f.FileOptions()
@@ -62,17 +64,27 @@ func (f FileIngestOptions) Run(cli cli.Provider) error {
6264
ctx, cancel := cli.Context()
6365
defer cancel()
6466

67+
invokeIngest := func() error {
68+
if ctx.Err() != nil {
69+
return ctx.Err()
70+
}
71+
_, err := ingestor.FromFile(ctx, f.SourceFile, fileOptions...)
72+
return err
73+
}
74+
6575
cli.Logger().Info("file ingestion started")
6676
start := time.Now()
67-
_, err = ingestor.FromFile(
68-
ctx,
69-
f.SourceFile,
70-
fileOptions...,
77+
err = invokeWithRetries(
78+
invokeIngest,
79+
f.MaxRetries,
80+
f.MaxTimeout,
81+
cli.Logger(),
7182
)
7283
if err != nil {
73-
return fmt.Errorf("ingest from file %q: %w", f.SourceFile, err)
84+
cli.Logger().Error("failed to ingest file", "error", err, "file", f.SourceFile)
85+
return err
7486
}
75-
cli.Logger().Info("file ingestion completed", "duration", time.Since(start))
7687

88+
cli.Logger().Info("file ingestion completed successfully", "duration", time.Since(start))
7789
return nil
7890
}

internal/kusto/file_test.go

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ func writeToTestFile(t testing.TB, fileName string, content []byte) string {
2525
}
2626

2727
func Test_FileIngestOptions_FileOptions(t *testing.T) {
28-
t.Run("with mapping", func (t *testing.T) {
28+
t.Run("with mapping", func(t *testing.T) {
2929
mappingFile := writeToTestFile(t, "test-mapping.json", []byte(`[]`))
3030

3131
options := FileIngestOptions{
32-
Format: "csv",
32+
Format: "csv",
3333
MappingsFile: mappingFile,
3434
}
3535

@@ -40,15 +40,15 @@ func Test_FileIngestOptions_FileOptions(t *testing.T) {
4040

4141
t.Run("with invalid mapping file", func(t *testing.T) {
4242
options := FileIngestOptions{
43-
Format: "csv",
43+
Format: "csv",
4444
MappingsFile: "some-random-file",
4545
}
4646

4747
_, err := options.FileOptions()
4848
assert.Error(t, err)
4949
})
5050

51-
t.Run("without mapping", func (t *testing.T) {
51+
t.Run("without mapping", func(t *testing.T) {
5252
options := FileIngestOptions{
5353
Format: "csv",
5454
}
@@ -61,7 +61,7 @@ func Test_FileIngestOptions_FileOptions(t *testing.T) {
6161

6262
func Test_FileIngestOptions_Run_IngestFile_NoMapping(t *testing.T) {
6363
sourceFile := writeToTestFile(t, "logs.json", []byte("{}"))
64-
64+
6565
cli := testingcli.New()
6666

6767
ingestor := testingkusto.New(func(ing *testingkusto.Ingestor) {
@@ -74,9 +74,9 @@ func Test_FileIngestOptions_Run_IngestFile_NoMapping(t *testing.T) {
7474
})
7575

7676
opts := FileIngestOptions{
77-
SourceFile: sourceFile,
78-
Format: "multijson",
79-
Auth: newTestAuth(),
77+
SourceFile: sourceFile,
78+
Format: "multijson",
79+
Auth: newTestAuth(),
8080
KustoTarget: newTestKustoTarget(),
8181

8282
ingestorBuildSettings: ingestorBuildSettings{
@@ -93,7 +93,7 @@ func Test_FileIngestOptions_Run_IngestFile_NoMapping(t *testing.T) {
9393
func Test_FileIngestOptions_Run_IngestFile_WithMapping(t *testing.T) {
9494
sourceFile := writeToTestFile(t, "logs.json", []byte("{}"))
9595
sourceFileMapping := writeToTestFile(t, "logs-mapping.json", []byte("[]"))
96-
96+
9797
cli := testingcli.New()
9898

9999
ingestor := testingkusto.New(func(ing *testingkusto.Ingestor) {
@@ -106,11 +106,11 @@ func Test_FileIngestOptions_Run_IngestFile_WithMapping(t *testing.T) {
106106
})
107107

108108
opts := FileIngestOptions{
109-
SourceFile: sourceFile,
110-
Format: "multijson",
109+
SourceFile: sourceFile,
110+
Format: "multijson",
111111
MappingsFile: sourceFileMapping,
112-
Auth: newTestAuth(),
113-
KustoTarget: newTestKustoTarget(),
112+
Auth: newTestAuth(),
113+
KustoTarget: newTestKustoTarget(),
114114

115115
ingestorBuildSettings: ingestorBuildSettings{
116116
CreateIngestor: func(target KustoTargetOptions, auth AuthOptions) (ingest.Ingestor, error) {
@@ -121,4 +121,25 @@ func Test_FileIngestOptions_Run_IngestFile_WithMapping(t *testing.T) {
121121

122122
err := opts.Run(cli)
123123
assert.NoError(t, err)
124-
}
124+
}
125+
126+
func Test_FileIngestOptions_Run_CreateIngestorError(t *testing.T) {
127+
sourceFile := writeToTestFile(t, "logs.json", []byte("{}"))
128+
cli := testingcli.New()
129+
130+
opts := FileIngestOptions{
131+
SourceFile: sourceFile,
132+
Format: "multijson",
133+
Auth: newTestAuth(),
134+
KustoTarget: newTestKustoTarget(),
135+
ingestorBuildSettings: ingestorBuildSettings{
136+
CreateIngestor: func(target KustoTargetOptions, auth AuthOptions) (ingest.Ingestor, error) {
137+
return nil, assert.AnError
138+
},
139+
},
140+
}
141+
142+
err := opts.Run(cli)
143+
assert.Error(t, err)
144+
assert.Contains(t, err.Error(), "create Kusto ingestor")
145+
}

internal/kusto/management.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ func (m ManagementOptions) Run(cli cli.Provider) error {
1515
"target.database", m.KustoTarget.Database,
1616
"auth.tenant", m.Auth.TenantID,
1717
"auth.clientID", m.Auth.ClientID,
18+
"maxRetries", m.MaxRetries,
19+
"maxTimeout", m.MaxTimeout,
1820
)
1921

2022
queryer, err := m.createQueryClient(m.KustoTarget, m.Auth)
@@ -26,14 +28,29 @@ func (m ManagementOptions) Run(cli cli.Provider) error {
2628
ctx, cancel := cli.Context()
2729
defer cancel()
2830

29-
cli.Logger().Info("executing management commands")
30-
start := time.Now()
3131
stmt := kql.New("").AddUnsafe(string(m.Source))
32-
_, err = queryer.Mgmt(ctx, m.KustoTarget.Database, stmt)
32+
invokeQuery := func() error {
33+
if ctx.Err() != nil {
34+
return ctx.Err()
35+
}
36+
_, err := queryer.Mgmt(ctx, m.KustoTarget.Database, stmt)
37+
return err
38+
}
39+
40+
cli.Logger().Info("executing management command")
41+
42+
start := time.Now()
43+
err = invokeWithRetries(
44+
invokeQuery,
45+
m.MaxRetries,
46+
m.MaxTimeout,
47+
cli.Logger(),
48+
)
3349
if err != nil {
34-
return fmt.Errorf("execute management commands: %w", err)
50+
cli.Logger().Error("failed to execute management command", "error", err)
51+
return err
3552
}
36-
cli.Logger().Info("management commands executed", "duration", time.Since(start))
3753

54+
cli.Logger().Info("management command executed successfully", "duration", time.Since(start))
3855
return nil
3956
}

internal/kusto/management_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ func Test_ManagementOptions_Run_MgmtError(t *testing.T) {
6464

6565
q := testingkusto.NewQueryClient(func(qc *testingkusto.QueryClient) {
6666
qc.MgmtFn = func(_ context.Context, db string, stmt kusto.Statement, _ ...kusto.QueryOption) (*kusto.RowIterator, error) {
67-
return nil, errors.New("mgmt failed")
67+
// Return a non-retryable error
68+
return nil, errors.New("mgmt command failed")
6869
}
6970
})
7071

@@ -81,5 +82,6 @@ func Test_ManagementOptions_Run_MgmtError(t *testing.T) {
8182

8283
err := opts.Run(cli)
8384
assert.Error(t, err)
84-
assert.Contains(t, err.Error(), "execute management commands")
85+
// The error will be wrapped by invokeWithRetries as "non-retryable error"
86+
assert.Contains(t, err.Error(), "non-retryable kusto error")
8587
}

internal/kusto/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ type FileIngestOptions struct {
2828
Auth AuthOptions `embed:"" prefix:"auth-"`
2929
KustoTarget KustoTargetOptions `embed:"" prefix:"kusto-"`
3030

31+
// Retry and timeout configuration
32+
MaxRetries int `optional:"" default:"3" help:"Maximum number of retries for transient errors (default: 3)."`
33+
MaxTimeout int `optional:"" default:"60" help:"Maximum timeout in seconds for all retries (default: 60)."`
34+
3135
// for unit test
3236
ingestorBuildSettings `kong:"-"`
3337
}
@@ -39,6 +43,10 @@ type ManagementOptions struct {
3943
Auth AuthOptions `embed:"" prefix:"auth-"`
4044
KustoTarget KustoTargetOptions `embed:"" prefix:"kusto-"`
4145

46+
// Retry and timeout configuration
47+
MaxRetries int `optional:"" default:"3" help:"Maximum number of retries for transient errors (default: 3)."`
48+
MaxTimeout int `optional:"" default:"60" help:"Maximum timeout in seconds for all retries (default: 60)."`
49+
4250
// for unit test
4351
ingestorBuildSettings `kong:"-"`
4452
}

internal/kusto/retry.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package kusto
2+
3+
import (
4+
"fmt"
5+
"math"
6+
"math/rand/v2"
7+
"time"
8+
9+
"github.com/Azure/azure-kusto-go/kusto/data/errors"
10+
"github.com/charmbracelet/log"
11+
)
12+
13+
const (
14+
retryBaseDelay = 1 * time.Second
15+
)
16+
17+
/*
18+
invokeWithRetries executes the provided function with retry logic for transient Kusto errors.
19+
It uses exponential backoff with jitter and respects both maxRetries and maxTimeout constraints.
20+
21+
Parameters:
22+
- invoke: The function to execute with retries
23+
- maxRetries: Maximum number of retry attempts (e.g., 3 means 4 total attempts: initial + 3 retries)
24+
- maxTimeout: Maximum total duration in seconds before timing out
25+
- logger: Logger for logging retry attempts (optional, can be nil)
26+
27+
Returns an error if:
28+
- A non-retryable error is encountered
29+
- Maximum timeout is reached
30+
- Maximum retries are exhausted
31+
*/
32+
func invokeWithRetries(
33+
invoke func() error,
34+
maxRetries int,
35+
maxTimeout int,
36+
logger *log.Logger,
37+
) error {
38+
var err error
39+
maxTimeoutDuration := time.Duration(maxTimeout) * time.Second
40+
deadline := time.Now().Add(maxTimeoutDuration)
41+
42+
for attempt := 0; attempt <= maxRetries; attempt++ {
43+
// Check if we've exceeded the deadline before attempting
44+
if attempt > 0 && time.Now().After(deadline) {
45+
return fmt.Errorf("max timeout reached after %d retries: %w", attempt-1, err)
46+
}
47+
48+
err = invoke()
49+
if err == nil {
50+
return nil
51+
}
52+
53+
// Check error type for retry logic using azure-kusto-go SDK's Retry function
54+
if !errors.Retry(err) {
55+
return fmt.Errorf("non-retryable kusto error: %w", err)
56+
}
57+
58+
// Calculate next backoff duration
59+
backoffDelay := calculateDelay(attempt, retryBaseDelay)
60+
61+
if time.Now().Add(backoffDelay).After(deadline) {
62+
return fmt.Errorf("max timeout reached after %d retries: %w", attempt, err)
63+
}
64+
65+
if logger != nil {
66+
logger.Warn("transient kusto error, will retry", "error", err, "attempt", attempt+1, "backoff", backoffDelay)
67+
}
68+
69+
time.Sleep(backoffDelay)
70+
}
71+
72+
return fmt.Errorf("exhausted max retries (%d): %w", maxRetries, err)
73+
}
74+
75+
/*
76+
calculateDelay computes the next retry delay duration.
77+
Uses exponential backoff with jitter to prevent thundering herd issues.
78+
Formula: baseDelay * (2^attempt) * (1 + jitter)
79+
Where jitter is a random value between 0 and 0.1 (10% jitter)
80+
*/
81+
func calculateDelay(attempt int, baseDelay time.Duration) time.Duration {
82+
if attempt < 0 {
83+
return baseDelay
84+
}
85+
86+
// Check for overflow before multiplying by checking if we would exceed math.MaxInt64
87+
maxDelay := time.Duration(math.MaxInt64)
88+
89+
// Calculate 2^attempt using bit shifting, checking for overflow
90+
// If attempt >= 63, shifting would overflow (2^63 > MaxInt64)
91+
if attempt >= 63 {
92+
return maxDelay
93+
}
94+
95+
multiplier := int64(1) << attempt // 2^attempt
96+
97+
// Check if baseDelay * multiplier would overflow
98+
if baseDelay > maxDelay/time.Duration(multiplier) {
99+
return maxDelay
100+
}
101+
102+
delay := baseDelay * time.Duration(multiplier)
103+
104+
// Add jitter: random value between 0% and 10% of the delay
105+
jitter := rand.Float64() * 0.1 // 0% to 10% jitter
106+
jitteredDelay := time.Duration(float64(delay) * (1.0 + jitter))
107+
108+
// Check for overflow after jitter (float64 conversion could overflow)
109+
if jitteredDelay < 0 || jitteredDelay > maxDelay {
110+
return maxDelay
111+
}
112+
113+
// Apply max cap again after jitter
114+
if jitteredDelay > maxDelay {
115+
return maxDelay
116+
}
117+
118+
return jitteredDelay
119+
}

0 commit comments

Comments
 (0)