Merge pull request #1 from adidas/semistructured_data_load
Add support for semistructured data loads
bemu authored Oct 8, 2019
2 parents c202767 + c146550 commit a9664a7
Showing 70 changed files with 2,155 additions and 55 deletions.
70 changes: 62 additions & 8 deletions src/main/scala/com/adidas/analytics/algo/AppendLoad.scala
@@ -7,7 +7,7 @@ import com.adidas.analytics.algo.core.Algorithm
import com.adidas.analytics.algo.core.Algorithm.ComputeTableStatisticsOperation
import com.adidas.analytics.config.AppendLoadConfiguration
import com.adidas.analytics.util.DFSWrapper._
import com.adidas.analytics.util.DataFormat.{DSVFormat, ParquetFormat}
import com.adidas.analytics.util.DataFormat.{DSVFormat, JSONFormat, ParquetFormat}
import com.adidas.analytics.util.DataFrameUtils._
import com.adidas.analytics.util._
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -33,7 +33,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
override protected def write(dataFrames: Vector[DataFrame]): Unit = {
writeHeaders(dataFrames, partitionColumns, headerDir, dfs)
super.write(dataFrames)
if (computeTableStatistics)
if (computeTableStatistics && dataType == STRUCTURED)
computeStatisticsForTable(targetTable)
}

@@ -50,14 +50,67 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
val targetSchemaWithoutPartitionColumns = getSchemaWithoutPartitionColumns(targetSchema, partitionColumns.toSet)

logger.info(s"Looking for input files in $inputDirPath")
fs.ls(inputDirPath, recursive = true).groupBy { inputPath =>
val groupedHeaderPathAndSourcePaths = fs.ls(inputDirPath, recursive = true).groupBy { inputPath =>
buildHeaderFilePath(columnToRegexPairs, targetSchema, extractPathWithoutServerAndProtocol(inputPath.toString), headerDirPath)
}.flatMap { case (headerPath, sourcePaths) =>
val schema = if (fs.exists(headerPath)) loadHeader(headerPath, fs) else targetSchemaWithoutPartitionColumns
sourcePaths.map { sourcePath =>
Source(schema, sourcePath.toString)
}

def getMapSchemaStructToPath = {
val mapSchemaStructToPath = groupedHeaderPathAndSourcePaths.toSeq.map { case (headerPath, sourcePaths) =>
getSchemaFromHeaderOrSource(fs, headerPath, sourcePaths, targetSchemaWithoutPartitionColumns)
}.groupBy(_._1).map { case (k, v) => (k, v.flatMap(_._2)) }

val filteredMapSchemaStructToPath = mapSchemaStructToPath.filter(schemaFromInputData => matchingSchemas_?(schemaFromInputData._1, targetSchema, schemaFromInputData._2))

if (mapSchemaStructToPath.size != filteredMapSchemaStructToPath.size)
throw new RuntimeException("Schema does not match the input data for some of the input folders.")

mapSchemaStructToPath.flatMap { case (schema, sourcePaths) =>
sourcePaths.map { sourcePath =>
Source(targetSchema, sourcePath.toString)
}
}
}

val schemaAndSourcePath = if(!verifySchema) {
groupedHeaderPathAndSourcePaths.flatMap { case (headerPath, sourcePaths) =>
val schema = if (fs.exists(headerPath)) loadHeader(headerPath, fs) else targetSchemaWithoutPartitionColumns
sourcePaths.map { sourcePath =>
Source(schema, sourcePath.toString)
}
}
}.toSeq
} else {
getMapSchemaStructToPath
}
schemaAndSourcePath.toSeq
}

private def getSchemaFromHeaderOrSource(fs: FileSystem, headerPath: Path, sourcePaths: Seq[Path], targetSchemaWithoutPartitionColumns: StructType): (StructType, Seq[Path]) = {
val schema = if (fs.exists(headerPath)) {
loadHeader(headerPath, fs)
} else {
inferSchemaFromSource(sourcePaths)
}
(schema, sourcePaths)
}

private def inferSchemaFromSource(sourcePaths: Seq[Path]): StructType = {
val reader = spark.read.options(sparkReaderOptions)
val dataFormat = fileFormat match {
case "dsv" => DSVFormat()
case "parquet" => ParquetFormat()
case "json" => JSONFormat()
case anotherFormat => throw new RuntimeException(s"Unknown file format: $anotherFormat")
}
dataFormat.read(reader, sourcePaths.map(_.toString): _*).schema
}

private def matchingSchemas_?(schemaFromInputData: StructType, targetSchema: StructType, paths: Seq[Path]): Boolean = {
val inputColumnsVector = schemaFromInputData.names.toVector
val targetColumnsVector = targetSchema.names.toVector
val diff = inputColumnsVector.diff(targetColumnsVector)
if(diff.nonEmpty)
logger.error(s"Inferred schema does not match the target schema for ${paths.toString}")
diff.isEmpty
}

private def readSources(sources: Seq[Source], fs: FileSystem, spark: SparkSession): Vector[DataFrame] = {
@@ -70,6 +123,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
fileFormat match {
case "dsv" => DSVFormat(Some(schema)).read(reader, inputPaths: _*)
case "parquet" => ParquetFormat(Some(schema)).read(reader, inputPaths: _*)
case "json" => JSONFormat(Some(schema)).read(reader, inputPaths: _*)
case anotherFormat => throw new RuntimeException(s"Unknown file format: $anotherFormat")
}
}
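Note: for semistructured data, the flow added above groups the input files by their header path, loads a schema from the header or infers one from the files themselves, and, when verifySchema is enabled, aborts the load for any group whose columns do not all appear in the target schema. A minimal standalone sketch of that check — not part of this commit, with illustrative paths and column names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaVerificationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("schema-verification-sketch").master("local[*]").getOrCreate()

    // Illustrative target schema; in AppendLoad it comes from the Hive table or the configured JSON schema.
    val targetSchema = StructType(Seq(
      StructField("salesorder", IntegerType),
      StructField("customer", StringType),
      StructField("amount", IntegerType)
    ))

    // verify_schema path: infer the schema from the input files themselves (example location).
    val inferredSchema = spark.read.json("/tmp/landing/sales/*.json").schema

    // Same idea as matchingSchemas_?: every inferred column must exist in the target schema.
    val unexpectedColumns = inferredSchema.names.toVector.diff(targetSchema.names.toVector)
    if (unexpectedColumns.nonEmpty)
      throw new RuntimeException(s"Schema does not match the input data, unexpected columns: ${unexpectedColumns.mkString(", ")}")

    spark.stop()
  }
}
```

Because the inference pass reads the input, enabling schema verification means the data is read more than once, which is the caveat documented in AppendLoadConfiguration further down.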
4 changes: 2 additions & 2 deletions src/main/scala/com/adidas/analytics/algo/FullLoad.scala
@@ -35,8 +35,8 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
override protected def write(dataFrames: Vector[DataFrame]): Unit = {
super.write(dataFrames)
restoreTable()
if (computeTableStatistics)
computeStatisticsForTable(targetTable)
if (computeTableStatistics && dataType == STRUCTURED)
computeStatisticsForTable(Some(targetTable))
}

private def createBackupTable(): Unit = {
@@ -31,11 +31,11 @@ object PartitionMaterialization {
}

def newRangeMaterialization(spark: SparkSession, dfs: DFSWrapper, configLocation: String): PartitionMaterialization = {
new RangeMaterialization(spark, dfs, LoadMode.OverwritePartitions, configLocation)
new RangeMaterialization(spark, dfs, LoadMode.OverwritePartitionsWithAddedColumns, configLocation)
}

def newQueryMaterialization(spark: SparkSession, dfs: DFSWrapper, configLocation: String): PartitionMaterialization = {
new QueryMaterialization(spark, dfs, LoadMode.OverwritePartitions, configLocation)
new QueryMaterialization(spark, dfs, LoadMode.OverwritePartitionsWithAddedColumns, configLocation)
}

private class FullMaterialization(val spark: SparkSession, val dfs: DFSWrapper, val loadMode: LoadMode, val configLocation: String)
@@ -141,6 +141,9 @@ object Algorithm {

protected def spark: SparkSession

protected def computeStatisticsForTable(tableName: String): Unit = spark.sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
protected def computeStatisticsForTable(tableName: Option[String]): Unit = {
if (tableName.isDefined)
spark.sql(s"ANALYZE TABLE ${tableName.get} COMPUTE STATISTICS")
}
}
}
@@ -64,7 +64,7 @@ trait AlgorithmTemplateConfiguration extends ConfigurationContext with ReadOpera
table = targetTable,
format = ParquetFormat (Some (targetSchema) ),
partitionColumns = Seq ("", "", ""), //If partitions are required, this would look like, e.g., Seq("year", "month")
loadMode = LoadMode.OverwritePartitions
loadMode = LoadMode.OverwritePartitionsWithAddedColumns
)
}
}
@@ -5,7 +5,9 @@ import com.adidas.analytics.config.shared.{ConfigurationContext, LoadConfigurati
import com.adidas.analytics.util.DataFormat.ParquetFormat
import com.adidas.analytics.util.{LoadMode, OutputWriter}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}

import scala.util.parsing.json.JSONObject


trait AppendLoadConfiguration extends ConfigurationContext with LoadConfiguration with SafeWriteOperation {
@@ -16,14 +18,53 @@ trait AppendLoadConfiguration extends ConfigurationContext with LoadConfiguratio

protected val headerDir: String = configReader.getAs[String]("header_dir")

protected val targetTable: Option[String] = configReader.getAsOption[String]("target_table")

// This option specifies whether the input data schema must match the target schema defined in the configuration file
// Note: if it is set to true, the input data will be read more than once
private val verifySchemaOption: Option[Boolean] = configReader.getAsOption[Boolean]("verify_schema")

protected val verifySchema: Boolean = dataType match {
case SEMISTRUCTURED => verifySchemaOption.getOrElse(true)
case _ => false
}

protected val columnToRegexPairs: Seq[(String, String)] = partitionColumns zip regexFilename

protected val targetSchema: StructType = spark.table(targetTable).schema
private val jsonSchemaOption: Option[JSONObject] = configReader.getAsOption[JSONObject]("schema")

protected val targetSchema: StructType = getTargetSchema

private val targetDir: Option[String] = configReader.getAsOption[String]("target_dir")

override protected val writer: OutputWriter.AtomicWriter = dataType match {
case STRUCTURED if targetTable.isDefined => OutputWriter.newTableLocationWriter(
table = targetTable.get,
format = ParquetFormat(Some(targetSchema)),
partitionColumns = partitionColumns,
loadMode = LoadMode.OverwritePartitionsWithAddedColumns
)
case SEMISTRUCTURED if targetDir.isDefined => OutputWriter.newFileSystemWriter(
location = targetDir.get,
format = ParquetFormat(Some(targetSchema)),
partitionColumns = partitionColumns,
loadMode = LoadMode.OverwritePartitions
)
case anotherDataType => throw new RuntimeException(s"Unsupported data type: $anotherDataType in AppendLoad or the configuration file is malformed.")
}

private def getTargetSchemaFromHiveTable: StructType = {
targetTable match {
case Some(tableName) => spark.table(tableName).schema
case None => throw new RuntimeException("No schema definition found.")
}
}

override protected val writer: OutputWriter.AtomicWriter = OutputWriter.newTableLocationWriter(
table = targetTable,
format = ParquetFormat(Some(targetSchema)),
partitionColumns = partitionColumns,
loadMode = LoadMode.OverwritePartitions
)
private def getTargetSchema: StructType = {
dataType match {
case STRUCTURED => getTargetSchemaFromHiveTable
case SEMISTRUCTURED if jsonSchemaOption.isDefined => DataType.fromJson(jsonSchemaOption.get.toString()).asInstanceOf[StructType]
case anotherDataType => throw new RuntimeException(s"Unsupported data type: $anotherDataType in AppendLoad or the configuration file is malformed.")
}
}
}
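Note: for semistructured data the target schema cannot be read from a Hive table, so getTargetSchema above parses it from a JSON schema definition in the configuration via DataType.fromJson. A small sketch of that call with illustrative field names (the exact layout of a real job configuration may differ):

```scala
import org.apache.spark.sql.types.{DataType, StructType}

object JsonSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Spark's JSON representation of a schema, as it could appear under the "schema" key.
    val schemaJson =
      """{"type":"struct","fields":[
        |{"name":"salesorder","type":"integer","nullable":true,"metadata":{}},
        |{"name":"customer","type":"string","nullable":true,"metadata":{}},
        |{"name":"amount","type":"integer","nullable":true,"metadata":{}}]}""".stripMargin

    val targetSchema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
    targetSchema.printTreeString() // prints the parsed struct fields
  }
}
```

The STRUCTURED branch keeps the previous behaviour and still resolves the schema from the Hive table via spark.table(tableName).schema.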
@@ -49,7 +49,7 @@ object DeltaLoadConfiguration {
table = activeRecordsTable,
format = ParquetFormat(Some(targetSchema)),
partitionColumns = partitionColumns,
loadMode = LoadMode.OverwritePartitions
loadMode = LoadMode.OverwritePartitionsWithAddedColumns
)
}

@@ -60,7 +60,7 @@ trait FixedSizeStringExtractorConfiguration extends ConfigurationContext with Re
table = targetTable,
format = ParquetFormat(Some(targetSchema)),
partitionColumns = partitionColumnsOrdered,
loadMode = if (partitionColumnsOrdered.nonEmpty) LoadMode.OverwritePartitions else LoadMode.OverwriteTable
loadMode = if (partitionColumnsOrdered.nonEmpty) LoadMode.OverwritePartitionsWithAddedColumns else LoadMode.OverwriteTable
)
}

@@ -16,14 +16,20 @@ trait FullLoadConfiguration extends ConfigurationContext with LoadConfiguration

protected val backupDir: String = configReader.getAs[String]("backup_dir")

protected val targetTable: String = configReader.getAs[String]("target_table")

protected val targetSchema: StructType = spark.table(targetTable).schema

protected val writer: OutputWriter.AtomicWriter = OutputWriter.newFileSystemWriter(
location = currentDir,
format = ParquetFormat(Some(targetSchema)),
partitionColumns = partitionColumns,
loadMode = LoadMode.OverwriteTable
)
protected val writer: OutputWriter.AtomicWriter = dataType match {
case STRUCTURED => OutputWriter.newFileSystemWriter(
location = currentDir,
format = ParquetFormat(Some(targetSchema)),
partitionColumns = partitionColumns,
loadMode = LoadMode.OverwriteTable
)
case anotherDataType => throw new RuntimeException(s"Unsupported data type: $anotherDataType for FullLoad.")
}


override protected val partitionSourceColumn: String = configReader.getAs[String]("partition_column")
override protected val partitionSourceColumnFormat: String = configReader.getAs[String]("partition_column_format")
@@ -4,9 +4,11 @@ import com.adidas.analytics.util.ConfigReader
import org.apache.spark.sql.catalyst.util.{DropMalformedMode}

trait LoadConfiguration {
val STRUCTURED = "structured"
val SEMISTRUCTURED = "semistructured"

private val fileDelimiter: String = configReader.getAs[String]("delimiter")
private val hasHeader: Boolean = configReader.getAs[Boolean]("has_header")
private val fileDelimiter: Option[String] = configReader.getAsOption[String]("delimiter")
private val hasHeader: Option[Boolean] = configReader.getAsOption[Boolean]("has_header")

private val optionalSparkOptions: Map[String, String] = Map[String, Option[String]](
"nullValue" -> readNullValue,
@@ -15,16 +17,19 @@ trait LoadConfiguration {
case (key, Some(value)) => (key, value)
}

private val requiredSparkOptions: Map[String, String] = Map[String, String](
private val requiredSparkOptions: Map[String, String] = Map[String, Option[Any]](
"delimiter" -> fileDelimiter,
"header" -> hasHeader.toString,
"mode" -> loadMode
)
"header" -> hasHeader,
"mode" -> Some(loadMode)
).collect {
case (key, Some(value)) => (key, value.toString)
}


protected val partitionColumns: Seq[String] = configReader.getAsSeq[String]("partition_columns")
protected val inputDir: String = configReader.getAs[String]("source_dir")
protected val targetTable: String = configReader.getAs[String]("target_table")
protected val fileFormat: String = configReader.getAs[String]("file_format")
protected val dataType: String = configReader.getAsOption[String]("data_type").getOrElse(STRUCTURED)

protected val sparkReaderOptions: Map[String, String] = requiredSparkOptions ++ optionalSparkOptions

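Note: delimiter and has_header are optional now, so DSV-specific settings do not have to be supplied for JSON input; only options that are actually defined end up in sparkReaderOptions. A tiny standalone sketch of that collect-over-Option pattern, with example values only:

```scala
object ReaderOptionsSketch {
  def main(args: Array[String]): Unit = {
    // Example values: a JSON load typically defines neither a delimiter nor a header flag.
    val fileDelimiter: Option[String] = None
    val hasHeader: Option[Boolean] = None
    val loadMode: String = "DROPMALFORMED"

    // Keep only the entries whose value is defined, mirroring requiredSparkOptions above.
    val readerOptions: Map[String, String] = Map[String, Option[Any]](
      "delimiter" -> fileDelimiter,
      "header" -> hasHeader,
      "mode" -> Some(loadMode)
    ).collect { case (key, Some(value)) => (key, value.toString) }

    println(readerOptions) // Map(mode -> DROPMALFORMED)
  }
}
```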
14 changes: 14 additions & 0 deletions src/main/scala/com/adidas/analytics/util/DataFormat.scala
@@ -44,5 +44,19 @@ object DataFormat {
writer.csv(location)
}
}

case class JSONFormat(optionalSchema: Option[StructType] = None) extends DataFormat {

override def read(reader: DataFrameReader, locations: String*): DataFrame = {
val filesString = locations.mkString(", ")
logger.info(s"Reading JSON data from $filesString")
optionalSchema.fold(reader.option("inferSchema", "true"))(schema => reader.schema(schema)).json(locations: _*)
}

override def write(writer: DataFrameWriter[Row], location: String): Unit = {
logger.info(s"Writing JSON data to $location")
writer.json(location)
}
}
}
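Note: a usage sketch for the new JSONFormat — not part of this commit, with an illustrative path and schema. Passing a schema makes the reader apply it directly, while omitting it enables Spark's schema inference:

```scala
import com.adidas.analytics.util.DataFormat.JSONFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object JsonFormatUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("json-format-usage").master("local[*]").getOrCreate()

    val schema = StructType(Seq(
      StructField("salesorder", IntegerType),
      StructField("customer", StringType),
      StructField("amount", IntegerType)
    ))

    // With an explicit schema the reader applies it as-is; without one, "inferSchema" is set to true.
    val withSchema = JSONFormat(Some(schema)).read(spark.read, "/tmp/landing/sales/data.json")
    val inferred = JSONFormat().read(spark.read, "/tmp/landing/sales/data.json")

    withSchema.printSchema()
    inferred.printSchema()
    spark.stop()
  }
}
```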

4 changes: 4 additions & 0 deletions src/main/scala/com/adidas/analytics/util/LoadMode.scala
@@ -16,6 +16,10 @@ object LoadMode {
override val sparkMode: SaveMode = SaveMode.Overwrite
}

case object OverwritePartitionsWithAddedColumns extends LoadMode {
override val sparkMode: SaveMode = SaveMode.Overwrite
}

case object AppendJoinPartitions extends LoadMode {
override def sparkMode: SaveMode = SaveMode.Append
}
10 changes: 6 additions & 4 deletions src/main/scala/com/adidas/analytics/util/OutputWriter.scala
@@ -40,7 +40,7 @@ object OutputWriter {
* @return TableWriter
*/
def newTableWriter(table: String, partitionColumns: Seq[String] = Seq.empty, options: Map[String, String] = Map.empty,
loadMode: LoadMode = LoadMode.OverwritePartitions): TableWriter = {
loadMode: LoadMode = LoadMode.OverwritePartitionsWithAddedColumns): TableWriter = {
TableWriter(table, partitionColumns, options, loadMode)
}

@@ -55,7 +55,7 @@
* @return TableLocationWriter
*/
def newTableLocationWriter(table: String, format: DataFormat, partitionColumns: Seq[String] = Seq.empty,
options: Map[String, String] = Map.empty, loadMode: LoadMode = LoadMode.OverwritePartitions): TableLocationWriter = {
options: Map[String, String] = Map.empty, loadMode: LoadMode = LoadMode.OverwritePartitionsWithAddedColumns): TableLocationWriter = {
TableLocationWriter(table, format, partitionColumns, options, loadMode)
}

@@ -70,7 +70,7 @@
* @return FileSystemWriter
*/
def newFileSystemWriter(location: String, format: DataFormat, partitionColumns: Seq[String] = Seq.empty,
options: Map[String, String] = Map.empty, loadMode: LoadMode = LoadMode.OverwritePartitions): FileSystemWriter = {
options: Map[String, String] = Map.empty, loadMode: LoadMode = LoadMode.OverwritePartitionsWithAddedColumns): FileSystemWriter = {
FileSystemWriter(location, format, partitionColumns, options, loadMode)
}

@@ -114,6 +114,8 @@ object OutputWriter {
case LoadMode.OverwriteTable =>
loadTable(fs, df, finalPath, tempDataPath, tempBackupPath)
case LoadMode.OverwritePartitions =>
loadPartitions(fs, df, finalPath, tempDataPath, tempBackupPath, partitionsCriteria)
case LoadMode.OverwritePartitionsWithAddedColumns =>
val existingDf = format.read(df.sparkSession.read, finalLocation)
val outputDf = df.addMissingColumns(existingDf.schema)
loadPartitions(fs, outputDf, finalPath, tempDataPath, tempBackupPath, partitionsCriteria)
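Note: OverwritePartitionsWithAddedColumns differs from plain OverwritePartitions in that it first reads the data already at the final location and calls addMissingColumns on the incoming DataFrame. addMissingColumns lives in DataFrameUtils and is not shown in this diff; assuming it pads the incoming batch with any columns that exist only in the existing schema, a sketch of that idea (not the actual implementation) is:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

object AddMissingColumnsSketch {
  // Append every column the existing data has but the incoming batch lacks, as a typed null column.
  def addMissingColumns(df: DataFrame, existingSchema: StructType): DataFrame = {
    val missingFields = existingSchema.filterNot(field => df.columns.contains(field.name))
    missingFields.foldLeft(df) { (acc, field) =>
      acc.withColumn(field.name, lit(null).cast(field.dataType))
    }
  }
}
```

That keeps overwritten partitions schema-compatible with the data already present in the target table or directory.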
@@ -162,7 +164,7 @@ object OutputWriter {
private def loadPartitions(fs: FileSystem, df: DataFrame, finalPath: Path, dataPath: Path, backupPath: Path,
partitionsCriteria: Seq[Seq[(String, String)]]): Unit = {
if (partitionsCriteria.nonEmpty) {
write(fs, df, dataPath, LoadMode.OverwritePartitions)
write(fs, df, dataPath, LoadMode.OverwritePartitionsWithAddedColumns)

logger.info(s"Creating backup in $backupPath")
val backupSpecs = HadoopLoadHelper.createMoveSpecs(fs, finalPath, backupPath, partitionsCriteria)
@@ -0,0 +1,6 @@
{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2}
{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1}
{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1}
{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3}
{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1}
{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6}