Skip to content

Commit

Permalink
Merge pull request #65 from ydb-platform/feature/NBYDB-281
Browse files Browse the repository at this point in the history
feat(ydbcp): support ttl for backups
  • Loading branch information
ulya-sidorina authored Sep 25, 2024
2 parents 729f369 + f7eb117 commit 8e77e05
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 6 deletions.
2 changes: 2 additions & 0 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"ydbcp/internal/server/services/operation"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
"ydbcp/internal/watchers/ttl_watcher"
ap "ydbcp/pkg/plugins/auth"

"go.uber.org/automaxprocs/maxprocs"
Expand Down Expand Up @@ -144,6 +145,7 @@ func main() {
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

wg.Add(1)
go func() {
Expand Down
9 changes: 8 additions & 1 deletion internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func MakeBackup(
allowedEndpointDomains []string,
allowInsecureEndpoint bool,
req *pb.MakeBackupRequest, scheduleId *string,
subject string,
subject string, ttl *time.Duration,
) (*types.Backup, *types.TakeBackupOperation, error) {
if !IsAllowedEndpoint(req.DatabaseEndpoint, allowedEndpointDomains, allowInsecureEndpoint) {
xlog.Error(
Expand Down Expand Up @@ -144,6 +144,12 @@ func MakeBackup(
ctx = xlog.With(ctx, zap.String("ClientOperationID", clientOperationID))
xlog.Info(ctx, "Export operation started")

var expireAt *time.Time
if ttl != nil {
expireAt = new(time.Time)
*expireAt = time.Now().Add(*ttl)
}

now := timestamppb.Now()
backup := &types.Backup{
ID: types.GenerateObjectID(),
Expand All @@ -160,6 +166,7 @@ func MakeBackup(
Creator: subject,
},
ScheduleID: scheduleId,
ExpireAt: expireAt,
}

op := &types.TakeBackupOperation{
Expand Down
3 changes: 3 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
creator *string
completedAt *time.Time
createdAt *time.Time
expireAt *time.Time
)

err := res.ScanNamed(
Expand All @@ -93,6 +94,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
named.Optional("message", &message),
named.Optional("size", &size),
named.Optional("schedule_id", &scheduleId),
named.Optional("expire_at", &expireAt),

named.Optional("created_at", &createdAt),
named.Optional("completed_at", &completedAt),
Expand All @@ -116,6 +118,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
AuditInfo: auditFromDb(creator, createdAt, completedAt),
Size: Int64OrZero(size),
ScheduleID: scheduleId,
ExpireAt: expireAt,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
"initiated", "created_at", "completed_at",
"s3_endpoint", "s3_region", "s3_bucket",
"s3_path_prefix", "status", "paths", "message",
"size", "schedule_id",
"size", "schedule_id", "expire_at",
}
AllOperationFields = []string{
"id", "type", "container_id", "database", "endpoint", "backup_id",
Expand Down
6 changes: 5 additions & 1 deletion internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
"$message",
table_types.StringValueFromString(operation.GetMessage()),
)
if operation.GetAudit() != nil {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
d.AddValueParam(
"$completed_at",
table_types.TimestampValueFromTime(operation.GetAudit().GetCompletedAt().AsTime()),
Expand Down Expand Up @@ -266,6 +266,10 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
d.AddValueParam("$completed_at", table_types.TimestampValueFromTime(b.AuditInfo.CompletedAt.AsTime()))
}
}

if b.ExpireAt != nil {
d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(*b.ExpireAt))
}
return d
}

Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CREATE TABLE Backups (
status String,
message String,
size Int64,
expire_at Timestamp,

paths String,

Expand Down
9 changes: 8 additions & 1 deletion internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ func BackupScheduleHandler(
now := time.Now()
// do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron.
if schedule.NextLaunch != nil && schedule.NextLaunch.Before(now) {
var ttl *time.Duration
if schedule.ScheduleSettings != nil && schedule.ScheduleSettings.Ttl != nil {
ttl = new(time.Duration)
*ttl = schedule.ScheduleSettings.Ttl.AsDuration()
}

b, op, err := backup_operations.MakeBackup(
ctx, clientConn, s3, allowedEndpointDomains, allowInsecureEndpoint, &pb.MakeBackupRequest{
ContainerId: schedule.ContainerID,
DatabaseName: schedule.DatabaseName,
DatabaseEndpoint: schedule.DatabaseEndpoint,
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
}, &schedule.ID, "YDBCP", //TODO: who to put as subject here?
}, &schedule.ID, types.OperationCreatorName, //TODO: who to put as subject here?
ttl,
)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/server/services/backup/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
ctx = xlog.With(ctx, zap.String("SubjectID", subject))

backup, op, err := backup_operations.MakeBackup(
ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, req, nil, subject,
ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, req, nil, subject, nil,
)

if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package types

import (
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"time"

pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

Expand Down Expand Up @@ -37,6 +39,7 @@ type Backup struct {
AuditInfo *pb.AuditInfo
Size int64
ScheduleID *string
ExpireAt *time.Time
}

func (o *Backup) String() string {
Expand Down Expand Up @@ -71,6 +74,11 @@ func (o *Backup) Proto() *pb.Backup {
if o.ScheduleID != nil {
backup.ScheduleId = *o.ScheduleID
}

if o.ExpireAt != nil {
backup.ExpireAt = timestamppb.New(*o.ExpireAt)
}

return backup
}

Expand Down
2 changes: 1 addition & 1 deletion internal/types/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ const (
OperationTypeRB = OperationType("RB")
OperationTypeDB = OperationType("DB")
BackupTimestampFormat = "20060102_150405"
S3ForcePathStyle = true
OperationCreatorName = "ydbcp"
)

func OperationToString(o Operation) string {
Expand Down
103 changes: 103 additions & 0 deletions internal/watchers/ttl_watcher/ttl_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package ttl_watcher

import (
"context"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"sync"
"time"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
"ydbcp/internal/watchers"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

func NewTtlWatcher(
ctx context.Context,
wg *sync.WaitGroup,
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
options ...watchers.Option,
) *watchers.WatcherImpl {
return watchers.NewWatcher(
ctx,
wg,
func(ctx context.Context, period time.Duration) {
TtlWatcherAction(ctx, period, db, queryBuilderFactory)
},
time.Hour,
"Ttl",
options...,
)
}

func TtlWatcherAction(
baseCtx context.Context,
period time.Duration,
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
) {
ctx, cancel := context.WithTimeout(baseCtx, period)
defer cancel()

backups, err := db.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "status",
Values: []table_types.Value{
table_types.StringValueFromString(types.BackupStateAvailable),
table_types.StringValueFromString(types.BackupStateError),
table_types.StringValueFromString(types.BackupStateCancelled),
},
},
),
),
)

if err != nil {
xlog.Error(ctx, "can't select backups", zap.Error(err))
return
}

for _, backup := range backups {
if backup.ExpireAt != nil && backup.ExpireAt.Before(time.Now()) {
now := timestamppb.Now()
dbOp := &types.DeleteBackupOperation{
ContainerID: backup.ContainerID,
BackupID: backup.ID,
State: types.OperationStatePending,
YdbConnectionParams: types.YdbConnectionParams{
DatabaseName: backup.DatabaseName,
Endpoint: backup.DatabaseEndpoint,
},
Audit: &pb.AuditInfo{
CreatedAt: now,
Creator: types.OperationCreatorName,
},
PathPrefix: backup.S3PathPrefix,
UpdatedAt: now,
}

backupToWrite := types.Backup{
ID: backup.ID,
Status: types.BackupStateDeleting,
}

err := db.ExecuteUpsert(
ctx, queryBuilderFactory().WithCreateOperation(dbOp).WithUpdateBackup(backupToWrite),
)

if err != nil {
xlog.Error(ctx, "can't create DeleteBackup operation", zap.String("BackupID", backup.ID), zap.Error(err))
}

xlog.Debug(ctx, "DeleteBackup operation was created successfully", zap.String("BackupID", backup.ID))
}
}
}
77 changes: 77 additions & 0 deletions internal/watchers/ttl_watcher/ttl_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package ttl_watcher

import (
"context"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
"ydbcp/internal/util/ticker"
"ydbcp/internal/watchers"
)

func TestTtlWatcher(t *testing.T) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Prepare fake clock and ticker
clock := clockwork.NewFakeClock()
var fakeTicker *ticker.FakeTicker
tickerInitialized := make(chan struct{})
tickerProvider := func(duration time.Duration) ticker.Ticker {
assert.Empty(t, fakeTicker, "ticker reuse")
fakeTicker = ticker.NewFakeTicker(duration)
tickerInitialized <- struct{}{}
return fakeTicker
}

// Prepare a backup
backupID := types.GenerateObjectID()
expireAt := time.Now()
backup := types.Backup{
ID: backupID,
Status: types.BackupStateAvailable,
ExpireAt: &expireAt,
}
backupMap := make(map[string]types.Backup)
backupMap[backupID] = backup

// Prepare mock db and ttl watcher
db := db.NewMockDBConnector(
db.WithBackups(backupMap),
)
_ = NewTtlWatcher(
ctx,
&wg,
db,
queries.NewWriteTableQueryMock,
watchers.WithTickerProvider(tickerProvider),
)

// Wait for the ticker to be initialized
select {
case <-ctx.Done():
t.Error("ticker not initialized")
case <-tickerInitialized:
assert.Equal(t, fakeTicker.Period, time.Hour, "incorrect period")
}

// Send a tick to the fake ticker
t0 := clock.Now().Add(time.Hour)
fakeTicker.Send(t0)

cancel()
wg.Wait()

// Check that DeleteBackup operation was created
ops, err := db.ActiveOperations(ctx)
assert.Empty(t, err)
assert.Equal(t, len(ops), 1)
assert.Equal(t, ops[0].GetType(), types.OperationTypeDB, "operation type should be DB")
assert.Equal(t, ops[0].GetState(), types.OperationStatePending, "operation state should be Pending")
}
Loading

0 comments on commit 8e77e05

Please sign in to comment.