@@ -4,11 +4,12 @@ import (
4
4
"bufio"
5
5
"bytes"
6
6
"fmt"
7
- "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
8
- "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
9
7
"sync"
10
8
"sync/atomic"
11
9
"time"
10
+
11
+ "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
12
+ "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
12
13
)
13
14
14
15
type ConfirmationStatus struct {
@@ -50,7 +51,17 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 {
50
51
return cs .errorCode
51
52
}
52
53
54
+ func (cs * ConfirmationStatus ) updateStatus (errorCode uint16 , confirmed bool ) {
55
+ cs .confirmed = confirmed
56
+ if confirmed {
57
+ return
58
+ }
59
+ cs .errorCode = errorCode
60
+ cs .err = lookErrorCode (errorCode )
61
+ }
62
+
53
63
type messageSequence struct {
64
+ sourceMsg message.StreamMessage
54
65
messageBytes []byte
55
66
publishingId int64
56
67
filterValue string
@@ -193,18 +204,10 @@ func NewProducerOptions() *ProducerOptions {
193
204
}
194
205
}
195
206
196
- func (producer * Producer ) GetUnConfirmed () map [int64 ]* ConfirmationStatus {
197
- return producer .unConfirmed .getAll ()
198
- }
199
-
200
207
func (po * ProducerOptions ) isSubEntriesBatching () bool {
201
208
return po .SubEntrySize > 1
202
209
}
203
210
204
- func (producer * Producer ) lenUnConfirmed () int {
205
- return producer .unConfirmed .size ()
206
- }
207
-
208
211
// NotifyPublishConfirmation returns a channel that receives the confirmation status of the messages sent by the producer.
209
212
func (producer * Producer ) NotifyPublishConfirmation () ChannelPublishConfirm {
210
213
ch := make (chan []* ConfirmationStatus , 1 )
@@ -292,13 +295,17 @@ func (producer *Producer) processPendingSequencesQueue() {
292
295
var lastError error
293
296
294
297
if producer .pendingSequencesQueue .IsStopped () {
298
+ // add also the last message to sequenceToSend
299
+ // otherwise it will be lost
300
+ sequenceToSend = append (sequenceToSend , msg )
295
301
break
296
302
}
297
303
// There is something in the queue. Checks the buffer is still less than the maxFrame
298
304
totalBufferToSend += len (msg .messageBytes )
299
305
if totalBufferToSend > maxFrame {
300
306
// if the totalBufferToSend is greater than the requestedMaxFrameSize
301
307
// the producer sends the messages and reset the buffer
308
+ producer .unConfirmed .addFromSequences (sequenceToSend , producer .GetID ())
302
309
lastError = producer .internalBatchSend (sequenceToSend )
303
310
sequenceToSend = sequenceToSend [:0 ]
304
311
totalBufferToSend = initBufferPublishSize
@@ -310,6 +317,7 @@ func (producer *Producer) processPendingSequencesQueue() {
310
317
// the messages during the checks of the buffer. In this case
311
318
if producer .pendingSequencesQueue .IsEmpty () || len (sequenceToSend ) >= producer .options .BatchSize {
312
319
if len (sequenceToSend ) > 0 {
320
+ producer .unConfirmed .addFromSequences (sequenceToSend , producer .GetID ())
313
321
lastError = producer .internalBatchSend (sequenceToSend )
314
322
sequenceToSend = sequenceToSend [:0 ]
315
323
totalBufferToSend += initBufferPublishSize
@@ -323,13 +331,36 @@ func (producer *Producer) processPendingSequencesQueue() {
323
331
// just in case there are messages in the buffer
324
332
// not matter is sent or not the messages will be timed out
325
333
if len (sequenceToSend ) > 0 {
326
- _ = producer .internalBatchSend (sequenceToSend )
334
+ producer .markUnsentAsUnconfirmed (sequenceToSend )
327
335
}
328
336
329
337
}()
330
338
logs .LogDebug ("producer %d processPendingSequencesQueue closed" , producer .id )
331
339
}
332
340
341
+ func (producer * Producer ) markUnsentAsUnconfirmed (sequences []* messageSequence ) {
342
+ if len (sequences ) == 0 {
343
+ return
344
+ }
345
+
346
+ // Send as unconfirmed the messages in the pendingSequencesQueue,
347
+ // that have never been sent,
348
+ // with the "entityClosed" error.
349
+ confirms := make ([]* ConfirmationStatus , 0 , len (sequences ))
350
+ for _ , ps := range sequences {
351
+ cs := & ConfirmationStatus {
352
+ inserted : time .Now (),
353
+ message : ps .sourceMsg ,
354
+ producerID : producer .GetID (),
355
+ publishingId : ps .publishingId ,
356
+ confirmed : false ,
357
+ }
358
+ cs .updateStatus (entityClosed , false )
359
+ confirms = append (confirms , cs )
360
+ }
361
+ producer .sendConfirmationStatus (confirms )
362
+ }
363
+
333
364
func (producer * Producer ) assignPublishingID (message message.StreamMessage ) int64 {
334
365
sequence := message .GetPublishingId ()
335
366
// in case of sub entry the deduplication is disabled
@@ -349,10 +380,12 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str
349
380
if producer .options .IsFilterEnabled () {
350
381
filterValue = producer .options .Filter .FilterValue (streamMessage )
351
382
}
352
- msqSeq := & messageSequence {}
353
- msqSeq .messageBytes = marshalBinary
354
- msqSeq .publishingId = seq
355
- msqSeq .filterValue = filterValue
383
+ msqSeq := & messageSequence {
384
+ sourceMsg : streamMessage ,
385
+ messageBytes : marshalBinary ,
386
+ publishingId : seq ,
387
+ filterValue : filterValue ,
388
+ }
356
389
return msqSeq , nil
357
390
}
358
391
@@ -366,9 +399,13 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
366
399
if err != nil {
367
400
return err
368
401
}
369
- producer .unConfirmed .addFromSequence (messageSeq , & streamMessage , producer .GetID ())
402
+ if producer .getStatus () == closed {
403
+ producer .markUnsentAsUnconfirmed ([]* messageSequence {messageSeq })
404
+ return fmt .Errorf ("producer id: %d closed" , producer .id )
405
+ }
370
406
371
407
if len (messageSeq .messageBytes ) > defaultMaxFrameSize {
408
+ producer .unConfirmed .addFromSequences ([]* messageSequence {messageSeq }, producer .GetID ())
372
409
tooLarge := producer .unConfirmed .extractWithError (messageSeq .publishingId , responseCodeFrameTooLarge )
373
410
producer .sendConfirmationStatus ([]* ConfirmationStatus {tooLarge })
374
411
return FrameTooLarge
@@ -377,7 +414,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
377
414
// se the processPendingSequencesQueue function
378
415
err = producer .pendingSequencesQueue .Enqueue (messageSeq )
379
416
if err != nil {
380
- return fmt .Errorf ("error during enqueue message: %s. Message will be in timed . Producer id: %d " , err , producer .id )
417
+ return fmt .Errorf ("error during enqueue message: %s pending queue closed . Producer id: %d " , err , producer .id )
381
418
}
382
419
return nil
383
420
}
@@ -389,32 +426,40 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
389
426
// returns an error if the message could not be sent for marshal problems or if the buffer is too large
390
427
func (producer * Producer ) BatchSend (batchMessages []message.StreamMessage ) error {
391
428
maxFrame := defaultMaxFrameSize
392
- var messagesSequence = make ([]* messageSequence , 0 )
429
+ var messagesSequences = make ([]* messageSequence , 0 , len ( batchMessages ) )
393
430
totalBufferToSend := 0
431
+
394
432
for _ , batchMessage := range batchMessages {
395
433
messageSeq , err := producer .fromMessageToMessageSequence (batchMessage )
396
434
if err != nil {
397
435
return err
398
436
}
399
- producer .unConfirmed .addFromSequence (messageSeq , & batchMessage , producer .GetID ())
400
437
401
438
totalBufferToSend += len (messageSeq .messageBytes )
402
- messagesSequence = append (messagesSequence , messageSeq )
439
+ messagesSequences = append (messagesSequences , messageSeq )
440
+ }
441
+
442
+ if producer .getStatus () == closed {
443
+ producer .markUnsentAsUnconfirmed (messagesSequences )
444
+ return fmt .Errorf ("producer id: %d closed" , producer .id )
445
+ }
446
+
447
+ if len (messagesSequences ) > 0 {
448
+ producer .unConfirmed .addFromSequences (messagesSequences , producer .GetID ())
403
449
}
404
- //
405
450
406
451
if totalBufferToSend + initBufferPublishSize > maxFrame {
407
452
// if the totalBufferToSend is greater than the requestedMaxFrameSize
408
453
// all the messages are unconfirmed
409
454
410
- for _ , msg := range messagesSequence {
455
+ for _ , msg := range messagesSequences {
411
456
m := producer .unConfirmed .extractWithError (msg .publishingId , responseCodeFrameTooLarge )
412
457
producer .sendConfirmationStatus ([]* ConfirmationStatus {m })
413
458
}
414
459
return FrameTooLarge
415
460
}
416
461
417
- return producer .internalBatchSend (messagesSequence )
462
+ return producer .internalBatchSend (messagesSequences )
418
463
}
419
464
420
465
func (producer * Producer ) GetID () uint8 {
@@ -605,7 +650,6 @@ func (producer *Producer) close(reason Event) error {
605
650
logs .LogDebug ("producer options is nil, the close will be ignored" )
606
651
return nil
607
652
}
608
- _ , _ = producer .options .client .coordinator .ExtractProducerById (producer .id )
609
653
610
654
if ! producer .options .client .socket .isOpen () {
611
655
return fmt .Errorf ("tcp connection is closed" )
@@ -616,6 +660,8 @@ func (producer *Producer) close(reason Event) error {
616
660
_ = producer .options .client .deletePublisher (producer .id )
617
661
}
618
662
663
+ _ , _ = producer .options .client .coordinator .ExtractProducerById (producer .id )
664
+
619
665
if producer .options .client .coordinator .ProducersCount () == 0 {
620
666
_ = producer .options .client .Close ()
621
667
}
@@ -635,7 +681,8 @@ func (producer *Producer) stopAndWaitPendingSequencesQueue() {
635
681
636
682
// Stop the pendingSequencesQueue, so the producer can't send messages anymore
637
683
// but the producer can still handle the inflight messages
638
- producer .pendingSequencesQueue .Stop ()
684
+ pendingSequences := producer .pendingSequencesQueue .Stop ()
685
+ producer .markUnsentAsUnconfirmed (pendingSequences )
639
686
640
687
// Stop the confirmationTimeoutTicker. It will flush the unconfirmed messages
641
688
producer .confirmationTimeoutTicker .Stop ()
@@ -657,9 +704,9 @@ func (producer *Producer) waitForInflightMessages() {
657
704
658
705
tentatives := 0
659
706
660
- for (producer .lenUnConfirmed () > 0 ) && tentatives < 5 {
707
+ for (producer .unConfirmed . size () > 0 ) && tentatives < 5 {
661
708
logs .LogInfo ("wait inflight messages - unconfirmed len: %d - retry: %d" ,
662
- producer .lenUnConfirmed (), tentatives )
709
+ producer .unConfirmed . size (), tentatives )
663
710
producer .flushUnConfirmedMessages ()
664
711
time .Sleep (time .Duration (500 ) * time .Millisecond )
665
712
tentatives ++
0 commit comments