From 78043bbf6dbd6a55625cab25ad3bbf84f06bc4c5 Mon Sep 17 00:00:00 2001
From: "Qia (Chad) Wang" <qqia@amazon.com>
Date: Fri, 28 Sep 2018 18:52:57 +0000
Subject: [PATCH] Release 1.0.7

---
 README.md                                     |   3 +
 pom.xml                                       |   2 +-
 .../ebml/EBMLParserInternalElement.java       |   5 +-
 .../kinesisvideo/parser/ebml/EBMLUtils.java   |  21 ++--
 .../parser/examples/KinesisVideoExample.java  |  31 +++---
 .../examples/KinesisVideoRendererExample.java |  45 +++++---
 .../parser/examples/StreamOps.java            |  25 ++++-
 .../parser/mkv/MkvStartMasterElement.java     |   4 +-
 .../parser/utilities/BufferedImageUtil.java   |  34 ++++++
 .../parser/utilities/FragmentMetadata.java    |  15 ++-
 .../utilities/FragmentMetadataVisitor.java    |  74 +++++++++---
 .../parser/utilities/FrameVisitor.java        |  34 +++++-
 .../parser/utilities/H264FrameRenderer.java   |  39 +++++--
 .../kinesisvideo/parser/utilities/MkvTag.java |  34 ++++++
 .../parser/ebml/EBMLParserTest.java           |  10 +-
 .../examples/KinesisVideoExampleTest.java     |  13 +++
 .../KinesisVideoRendererExampleTest.java      |  16 +++
 .../FragmentMetadataVisitorTest.java          | 105 +++++++++++++++++-
 .../utilities/H264FrameRendererTest.java      |   3 +-
 src/test/resources/test_mixed_tags.mkv        | Bin 0 -> 22768 bytes
 .../resources/test_tags_empty_cluster.mkv     | Bin 0 -> 426 bytes
 21 files changed, 424 insertions(+), 89 deletions(-)
 create mode 100644 src/main/java/com/amazonaws/kinesisvideo/parser/utilities/BufferedImageUtil.java
 create mode 100644 src/main/java/com/amazonaws/kinesisvideo/parser/utilities/MkvTag.java
 create mode 100644 src/test/resources/test_mixed_tags.mkv
 create mode 100644 src/test/resources/test_tags_empty_cluster.mkv

diff --git a/README.md b/README.md
index 3bae18a..e0aa468 100644
--- a/README.md
+++ b/README.md
@@ -104,6 +104,9 @@ with the AWS SDK for the Kinesis Video. This example provides examples for
  The Gstreamer pipeline is a toy example that demonstrates that Gstreamer can parse the mkv passed into it. 
 
 ## Release Notes
+### Release 1.0.7 (Sep 2018)
+* Add flag in KinesisVideoRendererExample and KinesisVideoExample to use the existing stream (and not doing PutMedia again if it exists already).
+* Added support to retrieve the information from FragmentMetadata and display in the image panel during rendering.
 
 ### Release 1.0.6 (Sep 2018)
 * Introduce handling for empty fragment metadata
diff --git a/pom.xml b/pom.xml
index 9485a95..31301c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
     <artifactId>amazon-kinesis-video-streams-parser-library</artifactId>
     <packaging>jar</packaging>
     <name>Amazon Kinesis Video Streams Parser Library</name>
-    <version>1.0.6</version>
+    <version>1.0.7</version>
     <description>The Amazon Kinesis Video Streams Parser Library for Java enables Java developers to parse the streams
         returned by GetMedia calls to Amazon Kinesis Video.
     </description>
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLParserInternalElement.java b/src/main/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLParserInternalElement.java
index 3333455..47725a2 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLParserInternalElement.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLParserInternalElement.java
@@ -20,6 +20,8 @@
 import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import static com.amazonaws.kinesisvideo.parser.ebml.EBMLUtils.UNKNOWN_LENGTH_VALUE;
+
 /**
  * This class is used by the parser to represent an EBML Element internally.
  */
