Skip to content

Commit

Permalink
Add ST_Contains optimization and fix the ST_Intersects optimization b…
Browse files Browse the repository at this point in the history
…ehavior (#18)

* Fix the ST_Intersects optimization behavior

* Add ST_Contains function; remove name definition from all function names

* Add ST_Contains function optimizations
  • Loading branch information
pomadchin authored Apr 29, 2022
1 parent cc2e3e1 commit 5ef8031
Show file tree
Hide file tree
Showing 88 changed files with 287 additions and 194 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/com/azavea/hiveless/HUDF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.types.DataType
import scala.util.{Failure, Success, Try}

abstract class HUDF[P, R](implicit d: HDeserializer[Try, P], s: HSerializer[R]) extends HGenericUDF[R] {
// Function name: the last dot-separated segment of this instance's runtime class name.
def name: String = this.getClass.getName.split("\\.").last
// Result data type, taken from the implicit HSerializer[R] (`s`).
def dataType: DataType = s.dataType
// Converts a result of type R into its serialized form, delegating to the implicit HSerializer[R] (`s`).
def serialize: R => Any = s.serialize
// The typed function (P => R) this UDF wraps; abstract here, supplied by concrete subclasses.
def function: P => R
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.azavea.hiveless.spark.rules

import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}
import org.apache.spark.sql.hive.HivelessInternals
import org.apache.spark.sql.hive.HivelessInternals.HiveGenericUDF
import org.apache.spark.sql.types.{DataType, StructType}
Expand All @@ -30,6 +30,8 @@ package object syntax extends Serializable {

/** Combines the expressions left-to-right into a single `And` tree; throws on an empty list. */
def AndList(list: List[Expression]): Expression = list.reduceLeft((acc, next) => And(acc, next))

/** Combines the expressions left-to-right into a single `Or` tree; throws on an empty list. */
def OrList(list: List[Expression]): Expression = list.reduceLeft((acc, next) => Or(acc, next))

implicit class DataTypeConformityOps(val left: DataType) extends AnyVal {
def conformsToSchema(schema: StructType): Boolean =
HivelessInternals.WithTypeConformity(left).conformsTo(schema)
Expand Down
1 change: 1 addition & 0 deletions spatial-index/sql/createUDFs.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- Register the spatial-index UDFs: each SQL function name is bound to its implementing class.
CREATE OR REPLACE FUNCTION st_contains as 'com.azavea.hiveless.spatial.index.ST_Contains';
CREATE OR REPLACE FUNCTION st_crsFromText as 'com.azavea.hiveless.spatial.index.ST_CrsFromText';
CREATE OR REPLACE FUNCTION st_extentFromGeom as 'com.azavea.hiveless.spatial.index.ST_ExtentFromGeom';
CREATE OR REPLACE FUNCTION st_extentToGeom as 'com.azavea.hiveless.spatial.index.ST_ExtentToGeom';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.azavea.hiveless.spark.sql.rules
import com.azavea.hiveless.spark.rules.syntax._
import com.azavea.hiveless.spatial._
import com.azavea.hiveless.spatial.index._
import com.azavea.hiveless.spatial.index.ST_Intersects
import com.azavea.hiveless.spatial.index.{ST_Contains, ST_Intersects}
import com.azavea.hiveless.serializers.HDeserializer.Errors.ProductDeserializationError
import com.azavea.hiveless.serializers.syntax._
import geotrellis.vector._
Expand Down Expand Up @@ -68,6 +68,97 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
.orElse(Try(g.convert[Extent] -> false))
.getOrElse(throw ProductDeserializationError[ST_Intersects, ST_Intersects.Arg]("second"))

// ::: (if (isGeometry) List(condition) else Nil)
// transform expression
val expanded =
And(
IsNotNull(extentExpr),
OrList(
List(
GreaterThanOrEqual(GetStructField(extentExpr, 0, "xmin".some), Literal(extent.xmin)),
GreaterThanOrEqual(GetStructField(extentExpr, 1, "ymin".some), Literal(extent.ymin)),
LessThanOrEqual(GetStructField(extentExpr, 2, "xmax".some), Literal(extent.xmax)),
LessThanOrEqual(GetStructField(extentExpr, 3, "ymax".some), Literal(extent.ymax))
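// The original condition is not part of this list: when the literal is a Geometry it is
// re-applied below as a secondary filter, which is not pushed down.
// NOTE(review): the four bounds above are OR-ed; a bbox that strictly encloses the literal's extent
// (smaller xmin/ymin, larger xmax/ymax) satisfies none of them and would be filtered out although it
// intersects — confirm this is the intended pre-filter.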
)
)
)

if (isGeometry) And(expanded, condition) else expanded
// Expression
case Failure(_) =>
// If the right-hand argument is a non-literal Expression, no further optimization is applied
// and such predicates are not pushed down.
//
// If the right-hand Expression is a Geometry, its Envelope coordinates cannot be extracted:
// doing so would require a User Defined Expression, which would not be pushed down either.
//
// It is, however, possible to extract coordinates from an Extent Expression.
// In that case GetStructField can be used to extract the values and transform the predicate,
// though such predicates are not pushed down either.
//
// The rough implementation of the idea above (The transformed plan for Extent, which is not pushed down):
/*if (geometryExpr.dataType.conformsToSchema(extentEncoder.schema)) {
And(
IsNotNull(extentExpr),
OrList(
List(
GreaterThanOrEqual(GetStructField(extentExpr, 0, "xmin".some), GetStructField(geometryExpr, 0, "xmin".some)),
GreaterThanOrEqual(GetStructField(extentExpr, 1, "ymin".some), GetStructField(geometryExpr, 1, "ymin".some)),
LessThanOrEqual(GetStructField(extentExpr, 2, "xmax".some), GetStructField(geometryExpr, 2, "xmax".some)),
LessThanOrEqual(GetStructField(extentExpr, 3, "ymax".some), GetStructField(geometryExpr, 3, "ymax".some))
// the old condition node is a secondary filter which is not pushed down
// it is needed in case it is a Geometry intersection
)
)
)
} else {
throw new UnsupportedOperationException(
s"${classOf[Geometry]} Envelope values extraction is not supported by the internal ${classOf[Geometry]} representation.".stripMargin
)
}*/

throw new UnsupportedOperationException(
s"${classOf[ST_Intersects]} push-down optimization works with ${classOf[Geometry]} and ${classOf[Extent]} Literals only."
)
}

Filter(expr, plan)
} catch {
// fallback to the unoptimized node if optimization failed
case e: Throwable =>
logger.warn(
s"""
|${this.getClass.getName} ${classOf[ST_Intersects]} optimization failed, using the original plan.
|StackTrace: ${ExceptionUtils.getStackTrace(e)}
|""".stripMargin
)
f
}

case f @ Filter(condition: HiveGenericUDF, plan) if condition.of[ST_Contains] =>
try {
val Seq(extentExpr, geometryExpr) = condition.children

// ST_Contains is polymorphic by the first argument
// Optimization is done only when the first argument is Extent
if (!extentExpr.dataType.conformsToSchema(extentEncoder.schema))
throw new UnsupportedOperationException(
s"${classOf[ST_Contains]} push-down optimization works on the ${classOf[Extent]} column data type only."
)

// transform expression
val expr = Try(geometryExpr.eval(null)) match {
// Literals push-down support only
case Success(g) =>
// ST_Contains is polymorphic by the second argument
// Extract Extent literal from the right
// The second argument can be Geometry or Extent
val (extent, isGeometry) = Try(g.convert[Geometry].extent -> true)
.orElse(Try(g.convert[Extent] -> false))
.getOrElse(throw ProductDeserializationError[ST_Contains, ST_Contains.Arg]("second"))

// transform expression
AndList(
List(
Expand Down Expand Up @@ -110,7 +201,7 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
}*/

throw new UnsupportedOperationException(
s"${classOf[ST_Intersects]} push-down optimization works with ${classOf[Geometry]} and ${classOf[Extent]} Literals only."
s"${classOf[ST_Contains]} push-down optimization works with ${classOf[Geometry]} and ${classOf[Extent]} Literals only."
)
}

Expand All @@ -120,7 +211,7 @@ object SpatialFilterPushdownRules extends Rule[LogicalPlan] {
case e: Throwable =>
logger.warn(
s"""
|${this.getClass.getName} ${classOf[ST_Intersects]} optimization failed, using the original plan.
|${this.getClass.getName} ${classOf[ST_Contains]} optimization failed, using the original plan.
|StackTrace: ${ExceptionUtils.getStackTrace(e)}
|""".stripMargin
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless.spatial.index

import com.azavea.hiveless.HUDF
import com.azavea.hiveless.spatial._
import com.azavea.hiveless.implicits.tupler._
import com.azavea.hiveless.serializers.HDeserializer.Errors.ProductDeserializationError
import geotrellis.vector._
import shapeless._

/**
 * Hive UDF that takes a pair of [[ST_Contains.Arg]] values and returns a Boolean.
 * The implementation is delegated to [[ST_Contains.function]].
 */
class ST_Contains extends HUDF[(ST_Contains.Arg, ST_Contains.Arg), Boolean] {
// NOTE(review): presumably the two-argument method is adapted to a tuple-taking function
// by the imported implicits.tupler._ — confirm before restructuring this line.
def function = ST_Contains.function
}

object ST_Contains {

  /**
   * Argument type accepted on either side: an Extent or a Geometry.
   *
   * A Coproduct is used instead of Either[Extent, Geometry] because Either has no safe CNil
   * fallback, which may lead to derivation error messages rather than parsing ones.
   */
  type Arg = Extent :+: Geometry :+: CNil

  /**
   * Views the argument as a Geometry.
   *
   * @param a the coproduct argument
   * @return the Geometry alternative when selected; otherwise the Extent alternative converted
   *         to a polygon; None when neither alternative is selected
   */
  def parseGeometry(a: Arg): Option[Geometry] =
    a.select[Geometry] match {
      case geometry @ Some(_) => geometry
      case None               => a.select[Extent].map(_.toPolygon())
    }

  /** Same as parseGeometry, but throws a ProductDeserializationError naming the argument position on failure. */
  private def geometryOrFail(a: Arg, position: String): Geometry =
    parseGeometry(a) match {
      case Some(geometry) => geometry
      case None           => throw ProductDeserializationError[ST_Contains, Arg](position)
    }

  /**
   * Tests whether the left argument contains the right one.
   *
   * @param left  the containing candidate (Extent or Geometry)
   * @param right the contained candidate (Extent or Geometry)
   * @return the result of `contains` on the two parsed geometries
   */
  def function(left: Arg, right: Arg): Boolean = {
    // The left argument is parsed first, so a failure on it is reported before the right one.
    val container = geometryOrFail(left, "first")
    val candidate = geometryOrFail(right, "second")

    container.contains(candidate)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,5 @@ import com.azavea.hiveless.HUDF
import geotrellis.proj4.CRS

/** Hive UDF from String to CRS; the wrapped function is CRS.fromString. */
class ST_CrsFromText extends HUDF[String, CRS] {
val name: String = "st_crsFromText"
def function = CRS.fromString
def function = CRS.fromString
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ import com.azavea.hiveless.spatial._
import geotrellis.vector._

/** Hive UDF from Geometry to Extent; returns the `extent` of the input geometry. */
class ST_ExtentFromGeom extends HUDF[Geometry, Extent] {
val name: String = "st_extentFromGeom"
def function = _.extent
def function = _.extent
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ import com.azavea.hiveless.spatial._
import geotrellis.vector._

/** Hive UDF from Extent to Geometry; converts the input extent with `toPolygon`. */
class ST_ExtentToGeom extends HUDF[Extent, Geometry] {
val name: String = "st_extentToGeom"
def function = _.toPolygon
def function = _.toPolygon
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@ import geotrellis.proj4.CRS
import geotrellis.vector._

/** Hive UDF taking (Geometry, CRS, CRS); reprojects the geometry from the first CRS to the second. */
class ST_GeomReproject extends HUDF[(Geometry, CRS, CRS), Geometry] {
val name: String = "st_geomReproject"
def function = { (geom: Geometry, from: CRS, to: CRS) => geom.reproject(from, to) }
def function = { (geom: Geometry, from: CRS, to: CRS) => geom.reproject(from, to) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import geotrellis.vector._
import shapeless._

/**
 * Hive UDF that takes a pair of ST_Intersects.Arg values and returns a Boolean.
 * The implementation is delegated to ST_Intersects.function.
 */
class ST_Intersects extends HUDF[(ST_Intersects.Arg, ST_Intersects.Arg), Boolean] {
val name: String = "st_intersects"
def function = ST_Intersects.function
def function = ST_Intersects.function
}

object ST_Intersects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ import geotrellis.vector.Extent
import com.azavea.hiveless.implicits.tupler._

/** Hive UDF that builds an Extent from four Double values by applying Extent.apply. */
class ST_MakeExtent extends HUDF[(Double, Double, Double, Double), Extent] {
val name: String = "st_makeExtent"
def function = Extent.apply
def function = Extent.apply
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import geotrellis.proj4.{CRS, LatLng}
import geotrellis.store.index.zcurve.Z2

/**
 * Hive UDF taking (Geometry, Int, Option[Int], Option[Int], Option[CRS], Option[Double]) and returning a Long.
 * The implementation is delegated to ST_PartitionCentroid.function.
 */
class ST_PartitionCentroid extends HUDF[(Geometry, Int, Option[Int], Option[Int], Option[CRS], Option[Double]), Long] {
val name: String = "st_partitionCentroid"
def function = ST_PartitionCentroid.function
def function = ST_PartitionCentroid.function
}

object ST_PartitionCentroid {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import geotrellis.store.index.zcurve.Z2
import geotrellis.vector.Geometry

/** Hive UDF from Geometry to Z2Index; the implementation is delegated to ST_Z2LatLon.function. */
class ST_Z2LatLon extends HUDF[Geometry, Z2Index] {
val name: String = "st_z2LatLon"
def function = ST_Z2LatLon.function
def function = ST_Z2LatLon.function
}

object ST_Z2LatLon {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ class STIndexInjectorSpec extends AnyFunSpec with InjectOptimizerTestEnvironment
val dfe = ssc.sql(
"""
|SELECT * FROM polygons_parquet
|WHERE bbox.xmin >= -75.5859375
|AND bbox.ymin >= 40.3251777
|AND bbox.xmax <= -72.4101562
|AND bbox.ymax <= 43.1971673
|WHERE isNotNull(bbox)
|AND (((bbox.xmin >= -75.5859375
|OR bbox.ymin >= 40.3251777)
|OR bbox.xmax <= -72.4101562)
|OR bbox.ymax <= 43.1971673)
|AND ST_Intersects(bbox, ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[-75.5859375,40.32517767999294],[-75.5859375,43.197167282501276],[-72.41015625,43.197167282501276],[-72.41015625,40.32517767999294],[-75.5859375,40.32517767999294]]]}'))
|""".stripMargin
)
Expand Down
Loading

0 comments on commit 5ef8031

Please sign in to comment.