Skip to content

Commit

Permalink
Improve TakeBackupOperation handler
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Jul 24, 2024
1 parent b2c9e1e commit b0e6571
Show file tree
Hide file tree
Showing 13 changed files with 892 additions and 213 deletions.
3 changes: 2 additions & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ func main() {

handlersRegistry := processor.NewOperationHandlerRegistry()
err = handlersRegistry.Add(
types.OperationTypeTB, handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()),
types.OperationTypeTB,
handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance),
)
if err != nil {
xlog.Error(ctx, "failed to register TB handler", zap.Error(err))
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ require (
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
)
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,28 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand All @@ -147,8 +147,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA=
google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
11 changes: 10 additions & 1 deletion internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type DBConnector interface {
CreateOperation(context.Context, types.Operation) (types.ObjectID, error)
CreateBackup(context.Context, types.Backup) (types.ObjectID, error)
UpdateBackup(context context.Context, id types.ObjectID, backupState string) error
UpdateOperationAndBackup(ctx context.Context, operation types.Operation, backup types.Backup) error
Close()
}

Expand Down Expand Up @@ -333,5 +334,13 @@ func (d *YdbConnector) UpdateBackup(
ID: id,
Status: backupStatus,
}
return d.ExecuteUpsert(context, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup)))
return d.ExecuteUpsert(context, queries.MakeWriteTableQuery(queries.WithUpdateBackup(backup)))
}

func (d *YdbConnector) UpdateOperationAndBackup(
ctx context.Context, operation types.Operation, backup types.Backup,
) error {
return d.ExecuteUpsert(
ctx, queries.MakeWriteTableQuery(queries.WithUpdateBackup(backup), queries.WithUpdateOperation(operation)),
)
}
29 changes: 29 additions & 0 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ func (c *MockDBConnector) UpdateOperation(
return nil
}

func (c *MockDBConnector) UpdateOperationAndBackup(
_ context.Context, op types.Operation, backup types.Backup,
) error {
if _, exist := c.operations[op.GetId()]; !exist {
return fmt.Errorf(
"update nonexistent operation %s", types.OperationToString(op),
)
}
c.operations[op.GetId()] = op
if _, exist := c.backups[backup.ID]; !exist {
return fmt.Errorf(
"update nonexistent backup %s", backup.String(),
)
}
c.backups[backup.ID] = backup
return nil
}

func (c *MockDBConnector) CreateOperation(
_ context.Context, op types.Operation,
) (types.ObjectID, error) {
Expand All @@ -140,6 +158,17 @@ func (c *MockDBConnector) GetOperation(
)
}

func (c *MockDBConnector) GetBackup(
_ context.Context, backupID types.ObjectID,
) (types.Backup, error) {
if backup, exist := c.backups[backupID]; exist {
return backup, nil
}
return types.Backup{}, fmt.Errorf(
"operation not found, id %s", backupID.String(),
)
}

func (d *MockDBConnector) SelectOperations(
ctx context.Context, queryBuilder queries.ReadTableQuery,
) ([]types.Operation, error) {
Expand Down
42 changes: 20 additions & 22 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package db

import (
"fmt"
"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"time"
"ydbcp/internal/types"
)

Expand Down Expand Up @@ -67,24 +67,26 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {

func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
var (
operationId types.ObjectID
containerId string
operationType string
operationId types.ObjectID
containerId string
operationType string
createdAt int64
backupId types.ObjectID
ydbOperationId string
database string

operationStateBuf *string
backupId *types.ObjectID
ydbOperationId *string
database *string
)
err := res.ScanNamed(
named.Required("id", &operationId),
named.Required("container_id", &containerId),
named.Required("type", &operationType),
named.Required("created_at", &createdAt),
named.Required("backup_id", &backupId),
named.Required("operation_id", &ydbOperationId),
named.Required("database", &database),

named.Optional("status", &operationStateBuf),
named.Optional("backup_id", &backupId),
named.Optional("operation_id", &ydbOperationId),
named.Optional("database", &database),
)
if err != nil {
return nil, err
Expand All @@ -94,30 +96,26 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
operationState = types.OperationState(*operationStateBuf)
}
if operationType == string(types.OperationTypeTB) {
if backupId == nil || database == nil || ydbOperationId == nil {
return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String())
}
return &types.TakeBackupOperation{
Id: operationId,
BackupId: types.ObjectID(*backupId),
BackupId: types.ObjectID(backupId),
ContainerID: containerId,
State: operationState,
Message: "",
YdbConnectionParams: types.GetYdbConnectionParams(*database),
YdbOperationId: *ydbOperationId,
YdbConnectionParams: types.GetYdbConnectionParams(database),
YdbOperationId: ydbOperationId,
CreatedAt: time.Unix(createdAt, 0),
}, nil
} else if operationType == string(types.OperationTypeRB) {
if backupId == nil || database == nil || ydbOperationId == nil {
return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String())
}
return &types.RestoreBackupOperation{
Id: operationId,
BackupId: types.ObjectID(*backupId),
BackupId: types.ObjectID(backupId),
ContainerID: containerId,
State: operationState,
Message: "",
YdbConnectionParams: types.GetYdbConnectionParams(*database),
YdbOperationId: *ydbOperationId,
YdbConnectionParams: types.GetYdbConnectionParams(database),
YdbOperationId: ydbOperationId,
CreatedAt: time.Unix(createdAt, 0),
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ CREATE TABLE Operations (
type String NOT NULL,
container_id String NOT NULL,
database String NOT NULL,
backup_id UUID,
backup_id UUID NOT NULL,

initiated String,
created_at Timestamp,
created_at Timestamp NOT NULL,
completed_at Timestamp,

status String,

paths String,
operation_id String,
operation_id String NOT NULL,

INDEX idx_cc GLOBAL ON (container_id, created_at, id),
INDEX idx_cbc GLOBAL ON (container_id, backup_id, created_at, id),
Expand Down
Loading

0 comments on commit b0e6571

Please sign in to comment.