Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alter column datatype #14771

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,7 @@ private void testOneCastWithRow(
}

@SuppressWarnings("SameParameterValue")
private Object genValue(TSDataType dataType, int i) {
public static Object genValue(TSDataType dataType, int i) {
switch (dataType) {
case INT32:
return i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,5 @@ public enum CnToDnAsyncRequestType {
ROLLBACK_TABLE_DEVICE_BLACK_LIST,
INVALIDATE_MATCHED_TABLE_DEVICE_CACHE,
DELETE_DATA_FOR_TABLE_DEVICE,
DELETE_TABLE_DEVICE_IN_BLACK_LIST,
DELETE_TABLE_DEVICE_IN_BLACK_LIST
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
Expand Down Expand Up @@ -404,6 +405,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case PreDeleteColumn:
plan = new PreDeleteColumnPlan();
break;
case AlterColumnDataType:
plan = new AlterColumnDataTypePlan();
break;
case CommitDeleteColumn:
plan = new CommitDeleteColumnPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public enum ConfigPhysicalPlanType {
DescTable((short) 862),
ShowTable4InformationSchema((short) 863),
DescTable4InformationSchema((short) 864),
AlterColumnDataType((short) 865),
CommitAlterColumnDataType((short) 866),

/** Deprecated types for sync, restored them for upgrade. */
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.write.table;

import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import org.apache.tsfile.enums.TSDataType;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class AlterColumnDataTypePlan extends AbstractTableColumnPlan {

private TSDataType newType;

public AlterColumnDataTypePlan() {
super(ConfigPhysicalPlanType.AlterColumnDataType);
}

public AlterColumnDataTypePlan(
String database, String tableName, String columnName, TSDataType newType) {
super(ConfigPhysicalPlanType.AlterColumnDataType, database, tableName, columnName);
this.newType = newType;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
super.serializeImpl(stream);
stream.writeInt(newType.serialize());
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
super.deserializeImpl(buffer);
newType = TSDataType.deserializeFrom(buffer);
}

public void setNewType(TSDataType newType) {
this.newType = newType;
}

public TSDataType getNewType() {
return newType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,28 @@
import org.apache.iotdb.confignode.rpc.thrift.TDescTableResp;
import org.apache.iotdb.consensus.common.DataSet;

import org.apache.tsfile.enums.TSDataType;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class DescTableResp implements DataSet {
private final TSStatus status;
private final TsTable table;
private final Set<String> preDeletedColumns;
private final Map<String, TSDataType> preAlteredColumns;

public DescTableResp(
final TSStatus status, final TsTable table, final Set<String> preDeletedColumns) {
final TSStatus status,
final TsTable table,
final Set<String> preDeletedColumns,
final Map<String, TSDataType> preAlteredColumns) {
this.status = status;
this.table = table;
this.preDeletedColumns = preDeletedColumns;
this.preAlteredColumns = preAlteredColumns;
}

public TDescTableResp convertToTDescTableResp() {
Expand All @@ -47,6 +56,14 @@ public TDescTableResp convertToTDescTableResp() {
Objects.nonNull(table)
? TsTableInternalRPCUtil.serializeSingleTsTable(table)
: null);
return Objects.nonNull(preDeletedColumns) ? resp.setPreDeletedColumns(preDeletedColumns) : resp;
if (Objects.nonNull(preDeletedColumns)) {
resp.setPreDeletedColumns(preDeletedColumns);
}
if (Objects.nonNull(preAlteredColumns)) {
Map<String, Byte> preAlteredColumnsMap = new HashMap<>();
preAlteredColumns.forEach((col, type) -> preAlteredColumnsMap.put(col, type.serialize()));
resp.setPreAlteredColumns(preAlteredColumnsMap);
}
return resp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2660,8 +2660,11 @@ public TSStatus alterOrDropTable(final TAlterOrDropTableReq req) {
return procedureManager.alterTableDropColumn(req);
case DROP_TABLE:
return procedureManager.dropTable(req);
case ALTER_COLUMN_DATA_TYPE:
return procedureManager.alterTableColumnDataType(req);
default:
throw new IllegalArgumentException();
throw new IllegalArgumentException(
AlterOrDropTableOperationType.getType(req.operationType).toString());
}
} else {
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.table.AbstractAlterOrDropTableProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.table.AddTableColumnProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.table.AlterTableColumnDataTypeProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.table.CreateTableProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.table.DeleteDevicesProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.table.DropTableColumnProcedure;
Expand Down Expand Up @@ -144,6 +145,7 @@
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.ratis.util.AutoCloseableLock;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
Expand Down Expand Up @@ -1820,6 +1822,22 @@ public TSStatus alterTableDropColumn(final TAlterOrDropTableReq req) {
false));
}

public TSStatus alterTableColumnDataType(TAlterOrDropTableReq req) {
return executeWithoutDuplicate(
req.database,
null,
req.tableName,
req.queryId,
ProcedureType.ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE,
new AlterTableColumnDataTypeProcedure(
req.database,
req.tableName,
req.queryId,
ReadWriteIOUtils.readVarIntString(req.updateInfo),
TSDataType.deserialize(req.updateInfo.get()),
false));
}

public TSStatus dropTable(final TAlterOrDropTableReq req) {
return executeWithoutDuplicate(
req.database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1215,7 +1216,7 @@ public synchronized Pair<TSStatus, TsTable> tableColumnCheckForColumnExtension(
null);
}

final TsTable expandedTable = TsTable.deserialize(ByteBuffer.wrap(originalTable.serialize()));
final TsTable expandedTable = new TsTable(originalTable);

final String errorMsg =
String.format(
Expand All @@ -1238,6 +1239,33 @@ public synchronized Pair<TSStatus, TsTable> tableColumnCheckForColumnExtension(
return new Pair<>(RpcUtils.SUCCESS_STATUS, expandedTable);
}

public synchronized Pair<TSStatus, TsTable> tableColumnCheckForColumnAltering(
final String database,
final String tableName,
final String columnName,
final TSDataType dataType)
throws MetadataException {
final TsTable originalTable = getTableIfExists(database, tableName).orElse(null);

if (Objects.isNull(originalTable)) {
return new Pair<>(
RpcUtils.getStatus(
TSStatusCode.TABLE_NOT_EXISTS,
String.format("Table '%s.%s' does not exist", database, tableName)),
null);
}
TSStatus tsStatus =
clusterSchemaInfo.preAlterColumnDataType(database, tableName, columnName, dataType);
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return new Pair<>(tsStatus, null);
}

final TsTable alteredTable = new TsTable(originalTable);
alteredTable.getColumnSchema(columnName).setDataType(dataType);

return new Pair<>(RpcUtils.SUCCESS_STATUS, alteredTable);
}

public synchronized Pair<TSStatus, TsTable> tableColumnCheckForColumnRenaming(
final String database, final String tableName, final String oldName, final String newName)
throws MetadataException {
Expand All @@ -1251,7 +1279,7 @@ public synchronized Pair<TSStatus, TsTable> tableColumnCheckForColumnRenaming(
null);
}

final TsTable expandedTable = TsTable.deserialize(ByteBuffer.wrap(originalTable.serialize()));
final TsTable expandedTable = new TsTable(originalTable);

final TsTableColumnSchema schema = originalTable.getColumnSchema(oldName);
if (Objects.isNull(schema)) {
Expand Down Expand Up @@ -1370,7 +1398,7 @@ public synchronized Pair<TSStatus, TsTable> updateTableProperties(
return new Pair<>(RpcUtils.SUCCESS_STATUS, null);
}

final TsTable updatedTable = TsTable.deserialize(ByteBuffer.wrap(originalTable.serialize()));
final TsTable updatedTable = new TsTable(originalTable);
updatedProperties.forEach(
(k, v) -> {
originalProperties.put(k, originalTable.getPropValue(k).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitDeleteTablePlan;
Expand Down Expand Up @@ -578,6 +579,9 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
return clusterSchemaInfo.preDeleteTable((PreDeleteTablePlan) physicalPlan);
case CommitDeleteTable:
return clusterSchemaInfo.dropTable((CommitDeleteTablePlan) physicalPlan);
case AlterColumnDataType:
return clusterSchemaInfo.commitAlterColumnDataType(
((AlterColumnDataTypePlan) physicalPlan));
case CreatePipeV2:
return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan);
case SetPipeStatusV2:
Expand Down
Loading
Loading