Skip to content

Commit

Permalink
transactional receiver first version fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Matousek committed Jan 9, 2025
1 parent 8d4974e commit 260d099
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
26 changes: 13 additions & 13 deletions src/api/qpid-proton/reactor/handler/TxReceiverHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,44 +239,44 @@ int TxReceiverHandler::getBatchSize() const

// reactor methods

void TxReceiverHandler::on_session_open(session &s) override {
void TxReceiverHandler::on_session_open(session &s) {
sess = s;
std::cout << " [on_session_open] declare_txn started..." << std::endl;
s.declare_transaction(*this);
std::cout << " [on_session_open] declare_txn ended..." << std::endl;
}

void on_transaction_declare_failed(transaction) {}
void TxReceiverHandler::on_transaction_declare_failed(transaction) {}

void on_transaction_commit_failed(proton::transaction t) {
void TxReceiverHandler::on_transaction_commit_failed(transaction t) {
std::cout << "Transaction Commit Failed" << std::endl;
t.connection().close();
exit(-1);
}

void TxReceiverHandler::on_transaction_declared(transaction t) override {
void TxReceiverHandler::on_transaction_declared(transaction t) {
std::cout << "[on_transaction_declared] txn called " << (&t)
<< std::endl;
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
<< "\t" << transaction.is_empty() << std::endl;
<< "\t" << tx.is_empty() << std::endl;
recv.add_credit(batch_size);
transaction = t;
tx = t;
}

void TxReceiverHandler::on_message(delivery &d, message &msg) override {
void TxReceiverHandler::on_message(delivery &d, message &msg) {
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl;
transaction.accept(d);
tx.accept(d);
current_batch += 1;
if(current_batch == batch_size) {
transaction = proton::transaction(); // null
tx = transaction(); // null
}
}

void TxReceiverHandler::on_transaction_committed(transaction t) override {
committed += current_batch;
void TxReceiverHandler::on_transaction_committed(transaction t) {
processed += current_batch;
current_batch = 0;
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
if(committed == expected) {
std::cout<<" [OnTxnCommitted] Processed:"<< processed << std::endl;
if(processed == count) {
std::cout << "All messages committed" << std::endl;
t.connection().close();
}
Expand Down
9 changes: 5 additions & 4 deletions src/api/qpid-proton/reactor/handler/TxReceiverHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ using proton::codec::encoder;
using proton::binary;
using proton::uuid;
using proton::transaction;
using proton::session;
using proton::transaction_handler;

#ifdef PN_CPP_HAS_STD_FUNCTION
Expand Down Expand Up @@ -194,9 +195,9 @@ class TxReceiverHandler : public CommonHandler, transaction_handler {
// reactor methods
void on_container_start(container &c);
void on_message(delivery &d, message &m);
void on_receiver_drain_finish(receiver &r);
void on_tracker_accept(tracker &t);
void on_tracker_reject(tracker &t);
// void on_receiver_drain_finish(receiver &r);
// void on_tracker_accept(tracker &t);
// void on_tracker_reject(tracker &t);
void on_transport_close(transport &t);
void on_transport_error(transport &t);
void on_connection_close(connection &conn);
Expand Down Expand Up @@ -259,7 +260,7 @@ class TxReceiverHandler : public CommonHandler, transaction_handler {
string tx_action = "commit";
string tx_endloop_action = "commit";

transaction *tx;
transaction tx;
session sess;
};

Expand Down

0 comments on commit 260d099

Please sign in to comment.