diff --git a/pom.xml b/pom.xml
index afd9fb4f..d8fffae6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
reactor-aeron
+ reactor-aeron-archive
reactor-aeron-benchmarks
@@ -54,6 +55,11 @@
aeron-client
${aeron.version}
+
+ io.aeron
+ aeron-archive
+ ${aeron.version}
+
diff --git a/reactor-aeron-archive/pom.xml b/reactor-aeron-archive/pom.xml
new file mode 100644
index 00000000..dd8da4d3
--- /dev/null
+++ b/reactor-aeron-archive/pom.xml
@@ -0,0 +1,53 @@
+
+
+ 4.0.0
+
+
+ io.scalecube
+ reactor-aeron-parent
+ 0.1.4-SNAPSHOT
+
+
+ reactor-aeron-archive
+
+
+
+ io.scalecube
+ reactor-aeron
+ ${project.version}
+
+
+
+ io.aeron
+ aeron-archive
+
+
+
+ io.projectreactor
+ reactor-core
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ test
+
+
+ org.apache.logging.log4j
+ log4j-core
+ test
+
+
+ com.lmax
+ disruptor
+ test
+
+
+
+
diff --git a/reactor-aeron-archive/src/main/java/reactor/aeron/AeronArchiveResources.java b/reactor-aeron-archive/src/main/java/reactor/aeron/AeronArchiveResources.java
new file mode 100644
index 00000000..2d580a76
--- /dev/null
+++ b/reactor-aeron-archive/src/main/java/reactor/aeron/AeronArchiveResources.java
@@ -0,0 +1,380 @@
+package reactor.aeron;
+
+import io.aeron.Aeron;
+import io.aeron.archive.Archive;
+import io.aeron.archive.ArchivingMediaDriver;
+import io.aeron.driver.MediaDriver;
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import org.agrona.CloseHelper;
+import org.agrona.IoUtil;
+import org.agrona.concurrent.BackoffIdleStrategy;
+import org.agrona.concurrent.IdleStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+public final class AeronArchiveResources implements OnDisposable {
+
+ private static final Logger logger = LoggerFactory.getLogger(AeronArchiveResources.class);
+
+ private static final Supplier defaultBackoffIdleStrategySupplier =
+ () -> new BackoffIdleStrategy(0, 0, 0, 1);
+
+ // Settings
+
+ private int pollFragmentLimit = 32;
+ private int numOfWorkers = Runtime.getRuntime().availableProcessors();
+
+ private Aeron.Context aeronContext =
+ new Aeron.Context().errorHandler(th -> logger.warn("Aeron exception occurred: " + th, th));
+
+ private MediaDriver.Context mediaContext =
+ new MediaDriver.Context()
+ .errorHandler(th -> logger.warn("Exception occurred on MediaDriver: " + th, th))
+ .warnIfDirectoryExists(true)
+ .dirDeleteOnStart(true)
+ // low latency settings
+ .termBufferSparseFile(false)
+ // explicit range of reserved session ids
+ .publicationReservedSessionIdLow(0)
+ .publicationReservedSessionIdHigh(Integer.MAX_VALUE);
+
+ private Archive.Context archiveContext =
+ new Archive.Context()
+ .errorHandler(th -> logger.warn("Exception occurred on Archive: " + th, th));
+
+ private Supplier workerIdleStrategySupplier = defaultBackoffIdleStrategySupplier;
+
+ // State
+ private Aeron aeron;
+ private ArchivingMediaDriver archivingMediaDriver;
+ private AeronEventLoopGroup eventLoopGroup;
+
+ // Lifecycle
+ private final MonoProcessor start = MonoProcessor.create();
+ private final MonoProcessor onStart = MonoProcessor.create();
+ private final MonoProcessor dispose = MonoProcessor.create();
+ private final MonoProcessor onDispose = MonoProcessor.create();
+
+ /**
+ * Default constructor. Setting up start and dispose routings. See methods: {@link #doStart()} and
+ * {@link #doDispose()}.
+ */
+ public AeronArchiveResources() {
+ start
+ .then(doStart())
+ .doOnSuccess(avoid -> onStart.onComplete())
+ .doOnError(onStart::onError)
+ .subscribe(
+ null,
+ th -> {
+ logger.error("{} failed to start, cause: {}", this, th.toString());
+ dispose();
+ });
+
+ dispose
+ .then(doDispose())
+ .doFinally(s -> onDispose.onComplete())
+ .subscribe(
+ null,
+ th -> logger.warn("{} failed on doDispose(): {}", this, th.toString()),
+ () -> logger.debug("Disposed {}", this));
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param ac aeron context
+ * @param mdc media driver context
+ */
+ private AeronArchiveResources(
+ AeronArchiveResources that, Aeron.Context ac, MediaDriver.Context mdc) {
+ this();
+ this.pollFragmentLimit = that.pollFragmentLimit;
+ this.numOfWorkers = that.numOfWorkers;
+ this.workerIdleStrategySupplier = that.workerIdleStrategySupplier;
+ copy(ac);
+ copy(mdc);
+ }
+
+ private AeronArchiveResources copy() {
+ return new AeronArchiveResources(this, aeronContext, mediaContext);
+ }
+
+ private void copy(MediaDriver.Context mdc) {
+ mediaContext
+ .aeronDirectoryName(mdc.aeronDirectoryName())
+ .dirDeleteOnStart(mdc.dirDeleteOnStart())
+ .imageLivenessTimeoutNs(mdc.imageLivenessTimeoutNs())
+ .mtuLength(mdc.mtuLength())
+ .driverTimeoutMs(mdc.driverTimeoutMs())
+ .errorHandler(mdc.errorHandler())
+ .threadingMode(mdc.threadingMode())
+ .applicationSpecificFeedback(mdc.applicationSpecificFeedback())
+ .cachedEpochClock(mdc.cachedEpochClock())
+ .cachedNanoClock(mdc.cachedNanoClock())
+ .clientLivenessTimeoutNs(mdc.clientLivenessTimeoutNs())
+ .conductorIdleStrategy(mdc.conductorIdleStrategy())
+ .conductorThreadFactory(mdc.conductorThreadFactory())
+ .congestControlSupplier(mdc.congestionControlSupplier())
+ .counterFreeToReuseTimeoutNs(mdc.counterFreeToReuseTimeoutNs())
+ .countersManager(mdc.countersManager())
+ .countersMetaDataBuffer(mdc.countersMetaDataBuffer())
+ .countersValuesBuffer(mdc.countersValuesBuffer())
+ .epochClock(mdc.epochClock())
+ .warnIfDirectoryExists(mdc.warnIfDirectoryExists())
+ .useWindowsHighResTimer(mdc.useWindowsHighResTimer())
+ .useConcurrentCountersManager(mdc.useConcurrentCountersManager())
+ .unicastFlowControlSupplier(mdc.unicastFlowControlSupplier())
+ .multicastFlowControlSupplier(mdc.multicastFlowControlSupplier())
+ .timerIntervalNs(mdc.timerIntervalNs())
+ .termBufferSparseFile(mdc.termBufferSparseFile())
+ .tempBuffer(mdc.tempBuffer())
+ .systemCounters(mdc.systemCounters())
+ .statusMessageTimeoutNs(mdc.statusMessageTimeoutNs())
+ .spiesSimulateConnection(mdc.spiesSimulateConnection())
+ .sharedThreadFactory(mdc.sharedThreadFactory())
+ .sharedNetworkThreadFactory(mdc.sharedNetworkThreadFactory())
+ .sharedNetworkIdleStrategy(mdc.sharedNetworkIdleStrategy())
+ .sharedIdleStrategy(mdc.sharedIdleStrategy())
+ .senderThreadFactory(mdc.senderThreadFactory())
+ .senderIdleStrategy(mdc.senderIdleStrategy())
+ .sendChannelEndpointSupplier(mdc.sendChannelEndpointSupplier())
+ .receiverThreadFactory(mdc.receiverThreadFactory())
+ .receiverIdleStrategy(mdc.receiverIdleStrategy())
+ .receiveChannelEndpointThreadLocals(mdc.receiveChannelEndpointThreadLocals())
+ .receiveChannelEndpointSupplier(mdc.receiveChannelEndpointSupplier())
+ .publicationUnblockTimeoutNs(mdc.publicationUnblockTimeoutNs())
+ .publicationTermBufferLength(mdc.publicationTermBufferLength())
+ .publicationReservedSessionIdLow(mdc.publicationReservedSessionIdLow())
+ .publicationReservedSessionIdHigh(mdc.publicationReservedSessionIdHigh())
+ .publicationLingerTimeoutNs(mdc.publicationLingerTimeoutNs())
+ .publicationConnectionTimeoutNs(mdc.publicationConnectionTimeoutNs())
+ .performStorageChecks(mdc.performStorageChecks())
+ .nanoClock(mdc.nanoClock())
+ .lossReport(mdc.lossReport())
+ .ipcTermBufferLength(mdc.ipcTermBufferLength())
+ .ipcMtuLength(mdc.ipcMtuLength())
+ .initialWindowLength(mdc.initialWindowLength())
+ .filePageSize(mdc.filePageSize())
+ .errorLog(mdc.errorLog());
+ }
+
+ private void copy(Aeron.Context ac) {
+ aeronContext
+ .resourceLingerDurationNs(ac.resourceLingerDurationNs())
+ .keepAliveInterval(ac.keepAliveInterval())
+ .errorHandler(ac.errorHandler())
+ .driverTimeoutMs(ac.driverTimeoutMs())
+ .availableImageHandler(ac.availableImageHandler())
+ .unavailableImageHandler(ac.unavailableImageHandler())
+ .idleStrategy(ac.idleStrategy())
+ .aeronDirectoryName(ac.aeronDirectoryName())
+ .availableCounterHandler(ac.availableCounterHandler())
+ .unavailableCounterHandler(ac.unavailableCounterHandler())
+ .useConductorAgentInvoker(ac.useConductorAgentInvoker())
+ .threadFactory(ac.threadFactory())
+ .epochClock(ac.epochClock())
+ .clientLock(ac.clientLock())
+ .nanoClock(ac.nanoClock());
+ }
+
+ private static String generateRandomTmpDirName() {
+ return IoUtil.tmpDirName()
+ + "aeron"
+ + '-'
+ + System.getProperty("user.name", "default")
+ + '-'
+ + UUID.randomUUID().toString();
+ }
+
+ /**
+ * Applies modifier and produces new {@code AeronArchiveResources} object.
+ *
+ * @param o modifier operator
+ * @return new {@code AeronArchiveResources} object
+ */
+ public AeronArchiveResources aeron(UnaryOperator o) {
+ AeronArchiveResources c = copy();
+ Aeron.Context ac = o.apply(c.aeronContext);
+ return new AeronArchiveResources(this, ac, c.mediaContext);
+ }
+
+ /**
+ * Applies modifier and produces new {@code AeronArchiveResources} object.
+ *
+ * @param o modifier operator
+ * @return new {@code AeronArchiveResources} object
+ */
+ public AeronArchiveResources media(UnaryOperator o) {
+ AeronArchiveResources c = copy();
+ MediaDriver.Context mdc = o.apply(c.mediaContext);
+ return new AeronArchiveResources(this, c.aeronContext, mdc);
+ }
+
+ /**
+ * Set to use temp directory instead of default aeron directory.
+ *
+ * @return new {@code AeronArchiveResources} object
+ */
+ public AeronArchiveResources useTmpDir() {
+ return media(mdc -> mdc.aeronDirectoryName(generateRandomTmpDirName()));
+ }
+
+ /**
+ * Shortcut for {@code numOfWorkers(1)}.
+ *
+ * @return new {@code AeronArchiveResources} object
+ */
+ public AeronArchiveResources singleWorker() {
+ return numOfWorkers(1);
+ }
+
+ /**
+ * Setting number of worker threads.
+ *
+ * @param n number of worker threads
+ * @return new {@code AeronArchiveResources} object
+ */
+ public AeronArchiveResources numOfWorkers(int n) {
+ AeronArchiveResources c = copy();
+ c.numOfWorkers = n;
+ return c;
+ }
+
+ /**
+ * Settings fragment limit for polling.
+ *
+ * @param pollFragmentLimit fragment limit for polling
+ * @return new {@code AeronArchiveResources} object
+ */
+ public AeronArchiveResources pollFragmentLimit(int pollFragmentLimit) {
+ AeronArchiveResources c = copy();
+ c.pollFragmentLimit = pollFragmentLimit;
+ return c;
+ }
+
+ /**
+ * Setter for supplier of {@code IdleStrategy} for worker thread(s).
+ *
+ * @param s supplier of {@code IdleStrategy} for worker thread(s)
+ * @return @return new {@code AeronArchiveResources} object
+ */
+ public AeronArchiveResources workerIdleStrategySupplier(Supplier s) {
+ AeronArchiveResources c = copy();
+ c.workerIdleStrategySupplier = s;
+ return c;
+ }
+
+ /**
+ * Starting up this resources instance if not started already.
+ *
+ * @return started {@code AeronArchiveResources} object
+ */
+ public Mono start() {
+ return Mono.defer(
+ () -> {
+ start.onComplete();
+ return onStart.thenReturn(this);
+ });
+ }
+
+ private Mono doStart() {
+ return Mono.fromRunnable(
+ () -> {
+ archivingMediaDriver = ArchivingMediaDriver.launch(mediaContext, archiveContext);
+
+ aeronContext.aeronDirectoryName(archivingMediaDriver.mediaDriver().aeronDirectoryName());
+
+ aeron = Aeron.connect(aeronContext);
+
+ eventLoopGroup =
+ new AeronEventLoopGroup(
+ "reactor-aeron-archive", numOfWorkers, workerIdleStrategySupplier);
+
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () ->
+ deleteAeronDirectory(
+ archivingMediaDriver.mediaDriver().aeronDirectoryName())));
+
+ logger.debug(
+ "{} has initialized embedded media driver, aeron directory: {}",
+ this,
+ aeronContext.aeronDirectoryName());
+ });
+ }
+
+ /**
+ * Shortcut method for {@code eventLoopGroup.next()}.
+ *
+ * @return {@code AeronEventLoop} instance
+ */
+ AeronEventLoop nextEventLoop() {
+ return eventLoopGroup.next();
+ }
+
+ /**
+ * Returns already used first event loop. See {@code eventLoopGroup.first()}.
+ *
+ * @return {@code AeronEventLoop} instance
+ */
+ AeronEventLoop firstEventLoop() {
+ return eventLoopGroup.first();
+ }
+
+ @Override
+ public void dispose() {
+ dispose.onComplete();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return onDispose.isDisposed();
+ }
+
+ @Override
+ public Mono onDispose() {
+ return onDispose;
+ }
+
+ private Mono doDispose() {
+ return Mono.defer(
+ () -> {
+ logger.debug("Disposing {}", this);
+
+ return Mono //
+ .fromRunnable(eventLoopGroup::dispose)
+ .then(eventLoopGroup.onDispose())
+ .doFinally(
+ s -> {
+ CloseHelper.quietClose(aeron);
+
+ CloseHelper.quietClose(archivingMediaDriver);
+
+ Optional.ofNullable(aeronContext)
+ .ifPresent(c -> IoUtil.delete(c.aeronDirectory(), true));
+ });
+ });
+ }
+
+ private void deleteAeronDirectory(String aeronDirectoryName) {
+ File aeronDirectory = Paths.get(aeronDirectoryName).toFile();
+ if (aeronDirectory.exists()) {
+ IoUtil.delete(aeronDirectory, true);
+ logger.debug("{} deleted aeron directory {}", this, aeronDirectoryName);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AeronArchiveResources" + Integer.toHexString(System.identityHashCode(this));
+ }
+}
diff --git a/reactor-aeron-benchmarks/pom.xml b/reactor-aeron-benchmarks/pom.xml
index 947b064e..549b28a0 100644
--- a/reactor-aeron-benchmarks/pom.xml
+++ b/reactor-aeron-benchmarks/pom.xml
@@ -24,6 +24,11 @@
reactor-aeron
${project.version}
+
+ io.scalecube
+ reactor-aeron-archive
+ ${project.version}
+
org.hdrhistogram
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/Utils.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/Utils.java
new file mode 100644
index 00000000..845ddb03
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/Utils.java
@@ -0,0 +1,223 @@
+package reactor.aeron.pure.archive;
+
+import io.aeron.Publication;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.client.RecordingDescriptorConsumer;
+import java.io.File;
+import java.util.UUID;
+import org.agrona.BufferUtil;
+import org.agrona.IoUtil;
+import org.agrona.concurrent.UnsafeBuffer;
+import reactor.core.publisher.Flux;
+
+public class Utils {
+
+ private Utils() {
+ // no-op
+ }
+
+ /**
+ * Creates tmp file with using the given value.
+ *
+ * @param value target.
+ */
+ public static String tmpFileName(String value) {
+ return IoUtil.tmpDirName()
+ + value
+ + '-'
+ + System.getProperty("user.name", "default")
+ + '-'
+ + UUID.randomUUID().toString();
+ }
+
+ /**
+ * Creates tmp file with using the given value.
+ *
+ * @param value target.
+ */
+ public static void removeFile(String value) {
+ IoUtil.delete(new File(value), true);
+ }
+
+ /** Sends the given body via the given publication */
+ public static void send(Publication publication, String body) {
+ byte[] messageBytes = body.getBytes();
+ UnsafeBuffer buffer =
+ new UnsafeBuffer(BufferUtil.allocateDirectAligned(messageBytes.length, 64));
+ buffer.putBytes(0, messageBytes);
+ long result = publication.offer(buffer);
+ System.out.println("Offered " + body + " --- " + checkResult(result));
+ }
+
+ /**
+ * Returns the list of {@link RecordingDescriptor}.
+ *
+ * @param aeronArchive archive client
+ * @param channel target channel
+ * @param channelStreamId target channel stream id
+ */
+ public static Flux findRecording(
+ final AeronArchive aeronArchive,
+ String channel,
+ int channelStreamId,
+ int fromRecordingId,
+ int count) {
+
+ return Flux.create(
+ sink -> {
+ final RecordingDescriptorConsumer consumer =
+ (controlSessionId,
+ correlationId,
+ recordingId,
+ startTimestamp,
+ stopTimestamp,
+ startPosition,
+ stopPosition,
+ initialTermId,
+ segmentFileLength,
+ termBufferLength,
+ mtuLength,
+ sessionId,
+ streamId,
+ strippedChannel,
+ originalChannel,
+ sourceIdentity) ->
+ sink.next(
+ new RecordingDescriptor(
+ controlSessionId,
+ correlationId,
+ recordingId,
+ startTimestamp,
+ stopTimestamp,
+ startPosition,
+ stopPosition,
+ initialTermId,
+ segmentFileLength,
+ termBufferLength,
+ mtuLength,
+ sessionId,
+ streamId,
+ strippedChannel,
+ originalChannel,
+ sourceIdentity));
+
+ aeronArchive.listRecordingsForUri(
+ fromRecordingId, count, channel, channelStreamId, consumer);
+
+ sink.complete();
+ });
+ }
+
+ public static class RecordingDescriptor {
+ public final long controlSessionId;
+ public final long correlationId;
+ public final long recordingId;
+ public final long startTimestamp;
+ public final long stopTimestamp;
+ public final long startPosition;
+ public final long stopPosition;
+ public final int initialTermId;
+ public final int segmentFileLength;
+ public final int termBufferLength;
+ public final int mtuLength;
+ public final int sessionId;
+ public final int streamId;
+ public final String strippedChannel;
+ public final String originalChannel;
+ public final String sourceIdentity;
+
+ public RecordingDescriptor(
+ long controlSessionId,
+ long correlationId,
+ long recordingId,
+ long startTimestamp,
+ long stopTimestamp,
+ long startPosition,
+ long stopPosition,
+ int initialTermId,
+ int segmentFileLength,
+ int termBufferLength,
+ int mtuLength,
+ int sessionId,
+ int streamId,
+ String strippedChannel,
+ String originalChannel,
+ String sourceIdentity) {
+ this.controlSessionId = controlSessionId;
+ this.correlationId = correlationId;
+ this.recordingId = recordingId;
+ this.startTimestamp = startTimestamp;
+ this.stopTimestamp = stopTimestamp;
+ this.startPosition = startPosition;
+ this.stopPosition = stopPosition;
+ this.initialTermId = initialTermId;
+ this.segmentFileLength = segmentFileLength;
+ this.termBufferLength = termBufferLength;
+ this.mtuLength = mtuLength;
+ this.sessionId = sessionId;
+ this.streamId = streamId;
+ this.strippedChannel = strippedChannel;
+ this.originalChannel = originalChannel;
+ this.sourceIdentity = sourceIdentity;
+ }
+
+ @Override
+ public String toString() {
+ return "RecordingDescriptor{"
+ + "controlSessionId="
+ + controlSessionId
+ + ", correlationId="
+ + correlationId
+ + ", recordingId="
+ + recordingId
+ + ", startTimestamp="
+ + startTimestamp
+ + ", stopTimestamp="
+ + stopTimestamp
+ + ", startPosition="
+ + startPosition
+ + ", stopPosition="
+ + stopPosition
+ + ", initialTermId="
+ + initialTermId
+ + ", segmentFileLength="
+ + segmentFileLength
+ + ", termBufferLength="
+ + termBufferLength
+ + ", mtuLength="
+ + mtuLength
+ + ", sessionId="
+ + sessionId
+ + ", streamId="
+ + streamId
+ + ", strippedChannel='"
+ + strippedChannel
+ + '\''
+ + ", originalChannel='"
+ + originalChannel
+ + '\''
+ + ", sourceIdentity='"
+ + sourceIdentity
+ + '\''
+ + '}';
+ }
+ }
+
+ private static String checkResult(long result) {
+ if (result > 0) {
+ return "Success!";
+ } else if (result == Publication.BACK_PRESSURED) {
+ return "Offer failed due to back pressure";
+ } else if (result == Publication.ADMIN_ACTION) {
+ return "Offer failed because of an administration action in the system";
+ } else if (result == Publication.NOT_CONNECTED) {
+ return "Offer failed because publisher is not connected to subscriber";
+ } else if (result == Publication.CLOSED) {
+ return "Offer failed publication is closed";
+ } else if (result == Publication.MAX_POSITION_EXCEEDED) {
+ return "Offer failed due to publication reaching max position";
+ } else {
+ return "Offer failed due to unknown result code: " + result;
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/ExtendRecordingArchive.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/ExtendRecordingArchive.java
new file mode 100644
index 00000000..74d2956d
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/ExtendRecordingArchive.java
@@ -0,0 +1,183 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.ExclusivePublication;
+import io.aeron.Publication;
+import io.aeron.archive.Archive;
+import io.aeron.archive.ArchiveThreadingMode;
+import io.aeron.archive.ArchivingMediaDriver;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.codecs.SourceLocation;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import org.agrona.BufferUtil;
+import org.agrona.concurrent.UnsafeBuffer;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ExtendRecordingArchive {
+
+ static final String OUTGOING_ENDPOINT = "localhost:8181";
+ static final int OUTGOING_STREAM_ID = 2223;
+
+ static final int TERM_BUFFER_LENGTH = 64 * 1024;
+
+ static final String OUTGOING_URI =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(OUTGOING_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .termLength(TERM_BUFFER_LENGTH)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int MESSAGE_SIZE = 256;
+ private static final UnsafeBuffer BUFFER =
+ new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_SIZE, 64));
+ private static final UnsafeBuffer BUFFER2 =
+ new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_SIZE, 64));
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ String aeronDirName = Utils.tmpFileName("aeron");
+ String archiveDirName = aeronDirName + "-archive";
+
+ try (ArchivingMediaDriver archivingMediaDriver =
+ ArchivingMediaDriver.launch(
+ new MediaDriver.Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true),
+ new Archive.Context()
+ .aeronDirectoryName(aeronDirName)
+ .archiveDirectoryName(archiveDirName)
+ .threadingMode(ArchiveThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .fileSyncLevel(0)
+ .deleteArchiveOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(new AeronArchive.Context().aeronDirectoryName(aeronDirName))) {
+ print(archivingMediaDriver);
+
+ Aeron aeron = aeronArchive.context().aeron();
+
+ ExclusivePublication outgoingPublication1 =
+ aeron.addExclusivePublication(OUTGOING_URI, OUTGOING_STREAM_ID);
+
+ aeronArchive.startRecording(OUTGOING_URI, OUTGOING_STREAM_ID, SourceLocation.LOCAL);
+
+ long recordingId1 =
+ Utils.findRecording(aeronArchive, OUTGOING_URI, OUTGOING_STREAM_ID, 0, 100)
+ .log("found recordings ")
+ .blockLast()
+ .recordingId;
+
+ int initialTermId1 = outgoingPublication1.initialTermId();
+
+ Flux.interval(Duration.ofSeconds(1))
+ .doOnNext(
+ i -> {
+ String message = "1@" + i;
+ byte[] messageBytes = message.getBytes();
+ BUFFER.putBytes(0, messageBytes);
+ long result = outgoingPublication1.offer(BUFFER, 0, messageBytes.length);
+ System.out.print(
+ "1 Sent: `" + message + "`, cur_pos: " + outgoingPublication1.position() + " - ");
+ checkResult(result);
+ })
+ .subscribe();
+
+ Mono.delay(Duration.ofSeconds(10))
+ .doOnSuccess(
+ $ -> {
+ aeronArchive.stopRecording(OUTGOING_URI, OUTGOING_STREAM_ID);
+
+ long stopPosition = aeronArchive.getStopPosition(recordingId1);
+
+ String extendedUri = OUTGOING_URI;
+
+
+ new ChannelUriStringBuilder()
+ .endpoint(OUTGOING_ENDPOINT)
+ .initialPosition(stopPosition, initialTermId1, TERM_BUFFER_LENGTH)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+
+
+ ExclusivePublication outgoingPublication2 = aeron.addExclusivePublication(extendedUri,
+ OUTGOING_STREAM_ID);
+
+ aeronArchive.extendRecording(recordingId1, OUTGOING_URI, OUTGOING_STREAM_ID, SourceLocation.LOCAL);
+
+
+ Flux.interval(Duration.ofSeconds(1))
+ .doOnNext(
+ i -> {
+ String message = "2@" + i;
+ byte[] messageBytes = message.getBytes();
+ BUFFER2.putBytes(0, messageBytes);
+ long result = outgoingPublication2.offer(BUFFER2, 0, messageBytes.length);
+ System.out.print(
+ "2 Sent: `" + message + "`, cur_pos: " + outgoingPublication2.position() + " - ");
+ checkResult(result);
+ })
+ .subscribe();
+
+
+// aeronArchive.startRecording(OUTGOING_URI, OUTGOING_STREAM_ID_1, SourceLocation.LOCAL);
+ })
+ .subscribe();
+
+ Thread.currentThread().join();
+ } finally {
+ Utils.removeFile(archiveDirName);
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static void print(ArchivingMediaDriver archivingMediaDriver) {
+ MediaDriver mediaDriver = archivingMediaDriver.mediaDriver();
+ Archive archive = archivingMediaDriver.archive();
+ Archive.Context context = archivingMediaDriver.archive().context();
+
+ System.out.println("Archive threadingMode: " + context.threadingMode());
+ System.out.println("Archive controlChannel: " + context.controlChannel());
+ System.out.println("Archive controlStreamId: " + context.controlStreamId());
+ System.out.println("Archive localControlChannel: " + context.localControlChannel());
+ System.out.println("Archive localControlStreamId: " + context.localControlStreamId());
+ System.out.println("Archive recordingEventsChannel: " + context.recordingEventsChannel());
+ System.out.println("Archive recordingEventsStreamId: " + context.recordingEventsStreamId());
+ System.out.println("Archive controlTermBufferSparse: " + context.controlTermBufferSparse());
+ System.out.println("Archive archiveDirName: " + archive.context().archiveDirectoryName());
+ System.out.println("Archive aeronDirectoryName: " + mediaDriver.aeronDirectoryName());
+ }
+
+ private static void checkResult(final long result) {
+ if (result > 0) {
+ System.out.println("yay!");
+ } else if (result == Publication.BACK_PRESSURED) {
+ System.out.println("Offer failed due to back pressure");
+ } else if (result == Publication.ADMIN_ACTION) {
+ System.out.println("Offer failed because of an administration action in the system");
+ } else if (result == Publication.NOT_CONNECTED) {
+ System.out.println("Offer failed because publisher is not connected to subscriber");
+ } else if (result == Publication.CLOSED) {
+ System.out.println("Offer failed publication is closed");
+ } else if (result == Publication.MAX_POSITION_EXCEEDED) {
+ throw new IllegalStateException("Offer failed due to publication reaching max position");
+ } else {
+ System.out.println("Offer failed due to unknown result code: " + result);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/ExtendedRecordingArchiveHandler.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/ExtendedRecordingArchiveHandler.java
new file mode 100644
index 00000000..b3c46fac
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/ExtendedRecordingArchiveHandler.java
@@ -0,0 +1,199 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Subscription;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.client.RecordingDescriptorConsumer;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.agrona.collections.MutableLong;
+import org.agrona.concurrent.SigInt;
+import org.agrona.concurrent.YieldingIdleStrategy;
+import reactor.aeron.pure.archive.Utils;
+import reactor.aeron.pure.archive.Utils.RecordingDescriptor;
+
+public class ExtendedRecordingArchiveHandler {
+
+ private static final String REPLAY_URI =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(ExtendRecordingArchive.OUTGOING_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .termLength(ExtendRecordingArchive.TERM_BUFFER_LENGTH)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int REPLAY_STREAM_ID = 2225;
+ private static final int FRAGMENT_LIMIT = 10;
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ final AtomicBoolean running = new AtomicBoolean(true);
+ SigInt.register(() -> running.set(false));
+
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(
+ new AeronArchive.Context()
+ .controlResponseChannel(
+ new ChannelUriStringBuilder()
+ .controlEndpoint("localhost:8028")
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build())
+ .controlResponseStreamId(18028)
+ .aeronDirectoryName(aeronDirName))) {
+
+ RecordingDescriptor recording =
+ Utils.findRecording(
+ aeronArchive,
+ ExtendRecordingArchive.OUTGOING_URI,
+ ExtendRecordingArchive.OUTGOING_STREAM_ID,
+ 0,
+ 100)
+ .log("found recordings ")
+ .blockLast();
+
+ long recordingId = recording.recordingId;
+ long startPosition = recording.startPosition;
+
+ Subscription subscription =
+ aeronArchive.replay(
+ recordingId,
+ startPosition,
+ Long.MAX_VALUE,
+ REPLAY_URI,
+ REPLAY_STREAM_ID);
+
+ YieldingIdleStrategy idleStrategy = new YieldingIdleStrategy();
+
+ try {
+ while (running.get()) {
+ int works =
+ subscription.poll(
+ (buffer, offset, length, header) -> {
+ final byte[] data = new byte[length];
+ buffer.getBytes(offset, data);
+
+ System.out.println(
+ String.format(
+ "msg{ offset: %s, length: %s, body: %s }, header{ pos: %s, offset: %s, type: %s }, channel { stream: %s, session: %s, initialTermId: %s, termId: %s, termOffset: %s, flags: %s }",
+ offset,
+ length,
+ new String(data),
+ header.position(),
+ header.offset(),
+ header.type(),
+ ExtendRecordingArchive.OUTGOING_STREAM_ID,
+ header.sessionId(),
+ header.initialTermId(),
+ header.termId(),
+ header.termOffset(),
+ header.flags()));
+ },
+ FRAGMENT_LIMIT)
+ > 0
+ ? 1
+ : 0;
+
+ idleStrategy.idle(works);
+ }
+ } finally {
+ aeronArchive.stopReplay(subscription.images().get(0).sessionId());
+ }
+
+ Thread.currentThread().join();
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static long findLatestRecording(
+ final AeronArchive archive, String channel, int channelStreamId) {
+ final MutableLong lastRecordingId = new MutableLong();
+
+ final RecordingDescriptorConsumer consumer =
+ (controlSessionId,
+ correlationId,
+ recordingId,
+ startTimestamp,
+ stopTimestamp,
+ startPosition,
+ stopPosition,
+ initialTermId,
+ segmentFileLength,
+ termBufferLength,
+ mtuLength,
+ sessionId,
+ streamId,
+ strippedChannel,
+ originalChannel,
+ sourceIdentity) -> {
+ System.out.println(
+ new StringBuilder()
+ .append("controlSessionId: ")
+ .append(controlSessionId)
+ .append(", correlationId: ")
+ .append(correlationId)
+ .append(", recordingId: ")
+ .append(recordingId)
+ .append(", startTimestamp: ")
+ .append(startTimestamp)
+ .append(", stopTimestamp: ")
+ .append(stopTimestamp)
+ .append(", startPosition: ")
+ .append(startPosition)
+ .append(", stopPosition: ")
+ .append(stopPosition)
+ .append(", initialTermId: ")
+ .append(initialTermId)
+ .append(", segmentFileLength: ")
+ .append(segmentFileLength)
+ .append(", termBufferLength: ")
+ .append(termBufferLength)
+ .append(", mtuLength: ")
+ .append(mtuLength)
+ .append(", sessionId: ")
+ .append(sessionId)
+ .append(", streamId: ")
+ .append(streamId)
+ .append(", strippedChannel: ")
+ .append(strippedChannel)
+ .append(", originalChannel: ")
+ .append(originalChannel)
+ .append(", sourceIdentity: ")
+ .append(sourceIdentity));
+
+ lastRecordingId.set(recordingId);
+ };
+
+ final long fromRecordingId = 0L;
+ final int recordCount = 100;
+
+ final int foundCount =
+ archive.listRecordingsForUri(
+ fromRecordingId, recordCount, channel, channelStreamId, consumer);
+
+ if (foundCount == 0) {
+ throw new IllegalStateException("no recordings found");
+ }
+
+ return lastRecordingId.get();
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/IntermediateArchive.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/IntermediateArchive.java
new file mode 100644
index 00000000..1f48c0c7
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/IntermediateArchive.java
@@ -0,0 +1,180 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.archive.Archive;
+import io.aeron.archive.ArchiveThreadingMode;
+import io.aeron.archive.ArchivingMediaDriver;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.codecs.SourceLocation;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.ThreadingMode;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.agrona.concurrent.SigInt;
+import reactor.aeron.Configurations;
+import reactor.aeron.pure.archive.Utils;
+
+public class IntermediateArchive {
+
+ static final String CONTROL_ENDPOINT = "localhost:7171";
+ static final int CONTROL_STREAM_ID = 2222;
+ static final String REPLAY_ENDPOINT = "localhost:8181";
+ static final int REPLAY_STREAM_ID = 2223;
+
+ private static final ChannelUriStringBuilder CONTROL_URI_BUILDER =
+ new ChannelUriStringBuilder()
+ .endpoint(CONTROL_ENDPOINT)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA);
+
+ private static final ChannelUriStringBuilder REPLAY_URI_BUILDER =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(REPLAY_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA);
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ final AtomicBoolean running = new AtomicBoolean(true);
+ SigInt.register(() -> running.set(false));
+
+ String aeronDirName = Utils.tmpFileName("aeron");
+ String archiveDirName = aeronDirName + "-archive";
+
+ try (ArchivingMediaDriver archivingMediaDriver =
+ ArchivingMediaDriver.launch(
+ new MediaDriver.Context()
+ .threadingMode(ThreadingMode.SHARED)
+ // .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true),
+ new Archive.Context()
+ .aeronDirectoryName(aeronDirName)
+ .archiveDirectoryName(archiveDirName)
+ .threadingMode(ArchiveThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .fileSyncLevel(0)
+ .deleteArchiveOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(new AeronArchive.Context().aeronDirectoryName(aeronDirName))) {
+ print(archivingMediaDriver);
+
+ Aeron aeron = aeronArchive.context().aeron();
+
+ aeron.addSubscription(
+ REPLAY_URI_BUILDER.build(),
+ REPLAY_STREAM_ID,
+ Configurations::printAvailableImage,
+ Configurations::printUnavailableImage);
+
+ aeron.addSubscription(
+ CONTROL_URI_BUILDER.build(),
+ CONTROL_STREAM_ID,
+ Configurations::printAvailableImage,
+ Configurations::printUnavailableImage);
+
+ String recordingChannel = CONTROL_URI_BUILDER.build();
+ int recordingStreamId = CONTROL_STREAM_ID;
+
+ long subscriptionId =
+ aeronArchive.startRecording(recordingChannel, recordingStreamId, SourceLocation.REMOTE);
+
+ System.out.println(
+ "Created recording subscriptionId: "
+ + subscriptionId
+ + ", for channel: "
+ + recordingChannel
+ + ", streamId: "
+ + recordingStreamId);
+
+ HashSet recordingIds = new HashSet<>();
+
+ while (running.get()) {
+
+ aeronArchive.listRecordingsForUri(
+ 0,
+ 10000,
+ recordingChannel,
+ recordingStreamId,
+ (controlSessionId,
+ correlationId,
+ recordingId,
+ startTimestamp,
+ stopTimestamp,
+ startPosition,
+ stopPosition,
+ initialTermId,
+ segmentFileLength,
+ termBufferLength,
+ mtuLength,
+ sessionId,
+ streamId,
+ strippedChannel,
+ originalChannel,
+ sourceIdentity) -> {
+ if (recordingIds.add(recordingId)) {
+
+ System.out.println("Found new recording id: " + recordingId);
+
+ String replayChannel =
+ REPLAY_URI_BUILDER
+ // .sessionId(sessionId)
+ .build();
+ int replayStreamId = REPLAY_STREAM_ID;
+
+ final long replaySessionId =
+ aeronArchive.startReplay(
+ recordingId, 0, Long.MAX_VALUE, replayChannel, replayStreamId);
+
+ System.out.println(
+ "Started replaying, "
+ + "sessionId: "
+ + sessionId
+ + ", replaySessionId: "
+ + replaySessionId
+ + ", recordingId: "
+ + recordingId
+ + ", replay channel: "
+ + replayChannel
+ + ", streamId: "
+ + replayStreamId);
+ }
+ });
+
+ Thread.sleep(1000);
+ }
+
+ Thread.currentThread().join();
+ } finally {
+ Utils.removeFile(archiveDirName);
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static void print(ArchivingMediaDriver archivingMediaDriver) {
+ MediaDriver mediaDriver = archivingMediaDriver.mediaDriver();
+ Archive archive = archivingMediaDriver.archive();
+ Archive.Context context = archivingMediaDriver.archive().context();
+
+ System.out.println("Archive threadingMode: " + context.threadingMode());
+ System.out.println("Archive controlChannel: " + context.controlChannel());
+ System.out.println("Archive controlStreamId: " + context.controlStreamId());
+ System.out.println("Archive localControlChannel: " + context.localControlChannel());
+ System.out.println("Archive localControlStreamId: " + context.localControlStreamId());
+ System.out.println("Archive recordingEventsChannel: " + context.recordingEventsChannel());
+ System.out.println("Archive recordingEventsStreamId: " + context.recordingEventsStreamId());
+ System.out.println("Archive controlTermBufferSparse: " + context.controlTermBufferSparse());
+ System.out.println("Archive archiveDirName: " + archive.context().archiveDirectoryName());
+ System.out.println("Archive aeronDirectoryName: " + mediaDriver.aeronDirectoryName());
+
+ System.out.println("Archive listen: " + CONTROL_URI_BUILDER);
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MatchEngine.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MatchEngine.java
new file mode 100644
index 00000000..4ae88cae
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MatchEngine.java
@@ -0,0 +1,207 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.ExclusivePublication;
+import io.aeron.Publication;
+import io.aeron.Subscription;
+import io.aeron.archive.Archive;
+import io.aeron.archive.ArchiveThreadingMode;
+import io.aeron.archive.ArchivingMediaDriver;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.codecs.SourceLocation;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.ThreadingMode;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.agrona.BufferUtil;
+import org.agrona.concurrent.SigInt;
+import org.agrona.concurrent.UnsafeBuffer;
+import org.agrona.concurrent.YieldingIdleStrategy;
+import reactor.aeron.Configurations;
+import reactor.aeron.pure.archive.Utils;
+
+public class MatchEngine {
+
+ static final String INCOMING_ENDPOINT = "localhost:7171";
+ static final int INCOMING_STREAM_ID = 2222;
+ static final String INCOMING_RECORDING_ENDPOINT = "localhost:7172";
+ static final int INCOMING_RECORDING_STREAM_ID = 2224;
+ static final String OUTGOING_ENDPOINT = "localhost:8181";
+ static final int OUTGOING_STREAM_ID = 2223;
+
+ private static final String INCOMING_URI =
+ new ChannelUriStringBuilder()
+ .endpoint(INCOMING_ENDPOINT)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final String INCOMING_RECORDING_URI =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(INCOMING_RECORDING_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.IPC_MEDIA)
+ .build();
+ static final String OUTGOING_URI =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(OUTGOING_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int MESSAGE_SIZE = 256;
+ private static final UnsafeBuffer BUFFER =
+ new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_SIZE, 64));
+ private static final int FRAGMENT_LIMIT = 10;
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ final AtomicBoolean running = new AtomicBoolean(true);
+ SigInt.register(() -> running.set(false));
+
+ String aeronDirName = Utils.tmpFileName("aeron");
+ String archiveDirName = aeronDirName + "-archive";
+
+ try (ArchivingMediaDriver archivingMediaDriver =
+ ArchivingMediaDriver.launch(
+ new MediaDriver.Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true),
+ new Archive.Context()
+ .aeronDirectoryName(aeronDirName)
+ .archiveDirectoryName(archiveDirName)
+ .threadingMode(ArchiveThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .fileSyncLevel(0)
+ .deleteArchiveOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(new AeronArchive.Context().aeronDirectoryName(aeronDirName))) {
+ print(archivingMediaDriver);
+
+ Aeron aeron = aeronArchive.context().aeron();
+
+ aeronArchive.startRecording(
+ INCOMING_RECORDING_URI, INCOMING_RECORDING_STREAM_ID, SourceLocation.LOCAL);
+
+ Subscription incomingSubscription =
+ aeron.addSubscription(
+ INCOMING_URI,
+ INCOMING_STREAM_ID,
+ Configurations::printAvailableImage,
+ Configurations::printUnavailableImage);
+
+ ExclusivePublication recordingPublication =
+ aeron.addExclusivePublication(INCOMING_RECORDING_URI, INCOMING_RECORDING_STREAM_ID);
+
+ Subscription recordingSubscription =
+ aeron.addSubscription(INCOMING_RECORDING_URI, INCOMING_RECORDING_STREAM_ID);
+
+ ExclusivePublication outgoingPublication =
+ aeron.addExclusivePublication(OUTGOING_URI, OUTGOING_STREAM_ID);
+ aeronArchive.startRecording(OUTGOING_URI, OUTGOING_STREAM_ID, SourceLocation.LOCAL);
+
+ YieldingIdleStrategy idleStrategy = new YieldingIdleStrategy();
+
+ while (running.get()) {
+ int works = 0;
+
+ works =
+ incomingSubscription.poll(
+ (buffer, offset, length, header) -> {
+ while (true) {
+ long result = recordingPublication.offer(buffer, offset, length);
+ if (result > 0) {
+ break;
+ }
+ if (result == Publication.NOT_CONNECTED) {
+ System.err.println(
+ "Offer failed because publisher is not connected to subscriber");
+ } else if (result == Publication.CLOSED) {
+ System.err.println("Offer failed publication is closed");
+ } else if (result == Publication.MAX_POSITION_EXCEEDED) {
+ System.err.println(
+ "Offer failed due to publication reaching max position");
+ }
+ }
+ },
+ FRAGMENT_LIMIT)
+ > 0
+ ? 1
+ : 0;
+
+ works +=
+ recordingSubscription.poll(
+ (buffer, offset, length, header) -> {
+
+ // some business logic
+
+ if (length + Long.BYTES > MESSAGE_SIZE) {
+ System.err.println("can't publish msg with its position");
+ return;
+ }
+
+ final byte[] data = new byte[length];
+ buffer.getBytes(offset, data);
+
+ System.out.println(
+ String.format(
+ "msg{ offset: %s, length: %s, body: %s }, header{ pos: %s, offset: %s, type: %s }, channel { stream: %s, session: %s, initialTermId: %s, termId: %s, termOffset: %s, flags: %s }",
+ offset,
+ length,
+ new String(data),
+ header.position(),
+ header.offset(),
+ header.type(),
+ INCOMING_STREAM_ID,
+ header.sessionId(),
+ header.initialTermId(),
+ header.termId(),
+ header.termOffset(),
+ header.flags()));
+
+ BUFFER.putLong(0, header.position());
+ BUFFER.putBytes(Long.BYTES, buffer, offset, length);
+ outgoingPublication.offer(BUFFER, 0, length + Long.BYTES);
+ },
+ FRAGMENT_LIMIT)
+ > 0
+ ? 1
+ : 0;
+
+ idleStrategy.idle(works);
+ }
+
+ Thread.currentThread().join();
+ } finally {
+ Utils.removeFile(archiveDirName);
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static void print(ArchivingMediaDriver archivingMediaDriver) {
+ MediaDriver mediaDriver = archivingMediaDriver.mediaDriver();
+ Archive archive = archivingMediaDriver.archive();
+ Archive.Context context = archivingMediaDriver.archive().context();
+
+ System.out.println("Archive threadingMode: " + context.threadingMode());
+ System.out.println("Archive controlChannel: " + context.controlChannel());
+ System.out.println("Archive controlStreamId: " + context.controlStreamId());
+ System.out.println("Archive localControlChannel: " + context.localControlChannel());
+ System.out.println("Archive localControlStreamId: " + context.localControlStreamId());
+ System.out.println("Archive recordingEventsChannel: " + context.recordingEventsChannel());
+ System.out.println("Archive recordingEventsStreamId: " + context.recordingEventsStreamId());
+ System.out.println("Archive controlTermBufferSparse: " + context.controlTermBufferSparse());
+ System.out.println("Archive archiveDirName: " + archive.context().archiveDirectoryName());
+ System.out.println("Archive aeronDirectoryName: " + mediaDriver.aeronDirectoryName());
+
+ System.out.println("Archive listen: " + INCOMING_URI);
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MatchEngineEventHandler.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MatchEngineEventHandler.java
new file mode 100644
index 00000000..047d8d49
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MatchEngineEventHandler.java
@@ -0,0 +1,187 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Subscription;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.client.RecordingDescriptorConsumer;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.agrona.collections.MutableLong;
+import org.agrona.concurrent.SigInt;
+import org.agrona.concurrent.YieldingIdleStrategy;
+import reactor.aeron.pure.archive.Utils;
+
+public class MatchEngineEventHandler {
+
+ private static final String REPLAY_URI =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(MatchEngine.OUTGOING_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int REPLAY_STREAM_ID = 2225;
+ private static final int FRAGMENT_LIMIT = 10;
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ final AtomicBoolean running = new AtomicBoolean(true);
+ SigInt.register(() -> running.set(false));
+
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(
+ new AeronArchive.Context()
+ .controlResponseChannel("aeron:udp?endpoint=localhost:8026")
+ .controlResponseStreamId(18026)
+ .aeronDirectoryName(aeronDirName))) {
+
+ long recordingId =
+ findLatestRecording(
+ aeronArchive, MatchEngine.OUTGOING_URI, MatchEngine.OUTGOING_STREAM_ID);
+
+ System.out.println("recordingId: " + recordingId);
+
+ long position = aeronArchive.getRecordingPosition(recordingId);
+ System.out.println("getRecordingPosition: " + position);
+ // position = AeronArchive.NULL_POSITION;
+ position = 0;
+
+ System.out.println("position: " + position);
+
+ Subscription subscription =
+ aeronArchive.replay(recordingId, position, Long.MAX_VALUE, REPLAY_URI, REPLAY_STREAM_ID);
+
+ YieldingIdleStrategy idleStrategy = new YieldingIdleStrategy();
+
+ try {
+ while (running.get()) {
+ int works =
+ subscription.poll(
+ (buffer, offset, length, header) -> {
+ final byte[] data = new byte[length - Long.BYTES];
+ buffer.getBytes(offset + Long.BYTES, data);
+
+ System.out.println(
+ String.format(
+ "msg{ externalOffset: %s offset: %s, length: %s, body: %s }, header{ pos: %s, offset: %s, type: %s }, channel { stream: %s, session: %s, initialTermId: %s, termId: %s, termOffset: %s, flags: %s }",
+ buffer.getLong(offset),
+ offset,
+ length,
+ new String(data),
+ header.position(),
+ header.offset(),
+ header.type(),
+ MatchEngine.OUTGOING_STREAM_ID,
+ header.sessionId(),
+ header.initialTermId(),
+ header.termId(),
+ header.termOffset(),
+ header.flags()));
+ },
+ FRAGMENT_LIMIT)
+ > 0
+ ? 1
+ : 0;
+
+ idleStrategy.idle(works);
+ }
+ } finally {
+ aeronArchive.stopReplay(subscription.images().get(0).sessionId());
+ }
+
+ Thread.currentThread().join();
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static long findLatestRecording(
+ final AeronArchive archive, String channel, int channelStreamId) {
+ final MutableLong lastRecordingId = new MutableLong();
+
+ final RecordingDescriptorConsumer consumer =
+ (controlSessionId,
+ correlationId,
+ recordingId,
+ startTimestamp,
+ stopTimestamp,
+ startPosition,
+ stopPosition,
+ initialTermId,
+ segmentFileLength,
+ termBufferLength,
+ mtuLength,
+ sessionId,
+ streamId,
+ strippedChannel,
+ originalChannel,
+ sourceIdentity) -> {
+ System.out.println(
+ new StringBuilder()
+ .append("controlSessionId: ")
+ .append(controlSessionId)
+ .append(", correlationId: ")
+ .append(correlationId)
+ .append(", recordingId: ")
+ .append(recordingId)
+ .append(", startTimestamp: ")
+ .append(startTimestamp)
+ .append(", stopTimestamp: ")
+ .append(stopTimestamp)
+ .append(", startPosition: ")
+ .append(startPosition)
+ .append(", stopPosition: ")
+ .append(stopPosition)
+ .append(", initialTermId: ")
+ .append(initialTermId)
+ .append(", segmentFileLength: ")
+ .append(segmentFileLength)
+ .append(", termBufferLength: ")
+ .append(termBufferLength)
+ .append(", mtuLength: ")
+ .append(mtuLength)
+ .append(", sessionId: ")
+ .append(sessionId)
+ .append(", streamId: ")
+ .append(streamId)
+ .append(", strippedChannel: ")
+ .append(strippedChannel)
+ .append(", originalChannel: ")
+ .append(originalChannel)
+ .append(", sourceIdentity: ")
+ .append(sourceIdentity));
+
+ lastRecordingId.set(recordingId);
+ };
+
+ final long fromRecordingId = 0L;
+ final int recordCount = 100;
+
+ final int foundCount =
+ archive.listRecordingsForUri(
+ fromRecordingId, recordCount, channel, channelStreamId, consumer);
+
+ if (foundCount == 0) {
+ throw new IllegalStateException("no recordings found");
+ }
+
+ return lastRecordingId.get();
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MultiPubClient.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MultiPubClient.java
new file mode 100644
index 00000000..33c93706
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/MultiPubClient.java
@@ -0,0 +1,102 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Publication;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.agrona.BufferUtil;
+import org.agrona.concurrent.SigInt;
+import org.agrona.concurrent.UnsafeBuffer;
+import reactor.aeron.pure.archive.Utils;
+
+public class MultiPubClient {
+
+ private static final int CLIENTS = 2;
+ private static final int MESSAGE_COUNT = 100000;
+ private static final ChannelUriStringBuilder BROKER_CONTROL_CHANNEL_URI_BUILDER =
+ new ChannelUriStringBuilder()
+ .endpoint(IntermediateArchive.CONTROL_ENDPOINT)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA);
+ private static final int BROKER_CONTROL_STREAM_ID = IntermediateArchive.CONTROL_STREAM_ID;
+
+ private static final UnsafeBuffer BUFFER =
+ new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws InterruptedException {
+ final AtomicBoolean running = new AtomicBoolean(true);
+ SigInt.register(() -> running.set(false));
+
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ Aeron aeron =
+ Aeron.connect(
+ new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()))) {
+
+ List publications = new ArrayList<>(CLIENTS);
+
+ for (int i = 0; i < CLIENTS; i++) {
+ String channel = BROKER_CONTROL_CHANNEL_URI_BUILDER.sessionId(i).build();
+ Publication publication = aeron.addExclusivePublication(channel, BROKER_CONTROL_STREAM_ID);
+ publications.add(publication);
+ }
+
+ for (int i = 0; i < MESSAGE_COUNT && running.get(); i++) {
+
+ for (int j = 0; j < CLIENTS; j++) {
+ Publication publication = publications.get(j);
+
+ final String message = "client [" + j + "] msg: " + "hello@" + i;
+ final byte[] messageBytes = message.getBytes();
+ BUFFER.putBytes(0, messageBytes);
+
+ System.out.print("Offering " + i + "/" + MESSAGE_COUNT + " - ");
+
+ final long result = publication.offer(BUFFER, 0, messageBytes.length);
+ checkResult(result);
+ }
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+ }
+ } finally {
+
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static void checkResult(final long result) {
+ if (result > 0) {
+ System.out.println("yay!");
+ } else if (result == Publication.BACK_PRESSURED) {
+ System.out.println("Offer failed due to back pressure");
+ } else if (result == Publication.ADMIN_ACTION) {
+ System.out.println("Offer failed because of an administration action in the system");
+ } else if (result == Publication.NOT_CONNECTED) {
+ System.out.println("Offer failed because publisher is not connected to subscriber");
+ } else if (result == Publication.CLOSED) {
+ System.out.println("Offer failed publication is closed");
+ } else if (result == Publication.MAX_POSITION_EXCEEDED) {
+ throw new IllegalStateException("Offer failed due to publication reaching max position");
+ } else {
+ System.out.println("Offer failed due to unknown result code: " + result);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/PubClient.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/PubClient.java
new file mode 100644
index 00000000..ae7f95bf
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/PubClient.java
@@ -0,0 +1,91 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Publication;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.agrona.BufferUtil;
+import org.agrona.concurrent.SigInt;
+import org.agrona.concurrent.UnsafeBuffer;
+import reactor.aeron.pure.archive.Utils;
+
+public class PubClient {
+
+ private static final String BROKER_CONTROL_CHANNEL_URI =
+ new ChannelUriStringBuilder()
+ .endpoint(SimpleBroker.BROKER_CONTROL_ENDPOINT)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int BROKER_CONTROL_STREAM_ID = SimpleBroker.BROKER_CONTROL_STREAM_ID;
+
+ private static final UnsafeBuffer BUFFER =
+ new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws InterruptedException {
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ Aeron aeron =
+ Aeron.connect(
+ new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()))) {
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+ SigInt.register(() -> running.set(false));
+
+ Publication publication =
+ aeron.addExclusivePublication(BROKER_CONTROL_CHANNEL_URI, BROKER_CONTROL_STREAM_ID);
+
+ int n = 100000;
+ for (int i = 0; i < n && running.get(); i++) {
+ final String message = "Hello World! " + i;
+ final byte[] messageBytes = message.getBytes();
+ BUFFER.putBytes(0, messageBytes);
+
+ System.out.print("Offering " + i + "/" + n + " - ");
+
+ final long result = publication.offer(BUFFER, 0, messageBytes.length);
+ checkResult(result);
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+ }
+ } finally {
+
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static void checkResult(final long result) {
+ if (result > 0) {
+ System.out.println("yay!");
+ } else if (result == Publication.BACK_PRESSURED) {
+ System.out.println("Offer failed due to back pressure");
+ } else if (result == Publication.ADMIN_ACTION) {
+ System.out.println("Offer failed because of an administration action in the system");
+ } else if (result == Publication.NOT_CONNECTED) {
+ System.out.println("Offer failed because publisher is not connected to subscriber");
+ } else if (result == Publication.CLOSED) {
+ System.out.println("Offer failed publication is closed");
+ } else if (result == Publication.MAX_POSITION_EXCEEDED) {
+ throw new IllegalStateException("Offer failed due to publication reaching max position");
+ } else {
+ System.out.println("Offer failed due to unknown result code: " + result);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SimpleBroker.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SimpleBroker.java
new file mode 100644
index 00000000..a1c889b5
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SimpleBroker.java
@@ -0,0 +1,257 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Subscription;
+import io.aeron.archive.Archive;
+import io.aeron.archive.ArchiveThreadingMode;
+import io.aeron.archive.ArchivingMediaDriver;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.client.RecordingDescriptorConsumer;
+import io.aeron.archive.codecs.SourceLocation;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import org.agrona.collections.MutableLong;
+import reactor.aeron.Configurations;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+
+public class SimpleBroker {
+
+ static final String BROKER_CONTROL_ENDPOINT = "localhost:7171";
+ static final int BROKER_CONTROL_STREAM_ID = 2222;
+ static final String BROKER_REPLAY_ENDPOINT = "localhost:8181";
+ static final int BROKER_REPLAY_STREAM_ID = 2223;
+
+ private static final ChannelUriStringBuilder BROKER_CONTROL_URI_BUILDER =
+ new ChannelUriStringBuilder()
+ .endpoint(BROKER_CONTROL_ENDPOINT)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA);
+
+ private static final ChannelUriStringBuilder BROKER_REPLAY_CHANNEL_URI_BUILDER =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(BROKER_REPLAY_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA);
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ String aeronDirName = Utils.tmpFileName("aeron");
+ String archiveDirName = aeronDirName + "-archive";
+
+ try (ArchivingMediaDriver archivingMediaDriver =
+ ArchivingMediaDriver.launch(
+ new MediaDriver.Context()
+ .threadingMode(ThreadingMode.SHARED)
+ // .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true),
+ new Archive.Context()
+ .aeronDirectoryName(aeronDirName)
+ .archiveDirectoryName(archiveDirName)
+ .threadingMode(ArchiveThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .fileSyncLevel(0)
+ .deleteArchiveOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(
+ new AeronArchive.Context() //
+ .aeronDirectoryName(aeronDirName)
+ // .aeron(archivingMediaDriver.archive().context().aeron())
+ // .controlResponseChannel("aeron:udp?endpoint=localhost:8021")
+ // .controlResponseStreamId(18021)
+ // .ownsAeronClient(true)
+ ); ) {
+ print(archivingMediaDriver);
+
+ Aeron aeron = aeronArchive.context().aeron();
+
+ aeron.addSubscription(
+ BROKER_REPLAY_CHANNEL_URI_BUILDER.build(),
+ BROKER_REPLAY_STREAM_ID,
+ Configurations::printAvailableImage,
+ Configurations::printUnavailableImage);
+
+ Subscription subscription =
+ aeron.addSubscription(
+ BROKER_CONTROL_URI_BUILDER.build(),
+ BROKER_CONTROL_STREAM_ID,
+ image -> {
+ Configurations.printAvailableImage(image);
+
+ String recordingChannel =
+ new ChannelUriStringBuilder()
+ .endpoint(BROKER_CONTROL_ENDPOINT)
+ .sessionId(image.sessionId())
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+
+ long subscriptionId =
+ aeronArchive.startRecording(
+ recordingChannel, BROKER_CONTROL_STREAM_ID, SourceLocation.REMOTE);
+
+ System.out.println(
+ "Created recording subscriptionId: "
+ + subscriptionId
+ + ", for channel: "
+ + recordingChannel
+ + ", streamId: "
+ + BROKER_CONTROL_STREAM_ID);
+
+ long recordingId =
+ findLatestRecording(aeronArchive, recordingChannel, BROKER_CONTROL_STREAM_ID);
+
+ System.out.println("Found recording id: " + recordingId);
+
+ String replayChannel =
+ BROKER_REPLAY_CHANNEL_URI_BUILDER.sessionId(image.sessionId()).build();
+ int replayStreamId = BROKER_REPLAY_STREAM_ID;
+
+ final long sessionId =
+ aeronArchive.startReplay(
+ recordingId, 0, Long.MAX_VALUE, replayChannel, replayStreamId);
+
+ System.out.println("Started replay session id: " + sessionId);
+
+ System.out.println(
+ "Started replaying, recordingId: "
+ + recordingId
+ + ", replay channel: "
+ + replayChannel
+ + ", streamId: "
+ + replayStreamId);
+ },
+ Configurations::printUnavailableImage);
+
+ Flux.interval(Duration.ofSeconds(5))
+ .doOnNext(
+ i -> {
+ System.out.println("------------------- listRecordings -------------------");
+
+ aeronArchive.listRecording(
+ 0,
+ (controlSessionId,
+ correlationId,
+ recordingId,
+ startTimestamp,
+ stopTimestamp,
+ startPosition,
+ stopPosition,
+ initialTermId,
+ segmentFileLength,
+ termBufferLength,
+ mtuLength,
+ sessionId,
+ streamId,
+ strippedChannel,
+ originalChannel,
+ sourceIdentity) -> {
+ System.out.println(
+ new StringBuilder()
+ .append("controlSessionId: ")
+ .append(controlSessionId)
+ .append(", correlationId: ")
+ .append(correlationId)
+ .append(", recordingId: ")
+ .append(recordingId)
+ .append(", startTimestamp: ")
+ .append(startTimestamp)
+ .append(", stopTimestamp: ")
+ .append(stopTimestamp)
+ .append(", startPosition: ")
+ .append(startPosition)
+ .append(", stopPosition: ")
+ .append(stopPosition)
+ .append(", initialTermId: ")
+ .append(initialTermId)
+ .append(", segmentFileLength: ")
+ .append(segmentFileLength)
+ .append(", termBufferLength: ")
+ .append(termBufferLength)
+ .append(", mtuLength: ")
+ .append(mtuLength)
+ .append(", sessionId: ")
+ .append(sessionId)
+ .append(", streamId: ")
+ .append(streamId)
+ .append(", strippedChannel: ")
+ .append(strippedChannel)
+ .append(", originalChannel: ")
+ .append(originalChannel)
+ .append(", sourceIdentity: ")
+ .append(sourceIdentity));
+ });
+ System.out.println("------------------------------------------------------");
+ })
+ .blockLast();
+ } finally {
+ Utils.removeFile(archiveDirName);
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static void print(ArchivingMediaDriver archivingMediaDriver) {
+ MediaDriver mediaDriver = archivingMediaDriver.mediaDriver();
+ Archive archive = archivingMediaDriver.archive();
+ Archive.Context context = archivingMediaDriver.archive().context();
+
+ System.out.println("Archive threadingMode: " + context.threadingMode());
+ System.out.println("Archive controlChannel: " + context.controlChannel());
+ System.out.println("Archive controlStreamId: " + context.controlStreamId());
+ System.out.println("Archive localControlChannel: " + context.localControlChannel());
+ System.out.println("Archive localControlStreamId: " + context.localControlStreamId());
+ System.out.println("Archive recordingEventsChannel: " + context.recordingEventsChannel());
+ System.out.println("Archive recordingEventsStreamId: " + context.recordingEventsStreamId());
+ System.out.println("Archive controlTermBufferSparse: " + context.controlTermBufferSparse());
+ System.out.println("Archive archiveDirName: " + archive.context().archiveDirectoryName());
+ System.out.println("Archive aeronDirectoryName: " + mediaDriver.aeronDirectoryName());
+
+ System.out.println("Archive listen: " + BROKER_CONTROL_URI_BUILDER);
+ }
+
+ private static long findLatestRecording(
+ final AeronArchive archive, String channel, int channelStreamId) {
+ final MutableLong lastRecordingId = new MutableLong();
+
+ final RecordingDescriptorConsumer consumer =
+ (controlSessionId,
+ correlationId,
+ recordingId,
+ startTimestamp,
+ stopTimestamp,
+ startPosition,
+ stopPosition,
+ initialTermId,
+ segmentFileLength,
+ termBufferLength,
+ mtuLength,
+ sessionId,
+ streamId,
+ strippedChannel,
+ originalChannel,
+ sourceIdentity) -> lastRecordingId.set(recordingId);
+
+ final long fromRecordingId = 0L;
+ final int recordCount = 100;
+
+ final int foundCount =
+ archive.listRecordingsForUri(
+ fromRecordingId, recordCount, channel, channelStreamId, consumer);
+
+ if (foundCount == 0) {
+ throw new IllegalStateException("no recordings found");
+ }
+
+ return lastRecordingId.get();
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SubClient.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SubClient.java
new file mode 100644
index 00000000..73c15709
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SubClient.java
@@ -0,0 +1,86 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Subscription;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import reactor.aeron.Configurations;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+
+public class SubClient {
+
+ private static final String CHANNEL =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(SimpleBroker.BROKER_REPLAY_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ // .sessionId(-1328843850 /*todo NOTICE: always need to change!!!*/)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int STREAM_ID = SimpleBroker.BROKER_REPLAY_STREAM_ID;
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) {
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ // .spiesSimulateConnection(false)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ Aeron aeron =
+ Aeron.connect(
+ new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()));
+ Subscription subscription =
+ aeron.addSubscription(
+ CHANNEL,
+ STREAM_ID,
+ Configurations::printAvailableImage,
+ Configurations::printUnavailableImage)) {
+
+ System.out.println("Created subscription: " + CHANNEL + ", streamId: " + STREAM_ID);
+
+ Flux.interval(Duration.ofMillis(100))
+ .doOnNext(
+ i ->
+ subscription.poll(
+ (buffer, offset, length, header) -> {
+ final byte[] data = new byte[length];
+ buffer.getBytes(offset, data);
+
+ System.out.println(
+ String.format(
+ "msg{ offset: %s, length: %s, body: %s }, header{ pos: %s, offset: %s, type: %s }, channel { stream: %s, session: %s, initialTermId: %s, termId: %s, termOffset: %s, flags: %s }",
+ offset,
+ length,
+ new String(data),
+ header.position(),
+ header.offset(),
+ header.type(),
+ STREAM_ID,
+ header.sessionId(),
+ header.initialTermId(),
+ header.termId(),
+ header.termOffset(),
+ header.flags()));
+ },
+ 10))
+ .blockLast();
+
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SubReplayClient.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SubReplayClient.java
new file mode 100644
index 00000000..dc9ef0e0
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/SubReplayClient.java
@@ -0,0 +1,92 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Subscription;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import reactor.aeron.Configurations;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+
+public class SubReplayClient {
+
+ private static final String CHANNEL =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(SimpleBroker.BROKER_REPLAY_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int STREAM_ID =
+ SimpleBroker.BROKER_REPLAY_STREAM_ID
+ + 1; // todo to start from specify position (not just listen to current messages)
+ private static final long RECORDING_ID = 0L; // todo NOTICE change it if needed
+ private static final long POSITION = 0L; // todo NOTICE change it if needed
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) {
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ // .spiesSimulateConnection(false)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(
+ new AeronArchive.Context()
+ .controlResponseChannel("aeron:udp?endpoint=localhost:8022")
+ .controlResponseStreamId(18022)
+ .aeronDirectoryName(aeronDirName));
+ Subscription subscription =
+ aeronArchive.replay(
+ RECORDING_ID,
+ POSITION,
+ Long.MAX_VALUE,
+ CHANNEL,
+ STREAM_ID,
+ Configurations::printAvailableImage,
+ Configurations::printUnavailableImage); ) {
+
+ System.out.println("Created subscription: " + CHANNEL + ", streamId: " + STREAM_ID);
+
+ Flux.interval(Duration.ofMillis(100))
+ .doOnNext(
+ i ->
+ subscription.poll(
+ (buffer, offset, length, header) -> {
+ final byte[] data = new byte[length];
+ buffer.getBytes(offset, data);
+
+ System.out.println(
+ String.format(
+ "Message to stream %d from session %d (%d@%d) <<%s>>, header{ pos: %s, offset: %s, termOffset: %s, type: %s}",
+ STREAM_ID,
+ header.sessionId(),
+ length,
+ offset,
+ new String(data),
+ header.position(),
+ header.offset(),
+ header.termOffset(),
+ header.type()));
+ },
+ 10))
+ .blockLast();
+
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/TruncatedArchive.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/TruncatedArchive.java
new file mode 100644
index 00000000..c59ee3a9
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/TruncatedArchive.java
@@ -0,0 +1,160 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.ExclusivePublication;
+import io.aeron.Publication;
+import io.aeron.archive.Archive;
+import io.aeron.archive.ArchiveThreadingMode;
+import io.aeron.archive.ArchivingMediaDriver;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.codecs.SourceLocation;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import org.agrona.BufferUtil;
+import org.agrona.concurrent.UnsafeBuffer;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class TruncatedArchive {
+
+ static final String OUTGOING_ENDPOINT = "localhost:8181";
+ static final int OUTGOING_STREAM_ID = 2223;
+
+ static final String OUTGOING_URI =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(OUTGOING_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int MESSAGE_SIZE = 256;
+ private static final UnsafeBuffer BUFFER =
+ new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_SIZE, 64));
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ String aeronDirName = Utils.tmpFileName("aeron");
+ String archiveDirName = aeronDirName + "-archive";
+
+ try (ArchivingMediaDriver archivingMediaDriver =
+ ArchivingMediaDriver.launch(
+ new MediaDriver.Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true),
+ new Archive.Context()
+ .aeronDirectoryName(aeronDirName)
+ .archiveDirectoryName(archiveDirName)
+ .threadingMode(ArchiveThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .fileSyncLevel(0)
+ .deleteArchiveOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(new AeronArchive.Context().aeronDirectoryName(aeronDirName))) {
+ print(archivingMediaDriver);
+
+ Aeron aeron = aeronArchive.context().aeron();
+
+ ExclusivePublication outgoingPublication =
+ aeron.addExclusivePublication(OUTGOING_URI, OUTGOING_STREAM_ID);
+ aeronArchive.startRecording(OUTGOING_URI, OUTGOING_STREAM_ID, SourceLocation.LOCAL);
+
+ long recordingId =
+ Utils.findRecording(aeronArchive, OUTGOING_URI, OUTGOING_STREAM_ID, 0, 100)
+ .log("found recordings ")
+ .blockLast()
+ .recordingId;
+
+ Flux.interval(Duration.ofSeconds(1))
+ .doOnNext(
+ i -> {
+ final String message = "hello@" + i;
+ final byte[] messageBytes = message.getBytes();
+ BUFFER.putBytes(0, messageBytes);
+
+ final long result = outgoingPublication.offer(BUFFER, 0, messageBytes.length);
+
+ System.out.print(
+ "Sent " + i + ", current position: " + outgoingPublication.position() + " - ");
+ checkResult(result);
+ })
+ .subscribe();
+
+ Mono.delay(Duration.ofSeconds(10))
+ .doOnSuccess(
+ $ -> {
+ aeronArchive.stopRecording(OUTGOING_URI, OUTGOING_STREAM_ID);
+
+ long recordingPosition = aeronArchive.getRecordingPosition(recordingId);
+ long stopPosition = aeronArchive.getStopPosition(recordingId);
+ // long truncatedPosition = stopPosition / 2;
+ long truncatedPosition = 64;
+
+ System.out.println("recordingPosition = " + recordingPosition);
+ System.out.println("stopPosition = " + stopPosition);
+ System.out.println("truncatedPosition = " + truncatedPosition);
+
+ aeronArchive.truncateRecording(recordingId, truncatedPosition);
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ aeronArchive.startRecording(OUTGOING_URI, OUTGOING_STREAM_ID, SourceLocation.LOCAL);
+ })
+ .subscribe();
+
+ Thread.currentThread().join();
+ } finally {
+ Utils.removeFile(archiveDirName);
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static void print(ArchivingMediaDriver archivingMediaDriver) {
+ MediaDriver mediaDriver = archivingMediaDriver.mediaDriver();
+ Archive archive = archivingMediaDriver.archive();
+ Archive.Context context = archivingMediaDriver.archive().context();
+
+ System.out.println("Archive threadingMode: " + context.threadingMode());
+ System.out.println("Archive controlChannel: " + context.controlChannel());
+ System.out.println("Archive controlStreamId: " + context.controlStreamId());
+ System.out.println("Archive localControlChannel: " + context.localControlChannel());
+ System.out.println("Archive localControlStreamId: " + context.localControlStreamId());
+ System.out.println("Archive recordingEventsChannel: " + context.recordingEventsChannel());
+ System.out.println("Archive recordingEventsStreamId: " + context.recordingEventsStreamId());
+ System.out.println("Archive controlTermBufferSparse: " + context.controlTermBufferSparse());
+ System.out.println("Archive archiveDirName: " + archive.context().archiveDirectoryName());
+ System.out.println("Archive aeronDirectoryName: " + mediaDriver.aeronDirectoryName());
+ }
+
+ private static void checkResult(final long result) {
+ if (result > 0) {
+ System.out.println("yay!");
+ } else if (result == Publication.BACK_PRESSURED) {
+ System.out.println("Offer failed due to back pressure");
+ } else if (result == Publication.ADMIN_ACTION) {
+ System.out.println("Offer failed because of an administration action in the system");
+ } else if (result == Publication.NOT_CONNECTED) {
+ System.out.println("Offer failed because publisher is not connected to subscriber");
+ } else if (result == Publication.CLOSED) {
+ System.out.println("Offer failed publication is closed");
+ } else if (result == Publication.MAX_POSITION_EXCEEDED) {
+ throw new IllegalStateException("Offer failed due to publication reaching max position");
+ } else {
+ System.out.println("Offer failed due to unknown result code: " + result);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/TruncatedArchiveHandler.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/TruncatedArchiveHandler.java
new file mode 100644
index 00000000..06063656
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/TruncatedArchiveHandler.java
@@ -0,0 +1,195 @@
+package reactor.aeron.pure.archive.examples;
+
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Subscription;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.client.RecordingDescriptorConsumer;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.agrona.collections.MutableLong;
+import org.agrona.concurrent.SigInt;
+import org.agrona.concurrent.YieldingIdleStrategy;
+import reactor.aeron.pure.archive.Utils;
+import reactor.aeron.pure.archive.Utils.RecordingDescriptor;
+
+public class TruncatedArchiveHandler {
+
+ private static final String REPLAY_URI =
+ new ChannelUriStringBuilder()
+ .controlEndpoint(TruncatedArchive.OUTGOING_ENDPOINT)
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int REPLAY_STREAM_ID = 2225;
+ private static final int FRAGMENT_LIMIT = 10;
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws Exception {
+ final AtomicBoolean running = new AtomicBoolean(true);
+ SigInt.register(() -> running.set(false));
+
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(
+ new AeronArchive.Context()
+ .controlResponseChannel(
+ new ChannelUriStringBuilder()
+ .controlEndpoint("localhost:8028")
+ .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build())
+ .controlResponseStreamId(18028)
+ .aeronDirectoryName(aeronDirName))) {
+
+ RecordingDescriptor recording =
+ Utils.findRecording(
+ aeronArchive,
+ TruncatedArchive.OUTGOING_URI,
+ TruncatedArchive.OUTGOING_STREAM_ID,
+ 0,
+ 100)
+ .log("found recordings ")
+ .blockLast();
+
+ Subscription subscription =
+ aeronArchive.replay(
+ recording.recordingId,
+ recording.startPosition,
+ Long.MAX_VALUE,
+ REPLAY_URI,
+ REPLAY_STREAM_ID);
+
+ YieldingIdleStrategy idleStrategy = new YieldingIdleStrategy();
+
+ try {
+ while (running.get()) {
+ int works =
+ subscription.poll(
+ (buffer, offset, length, header) -> {
+ final byte[] data = new byte[length];
+ buffer.getBytes(offset, data);
+
+ System.out.println(
+ String.format(
+ "msg{ offset: %s, length: %s, body: %s }, header{ pos: %s, offset: %s, type: %s }, channel { stream: %s, session: %s, initialTermId: %s, termId: %s, termOffset: %s, flags: %s }",
+ offset,
+ length,
+ new String(data),
+ header.position(),
+ header.offset(),
+ header.type(),
+ TruncatedArchive.OUTGOING_STREAM_ID,
+ header.sessionId(),
+ header.initialTermId(),
+ header.termId(),
+ header.termOffset(),
+ header.flags()));
+ },
+ FRAGMENT_LIMIT)
+ > 0
+ ? 1
+ : 0;
+
+ idleStrategy.idle(works);
+ }
+ } finally {
+ aeronArchive.stopReplay(subscription.images().get(0).sessionId());
+ }
+
+ Thread.currentThread().join();
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static long findLatestRecording(
+ final AeronArchive archive, String channel, int channelStreamId) {
+ final MutableLong lastRecordingId = new MutableLong();
+
+ final RecordingDescriptorConsumer consumer =
+ (controlSessionId,
+ correlationId,
+ recordingId,
+ startTimestamp,
+ stopTimestamp,
+ startPosition,
+ stopPosition,
+ initialTermId,
+ segmentFileLength,
+ termBufferLength,
+ mtuLength,
+ sessionId,
+ streamId,
+ strippedChannel,
+ originalChannel,
+ sourceIdentity) -> {
+ System.out.println(
+ new StringBuilder()
+ .append("controlSessionId: ")
+ .append(controlSessionId)
+ .append(", correlationId: ")
+ .append(correlationId)
+ .append(", recordingId: ")
+ .append(recordingId)
+ .append(", startTimestamp: ")
+ .append(startTimestamp)
+ .append(", stopTimestamp: ")
+ .append(stopTimestamp)
+ .append(", startPosition: ")
+ .append(startPosition)
+ .append(", stopPosition: ")
+ .append(stopPosition)
+ .append(", initialTermId: ")
+ .append(initialTermId)
+ .append(", segmentFileLength: ")
+ .append(segmentFileLength)
+ .append(", termBufferLength: ")
+ .append(termBufferLength)
+ .append(", mtuLength: ")
+ .append(mtuLength)
+ .append(", sessionId: ")
+ .append(sessionId)
+ .append(", streamId: ")
+ .append(streamId)
+ .append(", strippedChannel: ")
+ .append(strippedChannel)
+ .append(", originalChannel: ")
+ .append(originalChannel)
+ .append(", sourceIdentity: ")
+ .append(sourceIdentity));
+
+ lastRecordingId.set(recordingId);
+ };
+
+ final long fromRecordingId = 0L;
+ final int recordCount = 100;
+
+ final int foundCount =
+ archive.listRecordingsForUri(
+ fromRecordingId, recordCount, channel, channelStreamId, consumer);
+
+ if (foundCount == 0) {
+ throw new IllegalStateException("no recordings found");
+ }
+
+ return lastRecordingId.get();
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/ExtendedOperation.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/ExtendedOperation.java
new file mode 100644
index 00000000..c6bd3645
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/ExtendedOperation.java
@@ -0,0 +1,112 @@
+package reactor.aeron.pure.archive.examples.recording.simple;
+
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.ExclusivePublication;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.codecs.SourceLocation;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+
+public class ExtendedOperation {
+
+ private static final String CONTROL_RESPONSE_CHANNEL_URI =
+ new ChannelUriStringBuilder()
+ .endpoint("localhost:8485")
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int CONTROL_RESPONSE_STREAM_ID = 8485;
+
+ private static final String TARGET_RECORDING_URI = RecordingServer.INCOMING_RECORDING_URI;
+ private static final int TARGET_RECORDING_STREAM_ID =
+ RecordingServer.INCOMING_RECORDING_STREAM_ID;
+
+ private static final ChannelUriStringBuilder EXTENDED_RECORDING_URI_BUILDER =
+ new ChannelUriStringBuilder()
+ .endpoint("localhost:8486")
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA);
+ private static final int EXTENDED_RECORDING_STREAM_ID = 7777;
+
+ private static final Duration SENT_INTERVAL = Duration.ofSeconds(1);
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) throws InterruptedException {
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(
+ new AeronArchive.Context()
+ .controlResponseChannel(CONTROL_RESPONSE_CHANNEL_URI)
+ .controlResponseStreamId(CONTROL_RESPONSE_STREAM_ID)
+ .aeronDirectoryName(aeronDirName))) {
+
+ Flux.interval(Duration.ofSeconds(5))
+ .flatMap(
+ i ->
+ Utils.findRecording(
+ aeronArchive, TARGET_RECORDING_URI, TARGET_RECORDING_STREAM_ID, 0, 1000))
+ .distinct(recordingDescriptor -> recordingDescriptor.recordingId)
+ .log("fondRecording ")
+ .take(1)
+ .doOnNext(
+ recording -> {
+ aeronArchive.stopRecording(recording.originalChannel, recording.streamId);
+
+ aeronArchive.extendRecording(
+ recording.recordingId,
+ EXTENDED_RECORDING_URI_BUILDER.build(),
+ EXTENDED_RECORDING_STREAM_ID,
+ SourceLocation.REMOTE);
+
+ long stopPosition = aeronArchive.getStopPosition(recording.recordingId);
+
+ String channel =
+ EXTENDED_RECORDING_URI_BUILDER
+ .initialPosition(
+ stopPosition, recording.initialTermId, recording.termBufferLength)
+ .build();
+
+ System.out.println(
+ "Creating publication to "
+ + channel
+ + "stream id: "
+ + EXTENDED_RECORDING_STREAM_ID);
+
+ ExclusivePublication publication =
+ aeronArchive
+ .context()
+ .aeron()
+ .addExclusivePublication(channel, EXTENDED_RECORDING_STREAM_ID);
+
+ Flux.interval(SENT_INTERVAL)
+ .map(i -> "extended msg " + i)
+ .doOnNext(message -> Utils.send(publication, message))
+ .subscribe();
+ })
+ .subscribe();
+
+ Thread.currentThread().join();
+
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/RecordingServer.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/RecordingServer.java
new file mode 100644
index 00000000..ff31e32e
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/RecordingServer.java
@@ -0,0 +1,107 @@
+package reactor.aeron.pure.archive.examples.recording.simple;
+
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.archive.Archive;
+import io.aeron.archive.ArchiveThreadingMode;
+import io.aeron.archive.ArchivingMediaDriver;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.codecs.SourceLocation;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+
+public class RecordingServer {
+
+ static final String INCOMING_RECORDING_ENDPOINT = "localhost:7373";
+ static final int INCOMING_RECORDING_STREAM_ID = 3333;
+
+ static final String INCOMING_RECORDING_URI =
+ new ChannelUriStringBuilder()
+ .endpoint(INCOMING_RECORDING_ENDPOINT)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) {
+ String aeronDirName = Utils.tmpFileName("aeron");
+ String archiveDirName = aeronDirName + "-archive";
+
+ try (ArchivingMediaDriver archivingMediaDriver =
+ ArchivingMediaDriver.launch(
+ new MediaDriver.Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true),
+ new Archive.Context()
+ .aeronDirectoryName(aeronDirName)
+ .archiveDirectoryName(archiveDirName)
+ .threadingMode(ArchiveThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .fileSyncLevel(0)
+ .deleteArchiveOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(new AeronArchive.Context().aeronDirectoryName(aeronDirName))) {
+ print(archivingMediaDriver);
+
+ // it's important to bind necessary port
+ aeronArchive
+ .context()
+ .aeron()
+ .addSubscription(INCOMING_RECORDING_URI, INCOMING_RECORDING_STREAM_ID);
+
+ System.out.println(
+ "Creating recording to "
+ + INCOMING_RECORDING_URI
+ + ", stream id: "
+ + INCOMING_RECORDING_STREAM_ID);
+
+ aeronArchive.startRecording(
+ INCOMING_RECORDING_URI, INCOMING_RECORDING_STREAM_ID, SourceLocation.REMOTE);
+
+ Flux.interval(Duration.ofSeconds(5))
+ .flatMap(
+ i ->
+ Utils.findRecording(
+ aeronArchive, INCOMING_RECORDING_URI, INCOMING_RECORDING_STREAM_ID, 0, 1000))
+ .distinct(recordingDescriptor -> recordingDescriptor.recordingId)
+ .log("fondRecording ")
+ .blockLast();
+ } finally {
+ Utils.removeFile(archiveDirName);
+ Utils.removeFile(aeronDirName);
+ }
+ }
+
+ private static void print(ArchivingMediaDriver archivingMediaDriver) {
+ MediaDriver mediaDriver = archivingMediaDriver.mediaDriver();
+ Archive archive = archivingMediaDriver.archive();
+ Archive.Context context = archivingMediaDriver.archive().context();
+
+ System.out.println("Archive threadingMode: " + context.threadingMode());
+ System.out.println("Archive controlChannel: " + context.controlChannel());
+ System.out.println("Archive controlStreamId: " + context.controlStreamId());
+ System.out.println("Archive localControlChannel: " + context.localControlChannel());
+ System.out.println("Archive localControlStreamId: " + context.localControlStreamId());
+ System.out.println("Archive recordingEventsChannel: " + context.recordingEventsChannel());
+ System.out.println("Archive recordingEventsStreamId: " + context.recordingEventsStreamId());
+ System.out.println("Archive controlTermBufferSparse: " + context.controlTermBufferSparse());
+ System.out.println("Archive archiveDirName: " + archive.context().archiveDirectoryName());
+ System.out.println("Archive aeronDirectoryName: " + mediaDriver.aeronDirectoryName());
+
+ System.out.println(
+ "Archive listen: "
+ + INCOMING_RECORDING_ENDPOINT
+ + ", streamId: "
+ + INCOMING_RECORDING_STREAM_ID);
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/ReplayOperation.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/ReplayOperation.java
new file mode 100644
index 00000000..973090a4
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/ReplayOperation.java
@@ -0,0 +1,121 @@
+package reactor.aeron.pure.archive.examples.recording.simple;
+
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Subscription;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import reactor.aeron.Configurations;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+
+public class ReplayOperation {
+
+ private static final String CONTROL_RESPONSE_CHANNEL_URI =
+ new ChannelUriStringBuilder()
+ .endpoint("localhost:8484")
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int CONTROL_RESPONSE_STREAM_ID = 8484;
+
+ private static final String TARGET_RECORDING_URI = RecordingServer.INCOMING_RECORDING_URI;
+ private static final int TARGET_RECORDING_STREAM_ID =
+ RecordingServer.INCOMING_RECORDING_STREAM_ID;
+
+ private static final int REPLAY_STREAM_ID = 7474;
+ private static final String REPLAY_URI =
+ new ChannelUriStringBuilder()
+ .endpoint("localhost:7474")
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int FRAGMENT_LIMIT = 1000;
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) {
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(
+ new AeronArchive.Context()
+ .controlResponseChannel(CONTROL_RESPONSE_CHANNEL_URI)
+ .controlResponseStreamId(CONTROL_RESPONSE_STREAM_ID)
+ .aeronDirectoryName(aeronDirName))) {
+
+ Flux.interval(Duration.ofSeconds(5))
+ .flatMap(
+ i ->
+ Utils.findRecording(
+ aeronArchive, TARGET_RECORDING_URI, TARGET_RECORDING_STREAM_ID, 0, 1000))
+ .distinct(recordingDescriptor -> recordingDescriptor.recordingId)
+ .log("fondRecording ")
+ // check if the recording is ready for recording (truncated case)
+ .filter(
+ recording ->
+ recording.stopPosition == -1 || recording.startPosition < recording.stopPosition)
+ .subscribe(
+ recording ->
+ aeronArchive.startReplay(
+ recording.recordingId,
+ recording.startPosition,
+ AeronArchive.NULL_LENGTH,
+ REPLAY_URI,
+ REPLAY_STREAM_ID));
+
+ System.out.println(
+ "Creating subscription to " + REPLAY_URI + ", stream id: " + REPLAY_STREAM_ID);
+
+ Subscription subscription =
+ aeronArchive
+ .context()
+ .aeron()
+ .addSubscription(
+ REPLAY_URI,
+ REPLAY_STREAM_ID,
+ Configurations::printAvailableImage,
+ Configurations::printUnavailableImage);
+
+ Flux.interval(Duration.ofSeconds(1))
+ .doOnNext(
+ i ->
+ subscription.poll(
+ (buffer, offset, length, header) -> {
+ final byte[] data = new byte[length];
+ buffer.getBytes(offset, data);
+
+ System.out.println(
+ String.format(
+ "Message from session %d (%d@%d) <<%s>>, header{ pos: %s, offset: %s, termOffset: %s, type: %s}",
+ header.sessionId(),
+ length,
+ offset,
+ new String(data),
+ header.position(),
+ header.offset(),
+ header.termOffset(),
+ header.type()));
+ },
+ FRAGMENT_LIMIT))
+ .blockLast();
+
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/Sender.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/Sender.java
new file mode 100644
index 00000000..9a15c0f3
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/Sender.java
@@ -0,0 +1,59 @@
+package reactor.aeron.pure.archive.examples.recording.simple;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.Publication;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+
+public class Sender {
+
+ private static final String OUTBOUND_CHANNEL_URI =
+ new ChannelUriStringBuilder()
+ .endpoint(RecordingServer.INCOMING_RECORDING_ENDPOINT)
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int OUTBOUND_STREAM_ID = RecordingServer.INCOMING_RECORDING_STREAM_ID;
+
+ private static final Duration SENT_INTERVAL = Duration.ofSeconds(1);
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) {
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ Aeron aeron =
+ Aeron.connect(
+ new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()))) {
+
+ System.out.println(
+ "Creating publication to " + OUTBOUND_CHANNEL_URI + ", stream id: " + OUTBOUND_STREAM_ID);
+
+ Publication publication =
+ aeron.addExclusivePublication(OUTBOUND_CHANNEL_URI, OUTBOUND_STREAM_ID);
+
+ Flux.interval(SENT_INTERVAL)
+ .map(i -> "Hello World! " + i)
+ .doOnNext(message -> Utils.send(publication, message))
+ .blockLast();
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+}
diff --git a/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/TruncatedOperation.java b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/TruncatedOperation.java
new file mode 100644
index 00000000..a8fbc00d
--- /dev/null
+++ b/reactor-aeron-benchmarks/src/main/java/reactor/aeron/pure/archive/examples/recording/simple/TruncatedOperation.java
@@ -0,0 +1,72 @@
+package reactor.aeron.pure.archive.examples.recording.simple;
+
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.CommonContext;
+import io.aeron.archive.client.AeronArchive;
+import io.aeron.archive.codecs.SourceLocation;
+import io.aeron.driver.MediaDriver;
+import io.aeron.driver.MediaDriver.Context;
+import io.aeron.driver.ThreadingMode;
+import java.time.Duration;
+import reactor.aeron.pure.archive.Utils;
+import reactor.core.publisher.Flux;
+
+public class TruncatedOperation {
+
+ private static final String CONTROL_RESPONSE_CHANNEL_URI =
+ new ChannelUriStringBuilder()
+ .endpoint("localhost:8485")
+ .reliable(Boolean.TRUE)
+ .media(CommonContext.UDP_MEDIA)
+ .build();
+ private static final int CONTROL_RESPONSE_STREAM_ID = 8485;
+
+ private static final String TARGET_RECORDING_URI = RecordingServer.INCOMING_RECORDING_URI;
+ private static final int TARGET_RECORDING_STREAM_ID =
+ RecordingServer.INCOMING_RECORDING_STREAM_ID;
+
+ /**
+ * Main runner.
+ *
+ * @param args program arguments.
+ */
+ public static void main(String[] args) {
+ String aeronDirName = Utils.tmpFileName("aeron");
+
+ try (MediaDriver mediaDriver =
+ MediaDriver.launch(
+ new Context()
+ .threadingMode(ThreadingMode.SHARED)
+ .spiesSimulateConnection(true)
+ .errorHandler(Throwable::printStackTrace)
+ .aeronDirectoryName(aeronDirName)
+ .dirDeleteOnStart(true));
+ AeronArchive aeronArchive =
+ AeronArchive.connect(
+ new AeronArchive.Context()
+ .controlResponseChannel(CONTROL_RESPONSE_CHANNEL_URI)
+ .controlResponseStreamId(CONTROL_RESPONSE_STREAM_ID)
+ .aeronDirectoryName(aeronDirName))) {
+
+ Flux.interval(Duration.ofSeconds(5))
+ .flatMap(
+ i ->
+ Utils.findRecording(
+ aeronArchive, TARGET_RECORDING_URI, TARGET_RECORDING_STREAM_ID, 0, 1000))
+ .distinct(recordingDescriptor -> recordingDescriptor.recordingId)
+ .log("fondRecording ")
+ .doOnNext(
+ recording -> {
+ aeronArchive.stopRecording(recording.originalChannel, recording.streamId);
+
+ aeronArchive.truncateRecording(recording.recordingId, recording.startPosition);
+
+ aeronArchive.startRecording(
+ recording.originalChannel, recording.streamId, SourceLocation.REMOTE);
+ })
+ .blockLast();
+ } finally {
+ Utils.removeFile(aeronDirName);
+ }
+ }
+}