Skip to content

Commit

Permalink
process mailbox messages in sequential order
Browse files Browse the repository at this point in the history
  • Loading branch information
woodser committed Dec 3, 2023
1 parent 38bbfaf commit 50744e8
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 60 deletions.
34 changes: 7 additions & 27 deletions core/src/main/java/haveno/core/support/SupportManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import haveno.core.support.messages.SupportMessage;
import haveno.core.trade.Trade;
import haveno.core.trade.TradeManager;
import haveno.core.trade.protocol.TradeProtocol;
import haveno.core.trade.protocol.TradeProtocol.MailboxMessageComparator;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.AckMessage;
import haveno.network.p2p.AckMessageSourceType;
Expand All @@ -40,10 +38,10 @@
import haveno.network.p2p.SendMailboxMessageListener;
import haveno.network.p2p.mailbox.MailboxMessage;
import haveno.network.p2p.mailbox.MailboxMessageService;
import haveno.network.p2p.mailbox.MailboxMessageService.DecryptedMessageWithPubKeyComparator;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -82,19 +80,19 @@ public SupportManager(P2PService p2PService,

// We get first the message handler called then the onBootstrapped
p2PService.addDecryptedDirectMessageListener((decryptedMessageWithPubKey, senderAddress) -> {
if (isReady()) applyDirectMessage(decryptedMessageWithPubKey);
else {
synchronized (lock) {
synchronized (lock) {
if (isReady()) applyDirectMessage(decryptedMessageWithPubKey);
else {
// As decryptedDirectMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored
decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey);
tryApplyMessages();
}
}
});
mailboxMessageService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> {
if (isReady()) applyMailboxMessage(decryptedMessageWithPubKey);
else {
synchronized (lock) {
synchronized (lock) {
if (isReady()) applyMailboxMessage(decryptedMessageWithPubKey);
else {
// As decryptedMailboxMessageWithPubKeys is a CopyOnWriteArraySet we do not need to check if it was already stored
decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey);
tryApplyMessages();
Expand Down Expand Up @@ -395,22 +393,4 @@ private void applyMailboxMessage(DecryptedMessageWithPubKey decryptedMessageWith
mailboxMessageService.removeMailboxMsg(ackMessage);
}
}

private static class DecryptedMessageWithPubKeyComparator implements Comparator<DecryptedMessageWithPubKey> {

MailboxMessageComparator mailboxMessageComparator;
public DecryptedMessageWithPubKeyComparator() {
mailboxMessageComparator = new TradeProtocol.MailboxMessageComparator();
}

@Override
public int compare(DecryptedMessageWithPubKey m1, DecryptedMessageWithPubKey m2) {
if (m1.getNetworkEnvelope() instanceof MailboxMessage) {
if (m2.getNetworkEnvelope() instanceof MailboxMessage) return mailboxMessageComparator.compare((MailboxMessage) m1.getNetworkEnvelope(), (MailboxMessage) m2.getNetworkEnvelope());
else return 1;
} else {
return m2.getNetworkEnvelope() instanceof MailboxMessage ? -1 : 0;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,13 @@ public void onAllServicesInitialized() {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
tryApplyMessages();
new Thread(() -> tryApplyMessages()).start();
}
});

xmrWalletService.downloadPercentageProperty().addListener((observable, oldValue, newValue) -> {
if (xmrWalletService.isSyncedWithinTolerance())
tryApplyMessages();
new Thread(() -> tryApplyMessages()).start();
});

tryApplyMessages();
Expand Down Expand Up @@ -518,8 +518,7 @@ protected void handleDisputeOpenedMessage(DisputeOpenedMessage message) {
errorMessage = null;
} else {
// valid case if both have opened a dispute and agent was not online
log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}",
dispute.getTradeId());
log.debug("We got a dispute already open for that trade and trading peer. TradeId = {}", dispute.getTradeId());
}

// add chat message with mediation info if applicable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,15 @@ public void onSupportMessage(SupportMessage message) {
log.info("Received {} from {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), message.getSenderNodeAddress(), message.getTradeId(), message.getUid());

new Thread(() -> {
if (message instanceof DisputeOpenedMessage) {
handleDisputeOpenedMessage((DisputeOpenedMessage) message);
} else if (message instanceof ChatMessage) {
handleChatMessage((ChatMessage) message);
} else if (message instanceof DisputeClosedMessage) {
handleDisputeClosedMessage((DisputeClosedMessage) message);
} else {
log.warn("Unsupported message at dispatchMessage. message={}", message);
}
}).start();
if (message instanceof DisputeOpenedMessage) {
handleDisputeOpenedMessage((DisputeOpenedMessage) message);
} else if (message instanceof ChatMessage) {
handleChatMessage((ChatMessage) message);
} else if (message instanceof DisputeClosedMessage) {
handleDisputeClosedMessage((DisputeClosedMessage) message);
} else {
log.warn("Unsupported message at dispatchMessage. message={}", message);
}
}
}

Expand Down Expand Up @@ -270,7 +268,9 @@ private void handleDisputeClosedMessage(DisputeClosedMessage disputeClosedMessag
}

// sign and publish dispute payout tx if peer still has not published
if (!trade.isPayoutPublished()) {
if (trade.isPayoutPublished()) {
log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
} else {
try {
log.info("Signing and publishing dispute payout tx for {} {}", trade.getClass().getSimpleName(), trade.getId());
signAndPublishDisputePayoutTx(trade);
Expand All @@ -284,14 +284,17 @@ private void handleDisputeClosedMessage(DisputeClosedMessage disputeClosedMessag
throw new RuntimeException("Failed to sign and publish dispute payout tx from arbitrator: " + e.getMessage() + ". TradeId = " + tradeId);
}
}
} else {
log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
}
} else {
if (trade.isPayoutPublished()) log.info("Dispute payout tx already published for {} {}", trade.getClass().getSimpleName(), trade.getId());
else if (disputeClosedMessage.getUnsignedPayoutTxHex() == null) log.info("{} did not receive unsigned dispute payout tx for trade {} because the arbitrator did not have their updated multisig info (can happen if trader went offline after trade started)", trade.getClass().getSimpleName(), trade.getId());
}

// complete disputed trade
if (trade.isPayoutPublished()) {
tradeManager.closeDisputedTrade(trade.getId(), Trade.DisputeState.DISPUTE_CLOSED);
}

// We use the chatMessage as we only persist those not the DisputeClosedMessage.
// If we would use the DisputeClosedMessage we could not lookup for the msg when we receive the AckMessage.
sendAckMessage(chatMessage, dispute.getAgentPubKeyRing(), true, null);
Expand Down
35 changes: 21 additions & 14 deletions core/src/main/java/haveno/core/trade/protocol/TradeProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public abstract class TradeProtocol implements DecryptedDirectMessageListener, D

private int reprocessPaymentReceivedMessageCount;

// set comparator for processing mailbox messages
static {
MailboxMessageService.setMailboxMessageComparator(new MailboxMessageComparator());
}

///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -245,12 +250,14 @@ protected void onInitialized() {
if (!trade.isCompleted()) processModel.getP2PService().addDecryptedDirectMessageListener(this);

// initialize trade
trade.initialize(processModel.getProvider());
synchronized (trade) {
trade.initialize(processModel.getProvider());

// process mailbox messages
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
// process mailbox messages
MailboxMessageService mailboxMessageService = processModel.getP2PService().getMailboxMessageService();
if (!trade.isCompleted()) mailboxMessageService.addDecryptedMailboxListener(this);
handleMailboxCollection(mailboxMessageService.getMyDecryptedMailboxMessages());
}

// send deposits confirmed message if applicable
maybeSendDepositsConfirmedMessage();
Expand Down Expand Up @@ -477,18 +484,18 @@ protected void handle(PaymentSentMessage message, NodeAddress peer) {
handleTaskRunnerSuccess(peer, message);
return;
}
if (trade.getPayoutTx() != null) {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
return;
}
latchTrade();
expect(anyPhase(Trade.Phase.DEPOSITS_CONFIRMED, Trade.Phase.DEPOSITS_UNLOCKED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
() -> {
log.warn("We received a PaymentSentMessage but we have already created the payout tx " +
"so we ignore the message. This can happen if the ACK message to the peer did not " +
"arrive and the peer repeats sending us the message. We send another ACK msg.");
sendAckMessage(peer, message, true, null);
removeMailboxMessageAfterProcessing(message);
}))
.from(peer))
.setup(tasks(
ApplyFilter.class,
ProcessPaymentSentMessage.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class MailboxMessageService implements HashMapChangedListener, PersistedD
private boolean isBootstrapped;
private boolean allServicesInitialized;
private boolean initAfterBootstrapped;
private static Comparator<MailboxMessage> mailboxMessageComparator;

@Inject
public MailboxMessageService(NetworkNode networkNode,
Expand Down Expand Up @@ -397,7 +398,24 @@ public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries)
.forEach(this::removeMailboxItemFromLocalStore);
}

public static void setMailboxMessageComparator(Comparator<MailboxMessage> comparator) {
mailboxMessageComparator = comparator;
}

public static class DecryptedMessageWithPubKeyComparator implements Comparator<DecryptedMessageWithPubKey> {

@Override
public int compare(DecryptedMessageWithPubKey m1, DecryptedMessageWithPubKey m2) {
if (m1.getNetworkEnvelope() instanceof MailboxMessage) {
if (m2.getNetworkEnvelope() instanceof MailboxMessage) return mailboxMessageComparator.compare((MailboxMessage) m1.getNetworkEnvelope(), (MailboxMessage) m2.getNetworkEnvelope());
else return 1;
} else {
return m2.getNetworkEnvelope() instanceof MailboxMessage ? -1 : 0;
}
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -429,7 +447,7 @@ private void threadedBatchProcessMailboxEntries(Collection<ProtectedMailboxStora

Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Set<MailboxItem> decryptedMailboxMessageWithEntries) {
UserThread.execute(() -> decryptedMailboxMessageWithEntries.forEach(e -> handleMailboxItem(e)));
new Thread(() -> handleMailboxItems(decryptedMailboxMessageWithEntries)).start();
}

public void onFailure(@NotNull Throwable throwable) {
Expand Down Expand Up @@ -471,6 +489,30 @@ private MailboxItem tryDecryptProtectedMailboxStorageEntry(ProtectedMailboxStora
return new MailboxItem(protectedMailboxStorageEntry, null);
}

private void handleMailboxItems(Set<MailboxItem> mailboxItems) {

// sort mailbox items
List<MailboxItem> mailboxItemsSorted = mailboxItems.stream()
.filter(e -> !e.isMine())
.collect(Collectors.toList());
mailboxItemsSorted.addAll(mailboxItems.stream()
.filter(e -> e.isMine())
.sorted(new MailboxItemComparator())
.collect(Collectors.toList()));

// handle mailbox items
mailboxItemsSorted.forEach(e -> handleMailboxItem(e));
}

private static class MailboxItemComparator implements Comparator<MailboxItem> {
private DecryptedMessageWithPubKeyComparator comparator = new DecryptedMessageWithPubKeyComparator();

@Override
public int compare(MailboxItem m1, MailboxItem m2) {
return comparator.compare(m1.getDecryptedMessageWithPubKey(), m2.getDecryptedMessageWithPubKey());
}
}

private void handleMailboxItem(MailboxItem mailboxItem) {
String uid = mailboxItem.getUid();
if (!mailboxItemsByUid.containsKey(uid)) {
Expand Down

0 comments on commit 50744e8

Please sign in to comment.