From 5e4cb5885d664740a9be5d3bcb169f779421ce99 Mon Sep 17 00:00:00 2001 From: Xiaodan Date: Thu, 16 Nov 2023 15:39:37 -0800 Subject: [PATCH] change consumer to read metadata in blob header for version number --- .../hollow/api/client/HollowDataHolder.java | 20 +++++++++++++++- .../api/client/HollowClientUpdaterTest.java | 24 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java b/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java index 37ba151694..950c6ed089 100644 --- a/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java +++ b/hollow/src/main/java/com/netflix/hollow/api/client/HollowDataHolder.java @@ -33,8 +33,12 @@ import com.netflix.hollow.tools.history.HollowHistoricalStateDataAccess; import java.io.IOException; import java.lang.ref.WeakReference; +import java.util.Optional; +import java.util.OptionalLong; import java.util.logging.Logger; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; + /** * A class comprising much of the internal state of a {@link HollowConsumer}. Not intended for external consumption. */ @@ -177,7 +181,21 @@ private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInpu reader.applyDelta(in, optionalPartIn); } - setVersion(transition.getToVersion()); + long expectedToVersion = transition.getToVersion(); + Long actualToVersion = null; + String actualToVersionStr = stateEngine.getHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION); + if (actualToVersionStr != null) { + actualToVersion = Long.valueOf(actualToVersionStr); + } + + if (actualToVersion != null) { + if (actualToVersion.longValue() != expectedToVersion) { + LOG.warning("toVersion in blob didn't match toVersion seen in metadata; actualToVersion=" + actualToVersion + ", expectedToVersion=" + expectedToVersion); + } + setVersion(actualToVersion.longValue()); + } else { + setVersion(transition.getToVersion()); + } for(HollowConsumer.RefreshListener refreshListener : refreshListeners) refreshListener.blobLoaded(transition); diff --git a/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java b/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java index e7b16c0437..2c2e117ad4 100644 --- a/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java +++ b/hollow/src/test/java/com/netflix/hollow/api/client/HollowClientUpdaterTest.java @@ -18,6 +18,7 @@ import static com.netflix.hollow.core.HollowConstants.VERSION_LATEST; import static com.netflix.hollow.core.HollowConstants.VERSION_NONE; +import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION; import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_SCHEMA_HASH; import static java.util.Collections.emptyList; import static org.junit.Assert.assertEquals; @@ -46,6 +47,7 @@ import com.netflix.hollow.test.consumer.TestHollowConsumer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -55,6 +57,8 @@ import java.util.logging.LogManager; import java.util.logging.LogRecord; import java.util.logging.Logger; + +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -98,6 +102,26 @@ public void testUpdateTo_noVersions() throws Throwable { assertTrue("Should still have no types", readStateEngine.getAllTypes().isEmpty()); } + @Test + public void testUpdateTo_updateToLatestButBlobOnDifferentVersion() throws Throwable { + long updateToVersion = 1234; + long blobToVersion = 1235; //this needs to be different from updateToVersion + HollowWriteStateEngine writeStateEngine = new HollowWriteStateEngine(); + writeStateEngine.addHeaderTag(HEADER_TAG_PRODUCER_TO_VERSION, String.valueOf(blobToVersion)); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + new HollowBlobWriter(writeStateEngine).writeSnapshot(os); + ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray()); + HollowConsumer.Blob blob = mock(HollowConsumer.Blob.class); + when(blob.isSnapshot()) + .thenReturn(true); + when(blob.getInputStream()) + .thenReturn(is); + when(retriever.retrieveSnapshotBlob(anyLong())) + .thenReturn(blob); + subject.updateTo(updateToVersion); + Assert.assertEquals(subject.getCurrentVersionId(), blobToVersion); + } + @Rule public ExpectedException expectedException = ExpectedException.none();