Skip to content

Commit 1d412f7

Browse files
authored
Merge pull request #316
* fail when needed * report error if it's already not successful
1 parent 3b4dbfb commit 1d412f7

File tree

1 file changed

+45
-32
lines changed

1 file changed

+45
-32
lines changed

azkustoingest/result.go

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package azkustoingest
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"math/rand"
78
"time"
@@ -82,7 +83,22 @@ func (r *Result) putQueued(ctx context.Context, i *Ingestion) {
8283
func (r *Result) Wait(ctx context.Context) <-chan error {
8384
ch := make(chan error, 1)
8485

85-
if r.record.Status.IsFinal() || !r.reportToTable {
86+
if r.record.Status.IsFinal() {
87+
if !r.record.Status.IsSuccess() {
88+
ch <- r.record
89+
}
90+
close(ch)
91+
return ch
92+
}
93+
94+
if !r.reportToTable {
95+
ch <- errors.New("status reporting is not enabled")
96+
close(ch)
97+
return ch
98+
}
99+
100+
if r.tableClient == nil {
101+
ch <- errors.New("table client is not initialized")
86102
close(ch)
87103
return ch
88104
}
@@ -104,40 +120,37 @@ func (r *Result) poll(ctx context.Context) {
104120
attempts := 3
105121
delay := [3]int{120, 60, 10} // attempts are counted backwards
106122

107-
// create a table client
108-
if r.tableClient != nil {
109-
// Create a ticker to poll the table in 10 second intervals.
110-
timer := time.NewTimer(pollInterval)
111-
defer timer.Stop()
112-
113-
for {
114-
select {
115-
case <-ctx.Done():
116-
r.record.Status = StatusRetrievalCanceled
117-
r.record.FailureStatus = Transient
118-
return
119-
120-
case <-timer.C:
121-
smap, err := r.tableClient.Read(ctx, r.record.IngestionSourceID.String())
122-
if err != nil {
123-
if attempts == 0 {
124-
r.record.Status = StatusRetrievalFailed
125-
r.record.FailureStatus = Transient
126-
r.record.Details = "Failed reading from Status Table: " + err.Error()
127-
return
128-
}
129-
130-
attempts = attempts - 1
131-
time.Sleep(time.Duration(delay[attempts]+rand.Intn(5)) * time.Second)
132-
} else {
133-
r.record.FromMap(smap)
134-
if r.record.Status.IsFinal() {
135-
return
136-
}
123+
// Create a ticker to poll the table in 10 second intervals.
124+
timer := time.NewTimer(pollInterval)
125+
defer timer.Stop()
126+
127+
for {
128+
select {
129+
case <-ctx.Done():
130+
r.record.Status = StatusRetrievalCanceled
131+
r.record.FailureStatus = Transient
132+
return
133+
134+
case <-timer.C:
135+
smap, err := r.tableClient.Read(ctx, r.record.IngestionSourceID.String())
136+
if err != nil {
137+
if attempts == 0 {
138+
r.record.Status = StatusRetrievalFailed
139+
r.record.FailureStatus = Transient
140+
r.record.Details = "Failed reading from Status Table: " + err.Error()
141+
return
137142
}
138143

139-
timer.Reset(pollInterval)
144+
attempts = attempts - 1
145+
time.Sleep(time.Duration(delay[attempts]+rand.Intn(5)) * time.Second)
146+
} else {
147+
r.record.FromMap(smap)
148+
if r.record.Status.IsFinal() {
149+
return
150+
}
140151
}
152+
153+
timer.Reset(pollInterval)
141154
}
142155
}
143156
}

0 commit comments

Comments
 (0)