Skip to content

Commit 72df011

Browse files
author
Petr Matousek
committed
refactoring
1 parent a49f1db commit 72df011

File tree

3 files changed

+21
-23
lines changed

3 files changed

+21
-23
lines changed

src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,6 @@ void TxReceiverHandler::on_transaction_declared(session s) {
157157
}
158158
logger(trace) << "[on_transaction_declared] txn called " << (&s);
159159
logger(debug) << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty());
160-
// TODO
161-
// tx = t;
162160
}
163161

164162
void TxReceiverHandler::on_transaction_aborted(session s) {
@@ -168,15 +166,15 @@ void TxReceiverHandler::on_transaction_aborted(session s) {
168166
if (count == 0 || processed < count) {
169167
s.declare_transaction(*this);
170168
} else {
171-
logger(info) << "[on_transaction_committed] All messages processed";
169+
logger(info) << "[on_transaction_aborted] All messages processed";
172170
s.connection().close();
173171
}
174172
}
175173

176174
void TxReceiverHandler::on_transaction_committed(session s) {
177175
processed += current_batch;
178176
current_batch = 0;
179-
logger(debug) << "[on_transaction_aborted] messages committed, processed: " << processed;
177+
logger(debug) << "[on_transaction_committed] messages committed, processed: " << processed;
180178
if (count == 0 || processed < count) {
181179
s.declare_transaction(*this);
182180
} else {
@@ -370,7 +368,9 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
370368
{
371369
logger(debug) << "[on_message] Processing received message";
372370

371+
// TODO legit?
373372
session s = d.session();
373+
374374
s.txn_accept(d);
375375
current_batch += 1;
376376

src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp

+16-18
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,19 @@ int TxSenderHandler::getBatchSize() const
110110
return batch_size;
111111
}
112112

113-
// void TxSenderHandler::checkIfCanSend() {
114-
// if (processed < count) {
115-
// work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));
116-
//
117-
// if (sndr.credit() > 0) {
118-
// logger(debug) << "[checkIfCanSend] Preparing to send message";
119-
// send();
120-
// } else {
121-
// ready = true;
122-
// }
123-
// }
124-
// }
113+
void TxSenderHandler::checkIfCanSend() {
114+
if (processed < count) {
115+
work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));
116+
117+
if (sndr.credit() > 0) {
118+
logger(debug) << "[checkIfCanSend] Preparing to send message";
119+
// TODO test w/ session defined
120+
send();
121+
} else {
122+
ready = true;
123+
}
124+
}
125+
}
125126

126127
void TxSenderHandler::send(session s)
127128
{
@@ -156,9 +157,7 @@ void TxSenderHandler::send(session s)
156157
logger(trace) << "[send] Current batch: " << current_batch;
157158
while (s.txn_is_declared() && sndr.credit() && (processed + current_batch) < count)
158159
{
159-
logger(trace) << "[send] Sending messages through the link NAZDAR";
160160
s.txn_send(sndr, message_to_send);
161-
logger(trace) << "[send] Sending messages through the link BAZAR";
162161
current_batch += 1;
163162

164163
if (log_msgs == "dict") {
@@ -223,6 +222,7 @@ void TxSenderHandler::send(session s)
223222

224223
void TxSenderHandler::on_sendable(sender &s)
225224
{
225+
// TODO
226226
logger(trace) << "[on_sendable] IS THIS METHOD EVER CALLED IN TX MODE???";
227227
logger(trace) << "[on_sendable] transaction: " << &s;
228228
if (ready) {
@@ -245,7 +245,6 @@ void TxSenderHandler::on_transaction_declared(session s) {
245245
logger(trace) << "[on_transaction_declared] txn called " << (&s);
246246
logger(trace) << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty())
247247
<< "\t" << s.txn_is_empty();
248-
// tx = t;
249248
send(s);
250249
}
251250

@@ -368,10 +367,11 @@ void TxSenderHandler::on_container_start(container &c)
368367

369368
logger(trace) << "[on_container_start] Interval for duration: " << interval.milliseconds() << " ms";
370369
}
370+
371371
// TODO
372372
// #if defined(__REACTOR_HAS_TIMER)
373373
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::timerEvent, this));
374-
//
374+
//
375375
// if (duration_time > 0 && duration_mode == "after-send") {
376376
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
377377
// } else if (duration_time > 0 && duration_mode == "before-send") {
@@ -380,8 +380,6 @@ void TxSenderHandler::on_container_start(container &c)
380380
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
381381
// }
382382
// #endif
383-
384-
// tx = transaction();
385383
}
386384

387385
void TxSenderHandler::on_transaction_declare_failed(session) {}

src/api/qpid-proton/reactor/handler/TxSenderHandler.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class TxSenderHandler : public SenderHandler, transaction_handler {
115115

116116
// overrides
117117
void checkIfCanSend();
118-
void send(session sess);
118+
void send(session s);
119119

120120
// reactor methods
121121
void on_sender_close(sender &s);

0 commit comments

Comments
 (0)