Skip to content

Commit

Permalink
Release 1.0.5 (#23)
Browse files Browse the repository at this point in the history
* Add the capability and example to pipe the output of GetMedia calls to GStreamer.

* Add release notes for 1.0.5
  • Loading branch information
sayantacC authored May 18, 2018
1 parent 0017a7d commit b72c4d8
Show file tree
Hide file tree
Showing 23 changed files with 945 additions and 34 deletions.
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ visited `MkvElement` for each constituent visitor in the order in which the visi

`CopyVisitor` is a visitor used to copy the raw bytes of the Mkv elements in a stream to an output stream.

## ResponseStreamConsumers
The `GetMediaResponseStreamConsumer` is an abstract class used to consume the output of a GetMedia* call to Kinesis Video in a streaming fashion.
It supports a single abstract method called process that is invoked to process the streaming payload of a GetMedia response.
The first parameter for process method is the payload inputStream in a GetMediaResult returned by a call to GetMedia.
Implementations of the process method of this interface should block until all the data in the inputStream has been
processed or the process method decides to stop for some other reason. The second argument is a FragmentMetadataCallback
which is invoked at the end of every processed fragment. The `GetMediaResponseStreamConsumer` provides a utility method
`processWithFragmentEndCallbacks` that can be used by child classes to implement the end of fragment callbacks.
The process method can be implemented using a combination of the visitors described earlier.

### MergedOutputPiper
The `MergedOutputPiper` extends `GetMediaResponseStreamConsumer` to merge consecutive mkv streams in the output of GetMedia
and pipes the merged stream to the stdin of a child process. It is meant to be used to pipe the output of a GetMedia* call to a processing application that can not deal
with having multiple consecutive mkv streams. Gstreamer is one such application that requires a merged stream.


## Example
* `KinesisVideoExample` is an example that shows how the `StreamingMkvReader` and the different visitors can be integrated
with the AWS SDK for the Kinesis Video. This example provides examples for
Expand All @@ -82,9 +98,20 @@ with the AWS SDK for the Kinesis Video. This example provides examples for
}
```
* It has been tested not only for streams ingested by `PutMediaWorker` but also streams sent to Kinesis Video Streams using GStreamer Demo application (https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-cpp)
* `KinesisVideoGStreamerPiperExample` is an example for continuously piping the output of GetMedia calls from a Kinesis Video stream to GStreamer.
The test `KinesisVideoGStreamerPiperExampleTest` provides an example that pipes the output of a KVS GetMedia call to a Gstreamer pipeline.
The Gstreamer pipeline is a toy example that demonstrates that Gstreamer can parse the mkv passed into it.
## Release Notes
### Release 1.0.5 (May 2018)
* Introduce `GetMediaResponseStreamConsumer` as an abstract class used to consume the output of a GetMedia* call
to Kinesis Video in a streaming fashion. Child classes will use visitors to implement different consumers.
* The `MergedOutputPiper` extends `GetMediaResponseStreamConsumer` to merge consecutive mkv streams in the output of GetMedia
and pipes the merged stream to the stdin of a child process.
* Add the capability and example to pipe the output of GetMedia calls to GStreamer using `MergedOutputPiper`.
### Release 1.0.4 (April 2018)
* Add example for KinesisVideo Streams integration with Rekognition and draw Bounding Boxes for every sampled frame.
* Fix for stream ending before reporting tags visited.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<artifactId>amazon-kinesis-video-streams-parser-library</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Video Streams Parser Library</name>
<version>1.0.4</version>
<version>1.0.5-SNAPSHOT</version>
<description>The Amazon Kinesis Video Streams Parser Library for Java enables Java developers to parse the streams
returned by GetMedia calls to Amazon Kinesis Video.
</description>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/apache2.0/
or in the "license" file accompanying this file.
This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazonaws.kinesisvideo.parser.examples;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException;
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadata;
import com.amazonaws.kinesisvideo.parser.utilities.consumer.GetMediaResponseStreamConsumer;
import com.amazonaws.kinesisvideo.parser.utilities.consumer.GetMediaResponseStreamConsumerFactory;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideo;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoMedia;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoMediaClientBuilder;
import com.amazonaws.services.kinesisvideo.model.APIName;
import com.amazonaws.services.kinesisvideo.model.GetDataEndpointRequest;
import com.amazonaws.services.kinesisvideo.model.GetMediaRequest;
import com.amazonaws.services.kinesisvideo.model.GetMediaResult;
import com.amazonaws.services.kinesisvideo.model.StartSelector;
import com.amazonaws.services.kinesisvideo.model.StartSelectorType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.Validate;

import java.io.IOException;
import java.util.Optional;

/**
* Worker used to make a GetMedia call to Kinesis Video and stream in data and parse it and apply a visitor.
*/
@Slf4j
public class ContinuousGetMediaWorker extends KinesisVideoCommon implements Runnable {
private static final int HTTP_STATUS_OK = 200;
private final AmazonKinesisVideoMedia videoMedia;
private final GetMediaResponseStreamConsumerFactory consumerFactory;
private final StartSelector startSelector;
private Optional<String> fragmentNumberToStartAfter = Optional.empty();
private boolean shouldStop = false;

private ContinuousGetMediaWorker(Regions region,
AWSCredentialsProvider credentialsProvider,
String streamName,
StartSelector startSelector,
String endPoint,
GetMediaResponseStreamConsumerFactory consumerFactory) {
super(region, credentialsProvider, streamName);

AmazonKinesisVideoMediaClientBuilder builder = AmazonKinesisVideoMediaClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, region.getName()))
.withCredentials(getCredentialsProvider());

this.videoMedia = builder.build();
this.consumerFactory = consumerFactory;
this.startSelector = startSelector;
}

public static ContinuousGetMediaWorker create(Regions region,
AWSCredentialsProvider credentialsProvider,
String streamName,
StartSelector startSelector,
AmazonKinesisVideo amazonKinesisVideo,
GetMediaResponseStreamConsumerFactory consumer) {
String endPoint = amazonKinesisVideo.getDataEndpoint(new GetDataEndpointRequest().withAPIName(APIName.GET_MEDIA)
.withStreamName(streamName)).getDataEndpoint();

return new ContinuousGetMediaWorker(region, credentialsProvider, streamName, startSelector, endPoint, consumer);
}

public void stop() {
shouldStop = true;
}

@Override
public void run() {
log.info("Start ContinuousGetMedia worker for stream {}", streamName);
while (!shouldStop) {
try {

StartSelector selectorToUse = fragmentNumberToStartAfter.map(fn -> new StartSelector().withStartSelectorType(StartSelectorType.FRAGMENT_NUMBER)
.withAfterFragmentNumber(fn)).orElse(startSelector);

GetMediaResult result = videoMedia.getMedia(new GetMediaRequest().withStreamName(streamName).withStartSelector(selectorToUse));
log.info("Start processing GetMedia called for stream {} response {} requestId {}",
streamName,
result.getSdkHttpMetadata().getHttpStatusCode(),
result.getSdkResponseMetadata().getRequestId());

if (result.getSdkHttpMetadata().getHttpStatusCode() == HTTP_STATUS_OK) {
GetMediaResponseStreamConsumer consumer = consumerFactory.createConsumer();
consumer.process(
result.getPayload(), this::updateFragmentNumberToStartAfter);
} else {
Thread.sleep(200);
}
} catch (IOException | MkvElementVisitException e) {
log.error("Failure in ContinuousGetMedia worker for stream {} {}", streamName, e);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
} catch (Throwable t) {
log.error("WHAT",t);
} finally {
log.info("Exit processing GetMedia called for stream {}", streamName);
}
}
log.info("Exit ContinuousGetMedia worker for stream {}", streamName);
}

private void updateFragmentNumberToStartAfter(FragmentMetadata f) {
Validate.isTrue(!fragmentNumberToStartAfter.isPresent()
|| f.getFragmentNumberString().compareTo(fragmentNumberToStartAfter.get()) > 0);
fragmentNumberToStartAfter = Optional.of(f.getFragmentNumberString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.kinesisvideo.parser.ebml.InputStreamParserByteSource;
import com.amazonaws.kinesisvideo.parser.mkv.MkvElement;
import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException;
import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitor;
import com.amazonaws.kinesisvideo.parser.mkv.StreamingMkvReader;
Expand All @@ -29,11 +28,8 @@
import com.amazonaws.services.kinesisvideo.model.GetMediaRequest;
import com.amazonaws.services.kinesisvideo.model.GetMediaResult;
import com.amazonaws.services.kinesisvideo.model.StartSelector;
import com.amazonaws.services.kinesisvideo.model.StartSelectorType;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

/**
* Worker used to make a GetMedia call to Kinesis Video and stream in data and parse it and apply a visitor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,12 @@
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideo;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoClientBuilder;
import com.amazonaws.services.kinesisvideo.model.CreateStreamRequest;
import com.amazonaws.services.kinesisvideo.model.DeleteStreamRequest;
import com.amazonaws.services.kinesisvideo.model.DescribeStreamRequest;
import com.amazonaws.services.kinesisvideo.model.ResourceNotFoundException;
import com.amazonaws.services.kinesisvideo.model.StartSelector;
import com.amazonaws.services.kinesisvideo.model.StartSelectorType;
import com.amazonaws.services.kinesisvideo.model.StreamInfo;
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;

import java.io.BufferedOutputStream;
import java.io.Closeable;
Expand All @@ -48,11 +42,9 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/**
* Example for integrating with Kinesis Video.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/apache2.0/
or in the "license" file accompanying this file.
This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazonaws.kinesisvideo.parser.examples;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.kinesisvideo.parser.utilities.consumer.MergedOutputPiperFactory;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideo;
import com.amazonaws.services.kinesisvideo.AmazonKinesisVideoClientBuilder;
import com.amazonaws.services.kinesisvideo.model.StartSelector;
import com.amazonaws.services.kinesisvideo.model.StartSelectorType;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Example for continuously piping the output of GetMedia calls from a Kinesis Video stream to GStreamer.
*/
@Slf4j
public class KinesisVideoGStreamerPiperExample extends KinesisVideoCommon {
private static final String DEFAULT_PATH_TO_GSTREAMER = "/usr/bin/gst-launch-1.0";
private static final String [] FDSRC_ARGS = new String[] {
"-v",
"fdsrc",
"!"
};

private final AmazonKinesisVideo amazonKinesisVideo;
private final InputStream inputStream;
private final ExecutorService executorService;
private PutMediaWorker putMediaWorker;
private final StreamOps streamOps;
//The arguments to construct the gstreamer pipeline.
//The merged output of GetMedia will be piped to the gstreamer pipeline created using these arguments.
private final List<String> gStreamerPipelineArguments;


@Builder
private KinesisVideoGStreamerPiperExample(Regions region,
String streamName,
AWSCredentialsProvider credentialsProvider,
InputStream inputVideoStream,
String gStreamerPipelineArgument) {
super(region, credentialsProvider, streamName);
final AmazonKinesisVideoClientBuilder builder = AmazonKinesisVideoClientBuilder.standard();
configureClient(builder);
this.amazonKinesisVideo = builder.build();
this.inputStream = inputVideoStream;
this.streamOps = new StreamOps(region, streamName, credentialsProvider);
this.executorService = Executors.newFixedThreadPool(2);
this.gStreamerPipelineArguments = new ArrayList<>();
addGStreamerPipelineArguments(gStreamerPipelineArgument);
}

private void addGStreamerPipelineArguments(String gStreamerPipeLineArgument) {
this.gStreamerPipelineArguments.add(pathToExecutable("PATH_TO_GSTREAMER", DEFAULT_PATH_TO_GSTREAMER));
addToPipelineArguments(FDSRC_ARGS);
addToPipelineArguments(gStreamerPipeLineArgument.split("\\s+"));
}

private String pathToExecutable(String environmentVariable, String defaultPath) {
final String environmentVariableValue = System.getenv(environmentVariable);
return StringUtils.isEmpty(environmentVariableValue) ? defaultPath : environmentVariableValue;
}

private void addToPipelineArguments(String []pipelineArguments) {
for (String pipelineArgument : pipelineArguments) {
this.gStreamerPipelineArguments.add(pipelineArgument);
}
}

/**
* This method executes the example.
*
* @throws InterruptedException the thread is interrupted while waiting for the stream to enter the correct state.
* @throws IOException fails to read video from the input stream or write to the output stream.
*/
public void execute () throws InterruptedException, IOException {
//Create the Kinesis Video stream, deleting and recreating if necessary.
streamOps.recreateStreamIfNecessary();

ContinuousGetMediaWorker getWorker = ContinuousGetMediaWorker.create(getRegion(),
getCredentialsProvider(),
getStreamName(),
new StartSelector().withStartSelectorType(StartSelectorType.EARLIEST),
amazonKinesisVideo,
new MergedOutputPiperFactory(Optional.empty(),
true,
gStreamerPipelineArguments));

executorService.submit(getWorker);

//Start a PutMedia worker to write data to a Kinesis Video Stream.
putMediaWorker = PutMediaWorker.create(getRegion(),
getCredentialsProvider(),
getStreamName(),
inputStream,
amazonKinesisVideo);
executorService.submit(putMediaWorker);

Thread.sleep(3000);
getWorker.stop();

executorService.shutdown();
executorService.awaitTermination(120, TimeUnit.SECONDS);
if (!executorService.isTerminated()) {
log.warn("Shutting down executor service by force");
executorService.shutdownNow();
} else {
log.info("Executor service is shutdown");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.amazonaws.kinesisvideo.parser.mkv;


/**
* Base visitor for visiting the different types of elements vended by a {\link StreamingMkvReader}.
*/
Expand All @@ -23,4 +22,8 @@ public abstract class MkvElementVisitor {
public abstract void visit(MkvEndMasterElement endMasterElement) throws MkvElementVisitException;

public abstract void visit(MkvDataElement dataElement) throws MkvElementVisitException;

public boolean isDone() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public Optional<MkvElement> nextIfAvailable() {
* @throws MkvElementVisitException If the visitor fails.
*/
public void apply(MkvElementVisitor visitor) throws MkvElementVisitException {
while (this.mightHaveNext()) {
while (this.mightHaveNext() && !visitor.isDone()) {
Optional<MkvElement> mkvElementOptional = this.nextIfAvailable();
if (mkvElementOptional.isPresent()) {
mkvElementOptional.get().accept(visitor);
Expand Down
Loading

0 comments on commit b72c4d8

Please sign in to comment.