diff --git a/pom.xml b/pom.xml
index 0267c0d..3c6cad0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,12 +18,12 @@ limitations under the License.
org.example
streaming-book-examples
- 0.9.0
+ 1.0.0
jar
- 2.6.0-SNAPSHOT
+ 2.5.0
v2-rev374-1.22.0
1.22.0
20.0
diff --git a/src/main/java/net/streamingbook/BeamModel.java b/src/main/java/net/streamingbook/BeamModel.java
index a39035d..fa076f5 100644
--- a/src/main/java/net/streamingbook/BeamModel.java
+++ b/src/main/java/net/streamingbook/BeamModel.java
@@ -95,10 +95,10 @@ public PCollection expand(PCollection> input) {
@Override
public String[] getExpectedResults() {
- return new String[] { "[11:00:00, 11:02:00): TeamX:14 11:01:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast" };
+ return new String[] { "[12:00:00, 12:02:00): TeamX:14 12:01:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast" };
}
}
@@ -118,15 +118,15 @@ public PCollection expand(PCollection> input) {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 EARLY index=0 isFirst",
- "[11:00:00, 11:02:00): TeamX:14 11:01:59 EARLY index=1",
- "[11:02:00, 11:04:00): TeamX:7 11:03:59 EARLY index=0 isFirst",
- "[11:02:00, 11:04:00): TeamX:10 11:03:59 EARLY index=1",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 EARLY index=2",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 EARLY index=0 isFirst",
- "[11:06:00, 11:08:00): TeamX:3 11:07:59 EARLY index=0 isFirst",
- "[11:06:00, 11:08:00): TeamX:11 11:07:59 EARLY index=1",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 EARLY index=2"
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 EARLY index=0 isFirst",
+ "[12:00:00, 12:02:00): TeamX:14 12:01:59 EARLY index=1",
+ "[12:02:00, 12:04:00): TeamX:7 12:03:59 EARLY index=0 isFirst",
+ "[12:02:00, 12:04:00): TeamX:10 12:03:59 EARLY index=1",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 EARLY index=2",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 EARLY index=0 isFirst",
+ "[12:06:00, 12:08:00): TeamX:3 12:07:59 EARLY index=0 isFirst",
+ "[12:06:00, 12:08:00): TeamX:11 12:07:59 EARLY index=1",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 EARLY index=2"
};
}
}
@@ -153,14 +153,14 @@ public PCollection expand(PCollection> input) {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 EARLY index=0 isFirst",
- "[11:00:00, 11:02:00): TeamX:14 11:01:59 ON_TIME index=1 onTimeIndex=0 isLast",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 EARLY index=0 isFirst",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 ON_TIME index=1 onTimeIndex=0 isLast",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 EARLY index=0 isFirst",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 ON_TIME index=1 onTimeIndex=0 isLast",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 EARLY index=0 isFirst",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 ON_TIME index=1 onTimeIndex=0 isLast"
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 EARLY index=0 isFirst",
+ "[12:00:00, 12:02:00): TeamX:14 12:01:59 ON_TIME index=1 onTimeIndex=0 isLast",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 EARLY index=0 isFirst",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 ON_TIME index=1 onTimeIndex=0 isLast",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 EARLY index=0 isFirst",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 ON_TIME index=1 onTimeIndex=0 isLast",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 EARLY index=0 isFirst",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 ON_TIME index=1 onTimeIndex=0 isLast"
};
}
}
@@ -181,14 +181,14 @@ public PCollection expand(PCollection> input) {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 EARLY index=0 isFirst",
- "[11:00:00, 11:02:00): TeamX:14 11:01:59 ON_TIME index=1 onTimeIndex=0 isLast",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 EARLY index=0 isFirst",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 ON_TIME index=1 onTimeIndex=0 isLast",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 EARLY index=0 isFirst",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 ON_TIME index=1 onTimeIndex=0 isLast",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 EARLY index=0 isFirst",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 ON_TIME index=1 onTimeIndex=0 isLast"
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 EARLY index=0 isFirst",
+ "[12:00:00, 12:02:00): TeamX:14 12:01:59 ON_TIME index=1 onTimeIndex=0 isLast",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 EARLY index=0 isFirst",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 ON_TIME index=1 onTimeIndex=0 isLast",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 EARLY index=0 isFirst",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 ON_TIME index=1 onTimeIndex=0 isLast",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 EARLY index=0 isFirst",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 ON_TIME index=1 onTimeIndex=0 isLast"
};
}
}
@@ -208,10 +208,10 @@ public static class Example2_6left extends Example2_6 {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:00, 11:02:00): TeamX:14 11:01:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast"
+ "[12:00:00, 12:02:00): TeamX:14 12:01:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast"
};
}
}
@@ -221,10 +221,10 @@ public static class Example2_6right extends Example2_6 {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast"
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 ON_TIME index=0 onTimeIndex=0 isFirst isLast"
};
}
}
@@ -252,15 +252,15 @@ public String[] getExpectedResults() {
// is triggering the ON_TIME panes, and not the final advancement of the watermark to infinity
// (which would also mark the pane as last).
return new String[] {
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 EARLY index=0 isFirst",
- "[11:00:00, 11:02:00): TeamX:14 11:01:59 ON_TIME index=1 onTimeIndex=0",
- "[11:02:00, 11:04:00): TeamX:10 11:03:59 EARLY index=0 isFirst",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 EARLY index=1",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 ON_TIME index=2 onTimeIndex=0",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 EARLY index=0 isFirst",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 ON_TIME index=1 onTimeIndex=0",
- "[11:06:00, 11:08:00): TeamX:3 11:07:59 EARLY index=0 isFirst",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 ON_TIME index=1 onTimeIndex=0"
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 EARLY index=0 isFirst",
+ "[12:00:00, 12:02:00): TeamX:14 12:01:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:02:00, 12:04:00): TeamX:10 12:03:59 EARLY index=0 isFirst",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 EARLY index=1",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 ON_TIME index=2 onTimeIndex=0",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 EARLY index=0 isFirst",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:06:00, 12:08:00): TeamX:3 12:07:59 EARLY index=0 isFirst",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 ON_TIME index=1 onTimeIndex=0"
};
}
}
@@ -273,15 +273,15 @@ public String[] getExpectedResults() {
// is triggering the ON_TIME panes, and not the final advancement of the watermark to infinity
// (which would also mark the pane as last).
return new String[] {
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 EARLY index=0 isFirst",
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 ON_TIME index=1 onTimeIndex=0",
- "[11:00:00, 11:02:00): TeamX:14 11:01:59 LATE index=2 onTimeIndex=1",
- "[11:02:00, 11:04:00): TeamX:10 11:03:59 EARLY index=0 isFirst",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 ON_TIME index=1 onTimeIndex=0",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 EARLY index=0 isFirst",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 ON_TIME index=1 onTimeIndex=0",
- "[11:06:00, 11:08:00): TeamX:3 11:07:59 EARLY index=0 isFirst",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 ON_TIME index=1 onTimeIndex=0"
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 EARLY index=0 isFirst",
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:00:00, 12:02:00): TeamX:14 12:01:59 LATE index=2 onTimeIndex=1",
+ "[12:02:00, 12:04:00): TeamX:10 12:03:59 EARLY index=0 isFirst",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 EARLY index=0 isFirst",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:06:00, 12:08:00): TeamX:3 12:07:59 EARLY index=0 isFirst",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 ON_TIME index=1 onTimeIndex=0"
};
}
}
@@ -304,15 +304,15 @@ public PCollection expand(PCollection> input) {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 EARLY index=0 isFirst",
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 ON_TIME index=1 onTimeIndex=0",
- "[11:00:00, 11:02:00): TeamX:11 11:01:59 LATE index=2 onTimeIndex=1",
- "[11:02:00, 11:04:00): TeamX:10 11:03:59 EARLY index=0 isFirst",
- "[11:02:00, 11:04:00): TeamX:18 11:03:59 ON_TIME index=1 onTimeIndex=0",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 EARLY index=0 isFirst",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 ON_TIME index=1 onTimeIndex=0",
- "[11:06:00, 11:08:00): TeamX:3 11:07:59 EARLY index=0 isFirst",
- "[11:06:00, 11:08:00): TeamX:12 11:07:59 ON_TIME index=1 onTimeIndex=0"
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 EARLY index=0 isFirst",
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:00:00, 12:02:00): TeamX:11 12:01:59 LATE index=2 onTimeIndex=1",
+ "[12:02:00, 12:04:00): TeamX:10 12:03:59 EARLY index=0 isFirst",
+ "[12:02:00, 12:04:00): TeamX:18 12:03:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 EARLY index=0 isFirst",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:06:00, 12:08:00): TeamX:3 12:07:59 EARLY index=0 isFirst",
+ "[12:06:00, 12:08:00): TeamX:12 12:07:59 ON_TIME index=1 onTimeIndex=0"
};
}
}
@@ -335,15 +335,15 @@ public PCollection expand(PCollection> input) {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:00, 11:02:00): TeamX:5 11:01:59 EARLY index=0 isFirst",
- "[11:00:00, 11:02:00): TeamX:0 11:01:59 ON_TIME index=1 onTimeIndex=0",
- "[11:00:00, 11:02:00): TeamX:9 11:01:59 LATE index=2 onTimeIndex=1",
- "[11:02:00, 11:04:00): TeamX:10 11:03:59 EARLY index=0 isFirst",
- "[11:02:00, 11:04:00): TeamX:8 11:03:59 ON_TIME index=1 onTimeIndex=0",
- "[11:04:00, 11:06:00): TeamX:4 11:05:59 EARLY index=0 isFirst",
- "[11:04:00, 11:06:00): TeamX:0 11:05:59 ON_TIME index=1 onTimeIndex=0",
- "[11:06:00, 11:08:00): TeamX:3 11:07:59 EARLY index=0 isFirst",
- "[11:06:00, 11:08:00): TeamX:9 11:07:59 ON_TIME index=1 onTimeIndex=0"
+ "[12:00:00, 12:02:00): TeamX:5 12:01:59 EARLY index=0 isFirst",
+ "[12:00:00, 12:02:00): TeamX:0 12:01:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:00:00, 12:02:00): TeamX:9 12:01:59 LATE index=2 onTimeIndex=1",
+ "[12:02:00, 12:04:00): TeamX:10 12:03:59 EARLY index=0 isFirst",
+ "[12:02:00, 12:04:00): TeamX:8 12:03:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:04:00, 12:06:00): TeamX:4 12:05:59 EARLY index=0 isFirst",
+ "[12:04:00, 12:06:00): TeamX:0 12:05:59 ON_TIME index=1 onTimeIndex=0",
+ "[12:06:00, 12:08:00): TeamX:3 12:07:59 EARLY index=0 isFirst",
+ "[12:06:00, 12:08:00): TeamX:9 12:07:59 ON_TIME index=1 onTimeIndex=0"
};
}
}
@@ -366,15 +366,15 @@ public PCollection expand(PCollection> input) {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:26, 11:01:26): TeamX:5 11:01:25 EARLY index=0 isFirst",
- "[11:00:26, 11:01:26): TeamX:5 11:01:25 ON_TIME index=1 onTimeIndex=0",
- "[11:02:24, 11:03:24): TeamX:7 11:03:23 EARLY index=0 isFirst",
- "[11:02:24, 11:05:19): TeamX:22 11:05:18 EARLY index=0 isFirst",
- "[11:02:24, 11:05:19): TeamX:22 11:05:18 ON_TIME index=1 onTimeIndex=0",
- "[11:00:26, 11:05:19): TeamX:36 11:05:18 LATE index=0 onTimeIndex=0 isFirst",
- "[11:06:39, 11:07:39): TeamX:3 11:07:38 EARLY index=0 isFirst",
- "[11:06:39, 11:08:46): TeamX:12 11:08:45 EARLY index=0 isFirst",
- "[11:06:39, 11:08:46): TeamX:12 11:08:45 ON_TIME index=1 onTimeIndex=0 isLast"
+ "[12:00:26, 12:01:26): TeamX:5 12:01:25 EARLY index=0 isFirst",
+ "[12:00:26, 12:01:26): TeamX:5 12:01:25 ON_TIME index=1 onTimeIndex=0",
+ "[12:02:24, 12:03:24): TeamX:7 12:03:23 EARLY index=0 isFirst",
+ "[12:02:24, 12:05:19): TeamX:22 12:05:18 EARLY index=0 isFirst",
+ "[12:02:24, 12:05:19): TeamX:22 12:05:18 ON_TIME index=1 onTimeIndex=0",
+ "[12:00:26, 12:05:19): TeamX:36 12:05:18 LATE index=0 onTimeIndex=0 isFirst",
+ "[12:06:39, 12:07:39): TeamX:3 12:07:38 EARLY index=0 isFirst",
+ "[12:06:39, 12:08:46): TeamX:12 12:08:45 EARLY index=0 isFirst",
+ "[12:06:39, 12:08:46): TeamX:12 12:08:45 ON_TIME index=1 onTimeIndex=0 isLast"
};
}
}
@@ -396,15 +396,15 @@ public PCollection expand(PCollection> input) {
@Override
public String[] getExpectedResults() {
return new String[] {
- "[11:00:26, 11:01:25): TeamX:5 11:01:24 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:01:25, 11:02:24): TeamX:9 11:02:23 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:02:24, 11:03:06): TeamX:7 11:03:05 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:03:06, 11:03:39): TeamX:8 11:03:38 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:03:39, 11:04:19): TeamX:3 11:04:18 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:04:19, 11:06:39): TeamX:4 11:06:38 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:06:39, 11:07:26): TeamX:3 11:07:25 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:07:26, 11:07:46): TeamX:8 11:07:45 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
- "[11:07:46, END_OF_GLOBAL_WINDOW): TeamX:1 04:00:54 ON_TIME index=0 onTimeIndex=0 isFirst isLast"
+ "[12:00:26, 12:01:25): TeamX:5 12:01:24 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:01:25, 12:02:24): TeamX:9 12:02:23 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:02:24, 12:03:06): TeamX:7 12:03:05 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:03:06, 12:03:39): TeamX:8 12:03:38 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:03:39, 12:04:19): TeamX:3 12:04:18 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:04:19, 12:06:39): TeamX:4 12:06:38 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:06:39, 12:07:26): TeamX:3 12:07:25 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:07:26, 12:07:46): TeamX:8 12:07:45 ON_TIME index=0 onTimeIndex=0 isFirst isLast",
+ "[12:07:46, END_OF_GLOBAL_WINDOW): TeamX:1 04:00:54 ON_TIME index=0 onTimeIndex=0 isFirst isLast"
};
}
}
diff --git a/src/main/java/net/streamingbook/Utils.java b/src/main/java/net/streamingbook/Utils.java
index 390232d..3aa7765 100644
--- a/src/main/java/net/streamingbook/Utils.java
+++ b/src/main/java/net/streamingbook/Utils.java
@@ -26,7 +26,7 @@
public class Utils {
public static Instant parseTime(String time) {
- return Instant.parse("T" + time);
+ return Instant.parse("T" + time + "Z");
}
public static DateTimeFormatter TIME_FMT = DateTimeFormat.forPattern("HH:mm:ss");
diff --git a/src/test/java/net/streamingbook/StateAndTimersTest.java b/src/test/java/net/streamingbook/StateAndTimersTest.java
index f38475c..e779b47 100644
--- a/src/test/java/net/streamingbook/StateAndTimersTest.java
+++ b/src/test/java/net/streamingbook/StateAndTimersTest.java
@@ -97,9 +97,9 @@ public void stateTest() {
.apply(ParDo.of(new StateAndTimers.FormatAttributionAsString()));
PAssert.that(teamScores)
- .containsInAnyOrder("[global window]: imp=123 http://search.com?q=xyz → http://xyz.com/ → http://xyz.com/join-mailing-list 11:01:30 UNKNOWN",
+ .containsInAnyOrder("[global window]: imp=123 http://search.com?q=xyz → http://xyz.com/ → http://xyz.com/join-mailing-list 12:01:30 UNKNOWN",
"[global window]: imp=456 http://search.com?q=thing → http://xyz.com/thing → http://xyz.com/thing/add-to-cart → "
- + "http://xyz.com/thing/purchase → http://xyz.com/thing/receipt 11:03:45 UNKNOWN");
+ + "http://xyz.com/thing/purchase → http://xyz.com/thing/receipt 12:03:45 UNKNOWN");
p.run().waitUntilFinish();
}
diff --git a/src/test/java/net/streamingbook/UtilsTest.java b/src/test/java/net/streamingbook/UtilsTest.java
new file mode 100644
index 0000000..e73e152
--- /dev/null
+++ b/src/test/java/net/streamingbook/UtilsTest.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018 Tyler Akidau
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.streamingbook;
+
+import static net.streamingbook.Utils.parseTime;
+import static net.streamingbook.Utils.formatTime;
+import static org.junit.Assert.assertEquals;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+@RunWith(JUnit4.class)
+public class UtilsTest {
+ @Test
+ public void parseTimeTest() {
+ String time1 = "12:34:56";
+ Instant instant = parseTime(time1);
+ String time2 = formatTime(instant);
+ assertEquals(time1, time2);
+ }
+}