From ab8b94afeb277ff195655629ec9f1249518d15ae Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Thu, 16 May 2024 00:08:05 +0000 Subject: [PATCH 1/6] test(bigtable): Use rowslimit in proxy --- bigtable/internal/testproxy/proxy.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/bigtable/internal/testproxy/proxy.go b/bigtable/internal/testproxy/proxy.go index d5bf11be4753..5c4c92b5e828 100644 --- a/bigtable/internal/testproxy/proxy.go +++ b/bigtable/internal/testproxy/proxy.go @@ -566,16 +566,28 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques rowPbs := rrq.Rows rs := rowSetFromProto(rowPbs) + // Add row limit read option + rowsToRead := int64(0) + readOpts := []bigtable.ReadOption{} + if rrq.RowsLimit != 0 { + readOpts = append(readOpts, bigtable.LimitRows(rrq.RowsLimit)) + rowsToRead = rrq.RowsLimit + } + ctx, cancel := btc.timeout(ctx) defer cancel() - var c int32 + var rowsReadTillNow int64 var rowsPb []*btpb.Row - lim := req.GetCancelAfterRows() + if req.GetCancelAfterRows() != 0 { + rowsToRead = int64(req.GetCancelAfterRows()) + } + + // Client library does not have a built-in way to limit the number of rows read in a call to Table.ReadRows(). + // The caller needs to keep track of any kind of limit externally and from within the callback function passed to ReadRows() err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool { - c++ - if c == lim { + if rowsReadTillNow == rowsToRead { return false } rpb, err := rowToProto(r) @@ -583,8 +595,9 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques return false } rowsPb = append(rowsPb, rpb) + rowsReadTillNow++ return true - }) + }, readOpts...) 
res := &pb.RowsResult{ Status: &statpb.Status{ From cc7d22042eead68ca02f5364480e572020107814 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Thu, 16 May 2024 00:33:44 +0000 Subject: [PATCH 2/6] test(bigtable): Limit rows only if specified by client --- bigtable/internal/testproxy/proxy.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bigtable/internal/testproxy/proxy.go b/bigtable/internal/testproxy/proxy.go index 5c4c92b5e828..763dc3182fe3 100644 --- a/bigtable/internal/testproxy/proxy.go +++ b/bigtable/internal/testproxy/proxy.go @@ -567,9 +567,11 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques rs := rowSetFromProto(rowPbs) // Add row limit read option + limitRowsRead := false rowsToRead := int64(0) readOpts := []bigtable.ReadOption{} if rrq.RowsLimit != 0 { + limitRowsRead = true readOpts = append(readOpts, bigtable.LimitRows(rrq.RowsLimit)) rowsToRead = rrq.RowsLimit } @@ -580,6 +582,7 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques var rowsReadTillNow int64 var rowsPb []*btpb.Row if req.GetCancelAfterRows() != 0 { + limitRowsRead = true rowsToRead = int64(req.GetCancelAfterRows()) } @@ -587,7 +590,7 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques // The caller needs to keep track of any kind of limit externally and from within the callback function passed to ReadRows() err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool { - if rowsReadTillNow == rowsToRead { + if limitRowsRead && rowsReadTillNow == rowsToRead { return false } rpb, err := rowToProto(r) From cd50e9da86c99984d318087643e538b21b21096e Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Thu, 16 May 2024 19:11:17 +0000 Subject: [PATCH 3/6] refactor(bigtable): Refactoring code --- bigtable/internal/testproxy/proxy.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/bigtable/internal/testproxy/proxy.go b/bigtable/internal/testproxy/proxy.go 
index 763dc3182fe3..3c99318c42ba 100644 --- a/bigtable/internal/testproxy/proxy.go +++ b/bigtable/internal/testproxy/proxy.go @@ -566,21 +566,18 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques rowPbs := rrq.Rows rs := rowSetFromProto(rowPbs) - // Add row limit read option + ctx, cancel := btc.timeout(ctx) + defer cancel() + + var rowsReadTillNow int64 + var rowsPb []*btpb.Row + limitRowsRead := false rowsToRead := int64(0) - readOpts := []bigtable.ReadOption{} if rrq.RowsLimit != 0 { limitRowsRead = true - readOpts = append(readOpts, bigtable.LimitRows(rrq.RowsLimit)) rowsToRead = rrq.RowsLimit } - - ctx, cancel := btc.timeout(ctx) - defer cancel() - - var rowsReadTillNow int64 - var rowsPb []*btpb.Row if req.GetCancelAfterRows() != 0 { limitRowsRead = true rowsToRead = int64(req.GetCancelAfterRows()) @@ -600,7 +597,7 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques rowsPb = append(rowsPb, rpb) rowsReadTillNow++ return true - }, readOpts...) 
+ }) res := &pb.RowsResult{ Status: &statpb.Status{ From bc0888239529bd1a8479c3b22180e0ff3bf83179 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Mon, 13 Jan 2025 22:19:46 +0000 Subject: [PATCH 4/6] track rows read in the library instead of proxy --- bigtable/bigtable.go | 29 +++++++++++--- bigtable/internal/testproxy/proxy.go | 11 ++++-- bigtable/retry_test.go | 56 ++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 8 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 94e21f4b6994..2e03d1f9b837 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -391,7 +391,21 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) { var prevRowKey string attrMap := make(map[string]interface{}) + + numRowsRead := int64(0) + rowLimitSet := false + intialRowLimit := int64(0) + for _, opt := range opts { + if l, ok := opt.(limitRows); ok { + rowLimitSet = true + intialRowLimit = l.limit + } + } err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + if rowLimitSet && numRowsRead == intialRowLimit { + return nil + } + req := &btpb.ReadRowsRequest{ AppProfileId: t.c.appProfile, } @@ -410,7 +424,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * } req.Rows = arg.proto() } - settings := makeReadSettings(req) + settings := makeReadSettings(req, numRowsRead) for _, opt := range opts { opt.set(&settings) } @@ -473,7 +487,9 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * continue } prevRowKey = row.Key() - if !f(row) { + continueReading := f(row) + numRowsRead++ + if !continueReading { // Cancel and drain stream. 
cancel() for { @@ -939,10 +955,11 @@ type FullReadStatsFunc func(*FullReadStats) type readSettings struct { req *btpb.ReadRowsRequest fullReadStatsFunc FullReadStatsFunc + numRowsRead int64 } -func makeReadSettings(req *btpb.ReadRowsRequest) readSettings { - return readSettings{req, nil} +func makeReadSettings(req *btpb.ReadRowsRequest, numRowsRead int64) readSettings { + return readSettings{req, nil, numRowsRead} } // A ReadOption is an optional argument to ReadRows. @@ -965,7 +982,9 @@ func LimitRows(limit int64) ReadOption { return limitRows{limit} } type limitRows struct{ limit int64 } -func (lr limitRows) set(settings *readSettings) { settings.req.RowsLimit = lr.limit } +func (lr limitRows) set(settings *readSettings) { + settings.req.RowsLimit = lr.limit - settings.numRowsRead +} // WithFullReadStats returns a ReadOption that will request FullReadStats // and invoke the given callback on the resulting FullReadStats. diff --git a/bigtable/internal/testproxy/proxy.go b/bigtable/internal/testproxy/proxy.go index 97e5f9761b71..e3237bcaf8c7 100644 --- a/bigtable/internal/testproxy/proxy.go +++ b/bigtable/internal/testproxy/proxy.go @@ -570,7 +570,7 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques ctx, cancel := btc.timeout(ctx) defer cancel() - var rowsReadTillNow int64 + var c int32 var rowsPb []*btpb.Row lim := req.GetCancelAfterRows() @@ -580,9 +580,15 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques if reversed { opts = append(opts, bigtable.ReverseScan()) } + + rowsLimit := req.GetRequest().GetRowsLimit() + if rowsLimit > 0 { + opts = append(opts, bigtable.LimitRows(rowsLimit)) + } err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool { - if limitRowsRead && rowsReadTillNow == rowsToRead { + c++ + if c == lim { return false } rpb, err := rowToProto(r) @@ -590,7 +596,6 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques return false } rowsPb = append(rowsPb, 
rpb) - rowsReadTillNow++ return true }, opts...) diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go index f330dac16616..6f1ee3a0c21f 100644 --- a/bigtable/retry_test.go +++ b/bigtable/retry_test.go @@ -467,6 +467,62 @@ func TestRetryReadRows(t *testing.T) { } } +func TestRetryReadRowsLimit(t *testing.T) { + ctx := context.Background() + + // Intercept requests and delegate to an interceptor defined by the test case + errCount := 0 + var f func(grpc.ServerStream) error + errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if strings.HasSuffix(info.FullMethod, "ReadRows") { + return f(ss) + } + return handler(ctx, ss) + } + + tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector)) + defer cleanup() + if err != nil { + t.Fatalf("fake server setup: %v", err) + } + + initialRowLimit := int64(3) + + errCount = 0 + // Test overall request failure and retries + f = func(ss grpc.ServerStream) error { + var err error + req := new(btpb.ReadRowsRequest) + must(ss.RecvMsg(req)) + switch errCount { + case 0: + if want, got := initialRowLimit, req.RowsLimit; want != got { + t.Errorf("RowsLimit: got %v, want %v", got, want) + } + must(writeReadRowsResponse(ss, "a", "b")) + err = status.Errorf(codes.Unavailable, "") + case 1: + if want, got := initialRowLimit-2, req.RowsLimit; want != got { + t.Errorf("RowsLimit: got %v, want %v", got, want) + } + must(writeReadRowsResponse(ss, "c")) + err = nil + } + errCount++ + return err + } + + var got []string + must(tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool { + got = append(got, r.Key()) + return true + }, LimitRows(initialRowLimit))) + want := []string{"a", "b", "c"} + if !testutil.Equal(got, want) { + t.Errorf("retry range integration: got %v, want %v", got, want) + } +} + func TestRetryReverseReadRows(t *testing.T) { ctx := context.Background() From 3d8ee82bc84bb62bcc733cf4a75ef5832069cfec Mon Sep 17 00:00:00 2001 
From: Baha Aiman Date: Wed, 15 Jan 2025 21:20:25 +0000 Subject: [PATCH 5/6] add unit tests for row limit --- bigtable/bigtable.go | 9 +++- bigtable/bigtable_test.go | 98 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 1 deletion(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index ce69b2e85b40..df80d616ef37 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -401,8 +401,12 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * intialRowLimit = l.limit } } + if intialRowLimit < 0 { + return nil + } + err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { - if rowLimitSet && numRowsRead == intialRowLimit { + if rowLimitSet && numRowsRead >= intialRowLimit { return nil } @@ -964,6 +968,7 @@ func makeReadSettings(req *btpb.ReadRowsRequest, numRowsRead int64) readSettings // A ReadOption is an optional argument to ReadRows. type ReadOption interface { + // set modifies the request stored in the settings set(settings *readSettings) } @@ -983,6 +988,8 @@ func LimitRows(limit int64) ReadOption { return limitRows{limit} } type limitRows struct{ limit int64 } func (lr limitRows) set(settings *readSettings) { + // Since 'numRowsRead' out of 'limit' requested rows have already been read, + // the subsequent requests should fetch only the remaining rows. 
settings.req.RowsLimit = lr.limit - settings.numRowsRead } diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 6664670ea6b0..1c6bba641458 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -713,6 +713,104 @@ func TestReadRowsRequestStats(t *testing.T) { } } +func TestReadRowsLimit(t *testing.T) { + testEnv, err := NewEmulatedEnv(IntegrationTestConfig{}) + if err != nil { + t.Fatalf("NewEmulatedEnv failed: %v", err) + } + conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)), + ) + if err != nil { + t.Fatalf("grpc.Dial failed: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if err != nil { + t.Fatalf("NewClient failed: %v", err) + } + defer adminClient.Close() + tableConf := &TableConf{ + TableID: testEnv.config.Table, + Families: map[string]GCPolicy{ + "f": NoGcPolicy(), + }, + } + if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil { + t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) + } + + client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) + if err != nil { + t.Fatalf("NewClientWithConfig failed: %v", err) + } + defer client.Close() + table := client.Open(testEnv.config.Table) + + m := NewMutation() + m.Set("f", "q", ServerTime, []byte("value")) + if err = table.Apply(ctx, "row1", m); err != nil { + t.Fatalf("Apply failed: %v", err) + } + + m = NewMutation() + m.Set("f", "q", ServerTime, []byte("value")) + m.Set("f", "q2", ServerTime, []byte("value2")) + if err = table.Apply(ctx, "row2", m); err != nil { + t.Fatalf("Apply failed: %v", err) + } + + m = NewMutation() + m.Set("f", "excluded", 
ServerTime, []byte("value")) + if err = table.Apply(ctx, "row3", m); err != nil { + t.Fatalf("Apply failed: %v", err) + } + + for _, test := range []struct { + desc string + limit *int64 + wantRowCount int64 + }{ + { + desc: "No limit", + wantRowCount: 3, + }, + { + desc: "Limit less than number of rows in table", + limit: ptr(int64(2)), + wantRowCount: 2, + }, + { + desc: "Limit greater than number of rows in table", + limit: ptr(int64(5)), + wantRowCount: 3, + }, + } { + gotRowCount := int64(0) + t.Run(test.desc, func(t *testing.T) { + opts := []ReadOption{} + if test.limit != nil { + opts = append(opts, LimitRows(*test.limit)) + } + if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { + gotRowCount++ + return true + }, opts...); err != nil { + t.Errorf("ReadRows failed: %v", err) + } + + if gotRowCount != test.wantRowCount { + t.Errorf("ReadRows returned %d rows, want %d", gotRowCount, test.wantRowCount) + } + }) + } +} + +// ptr returns a pointer to its argument. +// It can be used to initialize pointer fields: +func ptr[T any](t T) *T { return &t } + // TestHeaderPopulatedWithAppProfile verifies that request params header is populated with table name and app profile func TestHeaderPopulatedWithAppProfile(t *testing.T) { testEnv, err := NewEmulatedEnv(IntegrationTestConfig{}) From 73b8b62e18d2e97c366aafc81002b62cf2f65113 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Tue, 21 Jan 2025 17:39:39 +0000 Subject: [PATCH 6/6] return error on negative row limit --- bigtable/bigtable.go | 4 +++- bigtable/bigtable_test.go | 11 +++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 1b4cab996c32..1cbb548154d0 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -53,6 +53,8 @@ const prodAddr = "bigtable.googleapis.com:443" const mtlsProdAddr = "bigtable.mtls.googleapis.com:443" const featureFlagsHeaderKey = "bigtable-features" +var errNegativeRowLimit = errors.New("bigtable: row 
limit cannot be negative") + // Client is a client for reading and writing data to tables in an instance. // // A Client is safe to use concurrently, except for its Close method. @@ -402,7 +404,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * } } if intialRowLimit < 0 { - return nil + return errNegativeRowLimit } err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 467924e6bf1a..ff322b46f596 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/binary" + "errors" "reflect" "testing" "time" @@ -773,6 +774,7 @@ func TestReadRowsLimit(t *testing.T) { desc string limit *int64 wantRowCount int64 + wantErr error }{ { desc: "No limit", @@ -788,6 +790,11 @@ func TestReadRowsLimit(t *testing.T) { limit: ptr(int64(5)), wantRowCount: 3, }, + { + desc: "Negative row limit", + limit: ptr(int64(-1)), + wantErr: errNegativeRowLimit, + }, } { gotRowCount := int64(0) t.Run(test.desc, func(t *testing.T) { @@ -798,8 +805,8 @@ func TestReadRowsLimit(t *testing.T) { if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { gotRowCount++ return true - }, opts...); err != nil { - t.Errorf("ReadRows failed: %v", err) + }, opts...); !errors.Is(err, test.wantErr) { + t.Errorf("ReadRows err got: %v, want: %v", err, test.wantErr) } if gotRowCount != test.wantRowCount {