Skip to content

Kotlin DataFrame interoperability

Jolan Rensen edited this page Sep 15, 2023 · 7 revisions

Since Kotlin DataFrame is becoming the most popular in-memory data exploration tool for Kotlin, it can be useful to convert between Kotlin DataFrames and Spark Datasets. There are plans for a separate interop library/module, but until https://github.com/Kotlin/kotlin-spark-api/issues/195 is resolved, this would again have to be 20+ libraries... In the meantime, I'll provide the interop as some small snippets of code that you can use in your own library:

Kotlin DataFrame to Spark Dataset/DataFrame

If you want to convert from a Kotlin DataFrame to a Spark Dataset, that's actually quite simple:

import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.dataframe.*
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.spark.api.*

/** Example schema holding a first and last name; used as the type of `Person.name`. */
@DataSchema
data class Name(
    val firstName: String,
    val lastName: String,
)

/**
 * Example row type used throughout the snippets.
 *
 * `city` and `weight` are nullable; all other properties are required.
 */
@DataSchema
data class Person(
    val name: Name,
    val age: Int,
    val city: String?,
    val weight: Int?,
    val isHappy: Boolean,
)

// Kotlin DataFrame built from a list of Person instances.
// Some rows deliberately leave the nullable `city` or `weight` properties as null.
val df: DataFrame<Person> = listOf(
    Person(Name("Alice", "Cooper"), 15, "London", 54, true),
    Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
    Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
    Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
    Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
    Person(Name("Alice", "Wolf"), 20, null, 55, false),
    Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
).toDataFrame()

withSpark {
    // Spark Dataset: materialize the Kotlin DataFrame as a list of Person objects,
    // then let Spark encode that list as a typed Dataset.
    val sparkDs: Dataset<Person> = df.toList().toDS()
}

Note that df.toList()/df.toListOf<>() only works if the return type is a data class, which is also what's needed for Spark.

If you want to be able to convert a Kotlin DataFrame without @DataSchema to a Spark Dataset<Row> (DataFrame), we'll need to convert the schema as well:

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.*
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.*
import org.apache.spark.unsafe.types.CalendarInterval
import org.jetbrains.kotlinx.dataframe.*
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.dataframe.columns.ColumnKind.*
import org.jetbrains.kotlinx.dataframe.schema.*
import org.jetbrains.kotlinx.spark.api.*
import java.math.*
import java.sql.*
import java.time.*
import kotlin.reflect.*

/**
 * Turns this Kotlin DataFrame into a Spark `Dataset<Row>`.
 *
 * Each row is converted to a Spark `Row`, the converted rows are handed to [sc] to build an RDD,
 * and that RDD is combined with the converted schema into a Spark DataFrame.
 *
 * @param spark The SparkSession used to create the resulting Spark DataFrame.
 * @param sc The JavaSparkContext used to turn the converted rows into an RDD.
 * @return A Dataset of Rows with the rows and schema of this DataFrame.
 */
fun DataFrame<*>.toSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
    val sparkRows = rows().map { it.toSpark() }
    val rowRdd = sc.toRDD(sparkRows)
    val sparkSchema = schema().toSpark()
    return spark.createDataFrame(rowRdd, sparkSchema)
}

/**
 * Builds a Spark Row from the values of this DataRow.
 *
 * Values that are themselves a DataRow are converted recursively,
 * so they end up as nested Spark Rows; every other value is passed through unchanged.
 *
 * @return The Spark Row holding the (converted) values in their original order.
 */
fun DataRow<*>.toSpark(): Row {
    val cells = values().map { value ->
        if (value is DataRow<*>) value.toSpark() else value
    }
    return RowFactory.create(*cells.toTypedArray())
}

/**
 * Builds the Spark StructType that mirrors this DataFrameSchema.
 *
 * Every column becomes one struct field carrying the column's name,
 * its converted data type and its nullability.
 *
 * @return The Spark StructType with one field per column.
 */
fun DataFrameSchema.toSpark(): StructType {
    val fields = columns.map { (columnName, columnSchema) ->
        DataTypes.createStructField(columnName, columnSchema.toSpark(), columnSchema.nullable)
    }
    return DataTypes.createStructType(fields)
}

/**
 * Converts a ColumnSchema object to Spark DataType.
 *
 * Value columns are mapped through their Kotlin type, column groups are converted to a nested
 * struct type via their own schema, and frame columns are rejected.
 *
 * @return The Spark DataType corresponding to the given ColumnSchema object.
 * @throws IllegalStateException if the value type has no Spark counterpart, if the column is a
 *   frame column, or if the column kind is unknown (all three are raised through `error(...)`).
 */
fun ColumnSchema.toSpark(): DataType =
    when (this) {
        is ColumnSchema.Value -> type.toSpark() ?: error("unknown data type: $type")
        is ColumnSchema.Group -> schema.toSpark()
        is ColumnSchema.Frame -> error("nested dataframes are not supported")
        else -> error("unknown column kind: $this")
    }

/**
 * Looks up the Spark DataType for this Kotlin type.
 *
 * Only the type's classifier is inspected, so a nullable type maps to the same
 * DataType as its non-null counterpart.
 *
 * @return The matching Spark DataType, or null if this type has no known Spark counterpart.
 */
