From 3ad68b31964c2c9a773a5458e5d5dd70f5775c0a Mon Sep 17 00:00:00 2001 From: marios iliofotou Date: Sun, 1 Jul 2018 16:02:43 -0700 Subject: [PATCH] Flatten operator for Optional columns. (#307) --- .../main/scala/frameless/TypedDataset.scala | 47 ++++++++++++++++++- .../test/scala/frameless/FlattenTests.scala | 29 ++++++++++++ docs/src/main/tut/FeatureOverview.md | 18 +++++++ 3 files changed, 92 insertions(+), 2 deletions(-) create mode 100644 dataset/src/test/scala/frameless/FlattenTests.scala diff --git a/dataset/src/main/scala/frameless/TypedDataset.scala b/dataset/src/main/scala/frameless/TypedDataset.scala index 86f1743b..f2beacd0 100644 --- a/dataset/src/main/scala/frameless/TypedDataset.scala +++ b/dataset/src/main/scala/frameless/TypedDataset.scala @@ -1165,9 +1165,18 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val } /** - * Explodes (flattens) a single column at a time. It only compiles if the type of column supports this operation. + * Explodes a single column at a time. It only compiles if the type of column supports this operation. * - * @param column the column we wish to explode/flatten + * @example + * + * {{{ + * case class X(i: Int, j: Array[Int]) + * case class Y(i: Int, j: Int) + * + * val f: TypedDataset[X] = ??? + * val fNew: TypedDataset[Y] = f.explode('j).as[Y] + * }}} + * @param column the column we wish to explode */ def explode[A, TRep <: HList, V[_], OutMod <: HList, OutModValues <: HList, Out] (column: Witness.Lt[Symbol]) @@ -1189,6 +1198,40 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val sparkExplode(df(column.value.name))).as[Out](TypedExpressionEncoder[Out]) TypedDataset.create[Out](trans) } + + /** + * Flattens a column of type Option[A]. Compiles only if the selected column is of type Option[A]. + * + * + * @example + * + * {{{ + * case class X(i: Int, j: Option[Int]) + * case class Y(i: Int, j: Int) + * + * val f: TypedDataset[X] = ??? + * val fNew: TypedDataset[Y] = f.flattenOption('j).as[Y] + * }}} + * + * @param column the column we wish to flatten + */ + def flattenOption[A, TRep <: HList, V[_], OutMod <: HList, OutModValues <: HList, Out] + (column: Witness.Lt[Symbol]) + (implicit + i0: TypedColumn.Exists[T, column.T, V[A]], + i1: TypedEncoder[A], + i2: V[A] =:= Option[A], + i3: LabelledGeneric.Aux[T, TRep], + i4: Modifier.Aux[TRep, column.T, V[A], A, OutMod], + i5: Values.Aux[OutMod, OutModValues], + i6: Tupler.Aux[OutModValues, Out], + i7: TypedEncoder[Out] + ): TypedDataset[Out] = { + val df = dataset.toDF() + val trans = + df.filter(df(column.value.name).isNotNull).as[Out](TypedExpressionEncoder[Out]) + TypedDataset.create[Out](trans) + } } object TypedDataset { diff --git a/dataset/src/test/scala/frameless/FlattenTests.scala b/dataset/src/test/scala/frameless/FlattenTests.scala new file mode 100644 index 00000000..a65e51b8 --- /dev/null +++ b/dataset/src/test/scala/frameless/FlattenTests.scala @@ -0,0 +1,29 @@ +package frameless + +import org.scalacheck.Prop +import org.scalacheck.Prop.forAll +import org.scalacheck.Prop._ + + +class FlattenTests extends TypedDatasetSuite { + test("simple flatten test") { + val ds: TypedDataset[(Int,Option[Int])] = TypedDataset.create(Seq((1,Option(1)))) + ds.flattenOption('_2): TypedDataset[(Int,Int)] + } + + test("different Optional types") { + def prop[A: TypedEncoder](xs: List[X1[Option[A]]]): Prop = { + val tds: TypedDataset[X1[Option[A]]] = TypedDataset.create(xs) + + val framelessResults: Seq[Tuple1[A]] = tds.flattenOption('a).collect().run().toVector + val scalaResults = xs.flatMap(_.a).map(Tuple1(_)).toVector + + framelessResults ?= scalaResults + } + + check(forAll(prop[Long] _)) + check(forAll(prop[Int] _)) + check(forAll(prop[Char] _)) + check(forAll(prop[String] _)) + } +} diff --git a/docs/src/main/tut/FeatureOverview.md b/docs/src/main/tut/FeatureOverview.md index a485eb3f..8f7d20a1 100644 --- a/docs/src/main/tut/FeatureOverview.md +++ b/docs/src/main/tut/FeatureOverview.md @@ -500,6 +500,24 @@ val sampleStats = bedroomStats.select( sampleStats.show().run() ``` +In addition, optional columns can be flatten using the `.flattenOption` method on `TypedDatset`. +The result contains the rows for which the flattened column is not None (or null). The schema +is automatically adapted to reflect this change. + +```tut:book +val flattenStats = bedroomStats.flattenOption('AvgPriceBeds2) + + +// The second Option[Double] is now of type Double, since all 'null' values are removed +flattenStats: TypedDataset[(String, Option[Double], Double, Option[Double], Option[Double])] +``` + +In a DataFrame, if you just ignore types, this would equivelantly be written as: + +```tut:book +bedroomStats.dataset.toDF().filter($"AvgPriceBeds2".isNotNull) +``` + ### Entire TypedDataset Aggregation