diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 5ec131db..96226f2b 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -133,6 +133,14 @@ func main() { os.Exit(1) } + if err := handlersRegistry.Add( + types.OperationTypeDB, + handlers.NewDBOperationHandler(dbConnector, s3Connector, queries.NewWriteTableQuery), + ); err != nil { + xlog.Error(ctx, "failed to register DB handler", zap.Error(err)) + os.Exit(1) + } + processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry) wg.Add(1) diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index 9d1333b4..e172cc5f 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -202,6 +202,29 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { SourcePaths: sourcePathsSlice, Audit: auditFromDb(creator, createdAt, completedAt), }, nil + } else if operationType == string(types.OperationTypeDB) { + if backupId == nil { + return nil, fmt.Errorf("failed to read backup_id for DB operation: %s", operationId) + } + + var pathPrefix string + if len(sourcePathsSlice) > 0 { + pathPrefix = sourcePathsSlice[0] + } + + return &types.DeleteBackupOperation{ + ID: operationId, + BackupID: *backupId, + ContainerID: containerId, + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: databaseEndpoint, + DatabaseName: databaseName, + }, + State: operationState, + Message: StringOrEmpty(message), + Audit: auditFromDb(creator, createdAt, completedAt), + PathPrefix: pathPrefix, + }, nil } return &types.GenericOperation{ID: operationId}, nil diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 2a606458..05e8350e 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -61,6 +61,7 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i d.AddValueParam("$id", table_types.StringValueFromString(operation.GetID())) d.AddValueParam("$type", table_types.StringValueFromString(string(operation.GetType()))) d.AddValueParam("$status", table_types.StringValueFromString(operation.GetState().String())) + d.AddValueParam("$message", table_types.StringValueFromString(operation.GetMessage())) if operation.GetAudit() != nil { d.AddValueParam( "$initiated", @@ -103,7 +104,6 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i "$operation_id", table_types.StringValueFromString(tb.YdbOperationId), ) - d.AddValueParam("$message", table_types.StringValueFromString(tb.Message)) if len(tb.SourcePaths) > 0 { d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(tb.SourcePaths, ","))) } @@ -138,11 +138,37 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i "$operation_id", table_types.StringValueFromString(rb.YdbOperationId), ) - d.AddValueParam("$message", table_types.StringValueFromString(rb.Message)) if len(rb.SourcePaths) > 0 { d.AddValueParam("$paths", table_types.StringValueFromString(strings.Join(rb.SourcePaths, ","))) } + } else if operation.GetType() == types.OperationTypeDB { + db, ok := operation.(*types.DeleteBackupOperation) + if !ok { + xlog.Error(ctx, "error cast operation to DeleteBackupOperation", zap.String("operation_id", operation.GetID())) + } + + d.AddValueParam( + "$container_id", table_types.StringValueFromString(db.ContainerID), + ) + + d.AddValueParam( + "$database", + table_types.StringValueFromString(db.YdbConnectionParams.DatabaseName), + ) + + d.AddValueParam( + "$endpoint", + table_types.StringValueFromString(db.YdbConnectionParams.Endpoint), + ) + + d.AddValueParam( + "$backup_id", + table_types.StringValueFromString(db.BackupID), + ) + + d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix)) + } else { xlog.Error(ctx, "unknown operation type write to db", zap.String("operation_type", string(operation.GetType()))) } diff --git a/internal/connectors/s3/connector.go b/internal/connectors/s3/connector.go index 86ce0470..bae93caa 100644 --- a/internal/connectors/s3/connector.go +++ b/internal/connectors/s3/connector.go @@ -13,6 +13,7 @@ import ( type S3Connector interface { ListObjects(pathPrefix string, bucket string) ([]string, error) GetSize(pathPrefix string, bucket string) (int64, error) + DeleteObjects(keys []string, bucket string) error } type ClientS3Connector struct { @@ -104,3 +105,35 @@ func (c *ClientS3Connector) GetSize(pathPrefix string, bucket string) (int64, er return size, nil } + +func (c *ClientS3Connector) DeleteObjects(keys []string, bucket string) error { + if len(keys) == 0 { + return nil + } + + objectsPtr := make([]*s3.ObjectIdentifier, len(keys)) + for i, object := range keys { + objectsPtr[i] = &s3.ObjectIdentifier{ + Key: aws.String(object), + } + } + + input := s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{ + Objects: objectsPtr, + Quiet: aws.Bool(true), + }, + } + + delOut, err := c.s3.DeleteObjects(&input) + if err != nil { + return err + } + + if len(delOut.Errors) > 0 { + return fmt.Errorf("can't delete objects: %v", delOut.Errors) + } + + return nil +} diff --git a/internal/handlers/delete_backup.go b/internal/handlers/delete_backup.go new file mode 100644 index 00000000..b93ee1c1 --- /dev/null +++ b/internal/handlers/delete_backup.go @@ -0,0 +1,125 @@ +package handlers + +import ( + "context" + "fmt" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" + "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/connectors/s3" + "ydbcp/internal/types" + "ydbcp/internal/util/xlog" +) + +func NewDBOperationHandler( + db db.DBConnector, s3 s3.S3Connector, getQueryBuilder func(ctx context.Context) queries.WriteTableQuery, +) types.OperationHandler { + return func(ctx context.Context, op types.Operation) error { + return DBOperationHandler(ctx, op, db, s3, getQueryBuilder) + } +} + +func DBOperationHandler( + ctx context.Context, + operation types.Operation, + db db.DBConnector, + s3 s3.S3Connector, + getQueryBuilder func(ctx context.Context) queries.WriteTableQuery, +) error { + xlog.Info( + ctx, "received operation", + zap.String("id", operation.GetID()), + zap.String("type", string(operation.GetType())), + zap.String("state", string(operation.GetState())), + zap.String("message", operation.GetMessage()), + ) + + if operation.GetType() != types.OperationTypeDB { + return fmt.Errorf( + "wrong type %s != %s for operation %s", + operation.GetType(), types.OperationTypeDB, types.OperationToString(operation), + ) + } + + dbOp, ok := operation.(*types.DeleteBackupOperation) + if !ok { + return fmt.Errorf("can't cast operation to DeleteBackupOperation %s", types.OperationToString(operation)) + } + + backups, err := db.SelectBackups( + ctx, queries.NewReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.StringValueFromString(dbOp.BackupID)}, + }, + ), + ), + ) + + if err != nil { + return fmt.Errorf("can't select backups: %v", err) + } + + if len(backups) == 0 { + return fmt.Errorf("backup not found") + } + + backupToWrite := types.Backup{ + ID: dbOp.BackupID, + Status: types.BackupStateUnknown, + } + + deleteBackup := func(pathPrefix string, bucket string) error { + objects, err := s3.ListObjects(pathPrefix, bucket) + if err != nil { + return fmt.Errorf("failed to list S3 objects: %v", err) + } + + if len(objects) != 0 { + err = s3.DeleteObjects(objects, bucket) + if err != nil { + return fmt.Errorf("failed to delete S3 objects: %v", err) + } + } + + backupToWrite.Status = types.BackupStateDeleted + operation.SetState(types.OperationStateDone) + operation.SetMessage("Success") + operation.GetAudit().CompletedAt = timestamppb.Now() + return nil + } + + switch dbOp.State { + case types.OperationStatePending: + { + operation.SetState(types.OperationStateRunning) + err := db.UpdateOperation(ctx, operation) + if err != nil { + return fmt.Errorf("can't update operation: %v", err) + } + + err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket) + if err != nil { + return err + } + } + case types.OperationStateRunning: + { + err = deleteBackup(backups[0].S3PathPrefix, backups[0].S3Bucket) + if err != nil { + return err + } + } + default: + return fmt.Errorf("unexpected operation state %s", dbOp.State) + } + + return db.ExecuteUpsert( + ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ) +} diff --git a/internal/server/services/backup/backupservice.go b/internal/server/services/backup/backupservice.go index dcab49cb..f5fd01d8 100644 --- a/internal/server/services/backup/backupservice.go +++ b/internal/server/services/backup/backupservice.go @@ -220,6 +220,76 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques return op.Proto(), nil } +func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRequest) (*pb.Operation, error) { + xlog.Info(ctx, "DeleteBackup", zap.String("request", req.String())) + + backupID, err := types.ParseObjectID(req.BackupId) + if err != nil { + xlog.Error(ctx, "failed to parse BackupId", zap.Error(err)) + return nil, status.Errorf(codes.InvalidArgument, "failed to parse BackupId %s: %v", req.BackupId, err) + } + + backups, err := s.driver.SelectBackups( + ctx, queries.NewReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.StringValueFromString(backupID)}, + }, + ), + ), + ) + + if err != nil { + xlog.Error(ctx, "can't select backups", zap.Error(err)) + return nil, status.Errorf(codes.Internal, "can't select backups: %v", err) + } + + if len(backups) == 0 { + return nil, status.Error(codes.NotFound, "backup not found") + } + + backup := backups[0] + + subject, err := auth.CheckAuth( + ctx, s.auth, auth.PermissionBackupCreate, backup.ContainerID, "", + ) + + if err != nil { + return nil, err + } + + if backup.Status != types.BackupStateAvailable { + return nil, status.Errorf(codes.FailedPrecondition, "backup is not available, status %s", backup.Status) + } + + op := &types.DeleteBackupOperation{ + ContainerID: backup.ContainerID, + BackupID: req.GetBackupId(), + State: types.OperationStatePending, + YdbConnectionParams: types.YdbConnectionParams{ + DatabaseName: backup.DatabaseName, + Endpoint: backup.DatabaseEndpoint, + }, + Audit: &pb.AuditInfo{ + CreatedAt: timestamppb.Now(), + Creator: subject, + }, + PathPrefix: backup.S3PathPrefix, + } + + operationID, err := s.driver.CreateOperation(ctx, op) + if err != nil { + return nil, status.Errorf(codes.Internal, "can't create operation: %v", err) + } + + op.ID = operationID + xlog.Debug(ctx, "DeleteBackup was started successfully", zap.String("operation", types.OperationToString(op))) + return op.Proto(), nil +} + func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequest) (*pb.Operation, error) { xlog.Info(ctx, "MakeRestore", zap.String("request", req.String())) diff --git a/internal/types/backup.go b/internal/types/backup.go index fe1297b0..636c3c09 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -226,6 +226,69 @@ func (o *RestoreBackupOperation) Proto() *pb.Operation { } } +type DeleteBackupOperation struct { + ID string + ContainerID string + YdbConnectionParams YdbConnectionParams + BackupID string + State OperationState + Message string + PathPrefix string + Audit *pb.AuditInfo +} + +func (o *DeleteBackupOperation) GetID() string { + return o.ID +} +func (o *DeleteBackupOperation) SetID(id string) { + o.ID = id +} +func (o *DeleteBackupOperation) GetContainerID() string { + return o.ContainerID +} +func (o *DeleteBackupOperation) GetType() OperationType { + return OperationTypeDB +} +func (o *DeleteBackupOperation) SetType(_ OperationType) { +} +func (o *DeleteBackupOperation) GetState() OperationState { + return o.State +} +func (o *DeleteBackupOperation) SetState(s OperationState) { + o.State = s +} +func (o *DeleteBackupOperation) GetMessage() string { + return o.Message +} +func (o *DeleteBackupOperation) SetMessage(m string) { + o.Message = m +} +func (o *DeleteBackupOperation) GetAudit() *pb.AuditInfo { + return o.Audit +} +func (o *DeleteBackupOperation) Copy() Operation { + copy := *o + return © +} + +func (o *DeleteBackupOperation) Proto() *pb.Operation { + return &pb.Operation{ + Id: o.ID, + ContainerId: o.ContainerID, + Type: string(OperationTypeDB), + DatabaseName: o.YdbConnectionParams.DatabaseName, + DatabaseEndpoint: o.YdbConnectionParams.Endpoint, + YdbServerOperationId: "", + BackupId: o.BackupID, + SourcePaths: []string{o.PathPrefix}, + SourcePathsToExclude: nil, + RestorePaths: nil, + Audit: o.Audit, + Status: o.State.Enum(), + Message: o.Message, + } +} + type GenericOperation struct { ID string ContainerID string @@ -296,6 +359,7 @@ var ( const ( OperationTypeTB = OperationType("TB") OperationTypeRB = OperationType("RB") + OperationTypeDB = OperationType("DB") BackupTimestampFormat = "20060102_150405" S3ForcePathStyle = true )