Commit 4d78370

Spark 3.2: Add Scala 2.13 build and CI (apache#4009)

Author: fqaiser94
Authored: Feb 8, 2022
Parent: 2e0e4fa
Showing 27 changed files with 156 additions and 104 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-snapshot.yml
@@ -40,3 +40,4 @@ jobs:
- run: |
./gradlew printVersion
./gradlew -DflinkVersions=1.12,1.13,1.14 -DsparkVersions=2.4,3.0,3.1,3.2 -DhiveVersions=2,3 publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
+./gradlew -DflinkVersions= -DsparkVersions=3.2 -DscalaVersion=2.13 -DhiveVersions= publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
31 changes: 29 additions & 2 deletions .github/workflows/spark-ci.yml
@@ -104,7 +104,7 @@ jobs:
path: |
**/build/testlogs
-spark-3x-tests:
+spark-3x-scala-2-12-tests:
runs-on: ubuntu-latest
strategy:
matrix:
@@ -123,7 +123,34 @@ jobs:
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
restore-keys: ${{ runner.os }}-gradle
- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
-- run: ./gradlew -DsparkVersions=${{ matrix.spark }} -DhiveVersions= -DflinkVersions= :iceberg-spark:iceberg-spark-${{ matrix.spark }}_2.12:check :iceberg-spark:iceberg-spark-extensions-${{ matrix.spark }}_2.12:check :iceberg-spark:iceberg-spark-runtime-${{ matrix.spark }}_2.12:check -Pquick=true -x javadoc
+- run: ./gradlew -DsparkVersions=${{ matrix.spark }} -DscalaVersion=2.12 -DhiveVersions= -DflinkVersions= :iceberg-spark:iceberg-spark-${{ matrix.spark }}_2.12:check :iceberg-spark:iceberg-spark-extensions-${{ matrix.spark }}_2.12:check :iceberg-spark:iceberg-spark-runtime-${{ matrix.spark }}_2.12:check -Pquick=true -x javadoc
- uses: actions/upload-artifact@v2
if: failure()
with:
name: test logs
path: |
**/build/testlogs
+spark-3x-scala-2-13-tests:
+runs-on: ubuntu-latest
+strategy:
+matrix:
+jvm: [8, 11]
+spark: ['3.2']
+env:
+SPARK_LOCAL_IP: localhost
+steps:
+- uses: actions/checkout@v2
+- uses: actions/setup-java@v1
+with:
+java-version: ${{ matrix.jvm }}
+- uses: actions/cache@v2
+with:
+path: ~/.gradle/caches
+key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
+restore-keys: ${{ runner.os }}-gradle
+- run: echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
+- run: ./gradlew -DsparkVersions=${{ matrix.spark }} -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= :iceberg-spark:iceberg-spark-${{ matrix.spark }}_2.13:check :iceberg-spark:iceberg-spark-extensions-${{ matrix.spark }}_2.13:check :iceberg-spark:iceberg-spark-runtime-${{ matrix.spark }}_2.13:check -Pquick=true -x javadoc
+- uses: actions/upload-artifact@v2
+if: failure()
+with:
2 changes: 2 additions & 0 deletions gradle.properties
@@ -21,5 +21,7 @@ systemProp.defaultHiveVersions=2
systemProp.knownHiveVersions=2,3
systemProp.defaultSparkVersions=3.2
systemProp.knownSparkVersions=2.4,3.0,3.1,3.2
+systemProp.defaultScalaVersion=2.12
+systemProp.knownScalaVersions=2.12,2.13
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx768m
3 changes: 2 additions & 1 deletion jmh.gradle
@@ -22,6 +22,7 @@ if (jdkVersion != '8' && jdkVersion != '11') {
}

def sparkVersions = (System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions")).split(",")
+def scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
def jmhProjects = []

if (jdkVersion == '8' && sparkVersions.contains("2.4")) {
@@ -37,7 +38,7 @@ if (sparkVersions.contains("3.1")) {
}

if (sparkVersions.contains("3.2")) {
-jmhProjects.add(project(":iceberg-spark:iceberg-spark-3.2_2.12"))
+jmhProjects.add(project(":iceberg-spark:iceberg-spark-3.2_${scalaVersion}"))
}

configure(jmhProjects) {
25 changes: 16 additions & 9 deletions settings.gradle
@@ -74,6 +74,13 @@ if (!knownSparkVersions.containsAll(sparkVersions)) {
throw new GradleException("Found unsupported Spark versions: " + (sparkVersions - knownSparkVersions))
}

+List<String> knownScalaVersions = System.getProperty("knownScalaVersions").split(",")
+String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
+
+if (!knownScalaVersions.contains(scalaVersion)) {
+throw new GradleException("Found unsupported Scala version: " + scalaVersion)
+}
+
if (!flinkVersions.isEmpty()) {
include 'flink'
project(':flink').name = 'iceberg-flink'
@@ -131,15 +138,15 @@ if (sparkVersions.contains("3.1")) {
}

if (sparkVersions.contains("3.2")) {
-include ':iceberg-spark:spark-3.2_2.12'
-include ':iceberg-spark:spark-extensions-3.2_2.12'
-include ':iceberg-spark:spark-runtime-3.2_2.12'
-project(':iceberg-spark:spark-3.2_2.12').projectDir = file('spark/v3.2/spark')
-project(':iceberg-spark:spark-3.2_2.12').name = 'iceberg-spark-3.2_2.12'
-project(':iceberg-spark:spark-extensions-3.2_2.12').projectDir = file('spark/v3.2/spark-extensions')
-project(':iceberg-spark:spark-extensions-3.2_2.12').name = 'iceberg-spark-extensions-3.2_2.12'
-project(':iceberg-spark:spark-runtime-3.2_2.12').projectDir = file('spark/v3.2/spark-runtime')
-project(':iceberg-spark:spark-runtime-3.2_2.12').name = 'iceberg-spark-runtime-3.2_2.12'
+include ":iceberg-spark:spark-3.2_${scalaVersion}"
+include ":iceberg-spark:spark-extensions-3.2_${scalaVersion}"
+include ":iceberg-spark:spark-runtime-3.2_${scalaVersion}"
+project(":iceberg-spark:spark-3.2_${scalaVersion}").projectDir = file('spark/v3.2/spark')
+project(":iceberg-spark:spark-3.2_${scalaVersion}").name = "iceberg-spark-3.2_${scalaVersion}"
+project(":iceberg-spark:spark-extensions-3.2_${scalaVersion}").projectDir = file('spark/v3.2/spark-extensions')
+project(":iceberg-spark:spark-extensions-3.2_${scalaVersion}").name = "iceberg-spark-extensions-3.2_${scalaVersion}"
+project(":iceberg-spark:spark-runtime-3.2_${scalaVersion}").projectDir = file('spark/v3.2/spark-runtime')
+project(":iceberg-spark:spark-runtime-3.2_${scalaVersion}").name = "iceberg-spark-runtime-3.2_${scalaVersion}"
}

// hive 3 depends on hive 2, so always add hive 2 if hive3 is enabled
52 changes: 28 additions & 24 deletions spark/v3.2/build.gradle
@@ -17,28 +17,28 @@
* under the License.
*/

-String sparkVersion = '3.2.0'
+String sparkMajorVersion = '3.2'
+String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")

def sparkProjects = [
-project(':iceberg-spark:iceberg-spark-3.2_2.12'),
-project(":iceberg-spark:iceberg-spark-extensions-3.2_2.12"),
-project(':iceberg-spark:iceberg-spark-runtime-3.2_2.12')
+project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}"),
+project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}"),
+project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}"),
]

configure(sparkProjects) {
+project.ext {
+sparkVersion = '3.2.0'
+}

configurations {
all {
resolutionStrategy {
-force 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.12.3'
+force "com.fasterxml.jackson.module:jackson-module-scala_${scalaVersion}:2.12.3"
force 'com.fasterxml.jackson.module:jackson-module-paranamer:2.12.3'
}
}
}
}

-project(':iceberg-spark:iceberg-spark-3.2_2.12') {
+project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
apply plugin: 'scala'
apply plugin: 'com.github.alisiikh.scalastyle'

@@ -58,10 +58,11 @@ project(':iceberg-spark:iceberg-spark-3.2_2.12') {
implementation project(':iceberg-orc')
implementation project(':iceberg-parquet')
implementation project(':iceberg-arrow')
implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")

compileOnly "com.google.errorprone:error_prone_annotations"
compileOnly "org.apache.avro:avro"
compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
// to make sure io.netty.buffer only comes from project(':iceberg-arrow')
@@ -109,7 +110,7 @@ project(':iceberg-spark:iceberg-spark-3.2_2.12') {
}
}

project(":iceberg-spark:iceberg-spark-extensions-3.2_2.12") {
project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
apply plugin: 'java-library'
apply plugin: 'scala'
apply plugin: 'com.github.alisiikh.scalastyle'
@@ -129,13 +130,15 @@ project(":iceberg-spark:iceberg-spark-extensions-3.2_2.12") {
}

dependencies {
compileOnly "org.scala-lang:scala-library"
implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")

compileOnly "org.scala-lang:scala-library:${scalaVersion}"
compileOnly project(path: ':iceberg-bundled-guava', configuration: 'shadow')
compileOnly project(':iceberg-api')
compileOnly project(':iceberg-core')
compileOnly project(':iceberg-common')
-compileOnly project(':iceberg-spark:iceberg-spark-3.2_2.12')
-compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
+compileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")
+compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
// to make sure io.netty.buffer only comes from project(':iceberg-arrow')
@@ -148,7 +151,7 @@ project(":iceberg-spark:iceberg-spark-extensions-3.2_2.12") {

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
-testImplementation project(path: ':iceberg-spark:iceberg-spark-3.2_2.12', configuration: 'testArtifacts')
+testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')

testImplementation "org.apache.avro:avro"

@@ -164,7 +167,7 @@ project(":iceberg-spark:iceberg-spark-extensions-3.2_2.12") {
}
}

-project(':iceberg-spark:iceberg-spark-runtime-3.2_2.12') {
+project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
apply plugin: 'com.github.johnrengelman.shadow'

tasks.jar.dependsOn tasks.shadowJar
@@ -198,8 +201,8 @@ project(':iceberg-spark:iceberg-spark-runtime-3.2_2.12') {

dependencies {
api project(':iceberg-api')
-implementation project(':iceberg-spark:iceberg-spark-3.2_2.12')
-implementation project(':iceberg-spark:iceberg-spark-extensions-3.2_2.12')
+implementation project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")
+implementation project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}")
implementation project(':iceberg-aws')
implementation(project(':iceberg-aliyun')) {
exclude group: 'edu.umd.cs.findbugs', module: 'findbugs'
@@ -211,16 +214,16 @@ project(':iceberg-spark:iceberg-spark-runtime-3.2_2.12') {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}

integrationImplementation "org.apache.spark:spark-hive_2.12:${sparkVersion}"
integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${sparkVersion}"
integrationImplementation 'org.junit.vintage:junit-vintage-engine'
integrationImplementation 'org.slf4j:slf4j-simple'
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
-integrationImplementation project(path: ':iceberg-spark:iceberg-spark-3.2_2.12', configuration: 'testArtifacts')
-integrationImplementation project(path: ':iceberg-spark:iceberg-spark-extensions-3.2_2.12', configuration: 'testArtifacts')
+integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
// Not allowed on our classpath, only the runtime jar is allowed
-integrationCompileOnly project(':iceberg-spark:iceberg-spark-extensions-3.2_2.12')
-integrationCompileOnly project(':iceberg-spark:iceberg-spark-3.2_2.12')
+integrationCompileOnly project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}")
+integrationCompileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")
integrationCompileOnly project(':iceberg-api')
}

@@ -260,7 +263,7 @@ project(':iceberg-spark:iceberg-spark-runtime-3.2_2.12') {
}

task integrationTest(type: Test) {
-description = "Test Spark3 Runtime Jar against Spark 3.2"
+description = "Test Spark3 Runtime Jar against Spark ${sparkMajorVersion}"
group = "verification"
testClassesDirs = sourceSets.integration.output.classesDirs
classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
@@ -272,3 +275,4 @@ project(':iceberg-spark:iceberg-spark-runtime-3.2_2.12') {
enabled = false
}
}

AssignmentAlignmentSupport.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
+import scala.collection.compat.immutable.ArraySeq
import scala.collection.mutable

trait AssignmentAlignmentSupport extends CastSupport {
@@ -104,13 +105,13 @@ trait AssignmentAlignmentSupport extends CastSupport {
// recursively apply this method on nested fields
val newUpdates = updates.map(u => u.copy(ref = u.ref.tail))
val updatedFieldExprs = applyUpdates(
-fieldExprs,
+ArraySeq.unsafeWrapArray(fieldExprs),
newUpdates,
resolver,
namePrefix :+ col.name)

// construct a new struct with updated field expressions
-toNamedStruct(fields, updatedFieldExprs)
+toNamedStruct(ArraySeq.unsafeWrapArray(fields), updatedFieldExprs)

case otherType =>
val colName = (namePrefix :+ col.name).mkString(".")
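Note on the change above: under Scala 2.13, scala.Seq is an alias for scala.collection.immutable.Seq, and the implicit Array-to-Seq conversion yields a mutable wrapper, so an Array can no longer be passed where a Seq parameter is expected. scala-collection-compat's ArraySeq.unsafeWrapArray wraps the array without copying and compiles on both 2.12 and 2.13 ("unsafe" because later mutation of the array shows through the wrapper). A minimal sketch of the pattern — render is a hypothetical stand-in for methods such as applyUpdates and toNamedStruct, not code from this commit:

    import scala.collection.compat.immutable.ArraySeq

    object ArraySeqExample {
      // Stand-in for a method whose parameter type is scala.Seq,
      // i.e. immutable.Seq when compiled with Scala 2.13.
      def render(exprs: Seq[String]): String = exprs.mkString(", ")

      def main(args: Array[String]): Unit = {
        val fieldExprs: Array[String] = Array("a", "b", "c")
        // render(fieldExprs) compiles on 2.12 but not on 2.13, where the
        // implicit wrapping yields a mutable.ArraySeq. Wrapping explicitly
        // (without copying) works on both versions:
        println(render(ArraySeq.unsafeWrapArray(fieldExprs)))
      }
    }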
ResolveProcedures.scala
@@ -51,7 +51,7 @@ case class ResolveProcedures(spark: SparkSession) extends Rule[LogicalPlan] with
validateParams(normalizedParams)

val normalizedArgs = normalizeArgs(args)
-Call(procedure, args = buildArgExprs(normalizedParams, normalizedArgs))
+Call(procedure, args = buildArgExprs(normalizedParams, normalizedArgs).toSeq)
}

private def validateParams(params: Seq[ProcedureParameter]): Unit = {
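The .toSeq here addresses the same 2.13 alias change from the caller's side: when a computation yields a mutable collection (an ArrayBuffer-style result, as buildArgExprs presumably produces) but the receiving parameter is scala.Seq, an explicit .toSeq is required under 2.13. A hedged sketch — Call and the buffer below are illustrative stand-ins, not this commit's definitions:

    import scala.collection.mutable

    object ToSeqExample {
      // Stand-in for the Call node: its args parameter is scala.Seq,
      // which means immutable.Seq on Scala 2.13.
      final case class Call(args: Seq[Int])

      def main(args: Array[String]): Unit = {
        val argExprs = mutable.ArrayBuffer(1, 2, 3) // like buildArgExprs' result
        // Call(argExprs) compiles on 2.12 but not on 2.13; .toSeq copies the
        // buffer into an immutable Seq on 2.13 (and is a no-op on 2.12).
        println(Call(argExprs.toSeq))
      }
    }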
RewriteRowLevelCommand.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.write.RowLevelOperationTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import scala.collection.compat.immutable.ArraySeq
import scala.collection.mutable

trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
@@ -120,7 +121,7 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
operation: RowLevelOperation): Seq[AttributeReference] = {

ExtendedV2ExpressionUtils.resolveRefs[AttributeReference](
-operation.requiredMetadataAttributes,
+operation.requiredMetadataAttributes.toSeq,
relation)
}

@@ -131,7 +132,7 @@
operation match {
case supportsDelta: SupportsDelta =>
val rowIdAttrs = ExtendedV2ExpressionUtils.resolveRefs[AttributeReference](
-supportsDelta.rowId,
+supportsDelta.rowId.toSeq,
relation)

val nullableRowIdAttrs = rowIdAttrs.filter(_.nullable)
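The two .toSeq calls above convert arrays handed back by the Java-side row-level operation API (requiredMetadataAttributes and rowId return arrays of NamedReference). On 2.13, Array.toSeq copies the elements into an immutable.ArraySeq, which is the safe default; ArraySeq.unsafeWrapArray (imported above) would avoid the copy at the cost of aliasing the underlying array. A small illustration under those assumptions — resolveRefs is a simplified stand-in, not the real ExtendedV2ExpressionUtils signature:

    object ArrayToSeqExample {
      // Stand-in for a resolver that takes scala.Seq (immutable on 2.13).
      def resolveRefs(refs: Seq[String]): Seq[String] = refs.map(_.toUpperCase)

      def main(args: Array[String]): Unit = {
        // A Java API hands back an array; on 2.13 an Array is not a Seq.
        val fromJavaApi: Array[String] = Array("_file", "_pos")
        // .toSeq copies into an immutable.ArraySeq on 2.13; on 2.12 it wraps
        // without copying. Either way the call compiles on both versions.
        println(resolveRefs(fromJavaApi.toSeq))
      }
    }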
ExtendedV2ExpressionUtils.scala
@@ -44,7 +44,7 @@ object ExtendedV2ExpressionUtils extends SQLConfHelper {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

def resolveRef[T <: NamedExpression](ref: NamedReference, plan: LogicalPlan): T = {
-plan.resolve(ref.fieldNames, conf.resolver) match {
+plan.resolve(ref.fieldNames.toSeq, conf.resolver) match {
case Some(namedExpr) =>
namedExpr.asInstanceOf[T]
case None =>
IcebergSparkSqlExtensionsParser.scala
@@ -54,7 +54,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.VariableSubstitution
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructType
-import scala.collection.JavaConverters.seqAsJavaListConverter
+import scala.jdk.CollectionConverters._
import scala.util.Try

class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface {
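scala.collection.JavaConverters is deprecated in Scala 2.13, and scala.jdk.CollectionConverters._ is its replacement; the scala-collection-compat dependency added by this commit also provides that package on 2.12, so a single import serves both builds (an assumption worth verifying against the compat library's documentation). A minimal sketch of the converters — the values are illustrative only:

    import java.util.{List => JList}
    import scala.jdk.CollectionConverters._

    object ConvertersExample {
      def main(args: Array[String]): Unit = {
        // Scala -> Java, replacing seqAsJavaListConverter's .asJava
        val javaList: JList[String] = Seq("CALL", "catalog.proc").asJava
        // Java -> Scala; .asScala yields a mutable Buffer view, so add .toSeq
        val scalaSeq: Seq[String] = javaList.asScala.toSeq
        println(scalaSeq)
      }
    }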
(diff truncated: the remaining changed files were not loaded in this view)