diff --git a/src/main/scala/com/adidas/analytics/algo/AppendLoad.scala b/src/main/scala/com/adidas/analytics/algo/AppendLoad.scala
index 7f11e67..e9bf054 100644
--- a/src/main/scala/com/adidas/analytics/algo/AppendLoad.scala
+++ b/src/main/scala/com/adidas/analytics/algo/AppendLoad.scala
@@ -7,7 +7,7 @@ import com.adidas.analytics.algo.core.Algorithm
 import com.adidas.analytics.algo.core.Algorithm.ComputeTableStatisticsOperation
 import com.adidas.analytics.config.AppendLoadConfiguration
 import com.adidas.analytics.util.DFSWrapper._
-import com.adidas.analytics.util.DataFormat.{DSVFormat, ParquetFormat}
+import com.adidas.analytics.util.DataFormat.{DSVFormat, JSONFormat, ParquetFormat}
 import com.adidas.analytics.util.DataFrameUtils._
 import com.adidas.analytics.util._
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -33,7 +33,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
   override protected def write(dataFrames: Vector[DataFrame]): Unit = {
     writeHeaders(dataFrames, partitionColumns, headerDir, dfs)
     super.write(dataFrames)
-    if (computeTableStatistics)
+    if (computeTableStatistics && dataType == STRUCTURED)
       computeStatisticsForTable(targetTable)
   }
@@ -50,14 +50,67 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
     val targetSchemaWithoutPartitionColumns = getSchemaWithoutPartitionColumns(targetSchema, partitionColumns.toSet)
     logger.info(s"Looking for input files in $inputDirPath")
-    fs.ls(inputDirPath, recursive = true).groupBy { inputPath =>
+    val groupedHeaderPathAndSourcePaths = fs.ls(inputDirPath, recursive = true).groupBy { inputPath =>
       buildHeaderFilePath(columnToRegexPairs, targetSchema, extractPathWithoutServerAndProtocol(inputPath.toString), headerDirPath)
-    }.flatMap { case (headerPath, sourcePaths) =>
-      val schema = if (fs.exists(headerPath)) loadHeader(headerPath, fs) else targetSchemaWithoutPartitionColumns
-      sourcePaths.map { sourcePath =>
-        Source(schema, sourcePath.toString)
+    }
+
+    def getMapSchemaStructToPath = {
+      val mapSchemaStructToPath = groupedHeaderPathAndSourcePaths.toSeq.map { case (headerPath, sourcePaths) =>
+        getSchemaFromHeaderOrSource(fs, headerPath, sourcePaths, targetSchemaWithoutPartitionColumns)
+      }.groupBy(_._1).map { case (k, v) => (k, v.flatMap(_._2)) }
+
+      val filteredMapSchemaStructToPath = mapSchemaStructToPath.filter(schemaFromInputData => matchingSchemas_?(schemaFromInputData._1, targetSchema, schemaFromInputData._2))
+
+      if (mapSchemaStructToPath.size != filteredMapSchemaStructToPath.size)
+        throw new RuntimeException("Schema does not match the input data for some of the input folders.")
+
+      mapSchemaStructToPath.flatMap { case (schema, sourcePaths) =>
+        sourcePaths.map { sourcePath =>
+          Source(targetSchema, sourcePath.toString)
+        }
+      }
+    }
+
+    val schemaAndSourcePath = if (!verifySchema) {
+      groupedHeaderPathAndSourcePaths.flatMap { case (headerPath, sourcePaths) =>
+        val schema = if (fs.exists(headerPath)) loadHeader(headerPath, fs) else targetSchemaWithoutPartitionColumns
+        sourcePaths.map { sourcePath =>
+          Source(schema, sourcePath.toString)
+        }
       }
-    }.toSeq
+    } else {
+      getMapSchemaStructToPath
+    }
+    schemaAndSourcePath.toSeq
+  }
+
+  private def getSchemaFromHeaderOrSource(fs: FileSystem, headerPath: Path, sourcePaths: Seq[Path], targetSchemaWithoutPartitionColumns: StructType): (StructType, Seq[Path]) = {
+    val schema = if (fs.exists(headerPath)) {
+      loadHeader(headerPath, fs)
+    } else {
+      inferSchemaFromSource(sourcePaths)
+    }
+    (schema, sourcePaths)
+  }
+
+  private def inferSchemaFromSource(sourcePaths: Seq[Path]): StructType = {
+    val reader = spark.read.options(sparkReaderOptions)
+    val dataFormat = fileFormat match {
+      case "dsv" => DSVFormat()
+      case "parquet" => ParquetFormat()
+      case "json" => JSONFormat()
+      case anotherFormat => throw new RuntimeException(s"Unknown file format: $anotherFormat")
+    }
+    dataFormat.read(reader, sourcePaths.map(_.toString): _*).schema
+  }
+
+  private def matchingSchemas_?(schemaFromInputData: StructType, targetSchema: StructType, paths: Seq[Path]): Boolean = {
+    val inputColumnsVector = schemaFromInputData.names.toVector
+    val targetColumnsVector = targetSchema.names.toVector
+    val diff = inputColumnsVector.diff(targetColumnsVector)
+    if (diff.nonEmpty)
+      logger.error(s"Inferred schema does not match the target schema for ${paths.toString}")
+    diff.isEmpty
   }
 
   private def readSources(sources: Seq[Source], fs: FileSystem, spark: SparkSession): Vector[DataFrame] = {
@@ -70,6 +123,7 @@ final class AppendLoad protected(val spark: SparkSession, val dfs: DFSWrapper, v
     fileFormat match {
       case "dsv" => DSVFormat(Some(schema)).read(reader, inputPaths: _*)
       case "parquet" => ParquetFormat(Some(schema)).read(reader, inputPaths: _*)
+      case "json" => JSONFormat(Some(schema)).read(reader, inputPaths: _*)
       case anotherFormat => throw new RuntimeException(s"Unknown file format: $anotherFormat")
     }
   }
diff --git a/src/main/scala/com/adidas/analytics/algo/FullLoad.scala b/src/main/scala/com/adidas/analytics/algo/FullLoad.scala
index c5d494e..28b0af2 100644
--- a/src/main/scala/com/adidas/analytics/algo/FullLoad.scala
+++ b/src/main/scala/com/adidas/analytics/algo/FullLoad.scala
@@ -35,8 +35,8 @@ final class FullLoad protected(val spark: SparkSession, val dfs: DFSWrapper, val
   override protected def write(dataFrames: Vector[DataFrame]): Unit = {
     super.write(dataFrames)
     restoreTable()
-    if (computeTableStatistics)
-      computeStatisticsForTable(targetTable)
+    if (computeTableStatistics && dataType == STRUCTURED)
+      computeStatisticsForTable(Some(targetTable))
   }
 
   private def createBackupTable(): Unit = {
diff --git a/src/main/scala/com/adidas/analytics/algo/PartitionMaterialization.scala b/src/main/scala/com/adidas/analytics/algo/PartitionMaterialization.scala
index cdc58c8..aa33787 100644
--- a/src/main/scala/com/adidas/analytics/algo/PartitionMaterialization.scala
+++ b/src/main/scala/com/adidas/analytics/algo/PartitionMaterialization.scala
@@ -31,11 +31,11 @@ object PartitionMaterialization {
   }
 
   def newRangeMaterialization(spark: SparkSession, dfs: DFSWrapper, configLocation: String): PartitionMaterialization = {
-    new RangeMaterialization(spark, dfs, LoadMode.OverwritePartitions, configLocation)
+    new RangeMaterialization(spark, dfs, LoadMode.OverwritePartitionsWithAddedColumns, configLocation)
   }
 
   def newQueryMaterialization(spark: SparkSession, dfs: DFSWrapper, configLocation: String): PartitionMaterialization = {
-    new QueryMaterialization(spark, dfs, LoadMode.OverwritePartitions, configLocation)
+    new QueryMaterialization(spark, dfs, LoadMode.OverwritePartitionsWithAddedColumns, configLocation)
   }
 
   private class FullMaterialization(val spark: SparkSession, val dfs: DFSWrapper, val loadMode: LoadMode, val configLocation: String)
diff --git a/src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala b/src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala
index 7ae61e0..993bb31 100644
--- a/src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala
+++ b/src/main/scala/com/adidas/analytics/algo/core/Algorithm.scala
@@ -141,6 +141,9 @@ object Algorithm {
 
     protected def spark: SparkSession
 
-    protected def computeStatisticsForTable(tableName: String): Unit = spark.sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
+    protected def computeStatisticsForTable(tableName: Option[String]): Unit = {
+      if (tableName.isDefined)
+        spark.sql(s"ANALYZE TABLE ${tableName.get} COMPUTE STATISTICS")
+    }
   }
 }
diff --git a/src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala b/src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala
index 8a0b09b..1fd8de1 100644
--- a/src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala
+++ b/src/main/scala/com/adidas/analytics/config/AlgorithmTemplateConfiguration.scala
@@ -64,7 +64,7 @@ trait AlgorithmTemplateConfiguration extends ConfigurationContext with ReadOpera
     table = targetTable,
     format = ParquetFormat (Some (targetSchema) ),
     partitionColumns = Seq ("", "", ""), //If partitions are required, this would look like, e.g., Seq("year", "month")
-    loadMode = LoadMode.OverwritePartitions
+    loadMode = LoadMode.OverwritePartitionsWithAddedColumns
   )
  }
 }
diff --git a/src/main/scala/com/adidas/analytics/config/AppendLoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/AppendLoadConfiguration.scala
index d0515af..589cee9 100644
--- a/src/main/scala/com/adidas/analytics/config/AppendLoadConfiguration.scala
+++ b/src/main/scala/com/adidas/analytics/config/AppendLoadConfiguration.scala
@@ -5,7 +5,9 @@ import com.adidas.analytics.config.shared.{ConfigurationContext, LoadConfigurati
 import com.adidas.analytics.util.DataFormat.ParquetFormat
 import com.adidas.analytics.util.{LoadMode, OutputWriter}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
+
+import scala.util.parsing.json.JSONObject
 
 trait AppendLoadConfiguration extends ConfigurationContext with LoadConfiguration with SafeWriteOperation {
@@ -16,14 +18,53 @@ trait AppendLoadConfiguration extends ConfigurationContext with LoadConfiguratio
 
   protected val headerDir: String = configReader.getAs[String]("header_dir")
 
+  protected val targetTable: Option[String] = configReader.getAsOption[String]("target_table")
+
+  // This option is used to specify whether the input data schema must be the same as target schema specified in the configuration file
+  // Note: if it is set to True, it will cause input data to be read more than once
+  private val verifySchemaOption: Option[Boolean] = configReader.getAsOption[Boolean]("verify_schema")
+
+  protected val verifySchema: Boolean = dataType match {
+    case SEMISTRUCTURED => verifySchemaOption.getOrElse(true)
+    case _ => false
+  }
+
   protected val columnToRegexPairs: Seq[(String, String)] = partitionColumns zip regexFilename
 
-  protected val targetSchema: StructType = spark.table(targetTable).schema
+  private val jsonSchemaOption: Option[JSONObject] = configReader.getAsOption[JSONObject]("schema")
+
+  protected val targetSchema: StructType = getTargetSchema
+
+  private val targetDir: Option[String] = configReader.getAsOption[String]("target_dir")
+
+  override protected val writer: OutputWriter.AtomicWriter = dataType match {
+    case STRUCTURED if targetTable.isDefined => OutputWriter.newTableLocationWriter(
+      table = targetTable.get,
+      format = ParquetFormat(Some(targetSchema)),
+      partitionColumns = partitionColumns,
+      loadMode = LoadMode.OverwritePartitionsWithAddedColumns
+    )
+    case SEMISTRUCTURED if targetDir.isDefined => OutputWriter.newFileSystemWriter(
+      location = targetDir.get,
+      format = ParquetFormat(Some(targetSchema)),
+      partitionColumns = partitionColumns,
+      loadMode = LoadMode.OverwritePartitions
+    )
+    case anotherDataType => throw new RuntimeException(s"Unsupported data type: $anotherDataType in AppendLoad or the configuration file is malformed.")
+  }
+
+  private def getTargetSchemaFromHiveTable: StructType = {
+    targetTable match {
+      case Some(tableName) => spark.table(tableName).schema
+      case None => throw new RuntimeException("No schema definition found.")
+    }
+  }
 
-  override protected val writer: OutputWriter.AtomicWriter = OutputWriter.newTableLocationWriter(
-    table = targetTable,
-    format = ParquetFormat(Some(targetSchema)),
-    partitionColumns = partitionColumns,
-    loadMode = LoadMode.OverwritePartitions
-  )
+  private def getTargetSchema: StructType = {
+    dataType match {
+      case STRUCTURED => getTargetSchemaFromHiveTable
+      case SEMISTRUCTURED if jsonSchemaOption.isDefined => DataType.fromJson(jsonSchemaOption.get.toString()).asInstanceOf[StructType]
+      case anotherDataType => throw new RuntimeException(s"Unsupported data type: $anotherDataType in AppendLoad or the configuration file is malformed.")
+    }
+  }
 }
diff --git a/src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala
index dd88726..abf250a 100644
--- a/src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala
+++ b/src/main/scala/com/adidas/analytics/config/DeltaLoadConfiguration.scala
@@ -49,7 +49,7 @@ object DeltaLoadConfiguration {
       table = activeRecordsTable,
       format = ParquetFormat(Some(targetSchema)),
       partitionColumns = partitionColumns,
-      loadMode = LoadMode.OverwritePartitions
+      loadMode = LoadMode.OverwritePartitionsWithAddedColumns
     )
   }
diff --git a/src/main/scala/com/adidas/analytics/config/FixedSizeStringExtractorConfiguration.scala b/src/main/scala/com/adidas/analytics/config/FixedSizeStringExtractorConfiguration.scala
index 8c9df36..8693b80 100644
--- a/src/main/scala/com/adidas/analytics/config/FixedSizeStringExtractorConfiguration.scala
+++ b/src/main/scala/com/adidas/analytics/config/FixedSizeStringExtractorConfiguration.scala
@@ -60,7 +60,7 @@ trait FixedSizeStringExtractorConfiguration extends ConfigurationContext with Re
     table = targetTable,
     format = ParquetFormat(Some(targetSchema)),
     partitionColumns = partitionColumnsOrdered,
-    loadMode = if (partitionColumnsOrdered.nonEmpty) LoadMode.OverwritePartitions else LoadMode.OverwriteTable
+    loadMode = if (partitionColumnsOrdered.nonEmpty) LoadMode.OverwritePartitionsWithAddedColumns else LoadMode.OverwriteTable
   )
 }
diff --git a/src/main/scala/com/adidas/analytics/config/FullLoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/FullLoadConfiguration.scala
index a94bb64..7b27f11 100644
--- a/src/main/scala/com/adidas/analytics/config/FullLoadConfiguration.scala
+++ b/src/main/scala/com/adidas/analytics/config/FullLoadConfiguration.scala
@@ -16,14 +16,20 @@ trait FullLoadConfiguration extends ConfigurationContext with LoadConfiguration
   protected val backupDir: String = configReader.getAs[String]("backup_dir")
 
+  protected val targetTable: String = configReader.getAs[String]("target_table")
+
   protected val targetSchema: StructType = spark.table(targetTable).schema
 
-  protected val writer: OutputWriter.AtomicWriter = OutputWriter.newFileSystemWriter(
-    location = currentDir,
-    format = ParquetFormat(Some(targetSchema)),
-    partitionColumns = partitionColumns,
-    loadMode = LoadMode.OverwriteTable
-  )
+  protected val writer: OutputWriter.AtomicWriter = dataType match {
+    case STRUCTURED => OutputWriter.newFileSystemWriter(
+      location = currentDir,
+      format = ParquetFormat(Some(targetSchema)),
+      partitionColumns = partitionColumns,
+      loadMode = LoadMode.OverwriteTable
+    )
+    case anotherDataType => throw new RuntimeException(s"Unsupported data type: $anotherDataType for FullLoad.")
+  }
+
   override protected val partitionSourceColumn: String = configReader.getAs[String]("partition_column")
 
   override protected val partitionSourceColumnFormat: String = configReader.getAs[String]("partition_column_format")
diff --git a/src/main/scala/com/adidas/analytics/config/shared/LoadConfiguration.scala b/src/main/scala/com/adidas/analytics/config/shared/LoadConfiguration.scala
index c77f2ed..c4e3654 100644
--- a/src/main/scala/com/adidas/analytics/config/shared/LoadConfiguration.scala
+++ b/src/main/scala/com/adidas/analytics/config/shared/LoadConfiguration.scala
@@ -4,9 +4,11 @@ import com.adidas.analytics.util.ConfigReader
 import org.apache.spark.sql.catalyst.util.{DropMalformedMode}
 
 trait LoadConfiguration {
+  val STRUCTURED = "structured"
+  val SEMISTRUCTURED = "semistructured"
 
-  private val fileDelimiter: String = configReader.getAs[String]("delimiter")
-  private val hasHeader: Boolean = configReader.getAs[Boolean]("has_header")
+  private val fileDelimiter: Option[String] = configReader.getAsOption[String]("delimiter")
+  private val hasHeader: Option[Boolean] = configReader.getAsOption[Boolean]("has_header")
 
   private val optionalSparkOptions: Map[String, String] = Map[String, Option[String]](
     "nullValue" -> readNullValue,
@@ -15,16 +17,19 @@ trait LoadConfiguration {
     case (key, Some(value)) => (key, value)
   }
 
-  private val requiredSparkOptions: Map[String, String] = Map[String, String](
+  private val requiredSparkOptions: Map[String, String] = Map[String, Option[Any]](
     "delimiter" -> fileDelimiter,
-    "header" -> hasHeader.toString,
-    "mode" -> loadMode
-  )
+    "header" -> hasHeader,
+    "mode" -> Some(loadMode)
+  ).collect {
+    case (key, Some(value)) => (key, value.toString)
+  }
+
   protected val partitionColumns: Seq[String] = configReader.getAsSeq[String]("partition_columns")
 
   protected val inputDir: String = configReader.getAs[String]("source_dir")
 
-  protected val targetTable: String = configReader.getAs[String]("target_table")
   protected val fileFormat: String = configReader.getAs[String]("file_format")
 
+  protected val dataType: String = configReader.getAsOption[String]("data_type").getOrElse(STRUCTURED)
+
   protected val sparkReaderOptions: Map[String, String] = requiredSparkOptions ++ optionalSparkOptions
diff --git a/src/main/scala/com/adidas/analytics/util/DataFormat.scala b/src/main/scala/com/adidas/analytics/util/DataFormat.scala
index c58a560..79b3b76 100644
--- a/src/main/scala/com/adidas/analytics/util/DataFormat.scala
+++ b/src/main/scala/com/adidas/analytics/util/DataFormat.scala
@@ -44,5 +44,19 @@ object DataFormat {
       writer.csv(location)
     }
   }
+
+  case class JSONFormat(optionalSchema: Option[StructType] = None) extends DataFormat {
+
+    override def read(reader: DataFrameReader, locations: String*): DataFrame = {
+      val filesString = locations.mkString(", ")
+      logger.info(s"Reading JSON data from $filesString")
+      optionalSchema.fold(reader.option("inferSchema", "true"))(schema => reader.schema(schema)).json(locations: _*)
+    }
+
+    override def write(writer: DataFrameWriter[Row], location: String): Unit = {
+      logger.info(s"Writing JSON data to $location")
+      writer.json(location)
+    }
+  }
 }
diff --git a/src/main/scala/com/adidas/analytics/util/LoadMode.scala b/src/main/scala/com/adidas/analytics/util/LoadMode.scala
index 9f6cf42..043bf2d 100644
--- a/src/main/scala/com/adidas/analytics/util/LoadMode.scala
+++ b/src/main/scala/com/adidas/analytics/util/LoadMode.scala
@@ -16,6 +16,10 @@ object LoadMode {
     override val sparkMode: SaveMode = SaveMode.Overwrite
   }
 
+  case object OverwritePartitionsWithAddedColumns extends LoadMode {
+    override val sparkMode: SaveMode = SaveMode.Overwrite
+  }
+
   case object AppendJoinPartitions extends LoadMode {
     override def sparkMode: SaveMode = SaveMode.Append
   }
diff --git a/src/main/scala/com/adidas/analytics/util/OutputWriter.scala b/src/main/scala/com/adidas/analytics/util/OutputWriter.scala
index 054637d..0f8dcb2 100644
--- a/src/main/scala/com/adidas/analytics/util/OutputWriter.scala
+++ b/src/main/scala/com/adidas/analytics/util/OutputWriter.scala
@@ -40,7 +40,7 @@ object OutputWriter {
    * @return TableWriter
    */
   def newTableWriter(table: String, partitionColumns: Seq[String] = Seq.empty, options: Map[String, String] = Map.empty,
-                     loadMode: LoadMode = LoadMode.OverwritePartitions): TableWriter = {
+                     loadMode: LoadMode = LoadMode.OverwritePartitionsWithAddedColumns): TableWriter = {
     TableWriter(table, partitionColumns, options, loadMode)
   }
 
@@ -55,7 +55,7 @@ object OutputWriter {
    * @return TableLocationWriter
    */
   def newTableLocationWriter(table: String, format: DataFormat, partitionColumns: Seq[String] = Seq.empty,
-                             options: Map[String, String] = Map.empty, loadMode: LoadMode = LoadMode.OverwritePartitions): TableLocationWriter = {
+                             options: Map[String, String] = Map.empty, loadMode: LoadMode = LoadMode.OverwritePartitionsWithAddedColumns): TableLocationWriter = {
     TableLocationWriter(table, format, partitionColumns, options, loadMode)
   }
 
@@ -70,7 +70,7 @@ object OutputWriter {
    * @return FileSystemWriter
    */
   def newFileSystemWriter(location: String, format: DataFormat, partitionColumns: Seq[String] = Seq.empty,
-                          options: Map[String, String] = Map.empty, loadMode: LoadMode = LoadMode.OverwritePartitions): FileSystemWriter = {
+                          options: Map[String, String] = Map.empty, loadMode: LoadMode = LoadMode.OverwritePartitionsWithAddedColumns): FileSystemWriter = {
     FileSystemWriter(location, format, partitionColumns, options, loadMode)
   }
 
@@ -114,6 +114,8 @@ object OutputWriter {
         case LoadMode.OverwriteTable =>
           loadTable(fs, df, finalPath, tempDataPath, tempBackupPath)
         case LoadMode.OverwritePartitions =>
+          loadPartitions(fs, df, finalPath, tempDataPath, tempBackupPath, partitionsCriteria)
+        case LoadMode.OverwritePartitionsWithAddedColumns =>
           val existingDf = format.read(df.sparkSession.read, finalLocation)
           val outputDf = df.addMissingColumns(existingDf.schema)
           loadPartitions(fs, outputDf, finalPath, tempDataPath, tempBackupPath, partitionsCriteria)
@@ -162,7 +164,7 @@ object OutputWriter {
     private def loadPartitions(fs: FileSystem, df: DataFrame, finalPath: Path, dataPath: Path, backupPath: Path,
                                partitionsCriteria: Seq[Seq[(String, String)]]): Unit = {
       if (partitionsCriteria.nonEmpty) {
-        write(fs, df, dataPath, LoadMode.OverwritePartitions)
+        write(fs, df, dataPath, LoadMode.OverwritePartitionsWithAddedColumns)
 
         logger.info(s"Creating backup in $backupPath")
         val backupSpecs = HadoopLoadHelper.createMoveSpecs(fs, finalPath, backupPath, partitionsCriteria)
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/data-nodate-part-00001.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/data-nodate-part-00001.txt
new file mode 100644
index 0000000..4641958
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/data-nodate-part-00001.txt
@@ -0,0 +1,6 @@
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6}
\ No newline at end of file
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/lake_data_post.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/lake_data_post.txt
new file mode 100644
index 0000000..f323773
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/lake_data_post.txt
@@ -0,0 +1,11 @@
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 2}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 2}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 2}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2}
+{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6, "year": 2018, "month": 1, "day": 2}
\ No newline at end of file
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/lake_data_pre.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/lake_data_pre.txt
new file mode 100644
index 0000000..67a9c7c
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/lake_data_pre.txt
@@ -0,0 +1,5 @@
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1}
\ No newline at end of file
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/params.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/params.json
new file mode 100644
index 0000000..75e0703
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/params.json
@@ -0,0 +1,73 @@
+{
+  "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data",
+  "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data",
+  "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header",
+  "delimiter": "|",
+  "file_format": "json",
+  "has_header": false,
+  "regex_filename": [
+    "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})",
+    "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})",
+    "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})"
+  ],
+  "partition_columns": [
+    "year",
+    "month",
+    "day"
+  ],
+  "data_type": "semistructured",
+  "schema": {
+    "type" : "struct",
+    "fields" : [ {
+      "name" : "salesorder",
+      "type" : "integer",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "item",
+      "type" : "integer",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "recordmode",
+      "type" : "string",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "date",
+      "type" : "integer",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "customer",
+      "type" : "string",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "article",
+      "type" : "string",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "amount",
+      "type" : "integer",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "year",
+      "type" : "short",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "month",
+      "type" : "short",
+      "nullable" : true,
+      "metadata" : { }
+    }, {
+      "name" : "day",
+      "type" : "short",
+      "nullable" : true,
+      "metadata" : { }
+    } ]
+  }
+}
\ No newline at end of file
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/target_schema.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/target_schema.json
new file mode 100644
index 0000000..bdd9036
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load/target_schema.json
@@ -0,0 +1,54 @@
+{
+  "type" : "struct",
+  "fields" : [ {
+    "name" : "salesorder",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "item",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "recordmode",
+    "type" : "string",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "date",
+    "type" : "integer",
"integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00001.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00001.txt new file mode 100644 index 0000000..bc368d4 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00001.txt @@ -0,0 +1,6 @@ +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2, "city": "Berlin"} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1, "city": "Berlin"} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1, "city": "Berlin"} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3, "city": "Berlin"} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1, "city": "Berlin"} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6, "city": "Berlin"} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00002.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00002.txt new file mode 100644 index 0000000..38f6741 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00002.txt @@ -0,0 +1,6 @@ +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer1", "article": "article1", "amount": 2} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer2", "article": "article1", "amount": 1} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer3", "article": "article2", "amount": 1} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer4", "article": "article5", "amount": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer5", "article": "article1", "amount": 1} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer6", "article": "article6", "amount": 6} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00003.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00003.txt new file mode 100644 index 0000000..864e24b --- /dev/null +++ 
b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/data-nodate-part-00003.txt @@ -0,0 +1,6 @@ +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer1", "article": "article1", "amount": 2} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer2", "article": "article1", "amount": 1} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer3", "article": "article2", "amount": 1} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer4", "article": "article5", "amount": 3} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer5", "article": "article1", "amount": 1} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer6", "article": "article6", "amount": 6} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/lake_data_post.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/lake_data_post.txt new file mode 100644 index 0000000..b5d2c9d --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/lake_data_post.txt @@ -0,0 +1,23 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6, "year": 2018, "month": 1, "day": 2} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 
20180103, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer6", "article": "article6", "amount": 6, "year": 2018, "month": 1, "day": 3} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 4} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 4} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 4} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 4} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 4} +{"salesorder": 4, "item": 100, "recordmode": "N", "date": 20180104, "customer": "customer6", "article": "article6", "amount": 6, "year": 2018, "month": 1, "day": 4} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/lake_data_pre.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/lake_data_pre.txt new file mode 100644 index 0000000..b6f1f21 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/lake_data_pre.txt @@ -0,0 +1,5 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "city": "Berlin", "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "city": "Berlin", "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "city": "Berlin", "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "city": "Berlin", "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "city": "Berlin", "year": 2018, "month": 1, "day": 1} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/params.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/params.json new file mode 100644 index 0000000..fd42e88 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/params.json @@ -0,0 +1,79 @@ +{ + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": 
"/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "json", + "has_header": false, + "regex_filename": [ + "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "data_type": "semistructured", + "verify_schema": true, + "schema": { + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "city", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] + } +} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/params_column_dropped.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/params_column_dropped.json new file mode 100644 index 0000000..893624f --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/params_column_dropped.json @@ -0,0 +1,74 @@ +{ + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "json", + "has_header": false, + "regex_filename": [ + "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "data_type": "semistructured", + "verify_schema": true, + "schema": { + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] + } +} \ No newline at end of file diff --git 
a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/target_schema.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/target_schema.json new file mode 100644 index 0000000..1ca24ab --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/target_schema.json @@ -0,0 +1,59 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "city", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/target_schema_column_dropped.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/target_schema_column_dropped.json new file mode 100644 index 0000000..bdd9036 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_dropping_column/target_schema_column_dropped.json @@ -0,0 +1,54 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/data-nodate-part-00001.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/data-nodate-part-00001.txt new file mode 100644 index 0000000..4641958 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/data-nodate-part-00001.txt @@ -0,0 +1,6 @@ +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, 
"customer": "customer3", "article": "article2", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/data-nodate-part-00002.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/data-nodate-part-00002.txt new file mode 100644 index 0000000..3c9e787 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/data-nodate-part-00002.txt @@ -0,0 +1,6 @@ +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer1", "article": "article1", "city": "Berlin", "amount": 2} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer2", "article": "article1", "city": "Berlin", "amount": 1} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer3", "article": "article2", "city": "Berlin", "amount": 1} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer4", "article": "article5", "city": "Berlin", "amount": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer5", "article": "article1", "city": "Berlin", "amount": 1} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer6", "article": "article6", "city": "Berlin", "amount": 6} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/lake_data_post.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/lake_data_post.txt new file mode 100644 index 0000000..e6b31da --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/lake_data_post.txt @@ -0,0 +1,17 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "city": null, "amount": 2, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "city": null, "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "city": null, "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "city": null, "amount": 3, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "city": null, "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "city": null, "amount": 2, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "city": null, "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, 
"customer": "customer3", "article": "article2", "city": null, "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "city": null, "amount": 3, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "city": null, "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "city": null, "amount": 6, "year": 2018, "month": 1, "day": 2} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer1", "article": "article1", "city": "Berlin", "amount": 2, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer2", "article": "article1", "city": "Berlin", "amount": 1, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer3", "article": "article2", "city": "Berlin", "amount": 1, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer4", "article": "article5", "city": "Berlin", "amount": 3, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer5", "article": "article1", "city": "Berlin", "amount": 1, "year": 2018, "month": 1, "day": 3} +{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer6", "article": "article6", "city": "Berlin", "amount": 6, "year": 2018, "month": 1, "day": 3} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/lake_data_pre.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/lake_data_pre.txt new file mode 100644 index 0000000..67a9c7c --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/lake_data_pre.txt @@ -0,0 +1,5 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/params.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/params.json new file mode 100644 index 0000000..75e0703 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/params.json @@ -0,0 +1,73 @@ +{ + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "source_dir": 
"/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "json", + "has_header": false, + "regex_filename": [ + "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "data_type": "semistructured", + "schema": { + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] + } +} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/params_evolved.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/params_evolved.json new file mode 100644 index 0000000..a352e7f --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/params_evolved.json @@ -0,0 +1,78 @@ +{ + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "json", + "has_header": false, + "regex_filename": [ + "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "data_type": "semistructured", + "schema": { + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "city", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] + } +} \ No newline at end of file diff --git 
new file mode 100644
index 0000000..bdd9036
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/target_schema.json
@@ -0,0 +1,54 @@
+{
+  "type" : "struct",
+  "fields" : [ {
+    "name" : "salesorder",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "item",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "recordmode",
+    "type" : "string",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "date",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "customer",
+    "type" : "string",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "article",
+    "type" : "string",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "amount",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "year",
+    "type" : "short",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "month",
+    "type" : "short",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "day",
+    "type" : "short",
+    "nullable" : true,
+    "metadata" : { }
+  } ]
+}
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/target_schema_evolved.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/target_schema_evolved.json
new file mode 100644
index 0000000..1ca24ab
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_evolving_schema/target_schema_evolved.json
@@ -0,0 +1,59 @@
+{
+  "type" : "struct",
+  "fields" : [ {
+    "name" : "salesorder",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "item",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "recordmode",
+    "type" : "string",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "date",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "customer",
+    "type" : "string",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "article",
+    "type" : "string",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "amount",
+    "type" : "integer",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "city",
+    "type" : "string",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "year",
+    "type" : "short",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "month",
+    "type" : "short",
+    "nullable" : true,
+    "metadata" : { }
+  }, {
+    "name" : "day",
+    "type" : "short",
+    "nullable" : true,
+    "metadata" : { }
+  } ]
+}
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/data-nodate-part-00001.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/data-nodate-part-00001.txt
new file mode 100644
index 0000000..e9d9a0b
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/data-nodate-part-00001.txt
@@ -0,0 +1,6 @@
+{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer1", "article": "article1", "amount": 2, "city": "Berlin"}
+{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer2", "article": "article1", "amount": 1, "city": "Berlin"}
+{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer3", "article": "article2", "amount": 1, "city": "Berlin"}
+{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer4", "article": "article5", "amount": 3, "city": "Berlin"}
+{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer5", "article": "article1", "amount": 1, "city": "Berlin"}
+{"salesorder": 3, "item": 100, "recordmode": "N", "date": 20180103, "customer": "customer6", "article": "article6", "amount": 6, "city": "Berlin"}
\ No newline at end of file
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/lake_data_post.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/lake_data_post.txt
new file mode 100644
index 0000000..76e6e30
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/lake_data_post.txt
@@ -0,0 +1,5 @@
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1}
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/lake_data_pre.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/lake_data_pre.txt
new file mode 100644
index 0000000..b6f1f21
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/lake_data_pre.txt
@@ -0,0 +1,5 @@
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "city": "Berlin", "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "city": "Berlin", "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "city": "Berlin", "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "city": "Berlin", "year": 2018, "month": 1, "day": 1}
+{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "city": "Berlin", "year": 2018, "month": 1, "day": 1}
\ No newline at end of file
diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/params.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/params.json
new file mode 100644
index 0000000..893624f
--- /dev/null
+++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/params.json
@@ -0,0 +1,74 @@
+{ + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "json", + "has_header": false, + "regex_filename": [ + "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "data_type": "semistructured", + "verify_schema": true, + "schema": { + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] + } +} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/target_schema.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/target_schema.json new file mode 100644 index 0000000..bdd9036 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_mismatching_schema/target_schema.json @@ -0,0 +1,54 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/data-nodate-part-00001.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/data-nodate-part-00001.txt new file mode 100644 index 0000000..4641958 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/data-nodate-part-00001.txt @@ -0,0 +1,6 @@ +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 
20180102, "customer": "customer2", "article": "article1", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/lake_data_pre.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/lake_data_pre.txt new file mode 100644 index 0000000..67a9c7c --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/lake_data_pre.txt @@ -0,0 +1,5 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/params.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/params.json new file mode 100644 index 0000000..f2baa21 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/params.json @@ -0,0 +1,20 @@ +{ + "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "json", + "has_header": false, + "regex_filename": [ + "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "target_table": "lake.test_table", + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "data_type": "unstructured" +} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/target_schema.json b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/target_schema.json new file mode 100644 index 0000000..bdd9036 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_json_load_wrong_configuration/target_schema.json @@ -0,0 +1,54 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, 
{ + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/20180101_schema.json b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/20180101_schema.json new file mode 100644 index 0000000..9d6a9d3 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/20180101_schema.json @@ -0,0 +1,39 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/data-nodate-part-00001.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/data-nodate-part-00001.txt new file mode 100644 index 0000000..7ce0b53 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/data-nodate-part-00001.txt @@ -0,0 +1,6 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer6", "article": "article6", "amount": 6} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/data-nodate-part-00002.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/data-nodate-part-00002.txt new file mode 100644 index 0000000..4641958 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/data-nodate-part-00002.txt @@ -0,0 +1,6 @@ +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", 
"article": "article1", "amount": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/lake_data_post.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/lake_data_post.txt new file mode 100644 index 0000000..f7659c6 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/lake_data_post.txt @@ -0,0 +1,12 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer6", "article": "article6", "amount": 6, "year": 2018, "month": 1, "day": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6, "year": 2018, "month": 1, "day": 2} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/lake_data_pre.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/lake_data_pre.txt new file mode 100644 index 0000000..67a9c7c --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/lake_data_pre.txt @@ -0,0 +1,5 @@ +{"salesorder": 1, 
"item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/params.json b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/params.json new file mode 100644 index 0000000..75e0703 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/params.json @@ -0,0 +1,73 @@ +{ + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "json", + "has_header": false, + "regex_filename": [ + "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "data_type": "semistructured", + "schema": { + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] + } +} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/target_schema.json b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/target_schema.json new file mode 100644 index 0000000..bdd9036 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_load_with_existing_header/target_schema.json @@ -0,0 +1,54 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + 
"metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/data-nodate-part-00001.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/data-nodate-part-00001.txt new file mode 100644 index 0000000..0f9b966 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/data-nodate-part-00001.txt @@ -0,0 +1,6 @@ +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6, "location": {"city": "Berlin", "country": "Germany"}} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/lake_data_post.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/lake_data_post.txt new file mode 100644 index 0000000..74128ee --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/lake_data_post.txt @@ -0,0 +1,11 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": 
"Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 2, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 2, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 2, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6, "year": 2018, "month": 1, "day": 2, "location": {"city": "Berlin", "country": "Germany"}} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/lake_data_pre.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/lake_data_pre.txt new file mode 100644 index 0000000..61c4573 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/lake_data_pre.txt @@ -0,0 +1,5 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1, "location": {"city": "Berlin", "country": "Germany"}} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/params.json b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/params.json new file mode 100644 index 0000000..93df79d --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/params.json @@ -0,0 +1,91 @@ +{ + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "json", + "has_header": false, + "regex_filename": [ + 
"year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "data_type": "semistructured", + "schema": { + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "location", + "type" : { + "type" : "struct", + "fields" : [ { + "name" : "city", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "country", + "type" : "string", + "nullable" : true, + "metadata" : { } + } ] + }, + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] + } +} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/target_schema.json b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/target_schema.json new file mode 100644 index 0000000..4a05649 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_nested_json_load/target_schema.json @@ -0,0 +1,72 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "location", + "type" : { + "type" : "struct", + "fields" : [ { + "name" : "city", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "country", + "type" : "string", + "nullable" : true, + "metadata" : { } + } ] + }, + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/data_20180422-00001.parquet b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/data_20180422-00001.parquet new file mode 100644 index 0000000..80d6202 Binary files /dev/null and b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/data_20180422-00001.parquet differ diff --git 
a/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/lake_data_post.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/lake_data_post.txt new file mode 100644 index 0000000..f323773 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/lake_data_post.txt @@ -0,0 +1,11 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 2} +{"salesorder": 2, "item": 100, "recordmode": "N", "date": 20180102, "customer": "customer6", "article": "article6", "amount": 6, "year": 2018, "month": 1, "day": 2} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/lake_data_pre.txt b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/lake_data_pre.txt new file mode 100644 index 0000000..67a9c7c --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/lake_data_pre.txt @@ -0,0 +1,5 @@ +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer1", "article": "article1", "amount": 2, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer2", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer3", "article": "article2", "amount": 1, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer4", "article": "article5", "amount": 3, "year": 2018, "month": 1, "day": 1} +{"salesorder": 1, "item": 100, "recordmode": "N", "date": 20180101, "customer": "customer5", "article": "article1", "amount": 1, "year": 2018, "month": 1, "day": 1} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/params.json 
b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/params.json new file mode 100644 index 0000000..693fa18 --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/params.json @@ -0,0 +1,73 @@ +{ + "target_dir": "/tmp/tests/test_lake/test_table_semistructured/data", + "source_dir": "/tmp/tests/test_landing/test_table_semistructured/data", + "header_dir": "/tmp/tests/test_landing/test_table_semistructured/header", + "delimiter": "|", + "file_format": "parquet", + "has_header": true, + "regex_filename": [ + "year=([0-9]{4})(?=/month=[0-9]{2}/day=[0-9]{2})", + "(?<=year=[0-9]{4})/month=([0-9]{2})/(?=day=[0-9]{2})", + "(?<=year=[0-9]{4}/month=[0-9]{2})/day=([0-9]{2})" + ], + "partition_columns": [ + "year", + "month", + "day" + ], + "data_type": "semistructured", + "schema": { + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "article", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "amount", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] + } +} \ No newline at end of file diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/sales.parquet b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/sales.parquet new file mode 100644 index 0000000..aebedee Binary files /dev/null and b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/sales.parquet differ diff --git a/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/target_schema.json b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/target_schema.json new file mode 100644 index 0000000..12abdbc --- /dev/null +++ b/src/test/resources/SemiStructuredLoadTest/semistructured_parquet_test/target_schema.json @@ -0,0 +1,44 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "salesorder", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "item", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "recordmode", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "date", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "customer", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "year", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "month", + "type" : "short", + "nullable" : true, + "metadata" : { } + }, { + "name" : "day", + "type" : "short", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/src/test/scala/com/adidas/analytics/FileReader.scala b/src/test/scala/com/adidas/analytics/FileReader.scala index cbf30ad..bcc45ee 100644 --- a/src/test/scala/com/adidas/analytics/FileReader.scala +++ b/src/test/scala/com/adidas/analytics/FileReader.scala @@ -1,7 +1,7 @@ 
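// Each target_schema.json fixture above is a Spark StructType serialized to JSON;
// the tests parse it back with DataType.fromJson before reading data. A
// self-contained sketch of that round-trip (the two-field schema is illustrative):

import org.apache.spark.sql.types.{DataType, StructType}

val schemaJson: String =
  """{"type":"struct","fields":[
    |{"name":"salesorder","type":"integer","nullable":true,"metadata":{}},
    |{"name":"year","type":"short","nullable":true,"metadata":{}}]}""".stripMargin

// fromJson returns a DataType; these fixtures always encode a struct at the top level
val targetSchema: StructType = DataType.fromJson(schemaJson).asInstanceOf[StructType]
// targetSchema.fieldNames => Array("salesorder", "year")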
package com.adidas.analytics import com.adidas.analytics.util.DataFormat -import com.adidas.analytics.util.DataFormat.{DSVFormat, ParquetFormat} +import com.adidas.analytics.util.DataFormat.{DSVFormat, JSONFormat, ParquetFormat} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SparkSession} @@ -34,6 +34,10 @@ object FileReader { new FileReader(ParquetFormat(), Map.empty[String, String]) } + def newJsonFileReader(optionalSchema: Option[StructType] = None): FileReader = { + new FileReader(JSONFormat(optionalSchema), Map.empty[String, String]) + } + def apply(format: DataFormat, options: (String, String)*): FileReader = { new FileReader(format, options.toMap) } diff --git a/src/test/scala/com/adidas/analytics/Table.scala b/src/test/scala/com/adidas/analytics/Table.scala index 40e2aef..f84cfbc 100644 --- a/src/test/scala/com/adidas/analytics/Table.scala +++ b/src/test/scala/com/adidas/analytics/Table.scala @@ -136,6 +136,8 @@ object Table { statementBuilder += s"FIELDS TERMINATED BY '$delimiter'" case _: DataFormat.ParquetFormat => statementBuilder += "STORED AS PARQUET" + case anotherFormat => throw new RuntimeException(s"Unknown file format: $anotherFormat") + } statementBuilder += s"LOCATION '$location'" diff --git a/src/test/scala/com/adidas/analytics/algo/AlgorithmTemplateTest.scala b/src/test/scala/com/adidas/analytics/algo/AlgorithmTemplateTest.scala index 2156c82..a7f81ab 100644 --- a/src/test/scala/com/adidas/analytics/algo/AlgorithmTemplateTest.scala +++ b/src/test/scala/com/adidas/analytics/algo/AlgorithmTemplateTest.scala @@ -64,7 +64,7 @@ class AlgorithmTemplateTest extends FeatureSpec with BaseAlgorithmTest { private def setupInitialState(targetTable: Table, localDataFile: String, dataReader: FileReader): Unit = { val initialDataLocation = resolveResource(localDataFile, withProtocol = true) - targetTable.write(Seq(initialDataLocation), dataReader, LoadMode.OverwritePartitions) + targetTable.write(Seq(initialDataLocation), dataReader, LoadMode.OverwritePartitionsWithAddedColumns) } private def prepareDefaultSourceData(): Unit = { diff --git a/src/test/scala/com/adidas/analytics/algo/AppendLoadTest.scala b/src/test/scala/com/adidas/analytics/algo/AppendLoadTest.scala index c7ba572..717afaf 100644 --- a/src/test/scala/com/adidas/analytics/algo/AppendLoadTest.scala +++ b/src/test/scala/com/adidas/analytics/algo/AppendLoadTest.scala @@ -418,6 +418,6 @@ class AppendLoadTest extends FeatureSpec with BaseAlgorithmTest { private def setupInitialState(targetTable: Table, localDataFile: String, dataReader: FileReader): Unit = { val initialDataLocation = resolveResource(localDataFile, withProtocol = true) - targetTable.write(Seq(initialDataLocation), dataReader, LoadMode.OverwritePartitions) + targetTable.write(Seq(initialDataLocation), dataReader, LoadMode.OverwritePartitionsWithAddedColumns) } } diff --git a/src/test/scala/com/adidas/analytics/algo/DeltaLoadTest.scala b/src/test/scala/com/adidas/analytics/algo/DeltaLoadTest.scala index 3631c66..2acb6a2 100644 --- a/src/test/scala/com/adidas/analytics/algo/DeltaLoadTest.scala +++ b/src/test/scala/com/adidas/analytics/algo/DeltaLoadTest.scala @@ -171,7 +171,7 @@ class DeltaLoadTest extends FeatureSpec with BaseAlgorithmTest { tableBuilder.buildDSVTable(DFSWrapper(fs.getConf), spark, external = true) } - table.write(Seq(dataLocation), dsvReader, LoadMode.OverwritePartitions, fillNulls = true) + table.write(Seq(dataLocation), dsvReader, LoadMode.OverwritePartitionsWithAddedColumns, fillNulls = true) table } 
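// The test writes above move from LoadMode.OverwritePartitions to
// LoadMode.OverwritePartitionsWithAddedColumns, so partition overwrites keep
// working when the target schema has gained columns. A sketch of the underlying
// idea (assumed behavior, not the util's actual implementation): columns missing
// from the input DataFrame are added as typed nulls before partitions are rewritten.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

def withAddedColumns(df: DataFrame, targetSchema: StructType): DataFrame = {
  // add each target column that is absent from the input as a typed null...
  val completed = targetSchema.fields.foldLeft(df) { (acc, field) =>
    if (acc.columns.contains(field.name)) acc
    else acc.withColumn(field.name, lit(null).cast(field.dataType))
  }
  // ...then project in target order so all written partitions share one layout
  completed.select(targetSchema.fieldNames.map(completed.col): _*)
}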
@@ -197,7 +197,7 @@ class DeltaLoadTest extends FeatureSpec with BaseAlgorithmTest { dsvFileName match { case Some(fileName) => val dataLocation = resolveResource(s"$testResourceDir/$fileName", withProtocol = true) - table.write(Seq(dataLocation), dsvReader, LoadMode.OverwritePartitions, fillNulls = true) + table.write(Seq(dataLocation), dsvReader, LoadMode.OverwritePartitionsWithAddedColumns, fillNulls = true) table case None => table } diff --git a/src/test/scala/com/adidas/analytics/algo/FullLoadTest.scala b/src/test/scala/com/adidas/analytics/algo/FullLoadTest.scala index 0f95875..d8666a4 100644 --- a/src/test/scala/com/adidas/analytics/algo/FullLoadTest.scala +++ b/src/test/scala/com/adidas/analytics/algo/FullLoadTest.scala @@ -274,6 +274,6 @@ class FullLoadTest extends FeatureSpec with BaseAlgorithmTest { private def setupInitialState(targetTable: Table, localDataFile: String, dataReader: FileReader): Unit = { val initialDataLocation = resolveResource(localDataFile, withProtocol = true) - targetTable.write(Seq(initialDataLocation), dataReader, LoadMode.OverwritePartitions) + targetTable.write(Seq(initialDataLocation), dataReader, LoadMode.OverwritePartitionsWithAddedColumns) } } diff --git a/src/test/scala/com/adidas/analytics/algo/PartitionMaterializationTest.scala b/src/test/scala/com/adidas/analytics/algo/PartitionMaterializationTest.scala index 3756f8f..eeb7cad 100644 --- a/src/test/scala/com/adidas/analytics/algo/PartitionMaterializationTest.scala +++ b/src/test/scala/com/adidas/analytics/algo/PartitionMaterializationTest.scala @@ -73,7 +73,7 @@ class PartitionMaterializationTest extends FeatureSpec with BaseAlgorithmTest { copyResourceFileToHdfs(s"$testDir/$paramsFileName", paramsFileHdfsPath) // adding data to target table - targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitions) + targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitionsWithAddedColumns) // check pre-conditions sourceTable.read().count() shouldBe 19 @@ -115,7 +115,7 @@ class PartitionMaterializationTest extends FeatureSpec with BaseAlgorithmTest { copyResourceFileToHdfs(s"$testDir/$paramsFileName", paramsFileHdfsPath) // add data to target table - targetTable.write(Seq(Seq(9999, 1111, "", 20170215, "CUSTOMER99", "ARTICLE", 99, 2017, 2, 15)), LoadMode.OverwritePartitions) + targetTable.write(Seq(Seq(9999, 1111, "", 20170215, "CUSTOMER99", "ARTICLE", 99, 2017, 2, 15)), LoadMode.OverwritePartitionsWithAddedColumns) // checking pre-conditions sourceTable.read().count() shouldBe 19 @@ -140,7 +140,7 @@ class PartitionMaterializationTest extends FeatureSpec with BaseAlgorithmTest { copyResourceFileToHdfs(s"$testDir/$paramsFileName", paramsFileHdfsPath) // adding data to target table - targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitions) + targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitionsWithAddedColumns) // check pre-conditions sourceTable.read().count() shouldBe 19 @@ -220,7 +220,7 @@ class PartitionMaterializationTest extends FeatureSpec with BaseAlgorithmTest { copyResourceFileToHdfs(s"$testDir/$paramsFileName", paramsFileHdfsPath) // add data to target table - targetTable.write(Seq(Seq(9999, 1111, "", 20170215, "CUSTOMER99", "ARTICLE", 99, 2017, 2, 15)), LoadMode.OverwritePartitions) + targetTable.write(Seq(Seq(9999, 1111, "", 20170215, "CUSTOMER99", "ARTICLE", 99, 2017, 2, 15)), LoadMode.OverwritePartitionsWithAddedColumns) // checking pre-conditions 
sourceTable.read().count() shouldBe 19 @@ -245,7 +245,7 @@ class PartitionMaterializationTest extends FeatureSpec with BaseAlgorithmTest { copyResourceFileToHdfs(s"$testDir/$paramsFileName", paramsFileHdfsPath) // adding data to target table - targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitions) + targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitionsWithAddedColumns) // check pre-conditions sourceTable.read().count() shouldBe 19 @@ -277,7 +277,7 @@ class PartitionMaterializationTest extends FeatureSpec with BaseAlgorithmTest { copyResourceFileToHdfs(s"output_files_3/$paramsFileName", paramsFileHdfsPath) // adding data to target table - targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitions) + targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitionsWithAddedColumns) // check pre-conditions sourceTable.read().count() shouldBe 19 @@ -308,7 +308,7 @@ class PartitionMaterializationTest extends FeatureSpec with BaseAlgorithmTest { copyResourceFileToHdfs(s"output_files_5/$paramsFileName", paramsFileHdfsPath) // adding data to target table - targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitions) + targetTable.write(Seq(getInitialDataFile(testDir)), fileReader, LoadMode.OverwritePartitionsWithAddedColumns) // check pre-conditions sourceTable.read().count() shouldBe 19 @@ -362,7 +362,7 @@ class PartitionMaterializationTest extends FeatureSpec with BaseAlgorithmTest { // create source table and add data to it sourceTable = createTable("source_table", sourceDatabase, partitions, "source_table_data", schema) - sourceTable.write(Seq(getSourceDataFile(testDir)), fileReader, LoadMode.OverwritePartitions) + sourceTable.write(Seq(getSourceDataFile(testDir)), fileReader, LoadMode.OverwritePartitionsWithAddedColumns) // create target table targetTable = createTable("target_table", targetDatabase, partitions, "target_table_data", schema) diff --git a/src/test/scala/com/adidas/analytics/algo/SemiStructuredLoadTest.scala b/src/test/scala/com/adidas/analytics/algo/SemiStructuredLoadTest.scala new file mode 100644 index 0000000..0b8c80c --- /dev/null +++ b/src/test/scala/com/adidas/analytics/algo/SemiStructuredLoadTest.scala @@ -0,0 +1,464 @@ +package com.adidas.analytics.algo + +import com.adidas.analytics.TestUtils._ +import com.adidas.analytics.util.DFSWrapper._ +import com.adidas.analytics.util.DataFormat.{JSONFormat, ParquetFormat} +import com.adidas.analytics.util.{DFSWrapper, LoadMode, OutputWriter} +import com.adidas.analytics.{FileReader, Table} +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.types.{DataType, StructType} +import org.scalatest.FeatureSpec +import org.scalatest.Matchers._ + + +class SemiStructuredLoadTest extends FeatureSpec with BaseAlgorithmTest { + + private val sourceDatabase: String = "test_landing" + private val targetDatabase: String = "test_lake" + private val tableName: String = "test_table" + + private val paramsFileName: String = "params.json" + private val paramsFileHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName) + + private val sourceDirPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableName/data") + private val headerDirPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableName/header") + private val targetDirPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableName") + + + feature("Data can be loaded from source to target with 
append mode") { + scenario("SemiStructured Data can be loaded with append mode by creating partitions from full path") { + val tableNameJson: String = "test_table_semistructured" + val paramsFileModdedRegexHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName) + val sourceDirFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=02/") + val targetDirFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson/data") + val headerDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/header") + val targetDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson") + + fs.mkdirs(sourceDirFullPath) + fs.mkdirs(headerDirPathPartFromFullPath) + fs.mkdirs(targetDirPathPartFromFullPath) + fs.mkdirs(targetDirFullPath) + + val testResourceDir = "semistructured_json_load" + val headerPath20180102 = new Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=2/header.json") + val targetPath20180102 = new Path(targetDirFullPath, "year=2018/month=1/day=2") + + val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType] + val targetTableLocation = fs.makeQualified(new Path(hdfsRootTestPath, targetDirPathPartFromFullPath)).toString + + val dataReader = FileReader.newJsonFileReader(Some(targetSchema)) + val dataFormat = ParquetFormat() + val dataWriter = OutputWriter.newFileSystemWriter(targetDirFullPath.toString, dataFormat, Seq("year", "month", "day")) + setupInitialState(s"$testResourceDir/lake_data_pre.txt", dataReader, dataWriter) + prepareSourceData(testResourceDir, Seq("data-nodate-part-00001.txt"), sourceDirFullPath) + uploadParameters(testResourceDir, paramsFileName, paramsFileModdedRegexHdfsPath) + + // checking pre-conditions + spark.read.json(sourceDirFullPath.toString).count() shouldBe 6 + spark.read.parquet(targetTableLocation).count() shouldBe 5 + + fs.exists(targetPath20180102) shouldBe false + fs.exists(headerPath20180102) shouldBe false + + // executing load + AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run() + + // validating result + val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.txt", withProtocol = true) + val expectedDf = dataReader.read(spark, expectedDataLocation) + val actualDf = spark.read.schema(targetSchema).parquet(targetTableLocation) + actualDf.hasDiff(expectedDf) shouldBe false + + fs.exists(targetPath20180102) shouldBe true + fs.exists(headerPath20180102) shouldBe true + } + + scenario("Nested SemiStructured Data can be loaded with append mode by creating partitions from full path") { + val tableNameJson: String = "test_table_semistructured" + val paramsFileModdedRegexHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName) + val sourceDirFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=02/") + val targetDirFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson/data") + val headerDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/header") + val targetDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson") + + fs.mkdirs(sourceDirFullPath) + fs.mkdirs(headerDirPathPartFromFullPath) + fs.mkdirs(targetDirPathPartFromFullPath) + fs.mkdirs(targetDirFullPath) + + val testResourceDir = "semistructured_nested_json_load" + val headerPath20180102 = new 
Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=2/header.json") + val targetPath20180102 = new Path(targetDirFullPath, "year=2018/month=1/day=2") + + val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType] + val targetTableLocation = fs.makeQualified(new Path(hdfsRootTestPath, targetDirPathPartFromFullPath)).toString + + val dataReader = FileReader.newJsonFileReader(Some(targetSchema)) + val dataFormat = ParquetFormat() + val dataWriter = OutputWriter.newFileSystemWriter(targetDirFullPath.toString, dataFormat, Seq("year", "month", "day")) + setupInitialState(s"$testResourceDir/lake_data_pre.txt", dataReader, dataWriter) + prepareSourceData(testResourceDir, Seq("data-nodate-part-00001.txt"), sourceDirFullPath) + uploadParameters(testResourceDir, paramsFileName, paramsFileModdedRegexHdfsPath) + + // checking pre-conditions + spark.read.json(sourceDirFullPath.toString).count() shouldBe 6 + spark.read.parquet(targetTableLocation).count() shouldBe 5 + + fs.exists(targetPath20180102) shouldBe false + fs.exists(headerPath20180102) shouldBe false + + // executing load + AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run() + + // validating result + val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.txt", withProtocol = true) + val expectedDf = dataReader.read(spark, expectedDataLocation) + val actualDf = spark.read.schema(targetSchema).parquet(targetTableLocation) + actualDf.hasDiff(expectedDf) shouldBe false + + fs.exists(targetPath20180102) shouldBe true + fs.exists(headerPath20180102) shouldBe true + } + + scenario("SemiStructured Parquet Data can be loaded with append mode by creating partitions from full path") { + val tableNameJson: String = "test_table_semistructured" + val paramsFileModdedRegexHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName) + val sourceDirFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=02/") + val targetDirFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson/data") + val headerDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/header") + val targetDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson") + + fs.mkdirs(sourceDirFullPath) + fs.mkdirs(headerDirPathPartFromFullPath) + fs.mkdirs(targetDirPathPartFromFullPath) + fs.mkdirs(targetDirFullPath) + + val testResourceDir = "semistructured_parquet_test" + val headerPath20180102 = new Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=2/header.json") + val targetPath20180102 = new Path(targetDirFullPath, "year=2018/month=1/day=2") + + val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType] + val targetTableLocation = fs.makeQualified(new Path(hdfsRootTestPath, targetDirPathPartFromFullPath)).toString + + val dataReader = FileReader.newJsonFileReader(Some(targetSchema)) + val dataFormat = ParquetFormat(Some(targetSchema)) + val dataWriter = OutputWriter.newFileSystemWriter(targetDirFullPath.toString, dataFormat, Seq("year", "month", "day")) + setupInitialState(s"$testResourceDir/lake_data_pre.txt", dataReader, dataWriter) + prepareSourceData(testResourceDir, Seq("sales.parquet"), sourceDirFullPath) + uploadParameters(testResourceDir, paramsFileName, paramsFileModdedRegexHdfsPath) + + // checking pre-conditions + spark.read.parquet(sourceDirFullPath.toString).count() 
shouldBe 6 + spark.read.parquet(targetTableLocation).count() shouldBe 5 + + fs.exists(targetPath20180102) shouldBe false + fs.exists(headerPath20180102) shouldBe false + + // executing load + AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run() + + // validating result + val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.txt", withProtocol = true) + val expectedDf = dataReader.read(spark, expectedDataLocation) + val actualDf = spark.read.schema(targetSchema).parquet(targetTableLocation) + actualDf.hasDiff(expectedDf) shouldBe false + + + fs.exists(targetPath20180102) shouldBe true + fs.exists(headerPath20180102) shouldBe true + } + + scenario("SemiStructured Data can be loaded with append mode with evolving schema") { + val tableNameJson: String = "test_table_semistructured" + val paramsFileModdedRegexHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName) + val sourceDirFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=02/") + val targetDirFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson/data") + val headerDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/header") + val targetDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson") + + fs.mkdirs(sourceDirFullPath) + fs.mkdirs(headerDirPathPartFromFullPath) + fs.mkdirs(targetDirPathPartFromFullPath) + fs.mkdirs(targetDirFullPath) + + val testResourceDir = "semistructured_json_load_evolving_schema" + val headerPath20180102 = new Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=2/header.json") + val targetPath20180102 = new Path(targetDirFullPath, "year=2018/month=1/day=2") + + val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType] + val targetTableLocation = fs.makeQualified(new Path(hdfsRootTestPath, targetDirPathPartFromFullPath)).toString + + val dataReader = FileReader.newJsonFileReader(Some(targetSchema)) + val dataFormat = ParquetFormat() + val dataWriter = OutputWriter.newFileSystemWriter(targetDirFullPath.toString, dataFormat, Seq("year", "month", "day")) + setupInitialState(s"$testResourceDir/lake_data_pre.txt", dataReader, dataWriter) + prepareSourceData(testResourceDir, Seq("data-nodate-part-00001.txt"), sourceDirFullPath) + uploadParameters(testResourceDir, paramsFileName, paramsFileModdedRegexHdfsPath) + + // checking pre-conditions + spark.read.json(sourceDirFullPath.toString).count() shouldBe 6 + spark.read.parquet(targetTableLocation).count() shouldBe 5 + + // executing load + AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run() + + // Executing append load with the evolved schema + val sourceDirFullPath20180103: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=03/") + prepareSourceData(testResourceDir, Seq("data-nodate-part-00002.txt"), sourceDirFullPath20180103) + val targetSchemaEvolved = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema_evolved.json")).asInstanceOf[StructType] + val dataReaderEvolved = FileReader.newJsonFileReader(Some(targetSchemaEvolved)) + + fs.delete(paramsFileModdedRegexHdfsPath, false) + val paramsEvolvedFileName: String = "params_evolved.json" + uploadParameters(testResourceDir, paramsEvolvedFileName, paramsFileModdedRegexHdfsPath) + + AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run() + // validating result + val 
+      fs.delete(paramsFileModdedRegexHdfsPath, false)
+      val paramsEvolvedFileName: String = "params_evolved.json"
+      uploadParameters(testResourceDir, paramsEvolvedFileName, paramsFileModdedRegexHdfsPath)
+
+      AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run()
+
+      // validating result
+      val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.txt", withProtocol = true)
+      val expectedDf = dataReaderEvolved.read(spark, expectedDataLocation)
+      val actualDf = spark.read.schema(targetSchemaEvolved).parquet(targetTableLocation)
+
+      actualDf.hasDiff(expectedDf) shouldBe false
+
+      fs.exists(targetPath20180102) shouldBe true
+      fs.exists(headerPath20180102) shouldBe true
+    }
+
+    scenario("SemiStructured Data can be loaded with append mode with dropping columns") {
+      val tableNameJson: String = "test_table_semistructured"
+      val paramsFileModdedRegexHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName)
+      val sourceDirFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=02/")
+      val targetDirFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson/data")
+      val headerDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/header")
+      val targetDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson")
+
+      fs.mkdirs(sourceDirFullPath)
+      fs.mkdirs(headerDirPathPartFromFullPath)
+      fs.mkdirs(targetDirPathPartFromFullPath)
+      fs.mkdirs(targetDirFullPath)
+
+      val testResourceDir = "semistructured_json_load_dropping_column"
+      val headerPath20180102 = new Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=2/header.json")
+      val headerPath20180103 = new Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=3/header.json")
+      val targetPath20180102 = new Path(targetDirFullPath, "year=2018/month=1/day=2")
+      val targetPath20180103 = new Path(targetDirFullPath, "year=2018/month=1/day=3")
+
+      val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType]
+      val targetTableLocation = fs.makeQualified(new Path(hdfsRootTestPath, targetDirPathPartFromFullPath)).toString
+
+      val dataReader = FileReader.newJsonFileReader(Some(targetSchema))
+      val dataFormat = ParquetFormat()
+      val dataWriter = OutputWriter.newFileSystemWriter(targetDirFullPath.toString, dataFormat, Seq("year", "month", "day"))
+
+      setupInitialState(s"$testResourceDir/lake_data_pre.txt", dataReader, dataWriter)
+      prepareSourceData(testResourceDir, Seq("data-nodate-part-00001.txt"), sourceDirFullPath)
+      uploadParameters(testResourceDir, paramsFileName, paramsFileModdedRegexHdfsPath)
+
+      // checking pre-conditions
+      spark.read.json(sourceDirFullPath.toString).count() shouldBe 6
+      spark.read.parquet(targetTableLocation).count() shouldBe 5
+
+      // executing load with old schema
+      AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run()
+      fs.exists(headerPath20180102) shouldBe true
+      fs.exists(targetPath20180102) shouldBe true
+
+      // clean up
+      fs.delete(sourceDirFullPath, true)
+      fs.delete(paramsFileModdedRegexHdfsPath, false)
+
+      // prepare new data
+      val sourceDirFullPath20180103: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=03/")
+      val sourceDirFullPath20180104: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=04/")
+      prepareSourceData(testResourceDir, Seq("data-nodate-part-00002.txt"), sourceDirFullPath20180103)
+      prepareSourceData(testResourceDir, Seq("data-nodate-part-00003.txt"), sourceDirFullPath20180104)
+      val targetSchemaDroppedCol = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema_column_dropped.json")).asInstanceOf[StructType]
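+      // the expected lake data is read with the reduced schema, i.e. the target schema with one column dropped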
+      val dataReaderDroppedCol = FileReader.newJsonFileReader(Some(targetSchemaDroppedCol))
+      val paramsDroppedColFileName: String = "params_column_dropped.json"
+      uploadParameters(testResourceDir, paramsDroppedColFileName, paramsFileModdedRegexHdfsPath)
+
+      // executing append load with the updated schema
+      AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run()
+
+      // validating result
+      val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.txt", withProtocol = true)
+      val expectedDf = dataReaderDroppedCol.read(spark, expectedDataLocation)
+      val actualDf = spark.read.schema(targetSchemaDroppedCol).parquet(targetTableLocation)
+
+      actualDf.hasDiff(expectedDf) shouldBe false
+
+      fs.exists(headerPath20180103) shouldBe true
+      fs.exists(targetPath20180103) shouldBe true
+    }
+
+    scenario("SemiStructured Data cannot be loaded when data contains more columns than target schema") {
+      val tableNameJson: String = "test_table_semistructured"
+      val paramsFileModdedRegexHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName)
+      val sourceDirFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=02/")
+      val targetDirFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson/data")
+      val headerDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/header")
+      val targetDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson")
+
+      fs.mkdirs(sourceDirFullPath)
+      fs.mkdirs(headerDirPathPartFromFullPath)
+      fs.mkdirs(targetDirPathPartFromFullPath)
+      fs.mkdirs(targetDirFullPath)
+
+      val testResourceDir = "semistructured_json_load_mismatching_schema"
+      val headerPath20180102 = new Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=2/header.json")
+      val targetPath20180102 = new Path(targetDirFullPath, "year=2018/month=1/day=2")
+
+      val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType]
+      val targetTableLocation = fs.makeQualified(new Path(hdfsRootTestPath, targetDirPathPartFromFullPath)).toString
+
+      val dataReader = FileReader.newJsonFileReader(Some(targetSchema))
+      val dataFormat = ParquetFormat()
+      val dataWriter = OutputWriter.newFileSystemWriter(targetDirFullPath.toString, dataFormat, Seq("year", "month", "day"))
+
+      setupInitialState(s"$testResourceDir/lake_data_pre.txt", dataReader, dataWriter)
+      prepareSourceData(testResourceDir, Seq("data-nodate-part-00001.txt"), sourceDirFullPath)
+      uploadParameters(testResourceDir, paramsFileName, paramsFileModdedRegexHdfsPath)
+
+      // checking pre-conditions
+      spark.read.json(sourceDirFullPath.toString).count() shouldBe 6
+      spark.read.parquet(targetTableLocation).count() shouldBe 5
+
+      // executing load, which is expected to fail because the input data has more columns than the target schema
+      val caught = intercept[RuntimeException] {
+        AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run()
+      }
+      logger.info(caught.getMessage)
+      assert(caught.getMessage.equals("Schema does not match the input data for some of the input folders."))
+
+      // validating result
+      val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.txt", withProtocol = true)
+      val expectedDf = dataReader.read(spark, expectedDataLocation)
+      val actualDf = spark.read.schema(targetSchema).parquet(targetTableLocation)
+
+      actualDf.hasDiff(expectedDf) shouldBe false
+
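+      // the failed load must not have written any header or partition files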
+      fs.exists(headerPath20180102) shouldBe false
+      fs.exists(targetPath20180102) shouldBe false
+    }
+
+    scenario("SemiStructured Data cannot be loaded with wrong configuration") {
+      val tableNameJson: String = "test_table_semistructured"
+      val paramsFileModdedRegexHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName)
+      val sourceDirFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=02/")
+      val targetDirFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson/data")
+      val headerDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/header")
+      val targetDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson")
+
+      fs.mkdirs(sourceDirFullPath)
+      fs.mkdirs(headerDirPathPartFromFullPath)
+      fs.mkdirs(targetDirPathPartFromFullPath)
+      fs.mkdirs(targetDirFullPath)
+
+      val testResourceDir = "semistructured_json_load_wrong_configuration"
+      val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType]
+
+      val dataReader = FileReader.newJsonFileReader(Some(targetSchema))
+      val dataFormat = ParquetFormat()
+      val dataWriter = OutputWriter.newFileSystemWriter(targetDirFullPath.toString, dataFormat, Seq("year", "month", "day"))
+
+      setupInitialState(s"$testResourceDir/lake_data_pre.txt", dataReader, dataWriter)
+      prepareSourceData(testResourceDir, Seq("data-nodate-part-00001.txt"), sourceDirFullPath)
+      uploadParameters(testResourceDir, paramsFileName, paramsFileModdedRegexHdfsPath)
+
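+      // run() is expected to fail: the parameters file declares a data type that AppendLoad does not support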
+      val caught = intercept[RuntimeException] {
+        AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run()
+      }
+      logger.info(caught.getMessage)
+      assert(caught.getMessage.equals("Unsupported data type: unstructured in AppendLoad or the configuration file is malformed."))
+    }
+
+    scenario("Loading semistructured data when some header files are available and schemas are the same") {
+      val tableNameJson: String = "test_table_semistructured"
+      val paramsFileModdedRegexHdfsPath: Path = new Path(hdfsRootTestPath, paramsFileName)
+      val sourceDirFullPath20180101: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=01/")
+      val sourceDirFullPath20180102: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/data/year=2018/month=01/day=02/")
+      val targetDirFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson/data")
+      val headerDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$sourceDatabase/$tableNameJson/header")
+      val targetDirPathPartFromFullPath: Path = new Path(hdfsRootTestPath, s"$targetDatabase/$tableNameJson")
+
+      fs.mkdirs(sourceDirFullPath20180102)
+      fs.mkdirs(headerDirPathPartFromFullPath)
+      fs.mkdirs(targetDirPathPartFromFullPath)
+      fs.mkdirs(targetDirFullPath)
+
+      val testResourceDir = "semistructured_load_with_existing_header"
+      val headerPath20180101 = new Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=1/header.json")
+      val headerPath20180102 = new Path(headerDirPathPartFromFullPath, "year=2018/month=1/day=2/header.json")
+      val targetPath20180101 = new Path(targetDirFullPath, "year=2018/month=1/day=1")
+      val targetPath20180102 = new Path(targetDirFullPath, "year=2018/month=1/day=2")
+
+      val targetSchema = DataType.fromJson(getResourceAsText(s"$testResourceDir/target_schema.json")).asInstanceOf[StructType]
+      val targetTableLocation = fs.makeQualified(new Path(hdfsRootTestPath, targetDirPathPartFromFullPath)).toString
+
+      val dataReader = FileReader.newJsonFileReader(Some(targetSchema))
+      val dataFormat = ParquetFormat()
+      val dataWriter = OutputWriter.newFileSystemWriter(targetDirFullPath.toString, dataFormat, Seq("year", "month", "day"))
+      setupInitialState(s"$testResourceDir/lake_data_pre.txt", dataReader, dataWriter)
+      copyResourceFileToHdfs(s"$testResourceDir/20180101_schema.json", headerPath20180101)
+      prepareSourceData(testResourceDir, Seq("data-nodate-part-00001.txt"), sourceDirFullPath20180101)
+      prepareSourceData(testResourceDir, Seq("data-nodate-part-00002.txt"), sourceDirFullPath20180102)
+      uploadParameters(testResourceDir, paramsFileName, paramsFileModdedRegexHdfsPath)
+
+      // checking pre-conditions
+      spark.read.json(sourceDirFullPath20180102.toString).count() shouldBe 6
+      spark.read.parquet(targetTableLocation).count() shouldBe 5
+
+      fs.exists(targetPath20180101) shouldBe true
+      fs.exists(targetPath20180102) shouldBe false
+
+      fs.exists(headerPath20180101) shouldBe true
+      fs.exists(headerPath20180102) shouldBe false
+
+      val expectedSchema20180101 = DataType.fromJson(getResourceAsText(s"$testResourceDir/20180101_schema.json")).asInstanceOf[StructType]
+      // the header written for day=2 should be the target schema minus the three partition columns
+      val expectedSchema20180102 = StructType(targetSchema.dropRight(3))
+
+      // executing load
+      AppendLoad(spark, dfs, paramsFileModdedRegexHdfsPath.toString).run()
+
+      // validating result
+      val expectedDataLocation = resolveResource(s"$testResourceDir/lake_data_post.txt", withProtocol = true)
+      val expectedDf = dataReader.read(spark, expectedDataLocation)
+      val actualDf = spark.read.schema(targetSchema).parquet(targetTableLocation)
+
+      actualDf.hasDiff(expectedDf) shouldBe false
+
+      fs.exists(targetPath20180101) shouldBe true
+      fs.exists(targetPath20180102) shouldBe true
+
+      fs.exists(headerPath20180101) shouldBe true
+      fs.exists(headerPath20180102) shouldBe true
+
+      val actualSchema20180101 = DataType.fromJson(fs.readFile(headerPath20180101)).asInstanceOf[StructType]
+      val actualSchema20180102 = DataType.fromJson(fs.readFile(headerPath20180102)).asInstanceOf[StructType]
+
+      actualSchema20180101 shouldBe expectedSchema20180101
+      actualSchema20180102 shouldBe expectedSchema20180102
+    }
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    fs.mkdirs(sourceDirPath)
+    fs.mkdirs(headerDirPath)
+    fs.mkdirs(targetDirPath)
+  }
+
+  private def uploadParameters(testResourceDir: String, whichParamsFile: String = paramsFileName, whichParamsPath: Path = paramsFileHdfsPath): Unit = {
+    copyResourceFileToHdfs(s"$testResourceDir/$whichParamsFile", whichParamsPath)
+  }
+
+  private def prepareSourceData(testResourceDir: String, sourceFiles: Seq[String], sourceDirPath: Path = sourceDirPath): Unit = {
+    sourceFiles.foreach(file => copyResourceFileToHdfs(s"$testResourceDir/$file", sourceDirPath))
+  }
+
+  private def setupInitialState(localDataFile: String, dataReader: FileReader, dataWriter: OutputWriter): Unit = {
+    val initialDataLocation = resolveResource(localDataFile, withProtocol = true)
+    dataWriter.write(dfs, dataReader.read(spark, initialDataLocation))
+  }
+}