Merge pull request #2 from bemu/develop
Add nested flattener algorithm, custom partition recovery strategy, custom date formatters and spark reader modes
bemu authored Dec 5, 2019
2 parents a9664a7 + b84bd93 commit 9ea1d18
Showing 139 changed files with 2,834 additions and 239 deletions.
6 changes: 0 additions & 6 deletions Jenkinsfile
@@ -1,6 +1,4 @@
pipeline {
agent { label 'm3d' }

options {
ansiColor('xterm')
disableConcurrentBuilds()
@@ -10,10 +8,6 @@ pipeline {
buildDiscarder(logRotator(daysToKeepStr: '32', numToKeepStr: '16'))
}

environment {
GIT_CREDENTIALS = "9654c627-4650-4079-be03-2d0336fe724f"
}

stages {
stage('cleanup workspace') {
steps {
4 changes: 2 additions & 2 deletions README.md
@@ -86,7 +86,7 @@ The parameter file for the full load algorithm for example has the following content:
"has_header": false,
"partition_column": "date_column_name",
"partition_column_format": "yyyyMMdd",
"partition_columns": [
"target_partitions": [
"year",
"month"
],
@@ -103,7 +103,7 @@ The parameter file for the full load algorithm for example has the following content:
* `has_header` flag defining whether the input files have a header
* `partition_column` column that contains the partitioning information
* `partition_column_format` format of the partitioning column in the case of time/date columns
* `partition_columns` partitioning columns
* `target_partitions` partitioning columns in the target
* `target_table` target table where the data will be available for querying after loading

### License and Software Information
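Illustrative note (not part of the commit): a hypothetical Scala model of the parameter file documented above, showing the renamed target_partitions key alongside the other fields; the class, object and table names are placeholders rather than the repository's actual configuration code.

// Hypothetical sketch only; not the repository's configuration classes.
case class FullLoadParameters(
  hasHeader: Boolean,               // has_header
  partitionColumn: String,          // partition_column
  partitionColumnFormat: String,    // partition_column_format
  targetPartitions: Seq[String],    // target_partitions (formerly partition_columns)
  targetTable: String               // target_table
)

object FullLoadParametersExample extends App {
  // Field values mirror the README excerpt; the table name is made up.
  val params = FullLoadParameters(
    hasHeader = false,
    partitionColumn = "date_column_name",
    partitionColumnFormat = "yyyyMMdd",
    targetPartitions = Seq("year", "month"),
    targetTable = "some_database.some_table"
  )
  println(params)
}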
1 change: 1 addition & 0 deletions src/main/scala/com/adidas/analytics/AlgorithmFactory.scala
@@ -69,6 +69,7 @@ object AlgorithmFactory {
case "PartitionRangeMaterialization" => PartitionMaterialization.newRangeMaterialization(spark, dfs, configLocation)
case "PartitionQueryMaterialization" => PartitionMaterialization.newQueryMaterialization(spark, dfs, configLocation)
case "FixedSizeStringExtractor" => FixedSizeStringExtractor(spark, dfs, configLocation)
case "NestedFlattener" => NestedFlattener(spark, dfs, configLocation)
case _ => throw new RuntimeException(s"Unable to find algorithm corresponding to $className")
}
}
24 changes: 12 additions & 12 deletions src/main/scala/com/adidas/analytics/algo/AppendLoad.scala
@@ -27,11 +27,11 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
}

override protected def transform(dataFrames: Vector[DataFrame]): Vector[DataFrame] = {
dataFrames.map(df => df.transform(addPartitionColumns(columnToRegexPairs, targetSchema)))
dataFrames.map(df => df.transform(addtargetPartitions(columnToRegexPairs, targetSchema)))
}

override protected def write(dataFrames: Vector[DataFrame]): Unit = {
writeHeaders(dataFrames, partitionColumns, headerDir, dfs)
writeHeaders(dataFrames, targetPartitions, headerDir, dfs)
super.write(dataFrames)
if (computeTableStatistics && dataType == STRUCTURED)
computeStatisticsForTable(targetTable)
@@ -47,7 +47,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
}

private def listSources(inputDirPath: Path, headerDirPath: Path, fs: FileSystem, targetSchema: StructType): Seq[Source] = {
val targetSchemaWithoutPartitionColumns = getSchemaWithoutPartitionColumns(targetSchema, partitionColumns.toSet)
val targetSchemaWithouttargetPartitions = getSchemaWithouttargetPartitions(targetSchema, targetPartitions.toSet)

logger.info(s"Looking for input files in $inputDirPath")
val groupedHeaderPathAndSourcePaths = fs.ls(inputDirPath, recursive = true).groupBy { inputPath =>
@@ -56,7 +56,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v

def getMapSchemaStructToPath = {
val mapSchemaStructToPath = groupedHeaderPathAndSourcePaths.toSeq.map { case (headerPath, sourcePaths) =>
getSchemaFromHeaderOrSource(fs, headerPath, sourcePaths, targetSchemaWithoutPartitionColumns)
getSchemaFromHeaderOrSource(fs, headerPath, sourcePaths, targetSchemaWithouttargetPartitions)
}.groupBy(_._1).map { case (k, v) => (k, v.flatMap(_._2)) }

val filteredMapSchemaStructToPath = mapSchemaStructToPath.filter(schemaFromInputData => matchingSchemas_?(schemaFromInputData._1, targetSchema, schemaFromInputData._2))
@@ -73,7 +73,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v

val schemaAndSourcePath = if(!verifySchema) {
groupedHeaderPathAndSourcePaths.flatMap { case (headerPath, sourcePaths) =>
val schema = if (fs.exists(headerPath)) loadHeader(headerPath, fs) else targetSchemaWithoutPartitionColumns
val schema = if (fs.exists(headerPath)) loadHeader(headerPath, fs) else targetSchemaWithouttargetPartitions
sourcePaths.map { sourcePath =>
Source(schema, sourcePath.toString)
}
@@ -84,7 +84,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
schemaAndSourcePath.toSeq
}

private def getSchemaFromHeaderOrSource(fs: FileSystem, headerPath: Path, sourcePaths: Seq[Path], targetSchemaWithoutPartitionColumns: StructType): (StructType, Seq[Path]) ={
private def getSchemaFromHeaderOrSource(fs: FileSystem, headerPath: Path, sourcePaths: Seq[Path], targetSchemaWithouttargetPartitions: StructType): (StructType, Seq[Path]) ={
val schema = if (fs.exists(headerPath)){
loadHeader(headerPath, fs) }
else {
@@ -143,8 +8,8 @@ object AppendLoad {
path.replaceFirst("\\w+\\d*://.+?/", "")
}

private def getSchemaWithoutPartitionColumns(targetSchema: StructType, partitionColumns: Set[String]): StructType = {
StructType(targetSchema.fields.filterNot(field => partitionColumns.contains(field.name)))
private def getSchemaWithouttargetPartitions(targetSchema: StructType, targetPartitions: Set[String]): StructType = {
StructType(targetSchema.fields.filterNot(field => targetPartitions.contains(field.name)))
}

private def groupSourcesBySchema(sources: Seq[Source]): Map[StructType, Seq[String]] = {
@@ -153,7 +153,7 @@ object AppendLoad {
}
}

private def addPartitionColumns(columnNameToRegexPairs: Seq[(String, String)], schema: StructType)(inputDf: DataFrame): DataFrame = {
private def addtargetPartitions(columnNameToRegexPairs: Seq[(String, String)], schema: StructType)(inputDf: DataFrame): DataFrame = {
def getInputFileName: Column = {
udf((path: String) => extractPathWithoutServerAndProtocol(path)).apply(input_file_name)
}
@@ -185,13 +185,13 @@ object AppendLoad {
DataType.fromJson(fs.readFile(headerPath)).asInstanceOf[StructType]
}

protected def writeHeaders(dataFrames: Seq[DataFrame], partitionColumns: Seq[String], headerDir: String, dfs: DFSWrapper): Unit = {
protected def writeHeaders(dataFrames: Seq[DataFrame], targetPartitions: Seq[String], headerDir: String, dfs: DFSWrapper): Unit = {
logger.info(s"Writing header files to $headerDir")
val headerDirPath = new Path(headerDir)
val fs = dfs.getFileSystem(headerDirPath)
dataFrames.foreach { df =>
val schemaJson = getSchemaWithoutPartitionColumns(df.schema, partitionColumns.toSet).prettyJson
df.collectPartitions(partitionColumns).foreach { partitionCriteria =>
val schemaJson = getSchemaWithouttargetPartitions(df.schema, targetPartitions.toSet).prettyJson
df.collectPartitions(targetPartitions).foreach { partitionCriteria =>
val subdirectories = DataFrameUtils.mapPartitionsToDirectories(partitionCriteria)
val headerPath = new Path(headerDirPath.join(subdirectories), headerFileName)
if (!fs.exists(headerPath)) {
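Illustrative note (not part of the commit): the header files written by writeHeaders above persist the schema without the target partition columns. A minimal standalone sketch of that idea, using a made-up schema; only the filtering logic mirrors the getSchemaWithouttargetPartitions helper in the diff.

import org.apache.spark.sql.types._

object HeaderSchemaSketch extends App {
  // The persisted header schema excludes the target partition columns.
  def schemaWithoutTargetPartitions(targetSchema: StructType, targetPartitions: Set[String]): StructType =
    StructType(targetSchema.fields.filterNot(field => targetPartitions.contains(field.name)))

  // Hypothetical target schema; "year" and "month" are the target partitions.
  val targetSchema = StructType(Seq(
    StructField("customer", StringType),
    StructField("date_column_name", StringType),
    StructField("year", ShortType),
    StructField("month", ShortType)
  ))

  println(schemaWithoutTargetPartitions(targetSchema, Set("year", "month")).prettyJson)
}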
10 changes: 5 additions & 5 deletions src/main/scala/com/adidas/analytics/algo/DeltaLoad.scala
@@ -19,12 +19,12 @@ final class DeltaLoad protected(val spark: SparkSession, val dfs: DFSWrapper, va
extends Algorithm with PartitionedDeltaLoadConfiguration with DateComponentDerivation {

override protected def transform(dataFrames: Vector[DataFrame]): Vector[DataFrame] = {
val dataFramesWithPartitionColumnsAdded = withDatePartitions(spark, dfs, dataFrames.take(1))
val deltaRecords = dataFramesWithPartitionColumnsAdded(0).persist(StorageLevel.MEMORY_AND_DISK)
val dataFramesWithTargetPartitionsAdded = withDatePartitions(spark, dfs, dataFrames.take(1))
val deltaRecords = dataFramesWithTargetPartitionsAdded(0).persist(StorageLevel.MEMORY_AND_DISK)

val activeRecords = dataFrames(1)

val partitions = deltaRecords.collectPartitions(partitionColumns)
val partitions = deltaRecords.collectPartitions(targetPartitions)
val isRequiredPartition = DataFrameUtils.buildPartitionsCriteriaMatcherFunc(partitions, activeRecords.schema)

// Create DataFrame containing full content of partitions that need to be touched
@@ -70,8 +70,8 @@ final class DeltaLoad protected(val spark: SparkSession, val dfs: DFSWrapper, va
logger.info("Adding partitioning information if needed")
try {
dataFrames.map { df =>
if (df.columns.toSeq.intersect(partitionColumns) != partitionColumns){
df.transform(withDateComponents(partitionSourceColumn, partitionSourceColumnFormat, partitionColumns))
if (df.columns.toSeq.intersect(targetPartitions) != targetPartitions){
df.transform(withDateComponents(partitionSourceColumn, partitionSourceColumnFormat, targetPartitions))
}
else df
}
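Illustrative note (not part of the commit): withDateComponents above derives the target partition columns from a date column when they are missing. The repository's DateComponentDerivation trait is not shown in this diff, so the sketch below only demonstrates the general idea with plain Spark SQL functions and made-up values.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, month, to_date, year}

object DateComponentSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("date-component-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical input: a partition source column in yyyyMMdd format,
    // matching the partition_column_format documented in the README.
    val df = Seq("20191205", "20191121").toDF("date_column_name")

    val withPartitions = df
      .withColumn("parsed", to_date(col("date_column_name"), "yyyyMMdd"))
      .withColumn("year", year(col("parsed")))
      .withColumn("month", month(col("parsed")))
      .drop("parsed")

    withPartitions.show() // "year" and "month" become the target partition columns
    spark.stop()
  }
}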
src/main/scala/com/adidas/analytics/algo/FixedSizeStringExtractor.scala
@@ -24,7 +24,7 @@ final class FixedSizeStringExtractor protected(val spark: SparkSession, val dfs:
}

def extractFields(df: DataFrame, targetSchema: StructType): DataFrame = {
val nonPartitionFields = targetSchema.fields.filter(field => !partitionColumnsSet.contains(field.name))
val nonPartitionFields = targetSchema.fields.filter(field => !targetPartitionsSet.contains(field.name))
if (substringPositions.length != nonPartitionFields.length) {
throw new RuntimeException("Field positions do not correspond to the target schema")
}
48 changes: 32 additions & 16 deletions src/main/scala/com/adidas/analytics/algo/FullLoad.scala
@@ -1,21 +1,25 @@
package com.adidas.analytics.algo

import com.adidas.analytics.config.FullLoadConfiguration
import com.adidas.analytics.algo.FullLoad._
import com.adidas.analytics.algo.core.Algorithm
import com.adidas.analytics.algo.core.Algorithm.{ComputeTableStatisticsOperation, WriteOperation}
import com.adidas.analytics.algo.shared.DateComponentDerivation
import com.adidas.analytics.config.FullLoadConfiguration
import com.adidas.analytics.util.DFSWrapper._
import com.adidas.analytics.util.DataFormat.{DSVFormat, ParquetFormat}
import com.adidas.analytics.util._
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

import scala.util.{Failure, Success, Try}


final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val configLocation: String)
extends Algorithm with WriteOperation with FullLoadConfiguration with DateComponentDerivation with ComputeTableStatisticsOperation{

val currentHdfsDir: String = HiveTableAttributeReader(targetTable, spark).getTableLocation

override protected def read(): Vector[DataFrame] = {
createBackupTable()

@@ -33,21 +37,32 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
}

override protected def write(dataFrames: Vector[DataFrame]): Unit = {
super.write(dataFrames)
restoreTable()
if (computeTableStatistics && dataType == STRUCTURED)
computeStatisticsForTable(Some(targetTable))
Try{
super.write(dataFrames)
} match {
case Failure(exception) =>
logger.error(s"Handled Exception: ${exception.getMessage}. " +
s"Start Rolling Back the Full Load of table: ${targetTable}!")
recoverFailedWrite()
cleanupDirectory(backupDir)
throw new RuntimeException(exception.getMessage)
case Success(_) =>
restoreTable()
if (computeTableStatistics && dataType == STRUCTURED)
computeStatisticsForTable(Option(targetTable))
}

}

private def createBackupTable(): Unit = {
createDirectory(backupDir)

// back up the data from the current dir because the data directory for full load currently varies
val currentDir = HiveTableAttributeReader(targetTable, spark).getTableLocation
backupDataDirectory(currentDir, backupDir)

backupDataDirectory(currentHdfsDir, backupDir)

try {
dropAndRecreateTableInNewLocation(targetTable, backupDir, partitionColumns)
dropAndRecreateTableInNewLocation(targetTable, backupDir, targetPartitions)
} catch {
case e: Throwable =>
logger.error("Data backup failed", e)
@@ -87,7 +102,7 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
DistCpLoadHelper.backupDirectoryContent(dfs, sourceDir, destinationDir)
}

private def dropAndRecreateTableInNewLocation(table: String, destinationDir: String, partitionColumns: Seq[String]): Unit = {
private def dropAndRecreateTableInNewLocation(table: String, destinationDir: String, targetPartitions: Seq[String]): Unit = {
val tempTable: String = s"${table}_temp"
val tempTableDummyLocation: String = s"/tmp/$table"

@@ -97,7 +112,7 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
//create the target table like the temp table with data in the new directory
createTable(tempTable, table, destinationDir)

if (partitionColumns.nonEmpty) {
if (targetPartitions.nonEmpty) {
spark.catalog.recoverPartitions(table)
}
}
@@ -111,8 +126,8 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
private def withDatePartitions(dataFrames: Vector[DataFrame]): Vector[DataFrame] ={
logger.info("Adding partitioning information if needed")
try {
if (partitionColumns.nonEmpty) {
dataFrames.map(df => df.transform(withDateComponents(partitionSourceColumn, partitionSourceColumnFormat, partitionColumns)))
if (targetPartitions.nonEmpty) {
dataFrames.map(df => df.transform(withDateComponents(partitionSourceColumn, partitionSourceColumnFormat, targetPartitions)))
} else {
dataFrames
}
@@ -128,7 +143,7 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val

private def restoreTable(): Unit ={
try {
dropAndRecreateTableInNewLocation(targetTable, currentDir, partitionColumns)
dropAndRecreateTableInNewLocation(targetTable, currentDir, targetPartitions)
} catch {
case e: Throwable =>
logger.error("Data writing failed", e)
@@ -151,23 +166,24 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
spark.sql(s"DROP TABLE IF EXISTS $tempTable")
}

if (partitionColumns.nonEmpty) {
if (targetPartitions.nonEmpty) {
spark.catalog.recoverPartitions(targetTable)
}
}

private def recoverFailedRead(): Unit = {
dropAndRecreateTableInNewLocation(targetTable, currentDir, partitionColumns)
dropAndRecreateTableInNewLocation(targetTable, currentDir, targetPartitions)
}

private def recoverFailedWrite(): Unit = {
restoreDirectoryContent(currentDir, backupDir)
dropAndRecreateTableInNewLocation(targetTable, currentDir, partitionColumns)
dropAndRecreateTableInNewLocation(targetTable, currentDir, targetPartitions)
}

private def restoreDirectoryContent(sourceDir: String, backupDir: String): Unit = {
DistCpLoadHelper.restoreDirectoryContent(dfs, sourceDir, backupDir)
}

}


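Illustrative note (not part of the commit): the new write path in FullLoad wraps the write in a Try and rolls back to the backed-up data on failure before rethrowing. Reduced to its shape, with placeholder helpers that stand in for the real write, recovery and cleanup steps (they are not the repository's API):

import scala.util.{Failure, Success, Try}

object WriteWithRollbackSketch extends App {
  // Placeholder steps; in FullLoad these correspond to super.write,
  // recoverFailedWrite, cleanupDirectory(backupDir) and restoreTable.
  def writeData(): Unit = println("writing data to the target location")
  def recoverFailedWrite(): Unit = println("restoring the backup and recreating the table")
  def cleanupBackup(): Unit = println("removing the backup directory")
  def finalizeLoad(): Unit = println("restoring the table at its final location")

  Try(writeData()) match {
    case Failure(exception) =>
      // Roll back before surfacing the failure, keeping the original exception as the cause.
      recoverFailedWrite()
      cleanupBackup()
      throw new RuntimeException(exception.getMessage, exception)
    case Success(_) =>
      finalizeLoad()
  }
}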