Merge pull request #3 from bemu/develop
allow programmatic addition of partitions, use fail-fast for full loads as default and add custom date formatters
bemu authored Mar 25, 2020
2 parents 9ea1d18 + d1adc17 commit d83f9c4
Showing 43 changed files with 895 additions and 151 deletions.
37 changes: 24 additions & 13 deletions src/main/scala/com/adidas/analytics/algo/AppendLoad.scala
@@ -3,8 +3,7 @@ package com.adidas.analytics.algo
import java.util.regex.Pattern

import com.adidas.analytics.algo.AppendLoad.{logger, _}
import com.adidas.analytics.algo.core.Algorithm
import com.adidas.analytics.algo.core.Algorithm.ComputeTableStatisticsOperation
import com.adidas.analytics.algo.core.{Algorithm, TableStatistics}
import com.adidas.analytics.config.AppendLoadConfiguration
import com.adidas.analytics.util.DFSWrapper._
import com.adidas.analytics.util.DataFormat.{DSVFormat, JSONFormat, ParquetFormat}
@@ -16,11 +15,13 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, _}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.immutable

/**
* Performs append load of new records to an existing table.
*/
final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val configLocation: String)
extends Algorithm with AppendLoadConfiguration with ComputeTableStatisticsOperation{
extends Algorithm with AppendLoadConfiguration with TableStatistics {

override protected def read(): Vector[DataFrame] = {
readInputData(targetSchema, spark, dfs)
@@ -30,11 +31,19 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
dataFrames.map(df => df.transform(addtargetPartitions(columnToRegexPairs, targetSchema)))
}

override protected def write(dataFrames: Vector[DataFrame]): Unit = {
override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] = {
writeHeaders(dataFrames, targetPartitions, headerDir, dfs)
super.write(dataFrames)
if (computeTableStatistics && dataType == STRUCTURED)
}

override protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit = {
if (computeTableStatistics && dataType == STRUCTURED && targetTable.isDefined) {
if (targetPartitions.nonEmpty) {
dataFrames.foreach(df => computeStatisticsForTablePartitions(df, targetTable.get, targetPartitions))
}
computeStatisticsForTable(targetTable)
}

}

private def readInputData(targetSchema: StructType, spark: SparkSession, dfs: DFSWrapper): Vector[DataFrame] = {
@@ -54,7 +63,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
buildHeaderFilePath(columnToRegexPairs, targetSchema, extractPathWithoutServerAndProtocol(inputPath.toString), headerDirPath)
}

def getMapSchemaStructToPath = {
def getMapSchemaStructToPath: immutable.Iterable[Source] = {
val mapSchemaStructToPath = groupedHeaderPathAndSourcePaths.toSeq.map { case (headerPath, sourcePaths) =>
getSchemaFromHeaderOrSource(fs, headerPath, sourcePaths, targetSchemaWithouttargetPartitions)
}.groupBy(_._1).map { case (k, v) => (k, v.flatMap(_._2)) }
@@ -71,7 +80,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
}
}

val schemaAndSourcePath = if(!verifySchema) {
val schemaAndSourcePath = if (!verifySchema) {
groupedHeaderPathAndSourcePaths.flatMap { case (headerPath, sourcePaths) =>
val schema = if (fs.exists(headerPath)) loadHeader(headerPath, fs) else targetSchemaWithouttargetPartitions
sourcePaths.map { sourcePath =>
@@ -84,9 +93,10 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
schemaAndSourcePath.toSeq
}

private def getSchemaFromHeaderOrSource(fs: FileSystem, headerPath: Path, sourcePaths: Seq[Path], targetSchemaWithouttargetPartitions: StructType): (StructType, Seq[Path]) ={
val schema = if (fs.exists(headerPath)){
loadHeader(headerPath, fs) }
private def getSchemaFromHeaderOrSource(fs: FileSystem, headerPath: Path, sourcePaths: Seq[Path], targetSchemaWithouttargetPartitions: StructType): (StructType, Seq[Path]) = {
val schema = if (fs.exists(headerPath)) {
loadHeader(headerPath, fs)
}
else {
inferSchemaFromSource(sourcePaths)
}
@@ -105,10 +115,10 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
}

private def matchingSchemas_?(schemaFromInputData: StructType, targetSchema: StructType, paths: Seq[Path]): Boolean = {
val inputColumnsVector = schemaFromInputData.names.toVector
val targetColumnsVector = targetSchema.names.toVector
val inputColumnsVector = schemaFromInputData.names.toVector
val targetColumnsVector = targetSchema.names.toVector
val diff = inputColumnsVector.diff(targetColumnsVector)
if(diff.nonEmpty)
if (diff.nonEmpty)
logger.error(s"Inferred schema does not match the target schema for ${paths.toString}")
diff.isEmpty
}
@@ -220,4 +230,5 @@ object AppendLoad {
}

protected case class Source(schema: StructType, inputFileLocation: String)

}
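
The AppendLoad hunks above rely on matchingSchemas_? to reject input whose inferred columns are not a subset of the target schema. A minimal standalone sketch of that check, with hypothetical column names and println in place of the project's logger:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaDiffSketch {
  // Mirrors the column-name comparison in matchingSchemas_?: the input schema is
  // acceptable only if it introduces no columns that are absent from the target schema.
  def columnsMatch(inferred: StructType, target: StructType): Boolean = {
    val unexpected = inferred.names.toVector.diff(target.names.toVector)
    if (unexpected.nonEmpty)
      println(s"Inferred schema does not match the target schema, unexpected columns: ${unexpected.mkString(", ")}")
    unexpected.isEmpty
  }

  def main(args: Array[String]): Unit = {
    val target   = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val inferred = StructType(Seq(StructField("id", IntegerType), StructField("nam", StringType))) // misspelled column
    println(columnsMatch(inferred, target)) // false: "nam" is not part of the target schema
  }
}
```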
23 changes: 16 additions & 7 deletions src/main/scala/com/adidas/analytics/algo/FullLoad.scala
@@ -2,8 +2,8 @@ package com.adidas.analytics.algo

import com.adidas.analytics.config.FullLoadConfiguration
import com.adidas.analytics.algo.FullLoad._
import com.adidas.analytics.algo.core.Algorithm
import com.adidas.analytics.algo.core.Algorithm.{ComputeTableStatisticsOperation, WriteOperation}
import com.adidas.analytics.algo.core.{Algorithm, TableStatistics}
import com.adidas.analytics.algo.core.Algorithm.WriteOperation
import com.adidas.analytics.algo.shared.DateComponentDerivation
import com.adidas.analytics.util.DFSWrapper._
import com.adidas.analytics.util.DataFormat.{DSVFormat, ParquetFormat}
@@ -16,7 +16,7 @@ import scala.util.{Failure, Success, Try}


final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val configLocation: String)
extends Algorithm with WriteOperation with FullLoadConfiguration with DateComponentDerivation with ComputeTableStatisticsOperation{
extends Algorithm with WriteOperation with FullLoadConfiguration with DateComponentDerivation with TableStatistics {

val currentHdfsDir: String = HiveTableAttributeReader(targetTable, spark).getTableLocation

@@ -36,7 +36,7 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
withDatePartitions(dataFrames)
}

override protected def write(dataFrames: Vector[DataFrame]): Unit = {
override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] = {
Try{
super.write(dataFrames)
} match {
@@ -46,14 +46,23 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
recoverFailedWrite()
cleanupDirectory(backupDir)
throw new RuntimeException(exception.getMessage)
case Success(_) =>
case Success(outputDaframe) =>
restoreTable()
if (computeTableStatistics && dataType == STRUCTURED)
computeStatisticsForTable(Option(targetTable))
outputDaframe
}

}

override protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit = {
if (computeTableStatistics && dataType == STRUCTURED) {
if(targetPartitions.nonEmpty) {
dataFrames.foreach(df => computeStatisticsForTablePartitions(df,targetTable, targetPartitions))
}
computeStatisticsForTable(Option(targetTable))
}

}

private def createBackupTable(): Unit = {
createDirectory(backupDir)

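
The FullLoad hunk above keeps the fail-fast behaviour around the write stage: the write is wrapped in a Try, a failure restores the backed-up table location and rethrows, and a success re-creates the table before handing the written DataFrames to updateStatistics. A generic sketch of that control flow follows; the helper bodies are placeholder println calls, not the project's implementations:

```scala
import scala.util.{Failure, Success, Try}

object FailFastWriteSketch {
  // Placeholder side effects standing in for the algorithm's backup handling.
  def doWrite(rows: Vector[String]): Vector[String] = { println(s"writing ${rows.size} rows"); rows }
  def restoreTable(): Unit = println("write succeeded: re-pointing the table to the new data")
  def recoverFailedWrite(): Unit = println("write failed: restoring the previous table location from the backup")
  def cleanupBackup(): Unit = println("removing the backup directory")

  // Same shape as FullLoad.write: fail fast by rethrowing after recovery instead of
  // leaving the target table in a half-written state.
  def writeFailFast(rows: Vector[String]): Vector[String] =
    Try(doWrite(rows)) match {
      case Success(written) =>
        restoreTable()
        written
      case Failure(exception) =>
        recoverFailedWrite()
        cleanupBackup()
        throw new RuntimeException(exception.getMessage, exception)
    }

  def main(args: Array[String]): Unit =
    writeFailFast(Vector("a", "b"))
}
```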
46 changes: 39 additions & 7 deletions src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala
@@ -6,10 +6,15 @@ import com.adidas.analytics.util.{DFSWrapper, InputReader, OutputWriter}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}


/**
* Base trait for algorithms that defines their base methods
*/
trait Algorithm extends JobRunner with Serializable with BaseReadOperation with BaseWriteOperation {
trait Algorithm extends JobRunner
with Serializable
with BaseReadOperation
with BaseWriteOperation
with BaseUpdateStatisticsOperation {

protected def spark: SparkSession

@@ -32,7 +37,9 @@ trait Algorithm extends JobRunner with Serializable with BaseReadOperation with
logger.info("Starting processing stage...")
val result = transform(inputDateFrames)
logger.info("Starting writing stage...")
write(result)
val outputResult = write(result)
logger.info("Starting computing statistics...")
updateStatistics(outputResult)
}
}

@@ -54,6 +61,31 @@ object Algorithm {
protected def read(): Vector[DataFrame]
}

/**
* Base trait for update statistics operations
*/
trait BaseUpdateStatisticsOperation {

/**
* Reads the produced output dataframe and update table statistics
*
* @return DataFrame written in writer() step
*/
protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit
}

/**
* The simplest implementation of update statistics
*/
trait UpdateStatisticsOperation extends BaseUpdateStatisticsOperation {
/**
* By default the Update Statistics are disabled for a given Algorithm
* @param dataFrames Dataframes to compute statistics
*/
override protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit = logger.info("Skipping update statistics step!")

}

/**
* Base trait for write operations
*/
@@ -71,7 +103,7 @@ object Algorithm {
*
* @param dataFrames DataFrame to write
*/
protected def write(dataFrames: Vector[DataFrame]): Unit
protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame]
}

/**
@@ -107,8 +139,8 @@
*/
protected def writer: AtomicWriter

override protected def write(dataFrames: Vector[DataFrame]): Unit = {
dataFrames.foreach { df =>
override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] = {
dataFrames.map { df =>
writer.writeWithBackup(dfs, outputFilesNum.map(df.repartition).getOrElse(df))
}
}
@@ -129,8 +161,8 @@
*/
protected def writer: OutputWriter

override protected def write(dataFrames: Vector[DataFrame]): Unit = {
dataFrames.foreach { df =>
override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] = {
dataFrames.map { df =>
writer.write(dfs, outputFilesNum.map(df.repartition).getOrElse(df))
}
}
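
Algorithm.run now threads the written DataFrames into a dedicated statistics stage, and both write variants return the frames they persist instead of Unit. The following is a condensed, self-contained sketch of that shape, not the project's Algorithm trait; the local SparkSession and the stand-in method bodies are assumptions for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Condensed sketch of the reshaped pipeline: write returns the written DataFrames
// so that a statistics step can run on exactly what was persisted.
trait PipelineSketch {
  protected def read(): Vector[DataFrame]
  protected def transform(dataFrames: Vector[DataFrame]): Vector[DataFrame] = dataFrames
  protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame]
  // Default is a no-op, mirroring UpdateStatisticsOperation; algorithms that mix in
  // TableStatistics override this to issue ANALYZE TABLE statements.
  protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit =
    println("Skipping update statistics step!")

  def run(): Unit = {
    val written = write(transform(read()))
    updateStatistics(written)
  }
}

object PipelineSketchApp extends PipelineSketch {
  private val spark = SparkSession.builder().master("local[*]").appName("pipeline-sketch").getOrCreate()

  override protected def read(): Vector[DataFrame] = Vector(spark.range(10).toDF("id"))
  override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] =
    dataFrames.map { df => df.count(); df } // pretend write, keep the frame for the stats step

  def main(args: Array[String]): Unit = run()
}
```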
5 changes: 4 additions & 1 deletion src/main/scala/com/adidas/analytics/algo/core/Metadata.scala
@@ -2,7 +2,10 @@ package com.adidas.analytics.algo.core

import org.apache.spark.sql.DataFrame


/**
* This is a generic trait for all strategies that will
* add new partitions on metadata table
*/
trait Metadata {

protected val tableName: String
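
Only the new doc comment and the tableName member of Metadata are visible in this hunk, so the sketch below is purely hypothetical: it shows one way the "programmatic addition of partitions" mentioned in the commit message can be expressed with Spark SQL against a Hive table. The object, method name, and signature are invented for illustration and are not the trait's API:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical illustration: add a partition to a Hive metadata table via HiveQL.
object AddPartitionSketch {
  def addPartition(spark: SparkSession, tableName: String, spec: Map[String, Any]): Unit = {
    // Quote string partition values, leave numeric ones bare.
    val clause = spec.map {
      case (column, value: String) => s"$column='$value'"
      case (column, value)         => s"$column=$value"
    }.mkString(", ")
    spark.sql(s"ALTER TABLE $tableName ADD IF NOT EXISTS PARTITION ($clause)")
  }
  // e.g. addPartition(spark, "lake.shipments", Map("year" -> 2020, "month" -> 3))
}
```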
28 changes: 28 additions & 0 deletions src/main/scala/com/adidas/analytics/algo/core/PartitionHelpers.scala
@@ -0,0 +1,28 @@
package com.adidas.analytics.algo.core

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}

/**
* This is a trait with generic logic to interact with dataframes on partition level
*/
trait PartitionHelpers {

protected def getDistinctPartitions(outputDataFrame: DataFrame, targetPartitions: Seq[String]): Dataset[Row] = {
val targetPartitionsColumns: Seq[Column] = targetPartitions.map(partitionString => col(partitionString))

outputDataFrame.select(targetPartitionsColumns: _*).distinct
}

protected def getParameterValue(row: Row, partitionString: String): String =
createParameterValue(row.get(row.fieldIndex(partitionString)))

protected def createParameterValue(partitionRawValue: Any): String =
partitionRawValue match {
case value: java.lang.Short => value.toString
case value: java.lang.Integer => value.toString
case value: scala.Predef.String => "'" + value + "'"
case null => throw new Exception("Partition Value is null. No support for null partitions!")
case value => throw new Exception("Unsupported partition DataType: " + value.getClass)
}
}
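
A standalone usage sketch of these helpers follows, with a local SparkSession and made-up column names. It shows how distinct partition tuples are pulled from the written DataFrame and how string values are quoted while integral ones are not; the matching logic mirrors getParameterValue/createParameterValue rather than importing the trait:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PartitionHelpersSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("partition-helpers-sketch").getOrCreate()
    import spark.implicits._

    val written = Seq(
      (2020, 3, "DE", 10.0),
      (2020, 3, "FR", 11.5),
      (2020, 4, "DE", 12.0)
    ).toDF("year", "month", "country", "amount")

    val targetPartitions = Seq("year", "month", "country")
    // As in getDistinctPartitions: select only the partition columns and deduplicate.
    val distinctPartitions = written.select(targetPartitions.map(col): _*).distinct()

    distinctPartitions.collect().foreach { row =>
      val spec = targetPartitions.map { p =>
        row.get(row.fieldIndex(p)) match {            // as in getParameterValue/createParameterValue
          case v: java.lang.Integer => s"$p=$v"       // numeric partition values stay unquoted
          case v: String            => s"$p='$v'"     // string partition values are quoted
          case other                => sys.error(s"Unsupported partition value: $other")
        }
      }
      println(spec.mkString(", ")) // e.g. year=2020, month=3, country='DE'
    }
    spark.stop()
  }
}
```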
48 changes: 48 additions & 0 deletions src/main/scala/com/adidas/analytics/algo/core/TableStatistics.scala
@@ -0,0 +1,48 @@
package com.adidas.analytics.algo.core

import org.apache.spark.sql._
import scala.collection.JavaConversions._

/**
* This is a generic trait to use in the algorithms where we want
* to compute statistics on table and partition level
*/
trait TableStatistics extends PartitionHelpers {

protected def spark: SparkSession

/**
* will add statistics on partition level using HiveQL statements
*/
protected def computeStatisticsForTablePartitions(df: DataFrame,
targetTable: String,
targetPartitions: Seq[String]): Unit = {

val distinctPartitions: DataFrame = getDistinctPartitions(df, targetPartitions)

generateComputePartitionStatements(distinctPartitions, targetTable, targetPartitions)
.collectAsList()
.foreach((statement: String) => spark.sql(statement))

}

/**
* will add statistics on table level using HiveQL statements
*/
protected def computeStatisticsForTable(tableName: Option[String]): Unit = tableName match {
case Some(table) => spark.sql(s"ANALYZE TABLE ${table} COMPUTE STATISTICS")
case None => Unit
}

private def generateComputePartitionStatements(df: DataFrame,
targetTable: String,
targetPartitions: Seq[String]): Dataset[String] = {
df.map(partitionValue => {
val partitionStatementValues: Seq[String] = targetPartitions
.map(partitionColumn => s"${partitionColumn}=${getParameterValue(partitionValue, partitionColumn)}")

s"ANALYZE TABLE ${targetTable} PARTITION(${partitionStatementValues.mkString(",")}) COMPUTE STATISTICS"
})(Encoders.STRING)
}

}
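
The statements this trait hands to spark.sql have the shape below. A plain-Scala sketch that only builds and prints them, for a hypothetical lake.sales table partitioned by year/month/country (the table name and values are examples, and string values arrive already quoted by the partition helpers):

```scala
object AnalyzeStatementsSketch {
  // One statement per distinct partition, matching the format built in generateComputePartitionStatements.
  def partitionStatement(table: String, spec: Seq[(String, String)]): String =
    s"ANALYZE TABLE $table PARTITION(${spec.map { case (c, v) => s"$c=$v" }.mkString(",")}) COMPUTE STATISTICS"

  // Plus one table-level statement, matching computeStatisticsForTable.
  def tableStatement(table: String): String = s"ANALYZE TABLE $table COMPUTE STATISTICS"

  def main(args: Array[String]): Unit = {
    val table = "lake.sales" // hypothetical target table
    val partitions = Seq(
      Seq("year" -> "2020", "month" -> "3", "country" -> "'DE'"),
      Seq("year" -> "2020", "month" -> "4", "country" -> "'FR'")
    )
    partitions.foreach(spec => println(partitionStatement(table, spec)))
    println(tableStatement(table))
    // ANALYZE TABLE lake.sales PARTITION(year=2020,month=3,country='DE') COMPUTE STATISTICS
    // ANALYZE TABLE lake.sales PARTITION(year=2020,month=4,country='FR') COMPUTE STATISTICS
    // ANALYZE TABLE lake.sales COMPUTE STATISTICS
  }
}
```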
src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala
@@ -1,6 +1,6 @@
package com.adidas.analytics.config

import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation}
import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation, UpdateStatisticsOperation}
import com.adidas.analytics.config.AlgorithmTemplateConfiguration.ruleToLocalDate
import com.adidas.analytics.config.shared.{ConfigurationContext, MetadataUpdateStrategy}
import com.adidas.analytics.util.DataFormat.ParquetFormat
@@ -14,6 +14,7 @@ import org.joda.time.{Days, LocalDate}
trait AlgorithmTemplateConfiguration extends ConfigurationContext
with ReadOperation
with SafeWriteOperation
with UpdateStatisticsOperation
with MetadataUpdateStrategy {

protected def spark: SparkSession
src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala
@@ -1,14 +1,16 @@
package com.adidas.analytics.config

import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation}
import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation, UpdateStatisticsOperation}
import com.adidas.analytics.config.shared.{ConfigurationContext, DateComponentDerivationConfiguration, MetadataUpdateStrategy}
import com.adidas.analytics.util.DataFormat.ParquetFormat
import com.adidas.analytics.util.{DataFormat, InputReader, LoadMode, OutputWriter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}


trait DeltaLoadConfiguration extends ConfigurationContext with MetadataUpdateStrategy {
trait DeltaLoadConfiguration extends ConfigurationContext
with UpdateStatisticsOperation
with MetadataUpdateStrategy {

protected val activeRecordsTable: String = configReader.getAs[String]("active_records_table_lake")
protected val deltaRecordsTable: Option[String] = configReader.getAsOption[String]("delta_records_table_lake")
@@ -21,7 +23,12 @@ trait DeltaLoadConfiguration extends ConfigurationContext with MetadataUpdateStr
protected val recordModeColumnName: String = "recordmode"
protected val upsertRecordModes: Seq[String] = Seq("", "N")
protected val upsertRecordsModesFilterFunction: Row => Boolean = { row: Row =>
val recordmode = row.getAs[String](recordModeColumnName)
var recordmode = ""
try {
recordmode = row.getAs[String](recordModeColumnName)
} catch {
case _ => recordmode = row.getAs[String](recordModeColumnName.toUpperCase)
}
recordmode == null || recordmode == "" || recordmode == "N"
}
}
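
The DeltaLoadConfiguration hunk above adds an uppercase fallback for the record-mode column, because Row.getAs resolves field names case-sensitively and some sources deliver RECORDMODE instead of recordmode. A small sketch of the same lookup and upsert filter, using scala.util.Try instead of the mutable try/catch in the diff; the column names and sample data are made up:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import scala.util.Try

object RecordModeFilterSketch {
  private val recordModeColumnName = "recordmode"

  // Records with a null, empty, or "N" record mode are treated as upserts.
  def isUpsert(row: Row): Boolean = {
    val recordmode = Try(row.getAs[String](recordModeColumnName))
      .getOrElse(row.getAs[String](recordModeColumnName.toUpperCase)) // fall back to RECORDMODE
    recordmode == null || recordmode == "" || recordmode == "N"
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("recordmode-sketch").getOrCreate()
    import spark.implicits._

    val deltaRecords = Seq(("1", "N"), ("2", ""), ("3", "D")).toDF("id", "RECORDMODE")
    deltaRecords.collect().foreach(row => println(s"id=${row.getAs[String]("id")} upsert=${isUpsert(row)}"))
    // id=1 upsert=true, id=2 upsert=true, id=3 upsert=false
    spark.stop()
  }
}
```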