Skip to content

Commit a49f1db

Browse files
author
Petr Matousek
committed
use session insted of transaction argument in event calls
1 parent d80ee5c commit a49f1db

File tree

4 files changed

+90
-90
lines changed

4 files changed

+90
-90
lines changed

src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp

+21-20
Original file line numberDiff line numberDiff line change
@@ -133,55 +133,55 @@ int TxReceiverHandler::getBatchSize() const
133133
// reactor methods
134134

135135
void TxReceiverHandler::on_session_open(session &s) {
136-
sess = s;
137136
logger(trace) << "[on_session_open] declare_txn started...";
138137
s.declare_transaction(*this);
139138
logger(trace) << "[on_session_open] declare_txn ended...";
140139
logger(debug) << "[on_session_open] transaction batch size: " << batch_size;
141140
}
142141

143-
void TxReceiverHandler::on_transaction_declare_failed(transaction) {}
142+
void TxReceiverHandler::on_transaction_declare_failed(session) {}
144143

145-
void TxReceiverHandler::on_transaction_commit_failed(transaction t) {
144+
void TxReceiverHandler::on_transaction_commit_failed(session s) {
146145
logger(debug) << "[on_transaction_commit_failed] Transaction Commit Failed";
147-
t.connection().close();
146+
s.connection().close();
148147
exit(-1);
149148
}
150149

151-
void TxReceiverHandler::on_transaction_declared(transaction t) {
150+
void TxReceiverHandler::on_transaction_declared(session s) {
152151
// TODO python some weird magic around count 0, doesn't make much sense to me yet
153152
// when fixes take care about all count checks ofr zero
154153
if (count != 0 && processed + batch_size > count) {
155154
batch_size = count % batch_size;
156155
} else if (count != 0) {
157156
batch_size = count;
158157
}
159-
logger(trace) << "[on_transaction_declared] txn called " << (&t);
160-
logger(debug) << "[on_transaction_declared] txn is_empty " << (t.is_empty());
161-
tx = t;
158+
logger(trace) << "[on_transaction_declared] txn called " << (&s);
159+
logger(debug) << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty());
160+
// TODO
161+
// tx = t;
162162
}
163163

164-
void TxReceiverHandler::on_transaction_aborted(transaction t) {
164+
void TxReceiverHandler::on_transaction_aborted(session s) {
165165
processed += current_batch;
166166
current_batch = 0;
167167
logger(debug) << "[on_transaction_aborted] messages aborted, processed: " << processed;
168168
if (count == 0 || processed < count) {
169-
sess.declare_transaction(*this);
169+
s.declare_transaction(*this);
170170
} else {
171171
logger(info) << "[on_transaction_committed] All messages processed";
172-
t.connection().close();
172+
s.connection().close();
173173
}
174174
}
175175

176-
void TxReceiverHandler::on_transaction_committed(transaction t) {
176+
void TxReceiverHandler::on_transaction_committed(session s) {
177177
processed += current_batch;
178178
current_batch = 0;
179179
logger(debug) << "[on_transaction_aborted] messages committed, processed: " << processed;
180180
if (count == 0 || processed < count) {
181-
sess.declare_transaction(*this);
181+
s.declare_transaction(*this);
182182
} else {
183183
logger(info) << "[on_transaction_committed] All messages processed";
184-
t.connection().close();
184+
s.connection().close();
185185
}
186186
}
187187

@@ -370,7 +370,8 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
370370
{
371371
logger(debug) << "[on_message] Processing received message";
372372

373-
tx.accept(d);
373+
session s = d.session();
374+
s.txn_accept(d);
374375
current_batch += 1;
375376

376377
logger(debug) << "[on_message] current batch: " << current_batch;
@@ -439,9 +440,9 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
439440
if(current_batch == batch_size) {
440441
logger(debug) << "[send] Transaction attempt: " << tx_action;
441442
if (tx_action == "commit") {
442-
tx.commit();
443+
s.txn_commit();
443444
} else if (tx_action == "rollback") {
444-
tx.abort();
445+
s.txn_abort();
445446
}
446447

447448
if (tx_action == "none") {
@@ -450,7 +451,7 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
450451
} else {
451452
processed += current_batch;
452453
current_batch = 0;
453-
sess.declare_transaction(*this);
454+
s.declare_transaction(*this);
454455
}
455456
}
456457

@@ -461,9 +462,9 @@ void TxReceiverHandler::on_message(delivery &d, message &m)
461462
} else if (count != 0 && processed + current_batch == count) {
462463
logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action;
463464
if (tx_endloop_action == "commit") {
464-
tx.commit();
465+
s.txn_commit();
465466
} else if (tx_endloop_action == "rollback") {
466-
tx.abort();
467+
s.txn_abort();
467468
} else {
468469
recv.connection().close();
469470
}

src/api/qpid-proton/reactor/handler/TxReceiverHandler.h

+5-7
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,11 @@ class TxReceiverHandler : public ReceiverHandler, transaction_handler {
145145

146146
// reactor method
147147
void on_session_open(session &s);
148-
void on_transaction_declare_failed(transaction);
149-
void on_transaction_commit_failed(transaction t);
150-
void on_transaction_declared(transaction t);
151-
void on_transaction_committed(transaction t);
152-
void on_transaction_aborted(transaction t);
148+
void on_transaction_declare_failed(session);
149+
void on_transaction_commit_failed(session s);
150+
void on_transaction_declared(session s);
151+
void on_transaction_committed(session s);
152+
void on_transaction_aborted(session s);
153153

154154
// overrides
155155
void on_container_start(container &c);
@@ -164,8 +164,6 @@ class TxReceiverHandler : public ReceiverHandler, transaction_handler {
164164
int processed = 0;
165165
string tx_action = "commit";
166166
string tx_endloop_action = "commit";
167-
transaction tx;
168-
session sess;
169167
};
170168

171169
} /* namespace reactor */

src/api/qpid-proton/reactor/handler/TxSenderHandler.cpp

+58-55
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,20 @@ int TxSenderHandler::getBatchSize() const
110110
return batch_size;
111111
}
112112

113-
void TxSenderHandler::checkIfCanSend() {
114-
if (processed < count) {
115-
work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));
116-
117-
if (sndr.credit() > 0) {
118-
send();
119-
} else {
120-
ready = true;
121-
}
122-
}
123-
}
124-
125-
void TxSenderHandler::send()
113+
// void TxSenderHandler::checkIfCanSend() {
114+
// if (processed < count) {
115+
// work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));
116+
//
117+
// if (sndr.credit() > 0) {
118+
// logger(debug) << "[checkIfCanSend] Preparing to send message";
119+
// send();
120+
// } else {
121+
// ready = true;
122+
// }
123+
// }
124+
// }
125+
126+
void TxSenderHandler::send(session s)
126127
{
127128
logger(debug) << "[send] Preparing to send message";
128129
int credit = sndr.credit();
@@ -150,12 +151,14 @@ void TxSenderHandler::send()
150151
}
151152

152153

153-
logger(trace) << "[send] Transaction is empty: " << tx.is_empty();
154+
logger(trace) << "[send] Transaction is empty: " << s.txn_is_empty();
154155
logger(debug) << "[send] Messages processed: " << processed;
155156
logger(trace) << "[send] Current batch: " << current_batch;
156-
while (!tx.is_empty() && sndr.credit() && (processed + current_batch) < count)
157+
while (s.txn_is_declared() && sndr.credit() && (processed + current_batch) < count)
157158
{
158-
tx.send(sndr, message_to_send);
159+
logger(trace) << "[send] Sending messages through the link NAZDAR";
160+
s.txn_send(sndr, message_to_send);
161+
logger(trace) << "[send] Sending messages through the link BAZAR";
159162
current_batch += 1;
160163

161164
if (log_msgs == "dict") {
@@ -185,27 +188,26 @@ void TxSenderHandler::send()
185188
if(current_batch == batch_size) {
186189
logger(debug) << "[send] Transaction attempt: " << tx_action;
187190
if (tx_action == "commit") {
188-
tx.commit();
191+
s.txn_commit();
189192
} else if (tx_action == "rollback") {
190-
tx.abort();
193+
s.txn_abort();
191194
}
192-
tx = transaction();
193195

194196
if (tx_action == "none") {
195197
if (processed + current_batch == count) {
196198
sndr.connection().close();
197199
} else {
198200
processed += current_batch;
199201
current_batch = 0;
200-
sess.declare_transaction(*this);
202+
s.declare_transaction(*this);
201203
}
202204
}
203205
} else if (processed + current_batch == count) {
204206
logger(debug) << "[send] Transaction attempt (endloop): " << tx_endloop_action;
205207
if (tx_endloop_action == "commit") {
206-
tx.commit();
208+
s.txn_commit();
207209
} else if (tx_endloop_action == "rollback") {
208-
tx.abort();
210+
s.txn_abort();
209211
}
210212
sndr.connection().close();
211213
}
@@ -221,9 +223,10 @@ void TxSenderHandler::send()
221223

222224
void TxSenderHandler::on_sendable(sender &s)
223225
{
224-
logger(trace) << "[on_sendable] transaction: " << &tx;
226+
logger(trace) << "[on_sendable] IS THIS METHOD EVER CALLED IN TX MODE???";
227+
logger(trace) << "[on_sendable] transaction: " << &s;
225228
if (ready) {
226-
send();
229+
send(s.session());
227230
}
228231
}
229232

@@ -238,39 +241,39 @@ void TxSenderHandler::on_connection_close(connection &c)
238241
logger(debug) << "[on_connection_close] Closing connection";
239242
}
240243

241-
void TxSenderHandler::on_transaction_declared(transaction t) {
242-
logger(trace) << "[on_transaction_declared] txn called " << (&t);
243-
logger(trace) << "[on_transaction_declared] txn is_empty " << (t.is_empty())
244-
<< "\t" << tx.is_empty();
245-
tx = t;
246-
send();
244+
void TxSenderHandler::on_transaction_declared(session s) {
245+
logger(trace) << "[on_transaction_declared] txn called " << (&s);
246+
logger(trace) << "[on_transaction_declared] txn is_empty " << (s.txn_is_empty())
247+
<< "\t" << s.txn_is_empty();
248+
// tx = t;
249+
send(s);
247250
}
248251

249-
void TxSenderHandler::on_transaction_committed(transaction t) {
252+
void TxSenderHandler::on_transaction_committed(session s) {
250253
logger(trace) << "[on_transaction_committed] Messages committed";
251254
processed += current_batch;
252255
logger(debug) << "[on_transaction_committed] Messages processed" << processed;
253256
if (processed == count) {
254257
logger(trace) << "[on_transaction_committed] All messages processed";
255-
t.connection().close();
258+
s.connection().close();
256259
} else {
257260
logger(trace) << "[on_transaction_committed] Declaring new transaction";
258261
current_batch = 0;
259-
sess.declare_transaction(*this);
262+
s.declare_transaction(*this);
260263
}
261264
}
262265

263-
void TxSenderHandler::on_transaction_aborted(transaction t) {
266+
void TxSenderHandler::on_transaction_aborted(session s) {
264267
logger(trace) << "[on_transaction_aborted] Messages aborted";
265268
processed += current_batch;
266-
logger(debug) << "[on_transaction_committed] Messages processed" << processed;
269+
logger(debug) << "[on_transaction_aborted] Messages processed" << processed;
267270
if (processed == count) {
268271
logger(trace) << "[on_transaction_aborted] All messages processed";
269-
t.connection().close();
272+
s.connection().close();
270273
} else {
271-
logger(trace) << "[on_transaction_committed] Declaring new transaction";
274+
logger(trace) << "[on_transaction_aborted] Declaring new transaction";
272275
current_batch = 0;
273-
sess.declare_transaction(*this);
276+
s.declare_transaction(*this);
274277
}
275278
}
276279

@@ -279,7 +282,6 @@ void TxSenderHandler::on_sender_close(sender &s) {
279282
}
280283

281284
void TxSenderHandler::on_session_open(session &s) {
282-
sess = s;
283285
logger(trace) << "[on_session_open] declare_txn started...";
284286
s.declare_transaction(*this);
285287
logger(trace) << "[on_session_open] declare_txn ended...";
@@ -366,26 +368,27 @@ void TxSenderHandler::on_container_start(container &c)
366368

367369
logger(trace) << "[on_container_start] Interval for duration: " << interval.milliseconds() << " ms";
368370
}
369-
#if defined(__REACTOR_HAS_TIMER)
370-
work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::timerEvent, this));
371-
372-
if (duration_time > 0 && duration_mode == "after-send") {
373-
work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
374-
} else if (duration_time > 0 && duration_mode == "before-send") {
375-
work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));
376-
} else {
377-
work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
378-
}
379-
#endif
380-
381-
tx = transaction();
371+
// TODO
372+
// #if defined(__REACTOR_HAS_TIMER)
373+
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::timerEvent, this));
374+
//
375+
// if (duration_time > 0 && duration_mode == "after-send") {
376+
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
377+
// } else if (duration_time > 0 && duration_mode == "before-send") {
378+
// work_q->schedule(interval, make_work(&TxSenderHandler::checkIfCanSend, this));
379+
// } else {
380+
// work_q->schedule(duration::IMMEDIATE, make_work(&TxSenderHandler::checkIfCanSend, this));
381+
// }
382+
// #endif
383+
384+
// tx = transaction();
382385
}
383386

