@@ -10,8 +10,9 @@ import (
1010 "sync"
1111 "time"
1212
13- "github.com/aws/aws-sdk-go/aws"
14- "github.com/aws/aws-sdk-go/service/kinesis"
13+ "github.com/aws/aws-sdk-go-v2/aws"
14+ "github.com/aws/aws-sdk-go-v2/service/kinesis"
15+ "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
1516
1617 "github.com/fission/keda-connectors/common"
1718
@@ -20,23 +21,23 @@ import (
2021
2122type pullFunc func (* record ) error
2223type record struct {
23- * kinesis .Record
24+ * types .Record
2425 shardID string
2526 millisBehindLatest * int64
2627}
2728type awsKinesisConnector struct {
2829 ctx context.Context
29- client * kinesis.Kinesis
30+ client * kinesis.Client
3031 connectordata common.ConnectorMetadata
3132 logger * zap.Logger
32- shardc chan * kinesis .Shard
33- maxRecords int64
33+ shardc chan * types .Shard
34+ maxRecords int32
3435}
3536
3637// listShards get called every 30sec to get all the shards
37- func (conn * awsKinesisConnector ) listShards () ([]* kinesis .Shard , error ) {
38+ func (conn * awsKinesisConnector ) listShards (ctx context. Context ) ([]types .Shard , error ) {
3839 // call DescribeStream to get updated shards
39- stream , err := conn .client .DescribeStream (& kinesis.DescribeStreamInput {
40+ stream , err := conn .client .DescribeStream (ctx , & kinesis.DescribeStreamInput {
4041 StreamName : & conn .connectordata .Topic ,
4142 })
4243 if err != nil {
@@ -56,7 +57,7 @@ func (conn *awsKinesisConnector) findNewShards() {
5657 return
5758 case <- ticker .C :
5859 // check if new shards are available in every 30 seconds
59- shardList , err := conn .listShards ()
60+ shardList , err := conn .listShards (conn . ctx )
6061 if err != nil {
6162 return
6263 }
@@ -65,7 +66,7 @@ func (conn *awsKinesisConnector) findNewShards() {
6566 // send only new shards
6667 _ , loaded := shards .LoadOrStore (* s .ShardId , s )
6768 if ! loaded {
68- conn .shardc <- s
69+ conn .shardc <- & s
6970 }
7071 }
7172 }
@@ -82,26 +83,26 @@ func (conn *awsKinesisConnector) getIterator(shardID string, checkpoint string)
8283 if checkpoint != "" {
8384 // Start from, where we left
8485 params .StartingSequenceNumber = aws .String (checkpoint )
85- params .ShardIteratorType = aws . String ( kinesis . ShardIteratorTypeAfterSequenceNumber )
86- iteratorOutput , err := conn .client .GetShardIteratorWithContext (conn .ctx , params )
86+ params .ShardIteratorType = types . ShardIteratorTypeAfterSequenceNumber
87+ iteratorOutput , err := conn .client .GetShardIterator (conn .ctx , params )
8788 if err != nil {
8889 return nil , err
8990 }
9091 return iteratorOutput , err
9192 }
9293 // Start from, oldest record in the shard
93- params .ShardIteratorType = aws . String ( kinesis . ShardIteratorTypeTrimHorizon )
94- iteratorOutput , err := conn .client .GetShardIteratorWithContext (conn .ctx , params )
94+ params .ShardIteratorType = types . ShardIteratorTypeTrimHorizon
95+ iteratorOutput , err := conn .client .GetShardIterator (conn .ctx , params )
9596 if err != nil {
9697 return nil , err
9798 }
9899 return iteratorOutput , err
99100}
100101
101102// getRecords get the data for the specific shard
102- func (conn * awsKinesisConnector ) getRecords (shardIterator * string ) (* kinesis.GetRecordsOutput , error ) {
103+ func (conn * awsKinesisConnector ) getRecords (ctx context. Context , shardIterator * string ) (* kinesis.GetRecordsOutput , error ) {
103104 // get records use shard iterator for making request
104- records , err := conn .client .GetRecords (& kinesis.GetRecordsInput {
105+ records , err := conn .client .GetRecords (ctx , & kinesis.GetRecordsInput {
105106 ShardIterator : shardIterator ,
106107 Limit : & conn .maxRecords ,
107108 })
@@ -148,7 +149,7 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
148149 }
149150 iterator := iteratorOutput .ShardIterator
150151 if iterator != nil {
151- resp , err := conn .getRecords (iterator )
152+ resp , err := conn .getRecords (conn . ctx , iterator )
152153 if err != nil {
153154 conn .logger .Error ("error in getting records" ,
154155 zap .String ("shardID" , shardID ),
@@ -158,7 +159,7 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
158159
159160 for _ , r := range resp .Records {
160161 // send records
161- err := fn (& record {r , shardID , resp .MillisBehindLatest })
162+ err := fn (& record {& r , shardID , resp .MillisBehindLatest })
162163 checkpoints .Store (shardID , * r .SequenceNumber )
163164 if err != nil {
164165 conn .logger .Error ("error in processing records" ,
@@ -201,17 +202,17 @@ func (conn *awsKinesisConnector) consumeMessage(r *record) {
201202 conn .logger .Error ("error processing message" ,
202203 zap .String ("shardID" , r .shardID ),
203204 zap .Error (err ))
204- conn .errorHandler (r , err .Error ())
205+ conn .errorHandler (conn . ctx , r , err .Error ())
205206 } else {
206207 defer resp .Body .Close ()
207208 body , err := io .ReadAll (resp .Body )
208209 if err != nil {
209210 conn .logger .Error ("error processing message" ,
210211 zap .String ("shardID" , r .shardID ),
211212 zap .Error (err ))
212- conn .errorHandler (r , err .Error ())
213+ conn .errorHandler (conn . ctx , r , err .Error ())
213214 } else {
214- if err := conn .responseHandler (r , string (body )); err != nil {
215+ if err := conn .responseHandler (conn . ctx , r , string (body )); err != nil {
215216 conn .logger .Error ("failed to publish response body from http request to topic" ,
216217 zap .Error (err ),
217218 zap .String ("topic" , conn .connectordata .ResponseTopic ),
@@ -225,23 +226,23 @@ func (conn *awsKinesisConnector) consumeMessage(r *record) {
225226 }
226227}
227228
228- func (conn * awsKinesisConnector ) responseHandler (r * record , response string ) error {
229+ func (conn * awsKinesisConnector ) responseHandler (ctx context. Context , r * record , response string ) error {
229230 if len (conn .connectordata .ResponseTopic ) > 0 {
230231 params := & kinesis.PutRecordInput {
231232 Data : []byte (response ), // Required
232233 PartitionKey : aws .String (* r .PartitionKey ), // Required
233234 StreamName : aws .String (conn .connectordata .ResponseTopic ), // Required
234235 SequenceNumberForOrdering : aws .String (* r .SequenceNumber ),
235236 }
236- _ , err := conn .client .PutRecord (params )
237+ _ , err := conn .client .PutRecord (ctx , params )
237238 if err != nil {
238239 return err
239240 }
240241 }
241242 return nil
242243}
243244
244- func (conn * awsKinesisConnector ) errorHandler (r * record , errMsg string ) {
245+ func (conn * awsKinesisConnector ) errorHandler (ctx context. Context , r * record , errMsg string ) {
245246 if len (conn .connectordata .ErrorTopic ) > 0 {
246247 params := & kinesis.PutRecordInput {
247248 Data : []byte (errMsg ), // Required
@@ -250,7 +251,7 @@ func (conn *awsKinesisConnector) errorHandler(r *record, errMsg string) {
250251 SequenceNumberForOrdering : aws .String (* r .SequenceNumber ),
251252 }
252253
253- _ , err := conn .client .PutRecord (params )
254+ _ , err := conn .client .PutRecord (ctx , params )
254255 if err != nil {
255256 conn .logger .Error ("failed to publish message to error topic" ,
256257 zap .Error (err ),
@@ -272,29 +273,27 @@ func main() {
272273 if err != nil {
273274 log .Fatalf ("can't initialize zap logger: %v" , err )
274275 }
275- defer logger .Sync ()
276+ defer func () {
277+ _ = logger .Sync ()
278+ }()
276279
277280 ctx , cancel := context .WithCancel (context .Background ())
278281 defer cancel ()
279282
280- config , err := common .GetAwsConfig ()
283+ config , err := common .GetAwsConfig (ctx )
281284 if err != nil {
282285 logger .Error ("failed to fetch aws config" , zap .Error (err ))
283286 return
284287 }
285288
286- s , err := common .CreateValidatedSession (config )
287- if err != nil {
288- logger .Error ("not able to create the session" , zap .Error (err ))
289- return
290- }
291- kc := kinesis .New (s )
289+ kc := kinesis .NewFromConfig (config )
292290 connectordata , err := common .ParseConnectorMetadata ()
293291 if err != nil {
294292 logger .Error ("error while parsing metadata" , zap .Error (err ))
295293 return
296294 }
297- if err := kc .WaitUntilStreamExists (& kinesis.DescribeStreamInput {StreamName : & connectordata .Topic }); err != nil {
295+ waiter := kinesis .NewStreamExistsWaiter (kc )
296+ if err := waiter .Wait (ctx , & kinesis.DescribeStreamInput {StreamName : & connectordata .Topic }, 5 * time .Minute ); err != nil {
298297 logger .Error ("not able to connect to kinesis stream" , zap .Error (err ))
299298 return
300299 }
@@ -306,7 +305,7 @@ func main() {
306305 cancel () // call cancellation
307306 }()
308307
309- shardc := make (chan * kinesis .Shard , 1 )
308+ shardc := make (chan * types .Shard , 1 )
310309
311310 conn := awsKinesisConnector {
312311 ctx : ctx ,
0 commit comments