delete segment expect remaining number
Revise

modify code

add SI test
add no partition table test
W1thOut authored and W1thOut committed Aug 18, 2021
1 parent d8f7df9 commit f8fdb75
Showing 4 changed files with 261 additions and 1 deletion.
@@ -180,6 +180,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
  protected val REGISTER = carbonKeyWord("REGISTER")
  protected val PROPERTIES = carbonKeyWord("PROPERTIES")
  protected val REFRESH = carbonKeyWord("REFRESH")
  protected val EXPECT = carbonKeyWord("EXPECT")
  protected val REMAIN_NUMBER = carbonKeyWord("REMAIN_NUMBER")

  // For materialized view
  // Keywords used in this parser
@@ -353,4 +355,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
      p.getClass.getSimpleName.equals("FloatLit") ||
      p.getClass.getSimpleName.equals("DecimalLit")
    }) ^^ (_.chars)

  protected lazy val number: Parser[Int] = numericLit ^^ (_.toInt)
}
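
A note on the new number rule: numericLit ^^ (_.toInt) converts the matched token with String.toInt, so a literal that overflows Int (for example 2147483648, exercised by the new tests) fails at conversion time rather than matching some other rule. Below is a minimal standalone sketch of that behavior, assuming scala-parser-combinators' StandardTokenParsers; the demo object and its parse helper are illustrative, not CarbonData code.

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

// Illustrative demo only, not part of this commit.
object NumberRuleDemo extends StandardTokenParsers {
  // Same shape as the rule added to CarbonDDLSqlParser.
  lazy val number: Parser[Int] = numericLit ^^ (_.toInt)

  def parse(input: String): ParseResult[Int] =
    phrase(number)(new lexical.Scanner(input))

  def main(args: Array[String]): Unit = {
    println(parse("3")) // prints: [1.2] parsed: 3
    // parse("2147483648") throws NumberFormatException: the token is a
    // valid NumericLit, but _.toInt overflows Int.
  }
}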
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command.management

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.events.{withEvents, DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent}

/**
 * A command that deletes older segments, keeping only the given number of
 * the newest segments.
 *
 * @param remaining the number of segments expected to remain after deletion
 */
case class CarbonDeleteLoadByRemainNumberCommand(
    remaining: Int,
    databaseNameOp: Option[String],
    tableName: String)
  extends DataCommand {

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
    setAuditTable(carbonTable)
    setAuditInfo(Map("remaining number" -> remaining.toString))
    if (!carbonTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }

    // if insert overwrite in progress, do not allow delete segment
    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
    }

    val segments = CarbonStore.readSegments(carbonTable.getTablePath, showHistory = false, None)

    // Sort the successful/compacted segments by load start time and keep the
    // newest `remaining` ones; everything older is selected for deletion.
    val deleteSegmentIds = segments
      .filter(segment => segment.getSegmentStatus == SegmentStatus.SUCCESS ||
        segment.getSegmentStatus == SegmentStatus.COMPACTED)
      .sortBy(_.getLoadStartTime)
      .map(_.getLoadName)
      .reverse
      .drop(remaining)

    if (deleteSegmentIds.isEmpty) {
      return Seq.empty
    }

    withEvents(DeleteSegmentByIdPreEvent(carbonTable, deleteSegmentIds, sparkSession),
      DeleteSegmentByIdPostEvent(carbonTable, deleteSegmentIds, sparkSession)) {
      CarbonStore.deleteLoadById(
        deleteSegmentIds,
        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
        tableName,
        carbonTable
      )
    }
    Seq.empty
  }

  override protected def opName: String = "DELETE SEGMENT BY REMAIN_NUMBER"
}
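
The selection logic above keeps the `remaining` newest SUCCESS/COMPACTED segments and targets everything older for deletion. Here is a standalone sketch of the same sort/reverse/drop idiom on plain values; the Load case class and sample data are illustrative stand-ins for CarbonData's LoadMetadataDetails, not real CarbonData types:

// Illustrative stand-in: (segment id, load start time, status).
case class Load(name: String, startTime: Long, status: String)

object RemainNumberDemo {
  def idsToDelete(loads: Seq[Load], remaining: Int): Seq[String] =
    loads
      .filter(l => l.status == "Success" || l.status == "Compacted")
      .sortBy(_.startTime) // oldest first
      .map(_.name)
      .reverse             // newest first
      .drop(remaining)     // keep the newest `remaining`, delete the rest

  def main(args: Array[String]): Unit = {
    val loads = Seq(
      Load("0", 100L, "Success"),
      Load("1", 200L, "Success"),
      Load("2", 300L, "Success"))
    println(idsToDelete(loads, remaining = 1)) // List(1, 0): only segment 2 survives
  }
}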
@@ -79,7 +79,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {

  protected lazy val segmentManagement: Parser[LogicalPlan] =
    deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment |
-   showSegments
+   showSegments | deleteSegmentByRemainNumber

  protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn

@@ -508,6 +508,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
        CarbonDeleteLoadByIdCommand(loadIds, dbName, tableName.toLowerCase())
    }

  protected lazy val deleteSegmentByRemainNumber: Parser[LogicalPlan] =
    DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
    (EXPECT ~> (SEGMENT ~ "." ~ REMAIN_NUMBER) ~> "=" ~> number) <~
    opt(";") ^^ {
      case dbName ~ tableName ~ remaining =>
        CarbonDeleteLoadByRemainNumberCommand(remaining, dbName, tableName.toLowerCase())
    }

  protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
    DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
    (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
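For reference, the statements this new rule accepts look like the following (mirroring the test case below; the table names are placeholders, and the database prefix and trailing semicolon are both optional):

sql("DELETE FROM TABLE db1.t1 EXPECT SEGMENT.REMAIN_NUMBER = 1")
sql("DELETE FROM TABLE t1 EXPECT SEGMENT.REMAIN_NUMBER = 0;")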
@@ -0,0 +1,166 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.testsuite.deletesegment

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Test cases for deleting segments via DELETE FROM TABLE ... EXPECT SEGMENT.REMAIN_NUMBER.
 */
class DeleteSegmentByRemainNumberTestCase extends QueryTest
  with BeforeAndAfterAll with BeforeAndAfterEach {

  val DELETED_STATUS = "Marked for Delete"

  val SUCCESSFUL_STATUS = "Success"

  override def beforeAll {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
  }

  override def beforeEach(): Unit = {
    sql("drop table if exists deleteSegmentPartitionTable")
    sql("drop table if exists deleteSegmentTable")
    sql("drop table if exists indexTable")
    sql(
      "CREATE table deleteSegmentPartitionTable (ID int, date String, country String, name " +
      "String, phonetype String, serialname String, salary String) STORED AS carbondata " +
      "PARTITIONED by(age int)"
    )
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='20')
         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='30')
         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
         | INTO TABLE deleteSegmentPartitionTable PARTITION (age='40')
         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)

    sql(
      "CREATE table deleteSegmentTable (ID int, date String, country String, name " +
      "String, phonetype String, serialname String, salary String) STORED AS carbondata"
    )
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
         | INTO TABLE deleteSegmentTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)

    // Create a secondary index on country so SI cleanup can be verified.
    sql("create index indexTable on table deleteSegmentTable(country) as 'carbondata' " +
      "properties('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
  }

  override def afterAll(): Unit = {
    sql("drop table if exists deleteSegmentTable")
    sql("drop table if exists deleteSegmentPartitionTable")
    sql("drop table if exists indexTable")
  }

test("delete segment, remain_number = 1") {
sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
val segments1 = sql("show segments on deleteSegmentTable").collect()
assertResult(SUCCESSFUL_STATUS)(segments1(0).get(1))
assertResult(DELETED_STATUS)(segments1(1).get(1))
assertResult(DELETED_STATUS)(segments1(2).get(1))
assertResult(sql("select * from indexTable").count())(10)

sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
assertResult(SUCCESSFUL_STATUS)(segments2(0).get(1))
assertResult(DELETED_STATUS)(segments2(1).get(1))
assertResult(DELETED_STATUS)(segments2(2).get(1))
}

test("delete segment, remain nothing") {
sql("delete from table deleteSegmentTable expect segment.remain_number = 0")
val segments1 = sql("show segments on deleteSegmentTable").collect()
segments1.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
assertResult(sql("select * from indexTable").count())(0)

sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 0")
val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
segments2.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
}

test("delete segment, remain all") {
sql("delete from table deleteSegmentTable expect segment.remain_number = 3")
val segments1 = sql("show segments on deleteSegmentTable").collect()
segments1.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
assertResult(sql("select * from indexTable").count())(30)

sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 3")
val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
segments2.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
}

test("delete segment, remain_number is invalid") {
val ex1 = intercept[Exception] {
sql("delete from table deleteSegmentTable expect segment.remain_number = -1")
}
assert(ex1.getMessage.contains("not found"))
val ex2 = intercept[Exception] {
sql("delete from table deleteSegmentTable expect segment.remain_number = 2147483648")
}
assert(ex2.getMessage.contains("SqlParse"))
assertResult(sql("select * from indexTable").count())(30)

val ex3 = intercept[Exception] {
sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = -1")
}
assert(ex3.getMessage.contains("not found"))
val ex4 = intercept[Exception] {
sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 2147483648")
}
assert(ex4.getMessage.contains("SqlParse"))
}

test("delete segment after update") {
sql("update deleteSegmentPartitionTable d set (d.country) = ('fr') where d.country = 'aus'")
sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
val segments2 = sql("select * from deleteSegmentPartitionTable").collect()
segments2.foreach(row => assertResult("fr")(row(2)))
}

test("delete segment after delete newest segment by segmentId") {
sql("delete from table deleteSegmentTable where segment.id in (2)")
sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
val segments1 = sql("show segments on deleteSegmentTable").collect()
assertResult(DELETED_STATUS)(segments1(0).get(1))
assertResult(SUCCESSFUL_STATUS)(segments1(1).get(1))
assertResult(DELETED_STATUS)(segments1(2).get(1))
assertResult(sql("select * from indexTable").count())(10)

sql("delete from table deleteSegmentPartitionTable where segment.id in (2)")
sql("delete from table deleteSegmentPartitionTable expect segment.remain_number = 1")
val segments2 = sql("show segments on deleteSegmentPartitionTable").collect()
assertResult(DELETED_STATUS)(segments2(0).get(1))
assertResult(SUCCESSFUL_STATUS)(segments2(1).get(1))
assertResult(DELETED_STATUS)(segments2(2).get(1))
}
}
