@@ -110,7 +110,7 @@ TxReceiverHandler::TxReceiverHandler(
110
110
recv_listen_port,
111
111
recv_credit_window,
112
112
recv_drain_after_credit_window
113
- ),
113
+ ),
114
114
tx_action(tx_action),
115
115
tx_endloop_action(tx_endloop_action)
116
116
{
@@ -149,33 +149,39 @@ void TxReceiverHandler::on_transaction_commit_failed(transaction t) {
149
149
}
150
150
151
151
void TxReceiverHandler::on_transaction_declared (transaction t) {
152
+ // TODO python some weird magic around count 0, doesn't make much sense to me yet
153
+ // when fixes take care about all count checks ofr zero
154
+ if (count != 0 && processed + batch_size > count) {
155
+ batch_size = count % batch_size;
156
+ } else if (count != 0 ) {
157
+ batch_size = count;
158
+ }
152
159
logger (trace) << " [on_transaction_declared] txn called " << (&t);
153
160
logger (debug) << " [on_transaction_declared] txn is_empty " << (t.is_empty ());
154
161
tx = t;
155
162
}
156
163
157
164
void TxReceiverHandler::on_transaction_aborted (transaction t) {
158
- confirmed += current_batch;
159
- logger (debug) << " [on_transaction_aborted] messages aborted, confirmed: " << confirmed;
160
- if (confirmed == count) {
161
- logger (info) << " [on_transaction_committed] All messages proccessed" ;
162
- t.connection ().close ();
163
- }
164
- else {
165
+ processed += current_batch;
166
+ current_batch = 0 ;
167
+ logger (debug) << " [on_transaction_aborted] messages aborted, processed: " << processed;
168
+ if (count == 0 || processed < count) {
165
169
sess.declare_transaction (*this );
170
+ } else {
171
+ logger (info) << " [on_transaction_committed] All messages processed" ;
172
+ t.connection ().close ();
166
173
}
167
174
}
168
175
169
176
void TxReceiverHandler::on_transaction_committed (transaction t) {
170
- confirmed += current_batch;
177
+ processed += current_batch;
171
178
current_batch = 0 ;
172
- logger (debug) << " [on_transaction_aborted] messages committed, confirmed: " << confirmed;
173
- if (confirmed == count) {
174
- logger (info) << " [on_transaction_committed] All messages proccessed" ;
175
- t.connection ().close ();
176
- }
177
- else {
179
+ logger (debug) << " [on_transaction_aborted] messages committed, processed: " << processed;
180
+ if (count == 0 || processed < count) {
178
181
sess.declare_transaction (*this );
182
+ } else {
183
+ logger (info) << " [on_transaction_committed] All messages processed" ;
184
+ t.connection ().close ();
179
185
}
180
186
}
181
187
@@ -192,7 +198,7 @@ void TxReceiverHandler::on_container_start(container &c)
192
198
logger (debug) << " [on_container_start] Transaction action: " << tx_action;
193
199
logger (debug) << " [on_container_start] Transaction endloop action: " << tx_endloop_action;
194
200
logger (trace) << " [on_container_start] Messages count: " << count;
195
- logger (debug) << " [on_container_start] Messages confirmed : " << confirmed ;
201
+ logger (debug) << " [on_container_start] Messages processed : " << processed ;
196
202
logger (debug) << " [on_container_start] Peer to Peer: " << recv_listen;
197
203
198
204
if (recv_listen == " true" ) {
@@ -364,6 +370,11 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
364
370
{
365
371
logger (debug) << " [on_message] Processing received message" ;
366
372
373
+ tx.accept (d);
374
+ current_batch += 1 ;
375
+
376
+ logger (debug) << " [on_message] current batch: " << current_batch;
377
+
367
378
if (log_msgs == " dict" ) {
368
379
logger (trace) << " [on_message] Decoding message" ;
369
380
ReactorDecoder decoder = ReactorDecoder (m);
@@ -384,19 +395,15 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
384
395
385
396
if (duration_time > 0 && duration_mode == " after-receive" ) {
386
397
logger (debug) << " [on_message] Waiting..." ;
387
- sleep4next (ts, count, duration_time, confirmed );
398
+ sleep4next (ts, count, duration_time, processed + current_batch );
388
399
}
389
400
390
- if ((confirmed % msg_action_size) == 0 ) {
401
+ if (((processed + current_batch) % msg_action_size) == 0 ) {
391
402
do_message_action (d);
392
403
}
393
404
394
405
if (duration_time > 0 && duration_mode == " after-receive-action" ) {
395
- sleep4next (ts, count, duration_time, confirmed);
396
- }
397
-
398
- if (duration_time > 0 && duration_mode == " after-receive-action-tx-action" ) {
399
- // TODO: not implemented yet
406
+ sleep4next (ts, count, duration_time, processed + current_batch);
400
407
}
401
408
402
409
logger (debug) << " [on_message] Process-reply-to: " << process_reply_to;
@@ -411,12 +418,12 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
411
418
}
412
419
}
413
420
414
- if (recv_drain_after_credit_window && confirmed == recv_credit_window) {
421
+ if (recv_drain_after_credit_window && processed + current_batch == recv_credit_window) {
415
422
logger (debug) << " [on_message] Scheduling drain" ;
416
423
d.receiver ().work_queue ().add (make_work (&TxReceiverHandler::drain, this ));
417
424
}
418
425
419
- if (!process_reply_to && confirmed == count) {
426
+ if (!process_reply_to && processed + current_batch == count) {
420
427
if (durable_subscriber) {
421
428
d.receiver ().detach ();
422
429
} else {
@@ -429,32 +436,46 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
429
436
#endif
430
437
}
431
438
432
- tx.accept (d);
433
- current_batch += 1 ;
434
- logger (debug) << " [on_message] current batch: " << current_batch;
435
- if (confirmed + current_batch == count) {
436
- logger (debug) << " [on_message] Transaction attempt (endloop): " << tx_endloop_action;
437
- if (tx_endloop_action == " commit" ) {
439
+ if (current_batch == batch_size) {
440
+ logger (debug) << " [send] Transaction attempt: " << tx_action;
441
+ if (tx_action == " commit" ) {
438
442
tx.commit ();
439
- } else if (tx_endloop_action == " rollback" ) {
443
+ } else if (tx_action == " rollback" ) {
440
444
tx.abort ();
441
445
}
442
- } else if (current_batch == batch_size) {
443
- logger (debug) << " [on_message] messages commited: " << current_batch;
446
+
447
+ if (tx_action == " none" ) {
448
+ if (processed + current_batch == count) {
449
+ recv.connection ().close ();
450
+ } else {
451
+ processed += current_batch;
452
+ current_batch = 0 ;
453
+ sess.declare_transaction (*this );
454
+ }
455
+ }
456
+
457
+ if (duration_time > 0 && duration_mode == " after-receive-action-tx-action" ) {
458
+ // TODO: not implemented yet
459
+ }
460
+
461
+ } else if (count != 0 && processed + current_batch == count) {
462
+ logger (debug) << " [send] Transaction attempt (endloop): " << tx_endloop_action;
444
463
if (tx_endloop_action == " commit" ) {
445
464
tx.commit ();
446
465
} else if (tx_endloop_action == " rollback" ) {
447
466
tx.abort ();
467
+ } else {
468
+ recv.connection ().close ();
448
469
}
449
470
}
450
471
}
451
472
452
473
void TxReceiverHandler::on_transport_close (transport &t) {
453
474
logger (debug) << " [on_transport_close] Closing the transport" ;
454
-
475
+ current_batch = 0 ;
455
476
if (conn_reconnect == " false" ) {
456
477
exit (1 );
457
- } else if (confirmed == count) {
478
+ } else if (processed == count) {
458
479
exit (0 );
459
480
}
460
481
}
0 commit comments