Skip to content

Commit

Permalink
Improve TakeBackupOperation handler
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Jul 25, 2024
1 parent b2c9e1e commit 1db45c3
Show file tree
Hide file tree
Showing 16 changed files with 980 additions and 247 deletions.
5 changes: 4 additions & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ func main() {

handlersRegistry := processor.NewOperationHandlerRegistry()
err = handlersRegistry.Add(
types.OperationTypeTB, handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()),
types.OperationTypeTB,
handlers.MakeTBOperationHandler(
dbConnector, client.NewClientYdbConnector(), configInstance, queries.MakeWriteTableQuery,
),
)
if err != nil {
xlog.Error(ctx, "failed to register TB handler", zap.Error(err))
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ require (
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
)
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,28 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand All @@ -147,8 +147,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA=
google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
17 changes: 13 additions & 4 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type DBConnector interface {
CreateOperation(context.Context, types.Operation) (types.ObjectID, error)
CreateBackup(context.Context, types.Backup) (types.ObjectID, error)
UpdateBackup(context context.Context, id types.ObjectID, backupState string) error
ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error
Close()
}

Expand Down Expand Up @@ -300,14 +301,14 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
func (d *YdbConnector) UpdateOperation(
ctx context.Context, operation types.Operation,
) error {
return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithUpdateOperation(operation)))
return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery().WithUpdateOperation(operation))
}

func (d *YdbConnector) CreateOperation(
ctx context.Context, operation types.Operation,
) (types.ObjectID, error) {
operation.SetId(types.GenerateObjectID())
err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateOperation(operation)))
err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery().WithCreateOperation(operation))
if err != nil {
return types.ObjectID{}, err
}
Expand All @@ -319,7 +320,7 @@ func (d *YdbConnector) CreateBackup(
) (types.ObjectID, error) {
id := types.GenerateObjectID()
backup.ID = id
err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup)))
err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery().WithCreateBackup(backup))
if err != nil {
return types.ObjectID{}, err
}
Expand All @@ -333,5 +334,13 @@ func (d *YdbConnector) UpdateBackup(
ID: id,
Status: backupStatus,
}
return d.ExecuteUpsert(context, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup)))
return d.ExecuteUpsert(context, queries.MakeWriteTableQuery().WithUpdateBackup(backup))
}

func (d *YdbConnector) UpdateOperationAndBackup(
ctx context.Context, operation types.Operation, backup types.Backup,
) error {
return d.ExecuteUpsert(
ctx, queries.MakeWriteTableQuery().WithUpdateBackup(backup).WithUpdateOperation(operation),
)
}
40 changes: 38 additions & 2 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,24 @@ func (c *MockDBConnector) UpdateOperation(
return nil
}

func (c *MockDBConnector) UpdateOperationAndBackup(
_ context.Context, op types.Operation, backup types.Backup,
) error {
if _, exist := c.operations[op.GetId()]; !exist {
return fmt.Errorf(
"update nonexistent operation %s", types.OperationToString(op),
)
}
c.operations[op.GetId()] = op
if _, exist := c.backups[backup.ID]; !exist {
return fmt.Errorf(
"update nonexistent backup %s", backup.String(),
)
}
c.backups[backup.ID] = backup
return nil
}

func (c *MockDBConnector) CreateOperation(
_ context.Context, op types.Operation,
) (types.ObjectID, error) {
Expand All @@ -140,8 +158,26 @@ func (c *MockDBConnector) GetOperation(
)
}

func (d *MockDBConnector) SelectOperations(
ctx context.Context, queryBuilder queries.ReadTableQuery,
func (c *MockDBConnector) GetBackup(
_ context.Context, backupID types.ObjectID,
) (types.Backup, error) {
if backup, exist := c.backups[backupID]; exist {
return backup, nil
}
return types.Backup{}, fmt.Errorf(
"operation not found, id %s", backupID.String(),
)
}

func (c *MockDBConnector) SelectOperations(
_ context.Context, _ queries.ReadTableQuery,
) ([]types.Operation, error) {
return nil, errors.New("Do not call this method")
}

func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.WriteTableQuery) error {
queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
c.operations[queryBuilderMock.Operation.GetId()] = queryBuilderMock.Operation
c.backups[queryBuilderMock.Backup.ID] = queryBuilderMock.Backup
return nil
}
42 changes: 20 additions & 22 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package db

import (
"fmt"
"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"time"
"ydbcp/internal/types"
)

