Skip to content

Commit

Permalink
feat(ydbcp): add updated_at field for operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Sep 13, 2024
1 parent 8cac8de commit 66a23d5
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 79 deletions.
7 changes: 7 additions & 0 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"time"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db/yql/queries"
Expand Down Expand Up @@ -312,6 +313,12 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
func (d *YdbConnector) UpdateOperation(
ctx context.Context, operation types.Operation,
) error {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
operation.SetUpdatedAt(operation.GetAudit().CompletedAt)
} else {
operation.SetUpdatedAt(timestamppb.Now())
}

return d.ExecuteUpsert(ctx, queries.NewWriteTableQuery(ctx).WithUpdateOperation(operation))
}

Expand Down
11 changes: 11 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
creator *string
createdAt *time.Time
completedAt *time.Time
updatedAt *time.Time
updatedTs *timestamppb.Timestamp
)
err := res.ScanNamed(
named.Required("id", &operationId),
Expand All @@ -151,6 +153,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
named.Optional("created_at", &createdAt),
named.Optional("completed_at", &completedAt),
named.Optional("initiated", &creator),
named.Optional("updated_at", &updatedAt),
)
if err != nil {
return nil, err
Expand All @@ -167,6 +170,11 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
if sourcePathsToExclude != nil {
sourcePathsToExcludeSlice = strings.Split(*sourcePathsToExclude, ",")
}

if updatedAt != nil {
updatedTs = timestamppb.New(*updatedAt)
}

if operationType == string(types.OperationTypeTB) {
if backupId == nil {
return nil, fmt.Errorf("failed to read backup_id for TB operation: %s", operationId)
Expand All @@ -185,6 +193,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
SourcePathsToExclude: sourcePathsToExcludeSlice,
YdbOperationId: StringOrEmpty(ydbOperationId),
Audit: auditFromDb(creator, createdAt, completedAt),
UpdatedAt: updatedTs,
}, nil
} else if operationType == string(types.OperationTypeRB) {
if backupId == nil {
Expand All @@ -203,6 +212,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
YdbOperationId: StringOrEmpty(ydbOperationId),
SourcePaths: sourcePathsSlice,
Audit: auditFromDb(creator, createdAt, completedAt),
UpdatedAt: updatedTs,
}, nil
} else if operationType == string(types.OperationTypeDB) {
if backupId == nil {
Expand All @@ -226,6 +236,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
Message: StringOrEmpty(message),
Audit: auditFromDb(creator, createdAt, completedAt),
PathPrefix: pathPrefix,
UpdatedAt: updatedTs,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
AllOperationFields = []string{
"id", "type", "container_id", "database", "endpoint", "backup_id",
"initiated", "created_at", "completed_at", "status", "message",
"paths", "operation_id", "paths_to_exclude",
"paths", "operation_id", "paths_to_exclude", "updated_at",
}
AllBackupScheduleFields = []string{
"id", "container_id", "database", "endpoint", "name", "active", "crontab", "ttl", "paths", "paths_to_exclude",
Expand Down
12 changes: 12 additions & 0 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func BuildCreateOperationQuery(ctx context.Context, operation types.Operation, i
)
}
}
if operation.GetUpdatedAt() != nil {
d.AddValueParam(
"$updated_at",
table_types.TimestampValueFromTime(operation.GetUpdatedAt().AsTime()),
)
}

if operation.GetType() == types.OperationTypeTB {
tb, ok := operation.(*types.TakeBackupOperation)
Expand Down Expand Up @@ -200,6 +206,12 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
table_types.TimestampValueFromTime(operation.GetAudit().GetCompletedAt().AsTime()),
)
}
if operation.GetUpdatedAt() != nil {
d.AddValueParam(
"$updated_at",
table_types.TimestampValueFromTime(operation.GetUpdatedAt().AsTime()),
)
}
return d
}

Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ CREATE TABLE Operations (
initiated String,
created_at Timestamp,
completed_at Timestamp,
updated_at Timestamp,

status String,
message String,
Expand Down
7 changes: 3 additions & 4 deletions internal/handlers/restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func RBOperationHandler(
if deadlineExceeded(mr.Audit.CreatedAt, config) {
operation.SetState(types.OperationStateStartCancelling)
operation.SetMessage("Operation deadline exceeded")
return db.UpdateOperation(ctx, operation)
}
return nil

return db.UpdateOperation(ctx, operation)
}
if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
operation.SetState(types.OperationStateDone)
Expand Down Expand Up @@ -114,10 +114,9 @@ func RBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
return db.UpdateOperation(ctx, operation)
}

