Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement page_tokens #77

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ jobs:
- name: docker compose up
run: |
docker compose up -d
- name: run list_schedules test
run: docker exec local-ydbcp sh -c './list_schedules'
- name: run list_entities test
run: docker exec local-ydbcp sh -c './list_entities'
- name: docker compose down
run: |
docker compose down
Original file line number Diff line number Diff line change
Expand Up @@ -256,75 +256,104 @@ func main() {
log.Panicf("failed to insert schedule: %v", err)
}
}

scheduleClient := pb.NewBackupScheduleServiceClient(conn)
schedules, err := scheduleClient.ListBackupSchedules(
context.Background(), &pb.ListBackupSchedulesRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
},
)
if err != nil {
log.Panicf("failed to list backup schedules: %v", err)
}

if len(schedules.Schedules) != 4 {
log.Panicln("did not get all the schedules")
}
for i, s := range schedules.Schedules {
if strconv.Itoa(4-i) != s.Id {
log.Panicf("wrong schedules order: expected %d, got %s", i, s.Id)
{
schedules, err := scheduleClient.ListBackupSchedules(
context.Background(), &pb.ListBackupSchedulesRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 3,
},
)
if err != nil {
log.Panicf("failed to list backup schedules: %v", err)
}
switch s.Id {
case "1":
{
if s.LastSuccessfulBackupInfo.BackupId != "2" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fivePM {
log.Panicf(
"Expected BackupID = 2, RecoveryPoint = %s, got %s for scheduleID %s", fivePM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
if len(schedules.Schedules) != 3 {
log.Panicln("did not get expected amount schedules")
}
if schedules.NextPageToken != "3" {
log.Panicln("wrong next page token")
}
for i, s := range schedules.Schedules {
if strconv.Itoa(4-i) != s.Id {
log.Panicf("wrong schedules order: expected %d, got %s", i, s.Id)
}
case "2":
{
if s.LastSuccessfulBackupInfo.BackupId != "4" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fourPM {
log.Panicf(
"Expected BackupID = 4, RecoveryPoint = %s, got %s for scheduleID %s", fourPM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)
switch s.Id {
case "2":
{

if s.LastSuccessfulBackupInfo.BackupId != "4" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fourPM {
log.Panicf(
"Expected BackupID = 4, RecoveryPoint = %s, got %s for scheduleID %s", fourPM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)

}
}
}
case "3":
{
info := &pb.ScheduledBackupInfo{
BackupId: "6",
RecoveryPoint: timestamppb.New(fourPM),
case "3":
{
info := &pb.ScheduledBackupInfo{
BackupId: "6",
RecoveryPoint: timestamppb.New(fourPM),
}
if !proto.Equal(info, s.LastSuccessfulBackupInfo) {
log.Panicf(
"Expected %s, got %s for scheduleID %s", info.String(), s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
if !proto.Equal(info, s.LastSuccessfulBackupInfo) {
log.Panicf(
"Expected %s, got %s for scheduleID %s", info.String(), s.LastSuccessfulBackupInfo.String(),
s.Id,
)
case "4":
{
if s.LastSuccessfulBackupInfo != nil {
log.Panicf(
"Expected nil, got %s for scheduleID %s", s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
}
case "4":
{
if s.LastSuccessfulBackupInfo != nil {
log.Panicf(
"Expected nil, got %s for scheduleID %s", s.LastSuccessfulBackupInfo.String(),
s.Id,
)
default:
{
log.Panicf("unexpected schedule id: %s", s.Id)
}
}
default:
{
log.Panicf("unexpected schedule id: %s", s.Id)
}
}
{
schedules, err := scheduleClient.ListBackupSchedules(
context.Background(), &pb.ListBackupSchedulesRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 3,
PageToken: "3",
},
)
if err != nil {
log.Panicf("failed to list backup schedules: %v", err)
}
if len(schedules.Schedules) != 1 {
log.Panicln("did not get expected amount schedules")
}
if schedules.NextPageToken != "" {
log.Panicln("wrong next page token")
}

for _, s := range schedules.Schedules {
if s.Id != "1" {
log.Panicf("wrong schedule id, expected 1, got %s", s.Id)
}
if s.LastSuccessfulBackupInfo.BackupId != "2" || s.LastSuccessfulBackupInfo.RecoveryPoint.AsTime() != fivePM {
log.Panicf(
"Expected BackupID = 2, RecoveryPoint = %s, got %s for scheduleID %s", fivePM.String(),
s.LastSuccessfulBackupInfo.String(),
s.Id,
)
}
}
}

{
s, err := scheduleClient.GetBackupSchedule(ctx, &pb.GetBackupScheduleRequest{Id: "1"})
if err != nil {
Expand Down Expand Up @@ -380,5 +409,58 @@ func main() {
)
}
}
{
backupClient := pb.NewBackupServiceClient(conn)
backupsPb, err := backupClient.ListBackups(
ctx, &pb.ListBackupsRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 4,
},
)
if err != nil {
log.Panicf("failed to list backups: %v", err)
}
if len(backupsPb.Backups) != 4 {
log.Panicf("wrong list response size")
}
if backupsPb.NextPageToken != "4" {
log.Panicf("wrong next page token, expected \"4\", got \"%s\"", backupsPb.NextPageToken)
}
backupsPb, err = backupClient.ListBackups(
ctx, &pb.ListBackupsRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 4,
PageToken: "4",
},
)
if err != nil {
log.Panicf("failed to list backups: %v", err)
}
if len(backupsPb.Backups) != 4 {
log.Panicf("wrong list response size")
}
if backupsPb.NextPageToken != "8" {
log.Panicf("wrong next page token, expected \"8\", got \"%s\"", backupsPb.NextPageToken)
}
backupsPb, err = backupClient.ListBackups(
ctx, &pb.ListBackupsRequest{
ContainerId: containerID,
DatabaseNameMask: "%",
PageSize: 4,
PageToken: "8",
},
)
if err != nil {
log.Panicf("failed to list backups: %v", err)
}
if len(backupsPb.Backups) != 0 {
log.Panicf("wrong list response size")
}
if backupsPb.NextPageToken != "" {
log.Panicf("wrong next page token, expected \"\", got \"%s\"", backupsPb.NextPageToken)
}

}
}
2 changes: 1 addition & 1 deletion dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ RUN go build -o . ./cmd/ydbcp/main.go
RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go

# Build integration test app
RUN go build -o ./list_schedules ./cmd/integration/list_schedules/main.go
RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go

# Command to run the executable
CMD ["./main", "--config=local_config.yaml"]
85 changes: 74 additions & 11 deletions internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"strconv"
"strings"

"ydbcp/internal/util/xlog"
Expand All @@ -13,6 +16,10 @@ import (
"go.uber.org/zap"
)

const (
DEFAULT_PAGE_SIZE = 50
)

type QueryFilter struct {
Field string
Values []table_types.Value
Expand All @@ -34,13 +41,36 @@ type OrderSpec struct {
Desc bool
}

type PageSpec struct {
Limit uint64
Offset uint64
}

func NewPageSpec(pageSize uint32, pageToken string) (*PageSpec, error) {
var pageSpec PageSpec
if pageSize == 0 {
pageSpec.Limit = DEFAULT_PAGE_SIZE
} else {
pageSpec.Limit = uint64(pageSize)
}
if pageToken != "" {
offset, err := strconv.ParseUint(pageToken, 10, 64)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "can't parse page token")
}
pageSpec.Offset = offset
}
return &pageSpec, nil
}

type ReadTableQueryImpl struct {
rawQuery *string
tableName string
filters [][]table_types.Value
filterFields []string
isLikeFilter map[string]bool
orderBy *OrderSpec
pageSpec *PageSpec
tableQueryParams []table.ParameterOption
}

Expand Down Expand Up @@ -100,6 +130,12 @@ func WithOrderBy(spec OrderSpec) ReadTableQueryOption {
}
}

func WithPageSpec(spec PageSpec) ReadTableQueryOption {
return func(d *ReadTableQueryImpl) {
d.pageSpec = &spec
}
}

func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) string {
paramName := fmt.Sprintf("$param%d", len(d.tableQueryParams))
d.tableQueryParams = append(
Expand Down Expand Up @@ -128,30 +164,57 @@ func (d *ReadTableQueryImpl) MakeFilterString() string {
return fmt.Sprintf(" WHERE %s", strings.Join(filterStrings, " AND "))
}

func (d *ReadTableQueryImpl) FormatOrder() *string {
if d.orderBy == nil {
return nil
}
descStr := ""
if d.orderBy.Desc == true {
descStr = " DESC"
}
orderBy := fmt.Sprintf(" ORDER BY %s%s", d.orderBy.Field, descStr)
return &orderBy
}

func (d *ReadTableQueryImpl) FormatPage() *string {
if d.pageSpec == nil {
return nil
}
page := ""
if d.pageSpec.Limit != 0 {
page = fmt.Sprintf(" LIMIT %d", d.pageSpec.Limit)
}
if d.pageSpec.Offset != 0 {
page += fmt.Sprintf(" OFFSET %d", d.pageSpec.Offset)
}
return &page
}

func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) {
var res string
filter := d.MakeFilterString()
orderBy := ""
if d.orderBy != nil {
descStr := ""
if d.orderBy.Desc == true {
descStr = " DESC"
}
orderBy = fmt.Sprintf(" ORDER BY %s%s", d.orderBy.Field, descStr)
}

if d.rawQuery == nil {
if len(d.tableName) == 0 {
return nil, errors.New("no table")
}
res = fmt.Sprintf(
"SELECT * FROM %s%s%s",
"SELECT * FROM %s%s",
d.tableName,
filter,
orderBy,
)
} else {
res = fmt.Sprintf("%s%s%s", *d.rawQuery, filter, orderBy)
res = fmt.Sprintf("%s%s", *d.rawQuery, filter)
}
order := d.FormatOrder()
if order != nil {
res += *order
}
page := d.FormatPage()
if page != nil {
res += *page
}

xlog.Debug(ctx, "read query", zap.String("yql", res))
return &FormatQueryResult{
QueryText: res,
Expand Down
Loading
Loading