384-
void TxSenderHandler::on_transaction_declare_failed(transaction) {}
387+
void TxSenderHandler::on_transaction_declare_failed(session) {}
385388

386-
void TxSenderHandler::on_transaction_commit_failed(transaction t) {
389+
void TxSenderHandler::on_transaction_commit_failed(session s) {
387390
logger(error) << "[on_transaction_commit_failed] Transaction Commit Failed";
388-
t.connection().close();
391+
s.connection().close();
389392
exit(1);
390393
}
391394

src/api/qpid-proton/reactor/handler/TxSenderHandler.h

+6-8
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,15 @@ class TxSenderHandler : public SenderHandler, transaction_handler {
115115

116116
// overrides
117117
void checkIfCanSend();
118-
void send();
118+
void send(session sess);
119119

120120
// reactor methods
121121
void on_sender_close(sender &s);
122-
void on_transaction_declared(transaction t);
123-
void on_transaction_committed(transaction t);
124-
void on_transaction_aborted(transaction t);
125-
void on_transaction_declare_failed(transaction t);
126-
void on_transaction_commit_failed(transaction t);
122+
void on_transaction_declared(session s);
123+
void on_transaction_committed(session s);
124+
void on_transaction_aborted(session s);
125+
void on_transaction_declare_failed(session s);
126+
void on_transaction_commit_failed(session s);
127127

128128
// overrides
129129
void on_container_start(container &c);
@@ -140,8 +140,6 @@ class TxSenderHandler : public SenderHandler, transaction_handler {
140140
int processed = 0;
141141
string tx_action = "commit";
142142
string tx_endloop_action = "commit";
143-
transaction tx;
144-
session sess;
145143
};
146144

147145
} /* namespace reactor */

0 commit comments

Comments
 (0)