From e9b24e82665dd2aac05f109d041b3c995c2ab183 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Mon, 30 Sep 2024 17:33:30 +0300 Subject: [PATCH] Implement page_tokens --- .github/workflows/unit-test.yml | 4 +- .../{list_schedules => list_entities}/main.go | 194 +++++++++++++----- dockerfile | 2 +- internal/connectors/db/yql/queries/read.go | 85 +++++++- .../connectors/db/yql/queries/read_test.go | 56 ++++- .../server/services/backup/backup_service.go | 17 +- .../backup_schedule_service.go | 14 +- .../services/operation/operation_service.go | 15 +- pkg/proto/ydbcp/v1alpha1/backup_service.proto | 3 +- 9 files changed, 311 insertions(+), 79 deletions(-) rename cmd/integration/{list_schedules => list_entities}/main.go (69%) diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index a4f1d6d4..482eddc1 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -69,8 +69,8 @@ jobs: - name: docker compose up run: | docker compose up -d - - name: run list_schedules test - run: docker exec local-ydbcp sh -c './list_schedules' + - name: run list_entities test + run: docker exec local-ydbcp sh -c './list_entities' - name: docker compose down run: | docker compose down diff --git a/cmd/integration/list_schedules/main.go b/cmd/integration/list_entities/main.go similarity index 69% rename from cmd/integration/list_schedules/main.go rename to cmd/integration/list_entities/main.go index 3246e5bc..412e3a22 100644 --- a/cmd/integration/list_schedules/main.go +++ b/cmd/integration/list_entities/main.go @@ -256,75 +256,104 @@ func main() { log.Panicf("failed to insert schedule: %v", err) } } - scheduleClient := pb.NewBackupScheduleServiceClient(conn) - schedules, err := scheduleClient.ListBackupSchedules( - context.Background(), &pb.ListBackupSchedulesRequest{ - ContainerId: containerID, - DatabaseNameMask: "%", - }, - ) - if err != nil { - log.Panicf("failed to list backup schedules: %v", err) - } - if len(schedules.Schedules) != 4 { - log.Panicln("did not get all the schedules") - } - for i, s := range schedules.Schedules { - if strconv.Itoa(4-i) != s.Id { - log.Panicf("wrong schedules order: expected %d, got %s", i, s.Id) + { + schedules, err := scheduleClient.ListBackupSchedules( + context.Background(), &pb.ListBackupSchedulesRequest{ + ContainerId: containerID, + DatabaseNameMask: "%", + PageSize: 3, + }, + ) + if err != nil { + log.Panicf("failed to list backup schedules: %v", err) } - switch s.Id { - case "1": - { - if s.LastSuccessfulBackupInfo.BackupId != "2" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fivePM { - log.Panicf( - "Expected BackupID = 2, RecoveryPoint = %s, got %s for scheduleID %s", fivePM.String(), - s.LastSuccessfulBackupInfo.String(), - s.Id, - ) - } + if len(schedules.Schedules) != 3 { + log.Panicln("did not get expected amount schedules") + } + if schedules.NextPageToken != "3" { + log.Panicln("wrong next page token") + } + for i, s := range schedules.Schedules { + if strconv.Itoa(4-i) != s.Id { + log.Panicf("wrong schedules order: expected %d, got %s", i, s.Id) } - case "2": - { - if s.LastSuccessfulBackupInfo.BackupId != "4" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fourPM { - log.Panicf( - "Expected BackupID = 4, RecoveryPoint = %s, got %s for scheduleID %s", fourPM.String(), - s.LastSuccessfulBackupInfo.String(), - s.Id, - ) + switch s.Id { + case "2": + { + + if s.LastSuccessfulBackupInfo.BackupId != "4" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fourPM { + log.Panicf( + "Expected BackupID = 4, RecoveryPoint = %s, got %s for scheduleID %s", fourPM.String(), + s.LastSuccessfulBackupInfo.String(), + s.Id, + ) + } } - } - case "3": - { - info := &pb.ScheduledBackupInfo{ - BackupId: "6", - RecoveryPoint: timestamppb.New(fourPM), + case "3": + { + info := &pb.ScheduledBackupInfo{ + BackupId: "6", + RecoveryPoint: timestamppb.New(fourPM), + } + if !proto.Equal(info, s.LastSuccessfulBackupInfo) { + log.Panicf( + "Expected %s, got %s for scheduleID %s", info.String(), s.LastSuccessfulBackupInfo.String(), + s.Id, + ) + } } - if !proto.Equal(info, s.LastSuccessfulBackupInfo) { - log.Panicf( - "Expected %s, got %s for scheduleID %s", info.String(), s.LastSuccessfulBackupInfo.String(), - s.Id, - ) + case "4": + { + if s.LastSuccessfulBackupInfo != nil { + log.Panicf( + "Expected nil, got %s for scheduleID %s", s.LastSuccessfulBackupInfo.String(), + s.Id, + ) + } } - } - case "4": - { - if s.LastSuccessfulBackupInfo != nil { - log.Panicf( - "Expected nil, got %s for scheduleID %s", s.LastSuccessfulBackupInfo.String(), - s.Id, - ) + default: + { + log.Panicf("unexpected schedule id: %s", s.Id) } } - default: - { - log.Panicf("unexpected schedule id: %s", s.Id) + } + } + { + schedules, err := scheduleClient.ListBackupSchedules( + context.Background(), &pb.ListBackupSchedulesRequest{ + ContainerId: containerID, + DatabaseNameMask: "%", + PageSize: 3, + PageToken: "3", + }, + ) + if err != nil { + log.Panicf("failed to list backup schedules: %v", err) + } + if len(schedules.Schedules) != 1 { + log.Panicln("did not get expected amount schedules") + } + if schedules.NextPageToken != "" { + log.Panicln("wrong next page token") + } + + for _, s := range schedules.Schedules { + if s.Id != "1" { + log.Panicf("wrong schedule id, expected 1, got %s", s.Id) + } + if s.LastSuccessfulBackupInfo.BackupId != "2" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fivePM { + log.Panicf( + "Expected BackupID = 2, RecoveryPoint = %s, got %s for scheduleID %s", fivePM.String(), + s.LastSuccessfulBackupInfo.String(), + s.Id, + ) } } } + { s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "1"}) if err != nil { @@ -380,5 +409,58 @@ func main() { ) } } + { + backupClient := pb.NewBackupServiceClient(conn) + backupsPb, err := backupClient.ListBackups( + ctx, &pb.ListBackupsRequest{ + ContainerId: containerID, + DatabaseNameMask: "%", + PageSize: 4, + }, + ) + if err != nil { + log.Panicf("failed to list backups: %v", err) + } + if len(backupsPb.Backups) != 4 { + log.Panicf("wrong list response size") + } + if backupsPb.NextPageToken != "4" { + log.Panicf("wrong next page token, expected \"4\", got \"%s\"", backupsPb.NextPageToken) + } + backupsPb, err = backupClient.ListBackups( + ctx, &pb.ListBackupsRequest{ + ContainerId: containerID, + DatabaseNameMask: "%", + PageSize: 4, + PageToken: "4", + }, + ) + if err != nil { + log.Panicf("failed to list backups: %v", err) + } + if len(backupsPb.Backups) != 4 { + log.Panicf("wrong list response size") + } + if backupsPb.NextPageToken != "8" { + log.Panicf("wrong next page token, expected \"8\", got \"%s\"", backupsPb.NextPageToken) + } + backupsPb, err = backupClient.ListBackups( + ctx, &pb.ListBackupsRequest{ + ContainerId: containerID, + DatabaseNameMask: "%", + PageSize: 4, + PageToken: "8", + }, + ) + if err != nil { + log.Panicf("failed to list backups: %v", err) + } + if len(backupsPb.Backups) != 0 { + log.Panicf("wrong list response size") + } + if backupsPb.NextPageToken != "" { + log.Panicf("wrong next page token, expected \"\", got \"%s\"", backupsPb.NextPageToken) + } + } } diff --git a/dockerfile b/dockerfile index 480713de..562a6ae3 100644 --- a/dockerfile +++ b/dockerfile @@ -20,7 +20,7 @@ RUN go build -o . ./cmd/ydbcp/main.go RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go # Build integration test app -RUN go build -o ./list_schedules ./cmd/integration/list_schedules/main.go +RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go # Command to run the executable CMD ["./main", "--config=local_config.yaml"] diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go index af457a41..9856bad1 100644 --- a/internal/connectors/db/yql/queries/read.go +++ b/internal/connectors/db/yql/queries/read.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "strconv" "strings" "ydbcp/internal/util/xlog" @@ -13,6 +16,10 @@ import ( "go.uber.org/zap" ) +const ( + DEFAULT_PAGE_SIZE = 50 +) + type QueryFilter struct { Field string Values []table_types.Value @@ -34,6 +41,28 @@ type OrderSpec struct { Desc bool } +type PageSpec struct { + Limit uint64 + Offset uint64 +} + +func NewPageSpec(pageSize uint32, pageToken string) (*PageSpec, error) { + var pageSpec PageSpec + if pageSize == 0 { + pageSpec.Limit = DEFAULT_PAGE_SIZE + } else { + pageSpec.Limit = uint64(pageSize) + } + if pageToken != "" { + offset, err := strconv.ParseUint(pageToken, 10, 64) + if err != nil { + return nil, status.Error(codes.InvalidArgument, "can't parse page token") + } + pageSpec.Offset = offset + } + return &pageSpec, nil +} + type ReadTableQueryImpl struct { rawQuery *string tableName string @@ -41,6 +70,7 @@ type ReadTableQueryImpl struct { filterFields []string isLikeFilter map[string]bool orderBy *OrderSpec + pageSpec *PageSpec tableQueryParams []table.ParameterOption } @@ -100,6 +130,12 @@ func WithOrderBy(spec OrderSpec) ReadTableQueryOption { } } +func WithPageSpec(spec PageSpec) ReadTableQueryOption { + return func(d *ReadTableQueryImpl) { + d.pageSpec = &spec + } +} + func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) string { paramName := fmt.Sprintf("$param%d", len(d.tableQueryParams)) d.tableQueryParams = append( @@ -128,30 +164,57 @@ func (d *ReadTableQueryImpl) MakeFilterString() string { return fmt.Sprintf(" WHERE %s", strings.Join(filterStrings, " AND ")) } +func (d *ReadTableQueryImpl) FormatOrder() *string { + if d.orderBy == nil { + return nil + } + descStr := "" + if d.orderBy.Desc == true { + descStr = " DESC" + } + orderBy := fmt.Sprintf(" ORDER BY %s%s", d.orderBy.Field, descStr) + return &orderBy +} + +func (d *ReadTableQueryImpl) FormatPage() *string { + if d.pageSpec == nil { + return nil + } + page := "" + if d.pageSpec.Limit != 0 { + page = fmt.Sprintf(" LIMIT %d", d.pageSpec.Limit) + } + if d.pageSpec.Offset != 0 { + page += fmt.Sprintf(" OFFSET %d", d.pageSpec.Offset) + } + return &page +} + func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) { var res string filter := d.MakeFilterString() - orderBy := "" - if d.orderBy != nil { - descStr := "" - if d.orderBy.Desc == true { - descStr = " DESC" - } - orderBy = fmt.Sprintf(" ORDER BY %s%s", d.orderBy.Field, descStr) - } + if d.rawQuery == nil { if len(d.tableName) == 0 { return nil, errors.New("no table") } res = fmt.Sprintf( - "SELECT * FROM %s%s%s", + "SELECT * FROM %s%s", d.tableName, filter, - orderBy, ) } else { - res = fmt.Sprintf("%s%s%s", *d.rawQuery, filter, orderBy) + res = fmt.Sprintf("%s%s", *d.rawQuery, filter) } + order := d.FormatOrder() + if order != nil { + res += *order + } + page := d.FormatPage() + if page != nil { + res += *page + } + xlog.Debug(ctx, "read query", zap.String("yql", res)) return &FormatQueryResult{ QueryText: res, diff --git a/internal/connectors/db/yql/queries/read_test.go b/internal/connectors/db/yql/queries/read_test.go index 184ebe75..798eb63c 100644 --- a/internal/connectors/db/yql/queries/read_test.go +++ b/internal/connectors/db/yql/queries/read_test.go @@ -9,7 +9,7 @@ import ( table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" ) -func TestQueryBuilder_Read(t *testing.T) { +func TestQueryBuilderRead(t *testing.T) { const ( queryString = `SELECT * FROM table1 WHERE (column1 = $param0 OR column1 = $param1) AND (column2 = $param2 OR column2 = $param3)` ) @@ -51,7 +51,7 @@ func TestQueryBuilder_Read(t *testing.T) { assert.Equal(t, queryParams, query.QueryParams, "bad query params") } -func TestQueryBuilder_Order(t *testing.T) { +func TestQueryBuilderOrderBy(t *testing.T) { const ( q1 = `SELECT * FROM table1 ORDER BY field` q2 = `SELECT * FROM table1 ORDER BY field DESC` @@ -88,7 +88,7 @@ func TestQueryBuilder_Order(t *testing.T) { ) } -func TestQueryBuilder_RawQueryModifiers(t *testing.T) { +func TestQueryBuilderRawQueryModifiers(t *testing.T) { const ( query = `SELECT * FROM table1` fullQuery = `SELECT * FROM table1 WHERE (col1 = $param0) AND (col2 = $param1) ORDER BY col3 DESC` @@ -124,3 +124,53 @@ func TestQueryBuilder_RawQueryModifiers(t *testing.T) { "bad query format", ) } + +func TestQueryBuilderPagination(t *testing.T) { + const ( + query = `SELECT * FROM table1 WHERE (col1 = $param0) ORDER BY col3 LIMIT 5 OFFSET 10` + noOffset = `SELECT * FROM table1 LIMIT 5` + ) + builder := NewReadTableQuery( + WithTableName("table1"), + WithQueryFilters( + QueryFilter{ + Field: "col1", + Values: []table_types.Value{ + table_types.StringValueFromString("value1"), + }, + }, + ), + WithOrderBy( + OrderSpec{ + Field: "col3", + }, + ), + WithPageSpec( + PageSpec{ + Limit: 5, + Offset: 10, + }, + ), + ) + fq, err := builder.FormatQuery(context.Background()) + assert.Empty(t, err) + assert.Equal( + t, query, fq.QueryText, + "bad query format", + ) + builder = NewReadTableQuery( + WithTableName("table1"), + WithPageSpec( + PageSpec{ + Limit: 5, + }, + ), + ) + fq, err = builder.FormatQuery(context.Background()) + assert.Empty(t, err) + assert.Equal( + t, noOffset, fq.QueryText, + "bad query format", + ) + +} diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index 923edcaf..dbaf34d2 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -2,6 +2,7 @@ package backup import ( "context" + "strconv" "ydbcp/internal/backup_operations" "ydbcp/internal/config" "ydbcp/internal/connectors/client" @@ -365,6 +366,11 @@ func (s *BackupService) ListBackups(ctx context.Context, request *pb.ListBackups ) } + pageSpec, err := queries.NewPageSpec(request.GetPageSize(), request.GetPageToken()) + if err != nil { + return nil, err + } + backups, err := s.driver.SelectBackups( ctx, queries.NewReadTableQuery( queries.WithTableName("Backups"), @@ -375,6 +381,7 @@ func (s *BackupService) ListBackups(ctx context.Context, request *pb.ListBackups Desc: true, }, ), + queries.WithPageSpec(*pageSpec), ), ) if err != nil { @@ -385,8 +392,14 @@ func (s *BackupService) ListBackups(ctx context.Context, request *pb.ListBackups for _, backup := range backups { pbBackups = append(pbBackups, backup.Proto()) } - xlog.Debug(ctx, "success") - return &pb.ListBackupsResponse{Backups: pbBackups}, nil + res := &pb.ListBackupsResponse{ + Backups: pbBackups, + } + if uint64(len(pbBackups)) == pageSpec.Limit { + res.NextPageToken = strconv.FormatUint(pageSpec.Offset+pageSpec.Limit, 10) + } + xlog.Debug(ctx, "ListBackups success") + return res, nil } func (s *BackupService) Register(server server.Server) { diff --git a/internal/server/services/backup_schedule/backup_schedule_service.go b/internal/server/services/backup_schedule/backup_schedule_service.go index b1ddb013..77052c92 100644 --- a/internal/server/services/backup_schedule/backup_schedule_service.go +++ b/internal/server/services/backup_schedule/backup_schedule_service.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/gorhill/cronexpr" "github.com/ydb-platform/ydb-go-sdk/v3/table" + "strconv" "time" "ydbcp/internal/auth" @@ -199,6 +200,12 @@ func (s *BackupScheduleService) ListBackupSchedules( }, ) } + + pageSpec, err := queries.NewPageSpec(request.GetPageSize(), request.GetPageToken()) + if err != nil { + return nil, err + } + schedules, err := s.driver.SelectBackupSchedulesWithRPOInfo( ctx, queries.NewReadTableQuery( queries.WithRawQuery(ListSchedulesQuery), @@ -209,6 +216,7 @@ func (s *BackupScheduleService) ListBackupSchedules( Desc: true, }, ), + queries.WithPageSpec(*pageSpec), ), ) if err != nil { @@ -219,8 +227,12 @@ func (s *BackupScheduleService) ListBackupSchedules( for _, schedule := range schedules { pbSchedules = append(pbSchedules, schedule.Proto()) } + res := &pb.ListBackupSchedulesResponse{Schedules: pbSchedules} + if uint64(len(pbSchedules)) == pageSpec.Limit { + res.NextPageToken = strconv.FormatUint(pageSpec.Offset+pageSpec.Limit, 10) + } xlog.Debug(ctx, "ListBackupSchedules success") - return &pb.ListBackupSchedulesResponse{Schedules: pbSchedules}, nil + return res, nil } func (s *BackupScheduleService) ToggleBackupSchedule( diff --git a/internal/server/services/operation/operation_service.go b/internal/server/services/operation/operation_service.go index da05df56..c97d3eaf 100644 --- a/internal/server/services/operation/operation_service.go +++ b/internal/server/services/operation/operation_service.go @@ -2,6 +2,7 @@ package operation import ( "context" + "strconv" "ydbcp/internal/auth" "ydbcp/internal/connectors/db" @@ -62,6 +63,11 @@ func (s *OperationService) ListOperations( ) } + pageSpec, err := queries.NewPageSpec(request.GetPageSize(), request.GetPageToken()) + if err != nil { + return nil, err + } + operations, err := s.driver.SelectOperations( ctx, queries.NewReadTableQuery( queries.WithTableName("Operations"), @@ -72,6 +78,7 @@ func (s *OperationService) ListOperations( Desc: true, }, ), + queries.WithPageSpec(*pageSpec), ), ) if err != nil { @@ -82,8 +89,12 @@ func (s *OperationService) ListOperations( for _, operation := range operations { pbOperations = append(pbOperations, operation.Proto()) } - xlog.Debug(ctx, "success list operations") - return &pb.ListOperationsResponse{Operations: pbOperations}, nil + res := &pb.ListOperationsResponse{Operations: pbOperations} + if uint64(len(pbOperations)) == pageSpec.Limit { + res.NextPageToken = strconv.FormatUint(pageSpec.Offset+pageSpec.Limit, 10) + } + xlog.Debug(ctx, "success ListOperations") + return res, nil } func (s *OperationService) CancelOperation( diff --git a/pkg/proto/ydbcp/v1alpha1/backup_service.proto b/pkg/proto/ydbcp/v1alpha1/backup_service.proto index 31ad463d..48ffef59 100644 --- a/pkg/proto/ydbcp/v1alpha1/backup_service.proto +++ b/pkg/proto/ydbcp/v1alpha1/backup_service.proto @@ -32,7 +32,8 @@ message ListBackupsRequest { uint32 page_size = 1000; // [(value) = "0-1000"]; // Page token. Set `page_token` to the `next_page_token` returned by a previous ListBackups - // request to get the next page of results. + // request to get the next page of results. Page token is an integer that represents an OFFSET for + // YQL query. string page_token = 1001; // [(length) = "<=100"]; }