Skip to content

Commit

Permalink
Merge pull request #31 from qqia/release_1.0.7
Browse files Browse the repository at this point in the history
Release 1.0.7
  • Loading branch information
zhiyua-git authored Sep 28, 2018
2 parents 7fb5cc9 + 78043bb commit 57063da
Show file tree
Hide file tree
Showing 21 changed files with 424 additions and 89 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ with the AWS SDK for the Kinesis Video. This example provides examples for
The Gstreamer pipeline is a toy example that demonstrates that Gstreamer can parse the mkv passed into it.
## Release Notes
### Release 1.0.7 (Sep 2018)
* Add flag in KinesisVideoRendererExample and KinesisVideoExample to use the existing stream (and not doing PutMedia again if it exists already).
* Added support to retrieve the information from FragmentMetadata and display in the image panel during rendering.
### Release 1.0.6 (Sep 2018)
* Introduce handling for empty fragment metadata
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<artifactId>amazon-kinesis-video-streams-parser-library</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Video Streams Parser Library</name>
<version>1.0.6</version>
<version>1.0.7</version>
<description>The Amazon Kinesis Video Streams Parser Library for Java enables Java developers to parse the streams
returned by GetMedia calls to Amazon Kinesis Video.
</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@
import java.nio.ByteBuffer;
import java.util.Optional;

import static com.amazonaws.kinesisvideo.parser.ebml.EBMLUtils.UNKNOWN_LENGTH_VALUE;