Expand Down Expand Up @@ -67,24 +67,26 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {

func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
var (
operationId types.ObjectID
containerId string
operationType string
operationId types.ObjectID
containerId string
operationType string
createdAt int64
backupId types.ObjectID
ydbOperationId string
database string

operationStateBuf *string
backupId *types.ObjectID
ydbOperationId *string
database *string
)
err := res.ScanNamed(
named.Required("id", &operationId),
named.Required("container_id", &containerId),
named.Required("type", &operationType),
named.Required("created_at", &createdAt),
named.Required("backup_id", &backupId),
named.Required("operation_id", &ydbOperationId),
named.Required("database", &database),

named.Optional("status", &operationStateBuf),
named.Optional("backup_id", &backupId),
named.Optional("operation_id", &ydbOperationId),
named.Optional("database", &database),
)
if err != nil {
return nil, err
Expand All @@ -94,30 +96,26 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
operationState = types.OperationState(*operationStateBuf)
}
if operationType == string(types.OperationTypeTB) {
if backupId == nil || database == nil || ydbOperationId == nil {
return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String())
}
return &types.TakeBackupOperation{
Id: operationId,
BackupId: types.ObjectID(*backupId),
BackupId: types.ObjectID(backupId),
ContainerID: containerId,
State: operationState,
Message: "",
YdbConnectionParams: types.GetYdbConnectionParams(*database),
YdbOperationId: *ydbOperationId,
YdbConnectionParams: types.GetYdbConnectionParams(database),
YdbOperationId: ydbOperationId,
CreatedAt: time.Unix(createdAt, 0),
}, nil
} else if operationType == string(types.OperationTypeRB) {
if backupId == nil || database == nil || ydbOperationId == nil {
return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String())
}
return &types.RestoreBackupOperation{
Id: operationId,
BackupId: types.ObjectID(*backupId),
BackupId: types.ObjectID(backupId),
ContainerID: containerId,
State: operationState,
Message: "",
YdbConnectionParams: types.GetYdbConnectionParams(*database),
YdbOperationId: *ydbOperationId,
YdbConnectionParams: types.GetYdbConnectionParams(database),
YdbOperationId: ydbOperationId,
CreatedAt: time.Unix(createdAt, 0),
}, nil
}

Expand Down
52 changes: 25 additions & 27 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (

type WriteTableQuery interface {
FormatQuery(ctx context.Context) (*FormatQueryResult, error)
WithCreateBackup(backup types.Backup) WriteTableQuery
WithCreateOperation(operation types.Operation) WriteTableQuery
WithUpdateBackup(backup types.Backup) WriteTableQuery
WithUpdateOperation(operation types.Operation) WriteTableQuery
}

type WriteTableQueryImpl struct {
Expand Down Expand Up @@ -123,42 +127,36 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
return d
}

type WriteTableQueryOption func(*WriteTableQueryImpl)
type WriteTableQueryImplOption func(*WriteTableQueryImpl)

func MakeWriteTableQuery(options ...WriteTableQueryOption) *WriteTableQueryImpl {
d := &WriteTableQueryImpl{}
for _, opt := range options {
opt(d)
}
return d
type WriteTableQueryMockOption func(*WriteTableQueryMock)

func MakeWriteTableQuery() WriteTableQuery {
return &WriteTableQueryImpl{}
}

func WithCreateBackup(backup types.Backup) WriteTableQueryOption {
return func(d *WriteTableQueryImpl) {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index))
}
func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index))
return d
}

func WithUpdateBackup(backup types.Backup) WriteTableQueryOption {
return func(d *WriteTableQueryImpl) {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildUpdateBackupQuery(backup, index))
}
func (d *WriteTableQueryImpl) WithUpdateBackup(backup types.Backup) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildUpdateBackupQuery(backup, index))
return d
}

func WithUpdateOperation(operation types.Operation) WriteTableQueryOption {
return func(d *WriteTableQueryImpl) {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index))
}
func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index))
return d
}

func WithCreateOperation(operation types.Operation) WriteTableQueryOption {
return func(d *WriteTableQueryImpl) {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index))
}
func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index))
return d
}

func (d *WriteSingleTableQueryImpl) DeclareParameters() string {
Expand Down
39 changes: 39 additions & 0 deletions internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package queries

import (
"context"
"ydbcp/internal/types"
)

type WriteTableQueryMock struct {
Operation types.Operation
Backup types.Backup
}

func MakeWriteTableQueryMock() WriteTableQuery {
return &WriteTableQueryMock{}
}

func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) {
return &FormatQueryResult{}, nil
}

func (w *WriteTableQueryMock) WithCreateBackup(backup types.Backup) WriteTableQuery {
w.Backup = backup
return w
}

func (w *WriteTableQueryMock) WithCreateOperation(operation types.Operation) WriteTableQuery {
w.Operation = operation
return w
}

func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQuery {
w.Backup = backup
return w
}

func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery {
w.Operation = operation
return w
}
7 changes: 3 additions & 4 deletions internal/connectors/db/yql/queries/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ UPSERT INTO Operations (id, status, message) VALUES ($id_1, $status_1, $message_
ID: backupId,
Status: "Available",
}
builder := MakeWriteTableQuery(
WithUpdateBackup(backup),
WithUpdateOperation(&op),
)
builder := MakeWriteTableQuery().
WithUpdateBackup(backup).
WithUpdateOperation(&op)
var (
queryParams = table.NewQueryParameters(
table.ValueParam("$id_0", table_types.UUIDValue(backupId)),
Expand Down
Loading

0 comments on commit 1db45c3

Please sign in to comment.