Skip to content

Commit

Permalink
Add support for Publishing Header alongside Blob (Snapshot/Delta) (#555)
Browse files Browse the repository at this point in the history
Add support for Publishing Header alongside Blob (Snapshot/Delta)
  • Loading branch information
mbrilnetflix authored Feb 11, 2022
1 parent fcb1d67 commit 804114d
Show file tree
Hide file tree
Showing 22 changed files with 587 additions and 90 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ venv

# publishing secrets
secrets/signing-key

**/.DS_Store
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.netflix.hollow.api.consumer.HollowConsumer.Blob;
import com.netflix.hollow.api.consumer.HollowConsumer.BlobRetriever;
import com.netflix.hollow.api.consumer.HollowConsumer.HeaderBlob;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -29,6 +30,12 @@ public class TestBlobRetriever implements BlobRetriever {
private final Map<Long, Blob> snapshots = new HashMap<>();
private final Map<Long, Blob> deltas = new HashMap<>();
private final Map<Long, Blob> reverseDeltas = new HashMap<>();
private final Map<Long, HeaderBlob> headers = new HashMap<>();

@Override
public HeaderBlob retrieveHeaderBlob(long desiredVersion) {
return headers.get(desiredVersion);
}

@Override
public Blob retrieveSnapshotBlob(long desiredVersion) {
Expand Down Expand Up @@ -57,4 +64,7 @@ public void addReverseDelta(long currentVersion, Blob transition) {
reverseDeltas.put(currentVersion, transition);
}

public void addHeader(long desiredVersion, HeaderBlob headerBlob) {
headers.put(desiredVersion, headerBlob);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.netflix.hollow.api.client;

import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.core.HollowBlobHeader;

/**
* An interface which defines the necessary interactions of a {@link HollowClient} with a blob data store.
Expand All @@ -32,18 +33,25 @@ public interface HollowBlobRetriever {
* @param desiredVersion the desired version
* @return the snapshot for the state with an identifier equal to or less than the desired version
*/
public HollowBlob retrieveSnapshotBlob(long desiredVersion);
HollowBlob retrieveSnapshotBlob(long desiredVersion);

/**
* @param currentVersion the current version
* @return a delta transition which can be applied to the currentVersion
*/
public HollowBlob retrieveDeltaBlob(long currentVersion);
HollowBlob retrieveDeltaBlob(long currentVersion);

/**
* @param currentVersion the current version
* @return a reverse delta transition which can be applied to the currentVersion
*/
public HollowBlob retrieveReverseDeltaBlob(long currentVersion);
HollowBlob retrieveReverseDeltaBlob(long currentVersion);

/**
* @param currentVersion the desired version
* @return the header for the state with an identifier equal to currentVersion
*/
default HollowBlobHeader retrieveHeaderBlob(long currentVersion) {
return new HollowBlobHeader();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ class HollowClientConsumerBridge {

static HollowConsumer.BlobRetriever consumerBlobRetrieverFor(final HollowBlobRetriever blobRetriever) {
return new HollowConsumer.BlobRetriever() {


@Override
public HollowConsumer.HeaderBlob retrieveHeaderBlob(long currentVersion) {
throw new UnsupportedOperationException();
}

@Override
public Blob retrieveSnapshotBlob(long desiredVersion) {
final HollowBlob blob = blobRetriever.retrieveSnapshotBlob(desiredVersion);
Expand All @@ -48,7 +53,7 @@ public File getFile() throws IOException {
}
};
}

@Override
public Blob retrieveDeltaBlob(long currentVersion) {
final HollowBlob blob = blobRetriever.retrieveDeltaBlob(currentVersion);
Expand Down Expand Up @@ -86,6 +91,7 @@ public File getFile() throws IOException {
}
};
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;



/**
* A HollowConsumer is the top-level class used by consumers of Hollow data to initialize and keep up-to-date a local in-memory
* copy of a hollow dataset. The interactions between the "blob" transition store and announcement listener are defined by
Expand Down Expand Up @@ -471,6 +473,31 @@ default Set<String> configuredOptionalBlobParts() {
return null;
}

default HollowConsumer.HeaderBlob retrieveHeaderBlob(long currentVersion) {
throw new UnsupportedOperationException();
}
}

protected interface VersionedBlob {

InputStream getInputStream() throws IOException;

default File getFile() throws IOException {
throw new UnsupportedOperationException();
}
}

public static abstract class HeaderBlob implements VersionedBlob{

private final long version;

protected HeaderBlob(long version) {
this.version = version;
}

public long getVersion() {
return this.version;
}
}

/**
Expand All @@ -487,10 +514,10 @@ default Set<String> configuredOptionalBlobParts() {
* <dd>Implementations will define how to retrieve the actual blob data for this specific blob from a data store as an InputStream.</dd>
* </dl>
*/
public static abstract class Blob {
public static abstract class Blob implements VersionedBlob{

private final long fromVersion;
private final long toVersion;
protected final long fromVersion;
protected final long toVersion;
private final BlobType blobType;

/**
Expand Down Expand Up @@ -544,10 +571,6 @@ public OptionalBlobPartInput getOptionalBlobPartInputs() throws IOException {
return null;
}

public File getFile() throws IOException {
throw new UnsupportedOperationException();
}

/**
* Blobs can be of types {@code SNAPSHOT}, {@code DELTA} or {@code REVERSE_DELTA}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Set;
import java.util.UUID;


public class HollowFilesystemBlobRetriever implements HollowConsumer.BlobRetriever {
private final Path blobStorePath;
private final HollowConsumer.BlobRetriever fallbackBlobRetriever;
Expand Down Expand Up @@ -114,6 +115,43 @@ private void ensurePathExists(Path blobStorePath) {
}
}

@Override
public HollowConsumer.HeaderBlob retrieveHeaderBlob(long desiredVersion) {
Path exactPath = blobStorePath.resolve("header-" + desiredVersion);
if (Files.exists(exactPath))
return new FilesystemHeaderBlob(exactPath, desiredVersion);

long maxVersionBeforeDesired = HollowConstants.VERSION_NONE;
try(DirectoryStream<Path> directoryStream = Files.newDirectoryStream(blobStorePath)) {
for (Path path : directoryStream) {
String filename = path.getFileName().toString();
if(filename.startsWith("header-")) {
long version = Long.parseLong(filename.substring(filename.lastIndexOf("-") + 1));
if(version < desiredVersion && version > maxVersionBeforeDesired) {
maxVersionBeforeDesired = version;
}
}
}
} catch(IOException ex) {
throw new RuntimeException("Error listing header files; path=" + blobStorePath, ex);
}
HollowConsumer.HeaderBlob filesystemBlob = null;
if (maxVersionBeforeDesired != HollowConstants.VERSION_NONE) {
filesystemBlob = new FilesystemHeaderBlob(blobStorePath.resolve("snapshot-" + maxVersionBeforeDesired), maxVersionBeforeDesired);
if (useExistingStaleSnapshot) {
return filesystemBlob;
}
}

if(fallbackBlobRetriever != null) {
HollowConsumer.HeaderBlob remoteBlob = fallbackBlobRetriever.retrieveHeaderBlob(desiredVersion);
if(remoteBlob != null && (filesystemBlob == null || remoteBlob.getVersion() != filesystemBlob.getVersion()))
return new HeaderBlobFromBackupToFilesystem(remoteBlob, blobStorePath.resolve("header-" + remoteBlob.getVersion()));
}

return filesystemBlob;
}

@Override
public HollowConsumer.Blob retrieveSnapshotBlob(long desiredVersion) {
Path exactPath = blobStorePath.resolve("snapshot-" + desiredVersion);
Expand Down Expand Up @@ -267,6 +305,25 @@ private boolean allRequestedPartsExist(HollowConsumer.Blob.BlobType type, long c
return true;
}

private static class FilesystemHeaderBlob extends HollowConsumer.HeaderBlob {
private final Path path;

protected FilesystemHeaderBlob(Path headerPath, long version) {
super(version);
this.path = headerPath;
}

@Override
public InputStream getInputStream() throws IOException {
return new BufferedInputStream(Files.newInputStream(path));
}

@Override
public File getFile() throws IOException {
return path.toFile();
}
}

private static class FilesystemBlob extends HollowConsumer.Blob {

private final Path path;
Expand Down Expand Up @@ -327,6 +384,52 @@ public File getFile() throws IOException {

}

private static class HeaderBlobFromBackupToFilesystem extends HollowConsumer.HeaderBlob {
private final HollowConsumer.HeaderBlob remoteHeaderBlob;
private final Path path;

protected HeaderBlobFromBackupToFilesystem(HollowConsumer.HeaderBlob remoteHeaderBlob, Path destinationPath) {
super(remoteHeaderBlob.getVersion());
this.path = destinationPath;
this.remoteHeaderBlob = remoteHeaderBlob;
}

@Override
public InputStream getInputStream() throws IOException {

Path tempPath = path.resolveSibling(path.getName(path.getNameCount()-1) + "-" + UUID.randomUUID().toString());
try(
InputStream is = remoteHeaderBlob.getInputStream();
OutputStream os = Files.newOutputStream(tempPath)
) {
byte buf[] = new byte[4096];
int n;
while (-1 != (n = is.read(buf)))
os.write(buf, 0, n);
}
Files.move(tempPath, path, REPLACE_EXISTING);

return new BufferedInputStream(Files.newInputStream(path));
}

@Override
public File getFile() throws IOException {
Path tempPath = path.resolveSibling(path.getName(path.getNameCount()-1) + "-" + UUID.randomUUID().toString());
try(
InputStream is = remoteHeaderBlob.getInputStream();
OutputStream os = Files.newOutputStream(tempPath)
) {
byte buf[] = new byte[4096];
int n;
while (-1 != (n = is.read(buf)))
os.write(buf, 0, n);
}
Files.move(tempPath, path, REPLACE_EXISTING);

return path.toFile();
}
}

private static class BlobForBackupToFilesystem extends HollowConsumer.Blob {

private final HollowConsumer.Blob remoteBlob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,12 @@ HollowProducer.Populator incrementalPopulate(
void publish(ProducerListeners listeners, long toVersion, Artifacts artifacts) throws IOException {
Status.StageBuilder psb = listeners.firePublishStart(toVersion);
try {
// We want a header to be created for all states.
artifacts.header = blobStager.openHeader(toVersion);
if(!readStates.hasCurrent() || doIntegrityCheck || numStatesUntilNextSnapshot <= 0)
artifacts.snapshot = stageBlob(listeners, blobStager.openSnapshot(toVersion));

publishHeaderBlob(artifacts.header);
if (readStates.hasCurrent()) {
artifacts.delta = stageBlob(listeners,
blobStager.openDelta(readStates.current().getVersion(), toVersion));
Expand All @@ -575,7 +578,6 @@ void publish(ProducerListeners listeners, long toVersion, Artifacts artifacts) t
artifacts.markSnapshotPublishComplete();
numStatesUntilNextSnapshot = numStatesBetweenSnapshots;
}

psb.success();
} catch (Throwable throwable) {
psb.fail(throwable);
Expand Down Expand Up @@ -679,12 +681,22 @@ private void publishSnapshotBlobAsync(ProducerListeners listeners, Artifacts art

private void publishBlob(HollowProducer.Blob b) {
try {
publisher.publish(b);
publisher.publish((HollowProducer.PublishArtifact)b);
} finally {
blobStorageCleaner.clean(b.getType());
}
}

private void publishHeaderBlob(HollowProducer.HeaderBlob b) {
try {
HollowBlobWriter writer = new HollowBlobWriter(getWriteEngine());
b.write(writer);
publisher.publish(b);
} catch (IOException e){
throw new RuntimeException(e);
}
}

/**
* Given these read states
*
Expand Down Expand Up @@ -757,7 +769,7 @@ private ReadStateHelper checkIntegrity(
listeners.fireIntegrityCheckComplete(status);
}
}

private ReadStateHelper noIntegrityCheck(ReadStateHelper readStates, Artifacts artifacts) throws IOException {
ReadStateHelper result = readStates;

Expand Down Expand Up @@ -859,6 +871,7 @@ static final class Artifacts {
HollowProducer.Blob snapshot = null;
HollowProducer.Blob delta = null;
HollowProducer.Blob reverseDelta = null;
HollowProducer.HeaderBlob header = null;

boolean cleanupCalled;
boolean snapshotPublishComplete;
Expand All @@ -876,6 +889,10 @@ synchronized void cleanup() {
reverseDelta.cleanup();
reverseDelta = null;
}
if (header != null) {
header.cleanup();
header = null;
}
}

synchronized void markSnapshotPublishComplete() {
Expand All @@ -898,6 +915,8 @@ boolean hasDelta() {
boolean hasReverseDelta() {
return reverseDelta != null;
}

boolean hasHeader() { return header != null; }
}

/**
Expand All @@ -910,11 +929,11 @@ public void cleanSnapshots() {
}

@Override
public void cleanDeltas() {
public void cleanReverseDeltas() {
}

@Override
public void cleanReverseDeltas() {
public void cleanDeltas() {
}
}

Expand Down
Loading

0 comments on commit 804114d

Please sign in to comment.