diff --git a/src/main/scala/com/adidas/analytics/algo/AppendLoad.scala b/src/main/scala/com/adidas/analytics/algo/AppendLoad.scala index f7d02ba..c19451f 100644 --- a/src/main/scala/com/adidas/analytics/algo/AppendLoad.scala +++ b/src/main/scala/com/adidas/analytics/algo/AppendLoad.scala @@ -3,8 +3,7 @@ package com.adidas.analytics.algo import java.util.regex.Pattern import com.adidas.analytics.algo.AppendLoad.{logger, _} -import com.adidas.analytics.algo.core.Algorithm -import com.adidas.analytics.algo.core.Algorithm.ComputeTableStatisticsOperation +import com.adidas.analytics.algo.core.{Algorithm, TableStatistics} import com.adidas.analytics.config.AppendLoadConfiguration import com.adidas.analytics.util.DFSWrapper._ import com.adidas.analytics.util.DataFormat.{DSVFormat, JSONFormat, ParquetFormat} @@ -16,11 +15,13 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StructType, _} import org.slf4j.{Logger, LoggerFactory} +import scala.collection.immutable + /** * Performs append load of new records to an existing table. */ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val configLocation: String) - extends Algorithm with AppendLoadConfiguration with ComputeTableStatisticsOperation{ + extends Algorithm with AppendLoadConfiguration with TableStatistics { override protected def read(): Vector[DataFrame] = { readInputData(targetSchema, spark, dfs) @@ -30,11 +31,19 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v dataFrames.map(df => df.transform(addtargetPartitions(columnToRegexPairs, targetSchema))) } - override protected def write(dataFrames: Vector[DataFrame]): Unit = { + override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] = { writeHeaders(dataFrames, targetPartitions, headerDir, dfs) super.write(dataFrames) - if (computeTableStatistics && dataType == STRUCTURED) + } + + override protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit = { + if (computeTableStatistics && dataType == STRUCTURED && targetTable.isDefined) { + if (targetPartitions.nonEmpty) { + dataFrames.foreach(df => computeStatisticsForTablePartitions(df, targetTable.get, targetPartitions)) + } computeStatisticsForTable(targetTable) + } + } private def readInputData(targetSchema: StructType, spark: SparkSession, dfs: DFSWrapper): Vector[DataFrame] = { @@ -54,7 +63,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v buildHeaderFilePath(columnToRegexPairs, targetSchema, extractPathWithoutServerAndProtocol(inputPath.toString), headerDirPath) } - def getMapSchemaStructToPath = { + def getMapSchemaStructToPath: immutable.Iterable[Source] = { val mapSchemaStructToPath = groupedHeaderPathAndSourcePaths.toSeq.map { case (headerPath, sourcePaths) => getSchemaFromHeaderOrSource(fs, headerPath, sourcePaths, targetSchemaWithouttargetPartitions) }.groupBy(_._1).map { case (k, v) => (k, v.flatMap(_._2)) } @@ -71,7 +80,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v } } - val schemaAndSourcePath = if(!verifySchema) { + val schemaAndSourcePath = if (!verifySchema) { groupedHeaderPathAndSourcePaths.flatMap { case (headerPath, sourcePaths) => val schema = if (fs.exists(headerPath)) loadHeader(headerPath, fs) else targetSchemaWithouttargetPartitions sourcePaths.map { sourcePath => @@ -84,9 +93,10 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v schemaAndSourcePath.toSeq } - private def 
getSchemaFromHeaderOrSource(fs: FileSystem, headerPath: Path, sourcePaths: Seq[Path], targetSchemaWithouttargetPartitions: StructType): (StructType, Seq[Path]) ={ - val schema = if (fs.exists(headerPath)){ - loadHeader(headerPath, fs) } + private def getSchemaFromHeaderOrSource(fs: FileSystem, headerPath: Path, sourcePaths: Seq[Path], targetSchemaWithouttargetPartitions: StructType): (StructType, Seq[Path]) = { + val schema = if (fs.exists(headerPath)) { + loadHeader(headerPath, fs) + } else { inferSchemaFromSource(sourcePaths) } @@ -105,10 +115,10 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v } private def matchingSchemas_?(schemaFromInputData: StructType, targetSchema: StructType, paths: Seq[Path]): Boolean = { - val inputColumnsVector = schemaFromInputData.names.toVector - val targetColumnsVector = targetSchema.names.toVector + val inputColumnsVector = schemaFromInputData.names.toVector + val targetColumnsVector = targetSchema.names.toVector val diff = inputColumnsVector.diff(targetColumnsVector) - if(diff.nonEmpty) + if (diff.nonEmpty) logger.error(s"Inferred schema does not match the target schema for ${paths.toString}") diff.isEmpty } @@ -220,4 +230,5 @@ object AppendLoad { } protected case class Source(schema: StructType, inputFileLocation: String) + } diff --git a/src/main/scala/com/adidas/analytics/algo/FullLoad.scala b/src/main/scala/com/adidas/analytics/algo/FullLoad.scala index 0827eb4..f56c5fc 100644 --- a/src/main/scala/com/adidas/analytics/algo/FullLoad.scala +++ b/src/main/scala/com/adidas/analytics/algo/FullLoad.scala @@ -2,8 +2,8 @@ package com.adidas.analytics.algo import com.adidas.analytics.config.FullLoadConfiguration import com.adidas.analytics.algo.FullLoad._ -import com.adidas.analytics.algo.core.Algorithm -import com.adidas.analytics.algo.core.Algorithm.{ComputeTableStatisticsOperation, WriteOperation} +import com.adidas.analytics.algo.core.{Algorithm, TableStatistics} +import com.adidas.analytics.algo.core.Algorithm.WriteOperation import com.adidas.analytics.algo.shared.DateComponentDerivation import com.adidas.analytics.util.DFSWrapper._ import com.adidas.analytics.util.DataFormat.{DSVFormat, ParquetFormat} @@ -16,7 +16,7 @@ import scala.util.{Failure, Success, Try} final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val configLocation: String) - extends Algorithm with WriteOperation with FullLoadConfiguration with DateComponentDerivation with ComputeTableStatisticsOperation{ + extends Algorithm with WriteOperation with FullLoadConfiguration with DateComponentDerivation with TableStatistics { val currentHdfsDir: String = HiveTableAttributeReader(targetTable, spark).getTableLocation @@ -36,7 +36,7 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val withDatePartitions(dataFrames) } - override protected def write(dataFrames: Vector[DataFrame]): Unit = { + override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] = { Try{ super.write(dataFrames) } match { @@ -46,14 +46,23 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val recoverFailedWrite() cleanupDirectory(backupDir) throw new RuntimeException(exception.getMessage) - case Success(_) => + case Success(outputDataFrame) => restoreTable() - if (computeTableStatistics && dataType == STRUCTURED) - computeStatisticsForTable(Option(targetTable)) + outputDataFrame } } + override protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit = { + if (computeTableStatistics 
&& dataType == STRUCTURED) { + if (targetPartitions.nonEmpty) { + dataFrames.foreach(df => computeStatisticsForTablePartitions(df, targetTable, targetPartitions)) + } + computeStatisticsForTable(Option(targetTable)) + } + + } + private def createBackupTable(): Unit = { createDirectory(backupDir) diff --git a/src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala b/src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala index 1304336..da2b585 100644 --- a/src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala +++ b/src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala @@ -6,10 +6,15 @@ import com.adidas.analytics.util.{DFSWrapper, InputReader, OutputWriter} import org.apache.spark.sql.{DataFrame, SparkSession} import org.slf4j.{Logger, LoggerFactory} + /** * Base trait for algorithms that defines their base methods */ -trait Algorithm extends JobRunner with Serializable with BaseReadOperation with BaseWriteOperation { +trait Algorithm extends JobRunner + with Serializable + with BaseReadOperation + with BaseWriteOperation + with BaseUpdateStatisticsOperation { protected def spark: SparkSession @@ -32,7 +37,9 @@ trait Algorithm extends JobRunner with Serializable with BaseReadOperation with logger.info("Starting processing stage...") val result = transform(inputDateFrames) logger.info("Starting writing stage...") - write(result) + val outputResult = write(result) + logger.info("Starting statistics update stage...") + updateStatistics(outputResult) } } @@ -54,6 +61,31 @@ object Algorithm { protected def read(): Vector[DataFrame] } + /** + * Base trait for update statistics operations + */ + trait BaseUpdateStatisticsOperation { + + /** + * Updates table statistics based on the produced output DataFrames + * + * @param dataFrames DataFrames returned by the write() step + */ + protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit + } + + /** + * The simplest implementation of update statistics + */ + trait UpdateStatisticsOperation extends BaseUpdateStatisticsOperation { + /** + * By default the statistics update step is skipped for a given Algorithm + * @param dataFrames DataFrames to compute statistics for + */ + override protected def updateStatistics(dataFrames: Vector[DataFrame]): Unit = logger.info("Skipping update statistics step!") + + } + /** * Base trait for write operations */ @@ -71,7 +103,7 @@ object Algorithm { * * @param dataFrames DataFrame to write */ - protected def write(dataFrames: Vector[DataFrame]): Unit + protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] } /** @@ -107,8 +139,8 @@ object Algorithm { */ protected def writer: AtomicWriter - override protected def write(dataFrames: Vector[DataFrame]): Unit = { - dataFrames.foreach { df => + override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] = { + dataFrames.map { df => writer.writeWithBackup(dfs, outputFilesNum.map(df.repartition).getOrElse(df)) } } @@ -129,8 +161,8 @@ object Algorithm { */ protected def writer: OutputWriter - override protected def write(dataFrames: Vector[DataFrame]): Unit = { - dataFrames.foreach { df => + override protected def write(dataFrames: Vector[DataFrame]): Vector[DataFrame] = { + dataFrames.map { df => writer.write(dfs, outputFilesNum.map(df.repartition).getOrElse(df)) } } diff --git a/src/main/scala/com/adidas/analytics/algo/core/Metadata.scala b/src/main/scala/com/adidas/analytics/algo/core/Metadata.scala index 216d1b9..140f240 100644 --- a/src/main/scala/com/adidas/analytics/algo/core/Metadata.scala +++ 
b/src/main/scala/com/adidas/analytics/algo/core/Metadata.scala @@ -2,7 +2,10 @@ package com.adidas.analytics.algo.core import org.apache.spark.sql.DataFrame - +/** + * This is a generic trait for all strategies that + * add new partitions to the metadata table + */ trait Metadata { protected val tableName: String diff --git a/src/main/scala/com/adidas/analytics/algo/core/PartitionHelpers.scala b/src/main/scala/com/adidas/analytics/algo/core/PartitionHelpers.scala new file mode 100644 index 0000000..fc3b6f1 --- /dev/null +++ b/src/main/scala/com/adidas/analytics/algo/core/PartitionHelpers.scala @@ -0,0 +1,28 @@ +package com.adidas.analytics.algo.core + +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} + +/** + * This is a trait with generic logic to interact with DataFrames at the partition level + */ +trait PartitionHelpers { + + protected def getDistinctPartitions(outputDataFrame: DataFrame, targetPartitions: Seq[String]): Dataset[Row] = { + val targetPartitionsColumns: Seq[Column] = targetPartitions.map(partitionString => col(partitionString)) + + outputDataFrame.select(targetPartitionsColumns: _*).distinct + } + + protected def getParameterValue(row: Row, partitionString: String): String = + createParameterValue(row.get(row.fieldIndex(partitionString))) + + protected def createParameterValue(partitionRawValue: Any): String = + partitionRawValue match { + case value: java.lang.Short => value.toString + case value: java.lang.Integer => value.toString + case value: scala.Predef.String => "'" + value + "'" + case null => throw new Exception("Partition Value is null. No support for null partitions!") + case value => throw new Exception("Unsupported partition DataType: " + value.getClass) + } +} diff --git a/src/main/scala/com/adidas/analytics/algo/core/TableStatistics.scala b/src/main/scala/com/adidas/analytics/algo/core/TableStatistics.scala new file mode 100644 index 0000000..7d4efc3 --- /dev/null +++ b/src/main/scala/com/adidas/analytics/algo/core/TableStatistics.scala @@ -0,0 +1,48 @@ +package com.adidas.analytics.algo.core + +import org.apache.spark.sql._ +import scala.collection.JavaConversions._ + +/** + * This is a generic trait to use in algorithms that need + * to compute statistics at the table and partition level + */ +trait TableStatistics extends PartitionHelpers { + + protected def spark: SparkSession + + /** + * Adds statistics at the partition level using HiveQL statements + */ + protected def computeStatisticsForTablePartitions(df: DataFrame, + targetTable: String, + targetPartitions: Seq[String]): Unit = { + + val distinctPartitions: DataFrame = getDistinctPartitions(df, targetPartitions) + + generateComputePartitionStatements(distinctPartitions, targetTable, targetPartitions) + .collectAsList() + .foreach((statement: String) => spark.sql(statement)) + + } + + /** + * Adds statistics at the table level using HiveQL statements + */ + protected def computeStatisticsForTable(tableName: Option[String]): Unit = tableName match { + case Some(table) => spark.sql(s"ANALYZE TABLE ${table} COMPUTE STATISTICS") + case None => () + } + + private def generateComputePartitionStatements(df: DataFrame, + targetTable: String, + targetPartitions: Seq[String]): Dataset[String] = { + df.map(partitionValue => { + val partitionStatementValues: Seq[String] = targetPartitions + .map(partitionColumn => s"${partitionColumn}=${getParameterValue(partitionValue, partitionColumn)}") + + s"ANALYZE TABLE ${targetTable} 
PARTITION(${partitionStatementValues.mkString(",")}) COMPUTE STATISTICS" + })(Encoders.STRING) + } + +} diff --git a/src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala b/src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala index 746d843..b6ce24f 100644 --- a/src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala @@ -1,6 +1,6 @@ package com.adidas.analytics.config -import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation} +import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation, UpdateStatisticsOperation} import com.adidas.analytics.config.AlgorithmTemplateConfiguration.ruleToLocalDate import com.adidas.analytics.config.shared.{ConfigurationContext, MetadataUpdateStrategy} import com.adidas.analytics.util.DataFormat.ParquetFormat @@ -14,6 +14,7 @@ import org.joda.time.{Days, LocalDate} trait AlgorithmTemplateConfiguration extends ConfigurationContext with ReadOperation with SafeWriteOperation + with UpdateStatisticsOperation + with MetadataUpdateStrategy { protected def spark: SparkSession diff --git a/src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala index 02b7317..fee6815 100644 --- a/src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala @@ -1,6 +1,6 @@ package com.adidas.analytics.config -import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation} +import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation, UpdateStatisticsOperation} import com.adidas.analytics.config.shared.{ConfigurationContext, DateComponentDerivationConfiguration, MetadataUpdateStrategy} import com.adidas.analytics.util.DataFormat.ParquetFormat import com.adidas.analytics.util.{DataFormat, InputReader, LoadMode, OutputWriter} @@ -8,7 +8,9 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SparkSession} -trait DeltaLoadConfiguration extends ConfigurationContext with MetadataUpdateStrategy { +trait DeltaLoadConfiguration extends ConfigurationContext + with UpdateStatisticsOperation + with MetadataUpdateStrategy { protected val activeRecordsTable: String = configReader.getAs[String]("active_records_table_lake") protected val deltaRecordsTable: Option[String] = configReader.getAsOption[String]("delta_records_table_lake") @@ -21,7 +23,12 @@ trait DeltaLoadConfiguration extends ConfigurationContext with MetadataUpdateStr protected val recordModeColumnName: String = "recordmode" protected val upsertRecordModes: Seq[String] = Seq("", "N") protected val upsertRecordsModesFilterFunction: Row => Boolean = { row: Row => - val recordmode = row.getAs[String](recordModeColumnName) + var recordmode = "" + try { + recordmode = row.getAs[String](recordModeColumnName) + } catch { + case _: Exception => recordmode = row.getAs[String](recordModeColumnName.toUpperCase) + } recordmode == null || recordmode == "" || recordmode == "N" } } diff --git a/src/main/scala/com/adidas/analytics/config/FixedSizeStringExtractorConfiguration.scala b/src/main/scala/com/adidas/analytics/config/FixedSizeStringExtractorConfiguration.scala index 6a7b0f9..db59dc9 100644 --- a/src/main/scala/com/adidas/analytics/config/FixedSizeStringExtractorConfiguration.scala +++ 
b/src/main/scala/com/adidas/analytics/config/FixedSizeStringExtractorConfiguration.scala @@ -1,6 +1,6 @@ package com.adidas.analytics.config -import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation} +import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation, UpdateStatisticsOperation} import com.adidas.analytics.config.FixedSizeStringExtractorConfiguration._ import com.adidas.analytics.config.shared.{ConfigurationContext, MetadataUpdateStrategy} import com.adidas.analytics.util.DataFormat.ParquetFormat @@ -15,6 +15,7 @@ import org.slf4j.{Logger, LoggerFactory} trait FixedSizeStringExtractorConfiguration extends ConfigurationContext with ReadOperation with SafeWriteOperation + with UpdateStatisticsOperation with MetadataUpdateStrategy { private val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/src/main/scala/com/adidas/analytics/config/FullLoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/FullLoadConfiguration.scala index fb9414f..9bd89e5 100644 --- a/src/main/scala/com/adidas/analytics/config/FullLoadConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/FullLoadConfiguration.scala @@ -4,7 +4,7 @@ import com.adidas.analytics.config.shared.{ConfigurationContext, DateComponentDe import com.adidas.analytics.util.DataFormat.ParquetFormat import com.adidas.analytics.util.{LoadMode, OutputWriter} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.util.PermissiveMode +import org.apache.spark.sql.catalyst.util.FailFastMode import org.apache.spark.sql.types.StructType @@ -38,5 +38,5 @@ trait FullLoadConfiguration extends ConfigurationContext with LoadConfiguration super.readNullValue.orElse(Some("XXNULLXXX")) } - override def loadMode: String = readerModeSetter(PermissiveMode.name) + override def loadMode: String = readerModeSetter(FailFastMode.name) } diff --git a/src/main/scala/com/adidas/analytics/config/NestedFlattenerConfiguration.scala b/src/main/scala/com/adidas/analytics/config/NestedFlattenerConfiguration.scala index 2664da1..71e8f00 100644 --- a/src/main/scala/com/adidas/analytics/config/NestedFlattenerConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/NestedFlattenerConfiguration.scala @@ -1,14 +1,18 @@ package com.adidas.analytics.config import com.adidas.analytics.config.shared.{ConfigurationContext, MetadataUpdateStrategy} -import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation} +import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation, UpdateStatisticsOperation} import com.adidas.analytics.util.DataFormat.ParquetFormat import com.adidas.analytics.util.{InputReader, LoadMode, OutputWriter} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType -trait NestedFlattenerConfiguration extends ConfigurationContext with ReadOperation with SafeWriteOperation with MetadataUpdateStrategy { +trait NestedFlattenerConfiguration extends ConfigurationContext + with ReadOperation + with SafeWriteOperation + with UpdateStatisticsOperation + with MetadataUpdateStrategy { protected def spark: SparkSession diff --git a/src/main/scala/com/adidas/analytics/config/PartitionMaterializationConfiguration.scala b/src/main/scala/com/adidas/analytics/config/PartitionMaterializationConfiguration.scala index f9b1efe..0ab7c9e 100644 --- a/src/main/scala/com/adidas/analytics/config/PartitionMaterializationConfiguration.scala +++ 
b/src/main/scala/com/adidas/analytics/config/PartitionMaterializationConfiguration.scala @@ -1,6 +1,6 @@ package com.adidas.analytics.config -import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation} +import com.adidas.analytics.algo.core.Algorithm.{ReadOperation, SafeWriteOperation, UpdateStatisticsOperation} import com.adidas.analytics.config.shared.MetadataUpdateStrategy import com.adidas.analytics.util.DataFormat.ParquetFormat import com.adidas.analytics.util.DataFrameUtils.PartitionCriteria @@ -13,6 +13,7 @@ import org.joda.time.format.DateTimeFormat trait PartitionMaterializationConfiguration extends ReadOperation with SafeWriteOperation + with UpdateStatisticsOperation with MetadataUpdateStrategy { protected def configReader: ConfigReader diff --git a/src/main/scala/com/adidas/analytics/config/shared/LoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/shared/LoadConfiguration.scala index 623acdc..748ab59 100644 --- a/src/main/scala/com/adidas/analytics/config/shared/LoadConfiguration.scala +++ b/src/main/scala/com/adidas/analytics/config/shared/LoadConfiguration.scala @@ -42,7 +42,7 @@ trait LoadConfiguration { protected def readQuoteValue: Option[String] = configReader.getAsOption[String]("quote_character") - protected def computeTableStatistics: Boolean = configReader.getAsOption[Boolean]("compute_table_statistics").getOrElse(false) + protected def computeTableStatistics: Boolean = configReader.getAsOption[Boolean]("compute_table_statistics").getOrElse(true) protected def readerModeSetter(defaultMode: String): String = { configReader.getAsOption[String]("reader_mode") match { diff --git a/src/main/scala/com/adidas/analytics/config/shared/MetadataUpdateStrategy.scala b/src/main/scala/com/adidas/analytics/config/shared/MetadataUpdateStrategy.scala index 6498d1c..891fbe3 100644 --- a/src/main/scala/com/adidas/analytics/config/shared/MetadataUpdateStrategy.scala +++ b/src/main/scala/com/adidas/analytics/config/shared/MetadataUpdateStrategy.scala @@ -1,17 +1,17 @@ package com.adidas.analytics.config.shared import com.adidas.analytics.algo.core.Metadata -import com.adidas.analytics.util.{SparkRecoverPartitionsCustom, SparkRecoverPartitionsNative} +import com.adidas.analytics.util.{RecoverPartitionsCustom, RecoverPartitionsNative} trait MetadataUpdateStrategy extends ConfigurationContext { protected def getMetaDataUpdateStrategy(targetTable: String, partitionColumns: Seq[String]): Metadata = configReader.getAsOption[String]("metadata_update_strategy") match { - case Some("SparkRecoverPartitionsNative") => SparkRecoverPartitionsNative(targetTable, partitionColumns) - case Some("SparkRecoverPartitionsCustom") => SparkRecoverPartitionsCustom(targetTable, partitionColumns) + case Some("SparkRecoverPartitionsNative") => RecoverPartitionsNative(targetTable, partitionColumns) + case Some("SparkRecoverPartitionsCustom") => RecoverPartitionsCustom(targetTable, partitionColumns) case Some(invalidConfig) => throw new Exception(s"Invalid metadata update strategy ${invalidConfig}") - case None => SparkRecoverPartitionsNative(targetTable, partitionColumns) + case None => RecoverPartitionsNative(targetTable, partitionColumns) } } diff --git a/src/main/scala/com/adidas/analytics/util/OutputWriter.scala b/src/main/scala/com/adidas/analytics/util/OutputWriter.scala index 87d3fcf..7b65dc4 100644 --- a/src/main/scala/com/adidas/analytics/util/OutputWriter.scala +++ b/src/main/scala/com/adidas/analytics/util/OutputWriter.scala @@ -6,6 +6,8 @@ import 
org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.{DataFrameWriter, _} import org.slf4j.{Logger, LoggerFactory} +import scala.util.{Failure, Success, Try} + /** * Base trait for classes which are capable of persisting DataFrames */ @@ -17,7 +19,7 @@ sealed abstract class OutputWriter { def options: Map[String, String] - def write(dfs: DFSWrapper, df: DataFrame): Unit + def write(dfs: DFSWrapper, df: DataFrame): DataFrame protected def getWriter(df: DataFrame): DataFrameWriter[Row] = { if (targetPartitions.nonEmpty) { @@ -87,9 +89,9 @@ object OutputWriter { def format: DataFormat - def writeWithBackup(dfs: DFSWrapper, df: DataFrame): Unit + def writeWithBackup(dfs: DFSWrapper, df: DataFrame): DataFrame - protected def writeUnsafe(dfs: DFSWrapper, df: DataFrame, finalLocation: String, loadMode: LoadMode): Unit = { + protected def writeUnsafe(dfs: DFSWrapper, df: DataFrame, finalLocation: String, loadMode: LoadMode): DataFrame = { val finalPath = new Path(finalLocation) val fs = dfs.getFileSystem(finalPath) if (loadMode == LoadMode.OverwriteTable) { @@ -98,50 +100,57 @@ object OutputWriter { write(fs, df, finalPath, loadMode) } - protected def writeSafe(dfs: DFSWrapper, df: DataFrame, finalLocation: String, loadMode: LoadMode): Unit = { - lazy val partitionsCriteria = df.collectPartitions(targetPartitions) - - val finalPath = new Path(finalLocation) - val fs = dfs.getFileSystem(finalPath) + protected def writeSafe(dfs: DFSWrapper, df: DataFrame, finalLocation: String, loadMode: LoadMode): DataFrame = { + Try { + lazy val partitionsCriteria = df.collectPartitions(targetPartitions) + + val finalPath = new Path(finalLocation) + val fs = dfs.getFileSystem(finalPath) + + val tempPath = HadoopLoadHelper.buildTempPath(finalPath) + val tempDataPath = new Path(tempPath, "data") + val tempBackupPath = new Path(tempPath, "backup") + + fs.delete(tempPath, true) + + loadMode match { + case LoadMode.OverwriteTable => + loadTable(fs, df, finalPath, tempDataPath, tempBackupPath) + case LoadMode.OverwritePartitions => + loadPartitions(fs, df, finalPath, tempDataPath, tempBackupPath, partitionsCriteria) + case LoadMode.OverwritePartitionsWithAddedColumns => + val existingDf = format.read(df.sparkSession.read, finalLocation) + val outputDf = df.addMissingColumns(existingDf.schema) + loadPartitions(fs, outputDf, finalPath, tempDataPath, tempBackupPath, partitionsCriteria) + case LoadMode.AppendJoinPartitions => + val isRequiredPartition = DataFrameUtils.buildPartitionsCriteriaMatcherFunc(partitionsCriteria, df.schema) + val existingDf = format.read(df.sparkSession.read, finalLocation).filter(isRequiredPartition) + val joinColumns = existingDf.columns.toSet intersect df.columns.toSet + val combinedDf = existingDf.join(df, joinColumns.toSeq, "FULL_OUTER") + loadPartitions(fs, combinedDf, finalPath, tempDataPath, tempBackupPath, partitionsCriteria) + case LoadMode.AppendUnionPartitions => + val isRequiredPartition = DataFrameUtils.buildPartitionsCriteriaMatcherFunc(partitionsCriteria, df.schema) + val existingDf = format.read(df.sparkSession.read, finalLocation).filter(isRequiredPartition) + val combinedDf = df.addMissingColumns(existingDf.schema).union(existingDf) + loadPartitions(fs, combinedDf, finalPath, tempDataPath, tempBackupPath, partitionsCriteria) + } - val tempPath = HadoopLoadHelper.buildTempPath(finalPath) - val tempDataPath = new Path(tempPath, "data") - val tempBackupPath = new Path(tempPath, "backup") - - fs.delete(tempPath, true) - - loadMode match { - case LoadMode.OverwriteTable 
=> - loadTable(fs, df, finalPath, tempDataPath, tempBackupPath) - case LoadMode.OverwritePartitions => - loadPartitions(fs, df, finalPath, tempDataPath, tempBackupPath, partitionsCriteria) - case LoadMode.OverwritePartitionsWithAddedColumns => - val existingDf = format.read(df.sparkSession.read, finalLocation) - val outputDf = df.addMissingColumns(existingDf.schema) - loadPartitions(fs, outputDf, finalPath, tempDataPath, tempBackupPath, partitionsCriteria) - case LoadMode.AppendJoinPartitions => - val isRequiredPartition = DataFrameUtils.buildPartitionsCriteriaMatcherFunc(partitionsCriteria, df.schema) - val existingDf = format.read(df.sparkSession.read, finalLocation).filter(isRequiredPartition) - val joinColumns = existingDf.columns.toSet intersect df.columns.toSet - val combinedDf = existingDf.join(df, joinColumns.toSeq, "FULL_OUTER") - loadPartitions(fs, combinedDf, finalPath, tempDataPath, tempBackupPath, partitionsCriteria) - case LoadMode.AppendUnionPartitions => - val isRequiredPartition = DataFrameUtils.buildPartitionsCriteriaMatcherFunc(partitionsCriteria, df.schema) - val existingDf = format.read(df.sparkSession.read, finalLocation).filter(isRequiredPartition) - val combinedDf = df.addMissingColumns(existingDf.schema).union(existingDf) - loadPartitions(fs, combinedDf, finalPath, tempDataPath, tempBackupPath, partitionsCriteria) + fs.delete(tempPath, true) + } match { + case Failure(exception) => throw exception + case Success(_) => df } - fs.delete(tempPath, true) } - private def write(fs: FileSystem, df: DataFrame, finalPath: Path, loadMode: LoadMode): Unit = { - try { - val writer = getWriter(df).options(options).mode(loadMode.sparkMode) - format.write(writer, finalPath.toUri.toString) - logger.info(s"Data was successfully written to $finalPath") - } catch { - case e: Throwable => throw new RuntimeException(s"Unable to process data", e) + private def write(fs: FileSystem, df: DataFrame, finalPath: Path, loadMode: LoadMode): DataFrame = { + Try { + val writer = getWriter(df).options(options).mode(loadMode.sparkMode) + format.write(writer, finalPath.toUri.toString) + logger.info(s"Data was successfully written to $finalPath") + } match { + case Failure(exception) => throw new RuntimeException("Unable to process data", exception) + case Success(_) => df } } @@ -191,24 +200,30 @@ object OutputWriter { case class TableWriter(table: String, targetPartitions: Seq[String], options: Map[String, String], loadMode: LoadMode) extends OutputWriter { - override def write(dfs: DFSWrapper, df: DataFrame): Unit = { - logger.info(s"Writing data to table $table") - if (loadMode == LoadMode.OverwriteTable) { - val spark = df.sparkSession - spark.sql(s"TRUNCATE TABLE $table") + override def write(dfs: DFSWrapper, df: DataFrame): DataFrame = { + Try { + logger.info(s"Writing data to table $table") + if (loadMode == LoadMode.OverwriteTable) { + val spark = df.sparkSession + spark.sql(s"TRUNCATE TABLE $table") + } + getWriter(df).options(options).mode(loadMode.sparkMode).saveAsTable(table) + } match { + case Failure(exception) => throw exception + case Success(_) => df } - getWriter(df).options(options).mode(loadMode.sparkMode).saveAsTable(table) + } } case class FileSystemWriter(location: String, format: DataFormat, targetPartitions: Seq[String], options: Map[String, String], loadMode: LoadMode) extends AtomicWriter { - override def write(dfs: DFSWrapper, df: DataFrame): Unit = { + override def write(dfs: DFSWrapper, df: DataFrame): DataFrame = { writeUnsafe(dfs, df, location, loadMode) } - override 
def writeWithBackup(dfs: DFSWrapper, df: DataFrame): Unit = { + override def writeWithBackup(dfs: DFSWrapper, df: DataFrame): DataFrame = { writeSafe(dfs, df, location, loadMode) } } @@ -217,25 +232,30 @@ object OutputWriter { options: Map[String, String], loadMode: LoadMode, metadataConfiguration: Metadata) extends AtomicWriter { - override def write(dfs: DFSWrapper, df: DataFrame): Unit = { + override def write(dfs: DFSWrapper, df: DataFrame): DataFrame = { val spark = df.sparkSession val location = getTableLocation(spark) writeUnsafe(dfs, df, location, loadMode) - if (targetPartitions.nonEmpty){ - metadataConfiguration.recoverPartitions(df) - } else { - metadataConfiguration.refreshTable(df) - } + updatePartitionsMetadata(df) } - override def writeWithBackup(dfs: DFSWrapper, df: DataFrame): Unit = { + override def writeWithBackup(dfs: DFSWrapper, df: DataFrame): DataFrame = { val spark = df.sparkSession val location = getTableLocation(spark) writeSafe(dfs, df, location, loadMode) - if (targetPartitions.nonEmpty){ - metadataConfiguration.recoverPartitions(df) - } else { - metadataConfiguration.refreshTable(df) + updatePartitionsMetadata(df) + } + + private def updatePartitionsMetadata(df: DataFrame): DataFrame = { + Try { + if (targetPartitions.nonEmpty) { + metadataConfiguration.recoverPartitions(df) + } else { + metadataConfiguration.refreshTable(df) + } + } match { + case Failure(exception) => throw exception + case Success(_) => df } } diff --git a/src/main/scala/com/adidas/analytics/util/RecoverPartitionsCustom.scala b/src/main/scala/com/adidas/analytics/util/RecoverPartitionsCustom.scala new file mode 100644 index 0000000..30f8f87 --- /dev/null +++ b/src/main/scala/com/adidas/analytics/util/RecoverPartitionsCustom.scala @@ -0,0 +1,32 @@ +package com.adidas.analytics.util + +import com.adidas.analytics.algo.core.{PartitionHelpers, Metadata} +import org.apache.spark.sql._ + +import scala.collection.JavaConversions._ + +case class RecoverPartitionsCustom(override val tableName: String, + override val targetPartitions: Seq[String]) extends Metadata with PartitionHelpers{ + + override def recoverPartitions(outputDataFrame: DataFrame): Unit = { + val spark: SparkSession = outputDataFrame.sparkSession + + val distinctPartitions: DataFrame = getDistinctPartitions(outputDataFrame, targetPartitions) + + generateAddPartitionStatements(distinctPartitions) + .collectAsList() + .foreach((statement: String) => spark.sql(statement)) + + } + + private def generateAddPartitionStatements(df: DataFrame): Dataset[String] = { + df.map(partitionValue => { + val partitionStatementValues: Seq[String] = targetPartitions + .map(partitionColumn => s"${partitionColumn}=${getParameterValue(partitionValue, partitionColumn)}") + + s"ALTER TABLE ${tableName} ADD IF NOT EXISTS PARTITION(${partitionStatementValues.mkString(",")})" + })(Encoders.STRING) + } + + +} diff --git a/src/main/scala/com/adidas/analytics/util/RecoverPartitionsNative.scala b/src/main/scala/com/adidas/analytics/util/RecoverPartitionsNative.scala new file mode 100644 index 0000000..6b6d5d1 --- /dev/null +++ b/src/main/scala/com/adidas/analytics/util/RecoverPartitionsNative.scala @@ -0,0 +1,12 @@ +package com.adidas.analytics.util + +import com.adidas.analytics.algo.core.Metadata +import org.apache.spark.sql.DataFrame + +case class RecoverPartitionsNative(override val tableName: String, + override val targetPartitions: Seq[String]) extends Metadata { + + override def recoverPartitions(outputDataFrame: DataFrame): Unit = + 
outputDataFrame.sparkSession.catalog.recoverPartitions(tableName) + +} diff --git a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_1-3 b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_1-3 index 3a9df8c..18709e9 100644 --- a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_1-3 +++ b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_1-3 @@ -1,7 +1,7 @@ -salesorder|item|recordmode|date|customer|article|amount|year|month|day|hash -1|1||20160601|customer1|article1|100|2016|6|1|25477178d8d29a2f8f13d4355038f802e5018dc2d5fabeb894a79a86775c0321 -1|2||20160601|customer1|article2|200|2016|6|1|30ad9b1f4f6e505d14e37b0f89d14f02d40c2d4bc8b9f09ef32ca8751f4ac5ca -1|3||20160601|customer1|article3|50|2016|6|1|a4458112b638227532611317d9d603eec08bc3f1b6dd740804923d93252998f8 -2|1||20170215|customer3|article4|10|2017|2|15|a7c373227592ad12d700cc6b89704b3bf8321ee9fab8c2da1f4f72342d8d13fa -2|2||20170215|customer3|article5|50|2017|2|15|7b7f968369cbf75f472675271edeacac4b4662b632a5faf73b22b3c2012cf396 -2|3||20170215|customer3|article1|30|2017|2|15|57bbe3b59285bbd4eec69013b6aaa15189ae26f33105754148dbe257628878c2 +salesorder|item|recordmode|date|customer|article|amount|year|month|day +1|1||20160601|customer1|article1|100|2016|6|1 +1|2||20160601|customer1|article2|200|2016|6|1 +1|3||20160601|customer1|article3|50|2016|6|1 +2|1||20170215|customer3|article4|10|2017|2|15 +2|2||20170215|customer3|article5|50|2017|2|15 +2|3||20170215|customer3|article1|30|2017|2|15 diff --git a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_1-3.gz b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_1-3.gz index 743315c..06bedf6 100644 Binary files a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_1-3.gz and b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_1-3.gz differ diff --git a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_2-3 b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_2-3 index 195bf14..1c86786 100644 --- a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_2-3 +++ b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_2-3 @@ -1,8 +1,8 @@ -salesorder|item|recordmode|date|customer|article|amount|year|month|day|hash -3|1||20170215|customer1|article6|200|2017|2|15|30e1eec7c39412d7e56d5b6f928cfe9383573a387cb946a98b0aaab73ec0e1da -3|2||20170215|customer1|article2|120|2017|2|15|ac1cc0b2f83e8dc39f6afb958d7299e2d63a3c007e7a63c1be9e58c3b651d1f4 -3|3||20170215|customer1|article4|90|2017|2|15|8a4d62f2dee53c2b09361034aea44e51b4a64421839abd637698e745c0e74c09 -4|1||20170430|customer2|article3|80|2017|4|30|eb0f66b612de9208f3c5e10c6c1ed449a19ef423db4c6bbbf9c50b234515e22e -4|2||20170430|customer2|article7|70|2017|4|30|83e7bca789f2eddb132938f37dbf8c70f4d015d1eac33fe577f6dceb48864976 -4|3||20170430|customer2|article1|30|2017|4|30|68ce3936cc5df6b521cf681423b8acd2fb0bf652199d2661c88321cbdef61ae1 -4|4||20170430|customer2|article2|50|2017|4|30|2f8ea87a583752f80836a1dbf67686b14c0296a5f88f924e657cf2aea3ae4261 +salesorder|item|recordmode|date|customer|article|amount|year|month|day +3|1||20170215|customer1|article6|200|2017|2|15 +3|2||20170215|customer1|article2|120|2017|2|15 +3|3||20170215|customer1|article4|90|2017|2|15 +4|1||20170430|customer2|article3|80|2017|4|30 +4|2||20170430|customer2|article7|70|2017|4|30 +4|3||20170430|customer2|article1|30|2017|4|30 +4|4||20170430|customer2|article2|50|2017|4|30 diff --git 
a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_2-3.gz b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_2-3.gz index 3dba365..e329007 100644 Binary files a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_2-3.gz and b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_2-3.gz differ diff --git a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_3-3 b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_3-3 index f7862be..d0e2d76 100644 --- a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_3-3 +++ b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_3-3 @@ -1,7 +1,7 @@ -salesorder|item|recordmode|date|customer|article|amount|year|month|day|hash -5|1||20170510|customer4|article5|150|2017|5|10|cc209d0a174804a06d4bcb8758f230c6869c5483c5ba5e3507c10567bb06e507 -5|2||20170510|customer4|article3|100|2017|5|10|3aff52224ae9414524c20457a47eae2687d9c5febe7d5153d2699b84e0f48873 -5|3||20170510|customer4|article6|80|2017|5|10|6f71ddfc2bbfd6c12563b084a8e77a539e143571974a72e15bccd049707600cf -6|1||20170601|customer3|article4|100|2017|6|1|39a605e302e9cca64767529bd3c648e90cd08df1c6a7611c6a7791ff6a6f6b6a -6|2||20170601|customer3|article1|50|2017|6|1|45ebcff4087496d8e6e3efb0d864bdb0187b6ad7d5045782884aaa77bf0949c3 -6|3||20170601|customer3|article2|90|2017|6|1|2243192b36dcb31032aa80f1c8f80d1a33e63afb199b89681fe585e1954ab085 +salesorder|item|recordmode|date|customer|article|amount|year|month|day +5|1||20170510|customer4|article5|150|2017|5|10 +5|2||20170510|customer4|article3|100|2017|5|10 +5|3||20170510|customer4|article6|80|2017|5|10 +6|1||20170601|customer3|article4|100|2017|6|1 +6|2||20170601|customer3|article1|50|2017|6|1 +6|3||20170601|customer3|article2|90|2017|6|1 diff --git a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_3-3.gz b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_3-3.gz index 427dfed..13343ad 100644 Binary files a/src/test/resources/GzipDecompressorTest/data_20180719111849_data_3-3.gz and b/src/test/resources/GzipDecompressorTest/data_20180719111849_data_3-3.gz differ diff --git a/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/data_20180101-part-00000.psv b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/data_20180101-part-00000.psv new file mode 100644 index 0000000..7865741 --- /dev/null +++ b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/data_20180101-part-00000.psv @@ -0,0 +1,4 @@ +7|1|N|20180110|customer5|article2|120 +7|2|N|20180110|customer5|article4|180 +7|3|N|20180110|customer5|article1|220 +7|3|N|20180110|customer5|BAD RECORD \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/data_20180101-part-00001.psv b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/data_20180101-part-00001.psv new file mode 100644 index 0000000..a6ab327 --- /dev/null +++ b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/data_20180101-part-00001.psv @@ -0,0 +1,3 @@ +8|1|N|20180110|customer5|article2|200 +8|2|N|20180110|customer5|article4|80 +8|3|N|20180110|customer5|article1|20 \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/expected_partitions.txt b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/expected_partitions.txt 
new file mode 100644 index 0000000..79e3cee --- /dev/null +++ b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/expected_partitions.txt @@ -0,0 +1,6 @@ +year=2016/month=6/day=1 +year=2017/month=2/day=15 +year=2017/month=4/day=30 +year=2017/month=5/day=10 +year=2018/month=1/day=1 +year=2018/month=6/day=1 \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/expected_partitions_schema.json b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/expected_partitions_schema.json new file mode 100644 index 0000000..0f4cce7 --- /dev/null +++ b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/expected_partitions_schema.json @@ -0,0 +1,10 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "partition", + "type" : "string", + "nullable" : true, + "metadata" : { } + } + ] +} diff --git a/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/lake_data_post.psv b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/lake_data_post.psv new file mode 100644 index 0000000..ff625ff --- /dev/null +++ b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/lake_data_post.psv @@ -0,0 +1,25 @@ +1|1||20160601|customer1|article1|150|2016|6|1 +1|2||20160601|customer1|article2|200|2016|6|1 +1|3||20160601|customer1|article3|50|2016|6|1 +2|1||20170215|customer2|article4|10|2017|2|15 +2|2||20170215|customer2|article5|50|2017|2|15 +2|3||20170215|customer2|article1|30|2017|2|15 +3|1||20170215|customer1|article6|200|2017|2|15 +3|2||20170215|customer1|article2|120|2017|2|15 +3|3||20170215|customer1|article4|90|2017|2|15 +4|1||20170430|customer3|article3|80|2017|4|30 +4|2||20170430|customer3|article7|70|2017|4|30 +4|3||20170430|customer3|article1|30|2017|4|30 +4|4||20170430|customer3|article2|50|2017|4|30 +5|1||20170510|customer4|article5|150|2017|5|10 +5|2||20170510|customer4|article3|100|2017|5|10 +5|3||20170510|customer4|article6|80|2017|5|10 +6|1||20180601|customer2|article4|100|2018|6|1 +6|2||20180601|customer2|article1|50|2018|6|1 +6|3||20180601|customer2|article2|90|2018|6|1 +7|1|N|20180110|customer5|article2|120|2018|1|1 +7|2|N|20180110|customer5|article4|180|2018|1|1 +7|3|N|20180110|customer5|article1|220|2018|1|1 +8|1|N|20180110|customer5|article2|200|2018|1|1 +8|2|N|20180110|customer5|article4|80|2018|1|1 +8|3|N|20180110|customer5|article1|20|2018|1|1 \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/lake_data_pre.psv b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/lake_data_pre.psv new file mode 100644 index 0000000..b1bca8d --- /dev/null +++ b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/lake_data_pre.psv @@ -0,0 +1,19 @@ +1|1||20160601|customer1|article1|150|2016|6|1 +1|2||20160601|customer1|article2|200|2016|6|1 +1|3||20160601|customer1|article3|50|2016|6|1 +2|1||20170215|customer2|article4|10|2017|2|15 +2|2||20170215|customer2|article5|50|2017|2|15 +2|3||20170215|customer2|article1|30|2017|2|15 +3|1||20170215|customer1|article6|200|2017|2|15 +3|2||20170215|customer1|article2|120|2017|2|15 +3|3||20170215|customer1|article4|90|2017|2|15 +4|1||20170430|customer3|article3|80|2017|4|30 +4|2||20170430|customer3|article7|70|2017|4|30 +4|3||20170430|customer3|article1|30|2017|4|30 +4|4||20170430|customer3|article2|50|2017|4|30 
+5|1||20170510|customer4|article5|150|2017|5|10 +5|2||20170510|customer4|article3|100|2017|5|10 +5|3||20170510|customer4|article6|80|2017|5|10 +6|1||20180601|customer2|article4|100|2018|6|1 +6|2||20180601|customer2|article1|50|2018|6|1 +6|3||20180601|customer2|article2|90|2018|6|1 \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/params.json b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/params.json new file mode 100644 index 0000000..c1ab748 --- /dev/null +++ b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/params.json @@ -0,0 +1,20 @@ +{ + "target_table": "test_lake.test_table", + "source_dir": "/tmp/tests/test_landing/test_table/data", + "header_dir": "/tmp/tests/test_landing/test_table/header", + "file_format": "dsv", + "delimiter": "|", + "metadata_update_strategy": "SparkRecoverPartitionsCustom", + "has_header": false, + "compute_table_statistics": true, + "regex_filename": [ + "([0-9]{4})(?=[0-9]{2})(?=[0-9]{2})", + "(?<=[0-9]{4})([0-9]{2})(?=[0-9]{2})", + "(?<=[0-9]{6})([0-9]{2})" + ], + "target_partitions": [ + "year", + "month", + "day" + ] +} \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/target_schema.json b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/target_schema.json new file mode 100644 index 0000000..bdd9036 --- /dev/null +++ b/src/test/resources/RecoverPartitionsCustomIntegrationTest/multiple_source_files/target_schema.json @@ -0,0 +1,54 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/data_20180101-part-00000.psv b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/data_20180101-part-00000.psv new file mode 100644 index 0000000..7865741 --- /dev/null +++ b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/data_20180101-part-00000.psv @@ -0,0 +1,4 @@ +7|1|N|20180110|customer5|article2|120 +7|2|N|20180110|customer5|article4|180 +7|3|N|20180110|customer5|article1|220 +7|3|N|20180110|customer5|BAD RECORD \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/data_20180101-part-00001.psv b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/data_20180101-part-00001.psv new file mode 100644 index 0000000..a6ab327 --- /dev/null +++ 
b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/data_20180101-part-00001.psv @@ -0,0 +1,3 @@ +8|1|N|20180110|customer5|article2|200 +8|2|N|20180110|customer5|article4|80 +8|3|N|20180110|customer5|article1|20 \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/expected_partitions.txt b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/expected_partitions.txt new file mode 100644 index 0000000..79e3cee --- /dev/null +++ b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/expected_partitions.txt @@ -0,0 +1,6 @@ +year=2016/month=6/day=1 +year=2017/month=2/day=15 +year=2017/month=4/day=30 +year=2017/month=5/day=10 +year=2018/month=1/day=1 +year=2018/month=6/day=1 \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/expected_partitions_schema.json b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/expected_partitions_schema.json new file mode 100644 index 0000000..0f4cce7 --- /dev/null +++ b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/expected_partitions_schema.json @@ -0,0 +1,10 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "partition", + "type" : "string", + "nullable" : true, + "metadata" : { } + } + ] +} diff --git a/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/lake_data_post.psv b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/lake_data_post.psv new file mode 100644 index 0000000..ff625ff --- /dev/null +++ b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/lake_data_post.psv @@ -0,0 +1,25 @@ +1|1||20160601|customer1|article1|150|2016|6|1 +1|2||20160601|customer1|article2|200|2016|6|1 +1|3||20160601|customer1|article3|50|2016|6|1 +2|1||20170215|customer2|article4|10|2017|2|15 +2|2||20170215|customer2|article5|50|2017|2|15 +2|3||20170215|customer2|article1|30|2017|2|15 +3|1||20170215|customer1|article6|200|2017|2|15 +3|2||20170215|customer1|article2|120|2017|2|15 +3|3||20170215|customer1|article4|90|2017|2|15 +4|1||20170430|customer3|article3|80|2017|4|30 +4|2||20170430|customer3|article7|70|2017|4|30 +4|3||20170430|customer3|article1|30|2017|4|30 +4|4||20170430|customer3|article2|50|2017|4|30 +5|1||20170510|customer4|article5|150|2017|5|10 +5|2||20170510|customer4|article3|100|2017|5|10 +5|3||20170510|customer4|article6|80|2017|5|10 +6|1||20180601|customer2|article4|100|2018|6|1 +6|2||20180601|customer2|article1|50|2018|6|1 +6|3||20180601|customer2|article2|90|2018|6|1 +7|1|N|20180110|customer5|article2|120|2018|1|1 +7|2|N|20180110|customer5|article4|180|2018|1|1 +7|3|N|20180110|customer5|article1|220|2018|1|1 +8|1|N|20180110|customer5|article2|200|2018|1|1 +8|2|N|20180110|customer5|article4|80|2018|1|1 +8|3|N|20180110|customer5|article1|20|2018|1|1 \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/lake_data_pre.psv b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/lake_data_pre.psv new file mode 100644 index 0000000..b1bca8d --- /dev/null +++ b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/lake_data_pre.psv @@ -0,0 +1,19 @@ +1|1||20160601|customer1|article1|150|2016|6|1 +1|2||20160601|customer1|article2|200|2016|6|1 
+1|3||20160601|customer1|article3|50|2016|6|1 +2|1||20170215|customer2|article4|10|2017|2|15 +2|2||20170215|customer2|article5|50|2017|2|15 +2|3||20170215|customer2|article1|30|2017|2|15 +3|1||20170215|customer1|article6|200|2017|2|15 +3|2||20170215|customer1|article2|120|2017|2|15 +3|3||20170215|customer1|article4|90|2017|2|15 +4|1||20170430|customer3|article3|80|2017|4|30 +4|2||20170430|customer3|article7|70|2017|4|30 +4|3||20170430|customer3|article1|30|2017|4|30 +4|4||20170430|customer3|article2|50|2017|4|30 +5|1||20170510|customer4|article5|150|2017|5|10 +5|2||20170510|customer4|article3|100|2017|5|10 +5|3||20170510|customer4|article6|80|2017|5|10 +6|1||20180601|customer2|article4|100|2018|6|1 +6|2||20180601|customer2|article1|50|2018|6|1 +6|3||20180601|customer2|article2|90|2018|6|1 \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/params.json b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/params.json new file mode 100644 index 0000000..fb4148c --- /dev/null +++ b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/params.json @@ -0,0 +1,20 @@ +{ + "target_table": "test_lake.test_table", + "source_dir": "/tmp/tests/test_landing/test_table/data", + "header_dir": "/tmp/tests/test_landing/test_table/header", + "file_format": "dsv", + "delimiter": "|", + "metadata_update_strategy": "SparkRecoverPartitionsNative", + "compute_table_statistics": true, + "has_header": false, + "regex_filename": [ + "([0-9]{4})(?=[0-9]{2})(?=[0-9]{2})", + "(?<=[0-9]{4})([0-9]{2})(?=[0-9]{2})", + "(?<=[0-9]{6})([0-9]{2})" + ], + "target_partitions": [ + "year", + "month", + "day" + ] +} \ No newline at end of file diff --git a/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/target_schema.json b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/target_schema.json new file mode 100644 index 0000000..bdd9036 --- /dev/null +++ b/src/test/resources/RecoverPartitionsNativeIntegrationTest/multiple_source_files/target_schema.json @@ -0,0 +1,54 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/scala/com/adidas/analytics/integration/RecoverPartitionsCustomIntegrationTest.scala b/src/test/scala/com/adidas/analytics/integration/RecoverPartitionsCustomIntegrationTest.scala new file mode 100644 index 0000000..cd0cdb1 --- /dev/null +++ b/src/test/scala/com/adidas/analytics/integration/RecoverPartitionsCustomIntegrationTest.scala @@ -0,0 +1,76 @@ +package com.adidas.analytics.integration + +import com.adidas.utils.TestUtils._ +import 
com.adidas.analytics.algo.AppendLoad +import com.adidas.utils.FileReader +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.{Dataset, Encoders} +import org.scalatest.FeatureSpec +import org.scalatest.Matchers._ + +import scala.collection.JavaConverters._ + + +class RecoverPartitionsCustomIntegrationTest extends FeatureSpec with BaseIntegrationTest { + + feature("Partitions can be updated programmatically using custom logic") { + + scenario("Using Append Load Algorithm with multiple source files") { + val testResourceDir = "multiple_source_files" + val headerPath20180101 = new Path(headerDirPath, "year=2018/month=1/day=1/header.json") + val targetPath20180101 = new Path(targetDirPath, "year=2018/month=1/day=1") + + val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType] + val expectedPartitionsSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/expected_partitions_schema.json")).asInstanceOf[StructType] + val dataReader = FileReader.newDSVFileReader(Some(targetSchema)) + val expectedPartitionsDataReader = FileReader.newDSVFileReader(Some(expectedPartitionsSchema)) + + val targetTable = createTargetTable(testResourceDir, Seq("year", "month", "day"), targetSchema) + setupInitialState(targetTable, s"$testResourceDir/lake_data_pre.psv", dataReader) + prepareSourceData(testResourceDir, Seq("data_20180101-part-00000.psv", "data_20180101-part-00001.psv")) + uploadParameters(testResourceDir) + + // checking pre-conditions + spark.read.csv(sourceDirPath.toString).count() shouldBe 7 + targetTable.read().count() shouldBe 19 + + fs.exists(targetPath20180101) shouldBe false + fs.exists(headerPath20180101) shouldBe false + + // executing load + AppendLoad(spark, dfs, paramsFileHdfsPath.toString).run() + + // validating result + val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.psv", withProtocol = true) + val expectedPartitionsLocation = resolveResource(s"$testResourceDir/expected_partitions.txt", withProtocol = true) + val expectedDf = dataReader.read(spark, expectedDataLocation) + val actualDf = targetTable.read() + + val producedPartitionsNumber: Dataset[String] = spark + .sql(s"SHOW PARTITIONS ${targetDatabase}.${tableName}") + .as(Encoders.STRING) + + // MetaData Specific Tests + val expectedPartitions: Dataset[String] = expectedPartitionsDataReader + .read(spark, expectedPartitionsLocation) + .as(Encoders.STRING) + + expectedPartitions.collectAsList().asScala.sorted.toSet should + equal(producedPartitionsNumber.collectAsList().asScala.sorted.toSet) + + actualDf.hasDiff(expectedDf) shouldBe false + + spark + .sql(s"DESCRIBE extended ${targetDatabase}.${tableName} PARTITION(year=2018,month=1,day=1)") + .filter("col_name == 'Partition Statistics'") + .head() + .getAs[String]("data_type").contains("6 rows") shouldBe true + + fs.exists(targetPath20180101) shouldBe true + fs.exists(headerPath20180101) shouldBe true + } + } + + +} diff --git a/src/test/scala/com/adidas/analytics/integration/RecoverPartitionsNativeIntegrationTest.scala b/src/test/scala/com/adidas/analytics/integration/RecoverPartitionsNativeIntegrationTest.scala new file mode 100644 index 0000000..b6c2d96 --- /dev/null +++ b/src/test/scala/com/adidas/analytics/integration/RecoverPartitionsNativeIntegrationTest.scala @@ -0,0 +1,76 @@ +package com.adidas.analytics.integration + +import com.adidas.utils.TestUtils._ +import com.adidas.analytics.algo.AppendLoad +import 
com.adidas.utils.FileReader +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.{Dataset, Encoders} +import org.scalatest.FeatureSpec +import org.scalatest.Matchers._ + +import scala.collection.JavaConverters._ + + +class RecoverPartitionsNativeIntegrationTest extends FeatureSpec with BaseIntegrationTest { + + feature("Partitions can be updated with native spark.recoverPartitions()") { + + scenario("Using Append Load Algorithm with multiple source files") { + val testResourceDir = "multiple_source_files" + val headerPath20180101 = new Path(headerDirPath, "year=2018/month=1/day=1/header.json") + val targetPath20180101 = new Path(targetDirPath, "year=2018/month=1/day=1") + + val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType] + val expectedPartitionsSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/expected_partitions_schema.json")).asInstanceOf[StructType] + val dataReader = FileReader.newDSVFileReader(Some(targetSchema)) + val expectedPartitionsDataReader = FileReader.newDSVFileReader(Some(expectedPartitionsSchema)) + + val targetTable = createTargetTable(testResourceDir, Seq("year", "month", "day"), targetSchema) + setupInitialState(targetTable, s"$testResourceDir/lake_data_pre.psv", dataReader) + prepareSourceData(testResourceDir, Seq("data_20180101-part-00000.psv", "data_20180101-part-00001.psv")) + uploadParameters(testResourceDir) + + // checking pre-conditions + spark.read.csv(sourceDirPath.toString).count() shouldBe 7 + targetTable.read().count() shouldBe 19 + + fs.exists(targetPath20180101) shouldBe false + fs.exists(headerPath20180101) shouldBe false + + // executing load + AppendLoad(spark, dfs, paramsFileHdfsPath.toString).run() + + // validating result + val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.psv", withProtocol = true) + val expectedPartitionsLocation = resolveResource(s"$testResourceDir/expected_partitions.txt", withProtocol = true) + val expectedDf = dataReader.read(spark, expectedDataLocation) + val actualDf = targetTable.read() + + val producedPartitionsNumber: Dataset[String] = spark + .sql(s"SHOW PARTITIONS ${targetDatabase}.${tableName}") + .as(Encoders.STRING) + + // MetaData Specific Tests + val expectedPartitions: Dataset[String] = expectedPartitionsDataReader + .read(spark, expectedPartitionsLocation) + .as(Encoders.STRING) + + expectedPartitions.collectAsList().asScala.sorted.toSet should + equal(producedPartitionsNumber.collectAsList().asScala.sorted.toSet) + + actualDf.hasDiff(expectedDf) shouldBe false + + spark + .sql(s"DESCRIBE extended ${targetDatabase}.${tableName} PARTITION(year=2018,month=1,day=1)") + .filter("col_name == 'Partition Statistics'") + .head() + .getAs[String]("data_type").contains("6 rows") shouldBe true + + fs.exists(targetPath20180101) shouldBe true + fs.exists(headerPath20180101) shouldBe true + } + } + + +} diff --git a/src/test/scala/com/adidas/analytics/unit/DateComponentDerivationTest.scala b/src/test/scala/com/adidas/analytics/unit/DateComponentDerivationTest.scala index 092d721..a0d71fa 100644 --- a/src/test/scala/com/adidas/analytics/unit/DateComponentDerivationTest.scala +++ b/src/test/scala/com/adidas/analytics/unit/DateComponentDerivationTest.scala @@ -41,10 +41,10 @@ class DateComponentDerivationTest extends FunSuite with SparkSessionWrapper with val transformedDf = sampleDf.transform(dateComponentDerivationTester) val expectedDf = Seq( - 
("201301",2013,1), - ("201531",2015,31), - ("202001",2020,1) - ).toDF("zatpweek","year","week") + ("201301", 2013, 1), + ("201531", 2015, 31), + ("202001", 2020, 1) + ).toDF("zatpweek", "year", "week") transformedDf.hasDiff(expectedDf) shouldBe false } @@ -54,23 +54,25 @@ class DateComponentDerivationTest extends FunSuite with SparkSessionWrapper with val sampleDf = Seq( ("20130112"), ("20150815"), - ("20200325") + ("20200325"), + ("20180110") ).toDF("partcol") val dateComponentDerivationTester: DataFrame => DataFrame = new DateComponentDerivationSubClass() .validateWithDateComponents( sourceDateColumnName = "partcol", sourceDateFormat = "yyyyMMdd", - targetDateComponentColumnNames = Seq("year", "month","day") + targetDateComponentColumnNames = Seq("year", "month", "day") ) val transformedDf = sampleDf.transform(dateComponentDerivationTester) val expectedDf = Seq( - ("20130112",2013,1,12), - ("20150815",2015,8,15), - ("20200325",2020,3,25) - ).toDF("partcol","year","month","day") + ("20130112", 2013, 1, 12), + ("20150815", 2015, 8, 15), + ("20200325", 2020, 3, 25), + ("20180110", 2018, 1, 10) + ).toDF("partcol", "year", "month", "day") transformedDf.hasDiff(expectedDf) shouldBe false } @@ -93,10 +95,10 @@ class DateComponentDerivationTest extends FunSuite with SparkSessionWrapper with val transformedDf = sampleDf.transform(dateComponentDerivationTester) val expectedDf = Seq( - ("20130112",2013,1), - ("20150815",2015,8), - ("20200325",2020,3) - ).toDF("partcol","year","month") + ("20130112", 2013, 1), + ("20150815", 2015, 8), + ("20200325", 2020, 3) + ).toDF("partcol", "year", "month") transformedDf.hasDiff(expectedDf) shouldBe false } @@ -119,10 +121,10 @@ class DateComponentDerivationTest extends FunSuite with SparkSessionWrapper with val transformedDf = sampleDf.transform(dateComponentDerivationTester) val expectedDf = Seq( - ("20130112",2013,1), - ("201508151",9999,99), - ("20200325",2020,3) - ).toDF("partcol","year","month") + ("20130112", 2013, 1), + ("201508151", 9999, 99), + ("20200325", 2020, 3) + ).toDF("partcol", "year", "month") transformedDf.hasDiff(expectedDf) shouldBe false } @@ -144,10 +146,10 @@ class DateComponentDerivationTest extends FunSuite with SparkSessionWrapper with val transformedDf = sampleDf.transform(dateComponentDerivationTester) val expectedDf = Seq( - (201301,2013,1), - (2015233,9999,99), - (202003,2020,3) - ).toDF("partcol","year","month") + (201301, 2013, 1), + (2015233, 9999, 99), + (202003, 2020, 3) + ).toDF("partcol", "year", "month") transformedDf.hasDiff(expectedDf) shouldBe false } @@ -170,9 +172,9 @@ class DateComponentDerivationTest extends FunSuite with SparkSessionWrapper with val transformedDf = sampleDf.transform(dateComponentDerivationTester) val expectedDf = Seq( - (2013014,2013,1,4), - (2015233,2015,23,3), - (2020037,2020,3,7) + (2013014, 2013, 1, 4), + (2015233, 2015, 23, 3), + (2020037, 2020, 3, 7) ).toDF("partcol", "year", "week", "day") transformedDf.hasDiff(expectedDf) shouldBe false diff --git a/src/test/scala/com/adidas/analytics/unit/RecoverPartitionsCustomTest.scala b/src/test/scala/com/adidas/analytics/unit/RecoverPartitionsCustomTest.scala new file mode 100644 index 0000000..c568ed4 --- /dev/null +++ b/src/test/scala/com/adidas/analytics/unit/RecoverPartitionsCustomTest.scala @@ -0,0 +1,99 @@ +package com.adidas.analytics.unit + +import com.adidas.analytics.util.RecoverPartitionsCustom +import com.adidas.utils.SparkSessionWrapper +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import 
org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.{Dataset, Row} +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers, PrivateMethodTester} + +import scala.collection.JavaConverters._ + +class RecoverPartitionsCustomTest extends FunSuite + with SparkSessionWrapper + with PrivateMethodTester + with Matchers + with BeforeAndAfterAll{ + + test("test conversion of String Value to HiveQL Partition Parameter") { + val customSparkRecoverPartitions = RecoverPartitionsCustom(tableName="", targetPartitions = Seq()) + val createParameterValue = PrivateMethod[String]('createParameterValue) + val result = customSparkRecoverPartitions invokePrivate createParameterValue("theValue") + + result should be("'theValue'") + } + + test("test conversion of Short Value to HiveQL Partition Parameter") { + val customSparkRecoverPartitions = RecoverPartitionsCustom(tableName="", targetPartitions = Seq()) + val createParameterValue = PrivateMethod[String]('createParameterValue) + val result = customSparkRecoverPartitions invokePrivate createParameterValue(java.lang.Short.valueOf("2")) + + result should be("2") + } + + test("test conversion of Integer Value to HiveQL Partition Parameter") { + val customSparkRecoverPartitions = RecoverPartitionsCustom(tableName="", targetPartitions = Seq()) + val createParameterValue = PrivateMethod[String]('createParameterValue) + val result = customSparkRecoverPartitions invokePrivate createParameterValue(java.lang.Integer.valueOf("4")) + + result should be("4") + } + + test("test conversion of null Value to HiveQL Partition Parameter") { + val customSparkRecoverPartitions = RecoverPartitionsCustom(tableName="", targetPartitions = Seq()) + val createParameterValue = PrivateMethod[String]('createParameterValue) + an [Exception] should be thrownBy { + customSparkRecoverPartitions invokePrivate createParameterValue(null) + } + } + + test("test conversion of not supported Value to HiveQL Partition Parameter") { + val customSparkRecoverPartitions = RecoverPartitionsCustom(tableName="", targetPartitions = Seq()) + val createParameterValue = PrivateMethod[String]('createParameterValue) + an [Exception] should be thrownBy { + customSparkRecoverPartitions invokePrivate createParameterValue(false) + } + } + + test("test HiveQL statements Generation") { + val customSparkRecoverPartitions = RecoverPartitionsCustom( + tableName="test", + targetPartitions = Seq("country","district") + ) + + val rowsInput = Seq( + Row(1, "portugal", "porto"), + Row(2, "germany", "herzogenaurach"), + Row(3, "portugal", "coimbra") + ) + + val inputSchema = StructType( + List( + StructField("number", IntegerType, nullable = true), + StructField("country", StringType, nullable = true), + StructField("district", StringType, nullable = true) + ) + ) + + val expectedStatements: Seq[String] = Seq( + "ALTER TABLE test ADD IF NOT EXISTS PARTITION(country='portugal',district='porto')", + "ALTER TABLE test ADD IF NOT EXISTS PARTITION(country='germany',district='herzogenaurach')", + "ALTER TABLE test ADD IF NOT EXISTS PARTITION(country='portugal',district='coimbra')" + ) + + val testDataset: Dataset[Row] = spark.createDataset(rowsInput)(RowEncoder(inputSchema)) + + val createParameterValue = PrivateMethod[Dataset[String]]('generateAddPartitionStatements) + + val producedStatements: Seq[String] = (customSparkRecoverPartitions invokePrivate createParameterValue(testDataset)) + .collectAsList() + .asScala + + expectedStatements.sorted.toSet should 
equal(producedStatements.sorted.toSet) + } + + override def afterAll(): Unit = { + spark.stop() + } + +}
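
Note for reviewers: the sketch below illustrates the behaviour that RecoverPartitionsCustomTest pins down, namely that string partition values are wrapped in single quotes, Short and Integer values are passed through unquoted, null and unsupported types are rejected, and one ALTER TABLE ... ADD IF NOT EXISTS PARTITION(...) statement is produced per distinct partition tuple. This is a minimal, self-contained approximation and not the repository's RecoverPartitionsCustom from com.adidas.analytics.util; the object and method names are illustrative only, and building the statements from a distinct() over the partition columns is an assumption inferred from the expected statements in the test above.

import org.apache.spark.sql.{DataFrame, Dataset}

object PartitionStatementsSketch {

  // Hypothetical stand-in for the private createParameterValue method:
  // quote strings, pass numeric partition values through unquoted, and fail
  // fast on null or unsupported types (the behaviour asserted by the unit tests above).
  def parameterValue(value: Any): String = value match {
    case s: String            => s"'$s'"
    case n: java.lang.Short   => n.toString
    case i: java.lang.Integer => i.toString
    case other                => throw new IllegalArgumentException(s"Unsupported partition value: $other")
  }

  // Hypothetical stand-in for the private generateAddPartitionStatements method:
  // emit one ALTER TABLE statement per distinct combination of the target
  // partition columns found in the input DataFrame.
  def addPartitionStatements(df: DataFrame, table: String, targetPartitions: Seq[String]): Dataset[String] = {
    import df.sparkSession.implicits._
    df.selectExpr(targetPartitions: _*)
      .distinct()
      .map { row =>
        val spec = targetPartitions.indices
          .map(i => s"${targetPartitions(i)}=${parameterValue(row.get(i))}")
          .mkString(",")
        s"ALTER TABLE $table ADD IF NOT EXISTS PARTITION($spec)"
      }
  }
}

Executing the generated statements through spark.sql(...) is what would make the new partitions visible to the metastore, which the integration tests then verify via SHOW PARTITIONS and the partition-level statistics check. The SparkRecoverPartitionsNative strategy named in params.json presumably delegates to Spark's built-in spark.catalog.recoverPartitions(table) instead; that mapping is an assumption on my part rather than something shown in this diff.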