Skip to content

Commit 75e2ade

Browse files
jpaskhayzhonghui12
authored andcommitted
fix bug where using concurrency feature with 0 retries could drop all messages
- added missing Controller.Finish calls to tests using mock so expectations are actually checked - added WaitGroup to test cases that invoke goroutines to ensure they complete before test method exits - added test to protect against bugfix case (panics without bugfix)
1 parent 15f2efe commit 75e2ade

File tree

2 files changed

+53
-5
lines changed

2 files changed

+53
-5
lines changed

kinesis/kinesis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis
358358
currentRetries := outputPlugin.getConcurrentRetries()
359359
outputPlugin.addGoroutineCount(1)
360360

361-
for tries = 0; tries < outputPlugin.concurrencyRetryLimit; tries++ {
361+
for tries = 0; tries <= outputPlugin.concurrencyRetryLimit; tries++ {
362362
if currentRetries > 0 {
363363
// Wait if other goroutines are retrying, as well as implement a progressive backoff
364364
if currentRetries > uint32(outputPlugin.concurrencyRetryLimit) {

kinesis/kinesis_test.go

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"math/rand"
66
"os"
7+
"sync"
78
"testing"
89
"time"
910

@@ -116,6 +117,7 @@ func TestAddRecordAndFlush(t *testing.T) {
116117
}
117118

118119
ctrl := gomock.NewController(t)
120+
defer ctrl.Finish()
119121
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)
120122

121123
mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
@@ -140,6 +142,7 @@ func TestAddRecordAndFlushAggregate(t *testing.T) {
140142
}
141143

142144
ctrl := gomock.NewController(t)
145+
defer ctrl.Finish()
143146
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)
144147

145148
mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
@@ -170,11 +173,20 @@ func TestAddRecordWithConcurrency(t *testing.T) {
170173
}
171174

172175
ctrl := gomock.NewController(t)
176+
defer ctrl.Finish()
173177
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)
174-
175-
mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
176-
FailedRecordCount: aws.Int64(0),
177-
}, nil)
178+
// Need to use synchronization to ensure goroutine completes before test method exits
179+
var wg sync.WaitGroup
180+
wg.Add(1)
181+
defer wg.Wait()
182+
183+
mockKinesis.EXPECT().PutRecords(gomock.Any()).DoAndReturn(
184+
func(arg0 *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
185+
wg.Done()
186+
return &kinesis.PutRecordsOutput{
187+
FailedRecordCount: aws.Int64(0),
188+
}, nil
189+
})
178190

179191
outputPlugin, _ := newMockOutputPlugin(mockKinesis, false)
180192
// Enable concurrency
@@ -188,6 +200,42 @@ func TestAddRecordWithConcurrency(t *testing.T) {
188200
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
189201
}
190202

203+
func TestAddRecordWithConcurrencyNoRetries(t *testing.T) {
204+
records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)
205+
206+
record := map[interface{}]interface{}{
207+
"testkey": []byte("test value"),
208+
}
209+
210+
ctrl := gomock.NewController(t)
211+
defer ctrl.Finish()
212+
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)
213+
// Need to use synchronization to ensure goroutine completes before test method exits
214+
var wg sync.WaitGroup
215+
wg.Add(1)
216+
defer wg.Wait()
217+
218+
mockKinesis.EXPECT().PutRecords(gomock.Any()).DoAndReturn(
219+
func(arg0 *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
220+
wg.Done()
221+
return &kinesis.PutRecordsOutput{
222+
FailedRecordCount: aws.Int64(0),
223+
}, nil
224+
})
225+
226+
outputPlugin, _ := newMockOutputPlugin(mockKinesis, false)
227+
// Enable concurrency but no retries
228+
outputPlugin.Concurrency = 2
229+
outputPlugin.concurrencyRetryLimit = 0
230+
231+
timeStamp := time.Now()
232+
retCode := outputPlugin.AddRecord(&records, record, &timeStamp)
233+
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected AddRecord return code to be FLB_OK")
234+
235+
retCode = outputPlugin.FlushConcurrent(len(records), records)
236+
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
237+
}
238+
191239
var compressors = map[string]func([]byte) ([]byte, error){
192240
"zlib": zlibCompress,
193241
"gzip": gzipCompress,

0 commit comments

Comments
 (0)