diff --git a/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java b/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java index 37ba151694..e6ea13d419 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java +++ b/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java @@ -19,6 +19,7 @@ import com.netflix.hollow.api.consumer.HollowConsumer; import com.netflix.hollow.api.consumer.HollowConsumer.TransitionAwareRefreshListener; import com.netflix.hollow.api.custom.HollowAPI; +import com.netflix.hollow.api.error.VersionMismatchException; import com.netflix.hollow.core.HollowConstants; import com.netflix.hollow.core.memory.MemoryMode; import com.netflix.hollow.core.read.HollowBlobInput; @@ -33,8 +34,12 @@ import com.netflix.hollow.tools.history.HollowHistoricalStateDataAccess; import java.io.IOException; import java.lang.ref.WeakReference; +import java.util.Optional; +import java.util.OptionalLong; import java.util.logging.Logger; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; + /** * A class comprising much of the internal state of a {@link HollowConsumer}. Not intended for external consumption. */ @@ -177,8 +182,16 @@ private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInpu reader.applyDelta(in, optionalPartIn); } - setVersion(transition.getToVersion()); + long expectedToVersion = transition.getToVersion(); + String actualToVersionStr = stateEngine.getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION); + if (actualToVersionStr != null) { + long actualToVersion = Long.parseLong(actualToVersionStr); + if (actualToVersion != expectedToVersion) { + throw new VersionMismatchException(expectedToVersion, actualToVersion); + } + } + setVersion(transition.getToVersion()); for(HollowConsumer.RefreshListener refreshListener : refreshListeners) refreshListener.blobLoaded(transition); } diff --git a/hollow/src/main/java/com/netflix/hollow/api/error/VersionMismatchException.java b/hollow/src/main/java/com/netflix/hollow/api/error/VersionMismatchException.java new file mode 100644 index 0000000000..2e5d5f14d1 --- /dev/null +++ b/hollow/src/main/java/com/netflix/hollow/api/error/VersionMismatchException.java @@ -0,0 +1,21 @@ +package com.netflix.hollow.api.error; + +public class VersionMismatchException extends HollowException { + private final long expectedVersion; + + private final long actualVersion; + + public VersionMismatchException(long expectedVersion, long actualVersion) { + super("toVersion in blob didn't match toVersion seen in metadata; actualToVersion=" + actualVersion + ", expectedToVersion=" + expectedVersion); + this.expectedVersion = expectedVersion; + this.actualVersion = actualVersion; + } + + public long getExpectedVersion() { + return expectedVersion; + } + + public long getActualVersion() { + return actualVersion; + } +} diff --git a/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java b/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java index e7b16c0437..6b41d0d5d1 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java @@ -18,6 +18,7 @@ import static com.netflix.hollow.core.HollowConstants.VERSION_LATEST; import static com.netflix.hollow.core.HollowConstants.VERSION_NONE; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_SCHEMA_HASH; import static java.util.Collections.emptyList; import static org.junit.Assert.assertEquals; @@ -30,6 +31,7 @@ import com.netflix.hollow.api.consumer.HollowConsumer; import com.netflix.hollow.api.custom.HollowAPI; +import com.netflix.hollow.api.error.VersionMismatchException; import com.netflix.hollow.api.metrics.HollowConsumerMetrics; import com.netflix.hollow.core.HollowStateEngine; import com.netflix.hollow.core.memory.MemoryMode; @@ -46,6 +48,7 @@ import com.netflix.hollow.test.consumer.TestHollowConsumer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -55,6 +58,8 @@ import java.util.logging.LogManager; import java.util.logging.LogRecord; import java.util.logging.Logger; + +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -98,6 +103,27 @@ public void testUpdateTo_noVersions() throws Throwable { assertTrue("Should still have no types", readStateEngine.getAllTypes().isEmpty()); } + @Test + public void testUpdateTo_updateToLatestButBlobOnDifferentVersion() throws Throwable { + expectedException.expect(VersionMismatchException.class); + expectedException.expectMessage("Could not create an update plan, because no existing versions could be retrieved."); + long updateToVersion = 1234; + long blobToVersion = 1235; //this needs to be different from updateToVersion + HollowWriteStateEngine writeStateEngine = new HollowWriteStateEngine(); + writeStateEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(blobToVersion)); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + new HollowBlobWriter(writeStateEngine).writeSnapshot(os); + ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray()); + HollowConsumer.Blob blob = mock(HollowConsumer.Blob.class); + when(blob.isSnapshot()) + .thenReturn(true); + when(blob.getInputStream()) + .thenReturn(is); + when(retriever.retrieveSnapshotBlob(anyLong())) + .thenReturn(blob); + subject.updateTo(updateToVersion); + } + @Rule public ExpectedException expectedException = ExpectedException.none();