@@ -27,13 +29,10 @@
 class EBMLParserInternalElement {
     enum ElementReadState { NEW, ID_DONE, SIZE_DONE, CONTENT_READING, CONTENT_SKIPPING, FINISHED }
 
-    static final long UNKNOWN_LENGTH_VALUE = 0xFFFFFFFFFFFFFFL;
-
     private final long startingOffset;
     @Getter
     private final long elementCount;
 
-
     ElementReadState currentElementReadState = ElementReadState.NEW;
 
     @Getter
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLUtils.java b/src/main/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLUtils.java
index e74206b..7b5820a 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLUtils.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLUtils.java
@@ -23,6 +23,8 @@
  */
 public class EBMLUtils {
 
+    public static final long UNKNOWN_LENGTH_VALUE = -1;
+
     /**
      * Max length for a EBML ID
      */
@@ -36,7 +38,6 @@ private EBMLUtils() {
 
     }
 
-
     /**
      * constant for byte with first bit set.
      */
@@ -125,9 +126,19 @@ private static void readEbmlInt(final TrackingReplayableIdAndSizeByteSource sour
         // Read the rest of the bytes
         final long rest = readEbmlValueNumber(source, size);
 
+        long value = (firstByte & ~((byte) BYTE_WITH_FIRST_BIT_SET >> size)) << (size * Byte.SIZE) | rest;
+
+        long unknownValue = (0xff >> (size + 1));
+        unknownValue <<= size * 8;
+        unknownValue |= (1L << (size * 8)) - 1;
+
+        // Special handing for unknown length
+        if (value == unknownValue) {
+            value = -1;
+        }
+
         // Slap the first byte's value onto the front (with the first one-bit unset)
-        resultAcceptor.accept((firstByte & ~((byte) BYTE_WITH_FIRST_BIT_SET >> size)) << (size * Byte.SIZE) | rest,
-                size + 1);
+        resultAcceptor.accept(value, size + 1);
     }
 
     /**
@@ -148,7 +159,6 @@ public static long readEbmlInt(final ByteBuffer byteBuffer) {
         return ((firstByte & ~((byte) BYTE_WITH_FIRST_BIT_SET >> size)) << (size * Byte.SIZE) | rest);
     }
 
-
     /**
      * An alias for readEbmlInt that makes it clear we're reading a data size value.
      *
@@ -158,7 +168,6 @@ static void readSize(final TrackingReplayableIdAndSizeByteSource source, SizeCon
         readEbmlInt(source, resultAcceptor);
     }
 
-
     private static int readByte(final TrackingReplayableIdAndSizeByteSource source) {
         return source.readByte() & BYTE_MASK;
     }
@@ -269,8 +278,6 @@ public static long readDataSignedInteger(final ByteBuffer byteBuffer, long size)
         return value;
     }
 
-
-
     public static BigInteger readDataUnsignedInteger(final ByteBuffer byteBuffer, long size) {
         Validate.inclusiveBetween(0L,
                 (long) EBML_SIZE_MAX_BYTES,
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExample.java b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExample.java
index e944c60..f3d36d5 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExample.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExample.java
@@ -72,12 +72,14 @@ public class KinesisVideoExample extends KinesisVideoCommon {
     private PutMediaWorker putMediaWorker;
     private final StreamOps streamOps;
     private GetMediaProcessingArguments getMediaProcessingArguments;
+    private boolean noSampleInputRequired = false;
 
     @Builder
     private KinesisVideoExample(Regions region,
-            String streamName,
-            AWSCredentialsProvider credentialsProvider,
-            InputStream inputVideoStream) {
+                                String streamName,
+                                AWSCredentialsProvider credentialsProvider,
+                                InputStream inputVideoStream,
+                                boolean noSampleInputRequired) {
         super(region, credentialsProvider, streamName);
         final AmazonKinesisVideoClientBuilder builder = AmazonKinesisVideoClientBuilder.standard();
         configureClient(builder);
@@ -85,6 +87,7 @@ private KinesisVideoExample(Regions region,
         this.inputStream = inputVideoStream;
         this.streamOps = new StreamOps(region,  streamName, credentialsProvider);
         this.executorService = Executors.newFixedThreadPool(2);
+        this.noSampleInputRequired = noSampleInputRequired;
     }
 
     /**
@@ -94,8 +97,8 @@ private KinesisVideoExample(Regions region,
      * @throws IOException fails to read video from the input stream or write to the output stream.
      */
     public void execute () throws InterruptedException, IOException {
-        //Create the Kinesis Video stream, deleting and recreating if necessary.
-        streamOps.recreateStreamIfNecessary();
+        //Create the Kinesis Video stream if it doesn't exist.
+        streamOps.createStreamIfNotExist();
 
         getMediaProcessingArguments = GetMediaProcessingArguments.create();
 
@@ -104,18 +107,20 @@ public void execute () throws InterruptedException, IOException {
             GetMediaWorker getMediaWorker = GetMediaWorker.create(getRegion(),
                     getCredentialsProvider(),
                     getStreamName(),
-                    new StartSelector().withStartSelectorType(StartSelectorType.EARLIEST),
+                    new StartSelector().withStartSelectorType(StartSelectorType.NOW),
                     amazonKinesisVideo,
                     getMediaProcessingArgumentsLocal.getMkvElementVisitor());
             executorService.submit(getMediaWorker);
 
-            //Start a PutMedia worker to write data to a Kinesis Video Stream.
-            putMediaWorker = PutMediaWorker.create(getRegion(),
-                    getCredentialsProvider(),
-                    getStreamName(),
-                    inputStream,
-                    amazonKinesisVideo);
-            executorService.submit(putMediaWorker);
+            if (!noSampleInputRequired) {
+                //Start a PutMedia worker to write data to a Kinesis Video Stream.
+                putMediaWorker = PutMediaWorker.create(getRegion(),
+                        getCredentialsProvider(),
+                        getStreamName(),
+                        inputStream,
+                        amazonKinesisVideo);
+                executorService.submit(putMediaWorker);
+            }
 
             //Wait for the workers to finish.
             executorService.shutdown();
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoRendererExample.java b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoRendererExample.java
index d499bef..a8b657d 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoRendererExample.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoRendererExample.java
@@ -16,11 +16,13 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor;
 import com.amazonaws.kinesisvideo.parser.utilities.FrameVisitor;
 import com.amazonaws.kinesisvideo.parser.utilities.H264FrameRenderer;
 import com.amazonaws.regions.Regions;
@@ -57,15 +59,22 @@ public class KinesisVideoRendererExample extends KinesisVideoCommon {
     private final StreamOps streamOps;
     private final ExecutorService executorService;
     private KinesisVideoRendererExample.GetMediaProcessingArguments getMediaProcessingArguments;
+    private boolean renderFragmentMetadata = true;
+    private boolean noSampleInputRequired = false;
 
     @Builder
     private KinesisVideoRendererExample(Regions region,
-                                           String streamName,
-                                           AWSCredentialsProvider credentialsProvider, InputStream inputVideoStream) {
+                                        String streamName,
+                                        AWSCredentialsProvider credentialsProvider,
+                                        InputStream inputVideoStream,
+                                        boolean renderFragmentMetadata,
+                                        boolean noSampleInputRequired) {
         super(region, credentialsProvider, streamName);
         this.inputStream = inputVideoStream;
         this.streamOps = new StreamOps(region,  streamName, credentialsProvider);
         this.executorService = Executors.newFixedThreadPool(2);
+        this.renderFragmentMetadata = renderFragmentMetadata;
+        this.noSampleInputRequired = noSampleInputRequired;
     }
 
     /**
@@ -76,23 +85,29 @@ private KinesisVideoRendererExample(Regions region,
      */
     public void execute() throws InterruptedException, IOException {
 
-        streamOps.recreateStreamIfNecessary();
-        getMediaProcessingArguments = KinesisVideoRendererExample.GetMediaProcessingArguments.create();
+        streamOps.createStreamIfNotExist();
+
+        getMediaProcessingArguments = KinesisVideoRendererExample.GetMediaProcessingArguments.create(
+                renderFragmentMetadata ?
+                        Optional.of(new FragmentMetadataVisitor.BasicMkvTagProcessor()) : Optional.empty());
+
         try (KinesisVideoRendererExample.GetMediaProcessingArguments getMediaProcessingArgumentsLocal = getMediaProcessingArguments) {
 
-            //Start a PutMedia worker to write data to a Kinesis Video Stream.
-          PutMediaWorker putMediaWorker = PutMediaWorker.create(getRegion(),
-                    getCredentialsProvider(),
-                    getStreamName(),
-                    inputStream,
-                  streamOps.amazonKinesisVideo);
-            executorService.submit(putMediaWorker);
+            if (!noSampleInputRequired) {
+                //Start a PutMedia worker to write data to a Kinesis Video Stream.
+                PutMediaWorker putMediaWorker = PutMediaWorker.create(getRegion(),
+                        getCredentialsProvider(),
+                        getStreamName(),
+                        inputStream,
+                        streamOps.amazonKinesisVideo);
+                executorService.submit(putMediaWorker);
+            }
 
             //Start a GetMedia worker to read and process data from the Kinesis Video Stream.
             GetMediaWorker getMediaWorker = GetMediaWorker.create(getRegion(),
                     getCredentialsProvider(),
                     getStreamName(),
-                    new StartSelector().withStartSelectorType(StartSelectorType.EARLIEST),
+                    new StartSelector().withStartSelectorType(StartSelectorType.NOW),
                     streamOps.amazonKinesisVideo,
                     getMediaProcessingArgumentsLocal.getFrameVisitor());
             executorService.submit(getMediaWorker);
@@ -116,10 +131,12 @@ private static class GetMediaProcessingArguments implements Closeable {
             this.frameVisitor = frameVisitor;
         }
 
-        private static GetMediaProcessingArguments create() throws IOException {
+        private static GetMediaProcessingArguments create(
+                Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor) throws IOException {
 
             KinesisVideoFrameViewer kinesisVideoFrameViewer = new KinesisVideoFrameViewer(FRAME_WIDTH, FRAME_HEIGHT);
-            return new GetMediaProcessingArguments(FrameVisitor.create(H264FrameRenderer.create(kinesisVideoFrameViewer)));
+            return new GetMediaProcessingArguments(
+                        FrameVisitor.create(H264FrameRenderer.create(kinesisVideoFrameViewer), tagProcessor));
         }
 
         @Override
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/StreamOps.java b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/StreamOps.java
index f95240d..b031280 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/examples/StreamOps.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/examples/StreamOps.java
@@ -68,19 +68,38 @@ public void recreateStreamIfNecessary() throws InterruptedException {
         //some basic validations on the response of the create stream
         Validate.isTrue(createdStreamInfo.isPresent());
         Validate.isTrue(createdStreamInfo.get().getDataRetentionInHours() == DATA_RETENTION_IN_HOURS);
-        log.info("StreamOps {} created ARN {}", streamName, createdStreamInfo.get().getStreamARN());
+        log.info("Stream {} created ARN {}", streamName, createdStreamInfo.get().getStreamARN());
+    }
+
+    public void createStreamIfNotExist() throws InterruptedException {
+        final Optional<StreamInfo> streamInfo = getStreamInfo();
+        log.info("Stream {} exists {}", streamName, streamInfo.isPresent());
+        if (!streamInfo.isPresent()) {
+            //create the stream.
+            amazonKinesisVideo.createStream(new CreateStreamRequest().withStreamName(streamName)
+                    .withDataRetentionInHours(DATA_RETENTION_IN_HOURS)
+                    .withMediaType("video/h264"));
+            log.info("CreateStream called for stream {}", streamName);
+            //wait for stream to become active.
+            final Optional<StreamInfo> createdStreamInfo =
+                    waitForStateToMatch((s) -> s.isPresent() && "ACTIVE".equals(s.get().getStatus()));
+            //some basic validations on the response of the create stream
+            Validate.isTrue(createdStreamInfo.isPresent());
+            Validate.isTrue(createdStreamInfo.get().getDataRetentionInHours() == DATA_RETENTION_IN_HOURS);
+            log.info("Stream {} created ARN {}", streamName, createdStreamInfo.get().getStreamARN());
+        }
     }
 
     private void deleteStreamIfPresent() throws InterruptedException {
         final Optional<StreamInfo> streamInfo = getStreamInfo();
-        log.info("StreamOps {} exists {}", streamName, streamInfo.isPresent());
+        log.info("Stream {} exists {}", streamName, streamInfo.isPresent());
         if (streamInfo.isPresent()) {
             //Delete the stream
             amazonKinesisVideo.deleteStream(new DeleteStreamRequest().withStreamARN(streamInfo.get().getStreamARN()));
             log.info("DeleteStream called for stream {} ARN {} ", streamName, streamInfo.get().getStreamARN());
             //Wait for stream to be deleted
             waitForStateToMatch((s) -> !s.isPresent());
-            log.info("StreamOps {} deleted", streamName);
+            log.info("Stream {} deleted", streamName);
         }
     }
 
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/MkvStartMasterElement.java b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/MkvStartMasterElement.java
index 8987de5..bcb4d17 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/MkvStartMasterElement.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/mkv/MkvStartMasterElement.java
@@ -22,6 +22,8 @@
 import java.nio.channels.WritableByteChannel;
 import java.util.List;
 
+import static com.amazonaws.kinesisvideo.parser.ebml.EBMLUtils.UNKNOWN_LENGTH_VALUE;
+
 /**
  * Class representing the start of a mkv master element.
  * It includes the bytes containing the id and size of the element along with its {@link EBMLElementMetaData}
@@ -62,7 +64,7 @@ public boolean equivalent(MkvElement other) {
     }
 
     public boolean isUnknownLength() {
-        return dataSize == 0xFFFFFFFFFFFFFFL;
+        return dataSize == UNKNOWN_LENGTH_VALUE;
     }
 
     @Override
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/BufferedImageUtil.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/BufferedImageUtil.java
new file mode 100644
index 0000000..4e6fab4
--- /dev/null
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/BufferedImageUtil.java
@@ -0,0 +1,34 @@
+/*
+Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License"). 
+You may not use this file except in compliance with the License. 
+A copy of the License is located at
+
+   http://aws.amazon.com/apache2.0/
+
+or in the "license" file accompanying this file. 
+This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and limitations under the License.
+*/
+package com.amazonaws.kinesisvideo.parser.utilities;
+
+import javax.annotation.Nonnull;
+import java.awt.Color;
+import java.awt.Font;
+import java.awt.Graphics;
+import java.awt.image.BufferedImage;
+
+public final class BufferedImageUtil {
+    private static final int DEFAULT_FONT_SIZE = 13;
+    private static final Font DEFAULT_FONT = new Font(null, Font.CENTER_BASELINE, DEFAULT_FONT_SIZE);
+
+    public static void addTextToImage(@Nonnull BufferedImage bufferedImage, String text, int pixelX, int pixelY) {
+        Graphics graphics = bufferedImage.getGraphics();
+        graphics.setColor(Color.YELLOW);
+        graphics.setFont(DEFAULT_FONT);
+        for (String line : text.split(MkvTag.class.getSimpleName())) {
+            graphics.drawString(line, pixelX, pixelY += graphics.getFontMetrics().getHeight());
+        }
+    }
+}
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadata.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadata.java
index 19a5823..28fb9f8 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadata.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadata.java
@@ -13,7 +13,6 @@
 */
 package com.amazonaws.kinesisvideo.parser.utilities;
 
-import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
@@ -54,8 +53,8 @@ public class FragmentMetadata {
     private Optional<String> continuationToken = Optional.empty();
 
     private FragmentMetadata(String fragmentNumberString,
-            double serverSideTimestampSeconds,
-            double producerSideTimestampSeconds) {
+                             double serverSideTimestampSeconds,
+                             double producerSideTimestampSeconds) {
         this(fragmentNumberString,
                 convertToMillis(serverSideTimestampSeconds),
                 convertToMillis(producerSideTimestampSeconds),
@@ -69,11 +68,11 @@ private FragmentMetadata(String fragmentNumberString, long errorId, String error
     }
 
     private FragmentMetadata(String fragmentNumberString,
-            long serverSideTimestampMillis,
-            long producerSideTimestampMillis,
-            boolean success,
-            long errorId,
-            String errorCode) {
+                             long serverSideTimestampMillis,
+                             long producerSideTimestampMillis,
+                             boolean success,
+                             long errorId,
+                             String errorCode) {
         this.fragmentNumberString = fragmentNumberString;
         this.fragmentNumber = new BigInteger(fragmentNumberString);
         this.serverSideTimestampMillis = serverSideTimestampMillis;
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitor.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitor.java
index 49e409a..71469f4 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitor.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitor.java
@@ -16,7 +16,6 @@
 import com.amazonaws.kinesisvideo.parser.ebml.EBMLElementMetaData;
 import com.amazonaws.kinesisvideo.parser.ebml.EBMLTypeInfo;
 import com.amazonaws.kinesisvideo.parser.ebml.MkvTypeInfos;
-import com.amazonaws.kinesisvideo.parser.mkv.visitors.CompositeMkvElementVisitor;
 import com.amazonaws.kinesisvideo.parser.mkv.MkvDataElement;
 import com.amazonaws.kinesisvideo.parser.mkv.MkvElement;
 import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException;
@@ -24,8 +23,10 @@
 import com.amazonaws.kinesisvideo.parser.mkv.MkvEndMasterElement;
 import com.amazonaws.kinesisvideo.parser.mkv.MkvStartMasterElement;
 import com.amazonaws.kinesisvideo.parser.mkv.MkvValue;
+import com.amazonaws.kinesisvideo.parser.mkv.visitors.CompositeMkvElementVisitor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.Validate;
 
 import java.math.BigInteger;
@@ -63,12 +64,22 @@ public class FragmentMetadataVisitor extends CompositeMkvElementVisitor {
     };
     private static final String AWS_KINESISVIDEO_TAGNAME_PREFIX = "AWS_KINESISVIDEO";
 
+    public interface MkvTagProcessor {
+        default void process(MkvTag mkvTag, Optional<FragmentMetadata> currentFragmentMetadata) {
+            throw new NotImplementedException("Default FragmentMetadataVisitor.MkvTagProcessor");
+        }
+        default void clear() {
+            throw new NotImplementedException("Default FragmentMetadataVisitor.MkvTagProcessor");
+        }
+    }
+
     private final MkvChildElementCollector tagCollector;
     private final MkvChildElementCollector trackCollector;
     private final StateMachineVisitor stateMachineVisitor;
 
-    private final Set<EBMLTypeInfo> trackTypesForTrackMetadata = new HashSet();
+    private final Optional<MkvTagProcessor> mkvTagProcessor;
 
+    private final Set<EBMLTypeInfo> trackTypesForTrackMetadata = new HashSet();
 
     @Getter
     private Optional<FragmentMetadata> previousFragmentMetadata = Optional.empty();
@@ -80,13 +91,15 @@ public class FragmentMetadataVisitor extends CompositeMkvElementVisitor {
 
     private Optional<String> continuationToken = Optional.empty();
 
+    private final Map<BigInteger, MkvTrackMetadata> trackMetadataMap = new HashMap();
 
-
-    private final Map<BigInteger, MkvTrackMetadata> trackMetadataMap = new HashMap<BigInteger,MkvTrackMetadata>();
+    private String tagName = null;
+    private String tagValue = null;
 
     private FragmentMetadataVisitor(List<MkvElementVisitor> childVisitors,
-            MkvChildElementCollector tagCollector,
-            MkvChildElementCollector trackCollector) {
+                                    MkvChildElementCollector tagCollector,
+                                    MkvChildElementCollector trackCollector,
+                                    Optional<MkvTagProcessor> mkvTagProcessor) {
         super(childVisitors);
         Validate.isTrue(tagCollector.getParentTypeInfo().equals(MkvTypeInfos.TAGS));
         Validate.isTrue(trackCollector.getParentTypeInfo().equals(MkvTypeInfos.TRACKS));
@@ -94,19 +107,24 @@ private FragmentMetadataVisitor(List<MkvElementVisitor> childVisitors,
         this.trackCollector = trackCollector;
         this.stateMachineVisitor = new StateMachineVisitor();
         this.childVisitors.add(stateMachineVisitor);
+        this.mkvTagProcessor = mkvTagProcessor;
         for (EBMLTypeInfo trackType : TRACK_TYPES) {
             this.trackTypesForTrackMetadata.add(trackType);
         }
     }
 
     public static FragmentMetadataVisitor create() {
+        return create(Optional.empty());
+    }
+
+    public static FragmentMetadataVisitor create(Optional<MkvTagProcessor> mkvTagProcessor) {
         final List<MkvElementVisitor> childVisitors = new ArrayList<>();
         final MkvChildElementCollector tagCollector = new MkvChildElementCollector(MkvTypeInfos.TAGS);
         final MkvChildElementCollector trackCollector = new MkvChildElementCollector(MkvTypeInfos.TRACKS);
         childVisitors.add(tagCollector);
         childVisitors.add(trackCollector);
 
-        return new FragmentMetadataVisitor(childVisitors, tagCollector, trackCollector);
+        return new FragmentMetadataVisitor(childVisitors, tagCollector, trackCollector, mkvTagProcessor);
     }
 
     enum State {NEW, PRE_CLUSTER, IN_CLUSTER, POST_CLUSTER}
@@ -158,8 +176,8 @@ public void visit(MkvEndMasterElement endMasterElement) throws MkvElementVisitEx
                 default:
                     break;
             }
-            //If any tags section finishes, try to update the millisbehind latest and continuation token
-            //since there can be multiple in the same segment.
+            // If any tags section finishes, try to update the millisbehind latest and continuation token
+            // since there can be multiple in the same segment.
             if (MkvTypeInfos.TAGS.equals(endMasterElement.getElementMetaData().getTypeInfo())) {
                 if (log.isDebugEnabled()) {
                     log.debug("TAGS end {}, potentially updating millisbehindlatest and continuation token",
@@ -171,7 +189,24 @@ public void visit(MkvEndMasterElement endMasterElement) throws MkvElementVisitEx
 
         @Override
         public void visit(MkvDataElement dataElement) throws MkvElementVisitException {
+            if (mkvTagProcessor.isPresent()) {
+                if (MkvTypeInfos.TAGNAME.equals(dataElement.getElementMetaData().getTypeInfo())) {
+                    tagName = getMkvElementStringVal(dataElement);
+                } else if (MkvTypeInfos.TAGSTRING.equals(dataElement.getElementMetaData().getTypeInfo())) {
+                    tagValue = getMkvElementStringVal(dataElement);
+                }
 
+                if (tagName != null && tagValue != null) {
+                    // Only process non-internal tags
+                    if (!tagName.startsWith(AWS_KINESISVIDEO_TAGNAME_PREFIX)) {
+                        mkvTagProcessor.get().process(new MkvTag(tagName, tagValue), currentFragmentMetadata);
+                    }
+
+                    // Empty the values for new tag
+                    tagName = null;
+                    tagValue = null;
+                }
+            }
         }
     }
 
@@ -203,7 +238,6 @@ private void setMillisBehindLatestAndContinuationToken() {
         }
     }
 
-
     private void collectPreClusterInfo() {
         final Map<String, String> tagNameToTagValueMap = getTagNameToValueMap();
 
@@ -233,7 +267,6 @@ private Map<Long, List<MkvElement>> getTrackEntryMap() {
         return trackEntryElementNumberToMkvElement;
     }
 
-
     private void createTrackMetadata(List<MkvElement> trackEntryPropertyLists) {
         Map<EBMLTypeInfo, MkvElement> metaDataProperties = trackEntryPropertyLists.stream()
                 .filter(e -> trackTypesForTrackMetadata.contains(e.getElementMetaData().getTypeInfo()))
@@ -266,7 +299,6 @@ private static String getStringVal(Map<EBMLTypeInfo, MkvElement> metaDataPropert
         return ((MkvValue<String>)dataElement.getValueCopy()).getVal();
     }
 
-
     private static Optional<BigInteger> getUnsignedLongValOptional(Map<EBMLTypeInfo, MkvElement> metaDataProperties,
             EBMLTypeInfo key) {
         return Optional.ofNullable(getUnsignedLongVal(metaDataProperties, key));
@@ -294,7 +326,6 @@ private static ByteBuffer getByteBuffer(Map<EBMLTypeInfo, MkvElement> metaDataPr
         return ((MkvValue<ByteBuffer>)dataElement.getValueCopy()).getVal();
     }
 
-
     private Map<String, String> getTagNameToValueMap() {
         List<MkvElement> tagElements = tagCollector.copyOfCollection();
         Map<String, Long> tagNameToParentElementNumber = tagElements.stream()
@@ -326,14 +357,29 @@ private static EBMLElementMetaData getParentElement(MkvElement e) {
         return e.getElementPath().get(e.getElementPath().size()-1);
     }
 
-
     private void resetCollectedData() {
         previousFragmentMetadata = currentFragmentMetadata;
         currentFragmentMetadata = Optional.empty();
         trackMetadataMap.clear();
+        tagName = tagValue = null;
 
         tagCollector.clearCollection();
         trackCollector.clearCollection();
     }
 
+
+    public static final class BasicMkvTagProcessor implements FragmentMetadataVisitor.MkvTagProcessor {
+        @Getter
+        private List<MkvTag> tags = new ArrayList<>();
+
+        @Override
+        public void process(MkvTag mkvTag, Optional<FragmentMetadata> currentFragmentMetadata) {
+            tags.add(mkvTag);
+        }
+
+        @Override
+        public void clear() {
+            tags.clear();
+        }
+    }
 }
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FrameVisitor.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FrameVisitor.java
index 52ba05c..e6966e8 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FrameVisitor.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/FrameVisitor.java
@@ -29,17 +29,27 @@ public class FrameVisitor extends CompositeMkvElementVisitor {
     private final FragmentMetadataVisitor fragmentMetadataVisitor;
     private final FrameVisitorInternal frameVisitorInternal;
     private final FrameProcessor frameProcessor;
+    private final Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor;
 
-    private FrameVisitor(FragmentMetadataVisitor fragmentMetadataVisitor, FrameProcessor frameProcessor) {
+    private FrameVisitor(FragmentMetadataVisitor fragmentMetadataVisitor,
+                         Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor,
+                         FrameProcessor frameProcessor) {
         super(fragmentMetadataVisitor);
         this.fragmentMetadataVisitor = fragmentMetadataVisitor;
         this.frameVisitorInternal = new FrameVisitorInternal();
         this.childVisitors.add(this.frameVisitorInternal);
         this.frameProcessor = frameProcessor;
+        this.tagProcessor = tagProcessor;
     }
 
     public static FrameVisitor create(FrameProcessor frameProcessor) {
-        return new FrameVisitor(FragmentMetadataVisitor.create(), frameProcessor);
+        return new FrameVisitor(FragmentMetadataVisitor.create(), Optional.empty(), frameProcessor);
+    }
+
+    public static FrameVisitor create(FrameProcessor frameProcessor,
+                                      Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor) {
+        return new FrameVisitor(FragmentMetadataVisitor.create(tagProcessor),
+                tagProcessor, frameProcessor);
     }
 
     public interface FrameProcessor {
@@ -47,6 +57,15 @@ default void process(Frame frame, MkvTrackMetadata trackMetadata,
                              Optional<FragmentMetadata> fragmentMetadata) {
             throw new NotImplementedException("Default FrameVisitor.FrameProcessor");
         }
+        default void process(Frame frame, MkvTrackMetadata trackMetadata,
+                             Optional<FragmentMetadata> fragmentMetadata,
+                             Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor) {
+            if (tagProcessor.isPresent()) {
+                throw new NotImplementedException("Default FrameVisitor.FrameProcessor");
+            } else {
+                process(frame, trackMetadata, fragmentMetadata);
+            }
+        }
     }
 
     private class FrameVisitorInternal extends MkvElementVisitor {
@@ -58,19 +77,24 @@ public void visit(com.amazonaws.kinesisvideo.parser.mkv.MkvStartMasterElement st
         @Override
         public void visit(com.amazonaws.kinesisvideo.parser.mkv.MkvEndMasterElement endMasterElement)
                 throws com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException {
+            if (tagProcessor.isPresent()
+                    && MkvTypeInfos.CLUSTER.equals(endMasterElement.getElementMetaData().getTypeInfo())) {
+                tagProcessor.get().clear();
+            }
         }
 
         @Override
         public void visit(com.amazonaws.kinesisvideo.parser.mkv.MkvDataElement dataElement)
                 throws com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException {
-
             if (MkvTypeInfos.SIMPLEBLOCK.equals(dataElement.getElementMetaData().getTypeInfo())) {
                 MkvValue<Frame> frame = dataElement.getValueCopy();
                 Validate.notNull(frame);
                 MkvTrackMetadata trackMetadata =
                         fragmentMetadataVisitor.getMkvTrackMetadata(frame.getVal().getTrackNumber());
-                    frameProcessor.process(frame.getVal(), trackMetadata,
-                            fragmentMetadataVisitor.getCurrentFragmentMetadata());
+
+                frameProcessor.process(frame.getVal(), trackMetadata,
+                        fragmentMetadataVisitor.getCurrentFragmentMetadata(),
+                        tagProcessor);
             }
         }
     }
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/H264FrameRenderer.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/H264FrameRenderer.java
index 819da70..513c8cf 100644
--- a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/H264FrameRenderer.java
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/H264FrameRenderer.java
@@ -14,26 +14,20 @@
 package com.amazonaws.kinesisvideo.parser.utilities;
 
 import java.awt.image.BufferedImage;
-import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.Optional;
 
 import com.amazonaws.kinesisvideo.parser.examples.KinesisVideoFrameViewer;
 import com.amazonaws.kinesisvideo.parser.mkv.Frame;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.jcodec.codecs.h264.H264Decoder;
-import org.jcodec.codecs.h264.mp4.AvcCBox;
-import org.jcodec.common.model.ColorSpace;
-import org.jcodec.common.model.Picture;
-import org.jcodec.scale.AWTUtil;
-import org.jcodec.scale.Transform;
-import org.jcodec.scale.Yuv420jToRgb;
 
-import static org.jcodec.codecs.h264.H264Utils.splitMOVPacket;
+import static com.amazonaws.kinesisvideo.parser.utilities.BufferedImageUtil.addTextToImage;
+
 
 @Slf4j
 public class H264FrameRenderer extends H264FrameDecoder {
+    private static final int PIXEL_TO_LEFT = 10;
+    private static final int PIXEL_TO_TOP_LINE_1 = 20;
+    private static final int PIXEL_TO_TOP_LINE_2 = 40;
 
     private final KinesisVideoFrameViewer kinesisVideoFrameViewer;
 
@@ -48,8 +42,29 @@ public static H264FrameRenderer create(KinesisVideoFrameViewer kinesisVideoFrame
     }
 
     @Override
-    public void process(Frame frame, MkvTrackMetadata trackMetadata, Optional<FragmentMetadata> fragmentMetadata) {
+    public void process(Frame frame, MkvTrackMetadata trackMetadata, Optional<FragmentMetadata> fragmentMetadata,
+                        Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor) {
         final BufferedImage bufferedImage = decodeH264Frame(frame, trackMetadata);
+        if (tagProcessor.isPresent()) {
+            final FragmentMetadataVisitor.BasicMkvTagProcessor processor =
+                    (FragmentMetadataVisitor.BasicMkvTagProcessor) tagProcessor.get();
+
+            if (fragmentMetadata.isPresent()) {
+                addTextToImage(bufferedImage,
+                        String.format("Fragment Number: %s", fragmentMetadata.get().getFragmentNumberString()),
+                        PIXEL_TO_LEFT, PIXEL_TO_TOP_LINE_1);
+            }
+
+            if (processor.getTags().size() > 0) {
+                addTextToImage(bufferedImage, "Fragment Metadata: " + processor.getTags().toString(),
+                        PIXEL_TO_LEFT, PIXEL_TO_TOP_LINE_2);
+            } else {
+                addTextToImage(bufferedImage, "Fragment Metadata: No Metadata Available",
+                        PIXEL_TO_LEFT, PIXEL_TO_TOP_LINE_2);
+            }
+        }
         kinesisVideoFrameViewer.update(bufferedImage);
     }
+
+
 }
diff --git a/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/MkvTag.java b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/MkvTag.java
new file mode 100644
index 0000000..01bd58f
--- /dev/null
+++ b/src/main/java/com/amazonaws/kinesisvideo/parser/utilities/MkvTag.java
@@ -0,0 +1,34 @@
+/*
+Copyright 2017-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License"). 
+You may not use this file except in compliance with the License. 
+A copy of the License is located at
+
+   http://aws.amazon.com/apache2.0/
+
+or in the "license" file accompanying this file. 
+This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and limitations under the License.
+*/
+package com.amazonaws.kinesisvideo.parser.utilities;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Class that captures MKV tag key/value string pairs
+ */
+@AllArgsConstructor(access = AccessLevel.PUBLIC)
+@Builder
+@Getter
+@ToString
+public class MkvTag {
+    @Builder.Default
+    private String tagName = "";
+    @Builder.Default
+    private String tagValue = "";
+}
diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLParserTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLParserTest.java
index e86021f..54ba428 100644
--- a/src/test/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLParserTest.java
+++ b/src/test/java/com/amazonaws/kinesisvideo/parser/ebml/EBMLParserTest.java
@@ -24,6 +24,8 @@
 import java.util.Arrays;
 import java.util.OptionalLong;
 
+import static com.amazonaws.kinesisvideo.parser.ebml.EBMLUtils.UNKNOWN_LENGTH_VALUE;
+
 /**
  * Tests for the {@link EBMLParser}.
  */
@@ -236,14 +238,14 @@ public void testWithMultipleMasterElementsAndChildElements() throws IOException
                         .callbackType(TestEBMLParserCallback.CallbackDescription.CallbackType.START)
                         .typeInfo(TestEBMLTypeInfoProvider.SEGMENT)
                         .elementCount(3)
-                        .numBytes(OptionalLong.of(EBMLParserInternalElement.UNKNOWN_LENGTH_VALUE))
+                        .numBytes(OptionalLong.of(UNKNOWN_LENGTH_VALUE))
                         .bytes(SEGMENT_element_rawbytes)
                         .build())
                 .expectCallback(TestEBMLParserCallback.CallbackDescription.builder()
                         .callbackType(TestEBMLParserCallback.CallbackDescription.CallbackType.START)
                         .typeInfo(TestEBMLTypeInfoProvider.SEEKHEAD)
                         .elementCount(4)
-                        .numBytes(OptionalLong.of(EBMLParserInternalElement.UNKNOWN_LENGTH_VALUE))
+                        .numBytes(OptionalLong.of(UNKNOWN_LENGTH_VALUE))
                         .bytes(SEEKHEAD_element_rawbytes)
                         .build());
         addExpectedCallbacksForBaseElement(TestEBMLTypeInfoProvider.CRC,
@@ -280,7 +282,6 @@ public void testWithMultipleMasterElementsAndChildElements() throws IOException
         callParser(outputStream, 1);
     }
 
-
     @Test
     public void unknownElementsTest() throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
@@ -320,7 +321,6 @@ public void unknownElementsTest() throws IOException {
         callParser(outputStream, 1);
     }
 
-
     private ByteArrayOutputStream setupTestForMasterElementWithOneChildAndUnknownlength() throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         byte [] EBML_element_rawbytes = writeElement(EBML_id_bytes, UNKNOWN_LENGTH, outputStream);
@@ -404,7 +404,7 @@ private TestEBMLParserCallback.CallbackDescription createExpectedCallbackForStar
                 .callbackType(TestEBMLParserCallback.CallbackDescription.CallbackType.START)
                 .typeInfo(TestEBMLTypeInfoProvider.EBML)
                 .elementCount(0)
-                .numBytes(OptionalLong.of(EBMLParserInternalElement.UNKNOWN_LENGTH_VALUE))
+                .numBytes(OptionalLong.of(UNKNOWN_LENGTH_VALUE))
                 .bytes(EBML_element_rawbytes)
                 .build();
     }
diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExampleTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExampleTest.java
index 3fe05b8..0b9e020 100644
--- a/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExampleTest.java
+++ b/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoExampleTest.java
@@ -44,4 +44,17 @@ public void testExample() throws InterruptedException, IOException {
         Assert.assertEquals(8, example.getFragmentsRead());
     }
 
+    @Ignore
+    @Test
+    public void testConsumerExample() throws InterruptedException, IOException {
+        KinesisVideoExample example = KinesisVideoExample.builder().region(Regions.US_WEST_2)
+                .streamName("myTestStream")
+                .credentialsProvider(new ProfileCredentialsProvider())
+                // Use existing stream in KVS (with Producer sending)
+                .noSampleInputRequired(true)
+                .build();
+
+        example.execute();
+    }
+
 }
diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoRendererExampleTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoRendererExampleTest.java
index f2cd416..ed753fe 100644
--- a/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoRendererExampleTest.java
+++ b/src/test/java/com/amazonaws/kinesisvideo/parser/examples/KinesisVideoRendererExampleTest.java
@@ -30,6 +30,22 @@ public void testExample() throws InterruptedException, IOException {
                 .streamName("render-example-stream")
                 .credentialsProvider(new ProfileCredentialsProvider())
                 .inputVideoStream(TestResourceUtil.getTestInputStream("clusters.mkv"))
+                .renderFragmentMetadata(false)
+                .build();
+
+        example.execute();
+    }
+
+    @Ignore
+    @Test
+    public void testConsumerExample() throws InterruptedException, IOException {
+        KinesisVideoRendererExample example = KinesisVideoRendererExample.builder().region(Regions.US_WEST_2)
+                .streamName("render-example-stream")
+                .credentialsProvider(new ProfileCredentialsProvider())
+                // Display the tags in the frame viewer window
+                .renderFragmentMetadata(true)
+                // Use existing stream in KVS (with Producer sending)
+                .noSampleInputRequired(true)
                 .build();
 
         example.execute();
diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitorTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitorTest.java
index 3c14e4e..ce33536 100644
--- a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitorTest.java
+++ b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/FragmentMetadataVisitorTest.java
@@ -24,6 +24,7 @@
 import com.amazonaws.kinesisvideo.parser.mkv.MkvElement;
 import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException;
 import com.amazonaws.kinesisvideo.parser.mkv.MkvValue;
+import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor.BasicMkvTagProcessor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -51,7 +52,8 @@ public void basicTest() throws IOException, MkvElementVisitException {
         continuationTokens.add("91343852333181432397633822764885441725874549018");
         continuationTokens.add("91343852333181432402585582922026963247510532162");
 
-        final FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create();
+        final BasicMkvTagProcessor tagProcessor = new BasicMkvTagProcessor();
+        final FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(Optional.of(tagProcessor));
         StreamingMkvReader mkvStreamReader =
                 StreamingMkvReader.createDefault(new InputStreamParserByteSource(in));
         int segmentCount = 0;
@@ -65,6 +67,7 @@ public void basicTest() throws IOException, MkvElementVisitException {
                     MkvTrackMetadata trackMetadata = fragmentVisitor.getMkvTrackMetadata(frame.getTrackNumber());
                     assertTrackAndFragmentInfo(fragmentVisitor, frame, trackMetadata);
                 }
+
                 if (MkvTypeInfos.SEGMENT.equals(mkvElement.get().getElementMetaData().getTypeInfo())) {
                     if (mkvElement.get() instanceof MkvEndMasterElement) {
                         if (segmentCount < continuationTokens.size()) {
@@ -72,11 +75,39 @@ public void basicTest() throws IOException, MkvElementVisitException {
                             Assert.assertTrue(continuationToken.isPresent());
                             Assert.assertEquals(continuationTokens.get(segmentCount), continuationToken.get());
                         }
+
+                        Assert.assertTrue(fragmentVisitor.getCurrentFragmentMetadata().isPresent());
+
+                        final List<MkvTag> tags = tagProcessor.getTags();
+                        Assert.assertEquals(7, tags.size());
+
+                        Assert.assertEquals("COMPATIBLE_BRANDS", tags.get(0).getTagName());
+                        Assert.assertEquals("isomavc1mp42", tags.get(0).getTagValue());
+
+                        Assert.assertEquals("MAJOR_BRAND", tags.get(1).getTagName());
+                        Assert.assertEquals("M4V ", tags.get(1).getTagValue());
+
+                        Assert.assertEquals("MINOR_VERSION", tags.get(2).getTagName());
+                        Assert.assertEquals("1", tags.get(2).getTagValue());
+
+                        Assert.assertEquals("ENCODER", tags.get(3).getTagName());
+                        Assert.assertEquals("Lavf57.71.100", tags.get(3).getTagValue());
+
+                        Assert.assertEquals("HANDLER_NAME", tags.get(4).getTagName());
+                        Assert.assertEquals("ETI ISO Video Media Handler", tags.get(4).getTagValue());
+
+                        Assert.assertEquals("ENCODER", tags.get(5).getTagName());
+                        Assert.assertEquals("Elemental H.264", tags.get(5).getTagValue());
+
+                        Assert.assertEquals("DURATION", tags.get(6).getTagName());
+                        Assert.assertEquals("00:00:10.000000000\u0000\u0000", tags.get(6).getTagValue());
+
                         segmentCount++;
+
+                        tagProcessor.clear();
                     }
                 }
             }
-
         }
     }
 
@@ -159,6 +190,76 @@ public void testFragmentNumbers_NoClusterData() throws IOException, MkvElementVi
         Assert.assertEquals(expectedFragmentNumbers, visitedFragmentNumbers);
     }
 
+    /**
+     * Validating the fragment metadata visitor returns the set of tags in the right order from the test file.
+     * The test file contains mix of multiple clusters and tags.
+     */
+    @Test
+    public void testMkvTags_MixedCluster() throws IOException, MkvElementVisitException {
+        final BasicMkvTagProcessor tagProcessor = new BasicMkvTagProcessor();
+        final FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(Optional.of(tagProcessor));
+        String testFile = "test_mixed_tags.mkv";
+        boolean firstCluster = true;
+
+        final InputStream inputStream = TestResourceUtil.getTestInputStream(testFile);
+        StreamingMkvReader mkvStreamReader =
+                StreamingMkvReader.createDefault(new InputStreamParserByteSource(inputStream));
+        while (mkvStreamReader.mightHaveNext()) {
+            Optional<MkvElement> mkvElement = mkvStreamReader.nextIfAvailable();
+            if (mkvElement.isPresent()) {
+
+                mkvElement.get().accept(fragmentVisitor);
+
+                if (MkvTypeInfos.SEGMENT.equals(mkvElement.get().getElementMetaData().getTypeInfo())
+                        && mkvElement.get() instanceof MkvEndMasterElement) {
+                    final List<MkvTag> tags = tagProcessor.getTags();
+                    Assert.assertEquals(firstCluster ? 10 : 5, tags.size());
+                    for (int i = 0; i < tags.size(); i++) {
+                        Assert.assertEquals(String.format("testTag_%s", i % 5), tags.get(i).getTagName());
+                        Assert.assertEquals(String.format("testTag_%s_Value", i % 5), tags.get(i).getTagValue());
+                    }
+
+                    tagProcessor.clear();
+                    firstCluster = false;
+                }
+            }
+        }
+    }
+
+    /**
+     * Validating the fragment metadata visitor returns the set of tags in the right order from the test file.
+     * The test file contains no clusters only EBML header, Segment and a set of tags.
+     */
+    @Test
+    public void testMkvTags_NoCluster() throws IOException, MkvElementVisitException {
+        final BasicMkvTagProcessor tagProcessor = new BasicMkvTagProcessor();
+        final FragmentMetadataVisitor fragmentVisitor = FragmentMetadataVisitor.create(Optional.of(tagProcessor));
+        String testFile = "test_tags_empty_cluster.mkv";
+
+        final InputStream inputStream = TestResourceUtil.getTestInputStream(testFile);
+        StreamingMkvReader mkvStreamReader =
+                StreamingMkvReader.createDefault(new InputStreamParserByteSource(inputStream));
+        while (mkvStreamReader.mightHaveNext()) {
+            Optional<MkvElement> mkvElement = mkvStreamReader.nextIfAvailable();
+            if (mkvElement.isPresent()) {
+
+                mkvElement.get().accept(fragmentVisitor);
+
+                if (MkvTypeInfos.SEGMENT.equals(mkvElement.get().getElementMetaData().getTypeInfo())
+                        && mkvElement.get() instanceof MkvEndMasterElement) {
+                    final List<MkvTag> tags = tagProcessor.getTags();
+                    Assert.assertEquals(5, tags.size());
+                    for (int i = 0; i < tags.size(); i++) {
+                        Assert.assertEquals(String.format("testTag_%s", i), tags.get(i).getTagName());
+                        Assert.assertEquals(String.format("testTag_%s_Value", i), tags.get(i).getTagValue());
+                    }
+
+                    tagProcessor.clear();
+                }
+            }
+        }
+    }
+
     @Test
     public void testFragmentMetadata_NoFragementMetadata_withWebm() throws IOException, MkvElementVisitException {
         final FragmentMetadataVisitor fragmentMetadataVisitor = FragmentMetadataVisitor.create();
diff --git a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/H264FrameRendererTest.java b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/H264FrameRendererTest.java
index f3a348d..3696a38 100644
--- a/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/H264FrameRendererTest.java
+++ b/src/test/java/com/amazonaws/kinesisvideo/parser/utilities/H264FrameRendererTest.java
@@ -17,7 +17,8 @@
 import com.amazonaws.kinesisvideo.parser.ebml.InputStreamParserByteSource;
 import com.amazonaws.kinesisvideo.parser.ebml.MkvTypeInfos;
 import com.amazonaws.kinesisvideo.parser.examples.KinesisVideoFrameViewer;
-import com.amazonaws.kinesisvideo.parser.mkv.*;
+import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException;
+import com.amazonaws.kinesisvideo.parser.mkv.StreamingMkvReader;
 import com.amazonaws.kinesisvideo.parser.mkv.visitors.CompositeMkvElementVisitor;
 import com.amazonaws.kinesisvideo.parser.mkv.visitors.CountVisitor;
 import org.junit.Assert;
diff --git a/src/test/resources/test_mixed_tags.mkv b/src/test/resources/test_mixed_tags.mkv
new file mode 100644
index 0000000000000000000000000000000000000000..cc767ec14e7aa85c36eed1effabe54b7348da4bb
GIT binary patch
literal 22768
zcmeI4OKcle6o$txfmRhw0D-D*NMQj&D8A2`dts9}o)nYXj$+4IATU-5X%uLpI9ai1
z97Gd@gb=Vm0;E79wUtnLEKr456qQGIu&XK+8o>fsbOVSk!|{xrNS)Z=nX%=FV<|JE
zd7Kl?!{_t=pF2nF>uc-rGv!G9Cj9+TzAgS|d4GKA)fbENb2Fvsq7h#%N8_)QqldGL
zlaVd=b9e6YWW;bj{iRZM$(-D%E<I|mRhJITPn70!#mNFmELT@`Z?7uki!UrpG#`t4
zV^P1cm^T*l8;g5malf&IH<oZ?-J3lB{7<FzH==W|U5{P*^62|#XAj=!|L*c5S3X^F
zu1I&h|E2S94o{t$C{2}m^Ha}F%=Bgx!$xXxmzCoKyR5t~cVW86Fy6WVlh?15Bc*Rv
zmbz|U{r>mQ{(iX}$(~xRT*BV@x#Edw_a?Y?dS*3WNM&RBq4C&IW^8nD@YWrSO$fdJ
zlcnh57fX@N6Ca*?81`6<8pbBvB@yGWk$Q6En9*~{b6n+afkS^Tm0)^pas#GM*=sQE
z9T?9RhLdSKo6P2u2|H658XY*6veUUjdMp*UM-$7iS4>kbxRQ)>Zc$FS(nM)fD9bV#
zSA+`2C9Kw_#Vw_^li+bD!BJ0wtUa2yM+>=R%FgBnQX@`=LsW80T0&FV#}st}2q(aU
zo&Y1GnZ($jCqx(NBZS;P`1!)tukIy<({Q$I)E=snWjI&0GxO}F7`uAfFvyMdr?+4}
z^HpE8@}$+)*XASD?Xf!BXRF&!f4g)1PQu7vwS3!wTbpp-Z*5N4b*;?_@A#q32@ibT
z=7a|?wmIRQAGJAQ_iCFH-t~4{5;k9xvA$RF=0EaK{rX<5a344o?p>#TP^W%8;He*}
z<niOlY$0wxkxVBF>CCuOf%a)ioWq`}Q+5V3>0B~BHjqnZ(uG`R*iQSZjYs^DBhm{{
zNQm@8xu&McElUwCm=?lRfG?B?!n75PUWBE5(TnuZ%RMVdFRjHTq!*}*HPo%mgCQ@{
zOX#N*>7_~4WbdgH!Rf_fJS2K)SR0XE{Aw@1IAr>xbCCjSFH|Yu6>VvdUKFs4RD>yI
z5z{m+mtlQtFH#EUKa_PKxQ_O6JN1e$5A`6vAie~n2^3#KrwJ5aww)BOSwQ?65C3m-
zB{;r74GRlj8Wl&xmkz)eaEutC04-WFCbcrb^sy-A%oN0hSl{?!Rs%GRFJATKv3C$(
z5MK~qW*a)3=pBUwXy_doyY#=br#B=7eBoiKFAa+$;!7vs3v|6uO%;Q77}Jsj#1|9D
zMJi3J){lRD5z?3X!o2EBx)<>U@dfb(@dW_I{nQM;Ry-DnFMAqah}*Imy!C}ODvpRR
z9e^*;!CT=LB|v^58v0#8c;UjNO2WjU5C8aLx*}e)>I?I#FU50+FRl2VUaR)ooh)PT
z3sl8gNvzF-@e=mFgnmk~_XT@jcE+aE>D)C53&}icSR4^wIsso4XfBnpNYLyeiIe~^
zQko1JcclzC?no1V>I?U(FAD_m1@R>qO<?a!=rn=7FWCFyZ}4<02OhyNl(6unQE^0k
z=>U8Y++^S$DbQdn&4@(>p^CzwX(9sS3wL{en^j-9SAAKzi1>o|g7`9vy)UkHw4;=@
z;;}&UXfK;bT)RJ;gPTVUizDJoC*X^c@O~Gm6g2O0E;XyPzYrm`grU^I%p;I-oB++@
zi}0#1@AV_TwBmaj&7&5+D+tY_(B&R9kI+2&SMvz0gpkx1(Wp2gzH|V-$jV#lKz@N2
z*bz&@kRww_t`#^(FbF9KzL<T`P2&$=yz0xx8;CE6FC&58Jc#B|=rn=m5t>JxXC5)j
NePKs%^`&8P{0BuZEn5Hp

literal 0
HcmV?d00001

diff --git a/src/test/resources/test_tags_empty_cluster.mkv b/src/test/resources/test_tags_empty_cluster.mkv
new file mode 100644
index 0000000000000000000000000000000000000000..3df614ec1c39d7a1757f0072eb28242d7e1fa237
GIT binary patch
literal 426
zcmb1gy}x*gQ(GgW({~{L)X3uWxsk)EsWUgRq$s~QJCVVuy^+bOwUJ38xFP+&P{`qQ
zMg}l&C@zMw%+gOlSvszZp)Agl)Z&tm#PoOrmv*QSKU^q2EHS4vm2|TVG0ie0*DNDU
Wvy8|!%NWxvV{*+h!8FSRVio{AEMFM_

literal 0
HcmV?d00001