Skip to content

Commit

Permalink
fix(ydbcp): make conditional operation update
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Sep 27, 2024
1 parent fe60c5d commit be2939b
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 10 deletions.
69 changes: 69 additions & 0 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ type DBConnector interface {
)
ActiveOperations(context.Context) ([]types.Operation, error)
UpdateOperation(context.Context, types.Operation) error
UpdateOperationIfStateUnchanged(context.Context, types.Operation, types.OperationState) error
CreateOperation(context.Context, types.Operation) (string, error)
CreateBackup(context.Context, types.Backup) (string, error)
UpdateBackup(context context.Context, id string, backupState string) error
ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error
ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error
Close(context.Context)
}

Expand Down Expand Up @@ -228,6 +230,59 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W
return nil
}

func (d *YdbConnector) ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error {
queryFormat, err := queryBuilder.FormatQuery(ctx)
if err != nil {
return err
}

var affectedRows int

err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
var res result.Result

xlog.Error(ctx, queryFormat.QueryText)

_, res, err = s.Execute(
ctx,
writeTx,
queryFormat.QueryText,
queryFormat.QueryParams,
)
if err != nil {
return err
}

defer func(res result.Result) {
err = res.Close()
if err != nil {
xlog.Error(ctx, "Error closing transaction result")
}
}(res) // result must be closed

if !res.NextResultSet(ctx) || !res.NextRow() {
affectedRows = 0
} else {
affectedRows = res.CurrentResultSet().RowCount()
}

return res.Err()
},
)
if err != nil {
xlog.Error(ctx, "Error executing query", zap.Error(err))
return err
}

if affectedRows == 0 {
xlog.Error(ctx, "upsert wasn't applied, 0 rows were affected")
return errors.New("upsert wasn't applied, 0 rows were affected")
}

return nil
}

func (d *YdbConnector) SelectBackups(
ctx context.Context, queryBuilder queries.ReadTableQuery,
) ([]*types.Backup, error) {
Expand Down Expand Up @@ -297,6 +352,20 @@ func (d *YdbConnector) UpdateOperation(
return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(operation))
}

func (d *YdbConnector) UpdateOperationIfStateUnchanged(
ctx context.Context, operation types.Operation, prevState types.OperationState) error {

var conditionalOperationUpdate = fmt.Sprintf(
`$rows_to_update = SELECT id, '%s' AS status, '%s' AS message
FROM Operations WHERE id = '%s' AND status = '%s';
SELECT * FROM $rows_to_update;
UPDATE Operations ON SELECT * FROM $rows_to_update;`,
operation.GetState().String(), operation.GetMessage(), operation.GetID(), prevState.String(),
)

return d.ExecuteConditionalUpsert(ctx, queries.NewWriteTableQuery().WithRawQuery(conditionalOperationUpdate))
}

func (d *YdbConnector) CreateOperation(
ctx context.Context, operation types.Operation,
) (string, error) {
Expand Down
17 changes: 17 additions & 0 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,20 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
c.backupSchedules[queryBuilderMock.BackupSchedule.ID] = queryBuilderMock.BackupSchedule
return nil
}

func (c *MockDBConnector) UpdateOperationIfStateUnchanged(ctx context.Context, op types.Operation, prevState types.OperationState) error {
c.guard.Lock()
defer c.guard.Unlock()

if _, exist := c.operations[op.GetID()]; !exist {
return fmt.Errorf("update nonexistent operation %s", types.OperationToString(op))
}
if c.operations[op.GetID()].GetState() == prevState {
c.operations[op.GetID()] = op.Copy()
}
return nil
}

func (c *MockDBConnector) ExecuteConditionalUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error {
return c.ExecuteUpsert(ctx, queryBuilder)
}
16 changes: 15 additions & 1 deletion internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

