Skip to content

Commit

Permalink
move processing off UserThread for smoother experience
Browse files Browse the repository at this point in the history
  • Loading branch information
woodser committed Dec 17, 2023
1 parent 7700caa commit d999d5f
Show file tree
Hide file tree
Showing 16 changed files with 279 additions and 218 deletions.
14 changes: 14 additions & 0 deletions common/src/main/java/haveno/common/UserThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -59,6 +60,19 @@ public static void execute(Runnable command) {
UserThread.executor.execute(command);
}

public static void await(Runnable command) {
CountDownLatch latch = new CountDownLatch(1);
executor.execute(() -> {
command.run();
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

// Prefer FxTimer if a delay is needed in a JavaFx class (gui module)
public static Timer runAfterRandomDelay(Runnable runnable, long minDelayInSec, long maxDelayInSec) {
return UserThread.runAfterRandomDelay(runnable, minDelayInSec, maxDelayInSec, TimeUnit.SECONDS);
Expand Down
72 changes: 48 additions & 24 deletions core/src/main/java/haveno/core/api/XmrConnectionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javafx.beans.property.ObjectProperty;
import javafx.beans.property.ReadOnlyDoubleProperty;
import javafx.beans.property.ReadOnlyIntegerProperty;
import javafx.beans.property.ReadOnlyLongProperty;
import javafx.beans.property.ReadOnlyObjectProperty;
import javafx.beans.property.SimpleIntegerProperty;
import javafx.beans.property.SimpleLongProperty;
Expand Down Expand Up @@ -61,9 +62,11 @@ public final class XmrConnectionService {
private final MoneroConnectionManager connectionManager;
private final EncryptedConnectionList connectionList;
private final ObjectProperty<List<MoneroPeer>> peers = new SimpleObjectProperty<>();
private final ObjectProperty<MoneroRpcConnection> connectionProperty = new SimpleObjectProperty<>();
private final IntegerProperty numPeers = new SimpleIntegerProperty(0);
private final LongProperty chainHeight = new SimpleLongProperty(0);
private final DownloadListener downloadListener = new DownloadListener();
private final LongProperty numUpdates = new SimpleLongProperty(0);
private Socks5ProxyProvider socks5ProxyProvider;

private boolean isInitialized;
Expand Down Expand Up @@ -286,6 +289,10 @@ public ReadOnlyObjectProperty<List<MoneroPeer>> peerConnectionsProperty() {
return peers;
}

public ReadOnlyObjectProperty<MoneroRpcConnection> connectionProperty() {
return connectionProperty;
}

public boolean hasSufficientPeersForBroadcast() {
return numPeers.get() >= getMinBroadcastConnections();
}
Expand All @@ -306,6 +313,10 @@ public boolean isDownloadComplete() {
return downloadPercentageProperty().get() == 1d;
}

public ReadOnlyLongProperty numUpdatesProperty() {
return numUpdates;
}

// ------------------------------- HELPERS --------------------------------

private void doneDownload() {
Expand Down Expand Up @@ -517,6 +528,12 @@ private void onConnectionChanged(MoneroRpcConnection currentConnection) {
connectionList.addConnection(currentConnection);
connectionList.setCurrentConnectionUri(currentConnection.getUri());
}

// set connection property on user thread
UserThread.execute(() -> {
connectionProperty.set(currentConnection);
numUpdates.set(numUpdates.get() + 1);
});
}
updatePolling();

Expand Down Expand Up @@ -564,31 +581,38 @@ private void pollDaemonInfo() {
if (daemon == null) throw new RuntimeException("No daemon connection");
lastInfo = daemon.getInfo();

// set chain height
chainHeight.set(lastInfo.getHeight());

// update sync progress
boolean isTestnet = Config.baseCurrencyNetwork() == BaseCurrencyNetwork.XMR_LOCAL;
if (lastInfo.isSynchronized() || isTestnet) doneDownload(); // TODO: skipping synchronized check for testnet because tests cannot sync 3rd local node, see "Can manage Monero daemon connections"
else if (lastInfo.isBusySyncing()) {
long targetHeight = lastInfo.getTargetHeight();
long blocksLeft = targetHeight - lastInfo.getHeight();
if (syncStartHeight == null) syncStartHeight = lastInfo.getHeight();
double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, lastInfo.getHeight() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress
downloadListener.progress(percent, blocksLeft, null);
}
// update properties on user thread
UserThread.execute(() -> {

// set chain height
chainHeight.set(lastInfo.getHeight());

// update sync progress
boolean isTestnet = Config.baseCurrencyNetwork() == BaseCurrencyNetwork.XMR_LOCAL;
if (lastInfo.isSynchronized() || isTestnet) doneDownload(); // TODO: skipping synchronized check for testnet because tests cannot sync 3rd local node, see "Can manage Monero daemon connections"
else if (lastInfo.isBusySyncing()) {
long targetHeight = lastInfo.getTargetHeight();
long blocksLeft = targetHeight - lastInfo.getHeight();
if (syncStartHeight == null) syncStartHeight = lastInfo.getHeight();
double percent = targetHeight == syncStartHeight ? 1.0 : ((double) Math.max(1, lastInfo.getHeight() - syncStartHeight) / (double) (targetHeight - syncStartHeight)) * 100d; // grant at least 1 block to show progress
downloadListener.progress(percent, blocksLeft, null);
}

// set peer connections
// TODO: peers often uknown due to restricted RPC call, skipping call to get peer connections
// try {
// peers.set(getOnlinePeers());
// } catch (Exception err) {
// // TODO: peers unknown due to restricted RPC call
// }
// numPeers.set(peers.get().size());
numPeers.set(lastInfo.getNumOutgoingConnections() + lastInfo.getNumIncomingConnections());
peers.set(new ArrayList<MoneroPeer>());

// notify update
numUpdates.set(numUpdates.get() + 1);
});

// set peer connections
// TODO: peers often uknown due to restricted RPC call, skipping call to get peer connections
// try {
// peers.set(getOnlinePeers());
// } catch (Exception err) {
// // TODO: peers unknown due to restricted RPC call
// }
// numPeers.set(peers.get().size());
numPeers.set(lastInfo.getNumOutgoingConnections() + lastInfo.getNumIncomingConnections());
peers.set(new ArrayList<MoneroPeer>());

// handle error recovery
if (lastErrorTimestamp != null) {
log.info("Successfully fetched daemon info after previous error");
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/java/haveno/core/app/HavenoExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
import haveno.core.setup.CoreSetup;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.TradeManager;
import haveno.core.trade.statistics.TradeStatisticsManager;
import haveno.core.xmr.setup.WalletsSetup;
import haveno.core.xmr.wallet.BtcWalletService;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.P2PService;
import haveno.network.p2p.network.Connection;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -337,7 +339,12 @@ public void gracefulShutDown(ResultHandler onShutdown, boolean systemExit) {
Set<Runnable> tasks = new HashSet<Runnable>();
tasks.add(() -> injector.getInstance(XmrWalletService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
HavenoUtils.executeTasks(tasks); // notify in parallel
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try {
HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
} catch (Exception e) {
e.printStackTrace();
}

injector.getInstance(PriceFeedService.class).shutDown();
injector.getInstance(ArbitratorManager.class).shutDown();
Expand All @@ -357,6 +364,10 @@ public void gracefulShutDown(ResultHandler onShutdown, boolean systemExit) {

// shut down monero wallets and connections
injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> {
log.info("Shutting down connections");
Connection.shutDownExecutor(30);

// done shutting down
log.info("Graceful shutdown completed. Exiting now.");
module.close(injector);
completeShutdown(resultHandler, EXIT_SUCCESS, systemExit);
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/haveno/core/app/WalletAppSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ void init(@Nullable Consumer<String> chainFileLockedExceptionHandler,
log.info("Initialize WalletAppSetup with monero-java version {}", MoneroUtils.getVersion());

ObjectProperty<Throwable> walletServiceException = new SimpleObjectProperty<>();
xmrInfoBinding = EasyBind.combine(xmrConnectionService.downloadPercentageProperty(),
xmrConnectionService.chainHeightProperty(),
xmrInfoBinding = EasyBind.combine(
xmrConnectionService.numUpdatesProperty(), // receives notification of any connection update
xmrWalletService.downloadPercentageProperty(),
xmrWalletService.walletHeightProperty(),
walletServiceException,
getWalletServiceErrorMsg(),
(chainDownloadPercentage, chainHeight, walletDownloadPercentage, walletHeight, exception, errorMsg) -> {
(numConnectionUpdates, walletDownloadPercentage, walletHeight, exception, errorMsg) -> {
String result;
if (exception == null && errorMsg == null) {

Expand All @@ -137,9 +137,9 @@ void init(@Nullable Consumer<String> chainFileLockedExceptionHandler,
} else {

// update daemon sync progress
double chainDownloadPercentageD = (double) chainDownloadPercentage;
double chainDownloadPercentageD = xmrConnectionService.downloadPercentageProperty().doubleValue();
xmrDaemonSyncProgress.set(chainDownloadPercentageD);
Long bestChainHeight = chainHeight == null ? null : (Long) chainHeight;
Long bestChainHeight = xmrConnectionService.chainHeightProperty().get();
String chainHeightAsString = bestChainHeight != null && bestChainHeight > 0 ? String.valueOf(bestChainHeight) : "";
if (chainDownloadPercentageD == 1) {
String synchronizedWith = Res.get("mainView.footer.xmrInfo.connectedTo", getXmrDaemonNetworkAsString(), chainHeightAsString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import haveno.core.offer.OpenOfferManager;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
import haveno.core.trade.HavenoUtils;
import haveno.core.trade.TradeManager;
import haveno.core.xmr.setup.WalletsSetup;
import haveno.core.xmr.wallet.BtcWalletService;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.P2PService;
import haveno.network.p2p.network.Connection;
import haveno.network.p2p.seed.SeedNodeRepository;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -93,11 +95,16 @@ public void gracefulShutDown(ResultHandler resultHandler) {
try {
if (injector != null) {

// notify trade protocols and wallets to prepare for shut down before shutting down
// notify trade protocols and wallets to prepare for shut down
Set<Runnable> tasks = new HashSet<Runnable>();
tasks.add(() -> injector.getInstance(XmrWalletService.class).onShutDownStarted());
tasks.add(() -> injector.getInstance(XmrConnectionService.class).onShutDownStarted());
HavenoUtils.executeTasks(tasks); // notify in parallel
tasks.add(() -> injector.getInstance(TradeManager.class).onShutDownStarted());
try {
HavenoUtils.awaitTasks(tasks, tasks.size(), 120l); // run in parallel with timeout
} catch (Exception e) {
e.printStackTrace();
}

JsonFileManager.shutDownAllInstances();
injector.getInstance(ArbitratorManager.class).shutDown();
Expand All @@ -117,8 +124,12 @@ public void gracefulShutDown(ResultHandler resultHandler) {
injector.getInstance(WalletsSetup.class).shutDownComplete.addListener((ov, o, n) -> {
module.close(injector);
PersistenceManager.flushAllDataToDiskAtShutdown(() -> {
resultHandler.handleResult();
log.info("Shutting down connections");
Connection.shutDownExecutor(30);

// done shutting down
log.info("Graceful shutdown completed. Exiting now.");
resultHandler.handleResult();
UserThread.runAfter(() -> System.exit(HavenoExecutable.EXIT_SUCCESS), 1);
});
});
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/haveno/core/offer/OfferFilterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,12 @@ public boolean hasValidSignature(Offer offer) {
Arbitrator thisArbitrator = user.getRegisteredArbitrator();
if (thisArbitrator != null && thisArbitrator.getNodeAddress().equals(offer.getOfferPayload().getArbitratorSigner())) {
if (thisArbitrator.getNodeAddress().equals(p2PService.getNetworkNode().getNodeAddress())) arbitrator = thisArbitrator; // TODO: unnecessary to compare arbitrator and p2pservice address?
} else {

// otherwise log warning that arbitrator is unregistered
List<NodeAddress> arbitratorAddresses = user.getAcceptedArbitrators().stream().map(Arbitrator::getNodeAddress).collect(Collectors.toList());
log.warn("No arbitrator is registered with offer's signer. offerId={}, arbitrator signer={}, accepted arbitrators={}", offer.getId(), offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses);
}

// otherwise log warning
List<NodeAddress> arbitratorAddresses = user.getAcceptedArbitrators().stream().map(Arbitrator::getNodeAddress).collect(Collectors.toList());
log.warn("No arbitrator is registered with offer's signer. offerId={}, arbitrator signer={}, accepted arbitrators={}", offer.getId(), offer.getOfferPayload().getArbitratorSigner(), arbitratorAddresses);
}

if (arbitrator == null) return false; // invalid arbitrator
Expand Down
Loading

0 comments on commit d999d5f

Please sign in to comment.