fun KType.toSpark(): DataType? = when (classifier) {
    Byte::class -> DataTypes.ByteType
    Short::class -> DataTypes.ShortType
    Int::class -> DataTypes.IntegerType
    Long::class -> DataTypes.LongType
    Boolean::class -> DataTypes.BooleanType
    Float::class -> DataTypes.FloatType
    Double::class -> DataTypes.DoubleType
    String::class -> DataTypes.StringType
    LocalDate::class, Date::class -> DataTypes.DateType
    Timestamp::class, Instant::class -> DataTypes.TimestampType
    ByteArray::class -> DataTypes.BinaryType
    Decimal::class, BigDecimal::class, BigInteger::class -> DecimalType.SYSTEM_DEFAULT()
    CalendarInterval::class -> DataTypes.CalendarIntervalType
    else -> null
}

withSpark {
    // Spark Dataset of untyped rows; `spark` and `sc` are taken from the enclosing withSpark scope
    val sparkDs: Dataset<Row> = df.toSpark(spark, sc)
}

Note that the Kotlin DataFrame cannot contain frame columns (nested DataFrames), as nested Datasets are not supported in Spark.

Spark Dataset to Kotlin DataFrame

Converting Spark Datasets that are backed by data classes is again relatively simple:

/** Example schema holding a first and last name; used as the type of `Person.name`. */
@DataSchema
data class Name(
    val firstName: String,
    val lastName: String,
)

/**
 * Example row type backing the Spark Dataset below.
 *
 * `city` and `weight` are nullable; all other properties are required.
 */
@DataSchema
data class Person(
    val name: Name,
    val age: Int,
    val city: String?,
    val weight: Int?,
    val isHappy: Boolean,
)

withSpark {
    // Sample rows; the nullable `city` and `weight` are left null for some of them
    val people = listOf(
        Person(Name("Alice", "Cooper"), 15, "London", 54, true),
        Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
        Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
        Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
        Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
        Person(Name("Alice", "Wolf"), 20, null, 55, false),
        Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
    )

    // Spark Dataset backed by the Person data class
    val ds: Dataset<Person> = people.toDS()

    // Collect the Dataset into a list and build the Kotlin DataFrame from it
    val kotlinDf: DataFrame<Person> = ds.collectAsList().toDataFrame()
}

Even if you don't have a data class, but a @DataSchema interface, you can still collectAsList().toDataFrame() and then just .cast<YourInterface>(verify = true) or .convertTo<YourInterface>().

If instead you don't have typing information and you want to convert a Spark Dataset<Row> (DataFrame) to a Kotlin DataFrame<Any?>, we need some conversion:

import org.apache.spark.sql.*
import org.apache.spark.sql.types.StructType
import org.jetbrains.kotlinx.dataframe.*
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.spark.api.*

/**
 * Converts this Spark `Dataset<Row>` to a Kotlin DataFrame.
 *
 * The Dataset is collected exactly once and every column is read from that single snapshot,
 * so all columns are guaranteed to describe the same rows in the same order. (Collecting each
 * column with its own Spark job gives no such guarantee and launches one job per leaf column.)
 * Values are looked up by field index rather than by a dotted column path, so column names
 * that contain dots are handled as well.
 *
 * Struct-typed fields become column groups (converted recursively); all other fields become
 * columns whose Kotlin type is inferred from the collected values.
 *
 * @param schema The struct type whose fields become the columns; defaults to this Dataset's schema.
 * @param prefix Path of struct field names leading from the top-level row to the struct described
 *   by [schema]; empty when [schema] describes the top-level row itself.
 * @return The Kotlin DataFrame with one column (or column group) per field of [schema].
 */
fun Dataset<Row>.toKotlinDataFrame(schema: StructType = schema(), prefix: List<String> = emptyList()): AnyFrame {
    val structs: List<Row?> = collectAsList().map { row ->
        // Walk down to the struct addressed by `prefix`; a null struct anywhere on the path yields null
        prefix.fold<String, Row?>(row) { struct, fieldName ->
            struct?.let { it.getStruct(it.fieldIndex(fieldName)) }
        }
    }
    return structs.structsToDataFrame(schema)
}

/** Builds a Kotlin DataFrame from already collected struct rows, one column per field of [schema]. */
private fun List<Row?>.structsToDataFrame(schema: StructType): AnyFrame =
    schema.fields().map { field ->
        val name = field.name()
        // Fields of a null struct are reported as null values
        val values: List<Any?> = map { struct -> struct?.let { it.get(it.fieldIndex(name)) } }
        when (val dataType = field.dataType()) {
            is StructType ->
                DataColumn.createColumnGroup(
                    name = name,
                    df = values.map { it as Row? }.structsToDataFrame(dataType),
                )

            else ->
                DataColumn.createWithTypeInference(
                    name = name,
                    values = values,
                    nullable = field.nullable(),
                )
        }
    }.toDataFrame()

withSpark {
    // Spark Dataset of untyped rows (`...` is a placeholder for however you obtain it)
    val ds: Dataset<Row> = ...

    // Convert with the default arguments, i.e. using the Dataset's own schema
    val kotlinDf: DataFrame<*> = ds.toKotlinDataFrame()
    
}