return nil
return db.UpdateOperation(ctx, operation)
}
if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
operation.SetState(types.OperationStateDone)
Expand Down
8 changes: 3 additions & 5 deletions internal/handlers/take_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,8 @@ func TBOperationHandler(
if deadlineExceeded(tb.Audit.CreatedAt, config) {
operation.SetState(types.OperationStateStartCancelling)
operation.SetMessage("Operation deadline exceeded")
return db.UpdateOperation(ctx, operation)
} else {
return nil
}
return db.UpdateOperation(ctx, operation)
} else if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
size, err := getBackupSize(tb.BackupID)
if err != nil {
Expand Down Expand Up @@ -177,9 +175,9 @@ func TBOperationHandler(
return db.ExecuteUpsert(
ctx, getQueryBuilder(ctx).WithUpdateOperation(operation).WithUpdateBackup(backupToWrite),
)
} else {
return nil
}

return db.UpdateOperation(ctx, operation)
}
if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
size, err := getBackupSize(tb.BackupID)
Expand Down
9 changes: 7 additions & 2 deletions internal/server/services/backup/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
Creator: subject,
},
YdbOperationId: clientOperationID,
UpdatedAt: now,
}

operationID, err := s.driver.CreateOperation(ctx, op)
Expand Down Expand Up @@ -309,6 +310,7 @@ func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRe
return nil, status.Errorf(codes.FailedPrecondition, "backup can't be deleted, status %s", backup.Status)
}

now := timestamppb.Now()
op := &types.DeleteBackupOperation{
ContainerID: backup.ContainerID,
BackupID: req.GetBackupId(),
Expand All @@ -318,10 +320,11 @@ func (s *BackupService) DeleteBackup(ctx context.Context, req *pb.DeleteBackupRe
Endpoint: backup.DatabaseEndpoint,
},
Audit: &pb.AuditInfo{
CreatedAt: timestamppb.Now(),
CreatedAt: now,
Creator: subject,
},
PathPrefix: backup.S3PathPrefix,
UpdatedAt: now,
}

operationID, err := s.driver.CreateOperation(ctx, op)
Expand Down Expand Up @@ -453,6 +456,7 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ
ctx = xlog.With(ctx, zap.String("ClientOperationID", clientOperationID))
xlog.Debug(ctx, "import operation started")

now := timestamppb.Now()
op := &types.RestoreBackupOperation{
ContainerID: backup.ContainerID,
BackupId: backupID,
Expand All @@ -463,11 +467,12 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ
},
YdbOperationId: clientOperationID,
Audit: &pb.AuditInfo{
CreatedAt: timestamppb.Now(),
CreatedAt: now,
Creator: subject,
},
SourcePaths: req.GetSourcePaths(),
DestinationPrefix: req.GetDestinationPrefix(),
UpdatedAt: now,
}

operationID, err := s.driver.CreateOperation(ctx, op)
Expand Down
34 changes: 34 additions & 0 deletions internal/types/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"log"
"strings"
"ydbcp/internal/util/xlog"
Expand All @@ -26,6 +27,8 @@ type Operation interface {
GetMessage() string
SetMessage(m string)
GetAudit() *pb.AuditInfo
GetUpdatedAt() *timestamppb.Timestamp
SetUpdatedAt(t *timestamppb.Timestamp)
Copy() Operation
Proto() *pb.Operation
}
Expand All @@ -41,6 +44,7 @@ type TakeBackupOperation struct {
SourcePaths []string
SourcePathsToExclude []string
Audit *pb.AuditInfo
UpdatedAt *timestamppb.Timestamp
}