/**
* This class is used by the parser to represent an EBML Element internally.
*/
@ToString
class EBMLParserInternalElement {
enum ElementReadState { NEW, ID_DONE, SIZE_DONE, CONTENT_READING, CONTENT_SKIPPING, FINISHED }

static final long UNKNOWN_LENGTH_VALUE = 0xFFFFFFFFFFFFFFL;

private final long startingOffset;
@Getter
private final long elementCount;


ElementReadState currentElementReadState = ElementReadState.NEW;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
*/
public class EBMLUtils {

public static final long UNKNOWN_LENGTH_VALUE = -1;

/**
* Max length for a EBML ID
*/
Expand All @@ -36,7 +38,6 @@ private EBMLUtils() {

}


/**
* constant for byte with first bit set.
*/
Expand Down Expand Up @@ -125,9 +126,19 @@ private static void readEbmlInt(final TrackingReplayableIdAndSizeByteSource sour
// Read the rest of the bytes
final long rest = readEbmlValueNumber(source, size);

long value = (firstByte & ~((byte) BYTE_WITH_FIRST_BIT_SET >> size)) << (size * Byte.SIZE) | rest;

long unknownValue = (0xff >> (size + 1));
unknownValue <<= size * 8;
unknownValue |= (1L << (size * 8)) - 1;

// Special handing for unknown length
if (value == unknownValue) {
value = -1;
}

// Slap the first byte's value onto the front (with the first one-bit unset)
resultAcceptor.accept((firstByte & ~((byte) BYTE_WITH_FIRST_BIT_SET >> size)) << (size * Byte.SIZE) | rest,
size + 1);
resultAcceptor.accept(value, size + 1);
}

/**
Expand All @@ -148,7 +159,6 @@ public static long readEbmlInt(final ByteBuffer byteBuffer) {
return ((firstByte & ~((byte) BYTE_WITH_FIRST_BIT_SET >> size)) << (size * Byte.SIZE) | rest);
}


/**
* An alias for readEbmlInt that makes it clear we're reading a data size value.
*
Expand All @@ -158,7 +168,6 @@ static void readSize(final TrackingReplayableIdAndSizeByteSource source, SizeCon
readEbmlInt(source, resultAcceptor);
}


private static int readByte(final TrackingReplayableIdAndSizeByteSource source) {
return source.readByte() & BYTE_MASK;
}
Expand Down Expand Up @@ -269,8 +278,6 @@ public static long readDataSignedInteger(final ByteBuffer byteBuffer, long size)
return value;
}



public static BigInteger readDataUnsignedInteger(final ByteBuffer byteBuffer, long size) {
Validate.inclusiveBetween(0L,
(long) EBML_SIZE_MAX_BYTES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,22 @@ public class KinesisVideoExample extends KinesisVideoCommon {
private PutMediaWorker putMediaWorker;
private final StreamOps streamOps;
private GetMediaProcessingArguments getMediaProcessingArguments;
private boolean noSampleInputRequired = false;

@Builder
private KinesisVideoExample(Regions region,
String streamName,
AWSCredentialsProvider credentialsProvider,
InputStream inputVideoStream) {
String streamName,
AWSCredentialsProvider credentialsProvider,
InputStream inputVideoStream,
boolean noSampleInputRequired) {
super(region, credentialsProvider, streamName);
final AmazonKinesisVideoClientBuilder builder = AmazonKinesisVideoClientBuilder.standard();
configureClient(builder);
this.amazonKinesisVideo = builder.build();
this.inputStream = inputVideoStream;
this.streamOps = new StreamOps(region, streamName, credentialsProvider);
this.executorService = Executors.newFixedThreadPool(2);
this.noSampleInputRequired = noSampleInputRequired;
}

/**
Expand All @@ -94,8 +97,8 @@ private KinesisVideoExample(Regions region,
* @throws IOException fails to read video from the input stream or write to the output stream.
*/
public void execute () throws InterruptedException, IOException {
//Create the Kinesis Video stream, deleting and recreating if necessary.
streamOps.recreateStreamIfNecessary();
//Create the Kinesis Video stream if it doesn't exist.
streamOps.createStreamIfNotExist();

getMediaProcessingArguments = GetMediaProcessingArguments.create();

Expand All @@ -104,18 +107,20 @@ public void execute () throws InterruptedException, IOException {
GetMediaWorker getMediaWorker = GetMediaWorker.create(getRegion(),
getCredentialsProvider(),
getStreamName(),
new StartSelector().withStartSelectorType(StartSelectorType.EARLIEST),
new StartSelector().withStartSelectorType(StartSelectorType.NOW),
amazonKinesisVideo,
getMediaProcessingArgumentsLocal.getMkvElementVisitor());
executorService.submit(getMediaWorker);

//Start a PutMedia worker to write data to a Kinesis Video Stream.
putMediaWorker = PutMediaWorker.create(getRegion(),
getCredentialsProvider(),
getStreamName(),
inputStream,
amazonKinesisVideo);
executorService.submit(putMediaWorker);
if (!noSampleInputRequired) {
//Start a PutMedia worker to write data to a Kinesis Video Stream.
putMediaWorker = PutMediaWorker.create(getRegion(),
getCredentialsProvider(),
getStreamName(),
inputStream,
amazonKinesisVideo);
executorService.submit(putMediaWorker);
}

//Wait for the workers to finish.
executorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor;
import com.amazonaws.kinesisvideo.parser.utilities.FrameVisitor;
import com.amazonaws.kinesisvideo.parser.utilities.H264FrameRenderer;
import com.amazonaws.regions.Regions;
Expand Down Expand Up @@ -57,15 +59,22 @@ public class KinesisVideoRendererExample extends KinesisVideoCommon {
private final StreamOps streamOps;
private final ExecutorService executorService;
private KinesisVideoRendererExample.GetMediaProcessingArguments getMediaProcessingArguments;
private boolean renderFragmentMetadata = true;
private boolean noSampleInputRequired = false;

@Builder
private KinesisVideoRendererExample(Regions region,
String streamName,
AWSCredentialsProvider credentialsProvider, InputStream inputVideoStream) {
String streamName,
AWSCredentialsProvider credentialsProvider,
InputStream inputVideoStream,
boolean renderFragmentMetadata,
boolean noSampleInputRequired) {
super(region, credentialsProvider, streamName);
this.inputStream = inputVideoStream;
this.streamOps = new StreamOps(region, streamName, credentialsProvider);
this.executorService = Executors.newFixedThreadPool(2);
this.renderFragmentMetadata = renderFragmentMetadata;
this.noSampleInputRequired = noSampleInputRequired;
}

/**
Expand All @@ -76,23 +85,29 @@ private KinesisVideoRendererExample(Regions region,
*/
public void execute() throws InterruptedException, IOException {

streamOps.recreateStreamIfNecessary();
getMediaProcessingArguments = KinesisVideoRendererExample.GetMediaProcessingArguments.create();
streamOps.createStreamIfNotExist();

getMediaProcessingArguments = KinesisVideoRendererExample.GetMediaProcessingArguments.create(
renderFragmentMetadata ?
Optional.of(new FragmentMetadataVisitor.BasicMkvTagProcessor()) : Optional.empty());

try (KinesisVideoRendererExample.GetMediaProcessingArguments getMediaProcessingArgumentsLocal = getMediaProcessingArguments) {

//Start a PutMedia worker to write data to a Kinesis Video Stream.
PutMediaWorker putMediaWorker = PutMediaWorker.create(getRegion(),
getCredentialsProvider(),
getStreamName(),
inputStream,
streamOps.amazonKinesisVideo);
executorService.submit(putMediaWorker);
if (!noSampleInputRequired) {
//Start a PutMedia worker to write data to a Kinesis Video Stream.
PutMediaWorker putMediaWorker = PutMediaWorker.create(getRegion(),
getCredentialsProvider(),
getStreamName(),
inputStream,
streamOps.amazonKinesisVideo);
executorService.submit(putMediaWorker);
}

//Start a GetMedia worker to read and process data from the Kinesis Video Stream.
GetMediaWorker getMediaWorker = GetMediaWorker.create(getRegion(),
getCredentialsProvider(),
getStreamName(),
new StartSelector().withStartSelectorType(StartSelectorType.EARLIEST),
new StartSelector().withStartSelectorType(StartSelectorType.NOW),
streamOps.amazonKinesisVideo,
getMediaProcessingArgumentsLocal.getFrameVisitor());
executorService.submit(getMediaWorker);
Expand All @@ -116,10 +131,12 @@ private static class GetMediaProcessingArguments implements Closeable {
this.frameVisitor = frameVisitor;
}

private static GetMediaProcessingArguments create() throws IOException {
private static GetMediaProcessingArguments create(
Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor) throws IOException {

KinesisVideoFrameViewer kinesisVideoFrameViewer = new KinesisVideoFrameViewer(FRAME_WIDTH, FRAME_HEIGHT);
return new GetMediaProcessingArguments(FrameVisitor.create(H264FrameRenderer.create(kinesisVideoFrameViewer)));
return new GetMediaProcessingArguments(
FrameVisitor.create(H264FrameRenderer.create(kinesisVideoFrameViewer), tagProcessor));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,38 @@ public void recreateStreamIfNecessary() throws InterruptedException {
//some basic validations on the response of the create stream
Validate.isTrue(createdStreamInfo.isPresent());
Validate.isTrue(createdStreamInfo.get().getDataRetentionInHours() == DATA_RETENTION_IN_HOURS);
log.info("StreamOps {} created ARN {}", streamName, createdStreamInfo.get().getStreamARN());
log.info("Stream {} created ARN {}", streamName, createdStreamInfo.get().getStreamARN());
}

public void createStreamIfNotExist() throws InterruptedException {
final Optional<StreamInfo> streamInfo = getStreamInfo();
log.info("Stream {} exists {}", streamName, streamInfo.isPresent());
if (!streamInfo.isPresent()) {
//create the stream.
amazonKinesisVideo.createStream(new CreateStreamRequest().withStreamName(streamName)
.withDataRetentionInHours(DATA_RETENTION_IN_HOURS)
.withMediaType("video/h264"));
log.info("CreateStream called for stream {}", streamName);
//wait for stream to become active.
final Optional<StreamInfo> createdStreamInfo =
waitForStateToMatch((s) -> s.isPresent() && "ACTIVE".equals(s.get().getStatus()));
//some basic validations on the response of the create stream
Validate.isTrue(createdStreamInfo.isPresent());
Validate.isTrue(createdStreamInfo.get().getDataRetentionInHours() == DATA_RETENTION_IN_HOURS);
log.info("Stream {} created ARN {}", streamName, createdStreamInfo.get().getStreamARN());
}
}

private void deleteStreamIfPresent() throws InterruptedException {
final Optional<StreamInfo> streamInfo = getStreamInfo();
log.info("StreamOps {} exists {}", streamName, streamInfo.isPresent());
log.info("Stream {} exists {}", streamName, streamInfo.isPresent());
if (streamInfo.isPresent()) {
//Delete the stream
amazonKinesisVideo.deleteStream(new DeleteStreamRequest().withStreamARN(streamInfo.get().getStreamARN()));
log.info("DeleteStream called for stream {} ARN {} ", streamName, streamInfo.get().getStreamARN());
//Wait for stream to be deleted
waitForStateToMatch((s) -> !s.isPresent());
log.info("StreamOps {} deleted", streamName);
log.info("Stream {} deleted", streamName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.nio.channels.WritableByteChannel;
import java.util.List;

import static com.amazonaws.kinesisvideo.parser.ebml.EBMLUtils.UNKNOWN_LENGTH_VALUE;

/**
* Class representing the start of a mkv master element.
* It includes the bytes containing the id and size of the element along with its {@link EBMLElementMetaData}
Expand Down Expand Up @@ -62,7 +64,7 @@ public boolean equivalent(MkvElement other) {
}

public boolean isUnknownLength() {
return dataSize == 0xFFFFFFFFFFFFFFL;
return dataSize == UNKNOWN_LENGTH_VALUE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://aws.amazon.com/apache2.0/
or in the "license" file accompanying this file.
This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazonaws.kinesisvideo.parser.utilities;

import javax.annotation.Nonnull;
import java.awt.Color;
import java.awt.Font;
import java.awt.Graphics;
import java.awt.image.BufferedImage;

public final class BufferedImageUtil {
private static final int DEFAULT_FONT_SIZE = 13;
private static final Font DEFAULT_FONT = new Font(null, Font.CENTER_BASELINE, DEFAULT_FONT_SIZE);

public static void addTextToImage(@Nonnull BufferedImage bufferedImage, String text, int pixelX, int pixelY) {
Graphics graphics = bufferedImage.getGraphics();
graphics.setColor(Color.YELLOW);
graphics.setFont(DEFAULT_FONT);
for (String line : text.split(MkvTag.class.getSimpleName())) {
graphics.drawString(line, pixelX, pixelY += graphics.getFontMetrics().getHeight());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.amazonaws.kinesisvideo.parser.utilities;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
Expand Down Expand Up @@ -54,8 +53,8 @@ public class FragmentMetadata {
private Optional<String> continuationToken = Optional.empty();

private FragmentMetadata(String fragmentNumberString,
double serverSideTimestampSeconds,
double producerSideTimestampSeconds) {
double serverSideTimestampSeconds,
double producerSideTimestampSeconds) {
this(fragmentNumberString,
convertToMillis(serverSideTimestampSeconds),
convertToMillis(producerSideTimestampSeconds),
Expand All @@ -69,11 +68,11 @@ private FragmentMetadata(String fragmentNumberString, long errorId, String error
}

private FragmentMetadata(String fragmentNumberString,
long serverSideTimestampMillis,
long producerSideTimestampMillis,
boolean success,
long errorId,
String errorCode) {
long serverSideTimestampMillis,
long producerSideTimestampMillis,
boolean success,
long errorId,
String errorCode) {
this.fragmentNumberString = fragmentNumberString;
this.fragmentNumber = new BigInteger(fragmentNumberString);
this.serverSideTimestampMillis = serverSideTimestampMillis;
Expand Down
Loading

0 comments on commit 57063da

Please sign in to comment.