diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index e6da997b..b6c8b9be 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -271,7 +271,10 @@ func main() { handlersRegistry := processor.NewOperationHandlerRegistry() err = handlersRegistry.Add( - types.OperationTypeTB, handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()), + types.OperationTypeTB, + handlers.MakeTBOperationHandler( + dbConnector, client.NewClientYdbConnector(), configInstance, queries.MakeWriteTableQuery, + ), ) if err != nil { xlog.Error(ctx, "failed to register TB handler", zap.Error(err)) diff --git a/go.mod b/go.mod index 0a052210..d0ed20f1 100644 --- a/go.mod +++ b/go.mod @@ -22,9 +22,9 @@ require ( github.com/golang-jwt/jwt/v4 v4.4.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.6.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect ) diff --git a/go.sum b/go.sum index 81ae04e5..2cb8bb77 100644 --- a/go.sum +++ b/go.sum @@ -103,28 +103,28 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -147,8 +147,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= -google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= +google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/internal/connectors/db/connector.go b/internal/connectors/db/connector.go index 84d0f638..9fc10e51 100644 --- a/internal/connectors/db/connector.go +++ b/internal/connectors/db/connector.go @@ -47,6 +47,7 @@ type DBConnector interface { CreateOperation(context.Context, types.Operation) (types.ObjectID, error) CreateBackup(context.Context, types.Backup) (types.ObjectID, error) UpdateBackup(context context.Context, id types.ObjectID, backupState string) error + ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error Close() } @@ -300,14 +301,14 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) ( func (d *YdbConnector) UpdateOperation( ctx context.Context, operation types.Operation, ) error { - return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithUpdateOperation(operation))) + return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery().WithUpdateOperation(operation)) } func (d *YdbConnector) CreateOperation( ctx context.Context, operation types.Operation, ) (types.ObjectID, error) { operation.SetId(types.GenerateObjectID()) - err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateOperation(operation))) + err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery().WithCreateOperation(operation)) if err != nil { return types.ObjectID{}, err } @@ -319,7 +320,7 @@ func (d *YdbConnector) CreateBackup( ) (types.ObjectID, error) { id := types.GenerateObjectID() backup.ID = id - err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup))) + err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery().WithCreateBackup(backup)) if err != nil { return types.ObjectID{}, err } @@ -333,5 +334,13 @@ func (d *YdbConnector) UpdateBackup( ID: id, Status: backupStatus, } - return d.ExecuteUpsert(context, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup))) + return d.ExecuteUpsert(context, queries.MakeWriteTableQuery().WithUpdateBackup(backup)) +} + +func (d *YdbConnector) UpdateOperationAndBackup( + ctx context.Context, operation types.Operation, backup types.Backup, +) error { + return d.ExecuteUpsert( + ctx, queries.MakeWriteTableQuery().WithUpdateBackup(backup).WithUpdateOperation(operation), + ) } diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index dc9f73ea..86329679 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -114,6 +114,24 @@ func (c *MockDBConnector) UpdateOperation( return nil } +func (c *MockDBConnector) UpdateOperationAndBackup( + _ context.Context, op types.Operation, backup types.Backup, +) error { + if _, exist := c.operations[op.GetId()]; !exist { + return fmt.Errorf( + "update nonexistent operation %s", types.OperationToString(op), + ) + } + c.operations[op.GetId()] = op + if _, exist := c.backups[backup.ID]; !exist { + return fmt.Errorf( + "update nonexistent backup %s", backup.String(), + ) + } + c.backups[backup.ID] = backup + return nil +} + func (c *MockDBConnector) CreateOperation( _ context.Context, op types.Operation, ) (types.ObjectID, error) { @@ -140,8 +158,26 @@ func (c *MockDBConnector) GetOperation( ) } -func (d *MockDBConnector) SelectOperations( - ctx context.Context, queryBuilder queries.ReadTableQuery, +func (c *MockDBConnector) GetBackup( + _ context.Context, backupID types.ObjectID, +) (types.Backup, error) { + if backup, exist := c.backups[backupID]; exist { + return backup, nil + } + return types.Backup{}, fmt.Errorf( + "operation not found, id %s", backupID.String(), + ) +} + +func (c *MockDBConnector) SelectOperations( + _ context.Context, _ queries.ReadTableQuery, ) ([]types.Operation, error) { return nil, errors.New("Do not call this method") } + +func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.WriteTableQuery) error { + queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock) + c.operations[queryBuilderMock.Operation.GetId()] = queryBuilderMock.Operation + c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup + return nil +} diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index a6a6c12b..b14ee6ff 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -1,10 +1,10 @@ package db import ( - "fmt" "github.com/google/uuid" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "time" "ydbcp/internal/types" ) @@ -67,24 +67,26 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) { func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { var ( - operationId types.ObjectID - containerId string - operationType string + operationId types.ObjectID + containerId string + operationType string + createdAt int64 + backupId types.ObjectID + ydbOperationId string + database string operationStateBuf *string - backupId *types.ObjectID - ydbOperationId *string - database *string ) err := res.ScanNamed( named.Required("id", &operationId), named.Required("container_id", &containerId), named.Required("type", &operationType), + named.Required("created_at", &createdAt), + named.Required("backup_id", &backupId), + named.Required("operation_id", &ydbOperationId), + named.Required("database", &database), named.Optional("status", &operationStateBuf), - named.Optional("backup_id", &backupId), - named.Optional("operation_id", &ydbOperationId), - named.Optional("database", &database), ) if err != nil { return nil, err @@ -94,30 +96,26 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { operationState = types.OperationState(*operationStateBuf) } if operationType == string(types.OperationTypeTB) { - if backupId == nil || database == nil || ydbOperationId == nil { - return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String()) - } return &types.TakeBackupOperation{ Id: operationId, - BackupId: types.ObjectID(*backupId), + BackupId: types.ObjectID(backupId), ContainerID: containerId, State: operationState, Message: "", - YdbConnectionParams: types.GetYdbConnectionParams(*database), - YdbOperationId: *ydbOperationId, + YdbConnectionParams: types.GetYdbConnectionParams(database), + YdbOperationId: ydbOperationId, + CreatedAt: time.Unix(createdAt, 0), }, nil } else if operationType == string(types.OperationTypeRB) { - if backupId == nil || database == nil || ydbOperationId == nil { - return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String()) - } return &types.RestoreBackupOperation{ Id: operationId, - BackupId: types.ObjectID(*backupId), + BackupId: types.ObjectID(backupId), ContainerID: containerId, State: operationState, Message: "", - YdbConnectionParams: types.GetYdbConnectionParams(*database), - YdbOperationId: *ydbOperationId, + YdbConnectionParams: types.GetYdbConnectionParams(database), + YdbOperationId: ydbOperationId, + CreatedAt: time.Unix(createdAt, 0), }, nil } diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index da83cb79..0236c7e1 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -14,6 +14,10 @@ import ( type WriteTableQuery interface { FormatQuery(ctx context.Context) (*FormatQueryResult, error) + WithCreateBackup(backup types.Backup) WriteTableQuery + WithCreateOperation(operation types.Operation) WriteTableQuery + WithUpdateBackup(backup types.Backup) WriteTableQuery + WithUpdateOperation(operation types.Operation) WriteTableQuery } type WriteTableQueryImpl struct { @@ -123,42 +127,36 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl return d } -type WriteTableQueryOption func(*WriteTableQueryImpl) +type WriteTableQueryImplOption func(*WriteTableQueryImpl) -func MakeWriteTableQuery(options ...WriteTableQueryOption) *WriteTableQueryImpl { - d := &WriteTableQueryImpl{} - for _, opt := range options { - opt(d) - } - return d +type WriteTableQueryMockOption func(*WriteTableQueryMock) + +func MakeWriteTableQuery() WriteTableQuery { + return &WriteTableQueryImpl{} } -func WithCreateBackup(backup types.Backup) WriteTableQueryOption { - return func(d *WriteTableQueryImpl) { - index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index)) - } +func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index)) + return d } -func WithUpdateBackup(backup types.Backup) WriteTableQueryOption { - return func(d *WriteTableQueryImpl) { - index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildUpdateBackupQuery(backup, index)) - } +func (d *WriteTableQueryImpl) WithUpdateBackup(backup types.Backup) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildUpdateBackupQuery(backup, index)) + return d } -func WithUpdateOperation(operation types.Operation) WriteTableQueryOption { - return func(d *WriteTableQueryImpl) { - index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index)) - } +func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index)) + return d } -func WithCreateOperation(operation types.Operation) WriteTableQueryOption { - return func(d *WriteTableQueryImpl) { - index := len(d.tableQueries) - d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index)) - } +func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) WriteTableQuery { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index)) + return d } func (d *WriteSingleTableQueryImpl) DeclareParameters() string { diff --git a/internal/connectors/db/yql/queries/write_mock.go b/internal/connectors/db/yql/queries/write_mock.go new file mode 100644 index 00000000..2a99e1e1 --- /dev/null +++ b/internal/connectors/db/yql/queries/write_mock.go @@ -0,0 +1,39 @@ +package queries + +import ( + "context" + "ydbcp/internal/types" +) + +type WriteTableQueryMock struct { + Operation types.Operation + Backup types.Backup +} + +func MakeWriteTableQueryMock() WriteTableQuery { + return &WriteTableQueryMock{} +} + +func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) { + return &FormatQueryResult{}, nil +} + +func (w *WriteTableQueryMock) WithCreateBackup(backup types.Backup) WriteTableQuery { + w.Backup = backup + return w +} + +func (w *WriteTableQueryMock) WithCreateOperation(operation types.Operation) WriteTableQuery { + w.Operation = operation + return w +} + +func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQuery { + w.Backup = backup + return w +} + +func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery { + w.Operation = operation + return w +} diff --git a/internal/connectors/db/yql/queries/write_test.go b/internal/connectors/db/yql/queries/write_test.go index 4c039aab..e7195b41 100644 --- a/internal/connectors/db/yql/queries/write_test.go +++ b/internal/connectors/db/yql/queries/write_test.go @@ -30,10 +30,9 @@ UPSERT INTO Operations (id, status, message) VALUES ($id_1, $status_1, $message_ ID: backupId, Status: "Available", } - builder := MakeWriteTableQuery( - WithUpdateBackup(backup), - WithUpdateOperation(&op), - ) + builder := MakeWriteTableQuery(). + WithUpdateBackup(backup). + WithUpdateOperation(&op) var ( queryParams = table.NewQueryParameters( table.ValueParam("$id_0", table_types.UUIDValue(backupId)), diff --git a/internal/connectors/db/yql/schema/create_tables.yql b/internal/connectors/db/yql/schema/create_tables.yql index d0f75cb8..a915f93d 100644 --- a/internal/connectors/db/yql/schema/create_tables.yql +++ b/internal/connectors/db/yql/schema/create_tables.yql @@ -37,16 +37,16 @@ CREATE TABLE Operations ( type String NOT NULL, container_id String NOT NULL, database String NOT NULL, - backup_id UUID, + backup_id UUID NOT NULL, initiated String, - created_at Timestamp, + created_at Timestamp NOT NULL, completed_at Timestamp, status String, paths String, - operation_id String, + operation_id String NOT NULL, INDEX idx_cc GLOBAL ON (container_id, created_at, id), INDEX idx_cbc GLOBAL ON (container_id, backup_id, created_at, id), diff --git a/internal/handlers/restore_backup.go b/internal/handlers/restore_backup.go index b19ef796..f19586b7 100644 --- a/internal/handlers/restore_backup.go +++ b/internal/handlers/restore_backup.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "go.uber.org/zap" - "time" "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" @@ -13,20 +12,14 @@ import ( "ydbcp/internal/util/xlog" ) -func MakeRBOperationHandler(db db.DBConnector, client client.ClientConnector, config config.Config) types.OperationHandler { +func MakeRBOperationHandler( + db db.DBConnector, client client.ClientConnector, config config.Config, +) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { return RBOperationHandler(ctx, op, db, client, config) } } -func valid(status Ydb.StatusIds_StatusCode) bool { - return status == Ydb.StatusIds_SUCCESS || status == Ydb.StatusIds_CANCELLED -} - -func retriable(status Ydb.StatusIds_StatusCode) bool { - return status == Ydb.StatusIds_OVERLOADED || status == Ydb.StatusIds_UNAVAILABLE -} - func RBOperationHandler( ctx context.Context, operation types.Operation, @@ -34,7 +27,8 @@ func RBOperationHandler( client client.ClientConnector, config config.Config, ) error { - xlog.Info(ctx, "received operation", + xlog.Info( + ctx, "received operation", zap.String("id", operation.GetId().String()), zap.String("type", string(operation.GetType())), zap.String("state", string(operation.GetState())), @@ -42,7 +36,8 @@ func RBOperationHandler( ) if operation.GetType() != types.OperationTypeRB { - return fmt.Errorf("wrong type %s != %s for operation %s", + return fmt.Errorf( + "wrong type %s != %s for operation %s", operation.GetType(), types.OperationTypeRB, types.OperationToString(operation), ) } @@ -54,104 +49,53 @@ func RBOperationHandler( conn, err := client.Open(ctx, types.MakeYdbConnectionString(mr.YdbConnectionParams)) if err != nil { - return fmt.Errorf("error initializing client connector for operation #%s: %w", + return fmt.Errorf( + "error initializing client connector for operation #%s: %w", mr.GetId().String(), err, ) } defer func() { _ = client.Close(ctx, conn) }() - xlog.Info(ctx, "getting operation status", - zap.String("id", mr.Id.String()), - zap.String("type", string(operation.GetType())), - zap.String("ydb_operation_id", mr.YdbOperationId), - ) - - opResponse, err := client.GetOperationStatus(ctx, conn, mr.YdbOperationId) - if err != nil { - if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { - operation.SetState(types.OperationStateError) - operation.SetMessage("Operation deadline exceeded") - return db.UpdateOperation(ctx, operation) - } - - return fmt.Errorf( - "failed to get operation status for operation #%s, import operation id %s: %w", - mr.GetId().String(), - mr.YdbOperationId, - err, - ) + ydbOpResponse := LookupYdbOperationStatus(ctx, client, conn, operation, mr.YdbOperationId, mr.CreatedAt, config) + if ydbOpResponse.shouldAbortHandler { + operation.SetState(ydbOpResponse.opState) + operation.SetMessage(ydbOpResponse.opMessage) + return db.UpdateOperation(ctx, operation) } - - if retriable(opResponse.GetOperation().GetStatus()) { - xlog.Info(ctx, "received retriable error", - zap.String("id", mr.Id.String()), - zap.String("type", string(operation.GetType())), - zap.String("ydb_operation_id", mr.YdbOperationId), - ) - - return nil + if ydbOpResponse.err != nil { + return ydbOpResponse.err } - - if !valid(opResponse.GetOperation().GetStatus()) { - operation.SetState(types.OperationStateError) - operation.SetMessage(fmt.Sprintf("Error status: %s, issues: %s", - opResponse.GetOperation().GetStatus(), - types.IssuesToString(opResponse.GetOperation().Issues)), - ) - return db.UpdateOperation(ctx, operation) + if ydbOpResponse.opResponse == nil { + return nil } + opResponse := ydbOpResponse.opResponse switch mr.State { case types.OperationStatePending: { if !opResponse.GetOperation().Ready { - if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { - xlog.Info(ctx, "cancelling operation due to ttl", - zap.String("id", mr.Id.String()), - zap.String("type", string(operation.GetType())), - zap.String("ydb_operation_id", mr.YdbOperationId), - ) - - response, err := client.CancelOperation(ctx, conn, mr.YdbOperationId) + if DeadlineExceeded(mr.CreatedAt, config) { + err = CancelYdbOperation(ctx, client, conn, operation, mr.YdbOperationId, db, "TTL") if err != nil { - return fmt.Errorf( - "error cancelling operation #%s, import operation id %s: %w", - mr.GetId().String(), - mr.YdbOperationId, - err, - ) - } - - if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS { - return fmt.Errorf( - "error cancelling operation id %s, import operation id %s, issues: %s", - mr.GetId().String(), - mr.YdbOperationId, - types.IssuesToString(response.GetIssues()), - ) + return err } - - operation.SetState(types.OperationStateCancelling) - operation.SetMessage("Operation deadline exceeded") return db.UpdateOperation(ctx, operation) } - return nil } - if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { operation.SetState(types.OperationStateDone) operation.SetMessage("Success") } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { - operation.SetState(types.OperationStateCancelled) - operation.SetMessage("Pending operation wac cancelled") + operation.SetState(types.OperationStateError) + operation.SetMessage("Pending operation was cancelled") } } case types.OperationStateCancelling: { if !opResponse.GetOperation().Ready { - if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { + if DeadlineExceeded(mr.CreatedAt, config) { operation.SetState(types.OperationStateError) operation.SetMessage("Operation deadline exceeded") return db.UpdateOperation(ctx, operation) @@ -159,7 +103,6 @@ func RBOperationHandler( return nil } - if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { operation.SetState(types.OperationStateDone) operation.SetMessage("Operation was completed despite cancellation") @@ -170,7 +113,8 @@ func RBOperationHandler( } } - xlog.Info(ctx, "forgetting operation", + xlog.Info( + ctx, "forgetting operation", zap.String("id", mr.Id.String()), zap.String("type", string(operation.GetType())), zap.String("ydb_operation_id", mr.YdbOperationId), diff --git a/internal/handlers/restore_backup_test.go b/internal/handlers/restore_backup_test.go index b502aad7..b7173d44 100644 --- a/internal/handlers/restore_backup_test.go +++ b/internal/handlers/restore_backup_test.go @@ -218,11 +218,12 @@ func TestRBOperationHandlerPendingOperationCancelled(t *testing.T) { err := handler(ctx, &rbOp) assert.Empty(t, err) - // check operation status (should be cancelled) + // check operation status (should be error) op, err := dbConnector.GetOperation(ctx, rbOp.Id) assert.Empty(t, err) assert.NotEmpty(t, op) - assert.Equal(t, types.OperationStateCancelled, op.GetState()) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Pending operation was cancelled", op.GetMessage()) // check ydb operation status (should be forgotten) ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) diff --git a/internal/handlers/take_backup.go b/internal/handlers/take_backup.go index c74e4274..ef5b717a 100644 --- a/internal/handlers/take_backup.go +++ b/internal/handlers/take_backup.go @@ -3,16 +3,20 @@ package handlers import ( "context" "fmt" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" - - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" ) -func MakeTBOperationHandler(db db.DBConnector, client client.ClientConnector) types.OperationHandler { +func MakeTBOperationHandler( + db db.DBConnector, client client.ClientConnector, config config.Config, + getQueryBuilder func() queries.WriteTableQuery, +) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return TBOperationHandler(ctx, op, db, client) + return TBOperationHandler(ctx, op, db, client, config, getQueryBuilder) } } @@ -21,6 +25,8 @@ func TBOperationHandler( operation types.Operation, db db.DBConnector, client client.ClientConnector, + config config.Config, + getQueryBuilder func() queries.WriteTableQuery, ) error { if operation.GetType() != types.OperationTypeTB { return fmt.Errorf("wrong operation type %s != %s", operation.GetType(), types.OperationTypeTB) @@ -37,90 +43,80 @@ func TBOperationHandler( defer func() { _ = client.Close(ctx, conn) }() - //lookup YdbServerOperationStatus - opInfo, err := client.GetOperationStatus(ctx, conn, tb.YdbOperationId) - if err != nil { - //skip, write log - //upsert message into operation? - return fmt.Errorf( - "failed to lookup operation status for operation id %s, export operation id %s: %w", - tb.GetId().String(), - tb.YdbOperationId, - err, - ) + ydbOpResponse := LookupYdbOperationStatus(ctx, client, conn, operation, tb.YdbOperationId, tb.CreatedAt, config) + if ydbOpResponse.shouldAbortHandler { + operation.SetState(ydbOpResponse.opState) + operation.SetMessage(ydbOpResponse.opMessage) + return db.UpdateOperation(ctx, operation) + } + if ydbOpResponse.err != nil { + return ydbOpResponse.err + } + if ydbOpResponse.opResponse == nil { + return nil + } + opResponse := ydbOpResponse.opResponse + + backupToWrite := types.Backup{ + ID: tb.BackupId, + Status: types.BackupStateUnknown, } switch tb.State { case types.OperationStatePending: { - if !opInfo.GetOperation().Ready { - //if pending: return op, nil - //if backup deadline failed: cancel operation. (skip for now) - return nil - } - if opInfo.GetOperation().Status == Ydb.StatusIds_SUCCESS { - //upsert into operations (id, status) values (id, done)? - //db.StartUpdate() - //.WithUpdateBackup() - //.WithYUpdateOperation() - err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateAvailable) - if err != nil { - return fmt.Errorf( - "error updating backup table, operation id %s: %w", - tb.GetId().String(), - err, + if !opResponse.GetOperation().Ready { + if DeadlineExceeded(tb.CreatedAt, config) { + err = CancelYdbOperation(ctx, client, conn, operation, tb.YdbOperationId, db, "TTL") + if err != nil { + return err + } + backupToWrite.Status = types.BackupStateError + return db.ExecuteUpsert( + ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) + } else { + return nil } + } else if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { + backupToWrite.Status = types.BackupStateAvailable operation.SetState(types.OperationStateDone) operation.SetMessage("Success") + } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { + backupToWrite.Status = types.BackupStateError + operation.SetState(types.OperationStateError) + operation.SetMessage("got CANCELLED status for PENDING operation") } else { - //op.State = Error - //upsert into operations (id, status, message) values (id, error, message)? - err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateError) - if err != nil { - return fmt.Errorf( - "error updating backup table, operation id %s: %w", - tb.GetId().String(), - err, - ) - } - if opInfo.GetOperation().Status == Ydb.StatusIds_CANCELLED { - operation.SetMessage("got CANCELLED status for PENDING operation") - } else { - operation.SetMessage(types.IssuesToString(opInfo.GetOperation().Issues)) - } + backupToWrite.Status = types.BackupStateError operation.SetState(types.OperationStateError) + operation.SetMessage(types.IssuesToString(opResponse.GetOperation().Issues)) } } case types.OperationStateCancelling: { - if !opInfo.GetOperation().Ready { - //can this hang in cancelling state? - return nil - } - if opInfo.GetOperation().Status == Ydb.StatusIds_CANCELLED { - //upsert into operations (id, status, message) values (id, cancelled)? - err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateCancelled) - if err != nil { - return fmt.Errorf( - "error updating backup table, operation id %s: %w", - tb.GetId().String(), - err, + if !opResponse.GetOperation().Ready { + if DeadlineExceeded(tb.CreatedAt, config) { + backupToWrite.Status = types.BackupStateError + operation.SetState(types.OperationStateError) + operation.SetMessage("Operation deadline exceeded") + return db.ExecuteUpsert( + ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), ) + } else { + return nil } + } + if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { + backupToWrite.Status = types.BackupStateAvailable + operation.SetState(types.OperationStateDone) + operation.SetMessage("Operation was completed despite cancellation") + } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { + backupToWrite.Status = types.BackupStateCancelled operation.SetState(types.OperationStateCancelled) operation.SetMessage("Success") } else { - //upsert into operations (id, status, message) values (id, error, error.message)? - err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateError) - if err != nil { - return fmt.Errorf( - "error updating backup table, operation id %s: %w", - tb.GetId().String(), - err, - ) - } + backupToWrite.Status = types.BackupStateError operation.SetState(types.OperationStateError) - operation.SetMessage(types.IssuesToString(opInfo.GetOperation().Issues)) + operation.SetMessage(types.IssuesToString(opResponse.GetOperation().Issues)) } } } @@ -142,5 +138,7 @@ func TBOperationHandler( types.IssuesToString(response.GetIssues()), ) } - return db.UpdateOperation(ctx, operation) + return db.ExecuteUpsert( + ctx, getQueryBuilder().WithUpdateOperation(operation).WithUpdateBackup(backupToWrite), + ) } diff --git a/internal/handlers/take_backup_test.go b/internal/handlers/take_backup_test.go index 41871ce1..2eeded9c 100644 --- a/internal/handlers/take_backup_test.go +++ b/internal/handlers/take_backup_test.go @@ -2,8 +2,10 @@ package handlers import ( "context" - "sync" "testing" + "time" + "ydbcp/internal/config" + "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" @@ -14,11 +16,46 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" ) -func TestTBOperationHandler(t *testing.T) { - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func TestTBOperationHandlerInvalidOperationResponse(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + backupMap[backupID] = backup + opMap[opId] = &tbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector() + // try to handle tb operation with non-existing ydb operation id + handler := MakeTBOperationHandler(dbConnector, clientConnector, config.Config{}, queries.MakeWriteTableQueryMock) + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Error status: NOT_FOUND, issues: message:\"operation not found\"", op.GetMessage()) +} + +func TestTBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) { + ctx := context.Background() opId := types.GenerateObjectID() backupID := types.GenerateObjectID() tbOp := types.TakeBackupOperation{ @@ -34,6 +71,408 @@ func TestTBOperationHandler(t *testing.T) { Status: types.BackupStatePending, } + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 0, + }, queries.MakeWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be cancelled because of deadline exceeded) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelling, op.GetState()) + assert.Equal(t, "Operation deadline exceeded", op.GetMessage()) + + // check backup status (should be error) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateError, b.Status) + + // check ydb operation status (should be cancelled) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_CANCELLED, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestTBOperationHandlerPendingOperationInProgress(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.MakeWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be pending) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStatePending, op.GetState()) + assert.Equal(t, "", op.GetMessage()) + + // check backup status (should be in pending) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStatePending, b.Status) + + // check ydb operation status (should be in progress) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestTBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.MakeWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be done) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateDone, op.GetState()) + + // check backup status (should be done) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateAvailable, b.Status) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestTBOperationHandlerPendingOperationCancelled(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_CANCELLED, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.MakeWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be error) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + + // check backup status (should be error) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateError, b.Status) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestTBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 0, + }, queries.MakeWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be failed because of deadline exceeded) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Operation deadline exceeded", op.GetMessage()) + + // check backup status (should be error) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateError, b.Status) + + // check ydb operation status (should be in progress) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestTBOperationHandlerCancellingOperationInProgress(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.MakeWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be the same as before) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelling, op.GetState()) + + // check backup status (should be pending) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStatePending, b.Status) + + // check ydb operation status (should be in progress) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestTBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + ydbOp := &Ydb_Operations.Operation{ Id: "1", Ready: true, @@ -55,23 +494,162 @@ func TestTBOperationHandler(t *testing.T) { client.WithOperations(ydbOpMap), ) - handler := MakeTBOperationHandler(dbConnector, clientConnector) + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.MakeWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + // check operation status (should be done) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateDone, op.GetState()) + assert.Equal(t, "Operation was completed despite cancellation", op.GetMessage()) + + // check backup status (should be available) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateAvailable, b.Status) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestTBOperationHandlerCancellingOperationCancelled(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_CANCELLED, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.MakeWriteTableQueryMock, + ) err := handler(ctx, &tbOp) assert.Empty(t, err) - result, err := dbConnector.GetOperation(ctx, opId) + // check operation status (should be cancelled) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelled, op.GetState()) + + // check backup status (should be cancelled) + b, err := dbConnector.GetBackup(ctx, backupID) assert.Empty(t, err) - assert.Equal( - t, result.GetState(), types.OperationStateDone, - "operation state should be Done", + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStateCancelled, b.Status) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) + +} + +func TestTBOperationHandlerRetriableErrorForPendingOperation(t *testing.T) { + ctx := context.Background() + opId := types.GenerateObjectID() + backupID := types.GenerateObjectID() + tbOp := types.TakeBackupOperation{ + Id: opId, + BackupId: backupID, + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + CreatedAt: time.Now(), + } + backup := types.Backup{ + ID: backupID, + Status: types.BackupStatePending, + } + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_UNAVAILABLE, + Issues: nil, + } + + opMap := make(map[types.ObjectID]types.Operation) + backupMap := make(map[types.ObjectID]types.Backup) + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + backupMap[backupID] = backup + opMap[opId] = &tbOp + ydbOpMap["1"] = ydbOp + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), ) + clientConnector := client.NewMockClientConnector( + client.WithOperations(ydbOpMap), + ) + + handler := MakeTBOperationHandler( + dbConnector, clientConnector, config.Config{ + OperationTtlSeconds: 10000, + }, queries.MakeWriteTableQueryMock, + ) + + err := handler(ctx, &tbOp) + assert.Empty(t, err) + + // check operation status (should be the same as before) + op, err := dbConnector.GetOperation(ctx, tbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStatePending, op.GetState()) - backups, err2 := dbConnector.SelectBackupsByStatus(ctx, types.BackupStateAvailable) - assert.Empty(t, err2) - assert.Equal(t, 1, len(backups)) - assert.Equal(t, types.BackupStateAvailable, backups[0].Status) + // check backup status (should be pending) + b, err := dbConnector.GetBackup(ctx, backupID) + assert.Empty(t, err) + assert.NotEmpty(t, b) + assert.Equal(t, types.BackupStatePending, b.Status) - cancel() - wg.Wait() + // check ydb operation status + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, tbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_UNAVAILABLE, ydbOpStatus.GetOperation().GetStatus()) } diff --git a/internal/handlers/utils.go b/internal/handlers/utils.go new file mode 100644 index 00000000..af30016b --- /dev/null +++ b/internal/handlers/utils.go @@ -0,0 +1,129 @@ +package handlers + +import ( + "context" + "fmt" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" + "github.com/ydb-platform/ydb-go-sdk/v3" + "go.uber.org/zap" + "time" + "ydbcp/internal/config" + "ydbcp/internal/connectors/client" + "ydbcp/internal/connectors/db" + "ydbcp/internal/types" + "ydbcp/internal/util/xlog" +) + +func DeadlineExceeded(createdAt time.Time, config config.Config) bool { + return time.Since(createdAt) > time.Duration(config.OperationTtlSeconds)*time.Second +} +func IsValidStatus(status Ydb.StatusIds_StatusCode) bool { + return status == Ydb.StatusIds_SUCCESS || status == Ydb.StatusIds_CANCELLED +} + +func IsRetriableStatus(status Ydb.StatusIds_StatusCode) bool { + return status == Ydb.StatusIds_OVERLOADED || status == Ydb.StatusIds_UNAVAILABLE +} + +type LookupYdbOperationResponse struct { + opResponse *Ydb_Operations.GetOperationResponse + shouldAbortHandler bool + + opState types.OperationState + opMessage string + err error +} + +func LookupYdbOperationStatus( + ctx context.Context, client client.ClientConnector, conn *ydb.Driver, operation types.Operation, + ydbOperationId string, + createdAt time.Time, config config.Config, +) LookupYdbOperationResponse { + xlog.Info( + ctx, "getting operation status", + zap.String("id", operation.GetId().String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", ydbOperationId), + ) + opResponse, err := client.GetOperationStatus(ctx, conn, ydbOperationId) + if err != nil { + if DeadlineExceeded(createdAt, config) { + return LookupYdbOperationResponse{ + shouldAbortHandler: true, + opState: types.OperationStateError, + opMessage: "Operation deadline exceeded", + err: nil, + } + } + + return LookupYdbOperationResponse{ + err: fmt.Errorf( + "failed to get operation status for operation #%s, import operation id %s: %w", + operation.GetId().String(), ydbOperationId, err, + ), + } + } + + if IsRetriableStatus(opResponse.GetOperation().GetStatus()) { + xlog.Info( + ctx, "received retriable error", + zap.String("id", operation.GetId().String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", ydbOperationId), + ) + return LookupYdbOperationResponse{} + } + + if !IsValidStatus(opResponse.GetOperation().GetStatus()) { + return LookupYdbOperationResponse{ + shouldAbortHandler: true, + opState: types.OperationStateError, + opMessage: fmt.Sprintf( + "Error status: %s, issues: %s", + opResponse.GetOperation().GetStatus(), + types.IssuesToString(opResponse.GetOperation().Issues), + ), + } + } + + return LookupYdbOperationResponse{ + opResponse: opResponse, + err: nil, + } +} + +func CancelYdbOperation( + ctx context.Context, client client.ClientConnector, conn *ydb.Driver, operation types.Operation, + ydbOperationId string, db db.DBConnector, reason string, +) error { + xlog.Info( + ctx, "cancelling operation", zap.String("reason", reason), + zap.String("id", operation.GetId().String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", ydbOperationId), + ) + + response, err := client.CancelOperation(ctx, conn, ydbOperationId) + if err != nil { + return fmt.Errorf( + "error cancelling operation #%s, import operation id %s: %w", + operation.GetId().String(), + ydbOperationId, + err, + ) + } + + if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS { + return fmt.Errorf( + "error cancelling operation id %s, import operation id %s, issues: %s", + operation.GetId().String(), + ydbOperationId, + types.IssuesToString(response.GetIssues()), + ) + } + + operation.SetState(types.OperationStateCancelling) + operation.SetMessage("Operation deadline exceeded") + return nil +} diff --git a/internal/types/backup.go b/internal/types/backup.go index e63ea567..4bc176b5 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -110,6 +110,7 @@ type TakeBackupOperation struct { YdbOperationId string SourcePaths []string SourcePathToExclude []string + CreatedAt time.Time } func (o *TakeBackupOperation) GetId() ObjectID {