Skip to content

Commit

Permalink
Implement page_tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Sep 30, 2024
1 parent 680b553 commit e9b24e8
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 79 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ jobs:
- name: docker compose up
run: |
docker compose up -d
- name: run list_schedules test
run: docker exec local-ydbcp sh -c './list_schedules'
- name: run list_entities test
run: docker exec local-ydbcp sh -c './list_entities'
- name: docker compose down
run: |
docker compose down
Original file line number Diff line number Diff line change
Expand Up @@ -256,75 +256,104 @@ func main() {
log.Panicf("failed to insert schedule: %v", err)
}
}

scheduleClient := pb.NewBackupScheduleServiceClient(conn)
schedules, err := scheduleClient.ListBackupSchedules(
context.Background(), &pb.ListBackupSchedulesRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
},
)
if err != nil {
log.Panicf("failed to list backup schedules: %v", err)
}

if len(schedules.Schedules) != 4 {
log.Panicln("did not get all the schedules")
}
for i, s := range schedules.Schedules {
if strconv.Itoa(4-i) != s.Id {
log.Panicf("wrong schedules order: expected %d, got %s", i, s.Id)
{
schedules, err := scheduleClient.ListBackupSchedules(
context.Background(), &pb.ListBackupSchedulesRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 3,
},
)
if err != nil {
log.Panicf("failed to list backup schedules: %v", err)
}
switch s.Id {
case "1":
{
if s.LastSuccessfulBackupInfo.BackupId != "2" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fivePM {
log.Panicf(
"Expected BackupID = 2, RecoveryPoint = %s, got %s for scheduleID %s", fivePM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
if len(schedules.Schedules) != 3 {
log.Panicln("did not get expected amount schedules")
}
if schedules.NextPageToken != "3" {
log.Panicln("wrong next page token")
}
for i, s := range schedules.Schedules {
if strconv.Itoa(4-i) != s.Id {
log.Panicf("wrong schedules order: expected %d, got %s", i, s.Id)
}
case "2":
{
if s.LastSuccessfulBackupInfo.BackupId != "4" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fourPM {
log.Panicf(
"Expected BackupID = 4, RecoveryPoint = %s, got %s for scheduleID %s", fourPM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)
switch s.Id {
case "2":
{

if s.LastSuccessfulBackupInfo.BackupId != "4" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fourPM {
log.Panicf(
"Expected BackupID = 4, RecoveryPoint = %s, got %s for scheduleID %s", fourPM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)

}
}
}
case "3":
{
info := &pb.ScheduledBackupInfo{
BackupId: "6",
RecoveryPoint: timestamppb.New(fourPM),
case "3":
{
info := &pb.ScheduledBackupInfo{
BackupId: "6",
RecoveryPoint: timestamppb.New(fourPM),
}
if !proto.Equal(info, s.LastSuccessfulBackupInfo) {
log.Panicf(
"Expected %s, got %s for scheduleID %s", info.String(), s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
if !proto.Equal(info, s.LastSuccessfulBackupInfo) {
log.Panicf(
"Expected %s, got %s for scheduleID %s", info.String(), s.LastSuccessfulBackupInfo.String(),
s.Id,
)
case "4":
{
if s.LastSuccessfulBackupInfo != nil {
log.Panicf(
"Expected nil, got %s for scheduleID %s", s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
}
case "4":
{
if s.LastSuccessfulBackupInfo != nil {
log.Panicf(
"Expected nil, got %s for scheduleID %s", s.LastSuccessfulBackupInfo.String(),
s.Id,
)
default:
{
log.Panicf("unexpected schedule id: %s", s.Id)
}
}
default:
{
log.Panicf("unexpected schedule id: %s", s.Id)
}
}
{
schedules, err := scheduleClient.ListBackupSchedules(
context.Background(), &pb.ListBackupSchedulesRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 3,
PageToken: "3",
},
)
if err != nil {
log.Panicf("failed to list backup schedules: %v", err)
}
if len(schedules.Schedules) != 1 {
log.Panicln("did not get expected amount schedules")
}
if schedules.NextPageToken != "" {
log.Panicln("wrong next page token")
}

for _, s := range schedules.Schedules {
if s.Id != "1" {
log.Panicf("wrong schedule id, expected 1, got %s", s.Id)
}
if s.LastSuccessfulBackupInfo.BackupId != "2" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fivePM {
log.Panicf(
"Expected BackupID = 2, RecoveryPoint = %s, got %s for scheduleID %s", fivePM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
}

{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "1"})
if err != nil {
Expand Down Expand Up @@ -380,5 +409,58 @@ func main() {
)
}
}
{
backupClient := pb.NewBackupServiceClient(conn)
backupsPb, err := backupClient.ListBackups(
ctx, &pb.ListBackupsRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 4,
},
)
if err != nil {
log.Panicf("failed to list backups: %v", err)
}
if len(backupsPb.Backups) != 4 {
log.Panicf("wrong list response size")
}
if backupsPb.NextPageToken != "4" {
log.Panicf("wrong next page token, expected \"4\", got \"%s\"", backupsPb.NextPageToken)
}
backupsPb, err = backupClient.ListBackups(
ctx, &pb.ListBackupsRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 4,
PageToken: "4",
},
)
if err != nil {
log.Panicf("failed to list backups: %v", err)
}
if len(backupsPb.Backups) != 4 {
log.Panicf("wrong list response size")
}
if backupsPb.NextPageToken != "8" {
log.Panicf("wrong next page token, expected \"8\", got \"%s\"", backupsPb.NextPageToken)
}
backupsPb, err = backupClient.ListBackups(
ctx, &pb.ListBackupsRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 4,
PageToken: "8",
},
)
if err != nil {
log.Panicf("failed to list backups: %v", err)
}
if len(backupsPb.Backups) != 0 {
log.Panicf("wrong list response size")
}
if backupsPb.NextPageToken != "" {
log.Panicf("wrong next page token, expected \"\", got \"%s\"", backupsPb.NextPageToken)
}

}
}
2 changes: 1 addition & 1 deletion dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ RUN go build -o . ./cmd/ydbcp/main.go
RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go

# Build integration test app
RUN go build -o ./list_schedules ./cmd/integration/list_schedules/main.go
RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go

# Command to run the executable
CMD ["./main", "--config=local_config.yaml"]
85 changes: 74 additions & 11 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"strconv"
"strings"

"ydbcp/internal/util/xlog"
Expand All @@ -13,6 +16,10 @@ import (
"go.uber.org/zap"
)

const (
DEFAULT_PAGE_SIZE = 50
)

type QueryFilter struct {
Field string
Values []table_types.Value
Expand All @@ -34,13 +41,36 @@ type OrderSpec struct {
Desc bool
}

type PageSpec struct {
Limit uint64
Offset uint64
}

func NewPageSpec(pageSize uint32, pageToken string) (*PageSpec, error) {
var pageSpec PageSpec
if pageSize == 0 {
pageSpec.Limit = DEFAULT_PAGE_SIZE
} else {
pageSpec.Limit = uint64(pageSize)
}
if pageToken != "" {
offset, err := strconv.ParseUint(pageToken, 10, 64)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "can't parse page token")
}
pageSpec.Offset = offset
}
return &pageSpec, nil
}

type ReadTableQueryImpl struct {
rawQuery *string
tableName string
filters [][]table_types.Value
filterFields []string
isLikeFilter map[string]bool
orderBy *OrderSpec
pageSpec *PageSpec
tableQueryParams []table.ParameterOption
}

Expand Down Expand Up @@ -100,6 +130,12 @@ func WithOrderBy(spec OrderSpec) ReadTableQueryOption {
}
}

func WithPageSpec(spec PageSpec) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
d.pageSpec = &spec
}
}

func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) string {
paramName := fmt.Sprintf("$param%d", len(d.tableQueryParams))
d.tableQueryParams = append(
Expand Down Expand Up @@ -128,30 +164,57 @@ func (d *ReadTableQueryImpl) MakeFilterString() string {
return fmt.Sprintf(" WHERE %s", strings.Join(filterStrings, " AND "))
}

func (d *ReadTableQueryImpl) FormatOrder() *string {
if d.orderBy == nil {
return nil
}
descStr := ""
if d.orderBy.Desc == true {
descStr = " DESC"
}
orderBy := fmt.Sprintf(" ORDER BY %s%s", d.orderBy.Field, descStr)
return &orderBy
}

func (d *ReadTableQueryImpl) FormatPage() *string {
if d.pageSpec == nil {
return nil
}
page := ""
if d.pageSpec.Limit != 0 {
page = fmt.Sprintf(" LIMIT %d", d.pageSpec.Limit)
}
if d.pageSpec.Offset != 0 {
page += fmt.Sprintf(" OFFSET %d", d.pageSpec.Offset)
}
return &page
}

func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) {
var res string
filter := d.MakeFilterString()
orderBy := ""
if d.orderBy != nil {
descStr := ""
if d.orderBy.Desc == true {
descStr = " DESC"
}
orderBy = fmt.Sprintf(" ORDER BY %s%s", d.orderBy.Field, descStr)
}

if d.rawQuery == nil {
if len(d.tableName) == 0 {
return nil, errors.New("no table")
}
res = fmt.Sprintf(
"SELECT * FROM %s%s%s",
"SELECT * FROM %s%s",
d.tableName,
filter,
orderBy,
)
} else {
res = fmt.Sprintf("%s%s%s", *d.rawQuery, filter, orderBy)
res = fmt.Sprintf("%s%s", *d.rawQuery, filter)
}
order := d.FormatOrder()
if order != nil {
res += *order
}
page := d.FormatPage()
if page != nil {
res += *page
}

xlog.Debug(ctx, "read query", zap.String("yql", res))
return &FormatQueryResult{
QueryText: res,
Expand Down
Loading

0 comments on commit e9b24e8

Please sign in to comment.