Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] [CC Refactor #4] Create AbstractProtocol and do various Protocol cleanup #4057

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.actions;

import io.delta.kernel.annotation.Evolving;
import java.util.Set;

/**
* Interface for protocol actions in Delta. The protocol defines the requirements that readers and
* writers of the table need to meet.
*
* @since 3.4.0
*/
@Evolving
public interface AbstractProtocol {

/** The minimum reader version required to read the table. */
int getMinReaderVersion();

/** The minimum writer version required to read the table. */
int getMinWriterVersion();

/** The reader features that need to be supported to read the table. */
Set<String> getReaderFeatures();

/** The writer features that need to be supported to write the table. */
Set<String> getWriterFeatures();
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class TableFeatures {

public static final String INVARIANTS_FEATURE_NAME = "invariants";

/** The minimum reader version required to support table features. */
public static final int TABLE_FEATURES_MIN_READER_VERSION = 3;

/** The minimum writer version required to support table features. */
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;

Expand All @@ -82,7 +85,7 @@ public static void validateReadSupportedTable(
metadata.ifPresent(ColumnMapping::throwOnUnsupportedColumnMappingMode);
break;
case 3:
List<String> readerFeatures = protocol.getReaderFeatures();
Set<String> readerFeatures = protocol.getReaderFeatures();
if (!SUPPORTED_READER_FEATURES.containsAll(readerFeatures)) {
Set<String> unsupportedFeatures = new HashSet<>(readerFeatures);
unsupportedFeatures.removeAll(SUPPORTED_READER_FEATURES);
Expand Down Expand Up @@ -194,8 +197,7 @@ public static Set<String> extractAutomaticallyEnabledWriterFeatures(
Metadata metadata, Protocol protocol) {
return TableFeatures.SUPPORTED_WRITER_FEATURES.stream()
.filter(f -> metadataRequiresWriterFeatureToBeEnabled(metadata, f))
.filter(
f -> protocol.getWriterFeatures() == null || !protocol.getWriterFeatures().contains(f))
.filter(f -> !protocol.getWriterFeatures().contains(f))
.collect(Collectors.toSet());
}

Expand Down Expand Up @@ -277,11 +279,7 @@ private static void validateNoInvariants(StructType tableSchema) {
}

private static boolean isWriterFeatureSupported(Protocol protocol, String featureName) {
List<String> writerFeatures = protocol.getWriterFeatures();
if (writerFeatures == null) {
return false;
}
return writerFeatures.contains(featureName)
return protocol.getWriterFeatures().contains(featureName)
&& protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ public Transaction build(Engine engine) {
if (!newWriterFeatures.isEmpty()) {
logger.info("Automatically enabling writer features: {}", newWriterFeatures);
shouldUpdateProtocol = true;
List<String> oldWriterFeatures = protocol.getWriterFeatures();
Set<String> oldWriterFeatures = protocol.getWriterFeatures();
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
List<String> curWriterFeatures = protocol.getWriterFeatures();
Set<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(
protocol, metadata, metadata.getSchema(), table.getPath(engine));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
*/
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.TableFeatures.TABLE_FEATURES_MIN_READER_VERSION;
import static io.delta.kernel.internal.TableFeatures.TABLE_FEATURES_MIN_WRITER_VERSION;
import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue;

import io.delta.kernel.actions.AbstractProtocol;
import io.delta.kernel.data.*;
import io.delta.kernel.internal.TableFeatures;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.util.InternalUtils;
import io.delta.kernel.internal.util.Preconditions;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.ArrayType;
Expand All @@ -28,22 +33,30 @@
import io.delta.kernel.types.StructType;
import java.util.*;

public class Protocol {
public class Protocol implements AbstractProtocol {

//////////////////////////////////
// Static variables and methods //
//////////////////////////////////

/**
* This method should always be used after a check to ensure the column vector is not null at the
* given row ID.
*
* @throws IllegalArgumentException if the column vector is null at the given row ID
*/
public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
if (vector.isNullAt(rowId)) {
return null;
}
InternalUtils.requireNonNull(vector, rowId, "protocol");

return new Protocol(
vector.getChild(0).getInt(rowId),
vector.getChild(1).getInt(rowId),
vector.getChild(2).isNullAt(rowId)
? Collections.emptyList()
: VectorUtils.toJavaList(vector.getChild(2).getArray(rowId)),
? null
: new HashSet<>(VectorUtils.toJavaList(vector.getChild(2).getArray(rowId))),
vector.getChild(3).isNullAt(rowId)
? Collections.emptyList()
: VectorUtils.toJavaList(vector.getChild(3).getArray(rowId)));
? null
: new HashSet<>(VectorUtils.toJavaList(vector.getChild(3).getArray(rowId))));
}

public static final StructType FULL_SCHEMA =
Expand All @@ -53,35 +66,82 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
.add("readerFeatures", new ArrayType(StringType.STRING, false /* contains null */))
.add("writerFeatures", new ArrayType(StringType.STRING, false /* contains null */));

//////////////////////////////////
// Member variables and methods //
//////////////////////////////////

private final int minReaderVersion;
private final int minWriterVersion;
private final List<String> readerFeatures;
private final List<String> writerFeatures;
private final Set<String> readerFeatures;
private final Set<String> writerFeatures;

public Protocol(
int minReaderVersion,
int minWriterVersion,
List<String> readerFeatures,
List<String> writerFeatures) {
Set<String> nullableReaderFeatures,
Set<String> nullableWriterFeatures) {
Preconditions.checkArgument(minReaderVersion >= 1, "minReaderVersion must be >= 1");
Preconditions.checkArgument(minWriterVersion >= 1, "minWriterVersion must be >= 1");

this.minReaderVersion = minReaderVersion;
this.minWriterVersion = minWriterVersion;
this.readerFeatures = readerFeatures;
this.writerFeatures = writerFeatures;
this.readerFeatures =
nullableReaderFeatures == null
? Collections.emptySet()
: Collections.unmodifiableSet(nullableReaderFeatures);
this.writerFeatures =
nullableWriterFeatures == null
? Collections.emptySet()
: Collections.unmodifiableSet(nullableWriterFeatures);

final boolean supportsReaderFeatures = minReaderVersion >= TABLE_FEATURES_MIN_READER_VERSION;
final boolean supportsWriterFeatures = minWriterVersion >= TABLE_FEATURES_MIN_WRITER_VERSION;

if (!supportsReaderFeatures && !readerFeatures.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"This protocol has minReaderVersion %d but readerFeatures is not empty: %s. "
+ "readerFeatures are only supported with minReaderVersion >= %d.",
minReaderVersion, readerFeatures, TABLE_FEATURES_MIN_READER_VERSION));
}

if (!supportsWriterFeatures && !writerFeatures.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"This protocol has minWriterVersion %d but writerFeatures is not empty: %s. "
+ "writerFeatures are only supported with minWriterVersion >= %d.",
minWriterVersion, writerFeatures, TABLE_FEATURES_MIN_WRITER_VERSION));
}

if (supportsReaderFeatures && !supportsWriterFeatures) {
throw new IllegalArgumentException(
String.format(
"This protocol has minReaderVersion %d but minWriterVersion %d. "
+ "When minReaderVersion is >= %d, minWriterVersion must be >= %d.",
minReaderVersion,
minWriterVersion,
TABLE_FEATURES_MIN_READER_VERSION,
TABLE_FEATURES_MIN_WRITER_VERSION));
}
}

@Override
public int getMinReaderVersion() {
return minReaderVersion;
}

@Override
public int getMinWriterVersion() {
return minWriterVersion;
}

public List<String> getReaderFeatures() {
@Override
public Set<String> getReaderFeatures() {
return readerFeatures;
}

public List<String> getWriterFeatures() {
@Override
public Set<String> getWriterFeatures() {
return writerFeatures;
}

Expand All @@ -102,26 +162,35 @@ public String toString() {
* @return {@link Row} object with the schema {@link Protocol#FULL_SCHEMA}
*/
public Row toRow() {
Map<Integer, Object> protocolMap = new HashMap<>();
final Map<Integer, Object> protocolMap = new HashMap<>();
protocolMap.put(0, minReaderVersion);
protocolMap.put(1, minWriterVersion);
protocolMap.put(2, stringArrayValue(readerFeatures));
protocolMap.put(3, stringArrayValue(writerFeatures));

// readerFeatures can only exist in the serialized protocol action when minReaderVersion >= 3
protocolMap.put(
2,
minReaderVersion >= TABLE_FEATURES_MIN_READER_VERSION
? stringArrayValue(new ArrayList<>(readerFeatures))
: null);

// writerFeatures can only exist in the serialized protocol action when minWriterVersion >= 7
protocolMap.put(
3,
minWriterVersion >= TABLE_FEATURES_MIN_WRITER_VERSION
? stringArrayValue(new ArrayList<>(writerFeatures))
: null);

return new GenericRow(Protocol.FULL_SCHEMA, protocolMap);
}

public Protocol withNewWriterFeatures(Set<String> writerFeatures) {
Tuple2<Integer, Integer> newProtocolVersions =
TableFeatures.minProtocolVersionFromAutomaticallyEnabledFeatures(writerFeatures);
List<String> newWriterFeatures = new ArrayList<>(writerFeatures);
if (this.writerFeatures != null) {
newWriterFeatures.addAll(this.writerFeatures);
}
public Protocol withNewWriterFeatures(Set<String> newWriterFeatures) {
final Tuple2<Integer, Integer> newProtocolVersions =
TableFeatures.minProtocolVersionFromAutomaticallyEnabledFeatures(newWriterFeatures);
final Set<String> allNewWriterFeatures = new HashSet<>();
allNewWriterFeatures.addAll(writerFeatures);
allNewWriterFeatures.addAll(newWriterFeatures);

return new Protocol(
newProtocolVersions._1,
newProtocolVersions._2,
this.readerFeatures == null ? null : new ArrayList<>(this.readerFeatures),
newWriterFeatures);
newProtocolVersions._1, newProtocolVersions._2, readerFeatures, allNewWriterFeatures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ class TableFeaturesSuite extends AnyFunSuite {
def createTestProtocol(minWriterVersion: Int, writerFeatures: String*): Protocol = {
new Protocol(
// minReaderVersion - it doesn't matter as the read fails anyway before the writer check
0,
1,
minWriterVersion,
// reader features - it doesn't matter as the read fails anyway before the writer check
Collections.emptyList(),
writerFeatures.toSeq.asJava
Collections.emptySet(),
writerFeatures.toSet.asJava
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.internal.actions

import org.scalatest.funsuite.AnyFunSuite

import scala.collection.JavaConverters._

class ProtocolSuite extends AnyFunSuite {

test("instantiation -- bad minReaderVersion should throw") {
val exMsg = intercept[IllegalArgumentException] {
new Protocol(0, 1, null, null)
}.getMessage

assert(exMsg === "minReaderVersion must be >= 1")
}

test("instantiation -- bad minWriterVersion should throw") {
val exMsg = intercept[IllegalArgumentException] {
new Protocol(1, 0, null, null)
}.getMessage

assert(exMsg === "minWriterVersion must be >= 1")
}

test("instantiation -- minReaderVersion < 3 but readerFeatures non-empty should throw") {
val exMsg = intercept[IllegalArgumentException] {
new Protocol(1, 1, Set("columnMapping").asJava, null)
}.getMessage

assert(exMsg === "This protocol has minReaderVersion 1 but readerFeatures is not " +
"empty: [columnMapping]. readerFeatures are only supported with minReaderVersion >= 3.")
}

test("instantiation -- minWriterVersion < 7 but writerFeatures non-empty should throw") {
val exMsg = intercept[IllegalArgumentException] {
new Protocol(1, 1, null, Set("appendOnly").asJava)
}.getMessage

assert(exMsg === "This protocol has minWriterVersion 1 but writerFeatures is not " +
"empty: [appendOnly]. writerFeatures are only supported with minWriterVersion >= 7.")
}

test("instantiation -- minReaderVersion >= 3 but minWriterVersion < 7 should throw") {
val exMsg = intercept[IllegalArgumentException] {
new Protocol(3, 6, null, null)
}.getMessage

assert(exMsg === "This protocol has minReaderVersion 3 but minWriterVersion 6. When " +
"minReaderVersion is >= 3, minWriterVersion must be >= 7.")
}

}
Loading
Loading