Skip to content

Commit

Permalink
Flatten operator for Optional columns. (typelevel#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
imarios authored Jul 1, 2018
1 parent be589b3 commit 3ad68b3
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 2 deletions.
47 changes: 45 additions & 2 deletions dataset/src/main/scala/frameless/TypedDataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1165,9 +1165,18 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
}

/**
* Explodes (flattens) a single column at a time. It only compiles if the type of column supports this operation.
* Explodes a single column at a time. It only compiles if the type of column supports this operation.
*
* @param column the column we wish to explode/flatten
* @example
*
* {{{
* case class X(i: Int, j: Array[Int])
* case class Y(i: Int, j: Int)
*
* val f: TypedDataset[X] = ???
* val fNew: TypedDataset[Y] = f.explode('j).as[Y]
* }}}
* @param column the column we wish to explode
*/
def explode[A, TRep <: HList, V[_], OutMod <: HList, OutModValues <: HList, Out]
(column: Witness.Lt[Symbol])
Expand All @@ -1189,6 +1198,40 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
sparkExplode(df(column.value.name))).as[Out](TypedExpressionEncoder[Out])
TypedDataset.create[Out](trans)
}

/**
* Flattens a column of type Option[A]. Compiles only if the selected column is of type Option[A].
*
*
* @example
*
* {{{
* case class X(i: Int, j: Option[Int])
* case class Y(i: Int, j: Int)
*
* val f: TypedDataset[X] = ???
* val fNew: TypedDataset[Y] = f.flattenOption('j).as[Y]
* }}}
*
* @param column the column we wish to flatten
*/
def flattenOption[A, TRep <: HList, V[_], OutMod <: HList, OutModValues <: HList, Out]
(column: Witness.Lt[Symbol])
(implicit
i0: TypedColumn.Exists[T, column.T, V[A]],
i1: TypedEncoder[A],
i2: V[A] =:= Option[A],
i3: LabelledGeneric.Aux[T, TRep],
i4: Modifier.Aux[TRep, column.T, V[A], A, OutMod],
i5: Values.Aux[OutMod, OutModValues],
i6: Tupler.Aux[OutModValues, Out],
i7: TypedEncoder[Out]
): TypedDataset[Out] = {
val df = dataset.toDF()
val trans =
df.filter(df(column.value.name).isNotNull).as[Out](TypedExpressionEncoder[Out])
TypedDataset.create[Out](trans)
}
}

object TypedDataset {
Expand Down
29 changes: 29 additions & 0 deletions dataset/src/test/scala/frameless/FlattenTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package frameless

import org.scalacheck.Prop
import org.scalacheck.Prop.forAll
import org.scalacheck.Prop._


class FlattenTests extends TypedDatasetSuite {
test("simple flatten test") {
val ds: TypedDataset[(Int,Option[Int])] = TypedDataset.create(Seq((1,Option(1))))
ds.flattenOption('_2): TypedDataset[(Int,Int)]
}

test("different Optional types") {
def prop[A: TypedEncoder](xs: List[X1[Option[A]]]): Prop = {
val tds: TypedDataset[X1[Option[A]]] = TypedDataset.create(xs)

val framelessResults: Seq[Tuple1[A]] = tds.flattenOption('a).collect().run().toVector
val scalaResults = xs.flatMap(_.a).map(Tuple1(_)).toVector

framelessResults ?= scalaResults
}

check(forAll(prop[Long] _))
check(forAll(prop[Int] _))
check(forAll(prop[Char] _))
check(forAll(prop[String] _))
}
}
18 changes: 18 additions & 0 deletions docs/src/main/tut/FeatureOverview.md
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,24 @@ val sampleStats = bedroomStats.select(
sampleStats.show().run()
```

In addition, optional columns can be flatten using the `.flattenOption` method on `TypedDatset`.
The result contains the rows for which the flattened column is not None (or null). The schema
is automatically adapted to reflect this change.

```tut:book
val flattenStats = bedroomStats.flattenOption('AvgPriceBeds2)
// The second Option[Double] is now of type Double, since all 'null' values are removed
flattenStats: TypedDataset[(String, Option[Double], Double, Option[Double], Option[Double])]
```

In a DataFrame, if you just ignore types, this would equivelantly be written as:

```tut:book
bedroomStats.dataset.toDF().filter($"AvgPriceBeds2".isNotNull)
```


### Entire TypedDataset Aggregation

Expand Down

0 comments on commit 3ad68b3

Please sign in to comment.