Skip to content

Commit

Permalink
process mailbox messages in sequential order per trade
Browse files Browse the repository at this point in the history
  • Loading branch information
woodser committed Dec 4, 2023
1 parent 1576aa5 commit 4b349e2
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 74 deletions.
34 changes: 7 additions & 27 deletions core/src/main/java/haveno/core/support/SupportManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import haveno.core.support.messages.SupportMessage;
import haveno.core.trade.Trade;
import haveno.core.trade.TradeManager;
import haveno.core.trade.protocol.TradeProtocol;
import haveno.core.trade.protocol.TradeProtocol.MailboxMessageComparator;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.AckMessage;
import haveno.network.p2p.AckMessageSourceType;
Expand All @@ -40,10 +38,10 @@
import haveno.network.p2p.SendMailboxMessageListener;
import haveno.network.p2p.mailbox.MailboxMessage;
import haveno.network.p2p.mailbox.MailboxMessageService;
import haveno.network.p2p.mailbox.MailboxMessageService.DecryptedMessageWithPubKeyComparator;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -82,19 +80,19 @@ public SupportManager(P2PService p2PService,

// We get first the message handler called then the onBootstrapped
p2PService.addDecryptedDirectMessageListener((decryptedMessageWithPubKey, senderAddress) -> {
if (isReady()) applyDirectMessage(decryptedMessageWithPubKey);
else {
synchronized (lock) {
synchronized (lock) {
if (isReady()) applyDirectMessage(decryptedMessageWithPubKey);
else {
// As decryptedDirectMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored
decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey);
tryApplyMessages();
}
}
});
mailboxMessageService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> {
if (isReady()) applyMailboxMessage(decryptedMessageWithPubKey);
else {
synchronized (lock) {
synchronized (lock) {
if (isReady()) applyMailboxMessage(decryptedMessageWithPubKey);
else {
// As decryptedMailboxMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored
decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey);
tryApplyMessages();
Expand Down Expand Up @@ -395,22 +393,4 @@ private void applyMailboxMessage(DecryptedMessageWithPubKey decryptedMessageWith
mailboxMessageService.removeMailboxMsg(ackMessage);
}
}

private static class DecryptedMessageWithPubKeyComparator implements Comparator<DecryptedMessageWithPubKey> {

MailboxMessageComparator mailboxMessageComparator;
public DecryptedMessageWithPubKeyComparator() {
mailboxMessageComparator = new TradeProtocol.MailboxMessageComparator();
}

@Override
public int compare(DecryptedMessageWithPubKey m1, DecryptedMessageWithPubKey m2) {
if (m1.getNetworkEnvelope() instanceof MailboxMessage) {
if (m2.getNetworkEnvelope() instanceof MailboxMessage) return mailboxMessageComparator.compare((MailboxMessage) m1.getNetworkEnvelope(), (MailboxMessage) m2.getNetworkEnvelope());
else return 1;
} else {
return m2.getNetworkEnvelope() instanceof MailboxMessage ? -1 : 0;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ public void sendDisputeOpenedMessage(Dispute dispute,
disputeList.add(dispute);
}

// create dispute opened message
NodeAddress agentNodeAddress = getAgentNodeAddress(dispute);
DisputeOpenedMessage disputeOpenedMessage = new DisputeOpenedMessage(dispute,
p2PService.getAddress(),
Expand All @@ -367,6 +368,8 @@ public void sendDisputeOpenedMessage(Dispute dispute,
disputeOpenedMessage.getTradeId(), disputeOpenedMessage.getUid(),
chatMessage.getUid());
recordPendingMessage(disputeOpenedMessage.getClass().getSimpleName());

// send dispute opened message
mailboxMessageService.sendEncryptedMailboxMessage(agentNodeAddress,
dispute.getAgentPubKeyRing(),
disputeOpenedMessage,
Expand Down Expand Up @@ -436,7 +439,7 @@ public void onFault(String errorMessage) {
// arbitrator receives dispute opened message from opener, opener's peer receives from arbitrator
protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) {
Dispute dispute = message.getDispute();
log.info("{}.onDisputeOpenedMessage() with trade {}, dispute {}", getClass().getSimpleName(), dispute.getTradeId(), dispute.getId());
log.info("Processing {} with trade {}, dispute {}", message.getClass().getSimpleName(), dispute.getTradeId(), dispute.getId());

// get trade
Trade trade = tradeManager.getTrade(dispute.getTradeId());
Expand Down Expand Up @@ -467,6 +470,7 @@ protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) {
DisputeValidation.validateSenderNodeAddress(dispute, message.getSenderNodeAddress());
//DisputeValidation.testIfDisputeTriesReplay(dispute, disputeList.getList());
} catch (DisputeValidation.ValidationException e) {
e.printStackTrace();
validationExceptions.add(e);
throw e;
}
Expand All @@ -476,6 +480,7 @@ protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) {
try {
DisputeValidation.validatePaymentAccountPayload(dispute);
} catch (Exception e) {
e.printStackTrace();
log.warn(e.getMessage());
trade.prependErrorMessage(e.getMessage());
}
Expand All @@ -499,7 +504,7 @@ protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) {

// update multisig hex
if (message.getUpdatedMultisigHex() != null) sender.setUpdatedMultisigHex(message.getUpdatedMultisigHex());
trade.importMultisigHex();
if (trade.walletExists()) trade.importMultisigHex();

// add chat message with price info
if (trade instanceof ArbitratorTrade) addPriceInfoMessage(dispute, 0);
Expand All @@ -518,8 +523,7 @@ protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) {
errorMessage = null;
} else {
// valid case if both have opened a dispute and agent was not online
log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}",
dispute.getTradeId());
log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", dispute.getTradeId());
}

// add chat message with mediation info if applicable
Expand All @@ -529,6 +533,7 @@ protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) {
}
}
} catch (Exception e) {
e.printStackTrace();
errorMessage = e.getMessage();
log.warn(errorMessage);
if (trade != null) trade.setErrorMessage(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void onSupportMessage(SupportMessage message) {
log.info("Received {} from {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getSenderNodeAddress(), message.getTradeId(), message.getUid());

new Thread(() -> {
HavenoUtils.runTask(message.getTradeId(), () -> {
if (message instanceof DisputeOpenedMessage) {
handleDisputeOpenedMessage((DisputeOpenedMessage) message);
} else if (message instanceof ChatMessage) {
Expand All @@ -126,7 +126,7 @@ public void onSupportMessage(SupportMessage message) {
} else {
log.warn("Unsupported message at dispatchMessage. message={}", message);
}
}).start();
});
}
}

Expand Down Expand Up @@ -221,7 +221,7 @@ private void handleDisputeClosedMessage(DisputeClosedMessage disputeClosedMessag
String summaryText = chatMessage.getMessage();
if (summaryText == null || summaryText.isEmpty()) throw new IllegalArgumentException("Summary text for dispute is missing, tradeId=" + tradeId + (dispute == null ? "" : ", disputeId=" + dispute.getId()));
if (dispute != null) DisputeSummaryVerification.verifySignature(summaryText, dispute.getAgentPubKeyRing()); // use dispute's arbitrator pub key ring
else DisputeSummaryVerification.verifySignature(summaryText, arbitratorManager); // verify using registered arbitrator (will fail is arbitrator is unregistered)
else DisputeSummaryVerification.verifySignature(summaryText, arbitratorManager); // verify using registered arbitrator (will fail if arbitrator is unregistered)

// save dispute closed message for reprocessing
trade.getArbitrator().setDisputeClosedMessage(disputeClosedMessage);
Expand Down Expand Up @@ -253,11 +253,9 @@ private void handleDisputeClosedMessage(DisputeClosedMessage disputeClosedMessag
trade.saveWallet();
}

// import multisig hex
if (trade.walletExists()) {
if (disputeClosedMessage.getUpdatedMultisigHex() != null) trade.getArbitrator().setUpdatedMultisigHex(disputeClosedMessage.getUpdatedMultisigHex());
trade.importMultisigHex();
}
// update multisig hex
if (disputeClosedMessage.getUpdatedMultisigHex() != null) trade.getArbitrator().setUpdatedMultisigHex(disputeClosedMessage.getUpdatedMultisigHex());
if (trade.walletExists()) trade.importMultisigHex();

// attempt to sign and publish dispute payout tx if given and not already published
if (disputeClosedMessage.getUnsignedPayoutTxHex() != null && !trade.isPayoutPublished()) {
Expand All @@ -270,7 +268,9 @@ private void handleDisputeClosedMessage(DisputeClosedMessage disputeClosedMessag
}

// sign and publish dispute payout tx if peer still has not published
if (!trade.isPayoutPublished()) {
if (trade.isPayoutPublished()) {
log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
} else {
try {
log.info("Signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId());
signAndPublishDisputePayoutTx(trade);
Expand All @@ -284,14 +284,17 @@ private void handleDisputeClosedMessage(DisputeClosedMessage disputeClosedMessag
throw new RuntimeException("Failed to sign and publish dispute payout tx from arbitrator: " + e.getMessage() + ". TradeId = " + tradeId);
}
}
} else {
log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
}
} else {
if (trade.isPayoutPublished()) log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
else if (disputeClosedMessage.getUnsignedPayoutTxHex() == null) log.info("{} did not receive unsigned dispute payout tx for trade {} because the arbitrator did not have their updated multisig info (can happen if trader went offline after trade started)", trade.getClass().getSimpleName(), trade.getId());
}

// complete disputed trade
if (trade.isPayoutPublished()) {
tradeManager.closeDisputedTrade(trade.getId(), Trade.DisputeState.DISPUTE_CLOSED);
}

// We use the chatMessage as we only persist those not the DisputeClosedMessage.
// If we would use the DisputeClosedMessage we could not lookup for the msg when we receive the AckMessage.
sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), true, null);
Expand Down
19 changes: 19 additions & 0 deletions core/src/main/java/haveno/core/trade/HavenoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -75,6 +77,7 @@ public class HavenoUtils {
public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
private static final int POOL_SIZE = 10;
private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);
private static final Map<String, ExecutorService> POOLS = new HashMap<>();

// TODO: better way to share references?
public static ArbitrationManager arbitrationManager;
Expand Down Expand Up @@ -467,6 +470,22 @@ public static void awaitLatch(CountDownLatch latch) {
}
}

public static Future<?> runTask(String threadId, Runnable task) {
synchronized (POOLS) {
if (!POOLS.containsKey(threadId)) POOLS.put(threadId, Executors.newFixedThreadPool(1));
return POOLS.get(threadId).submit(task);
}
}

public static void removeThreadId(String threadId) {
synchronized (POOLS) {
if (POOLS.containsKey(threadId)) {
POOLS.get(threadId).shutdown();
POOLS.remove(threadId);
}
}
}

/**
* Submit tasks to a global thread pool.
*/
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/haveno/core/trade/TradeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import haveno.core.trade.messages.InitTradeRequest;
import haveno.core.trade.messages.SignContractRequest;
import haveno.core.trade.messages.SignContractResponse;
import haveno.core.trade.messages.TradeMessage;
import haveno.core.trade.protocol.ArbitratorProtocol;
import haveno.core.trade.protocol.MakerProtocol;
import haveno.core.trade.protocol.ProcessModel;
Expand Down Expand Up @@ -232,7 +233,9 @@ public void readPersisted(Runnable completeHandler) {
@Override
public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer) {
NetworkEnvelope networkEnvelope = message.getNetworkEnvelope();
new Thread(() -> {
if (!(networkEnvelope instanceof TradeMessage)) return;
String tradeId = ((TradeMessage) networkEnvelope).getTradeId();
HavenoUtils.runTask(tradeId, () -> {
if (networkEnvelope instanceof InitTradeRequest) {
handleInitTradeRequest((InitTradeRequest) networkEnvelope, peer);
} else if (networkEnvelope instanceof InitMultisigRequest) {
Expand All @@ -246,7 +249,7 @@ public void onDirectMessage(DecryptedMessageWithPubKey message, NodeAddress peer
} else if (networkEnvelope instanceof DepositResponse) {
handleDepositResponse((DepositResponse) networkEnvelope, peer);
}
}).start();
});
}


Expand Down Expand Up @@ -1202,6 +1205,7 @@ private void removeTrade(Trade trade) {

// remove trade
tradableList.remove(trade);
HavenoUtils.removeThreadId(trade.getId());

// unregister and persist
p2PService.removeDecryptedDirectMessageListener(getTradeProtocol(trade));
Expand Down
35 changes: 21 additions & 14 deletions core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D

private int reprocessPaymentReceivedMessageCount;

// set comparator for processing mailbox messages
static {
MailboxMessageService.setMailboxMessageComparator(new MailboxMessageComparator());
}

///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -245,12 +250,14 @@ protected void onInitialized() {
if (!trade.isCompleted()) processModel.getP2PService().addDecryptedDirectMessageListener(this);

// initialize trade
trade.initialize(processModel.getProvider());
synchronized (trade) {
trade.initialize(processModel.getProvider());

// process mailbox messages
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
// process mailbox messages
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
}

// send deposits confirmed message if applicable
maybeSendDepositsConfirmedMessage();
Expand Down Expand Up @@ -477,18 +484,18 @@ protected void handle(PaymentSentMessage message, NodeAddress peer) {
handleTaskRunnerSuccess(peer, message);
return;
}
if (trade.getPayoutTx() != null) {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
return;
}
latchTrade();
expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
}))
.from(peer))
.setup(tasks(
ApplyFilter.class,
ProcessPaymentSentMessage.class,
Expand Down
Loading

0 comments on commit 4b349e2

Please sign in to comment.