func (o *TakeBackupOperation) GetID() string {
Expand Down Expand Up @@ -72,6 +76,12 @@ func (o *TakeBackupOperation) SetMessage(m string) {
func (o *TakeBackupOperation) GetAudit() *pb.AuditInfo {
return o.Audit
}
func (o *TakeBackupOperation) GetUpdatedAt() *timestamppb.Timestamp {
return o.UpdatedAt
}
func (o *TakeBackupOperation) SetUpdatedAt(t *timestamppb.Timestamp) {
o.UpdatedAt = t
}
func (o *TakeBackupOperation) Copy() Operation {
copy := *o
return &copy
Expand All @@ -92,6 +102,7 @@ func (o *TakeBackupOperation) Proto() *pb.Operation {
Audit: o.Audit,
Status: o.State.Enum(),
Message: o.Message,
UpdatedAt: o.UpdatedAt,
}
}

Expand All @@ -109,6 +120,7 @@ type RestoreBackupOperation struct {
SourcePaths []string
DestinationPrefix string
Audit *pb.AuditInfo
UpdatedAt *timestamppb.Timestamp
}

func (o *RestoreBackupOperation) GetID() string {
Expand Down Expand Up @@ -140,6 +152,12 @@ func (o *RestoreBackupOperation) SetMessage(m string) {
func (o *RestoreBackupOperation) GetAudit() *pb.AuditInfo {
return o.Audit
}
func (o *RestoreBackupOperation) GetUpdatedAt() *timestamppb.Timestamp {
return o.UpdatedAt
}
func (o *RestoreBackupOperation) SetUpdatedAt(t *timestamppb.Timestamp) {
o.UpdatedAt = t
}
func (o *RestoreBackupOperation) Copy() Operation {
copy := *o
return &copy
Expand All @@ -160,6 +178,7 @@ func (o *RestoreBackupOperation) Proto() *pb.Operation {
Audit: o.Audit,
Status: o.State.Enum(),
Message: o.Message,
UpdatedAt: o.UpdatedAt,
}
}

Expand All @@ -172,6 +191,7 @@ type DeleteBackupOperation struct {
Message string
PathPrefix string
Audit *pb.AuditInfo
UpdatedAt *timestamppb.Timestamp
}

func (o *DeleteBackupOperation) GetID() string {
Expand Down Expand Up @@ -203,6 +223,12 @@ func (o *DeleteBackupOperation) SetMessage(m string) {
func (o *DeleteBackupOperation) GetAudit() *pb.AuditInfo {
return o.Audit
}
func (o *DeleteBackupOperation) GetUpdatedAt() *timestamppb.Timestamp {
return o.UpdatedAt
}
func (o *DeleteBackupOperation) SetUpdatedAt(t *timestamppb.Timestamp) {
o.UpdatedAt = t
}
func (o *DeleteBackupOperation) Copy() Operation {
copy := *o
return &copy
Expand All @@ -223,6 +249,7 @@ func (o *DeleteBackupOperation) Proto() *pb.Operation {
Audit: o.Audit,
Status: o.State.Enum(),
Message: o.Message,
UpdatedAt: o.UpdatedAt,
}
}

Expand All @@ -232,6 +259,7 @@ type GenericOperation struct {
Type OperationType
State OperationState
Message string
UpdatedAt *timestamppb.Timestamp
}

func (o *GenericOperation) GetID() string {
Expand Down Expand Up @@ -264,6 +292,12 @@ func (o *GenericOperation) SetMessage(m string) {
func (o *GenericOperation) GetAudit() *pb.AuditInfo {
return nil
}
func (o *GenericOperation) GetUpdatedAt() *timestamppb.Timestamp {
return nil
}
func (o *GenericOperation) SetUpdatedAt(t *timestamppb.Timestamp) {
o.UpdatedAt = t
}
func (o *GenericOperation) Copy() Operation {
copy := *o
return &copy
Expand Down
Loading

0 comments on commit 66a23d5

Please sign in to comment.