Skip to content

Commit

Permalink
add consumer side logic to read blob metadata for delta to version
Browse files Browse the repository at this point in the history
  • Loading branch information
workeatsleep committed Mar 6, 2024
1 parent 6bd58a7 commit d4493aa
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@
import com.netflix.hollow.api.consumer.HollowConsumer.BlobRetriever;
import com.netflix.hollow.api.consumer.HollowConsumer.HeaderBlob;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* A simple implementation of a BlobRetriever which allows adding blobs and holds them all in
* memory.
*/
public class TestBlobRetriever implements BlobRetriever {
private final Map<Long, Blob> snapshots = new HashMap<>();
private final Map<Long, Blob> deltas = new HashMap<>();
private final Map<Long, Blob> reverseDeltas = new HashMap<>();
private final Map<Long, HeaderBlob> headers = new HashMap<>();
protected final Map<Long, Blob> snapshots = new HashMap<>();
protected final Map<Long, Blob> deltas = new HashMap<>();
protected final Map<Long, Blob> reverseDeltas = new HashMap<>();
protected final Map<Long, HeaderBlob> headers = new HashMap<>();

@Override
public HeaderBlob retrieveHeaderBlob(long desiredVersion) {
Expand Down Expand Up @@ -60,7 +64,7 @@ public Blob retrieveReverseDeltaBlob(long currentVersion) {
}

// so blob can be reused
private void resetStream(Blob b) {
protected void resetStream(Blob b) {
try {
if (b!= null && b.getInputStream() != null) {
b.getInputStream().reset();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.netflix.hollow.test.consumer;

import com.netflix.hollow.api.consumer.HollowConsumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TestBlobRetrieverWithNearestSnapshotMatch extends TestBlobRetriever {

@Override
public HollowConsumer.Blob retrieveSnapshotBlob(long desiredVersion) {
long version = findNearestSnapshotVersion(desiredVersion);
HollowConsumer.Blob b = snapshots.get(version);
resetStream(b);
return b;
}


private long findNearestSnapshotVersion(long desiredVersion) {
List<Long> snapshotVersions = new ArrayList<>();
snapshotVersions.addAll(snapshots.keySet());
Collections.sort(snapshotVersions);
int start = 0;
int end = snapshotVersions.size() - 1;
int mid = 0;
while (start + 1< end) {
mid = (start + end) / 2;
if (mid < desiredVersion) {
start = mid;
} else if (mid > desiredVersion){
end = mid;
} else {
return snapshotVersions.get(mid);
}
}
if (end <= desiredVersion) {
return snapshotVersions.get(end);
}
return snapshotVersions.get(start);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.hollow.api.client;

import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_SCHEMA_HASH;

import com.netflix.hollow.api.consumer.HollowConsumer;
Expand Down Expand Up @@ -119,7 +120,7 @@ public synchronized boolean updateTo(long requestedVersion) throws Throwable {
public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersionInfo) throws Throwable {
long requestedVersion = requestedVersionInfo.getVersion();
if (requestedVersion == getCurrentVersionId()) {
if (requestedVersion == HollowConstants.VERSION_NONE && hollowDataHolderVolatile == null) {
if (requestedVersion == VERSION_NONE && hollowDataHolderVolatile == null) {
LOG.warning("No versions to update to, initializing to empty state");
// attempting to refresh, but no available versions - initialize to empty state
hollowDataHolderVolatile = newHollowDataHolder();
Expand Down Expand Up @@ -148,16 +149,17 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
? planner.planInitializingUpdate(requestedVersion)
: planner.planUpdate(hollowDataHolderVolatile.getCurrentVersion(), requestedVersion,
doubleSnapshotConfig.allowDoubleSnapshot());
boolean isInitialUpdate = getCurrentVersionId() == VERSION_NONE;

for (HollowConsumer.RefreshListener listener : localListeners)
if (listener instanceof HollowConsumer.TransitionAwareRefreshListener)
((HollowConsumer.TransitionAwareRefreshListener)listener).transitionsPlanned(beforeVersion, requestedVersion, updatePlan.isSnapshotPlan(), updatePlan.getTransitionSequence());

if (updatePlan.destinationVersion() == HollowConstants.VERSION_NONE
if (updatePlan.destinationVersion() == VERSION_NONE
&& requestedVersion != HollowConstants.VERSION_LATEST) {
String msg = String.format("Could not create an update plan for version %s, because "
+ "that version or any qualifying previous versions could not be retrieved.", requestedVersion);
if (beforeVersion != HollowConstants.VERSION_NONE) {
if (beforeVersion != VERSION_NONE) {
msg += String.format(" Consumer will remain at current version %s until next update attempt.", beforeVersion);
}
throw new IllegalArgumentException(msg);
Expand Down Expand Up @@ -185,7 +187,7 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
* Also note that hollowDataHolderVolatile only changes for snapshot plans,
* and it is only for snapshot plans that HollowDataHolder#initializeAPI is
* called. */
newDh.update(updatePlan, localListeners, () -> hollowDataHolderVolatile = newDh);
newDh.update(updatePlan, localListeners, () -> hollowDataHolderVolatile = newDh, isInitialUpdate);
} catch (Throwable t) {
// If the update plan failed then revert back to the old holder
hollowDataHolderVolatile = oldDh;
Expand All @@ -194,7 +196,7 @@ public synchronized boolean updateTo(HollowConsumer.VersionInfo requestedVersion
forceDoubleSnapshot = false;
}
} else { // 0 snapshot and 1+ delta transitions
hollowDataHolderVolatile.update(updatePlan, localListeners, () -> {});
hollowDataHolderVolatile.update(updatePlan, localListeners, () -> {}, isInitialUpdate);
}

for(HollowConsumer.RefreshListener refreshListener : localListeners)
Expand Down Expand Up @@ -245,7 +247,7 @@ public synchronized void removeRefreshListener(HollowConsumer.RefreshListener re
public long getCurrentVersionId() {
HollowDataHolder hollowDataHolderLocal = hollowDataHolderVolatile;
return hollowDataHolderLocal != null ? hollowDataHolderLocal.getCurrentVersion()
: HollowConstants.VERSION_NONE;
: VERSION_NONE;
}

public void forceDoubleSnapshotNextUpdate() {
Expand All @@ -256,7 +258,7 @@ public void forceDoubleSnapshotNextUpdate() {
* Whether or not a snapshot plan should be created. Visible for testing.
*/
boolean shouldCreateSnapshotPlan(HollowConsumer.VersionInfo incomingVersionInfo) {
if (getCurrentVersionId() == HollowConstants.VERSION_NONE
if (getCurrentVersionId() == VERSION_NONE
|| (forceDoubleSnapshot && doubleSnapshotConfig.allowDoubleSnapshot())) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.consumer.HollowConsumer.TransitionAwareRefreshListener;
import com.netflix.hollow.api.custom.HollowAPI;
import com.netflix.hollow.api.error.VersionMismatchException;
import com.netflix.hollow.core.HollowConstants;
import com.netflix.hollow.core.memory.MemoryMode;
import com.netflix.hollow.core.read.HollowBlobInput;
Expand All @@ -33,8 +34,13 @@
import com.netflix.hollow.tools.history.HollowHistoricalStateDataAccess;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.logging.Logger;

import static com.netflix.hollow.core.HollowConstants.VERSION_NONE;
import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;

/**
* A class comprising much of the internal state of a {@link HollowConsumer}. Not intended for external consumption.
*/
Expand All @@ -56,7 +62,7 @@ class HollowDataHolder {

private WeakReference<HollowHistoricalStateDataAccess> priorHistoricalDataAccess;

private long currentVersion = HollowConstants.VERSION_NONE;
private long currentVersion = VERSION_NONE;

HollowDataHolder(HollowReadStateEngine stateEngine,
HollowAPIFactory apiFactory,
Expand Down Expand Up @@ -106,7 +112,7 @@ HollowDataHolder setSkipTypeShardUpdateWithNoAdditions(boolean skipTypeShardUpda
}

void update(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners,
Runnable apiInitCallback) throws Throwable {
Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable {
// Only fail if double snapshot is configured.
// This is a short term solution until it is decided to either remove this feature
// or refine it.
Expand All @@ -123,19 +129,19 @@ void update(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refres
}

if (updatePlan.isSnapshotPlan()) {
applySnapshotPlan(updatePlan, refreshListeners, apiInitCallback);
applySnapshotPlan(updatePlan, refreshListeners, apiInitCallback, isInitialUpdate);
} else {
applyDeltaOnlyPlan(updatePlan, refreshListeners);
applyDeltaOnlyPlan(updatePlan, refreshListeners, isInitialUpdate);
}
}

private void applySnapshotPlan(HollowUpdatePlan updatePlan,
HollowConsumer.RefreshListener[] refreshListeners,
Runnable apiInitCallback) throws Throwable {
applySnapshotTransition(updatePlan.getSnapshotTransition(), refreshListeners, apiInitCallback);
Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable {
applySnapshotTransition(updatePlan.getSnapshotTransition(), refreshListeners, apiInitCallback, isInitialUpdate);

for(HollowConsumer.Blob blob : updatePlan.getDeltaTransitions()) {
applyDeltaTransition(blob, true, refreshListeners);
applyDeltaTransition(blob, true, refreshListeners, isInitialUpdate);
}

try {
Expand All @@ -149,10 +155,10 @@ private void applySnapshotPlan(HollowUpdatePlan updatePlan,

private void applySnapshotTransition(HollowConsumer.Blob snapshotBlob,
HollowConsumer.RefreshListener[] refreshListeners,
Runnable apiInitCallback) throws Throwable {
Runnable apiInitCallback, boolean isInitialUpdate) throws Throwable {
try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, snapshotBlob);
OptionalBlobPartInput optionalPartIn = snapshotBlob.getOptionalBlobPartInputs()) {
applyStateEngineTransition(in, optionalPartIn, snapshotBlob, refreshListeners);
applyStateEngineTransition(in, optionalPartIn, snapshotBlob, refreshListeners, isInitialUpdate);
initializeAPI(apiInitCallback);

for (HollowConsumer.RefreshListener refreshListener : refreshListeners) {
Expand All @@ -165,7 +171,11 @@ private void applySnapshotTransition(HollowConsumer.Blob snapshotBlob,
}
}

private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInput optionalPartIn, HollowConsumer.Blob transition, HollowConsumer.RefreshListener[] refreshListeners) throws IOException {
private void applyStateEngineTransition(HollowBlobInput in,
OptionalBlobPartInput optionalPartIn,
HollowConsumer.Blob transition,
HollowConsumer.RefreshListener[] refreshListeners,
boolean isInitialUpdate) throws IOException {
if(transition.isSnapshot()) {
if(filter == null) {
reader.readSnapshot(in, optionalPartIn);
Expand All @@ -174,11 +184,11 @@ private void applyStateEngineTransition(HollowBlobInput in, OptionalBlobPartInpu
reader.readSnapshot(in, optionalPartIn, filter);
}
} else {
reader.applyDelta(in, optionalPartIn);
long expectedToVersion = transition.getToVersion();
reader.applyDelta(in, optionalPartIn, expectedToVersion, isInitialUpdate);
}

setVersion(transition.getToVersion());

for(HollowConsumer.RefreshListener refreshListener : refreshListeners)
refreshListener.blobLoaded(transition);
}
Expand All @@ -199,21 +209,22 @@ private void initializeAPI(Runnable r) {
}
}

private void applyDeltaOnlyPlan(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable {
private void applyDeltaOnlyPlan(HollowUpdatePlan updatePlan, HollowConsumer.RefreshListener[] refreshListeners,
boolean isInitialUpdate) throws Throwable {
for(HollowConsumer.Blob blob : updatePlan) {
applyDeltaTransition(blob, false, refreshListeners);
applyDeltaTransition(blob, false, refreshListeners, isInitialUpdate);
}
}

private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable {
private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners, boolean isInitialUpdate) throws Throwable {
if (!memoryMode.equals(MemoryMode.ON_HEAP)) {
LOG.warning("Skipping delta transition in shared-memory mode");
return;
}

try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, blob);
OptionalBlobPartInput optionalPartIn = blob.getOptionalBlobPartInputs()) {
applyStateEngineTransition(in, optionalPartIn, blob, refreshListeners);
applyStateEngineTransition(in, optionalPartIn, blob, refreshListeners, isInitialUpdate);

if(objLongevityConfig.enableLongLivedObjectSupport()) {
HollowDataAccess previousDataAccess = currentAPI.getDataAccess();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.netflix.hollow.api.error;

public class VersionMismatchException extends HollowException {
private final long expectedVersion;

private final long actualVersion;

public VersionMismatchException(long expectedVersion, long actualVersion) {
super("toVersion in blob didn't match toVersion seen in metadata; actualToVersion=" + actualVersion + ", expectedToVersion=" + expectedVersion);
this.expectedVersion = expectedVersion;
this.actualVersion = actualVersion;
}

public long getExpectedVersion() {
return expectedVersion;
}

public long getActualVersion() {
return actualVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.netflix.hollow.core.read.engine;

import com.netflix.hollow.api.error.VersionMismatchException;
import com.netflix.hollow.core.HollowBlobHeader;
import com.netflix.hollow.core.HollowBlobOptionalPartHeader;
import com.netflix.hollow.core.memory.MemoryMode;
Expand Down Expand Up @@ -44,6 +45,8 @@
import java.util.TreeSet;
import java.util.logging.Logger;

import static com.netflix.hollow.core.HollowStateEngine.HEADER_TAG_PRODUCER_TO_VERSION;

/**
* A HollowBlobReader is used to populate and update data in a {@link HollowReadStateEngine}, via the consumption
* of snapshot and delta blobs. Caller can choose between on-heap or shared-memory mode; defaults to (and for
Expand Down Expand Up @@ -218,12 +221,15 @@ public void applyDelta(HollowBlobInput in) throws IOException {
}

public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts) throws IOException {
applyDelta(in, optionalParts, 0, false);
}
public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts, long expectedVersion, boolean isInitialUpdate) throws IOException {
validateMemoryMode(in.getMemoryMode());
Map<String, HollowBlobInput> optionalPartInputs = null;
if(optionalParts != null)
optionalPartInputs = optionalParts.getInputsByPartName(in.getMemoryMode());

HollowBlobHeader header = readHeader(in, true);
HollowBlobHeader header = readHeaderAndCheckVersion(in, true, expectedVersion, isInitialUpdate);
List<HollowBlobOptionalPartHeader> partHeaders = readPartHeaders(header, optionalPartInputs, in.getMemoryMode());
notifyBeginUpdate();

Expand Down Expand Up @@ -258,16 +264,34 @@ public void applyDelta(HollowBlobInput in, OptionalBlobPartInput optionalParts)
notifyEndUpdate();
}

private HollowBlobHeader readHeader(HollowBlobInput in, boolean isDelta) throws IOException {

private HollowBlobHeader readHeaderAndCheckVersion(HollowBlobInput in, boolean isDelta, long expectedVersion, boolean isInitialUpdate) throws IOException {
HollowBlobHeader header = headerReader.readHeader(in);
if (expectedVersion != 0 && !isInitialUpdate) {
String to_version_tag = header.getHeaderTags().get(HEADER_TAG_PRODUCER_TO_VERSION);
if (to_version_tag != null) {
long to_version = Long.parseLong(to_version_tag);
if (expectedVersion != to_version) {
throw new VersionMismatchException(expectedVersion, to_version);
}
}
}
checkAndApplyHeader(header, isDelta);
return header;
}

private HollowBlobHeader readHeader(HollowBlobInput in, boolean isDelta) throws IOException {
HollowBlobHeader header = headerReader.readHeader(in);
checkAndApplyHeader(header, isDelta);
return header;
}
private void checkAndApplyHeader(final HollowBlobHeader header, boolean isDelta) throws IOException {
if(isDelta && header.getOriginRandomizedTag() != stateEngine.getCurrentRandomizedTag())
throw new IOException("Attempting to apply a delta to a state from which it was not originated!");

stateEngine.setCurrentRandomizedTag(header.getDestinationRandomizedTag());
stateEngine.setOriginRandomizedTag(header.getOriginRandomizedTag());
stateEngine.setHeaderTags(header.getHeaderTags());
return header;
}

private List<HollowBlobOptionalPartHeader> readPartHeaders(HollowBlobHeader header, Map<String, HollowBlobInput> inputsByPartName, MemoryMode mode) throws IOException {
Expand Down
Loading

0 comments on commit d4493aa

Please sign in to comment.