Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix dynamic partition table unexpectly stop scheduling (backport #45235) #45271

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.starrocks.persist.CreateTableInfo;
import com.starrocks.persist.DropInfo;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.system.SystemInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -863,6 +864,10 @@ public boolean isInfoSchemaDb() {
return fullQualifiedName.equalsIgnoreCase(InfoSchemaDb.DATABASE_NAME);
}

public boolean isStatisticsDatabase() {
return fullQualifiedName.equalsIgnoreCase(StatsConstants.STATISTICS_DB_NAME);
}

// the invoker should hold db's writeLock
public void setExist(boolean exist) {
this.exist = exist;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public class DynamicPartitionProperty {
public static final int NOT_SET_HISTORY_PARTITION_NUM = 0;
public static final String NOT_SET_PREFIX = "p";

private boolean exist;
private final boolean exists;

private boolean enable;
private boolean enabled;
private String timeUnit;
private int start;
private int end;
Expand All @@ -64,8 +64,8 @@ public class DynamicPartitionProperty {
private int historyPartitionNum;
public DynamicPartitionProperty(Map<String, String> properties) {
if (properties != null && !properties.isEmpty()) {
this.exist = true;
this.enable = Boolean.parseBoolean(properties.get(ENABLE));
this.exists = true;
this.enabled = Boolean.parseBoolean(properties.get(ENABLE));
this.timeUnit = properties.get(TIME_UNIT);
this.tz = TimeUtils.getOrSystemTimeZone(properties.get(TIME_ZONE));
// In order to compatible dynamic add partition version
Expand All @@ -79,7 +79,7 @@ public DynamicPartitionProperty(Map<String, String> properties) {
HISTORY_PARTITION_NUM, String.valueOf(NOT_SET_HISTORY_PARTITION_NUM)));
createStartOfs(properties);
} else {
this.exist = false;
this.exists = false;
}
}

Expand All @@ -99,8 +99,8 @@ private void createStartOfs(Map<String, String> properties) {
}
}

public boolean isExist() {
return exist;
public boolean isExists() {
return exists;
}

public String getTimeUnit() {
Expand All @@ -123,8 +123,8 @@ public int getBuckets() {
return buckets;
}

public boolean getEnable() {
return enable;
public boolean isEnabled() {
return enabled;
}

public StartOfDate getStartOfWeek() {
Expand Down Expand Up @@ -158,6 +158,7 @@ public int getHistoryPartitionNum() {
return historyPartitionNum;
}

<<<<<<< HEAD
public String getPropString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
Expand All @@ -167,6 +168,16 @@ public String getPropString() {
sb.append(START + ":" + start + ",");
sb.append(END + ":" + end + ",");
sb.append(PREFIX + ":" + prefix + ",");
=======
public Map<String, String> getProperties() {
Map<String, String> properties = Maps.newHashMap();
properties.put(ENABLE, String.valueOf(enabled));
properties.put(TIME_UNIT, timeUnit);
properties.put(TIME_ZONE, tz.getID());
properties.put(START, String.valueOf(start));
properties.put(END, String.valueOf(end));
properties.put(PREFIX, prefix);
>>>>>>> d97d27382e ([BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235))
if (buckets > 0) {
sb.append(BUCKETS + ":" + buckets + ",");
}
Expand All @@ -190,7 +201,7 @@ public void setTimeUnit(String timeUnit) {

@Override
public String toString() {
String res = ",\n\"" + ENABLE + "\" = \"" + enable + "\""
String res = ",\n\"" + ENABLE + "\" = \"" + enabled + "\""
+ ",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\""
+ ",\n\"" + TIME_ZONE + "\" = \"" + tz.getID() + "\""
+ ",\n\"" + START + "\" = \"" + start + "\""
Expand Down
78 changes: 77 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public TableProperty getTableProperty() {
public boolean dynamicPartitionExists() {
return tableProperty != null
&& tableProperty.getDynamicPartitionProperty() != null
&& tableProperty.getDynamicPartitionProperty().isExist();
&& tableProperty.getDynamicPartitionProperty().isExists();
}

public void setBaseIndexId(long baseIndexId) {
Expand Down Expand Up @@ -2150,6 +2150,82 @@ public void onCreate() {
}

@Override
<<<<<<< HEAD
=======
public void onCreate(Database db) {
super.onCreate(db);

ColocateTableIndex colocateTableIndex = GlobalStateMgr.getCurrentState().getColocateTableIndex();
if (colocateTableIndex.isColocateTable(getId())) {
ColocateTableIndex.GroupId groupId = colocateTableIndex.getGroup(getId());
List<List<Long>> backendsPerBucketSeq = colocateTableIndex.getBackendsPerBucketSeq(groupId);
ColocatePersistInfo colocatePersistInfo = ColocatePersistInfo.createForAddTable(groupId, getId(),
backendsPerBucketSeq);
GlobalStateMgr.getCurrentState().getEditLog().logColocateAddTable(colocatePersistInfo);
}

DynamicPartitionUtil.registerOrRemovePartitionScheduleInfo(db.getId(), this);

if (Config.dynamic_partition_enable && getTableProperty().getDynamicPartitionProperty().isEnabled()) {
new Thread(() -> {
try {
GlobalStateMgr.getCurrentState().getDynamicPartitionScheduler()
.executeDynamicPartitionForTable(db.getId(), getId());
} catch (Exception ex) {
LOG.warn("Some problems were encountered in the process of triggering " +
"the execution of dynamic partitioning", ex);
}
}, "BackgroundDynamicPartitionThread").start();
}

if (isTemporaryTable()) {
TemporaryTableMgr temporaryTableMgr = GlobalStateMgr.getCurrentState().getTemporaryTableMgr();
temporaryTableMgr.addTemporaryTable(sessionId, db.getId(), name, id);
LOG.debug("add temporary table, name[{}] id[{}] session[{}]", name, id, sessionId);
}
}

private void analyzePartitionInfo() {
if (!(partitionInfo instanceof ExpressionRangePartitionInfo)) {
return;
}
ExpressionRangePartitionInfo expressionRangePartitionInfo = (ExpressionRangePartitionInfo) partitionInfo;
// currently, automatic partition only supports one expression
Expr partitionExpr = expressionRangePartitionInfo.getPartitionExprs().get(0);
// for Partition slot ref, the SlotDescriptor is not serialized, so should
// recover it here.
// the SlotDescriptor is used by toThrift, which influences the execution
// process.
List<SlotRef> slotRefs = Lists.newArrayList();
partitionExpr.collect(SlotRef.class, slotRefs);
Preconditions.checkState(slotRefs.size() == 1);
// schema change should update slot id
for (int i = 0; i < fullSchema.size(); i++) {
Column column = fullSchema.get(i);
if (column.getName().equalsIgnoreCase(slotRefs.get(0).getColumnName())) {
SlotDescriptor slotDescriptor = new SlotDescriptor(new SlotId(i), column.getName(),
column.getType(), column.isAllowNull());
slotRefs.get(0).setDesc(slotDescriptor);
}
}
}

// Remove all Tablets belonging to this table from TabletInvertedIndex
public void removeTabletsFromInvertedIndex() {
TabletInvertedIndex invertedIndex = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
Collection<Partition> allPartitions = getAllPartitions();
for (Partition partition : allPartitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
}

// If you are modifying this function, please check if you need to modify LakeTable.onDrop also.
@Override
>>>>>>> d97d27382e ([BugFix] Fix dynamic partition table unexpectly stop scheduling (#45235))
public void onDrop(Database db, boolean force, boolean replay) {
// drop all temp partitions of this table, so that there is no temp partitions in recycle bin,
// which make things easier.
Expand Down
Loading
Loading