diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 29ce9fb6b38..7d74c689486 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -180,6 +180,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val REGISTER = carbonKeyWord("REGISTER")
   protected val PROPERTIES = carbonKeyWord("PROPERTIES")
   protected val REFRESH = carbonKeyWord("REFRESH")
+  protected val EXPECT = carbonKeyWord("EXPECT")
+  protected val REMAIN_NUMBER = carbonKeyWord("REMAIN_NUMBER")
 
   // For materialized view
   // Keywords used in this parser
@@ -353,4 +355,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       p.getClass.getSimpleName.equals("FloatLit") ||
       p.getClass.getSimpleName.equals("DecimalLit")
     }) ^^ (_.chars)
+
+  protected lazy val number: Parser[Int] = numericLit ^^ (_.toInt)
 }
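A note on the new number production: numericLit ^^ (_.toInt) converts the matched
literal eagerly, so a literal outside Int range (such as 2147483648) makes toInt throw
NumberFormatException, which the test suite below expects to surface as a "SqlParse"
error. A minimal overflow-aware alternative, sketched under the assumption that it sits
in the same StdTokenParsers context that provides numericLit:

    import scala.util.Try

    // Accept only literals that actually fit in an Int; out-of-range input is
    // rejected with a parse message instead of an exception thrown from the
    // semantic action.
    protected lazy val number: Parser[Int] =
      numericLit ^? (
        { case s if Try(s.toInt).isSuccess => s.toInt },
        s => s"number '$s' is out of Int range"
      )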
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala
new file mode 100644
index 00000000000..bea94d58208
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.events.{withEvents, DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent}
+
+/**
+ * A command that deletes load segments, keeping only the given number of the
+ * most recent segments.
+ *
+ * @param remaining the number of segments expected to remain after deletion
+ */
+case class CarbonDeleteLoadByRemainNumberCommand(
+    remaining: Int,
+    databaseNameOp: Option[String],
+    tableName: String)
+  extends DataCommand {
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    setAuditTable(carbonTable)
+    setAuditInfo(Map("remaining number" -> remaining.toString))
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
+    }
+
+    // if insert overwrite in progress, do not allow delete segment
+    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
+    }
+
+    val segments = CarbonStore.readSegments(carbonTable.getTablePath, showHistory = false, None)
+
+    // keep the newest 'remaining' segments; collect the ids of all older ones to delete
+    val deleteSegmentIds = segments.filter(segment => segment.getSegmentStatus == SegmentStatus.SUCCESS
+      || segment.getSegmentStatus == SegmentStatus.COMPACTED)
+      .sortBy(_.getLoadStartTime)
+      .map(_.getLoadName)
+      .reverse
+      .drop(remaining)
+
+    if (deleteSegmentIds.length == 0) {
+      return Seq.empty
+    }
+
+    withEvents(DeleteSegmentByIdPreEvent(carbonTable, deleteSegmentIds, sparkSession),
+      DeleteSegmentByIdPostEvent(carbonTable, deleteSegmentIds, sparkSession)) {
+      CarbonStore.deleteLoadById(
+        deleteSegmentIds,
+        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+        tableName,
+        carbonTable
+      )
+    }
+    Seq.empty
+  }
+
+  override protected def opName: String = "DELETE SEGMENT BY REMAIN_NUMBER"
+}
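The selection rule in processData is compact enough to distill. Below is a standalone
sketch of the same logic on plain values; LoadInfo and selectForDeletion are
illustrative names, not part of this patch:

    case class LoadInfo(name: String, startTime: Long, deletable: Boolean)

    // Only SUCCESS/COMPACTED segments qualify; of those, the newest 'remaining'
    // are kept and the names of all older segments are returned for deletion.
    def selectForDeletion(loads: Seq[LoadInfo], remaining: Int): Seq[String] =
      loads.filter(_.deletable)
        .sortBy(_.startTime)   // oldest first
        .map(_.name)
        .reverse               // newest first
        .drop(remaining)       // keep the newest 'remaining'

    // selectForDeletion(Seq(LoadInfo("0", 1, true), LoadInfo("1", 2, true),
    //   LoadInfo("2", 3, true)), remaining = 1) == Seq("1", "0")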
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 56edba0a181..52757254cb1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -79,7 +79,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   protected lazy val segmentManagement: Parser[LogicalPlan] =
     deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment |
-    showSegments
+    showSegments | deleteSegmentByRemainNumber
 
   protected lazy val restructure: Parser[LogicalPlan] =
     alterTableDropColumn
@@ -508,6 +508,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         CarbonDeleteLoadByIdCommand(loadIds, dbName, tableName.toLowerCase())
     }
 
+  protected lazy val deleteSegmentByRemainNumber: Parser[LogicalPlan] =
+    DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    (EXPECT ~> (SEGMENT ~ "." ~ REMAIN_NUMBER) ~> "=" ~> number) <~
+    opt(";") ^^ {
+      case dbName ~ tableName ~ remaining =>
+        CarbonDeleteLoadByRemainNumberCommand(remaining, dbName, tableName.toLowerCase())
+    }
+
   protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
     DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
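For reference, the new rule accepts statements of the following shape (names are
illustrative): the optional ident <~ "." prefix carries the database name, and the
trailing semicolon may be omitted.

    DELETE FROM TABLE my_table EXPECT SEGMENT.REMAIN_NUMBER = 2
    DELETE FROM TABLE my_db.my_table EXPECT SEGMENT.REMAIN_NUMBER = 0;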
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala
new file mode 100644
index 00000000000..6137c057771
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.deletesegment
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test cases for deleting segments with the DELETE ... EXPECT SEGMENT.REMAIN_NUMBER syntax.
+ */
+class DeleteSegmentByRemainNumberTestCase extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+  val DELETED_STATUS = "Marked for Delete"
+
+  val SUCCESSFUL_STATUS = "Success"
+
+  override def beforeAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+  }
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists deleteSegmentPartitionTable")
+    sql("drop table if exists deleteSegmentTable")
+    sql("drop table if exists indexTable")
+    sql(
+      "CREATE table deleteSegmentPartitionTable (ID int, date String, country String, name " +
+      "String, phonetype String, serialname String, salary String) STORED AS carbondata " +
+      "PARTITIONED by(age int)"
+    )
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
+         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='20')
+         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
+         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='30')
+         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
+         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='40')
+         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+
+    sql(
+      "CREATE table deleteSegmentTable (ID int, date String, country String, name " +
+      "String, phonetype String, serialname String, salary String) STORED AS carbondata"
+    )
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
+         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
+         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
+         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+
+    sql("create index indexTable on table deleteSegmentTable(country) as 'carbondata' " +
+      "properties('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists deleteSegmentTable")
+    sql("drop table if exists deleteSegmentPartitionTable")
+    sql("drop table if exists indexTable")
+  }
+
+  test("delete segment, remain_number = 1") {
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
+    val segments1 = sql("show segments on deleteSegmentTable").collect()
+    assertResult(SUCCESSFUL_STATUS)(segments1(0).get(1))
+    assertResult(DELETED_STATUS)(segments1(1).get(1))
+    assertResult(DELETED_STATUS)(segments1(2).get(1))
+    assertResult(sql("select * from indexTable").count())(10)
+
+    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
+    val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
+    assertResult(SUCCESSFUL_STATUS)(segments2(0).get(1))
+    assertResult(DELETED_STATUS)(segments2(1).get(1))
+    assertResult(DELETED_STATUS)(segments2(2).get(1))
+  }
+
+  test("delete segment, remain nothing") {
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 0")
+    val segments1 = sql("show segments on deleteSegmentTable").collect()
+    segments1.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
+    assertResult(sql("select * from indexTable").count())(0)
+
+    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 0")
+    val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
+    segments2.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
+  }
+
+  test("delete segment, remain all") {
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 3")
+    val segments1 = sql("show segments on deleteSegmentTable").collect()
+    segments1.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
+    assertResult(sql("select * from indexTable").count())(30)
+
+    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 3")
+    val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
+    segments2.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
+  }
+
+  test("delete segment, remain_number is invalid") {
+    val ex1 = intercept[Exception] {
+      sql("delete from table deleteSegmentTable expect segment.remain_number = -1")
+    }
+    assert(ex1.getMessage.contains("not found"))
+    val ex2 = intercept[Exception] {
+      sql("delete from table deleteSegmentTable expect segment.remain_number = 2147483648")
+    }
+    assert(ex2.getMessage.contains("SqlParse"))
+    assertResult(sql("select * from indexTable").count())(30)
+
+    val ex3 = intercept[Exception] {
+      sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = -1")
+    }
+    assert(ex3.getMessage.contains("not found"))
+    val ex4 = intercept[Exception] {
+      sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 2147483648")
+    }
+    assert(ex4.getMessage.contains("SqlParse"))
+  }
+
+  test("delete segment after update") {
+    sql("update deleteSegmentPartitionTable d set (d.country) = ('fr') where d.country = 'aus'")
+    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
+    val rows = sql("select * from deleteSegmentPartitionTable").collect()
+    rows.foreach(row => assertResult("fr")(row(2)))
+  }
+
+  test("delete segment after delete newest segment by segmentId") {
+    sql("delete from table deleteSegmentTable where segment.id in (2)")
+    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
+    val segments1 = sql("show segments on deleteSegmentTable").collect()
+    assertResult(DELETED_STATUS)(segments1(0).get(1))
+    assertResult(SUCCESSFUL_STATUS)(segments1(1).get(1))
+    assertResult(DELETED_STATUS)(segments1(2).get(1))
+    assertResult(sql("select * from indexTable").count())(10)
+
+    sql("delete from table deleteSegmentPartitionTable where segment.id in (2)")
+    sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
+    val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
+    assertResult(DELETED_STATUS)(segments2(0).get(1))
+    assertResult(SUCCESSFUL_STATUS)(segments2(1).get(1))
+    assertResult(DELETED_STATUS)(segments2(2).get(1))
+  }
+}
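The last test is worth tracing against the command's selection rule: deleting segment 2
by ID first removes the newest load from the eligible set, so with remain_number = 1 the
next-newest segment (1) is the one kept. In terms of the illustrative selectForDeletion
helper sketched earlier:

    // Segment 2 is already Marked for Delete, so it is filtered out up front;
    // of the rest, the newest (segment 1) is kept and segment 0 is returned.
    selectForDeletion(Seq(
      LoadInfo("0", startTime = 1, deletable = true),
      LoadInfo("1", startTime = 2, deletable = true),
      LoadInfo("2", startTime = 3, deletable = false)), remaining = 1)
    // == Seq("0"), matching the Deleted/Success/Deleted status assertions above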