Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split up ClusterMediaDriver and ClusteredService #26

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions aeron-cluster-poc-examples/scripts/cluster-tool.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/usr/bin/env bash

cd $(dirname $0)
cd ../

JAR_FILE=$(ls target |grep jar)

echo $JAR_FILE

java \
-cp target/${JAR_FILE}:target/lib/* \
${JVM_OPTS} io.aeron.cluster.ClusterTool "$@"
28 changes: 28 additions & 0 deletions aeron-cluster-poc-examples/scripts/clustered-media-driver-0.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env bash

cd $(dirname $0)
cd ../

JAR_FILE=$(ls target |grep jar)

echo $JAR_FILE

java \
-cp target/${JAR_FILE}:target/lib/* \
-Daeron.archive.control.channel="aeron:udp?term-length=64k|endpoint=localhost:8010" \
-Daeron.archive.control.stream.id="100" \
-Daeron.archive.control.response.channel="aeron:udp?term-length=64k|endpoint=localhost:8020" \
-Daeron.archive.control.response.stream.id="100" \
-Daeron.archive.recording.events.channel="aeron:udp?control-mode=dynamic|control=localhost:8030" \
-Daeron.archive.local.control.channel="aeron:ipc?term-length=64k" \
-Daeron.cluster.member.id="0" \
-Daeron.cluster.members="0,localhost:20110,localhost:20220,localhost:20330,localhost:20440,localhost:8010|1,localhost:20111,localhost:20221,localhost:20331,localhost:20441,localhost:8011|2,localhost:20112,localhost:20222,localhost:20332,localhost:20442,localhost:8012" \
-Daeron.cluster.ingress.channel="aeron:udp?term-length=64k" \
-Daeron.cluster.log.channel="aeron:udp?term-length=256k|control-mode=manual|control=localhost:20550" \
-Dio.scalecube.acpoc.instanceId=n0 \
-Dio.scalecube.acpoc.cleanStart=false \
-Dio.scalecube.acpoc.cleanShutdown=false \
-Daeron.cluster.session.timeout=30000000000 \
-Daeron.cluster.leader.heartbeat.timeout=2000000000 \
-Dio.scalecube.acpoc.volume=target2 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredMediaDriverRunner
28 changes: 28 additions & 0 deletions aeron-cluster-poc-examples/scripts/clustered-media-driver-1.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env bash

cd $(dirname $0)
cd ../

JAR_FILE=$(ls target |grep jar)

echo $JAR_FILE

java \
-cp target/${JAR_FILE}:target/lib/* \
-Daeron.archive.control.channel="aeron:udp?term-length=64k|endpoint=localhost:8011" \
-Daeron.archive.control.stream.id="100" \
-Daeron.archive.control.response.channel="aeron:udp?term-length=64k|endpoint=localhost:8021" \
-Daeron.archive.control.response.stream.id="101" \
-Daeron.archive.recording.events.channel="aeron:udp?control-mode=dynamic|control=localhost:8031" \
-Daeron.archive.local.control.channel="aeron:ipc?term-length=64k" \
-Daeron.cluster.member.id="1" \
-Daeron.cluster.members="0,localhost:20110,localhost:20220,localhost:20330,localhost:20440,localhost:8010|1,localhost:20111,localhost:20221,localhost:20331,localhost:20441,localhost:8011|2,localhost:20112,localhost:20222,localhost:20332,localhost:20442,localhost:8012" \
-Daeron.cluster.ingress.channel="aeron:udp?term-length=64k" \
-Daeron.cluster.log.channel="aeron:udp?term-length=256k|control-mode=manual|control=localhost:20551" \
-Dio.scalecube.acpoc.instanceId=n1 \
-Dio.scalecube.acpoc.cleanStart=false \
-Dio.scalecube.acpoc.cleanShutdown=false \
-Daeron.cluster.session.timeout=30000000000 \
-Daeron.cluster.leader.heartbeat.timeout=2000000000 \
-Dio.scalecube.acpoc.volume=target2 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredMediaDriverRunner
28 changes: 28 additions & 0 deletions aeron-cluster-poc-examples/scripts/clustered-media-driver-2.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env bash

cd $(dirname $0)
cd ../

JAR_FILE=$(ls target |grep jar)

echo $JAR_FILE

java \
-cp target/${JAR_FILE}:target/lib/* \
-Daeron.archive.control.channel="aeron:udp?term-length=64k|endpoint=localhost:8012" \
-Daeron.archive.control.stream.id="100" \
-Daeron.archive.control.response.channel="aeron:udp?term-length=64k|endpoint=localhost:8022" \
-Daeron.archive.control.response.stream.id="102" \
-Daeron.archive.recording.events.channel="aeron:udp?control-mode=dynamic|control=localhost:8032" \
-Daeron.archive.local.control.channel="aeron:ipc?term-length=64k" \
-Daeron.cluster.member.id="2" \
-Daeron.cluster.members="0,localhost:20110,localhost:20220,localhost:20330,localhost:20440,localhost:8010|1,localhost:20111,localhost:20221,localhost:20331,localhost:20441,localhost:8011|2,localhost:20112,localhost:20222,localhost:20332,localhost:20442,localhost:8012" \
-Daeron.cluster.ingress.channel="aeron:udp?term-length=64k" \
-Daeron.cluster.log.channel="aeron:udp?term-length=256k|control-mode=manual|control=localhost:20552" \
-Dio.scalecube.acpoc.instanceId=n2 \
-Dio.scalecube.acpoc.cleanStart=false \
-Dio.scalecube.acpoc.cleanShutdown=false \
-Daeron.cluster.session.timeout=30000000000 \
-Daeron.cluster.leader.heartbeat.timeout=2000000000 \
-Dio.scalecube.acpoc.volume=target2 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredMediaDriverRunner
4 changes: 3 additions & 1 deletion aeron-cluster-poc-examples/scripts/node-0.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ java \
-Dio.scalecube.acpoc.snapshotPeriodSecs=99999 \
-Daeron.cluster.session.timeout=30000000000 \
-Daeron.cluster.leader.heartbeat.timeout=2000000000 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredServiceRunner
-Dio.scalecube.acpoc.clusteredMediaDriverEmbedded=false \
-Dio.scalecube.acpoc.volume=target2 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredServiceRunner
4 changes: 3 additions & 1 deletion aeron-cluster-poc-examples/scripts/node-1.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ java \
-Dio.scalecube.acpoc.snapshotPeriodSecs=99999 \
-Daeron.cluster.session.timeout=30000000000 \
-Daeron.cluster.leader.heartbeat.timeout=2000000000 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredServiceRunner
-Dio.scalecube.acpoc.clusteredMediaDriverEmbedded=false \
-Dio.scalecube.acpoc.volume=target2 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredServiceRunner
4 changes: 3 additions & 1 deletion aeron-cluster-poc-examples/scripts/node-2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ java \
-Dio.scalecube.acpoc.snapshotPeriodSecs=99999 \
-Daeron.cluster.session.timeout=30000000000 \
-Daeron.cluster.leader.heartbeat.timeout=2000000000 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredServiceRunner
-Dio.scalecube.acpoc.clusteredMediaDriverEmbedded=false \
-Dio.scalecube.acpoc.volume=target2 \
${JVM_OPTS} io.scalecube.acpoc.ClusteredServiceRunner
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.scalecube.acpoc;

import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveThreadingMode;
import io.aeron.archive.client.AeronArchive;
import io.aeron.cluster.ClusteredMediaDriver;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.ConsensusModule.Configuration;
import io.aeron.driver.DefaultAllowTerminationValidator;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.MinMulticastFlowControlSupplier;
import io.aeron.driver.ThreadingMode;
import java.io.File;
import java.nio.file.Paths;
import org.agrona.CloseHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
* Main class that starts single node in cluster, though expecting most of cluster configuration
* passed via VM args.
*/
public class ClusteredMediaDriverRunner {

private static final Logger logger = LoggerFactory.getLogger(ClusteredMediaDriverRunner.class);

/**
* Main function runner.
*
* @param args arguments
*/
public static void main(String[] args) {
String clusterMemberId = Integer.toHexString(Configuration.clusterMemberId());
String nodeId = "node-" + clusterMemberId + "-" + Utils.instanceId();
String volumeDir = Paths.get(Configurations.VOLUME_DIR, "aeron", "cluster", nodeId).toString();
String aeronDirectoryName = Paths.get(volumeDir, "media").toString();

System.out.println("Volume directory: " + volumeDir);
System.out.println("Aeron directory: " + aeronDirectoryName);

ClusteredMediaDriver clusteredMediaDriver =
launchClusteredMediaDriver(aeronDirectoryName, volumeDir);

Mono<Void> onShutdown =
Utils.onShutdown(
() -> {
CloseHelper.close(clusteredMediaDriver);
return null;
});
onShutdown.block();
}

static ClusteredMediaDriver launchClusteredMediaDriver(
String aeronDirectoryName, String volumeDir) {
AeronArchive.Context aeronArchiveContext =
new AeronArchive.Context().aeronDirectoryName(aeronDirectoryName);

MediaDriver.Context mediaDriverContext =
new MediaDriver.Context()
.errorHandler(ex -> logger.error("Exception occurred at MediaDriver: ", ex))
.terminationHook(() -> logger.info("TerminationHook called on MediaDriver "))
.terminationValidator(new DefaultAllowTerminationValidator())
.threadingMode(ThreadingMode.SHARED)
.warnIfDirectoryExists(true)
.dirDeleteOnStart(true)
.aeronDirectoryName(aeronDirectoryName)
.multicastFlowControlSupplier(new MinMulticastFlowControlSupplier());

Archive.Context archiveContext =
new Archive.Context()
.errorHandler(ex -> logger.error("Exception occurred at Archive: ", ex))
.maxCatalogEntries(Configurations.MAX_CATALOG_ENTRIES)
.aeronDirectoryName(aeronDirectoryName)
.archiveDir(new File(volumeDir, "archive"))
.controlChannel(aeronArchiveContext.controlRequestChannel())
.controlStreamId(aeronArchiveContext.controlRequestStreamId())
.localControlStreamId(aeronArchiveContext.controlRequestStreamId())
.recordingEventsChannel(aeronArchiveContext.recordingEventsChannel())
.threadingMode(ArchiveThreadingMode.SHARED);

ConsensusModule.Context consensusModuleContext =
new ConsensusModule.Context()
.errorHandler(ex -> logger.error("Exception occurred at ConsensusModule: ", ex))
.terminationHook(() -> logger.info("TerminationHook called on ConsensusModule"))
.aeronDirectoryName(aeronDirectoryName)
.clusterDir(new File(volumeDir, "consensus"))
.archiveContext(aeronArchiveContext.clone());

return ClusteredMediaDriver.launch(mediaDriverContext, archiveContext, consensusModuleContext);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.scalecube.acpoc;

import static java.nio.charset.StandardCharsets.US_ASCII;

import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.cluster.ClusterControl;
Expand All @@ -17,6 +19,7 @@
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;
import org.agrona.concurrent.status.CountersReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,17 +33,15 @@ public class ClusteredServiceImpl implements ClusteredService {
public static final String TIMER_2_COMMAND = "SCHEDULE_TIMER_2";
public static final String SNAPSHOT_COMMAND = "SNAPSHOT";

private final CountersManager countersManager;
private CountersManager countersManager;

private Cluster cluster;

// State

private final AtomicInteger serviceCounter = new AtomicInteger();

public ClusteredServiceImpl(CountersManager countersManager) {
this.countersManager = countersManager;
}
public ClusteredServiceImpl() {}

@Override
public void onStart(Cluster cluster, Image snapshotImage) {
Expand All @@ -53,6 +54,11 @@ public void onStart(Cluster cluster, Image snapshotImage) {
if (snapshotImage != null) {
onLoadSnapshot(snapshotImage);
}

CountersReader countersReader = cluster.context().aeron().countersReader();
countersManager =
new CountersManager(
countersReader.metaDataBuffer(), countersReader.valuesBuffer(), US_ASCII);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
package io.scalecube.acpoc;

import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveThreadingMode;
import io.aeron.archive.client.AeronArchive;
import io.aeron.cluster.ClusteredMediaDriver;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.ConsensusModule.Configuration;
import io.aeron.cluster.service.ClusteredService;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.driver.DefaultAllowTerminationValidator;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.MinMulticastFlowControlSupplier;
import io.aeron.driver.ThreadingMode;
import java.io.File;
import java.nio.file.Paths;
import org.agrona.CloseHelper;
Expand All @@ -35,58 +28,25 @@ public class ClusteredServiceRunner {
public static void main(String[] args) {
String clusterMemberId = Integer.toHexString(Configuration.clusterMemberId());
String nodeId = "node-" + clusterMemberId + "-" + Utils.instanceId();
String nodeDirName = Paths.get("target", "aeron", "cluster", nodeId).toString();
String volumeDir = Paths.get(Configurations.VOLUME_DIR, "aeron", "cluster", nodeId).toString();
String aeronDirectoryName = Paths.get(volumeDir, "media").toString();

System.out.println("Cluster node directory: " + nodeDirName);

String aeronDirectoryName = Paths.get(nodeDirName, "media").toString();

AeronArchive.Context aeronArchiveContext =
new AeronArchive.Context().aeronDirectoryName(aeronDirectoryName);

MediaDriver.Context mediaDriverContext =
new MediaDriver.Context()
.errorHandler(ex -> logger.error("Exception occurred at MediaDriver: ", ex))
.terminationHook(() -> logger.info("TerminationHook called on MediaDriver "))
.terminationValidator(new DefaultAllowTerminationValidator())
.threadingMode(ThreadingMode.SHARED)
.warnIfDirectoryExists(true)
.dirDeleteOnStart(true)
.aeronDirectoryName(aeronDirectoryName)
.multicastFlowControlSupplier(new MinMulticastFlowControlSupplier());

Archive.Context archiveContext =
new Archive.Context()
.errorHandler(ex -> logger.error("Exception occurred at Archive: ", ex))
.maxCatalogEntries(Configurations.MAX_CATALOG_ENTRIES)
.aeronDirectoryName(aeronDirectoryName)
.archiveDir(new File(nodeDirName, "archive"))
.controlChannel(aeronArchiveContext.controlRequestChannel())
.controlStreamId(aeronArchiveContext.controlRequestStreamId())
.localControlStreamId(aeronArchiveContext.controlRequestStreamId())
.recordingEventsChannel(aeronArchiveContext.recordingEventsChannel())
.threadingMode(ArchiveThreadingMode.SHARED);

ConsensusModule.Context consensusModuleContext =
new ConsensusModule.Context()
.errorHandler(ex -> logger.error("Exception occurred at ConsensusModule: ", ex))
.terminationHook(() -> logger.info("TerminationHook called on ConsensusModule"))
.aeronDirectoryName(aeronDirectoryName)
.clusterDir(new File(nodeDirName, "consensus"))
.archiveContext(aeronArchiveContext.clone());
System.out.println("Volume directory: " + volumeDir);
System.out.println("Aeron directory: " + aeronDirectoryName);

ClusteredMediaDriver clusteredMediaDriver =
ClusteredMediaDriver.launch(mediaDriverContext, archiveContext, consensusModuleContext);
Configurations.CLUSTERED_MEDIA_DRIVER_EMBEDDED
? ClusteredMediaDriverRunner.launchClusteredMediaDriver(aeronDirectoryName, volumeDir)
: null;

ClusteredService clusteredService =
new ClusteredServiceImpl(clusteredMediaDriver.mediaDriver().context().countersManager());
ClusteredService clusteredService = new ClusteredServiceImpl();

ClusteredServiceContainer.Context clusteredServiceCtx =
new ClusteredServiceContainer.Context()
.errorHandler(ex -> logger.error("Exception occurred: " + ex, ex))
.aeronDirectoryName(aeronDirectoryName)
.archiveContext(aeronArchiveContext.clone())
.clusterDir(new File(nodeDirName, "service"))
.archiveContext(new AeronArchive.Context().aeronDirectoryName(aeronDirectoryName))
.clusterDir(new File(volumeDir, "service"))
.clusteredService(clusteredService);

ClusteredServiceContainer clusteredServiceContainer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public class Configurations {
public static final boolean CLEAN_SHUTDOWN =
Boolean.getBoolean("io.scalecube.acpoc.cleanShutdown");

public static final boolean CLUSTERED_MEDIA_DRIVER_EMBEDDED =
Boolean.getBoolean("io.scalecube.acpoc.clusteredMediaDriverEmbedded");

public static final String VOLUME_DIR = System.getProperty("io.scalecube.acpoc.volume", "target");

private Configurations() {
// no-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
public class Utils {

public static final Logger logger = LoggerFactory.getLogger(Utils.class);
private static final long INSTANCE_ID = System.currentTimeMillis();

private Utils() {
// no-op
Expand Down Expand Up @@ -64,7 +65,6 @@ public static Mono<Void> onShutdown(Callable callable) {
* @return instance id
*/
public static String instanceId() {
return Optional.ofNullable(Configurations.INSTANCE_ID)
.orElseGet(() -> "" + System.currentTimeMillis());
return Optional.ofNullable(Configurations.INSTANCE_ID).orElseGet(() -> "" + INSTANCE_ID);
}
}