type WriteTableQuery interface {
FormatQuery(ctx context.Context) (*FormatQueryResult, error)
WithRawQuery(rawQuery string) WriteTableQuery
WithCreateBackup(backup types.Backup) WriteTableQuery
WithCreateOperation(operation types.Operation) WriteTableQuery
WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
Expand All @@ -30,6 +31,7 @@ type WriteTableQueryImpl struct {
}

type WriteSingleTableQueryImpl struct {
rawQuery *string
index int
tableName string
upsertFields []string
Expand Down Expand Up @@ -356,6 +358,15 @@ func NewWriteTableQuery() WriteTableQuery {
return &WriteTableQueryImpl{}
}

func (d *WriteTableQueryImpl) WithRawQuery(rawQuery string) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, WriteSingleTableQueryImpl{
index: index,
rawQuery: &rawQuery,
})
return d
}

func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index))
Expand Down Expand Up @@ -441,7 +452,10 @@ func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResu
allParams := make([]table.ParameterOption, 0)
for _, t := range d.tableQueries {
var err error
if t.updateParam == nil {

if t.rawQuery != nil {
queryStrings = append(queryStrings, *t.rawQuery)
} else if t.updateParam == nil {
err = ProcessUpsertQuery(&queryStrings, &allParams, &t)
} else {
err = ProcessUpdateQuery(&queryStrings, &allParams, &t)
Expand Down
4 changes: 4 additions & 0 deletions internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func NewWriteTableQueryMock() WriteTableQuery {
return &WriteTableQueryMock{}
}

func (w *WriteTableQueryMock) WithRawQuery(_ string) WriteTableQuery {
return w
}

func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) {
return &FormatQueryResult{}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions internal/handlers/restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ func RBOperationHandler(
{
if !opResponse.GetOperation().Ready {
if deadlineExceeded(mr.Audit.CreatedAt, config) {
prevState := operation.GetState()
operation.SetState(types.OperationStateStartCancelling)
operation.SetMessage("Operation deadline exceeded")
return db.UpdateOperationIfStateUnchanged(ctx, operation, prevState)
}

return db.UpdateOperation(ctx, operation)
Expand Down
11 changes: 6 additions & 5 deletions internal/handlers/take_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package handlers
import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
Expand All @@ -11,11 +15,6 @@ import (
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)

func NewTBOperationHandler(
Expand Down Expand Up @@ -117,8 +116,10 @@ func TBOperationHandler(
{
if !opResponse.GetOperation().Ready {
if deadlineExceeded(tb.Audit.CreatedAt, config) {
prevState := operation.GetState()
operation.SetState(types.OperationStateStartCancelling)
operation.SetMessage("Operation deadline exceeded")
return db.UpdateOperationIfStateUnchanged(ctx, operation, prevState)
}
return db.UpdateOperation(ctx, operation)
} else if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
Expand Down
8 changes: 4 additions & 4 deletions internal/server/services/operation/operation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package operation

import (
"context"

"ydbcp/internal/auth"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
Expand Down Expand Up @@ -134,15 +133,16 @@ func (s *OperationService) CancelOperation(
}
ctx = xlog.With(ctx, zap.String("SubjectID", subject))

if operation.GetState() != types.OperationStatePending {
xlog.Error(ctx, "can't cancel operation with state != pending")
if operation.GetState() != types.OperationStatePending && operation.GetState() != types.OperationStateRunning {
xlog.Error(ctx, "can't cancel operation with state", zap.String("state", operation.GetState().String()))
return operation.Proto(), nil
}

prevState := operation.GetState()
operation.SetState(types.OperationStateStartCancelling)
operation.SetMessage("Operation was cancelled via OperationService")

err = s.driver.UpdateOperation(ctx, operation)
err = s.driver.UpdateOperationIfStateUnchanged(ctx, operation, prevState)
if err != nil {
xlog.Error(ctx, "error updating operation", zap.Error(err))
return nil, status.Error(codes.Internal, "error updating operation")
Expand Down

0 comments on commit be2939b

Please sign in to comment.