Skip to content

Commit

Permalink
fix unit test and refactor periodic timer task
Browse files Browse the repository at this point in the history
Signed-off-by: Usman Saleem <[email protected]>
  • Loading branch information
usmansaleem committed May 26, 2024
1 parent 31a121b commit 45d8faa
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public class DNSDaemon extends AbstractVerticle {
private final String enrLink;
private final long seq;
private final long period;
private final String dnsServer;
private long periodicTaskId;
private final Optional<DNSDaemonListener> listener;
private final DNSResolver dnsResolver;
private final Optional<String> dnsServer;
private Optional<Long> periodicTaskId = Optional.empty();
private DNSResolver dnsResolver;

/**
* Creates a new DNSDaemon.
Expand All @@ -58,38 +58,28 @@ public DNSDaemon(
this.listener = Optional.ofNullable(listener);
this.seq = seq;
this.period = period;
this.dnsServer = dnsServer;
dnsResolver = new DNSResolver(vertx, enrLink, seq, dnsServer);
}

/**
* Callback method to update the listeners with resolved enr records.
*
* @param records List of resolved Ethereum Node Records.
*/
private void updateRecords(final List<EthereumNodeRecord> records) {
listener.ifPresent(it -> it.newRecords(seq, records));
this.dnsServer = Optional.ofNullable(dnsServer);
}

/** Starts the DNSDaemon. */
@Override
public void start() {
LOG.info("Starting DNSDaemon for {}", enrLink);
// Use Vertx to run periodic task if period is set
this.dnsResolver = new DNSResolver(vertx, enrLink, seq, dnsServer);
if (period > 0) {
this.periodicTaskId = vertx.setPeriodic(period, this::refreshENRRecords);
periodicTaskId = Optional.of(vertx.setPeriodic(period, this::refreshENRRecords));
} else {
// do one-shot resolution
refreshENRRecords(0L);
}
}

/** Stops the DNSDaemon. */
@Override
public void stop() {
if (period > 0) {
vertx.cancelTimer(this.periodicTaskId);
} // otherwise we didn't start the timer
// TODO: Call dnsResolver stop
LOG.info("Stopping DNSDaemon for {}", enrLink);
periodicTaskId.ifPresent(vertx::cancelTimer);
dnsResolver.close();
}

/**
Expand All @@ -103,6 +93,6 @@ void refreshENRRecords(final Long taskId) {
final List<EthereumNodeRecord> ethereumNodeRecords = dnsResolver.collectAll();
final long endTime = System.nanoTime();
LOG.debug("Time taken to DNSResolver.collectAll: {} ms", (endTime - startTime) / 1_000_000);
updateRecords(ethereumNodeRecords);
listener.ifPresent(it -> it.newRecords(seq, ethereumNodeRecords));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

// Adapted from https://github.com/tmio/tuweni and licensed under Apache 2.0
/** Resolves a set of ENR nodes from a host name. */
public class DNSResolver {
public class DNSResolver implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DNSResolver.class);
private final ExecutorService rawTxtRecordsExecutor = Executors.newSingleThreadExecutor();
private final String enrLink;
Expand All @@ -55,17 +55,15 @@ public class DNSResolver {
* @param enrLink the ENR link to start with, of the form enrtree://PUBKEY@domain
* @param seq the sequence number of the root record. If the root record seq is higher, proceed
* with visit.
* @param dnsServer the DNS server to use for DNS query. If null, the default DNS server will be
* @param dnsServer the DNS server to use for DNS query. If empty, the default DNS server will be
* used.
*/
public DNSResolver(
final Vertx vertx, final String enrLink, final long seq, final String dnsServer) {
final Vertx vertx, final String enrLink, final long seq, final Optional<String> dnsServer) {
this.enrLink = enrLink;
this.seq = seq;
final DnsClientOptions dnsClientOptions = new DnsClientOptions();
if (dnsServer != null) {
dnsClientOptions.setHost(dnsServer);
}
dnsServer.ifPresent(dnsClientOptions::setHost);
dnsClient = vertx.createDnsClient(dnsClientOptions);
}

Expand Down Expand Up @@ -204,4 +202,9 @@ private boolean checkSignature(
Hash.keccak256(Bytes.wrap(root.signedContent().getBytes(StandardCharsets.UTF_8)));
return SECP256K1.verifyHashed(hash, sig, pubKey);
}

@Override
public void close() {
rawTxtRecordsExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,18 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.devp2p.EthereumNodeRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -149,7 +152,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
private final CountDownLatch shutdownLatch = new CountDownLatch(2);
private final Duration shutdownTimeout = Duration.ofSeconds(15);
private final Vertx vertx;
private DNSDaemon dnsDaemon;
private final AtomicReference<Optional<Pair<String, DNSDaemon>>> dnsDaemonRef =
new AtomicReference<>(Optional.empty());

/**
* Creates a peer networking service for production purposes.
Expand Down Expand Up @@ -229,19 +233,25 @@ public void start() {
LOG.info(
"Starting DNS discovery with DNS Server override {}", dnsServer));

dnsDaemon =
final DNSDaemon dnsDaemon =
new DNSDaemon(
disco,
createDaemonListener(),
0L,
600000L,
config.getDnsDiscoveryServerOverride().orElse(null));
// dnsDaemon.start();

// TODO: Java 21, we can move to Virtual Thread model
final DeploymentOptions options =
new DeploymentOptions()
.setThreadingModel(ThreadingModel.WORKER)
.setInstances(1)
.setWorkerPoolSize(1);
vertx.deployVerticle(dnsDaemon, options);

final Future<String> deployId = vertx.deployVerticle(dnsDaemon, options);
final String dnsDaemonDeployId =
deployId.toCompletionStage().toCompletableFuture().join();
dnsDaemonRef.set(Optional.of(Pair.of(dnsDaemonDeployId, dnsDaemon)));
});

final int listeningPort = rlpxAgent.start().join();
Expand Down Expand Up @@ -288,8 +298,9 @@ public void stop() {
return;
}

// this will close the timer, but won't undeploy the vertical. vertx.stop should do that.
getDnsDaemon().ifPresent(DNSDaemon::stop);
// since dnsDaemon is a vertx verticle, vertx.close will undeploy it.
// However, we can safely call stop as well.
dnsDaemonRef.get().map(Pair::getRight).ifPresent(DNSDaemon::stop);

peerConnectionScheduler.shutdownNow();
peerDiscoveryAgent.stop().whenComplete((res, err) -> shutdownLatch.countDown());
Expand Down Expand Up @@ -346,7 +357,7 @@ public boolean removeMaintainedConnectionPeer(final Peer peer) {

@VisibleForTesting
Optional<DNSDaemon> getDnsDaemon() {
return Optional.ofNullable(dnsDaemon);
return dnsDaemonRef.get().map(Pair::getRight);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import java.util.stream.Stream;

import io.vertx.core.Vertx;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.impl.ContextInternal;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.crypto.SECP256K1;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -335,16 +333,21 @@ public void shouldStartDnsDiscoveryWhenDnsURLIsConfigured() {
final NetworkingConfiguration dnsConfig =
when(spy(config).getDiscovery()).thenReturn(disco).getMock();

Vertx vertx = mock(Vertx.class);
when(vertx.createDnsClient(any())).thenReturn(mock(DnsClient.class));
when(vertx.getOrCreateContext()).thenReturn(mock(ContextInternal.class));
final Vertx vertx = Vertx.vertx(); // use real instance

// spy on DefaultP2PNetwork
final DefaultP2PNetwork testClass =
(DefaultP2PNetwork) builder().vertx(vertx).config(dnsConfig).build();

testClass.start();
assertThat(testClass.getDnsDaemon()).isPresent();
try {
// the actual lookup won't work because of mock discovery url, however, a valid DNSDaemon
// should be created.
assertThat(testClass.getDnsDaemon()).isPresent();
} finally {
testClass.stop();
vertx.close();
}
}

@Test
Expand All @@ -358,17 +361,19 @@ public void shouldUseDnsServerOverrideIfPresent() {
doReturn(disco).when(dnsConfig).getDiscovery();
doReturn(Optional.of("localhost")).when(dnsConfig).getDnsDiscoveryServerOverride();

Vertx vertx = mock(Vertx.class);
when(vertx.createDnsClient(any())).thenReturn(mock(DnsClient.class));
when(vertx.getOrCreateContext()).thenReturn(mock(ContextInternal.class));

Vertx vertx = Vertx.vertx(); // use real instance
final DefaultP2PNetwork testClass =
(DefaultP2PNetwork) builder().config(dnsConfig).vertx(vertx).build();
testClass.start();

// ensure we used the dns server override config when building DNSDaemon:
assertThat(testClass.getDnsDaemon()).isPresent();
verify(dnsConfig, times(2)).getDnsDiscoveryServerOverride();
try {
assertThat(testClass.getDnsDaemon()).isPresent();
verify(dnsConfig, times(2)).getDnsDiscoveryServerOverride();
} finally {
testClass.stop();
vertx.close();
}
}

private DefaultP2PNetwork network() {
Expand Down

0 comments on commit 45d8faa

Please sign in to comment.