diff --git a/README.md b/README.md index 35ebea9..ead0b57 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,22 @@ visited `MkvElement` for each constituent visitor in the order in which the visi `CopyVisitor` is a visitor used to copy the raw bytes of the Mkv elements in a stream to an output stream. +## ResponseStreamConsumers +The `GetMediaResponseStreamConsumer` is an abstract class used to consume the output of a GetMedia* call to Kinesis Video in a streaming fashion. +It supports a single abstract method called process that is invoked to process the streaming payload of a GetMedia response. +The first parameter for process method is the payload inputStream in a GetMediaResult returned by a call to GetMedia. +Implementations of the process method of this interface should block until all the data in the inputStream has been + processed or the process method decides to stop for some other reason. The second argument is a FragmentMetadataCallback + which is invoked at the end of every processed fragment. The `GetMediaResponseStreamConsumer` provides a utility method + `processWithFragmentEndCallbacks` that can be used by child classes to implement the end of fragment callbacks. + The process method can be implemented using a combination of the visitors described earlier. + +### MergedOutputPiper +The `MergedOutputPiper` extends `GetMediaResponseStreamConsumer` to merge consecutive mkv streams in the output of GetMedia + and pipes the merged stream to the stdin of a child process. It is meant to be used to pipe the output of a GetMedia* call to a processing application that can not deal +with having multiple consecutive mkv streams. Gstreamer is one such application that requires a merged stream. + + ## Example * `KinesisVideoExample` is an example that shows how the `StreamingMkvReader` and the different visitors can be integrated with the AWS SDK for the Kinesis Video. This example provides examples for @@ -82,9 +98,20 @@ with the AWS SDK for the Kinesis Video. This example provides examples for } ``` * It has been tested not only for streams ingested by `PutMediaWorker` but also streams sent to Kinesis Video Streams using GStreamer Demo application (https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-cpp) - + +* `KinesisVideoGStreamerPiperExample` is an example for continuously piping the output of GetMedia calls from a Kinesis Video stream to GStreamer. + The test `KinesisVideoGStreamerPiperExampleTest` provides an example that pipes the output of a KVS GetMedia call to a Gstreamer pipeline. + The Gstreamer pipeline is a toy example that demonstrates that Gstreamer can parse the mkv passed into it. + ## Release Notes +### Release 1.0.5 (May 2018) +* Introduce `GetMediaResponseStreamConsumer` as an abstract class used to consume the output of a GetMedia* call +to Kinesis Video in a streaming fashion. Child classes will use visitors to implement different consumers. +* The `MergedOutputPiper` extends `GetMediaResponseStreamConsumer` to merge consecutive mkv streams in the output of GetMedia + and pipes the merged stream to the stdin of a child process. +* Add the capability and example to pipe the output of GetMedia calls to GStreamer using `MergedOutputPiper`. + ### Release 1.0.4 (April 2018) * Add example for KinesisVideo Streams integration with Rekognition and draw Bounding Boxes for every sampled frame. * Fix for stream ending before reporting tags visited. diff --git a/pom.xml b/pom.xml index 447c695..ebe0ea0 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ amazon-kinesis-video-streams-parser-library jar Amazon Kinesis Video Streams Parser Library - 1.0.4 + 1.0.5-SNAPSHOT The Amazon Kinesis Video Streams Parser Library for Java enables Java developers to parse the streams returned by GetMedia calls to Amazon Kinesis Video. diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/ContinuousGetMediaWorker.java b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/ContinuousGetMediaWorker.java new file mode 100644 index 0000000..666783c --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/ContinuousGetMediaWorker.java @@ -0,0 +1,124 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.examples; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException; +import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadata; +import com.amazonaws.kinesisvideo.parser.utilities.consumer.GetMediaResponseStreamConsumer; +import com.amazonaws.kinesisvideo.parser.utilities.consumer.GetMediaResponseStreamConsumerFactory; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesisvideo.AmazonKinesisVideo; +import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoMedia; +import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoMediaClientBuilder; +import com.amazonaws.services.kinesisvideo.model.APIName; +import com.amazonaws.services.kinesisvideo.model.GetDataEndpointRequest; +import com.amazonaws.services.kinesisvideo.model.GetMediaRequest; +import com.amazonaws.services.kinesisvideo.model.GetMediaResult; +import com.amazonaws.services.kinesisvideo.model.StartSelector; +import com.amazonaws.services.kinesisvideo.model.StartSelectorType; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.Validate; + +import java.io.IOException; +import java.util.Optional; + +/** + * Worker used to make a GetMedia call to Kinesis Video and stream in data and parse it and apply a visitor. + */ +@Slf4j +public class ContinuousGetMediaWorker extends KinesisVideoCommon implements Runnable { + private static final int HTTP_STATUS_OK = 200; + private final AmazonKinesisVideoMedia videoMedia; + private final GetMediaResponseStreamConsumerFactory consumerFactory; + private final StartSelector startSelector; + private Optional fragmentNumberToStartAfter = Optional.empty(); + private boolean shouldStop = false; + + private ContinuousGetMediaWorker(Regions region, + AWSCredentialsProvider credentialsProvider, + String streamName, + StartSelector startSelector, + String endPoint, + GetMediaResponseStreamConsumerFactory consumerFactory) { + super(region, credentialsProvider, streamName); + + AmazonKinesisVideoMediaClientBuilder builder = AmazonKinesisVideoMediaClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, region.getName())) + .withCredentials(getCredentialsProvider()); + + this.videoMedia = builder.build(); + this.consumerFactory = consumerFactory; + this.startSelector = startSelector; + } + + public static ContinuousGetMediaWorker create(Regions region, + AWSCredentialsProvider credentialsProvider, + String streamName, + StartSelector startSelector, + AmazonKinesisVideo amazonKinesisVideo, + GetMediaResponseStreamConsumerFactory consumer) { + String endPoint = amazonKinesisVideo.getDataEndpoint(new GetDataEndpointRequest().withAPIName(APIName.GET_MEDIA) + .withStreamName(streamName)).getDataEndpoint(); + + return new ContinuousGetMediaWorker(region, credentialsProvider, streamName, startSelector, endPoint, consumer); + } + + public void stop() { + shouldStop = true; + } + + @Override + public void run() { + log.info("Start ContinuousGetMedia worker for stream {}", streamName); + while (!shouldStop) { + try { + + StartSelector selectorToUse = fragmentNumberToStartAfter.map(fn -> new StartSelector().withStartSelectorType(StartSelectorType.FRAGMENT_NUMBER) + .withAfterFragmentNumber(fn)).orElse(startSelector); + + GetMediaResult result = videoMedia.getMedia(new GetMediaRequest().withStreamName(streamName).withStartSelector(selectorToUse)); + log.info("Start processing GetMedia called for stream {} response {} requestId {}", + streamName, + result.getSdkHttpMetadata().getHttpStatusCode(), + result.getSdkResponseMetadata().getRequestId()); + + if (result.getSdkHttpMetadata().getHttpStatusCode() == HTTP_STATUS_OK) { + GetMediaResponseStreamConsumer consumer = consumerFactory.createConsumer(); + consumer.process( + result.getPayload(), this::updateFragmentNumberToStartAfter); + } else { + Thread.sleep(200); + } + } catch (IOException | MkvElementVisitException e) { + log.error("Failure in ContinuousGetMedia worker for stream {} {}", streamName, e); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } catch (Throwable t) { + log.error("WHAT",t); + } finally { + log.info("Exit processing GetMedia called for stream {}", streamName); + } + } + log.info("Exit ContinuousGetMedia worker for stream {}", streamName); + } + + private void updateFragmentNumberToStartAfter(FragmentMetadata f) { + Validate.isTrue(!fragmentNumberToStartAfter.isPresent() + || f.getFragmentNumberString().compareTo(fragmentNumberToStartAfter.get()) > 0); + fragmentNumberToStartAfter = Optional.of(f.getFragmentNumberString()); + } +} diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/GetMediaWorker.java b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/GetMediaWorker.java index 0b8e8f5..25956e4 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/GetMediaWorker.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/GetMediaWorker.java @@ -16,7 +16,6 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.kinesisvideo.parser.ebml.InputStreamParserByteSource; -import com.amazonaws.kinesisvideo.parser.mkv.MkvElement; import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException; import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitor; import com.amazonaws.kinesisvideo.parser.mkv.StreamingMkvReader; @@ -29,11 +28,8 @@ import com.amazonaws.services.kinesisvideo.model.GetMediaRequest; import com.amazonaws.services.kinesisvideo.model.GetMediaResult; import com.amazonaws.services.kinesisvideo.model.StartSelector; -import com.amazonaws.services.kinesisvideo.model.StartSelectorType; import lombok.extern.slf4j.Slf4j; -import java.util.Optional; - /** * Worker used to make a GetMedia call to Kinesis Video and stream in data and parse it and apply a visitor. */ diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExample.java b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExample.java index b2f70b4..e944c60 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExample.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExample.java @@ -27,18 +27,12 @@ import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesisvideo.AmazonKinesisVideo; import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoClientBuilder; -import com.amazonaws.services.kinesisvideo.model.CreateStreamRequest; -import com.amazonaws.services.kinesisvideo.model.DeleteStreamRequest; -import com.amazonaws.services.kinesisvideo.model.DescribeStreamRequest; -import com.amazonaws.services.kinesisvideo.model.ResourceNotFoundException; import com.amazonaws.services.kinesisvideo.model.StartSelector; import com.amazonaws.services.kinesisvideo.model.StartSelectorType; -import com.amazonaws.services.kinesisvideo.model.StreamInfo; import lombok.Builder; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.Validate; import java.io.BufferedOutputStream; import java.io.Closeable; @@ -48,11 +42,9 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; /** * Example for integrating with Kinesis Video. diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoGStreamerPiperExample.java b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoGStreamerPiperExample.java new file mode 100644 index 0000000..08fe5a6 --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoGStreamerPiperExample.java @@ -0,0 +1,133 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.examples; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.kinesisvideo.parser.utilities.consumer.MergedOutputPiperFactory; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesisvideo.AmazonKinesisVideo; +import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoClientBuilder; +import com.amazonaws.services.kinesisvideo.model.StartSelector; +import com.amazonaws.services.kinesisvideo.model.StartSelectorType; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Example for continuously piping the output of GetMedia calls from a Kinesis Video stream to GStreamer. + */ +@Slf4j +public class KinesisVideoGStreamerPiperExample extends KinesisVideoCommon { + private static final String DEFAULT_PATH_TO_GSTREAMER = "/usr/bin/gst-launch-1.0"; + private static final String [] FDSRC_ARGS = new String[] { + "-v", + "fdsrc", + "!" + }; + + private final AmazonKinesisVideo amazonKinesisVideo; + private final InputStream inputStream; + private final ExecutorService executorService; + private PutMediaWorker putMediaWorker; + private final StreamOps streamOps; + //The arguments to construct the gstreamer pipeline. + //The merged output of GetMedia will be piped to the gstreamer pipeline created using these arguments. + private final List gStreamerPipelineArguments; + + + @Builder + private KinesisVideoGStreamerPiperExample(Regions region, + String streamName, + AWSCredentialsProvider credentialsProvider, + InputStream inputVideoStream, + String gStreamerPipelineArgument) { + super(region, credentialsProvider, streamName); + final AmazonKinesisVideoClientBuilder builder = AmazonKinesisVideoClientBuilder.standard(); + configureClient(builder); + this.amazonKinesisVideo = builder.build(); + this.inputStream = inputVideoStream; + this.streamOps = new StreamOps(region, streamName, credentialsProvider); + this.executorService = Executors.newFixedThreadPool(2); + this.gStreamerPipelineArguments = new ArrayList<>(); + addGStreamerPipelineArguments(gStreamerPipelineArgument); + } + + private void addGStreamerPipelineArguments(String gStreamerPipeLineArgument) { + this.gStreamerPipelineArguments.add(pathToExecutable("PATH_TO_GSTREAMER", DEFAULT_PATH_TO_GSTREAMER)); + addToPipelineArguments(FDSRC_ARGS); + addToPipelineArguments(gStreamerPipeLineArgument.split("\\s+")); + } + + private String pathToExecutable(String environmentVariable, String defaultPath) { + final String environmentVariableValue = System.getenv(environmentVariable); + return StringUtils.isEmpty(environmentVariableValue) ? defaultPath : environmentVariableValue; + } + + private void addToPipelineArguments(String []pipelineArguments) { + for (String pipelineArgument : pipelineArguments) { + this.gStreamerPipelineArguments.add(pipelineArgument); + } + } + + /** + * This method executes the example. + * + * @throws InterruptedException the thread is interrupted while waiting for the stream to enter the correct state. + * @throws IOException fails to read video from the input stream or write to the output stream. + */ + public void execute () throws InterruptedException, IOException { + //Create the Kinesis Video stream, deleting and recreating if necessary. + streamOps.recreateStreamIfNecessary(); + + ContinuousGetMediaWorker getWorker = ContinuousGetMediaWorker.create(getRegion(), + getCredentialsProvider(), + getStreamName(), + new StartSelector().withStartSelectorType(StartSelectorType.EARLIEST), + amazonKinesisVideo, + new MergedOutputPiperFactory(Optional.empty(), + true, + gStreamerPipelineArguments)); + + executorService.submit(getWorker); + + //Start a PutMedia worker to write data to a Kinesis Video Stream. + putMediaWorker = PutMediaWorker.create(getRegion(), + getCredentialsProvider(), + getStreamName(), + inputStream, + amazonKinesisVideo); + executorService.submit(putMediaWorker); + + Thread.sleep(3000); + getWorker.stop(); + + executorService.shutdown(); + executorService.awaitTermination(120, TimeUnit.SECONDS); + if (!executorService.isTerminated()) { + log.warn("Shutting down executor service by force"); + executorService.shutdownNow(); + } else { + log.info("Executor service is shutdown"); + } + } +} diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/MkvElementVisitor.java b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/MkvElementVisitor.java index d8d894e..eb6ede7 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/MkvElementVisitor.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/MkvElementVisitor.java @@ -13,7 +13,6 @@ */ package com.amazonaws.kinesisvideo.parser.mkv; - /** * Base visitor for visiting the different types of elements vended by a {\link StreamingMkvReader}. */ @@ -23,4 +22,8 @@ public abstract class MkvElementVisitor { public abstract void visit(MkvEndMasterElement endMasterElement) throws MkvElementVisitException; public abstract void visit(MkvDataElement dataElement) throws MkvElementVisitException; + + public boolean isDone() { + return false; + } } diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/StreamingMkvReader.java b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/StreamingMkvReader.java index 93b4b70..82d5d97 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/StreamingMkvReader.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/StreamingMkvReader.java @@ -135,7 +135,7 @@ public Optional nextIfAvailable() { * @throws MkvElementVisitException If the visitor fails. */ public void apply(MkvElementVisitor visitor) throws MkvElementVisitException { - while (this.mightHaveNext()) { + while (this.mightHaveNext() && !visitor.isDone()) { Optional mkvElementOptional = this.nextIfAvailable(); if (mkvElementOptional.isPresent()) { mkvElementOptional.get().accept(visitor); diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/visitors/CompositeMkvElementVisitor.java b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/visitors/CompositeMkvElementVisitor.java index c51332e..a7a36d7 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/visitors/CompositeMkvElementVisitor.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/visitors/CompositeMkvElementVisitor.java @@ -57,6 +57,11 @@ public void visit(MkvDataElement dataElement) throws MkvElementVisitException { visitAll(dataElement); } + @Override + public boolean isDone() { + return childVisitors.stream().anyMatch(MkvElementVisitor::isDone); + } + private void visitAll(MkvElement element) throws MkvElementVisitException { try { for (MkvElementVisitor childVisitor : childVisitors) { diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/visitors/CountVisitor.java b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/visitors/CountVisitor.java index a385e8f..2560ccd 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/visitors/CountVisitor.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/visitors/CountVisitor.java @@ -21,6 +21,7 @@ import com.amazonaws.kinesisvideo.parser.mkv.MkvStartMasterElement; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -46,6 +47,14 @@ public CountVisitor(Collection typesToCount) { .forEach(t -> endMasterCount.put(t, 0)); } + public static CountVisitor create(EBMLTypeInfo... typesToCount) { + List typeInfoList = new ArrayList<>(); + for (EBMLTypeInfo typeToCount : typesToCount) { + typeInfoList.add(typeToCount); + } + return new CountVisitor(typeInfoList); + } + @Override public void visit(MkvStartMasterElement startMasterElement) { incrementTypeCount(startMasterElement); diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadata.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadata.java index e484a3c..79c7d98 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadata.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadata.java @@ -15,6 +15,7 @@ import lombok.Builder; import lombok.Getter; +import lombok.Setter; import lombok.ToString; import org.apache.commons.lang3.Validate; @@ -22,12 +23,14 @@ import java.util.Date; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.TimeUnit; /** * Metadata for a Kinesis Video Fragment. */ -@Getter @ToString +@Getter +@ToString public class FragmentMetadata { private static final String FRAGMENT_NUMBER_KEY = "AWS_KINESISVIDEO_FRAGMENT_NUMBER"; private static final String SERVER_SIDE_TIMESTAMP_KEY = "AWS_KINESISVIDEO_SERVER_TIMESTAMP"; @@ -45,6 +48,11 @@ public class FragmentMetadata { private final long errorId; private final String errorCode; + @Setter + private OptionalLong millisBehindNow = OptionalLong.empty(); + @Setter + private Optional continuationToken = Optional.empty(); + private FragmentMetadata(String fragmentNumberString, double serverSideTimestampSeconds, double producerSideTimestampSeconds) { @@ -106,4 +114,7 @@ public Date getProducerSideTimetampAsDate() { return new Date(this.producerSideTimestampMillis); } + public boolean isCompleteFragment() { + return millisBehindNow.isPresent() && continuationToken.isPresent(); + } } diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitor.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitor.java index c892195..00f060e 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitor.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitor.java @@ -194,10 +194,12 @@ private void setMillisBehindLatestAndContinuationToken() { String millisBehindString = tagNameToTagValueMap.get(MILLIS_BEHIND_NOW_KEY); if (millisBehindString != null) { millisBehindNow = (OptionalLong.of(Long.parseLong(millisBehindString))); + currentFragmentMetadata.ifPresent(f -> f.setMillisBehindNow(millisBehindNow)); } String continutationTokenString = tagNameToTagValueMap.get(CONTINUATION_TOKEN_KEY); if (continutationTokenString != null) { continuationToken = Optional.of(continutationTokenString); + currentFragmentMetadata.ifPresent(f -> f.setContinuationToken(continuationToken)); } } diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/OutputSegmentMerger.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/OutputSegmentMerger.java index 7b1b4c6..5d80c69 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/OutputSegmentMerger.java +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/OutputSegmentMerger.java @@ -143,11 +143,7 @@ private static List getTypesToMergeOn() { } private static CountVisitor getCountVisitor() { - List typesToCountList = new ArrayList<>(); - typesToCountList.add(MkvTypeInfos.CLUSTER); - typesToCountList.add(MkvTypeInfos.SEGMENT); - typesToCountList.add(MkvTypeInfos.SIMPLEBLOCK); - return new CountVisitor(typesToCountList); + return CountVisitor.create(MkvTypeInfos.CLUSTER, MkvTypeInfos.SEGMENT, MkvTypeInfos.SIMPLEBLOCK); } @@ -163,6 +159,7 @@ public int getSimpleBlocksCount() { return countVisitor.getCount(MkvTypeInfos.SIMPLEBLOCK); } + @Override public boolean isDone() { return MergeState.DONE == state; } @@ -229,11 +226,21 @@ public void visit(MkvStartMasterElement startMasterElement) throws MkvElementVis } } catch(IOException ie) { - throw new MkvElementVisitException("IOException in merge visitor", ie); + wrapIOException(ie); } } + private void wrapIOException(IOException ie) throws MkvElementVisitException { + String exceptionMessage = "IOException in merge visitor "; + if (lastClusterTimecode.isPresent()) { + exceptionMessage += "in or immediately after cluster with timecode "+lastClusterTimecode.get(); + } else { + exceptionMessage += "in first cluster"; + } + throw new MkvElementVisitException(exceptionMessage, ie); + } + @Override public void visit(MkvEndMasterElement endMasterElement) throws MkvElementVisitException { switch (state) { @@ -308,9 +315,14 @@ public void visit(MkvDataElement dataElement) throws MkvElementVisitException { } } catch (IOException ie) { - throw new MkvElementVisitException("IOException in merge visitor", ie); + wrapIOException(ie); } } + + @Override + public boolean isDone() { + return MergeState.DONE == state; + } } private void emitClusterStart() throws IOException { diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/FragmentMetadataCallback.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/FragmentMetadataCallback.java new file mode 100644 index 0000000..768de7b --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/FragmentMetadataCallback.java @@ -0,0 +1,24 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.utilities.consumer; + +import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadata; + +/** + * A callback that receives the fragment metadata of a fragment. + */ +@FunctionalInterface +public interface FragmentMetadataCallback { + void call(FragmentMetadata consumedFragment); +} diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/FragmentProgressTracker.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/FragmentProgressTracker.java new file mode 100644 index 0000000..ebb0882 --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/FragmentProgressTracker.java @@ -0,0 +1,89 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.utilities.consumer; + +import com.amazonaws.kinesisvideo.parser.ebml.MkvTypeInfos; +import com.amazonaws.kinesisvideo.parser.mkv.MkvDataElement; +import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException; +import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitor; +import com.amazonaws.kinesisvideo.parser.mkv.MkvEndMasterElement; +import com.amazonaws.kinesisvideo.parser.mkv.MkvStartMasterElement; +import com.amazonaws.kinesisvideo.parser.mkv.visitors.CompositeMkvElementVisitor; +import com.amazonaws.kinesisvideo.parser.mkv.visitors.CountVisitor; +import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor; +import lombok.RequiredArgsConstructor; + +/** + * This class is used to track the progress in processing the output of a GetMedia call. + * + */ +public class FragmentProgressTracker extends CompositeMkvElementVisitor { + private final CountVisitor countVisitor; + + + private FragmentProgressTracker(MkvElementVisitor processingVisitor, + FragmentMetadataVisitor metadataVisitor, + CountVisitor countVisitor, + EndOfSegmentVisitor endOfSegmentVisitor) { + super(metadataVisitor, processingVisitor, countVisitor, endOfSegmentVisitor); + this.countVisitor = countVisitor; + } + + public static FragmentProgressTracker create(MkvElementVisitor processingVisitor, + FragmentMetadataCallback callback) { + FragmentMetadataVisitor metadataVisitor = FragmentMetadataVisitor.create(); + return new FragmentProgressTracker(processingVisitor, + metadataVisitor, + CountVisitor.create(MkvTypeInfos.CLUSTER, + MkvTypeInfos.SEGMENT, + MkvTypeInfos.SIMPLEBLOCK, + MkvTypeInfos.TAG), + new EndOfSegmentVisitor(metadataVisitor, callback)); + } + + public int getClustersCount() { + return countVisitor.getCount(MkvTypeInfos.CLUSTER); + } + + public int getSegmentsCount() { + return countVisitor.getCount(MkvTypeInfos.SEGMENT); + } + + public int getSimpleBlocksCount() { + return countVisitor.getCount(MkvTypeInfos.SIMPLEBLOCK); + } + + @RequiredArgsConstructor + private static class EndOfSegmentVisitor extends MkvElementVisitor { + private final FragmentMetadataVisitor metadataVisitor; + private final FragmentMetadataCallback endOfFragmentCallback; + + @Override + public void visit(MkvStartMasterElement startMasterElement) throws MkvElementVisitException { + + } + + @Override + public void visit(MkvEndMasterElement endMasterElement) throws MkvElementVisitException { + if (MkvTypeInfos.SEGMENT.equals(endMasterElement.getElementMetaData().getTypeInfo())) { + metadataVisitor.getCurrentFragmentMetadata().ifPresent(endOfFragmentCallback::call); + } + } + + @Override + public void visit(MkvDataElement dataElement) throws MkvElementVisitException { + } + } + +} diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/GetMediaResponseStreamConsumer.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/GetMediaResponseStreamConsumer.java new file mode 100644 index 0000000..d895050 --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/GetMediaResponseStreamConsumer.java @@ -0,0 +1,44 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.utilities.consumer; + +import com.amazonaws.kinesisvideo.parser.ebml.InputStreamParserByteSource; +import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException; +import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitor; +import com.amazonaws.kinesisvideo.parser.mkv.StreamingMkvReader; +import com.amazonaws.kinesisvideo.parser.mkv.visitors.CompositeMkvElementVisitor; +import com.amazonaws.kinesisvideo.parser.utilities.OutputSegmentMerger; + +import java.io.IOException; +import java.io.InputStream; + +/** + * This base class is used to consume the output of a GetMedia* call to Kinesis Video in a streaming fashion. + * The first parameter for process method is the payload inputStream in a GetMediaResult returned by a call to GetMedia. + * Implementations of the process method of this interface should block until all the data in the inputStream has been + * processed or the process method decides to stop for some other reason. The FragmentMetadataCallback is invoked at + * the end of every processed fragment. + */ +public abstract class GetMediaResponseStreamConsumer { + + public abstract void process(InputStream inputStream, FragmentMetadataCallback callback) + throws MkvElementVisitException, IOException; + + protected void processWithFragmentEndCallbacks(InputStream inputStream, + FragmentMetadataCallback endOfFragmentCallback, + MkvElementVisitor mkvElementVisitor) throws MkvElementVisitException { + StreamingMkvReader.createDefault(new InputStreamParserByteSource(inputStream)) + .apply(FragmentProgressTracker.create(mkvElementVisitor, endOfFragmentCallback)); + } +} diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/GetMediaResponseStreamConsumerFactory.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/GetMediaResponseStreamConsumerFactory.java new file mode 100644 index 0000000..120fa87 --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/GetMediaResponseStreamConsumerFactory.java @@ -0,0 +1,23 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.utilities.consumer; + +import java.io.IOException; + +/** + * A base class used to create GetMediaResponseStreamConsumers. + */ +public abstract class GetMediaResponseStreamConsumerFactory { + public abstract GetMediaResponseStreamConsumer createConsumer() throws IOException; +} diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiper.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiper.java new file mode 100644 index 0000000..29e53e2 --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiper.java @@ -0,0 +1,73 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.utilities.consumer; + +import com.amazonaws.kinesisvideo.parser.ebml.InputStreamParserByteSource; +import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException; +import com.amazonaws.kinesisvideo.parser.mkv.StreamingMkvReader; +import com.amazonaws.kinesisvideo.parser.mkv.visitors.CompositeMkvElementVisitor; +import com.amazonaws.kinesisvideo.parser.utilities.OutputSegmentMerger; +import lombok.RequiredArgsConstructor; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * This class merges consecutive mkv streams and pipes the merged stream to the stdin of a child process. + * It is meant to be used to pipe the output of a GetMedia* call to a processing application that can not deal + * with having multiple consecutive mkv streams. Gstreamer is one such application that requires a merged stream. + * A merged stream is where the consecutive mkv streams are merged as long as they share the same track + * and EBML information and the cluster level timecodes in those streams keep increasing. + * If a non-matching mkv stream is detected, the piper stops. + */ + +@RequiredArgsConstructor +public class MergedOutputPiper extends GetMediaResponseStreamConsumer { + /** + * The process builder to create the child proccess to which the merged output + */ + private final ProcessBuilder childProcessBuilder; + + + private OutputSegmentMerger merger; + private Process targetProcess; + + @Override + public void process(final InputStream inputStream, FragmentMetadataCallback endOfFragmentCallback) + throws MkvElementVisitException, IOException { + targetProcess = childProcessBuilder.start(); + try (OutputStream os = targetProcess.getOutputStream()) { + merger = OutputSegmentMerger.createToStopAtFirstNonMatchingSegment(os); + processWithFragmentEndCallbacks(inputStream, endOfFragmentCallback, merger); + } + } + + /** + * Get the number of segments that were merged by the piper. + * If the merger is done because the last segment it read cannot be merged, then the number of merged segments + * is the number of segments read minus the last segment. + * If the merger is not done then the number of merged segments is the number of read segments. + * @return + */ + public int getMergedSegments() { + if (merger.isDone()) { + return merger.getSegmentsCount() - 1; + } else { + return merger.getSegmentsCount(); + } + } + + +} diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiperFactory.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiperFactory.java new file mode 100644 index 0000000..762e4eb --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiperFactory.java @@ -0,0 +1,74 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.utilities.consumer; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * This factory class creates MergedOutputPiper consumers based on a particular target ProcessBuilder. + */ +public class MergedOutputPiperFactory extends GetMediaResponseStreamConsumerFactory { + private final Optional directoryOptional; + private final List commandList; + private final boolean redirectOutputAndError; + + public MergedOutputPiperFactory(String... commands) { + this(Optional.empty(), commands); + } + + public MergedOutputPiperFactory(Optional directoryOptional, String... commands) { + this(directoryOptional, false, commands); + } + + private MergedOutputPiperFactory(Optional directoryOptional, + boolean redirectOutputAndError, + String... commands) { + this.directoryOptional = directoryOptional; + this.commandList = new ArrayList(); + for (String command : commands) { + commandList.add(command); + } + this.redirectOutputAndError = redirectOutputAndError; + } + + public MergedOutputPiperFactory(Optional directoryOptional, + boolean redirectOutputAndError, + List commandList) { + this.directoryOptional = directoryOptional; + this.commandList = commandList; + this.redirectOutputAndError = redirectOutputAndError; + } + + @Override + public GetMediaResponseStreamConsumer createConsumer() throws IOException{ + ProcessBuilder builder = new ProcessBuilder().command(commandList); + directoryOptional.ifPresent(d -> builder.directory(new File(d))); + if (redirectOutputAndError) { + builder.redirectOutput(Files.createFile(Paths.get(redirectedFileName("stdout"))).toFile()); + builder.redirectError(Files.createFile(Paths.get(redirectedFileName("stderr"))).toFile()); + } + return new MergedOutputPiper(builder); + + } + + private String redirectedFileName(String suffix) { + return "MergedOutputPiper-"+System.currentTimeMillis()+"-"+suffix; + } +} diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoGStreamerPiperExampleTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoGStreamerPiperExampleTest.java new file mode 100644 index 0000000..60ceb7e --- /dev/null +++ b/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoGStreamerPiperExampleTest.java @@ -0,0 +1,68 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.examples; + +import com.amazonaws.auth.profile.ProfileCredentialsProvider; +import com.amazonaws.kinesisvideo.parser.TestResourceUtil; +import com.amazonaws.kinesisvideo.parser.ebml.InputStreamParserByteSource; +import com.amazonaws.kinesisvideo.parser.ebml.MkvTypeInfos; +import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException; +import com.amazonaws.kinesisvideo.parser.mkv.StreamingMkvReader; +import com.amazonaws.kinesisvideo.parser.mkv.visitors.CountVisitor; +import com.amazonaws.regions.Regions; +import lombok.Getter; +import org.jcodec.common.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Test to execute Kinesis Video GStreamer Piper Example. + * It passes in a pipeline that demuxes and remuxes the input mkv stream and writes to a file sink. + * This can be used to demonstrate that the stream passed into the gstreamer pipeline is acceptable to it. + */ +public class KinesisVideoGStreamerPiperExampleTest { + + @Ignore + @Test + public void testExample() throws InterruptedException, IOException, MkvElementVisitException { + final Path outputFilePath = Paths.get("output_from_gstreamer-"+System.currentTimeMillis()+".mkv"); + String gStreamerPipelineArgument = + "matroskademux ! matroskamux! filesink location=" + outputFilePath.toAbsolutePath().toString(); + + KinesisVideoGStreamerPiperExample example = KinesisVideoGStreamerPiperExample.builder().region(Regions.US_WEST_2) + .streamName("myTestStream2") + .credentialsProvider(new ProfileCredentialsProvider()) + .inputVideoStream(TestResourceUtil.getTestInputStream("clusters.mkv")) + .gStreamerPipelineArgument(gStreamerPipelineArgument) + .build(); + + example.execute(); + + //Verify that the generated output file has the expected number of segments, clusters and simple blocks. + CountVisitor countVisitor = + CountVisitor.create(MkvTypeInfos.SEGMENT, MkvTypeInfos.CLUSTER, MkvTypeInfos.SIMPLEBLOCK); + StreamingMkvReader.createDefault(new InputStreamParserByteSource(Files.newInputStream(outputFilePath))) + .apply(countVisitor); + + Assert.assertEquals(1,countVisitor.getCount(MkvTypeInfos.SEGMENT)); + Assert.assertEquals(8,countVisitor.getCount(MkvTypeInfos.CLUSTER)); + Assert.assertEquals(444,countVisitor.getCount(MkvTypeInfos.SIMPLEBLOCK)); + } +} diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/FrameRendererTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/FrameRendererTest.java index 172b7f7..cd44d40 100644 --- a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/FrameRendererTest.java +++ b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/FrameRendererTest.java @@ -48,11 +48,8 @@ public void frameCountTest() throws IOException, MkvElementVisitException { StreamingMkvReader mkvStreamReader = StreamingMkvReader.createDefault(new InputStreamParserByteSource(in)); - List typesToCount = new ArrayList<>(); - typesToCount.add(MkvTypeInfos.CLUSTER); - typesToCount.add(MkvTypeInfos.SIMPLEBLOCK); - typesToCount.add(MkvTypeInfos.TRACKS); - CountVisitor countVisitor = new CountVisitor(typesToCount); + CountVisitor countVisitor = + CountVisitor.create(MkvTypeInfos.CLUSTER, MkvTypeInfos.SEGMENT, MkvTypeInfos.SIMPLEBLOCK); mkvStreamReader.apply(new CompositeMkvElementVisitor(countVisitor, FrameVisitor.create(frameRenderer))); diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/OutputSegmentMergerTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/OutputSegmentMergerTest.java index 56f4586..42fbb07 100644 --- a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/OutputSegmentMergerTest.java +++ b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/OutputSegmentMergerTest.java @@ -335,11 +335,7 @@ public void perfTest() throws IOException, MkvElementVisitException, Interrupted } private static CountVisitor getCountVisitor() { - List typesToCountList = new ArrayList<>(); - typesToCountList.add(MkvTypeInfos.CLUSTER); - typesToCountList.add(MkvTypeInfos.SEGMENT); - typesToCountList.add(MkvTypeInfos.SIMPLEBLOCK); - return new CountVisitor(typesToCountList); + return CountVisitor.create(MkvTypeInfos.CLUSTER, MkvTypeInfos.SEGMENT, MkvTypeInfos.SIMPLEBLOCK); } @Test diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiperTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiperTest.java new file mode 100644 index 0000000..5e33e16 --- /dev/null +++ b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/consumer/MergedOutputPiperTest.java @@ -0,0 +1,209 @@ +/* +Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + + http://aws.amazon.com/apache2.0/ + +or in the "license" file accompanying this file. +This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. +*/ +package com.amazonaws.kinesisvideo.parser.utilities.consumer; + +import com.amazonaws.kinesisvideo.parser.TestResourceUtil; +import com.amazonaws.kinesisvideo.parser.ebml.InputStreamParserByteSource; +import com.amazonaws.kinesisvideo.parser.ebml.MkvTypeInfos; +import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException; +import com.amazonaws.kinesisvideo.parser.mkv.StreamingMkvReader; +import com.amazonaws.kinesisvideo.parser.mkv.visitors.CountVisitor; +import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadata; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + +/** + * Tests for MergedOutputPiper + */ +public class MergedOutputPiperTest { + //Can be overridden by setting the environment variable PATH_TO_CAT + private static final String DEFAULT_PATH_TO_CAT = "/bin/cat"; + //Can be overridden by setting the environment variable PATH_TO_GSTREAMER + private static final String DEFAULT_PATH_TO_GSTREAMER = "/usr/bin/gst-launch-1.0"; + private boolean canRunBasicTest; + private String pathToCat; + private boolean canRunGStreamer; + private String pathToGStreamer; + + private Optional processedFragmentNumberString; + + @Before + public void setup() { + pathToCat = pathToExecutable("PATH_TO_CAT", DEFAULT_PATH_TO_CAT); + canRunBasicTest = checkIfFileExists(pathToCat); + pathToGStreamer = pathToExecutable("PATH_TO_GSTREAMER", DEFAULT_PATH_TO_GSTREAMER); + canRunGStreamer = checkIfFileExists(pathToGStreamer); + processedFragmentNumberString = Optional.empty(); + } + + private String pathToExecutable(String environmentVariable, String defaultPath) { + final String environmentVariableValue = System.getenv(environmentVariable); + return StringUtils.isEmpty(environmentVariableValue) ? defaultPath : environmentVariableValue; + } + + + private boolean checkIfFileExists(String pathToFile) { + return new File(pathToFile).exists(); + } + + + @Test + public void testBasic() throws IOException, MkvElementVisitException { + if (canRunBasicTest) { + String fileName = "output_get_media.mkv"; + runBasicTestForFile(fileName, "91343852333181432412489103236310005892133364608", 5); + } + } + + @Test + public void testBasicNonIncreasingTimecode() throws IOException, MkvElementVisitException { + if (canRunBasicTest) { + String fileName = "output-get-media-non-increasing-timecode.mkv"; + runBasicTestForFile(fileName, "91343852338381293673923423239754896920603583280", 1); + } + } + + @Ignore + @Test + public void testGStreamerVideoSink() throws IOException, MkvElementVisitException { + if (canRunGStreamer) { + Path tmpFilePathToStdout = Files.createTempFile("testGStreamer", "stdout"); + Path tmpFilePathToStdErr = Files.createTempFile("testGStreamer", "stderr"); + try { + InputStream is = TestResourceUtil.getTestInputStream("output_get_media.mkv"); + ProcessBuilder processBuilder = new ProcessBuilder().command(pathToGStreamer, + "-v", + "fdsrc", + "!", + "decodebin", + "!", + "videoconvert", + "!", + "autovideosink") + .redirectOutput(tmpFilePathToStdout.toFile()) + .redirectError(tmpFilePathToStdErr.toFile()); + MergedOutputPiper piper = new MergedOutputPiper(processBuilder); + piper.process(is, this::setProcessedFragmentNumberString); + Assert.assertEquals("91343852333181432412489103236310005892133364608", + processedFragmentNumberString.get()); + Assert.assertEquals(5, piper.getMergedSegments()); + } finally { + Files.delete(tmpFilePathToStdout); + Files.delete(tmpFilePathToStdErr); + } + } + } + + @Test + public void testGStreamerFileSink() throws IOException, MkvElementVisitException { + if (canRunGStreamer) { + Path tmpFilePathToStdout = Files.createTempFile("testGStreamerFileSink", "stdout"); + Path tmpFilePathToStdErr = Files.createTempFile("testGStreamerFileSink", "stderr"); + Path tmpFilePathToOutputFile = Files.createTempFile("testGStreamerFileSink", "output.mkv"); + try { + InputStream is = TestResourceUtil.getTestInputStream("output_get_media.mkv"); + ProcessBuilder processBuilder = new ProcessBuilder().command(pathToGStreamer, + "-v", + "fdsrc", + "!", + "filesink", + "location=" + tmpFilePathToOutputFile.toAbsolutePath().toString()) + .redirectOutput(tmpFilePathToStdout.toFile()) + .redirectError(tmpFilePathToStdErr.toFile()); + MergedOutputPiper piper = new MergedOutputPiper(processBuilder); + piper.process(is, this::setProcessedFragmentNumberString); + + CountVisitor countVisitor = + CountVisitor.create(MkvTypeInfos.CLUSTER, MkvTypeInfos.SEGMENT, MkvTypeInfos.SIMPLEBLOCK); + StreamingMkvReader.createDefault(new InputStreamParserByteSource(new FileInputStream( + tmpFilePathToOutputFile.toFile()))).apply(countVisitor); + Assert.assertEquals(5, countVisitor.getCount(MkvTypeInfos.CLUSTER)); + Assert.assertEquals(1, countVisitor.getCount(MkvTypeInfos.SEGMENT)); + Assert.assertEquals(300, countVisitor.getCount(MkvTypeInfos.SIMPLEBLOCK)); + } finally { + Files.delete(tmpFilePathToStdout); + Files.delete(tmpFilePathToStdErr); + Files.delete(tmpFilePathToOutputFile); + } + } + } + + @Test + public void testGStreamerFileSinkWithFactory() throws IOException, MkvElementVisitException { + if (canRunGStreamer) { + Path tmpFilePathToOutputFile = Files.createTempFile("testGStreamerFileSink", "output.mkv"); + MergedOutputPiperFactory piperFactory = new MergedOutputPiperFactory(pathToGStreamer, + "-v", + "fdsrc", + "!", + "filesink", + "location=" + tmpFilePathToOutputFile.toAbsolutePath().toString()); + try { + InputStream is = TestResourceUtil.getTestInputStream("output_get_media.mkv"); + GetMediaResponseStreamConsumer piper = piperFactory.createConsumer(); + Assert.assertTrue(piper.getClass().equals(MergedOutputPiper.class)); + piper.process(is, this::setProcessedFragmentNumberString); + + CountVisitor countVisitor = + CountVisitor.create(MkvTypeInfos.CLUSTER, MkvTypeInfos.SEGMENT, MkvTypeInfos.SIMPLEBLOCK); + StreamingMkvReader.createDefault(new InputStreamParserByteSource(new FileInputStream( + tmpFilePathToOutputFile.toFile()))).apply(countVisitor); + Assert.assertEquals(5, countVisitor.getCount(MkvTypeInfos.CLUSTER)); + Assert.assertEquals(1, countVisitor.getCount(MkvTypeInfos.SEGMENT)); + Assert.assertEquals(300, countVisitor.getCount(MkvTypeInfos.SIMPLEBLOCK)); + } finally { + Files.delete(tmpFilePathToOutputFile); + } + } + } + + private void runBasicTestForFile(String fileName, + String expectedFragmentNumberToStartAfter, + int expectedNumMergedSegments) throws IOException, MkvElementVisitException { + StopWatch timer = new StopWatch(); + timer.start(); + + InputStream is = TestResourceUtil.getTestInputStream(fileName); + Path tmpFilePath = Files.createTempFile("basicTest:" + fileName + ":", "merged.mkv"); + + try { + ProcessBuilder processBuilder = + new ProcessBuilder().command(pathToCat).redirectOutput(tmpFilePath.toFile()); + MergedOutputPiper piper = new MergedOutputPiper(processBuilder); + piper.process(is, this::setProcessedFragmentNumberString); + timer.stop(); + Assert.assertEquals(expectedFragmentNumberToStartAfter, processedFragmentNumberString.get()); + Assert.assertEquals(expectedNumMergedSegments, piper.getMergedSegments()); + } finally { + Files.delete(tmpFilePath); + } + } + + private Optional setProcessedFragmentNumberString(FragmentMetadata f) { + return processedFragmentNumberString = Optional.of(f.getFragmentNumberString()); + } + +}