
Kotlin DataFrame interoperability

Jolan Rensen edited this page Sep 15, 2023 · 7 revisions

Since Kotlin DataFrame is becoming the most popular in-memory data exploration tool for Kotlin, it can be useful to convert between Kotlin DataFrames and Spark Datasets. There are plans for a separate interop library/module, but until https://github.com/Kotlin/kotlin-spark-api/issues/195 is resolved, this would have to be 20+ libraries again. In the meantime, I'll provide the interop as some small snippets of code you can use in your own library:

Kotlin DataFrame to Spark Dataset/DataFrame

Converting from a Kotlin DataFrame to a Spark Dataset is actually quite simple:

import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.dataframe.*
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.spark.api.*

@DataSchema
data class Name(
    val firstName: String,
    val lastName: String,
)

@DataSchema
data class Person(
    val name: Name,
    val age: Int,
    val city: String?,
    val weight: Int?,
    val isHappy: Boolean,
)

// Kotlin DataFrame
val df: DataFrame<Person> = listOf(
    Person(Name("Alice", "Cooper"), 15, "London", 54, true),
    Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
    Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
    Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
    Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
    Person(Name("Alice", "Wolf"), 20, null, 55, false),
    Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
).toDataFrame()

withSpark {
    // Spark Dataset: convert the rows back to data class instances, then let Spark encode them
    val sparkDs: Dataset<Person> = df.toList().toDS()
}

Note that df.toList()/df.toListOf<>() only work if the element type is a data class, which is also what Spark needs.
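
When the Kotlin DataFrame is untyped (a `DataFrame<*>`), `toListOf<>()` lets you name the target data class explicitly. The following is a minimal sketch, assuming the column names and types match the constructor parameters of the data class; `City` and the sample values are hypothetical and only serve this illustration.

```kotlin
import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.spark.api.*

// Hypothetical data class, used only for this illustration.
data class City(val name: String, val population: Int)

fun main() {
    // An untyped Kotlin DataFrame, built without @DataSchema.
    val untyped: DataFrame<*> = dataFrameOf("name", "population")(
        "London", 8_800_000,
        "Milan", 1_400_000,
    )

    // toListOf<City>() maps each row onto City, matching columns to parameters by name.
    val cities: List<City> = untyped.toListOf<City>()

    withSpark {
        // Spark Dataset of the data class
        val sparkDs: Dataset<City> = cities.toDS()
        sparkDs.show()
    }
}
```

If the columns do not line up with the data class, the conversion to a list fails before Spark is involved, which makes mismatches easier to spot.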

To convert a Kotlin DataFrame without @DataSchema to a Spark Dataset<Row> (DataFrame), we need to convert the schema as well:

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.*
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.*
import org.apache.spark.unsafe.types.CalendarInterval
import org.jetbrains.kotlinx.dataframe.*
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.dataframe.columns.ColumnKind.*
import org.jetbrains.kotlinx.dataframe.schema.*
import org.jetbrains.kotlinx.spark.api.*
import java.math.*
import java.sql.*
import java.time.*
import kotlin.reflect.*

/**
 * Converts the DataFrame to a Spark Dataset of Rows using the provided SparkSession and JavaSparkContext.
 *
 * @param spark The SparkSession object to use for creating the DataFrame.
 * @param sc The JavaSparkContext object to use for converting the DataFrame to RDD.
 * @return A Dataset of Rows representing the converted DataFrame.
 */
fun DataFrame<*>.toSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
    val rows = sc.toRDD(rows().map(DataRow<*>::toSpark))
    return spark.createDataFrame(rows, schema().toSpark())
}

/**
 * Converts a DataRow to a Spark Row object.
 *
 * @return The converted Spark Row.
 */
fun DataRow<*>.toSpark(): Row =
    RowFactory.create(
        *values().map {
            when (it) {
                is DataRow<*> -> it.toSpark()
                else -> it
            }
        }.toTypedArray()
    )

/**
 * Converts a DataFrameSchema to a Spark StructType.
 *
 * @return The converted Spark StructType.
 */
fun DataFrameSchema.toSpark(): StructType =
    DataTypes.createStructType(
        columns.map { (name, schema) ->
            DataTypes.createStructField(name, schema.toSpark(), schema.nullable)
        }
    )

/**
 * Converts a ColumnSchema object to Spark DataType.
 *
 * @return The Spark DataType corresponding to the given ColumnSchema object.
 * @throws IllegalStateException if the column type or kind is unknown or unsupported.
 */
fun ColumnSchema.toSpark(): DataType =
    when (this) {
        is ColumnSchema.Value -> type.toSpark() ?: error("unknown data type: $type")
        is ColumnSchema.Group -> schema.toSpark()
        is ColumnSchema.Frame -> error("nested dataframes are not supported")
        else -> error("unknown column kind: $this")
    }

/**
 * Returns the corresponding Spark DataType for a given Kotlin type.
 *
 * @return The Spark DataType that corresponds to the Kotlin type, or null if no matching DataType is found.
 */
fun KType.toSpark(): DataType? = when (this) {
    typeOf<Byte>(), typeOf<Byte?>() -> DataTypes.ByteType
    typeOf<Short>(), typeOf<Short?>() -> DataTypes.ShortType
    typeOf<Int>(), typeOf<Int?>() -> DataTypes.IntegerType
    typeOf<Long>(), typeOf<Long?>() -> DataTypes.LongType
    typeOf<Boolean>(), typeOf<Boolean?>() -> DataTypes.BooleanType
    typeOf<Float>(), typeOf<Float?>() -> DataTypes.FloatType
    typeOf<Double>(), typeOf<Double?>() -> DataTypes.DoubleType
    typeOf<String>(), typeOf<String?>() -> DataTypes.StringType
    typeOf<LocalDate>(), typeOf<LocalDate?>() -> DataTypes.DateType
    typeOf<Date>(), typeOf<Date?>() -> DataTypes.DateType
    typeOf<Timestamp>(), typeOf<Timestamp?>() -> DataTypes.TimestampType
    typeOf<Instant>(), typeOf<Instant?>() -> DataTypes.TimestampType
    typeOf<ByteArray>(), typeOf<ByteArray?>() -> DataTypes.BinaryType
    typeOf<Decimal>(), typeOf<Decimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigDecimal>(), typeOf<BigDecimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigInteger>(), typeOf<BigInteger?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<CalendarInterval>(), typeOf<CalendarInterval?>() -> DataTypes.CalendarIntervalType
    else -> null
}

withSpark {
    // Spark Dataset of Rows, built with the extension functions defined above
    val sparkDs: Dataset<Row> = df.toSpark(spark, sc)
}
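
The type mapping above lists every type twice because `KType` equality takes nullability into account, so `typeOf<Int>()` and `typeOf<Int?>()` are different values. The stdlib-only sketch below illustrates that behaviour; `sparkTypeName` is a hypothetical stand-in that returns type names as strings instead of Spark `DataType`s, so it runs without Spark on the classpath.

```kotlin
import kotlin.reflect.KType
import kotlin.reflect.typeOf

// Hypothetical stand-in for KType.toSpark(): returns a name instead of a Spark DataType.
fun sparkTypeName(type: KType): String? = when (type) {
    typeOf<Int>(), typeOf<Int?>() -> "IntegerType"
    typeOf<String>(), typeOf<String?>() -> "StringType"
    else -> null
}

fun main() {
    // Nullability is part of KType equality.
    println(typeOf<Int>() == typeOf<Int?>())     // false
    println(typeOf<Int?>().isMarkedNullable)     // true

    // Both variants resolve to the same type name.
    println(sparkTypeName(typeOf<Int>()))        // IntegerType
    println(sparkTypeName(typeOf<Int?>()))       // IntegerType

    // Types without a mapping yield null.
    println(sparkTypeName(typeOf<List<Int>>()))  // null
}
```

In the conversion above, nullability is not lost: it is passed separately to `createStructField` through `schema.nullable`.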

Note that the DataFrame cannot contain frame columns, as nested Datasets are not supported in Spark.

Spark Dataset to Kotlin DataFrame
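
To find out up front which columns would trigger that error, the schema can be scanned before converting. This is a sketch only: `frameColumnPaths` and `requireNoFrameColumns` are hypothetical helpers, not part of either library, and they rely on the same `DataFrameSchema.columns`, `ColumnSchema.Group` and `ColumnSchema.Frame` members used in the conversion code above.

```kotlin
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.schema
import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
import org.jetbrains.kotlinx.dataframe.schema.DataFrameSchema

// Hypothetical helper: collects the paths of all frame columns,
// recursing into column groups.
fun DataFrameSchema.frameColumnPaths(prefix: String = ""): List<String> =
    columns.flatMap { (name, schema) ->
        val path = if (prefix.isEmpty()) name else "$prefix.$name"
        when (schema) {
            is ColumnSchema.Frame -> listOf(path)
            is ColumnSchema.Group -> schema.schema.frameColumnPaths(path)
            else -> emptyList()
        }
    }

// Hypothetical helper: fails early with the names of all offending columns.
fun requireNoFrameColumns(df: DataFrame<*>) {
    val offending = df.schema().frameColumnPaths()
    require(offending.isEmpty()) {
        "Frame columns cannot be converted to Spark: $offending"
    }
}
```

Calling `requireNoFrameColumns(df)` before the conversion reports every frame column at once, instead of stopping at the first one.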

TODO