Skip to content

Commit

Permalink
fix(bigtable): Track number of readrows to set rowsLimit in subsequen…
Browse files Browse the repository at this point in the history
…t requests (#10213)

* test(bigtable): Use rowslimit in proxy

* test(bigtable): Limit rows only if specified by client

* refactor(bigtable): Refactoring code

* track rows read in the library instead of proxy
  • Loading branch information
bhshkh authored Jan 22, 2025
1 parent 793e7d0 commit abb615e
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 5 deletions.
38 changes: 33 additions & 5 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const prodAddr = "bigtable.googleapis.com:443"
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"
const featureFlagsHeaderKey = "bigtable-features"

var errNegativeRowLimit = errors.New("bigtable: row limit cannot be negative")

// Client is a client for reading and writing data to tables in an instance.
//
// A Client is safe to use concurrently, except for its Close method.
Expand Down Expand Up @@ -391,7 +393,25 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
var prevRowKey string
attrMap := make(map[string]interface{})

numRowsRead := int64(0)
rowLimitSet := false
intialRowLimit := int64(0)
for _, opt := range opts {
if l, ok := opt.(limitRows); ok {
rowLimitSet = true
intialRowLimit = l.limit
}
}
if intialRowLimit < 0 {
return errNegativeRowLimit
}

err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
if rowLimitSet && numRowsRead >= intialRowLimit {
return nil
}

req := &btpb.ReadRowsRequest{
AppProfileId: t.c.appProfile,
}
Expand All @@ -410,7 +430,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
}
req.Rows = arg.proto()
}
settings := makeReadSettings(req)
settings := makeReadSettings(req, numRowsRead)
for _, opt := range opts {
opt.set(&settings)
}
Expand Down Expand Up @@ -473,7 +493,9 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
continue
}
prevRowKey = row.Key()
if !f(row) {
continueReading := f(row)
numRowsRead++
if !continueReading {
// Cancel and drain stream.
cancel()
for {
Expand Down Expand Up @@ -939,14 +961,16 @@ type FullReadStatsFunc func(*FullReadStats)
type readSettings struct {
req *btpb.ReadRowsRequest
fullReadStatsFunc FullReadStatsFunc
numRowsRead int64
}

func makeReadSettings(req *btpb.ReadRowsRequest) readSettings {
return readSettings{req, nil}
func makeReadSettings(req *btpb.ReadRowsRequest, numRowsRead int64) readSettings {
return readSettings{req, nil, numRowsRead}
}

// A ReadOption is an optional argument to ReadRows.
type ReadOption interface {
// set modifies the request stored in the settings
set(settings *readSettings)
}

Expand All @@ -965,7 +989,11 @@ func LimitRows(limit int64) ReadOption { return limitRows{limit} }

type limitRows struct{ limit int64 }

func (lr limitRows) set(settings *readSettings) { settings.req.RowsLimit = lr.limit }
func (lr limitRows) set(settings *readSettings) {
// Since 'numRowsRead' out of 'limit' requested rows have already been read,
// the subsequest requests should fetch only the remaining rows.
settings.req.RowsLimit = lr.limit - settings.numRowsRead
}

// WithFullReadStats returns a ReadOption that will request FullReadStats
// and invoke the given callback on the resulting FullReadStats.
Expand Down
105 changes: 105 additions & 0 deletions bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -715,6 +716,110 @@ func TestReadRowsRequestStats(t *testing.T) {
}
}

func TestReadRowsLimit(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
Families: map[string]GCPolicy{
"f": NoGcPolicy(),
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}

client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClientWithConfig failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)

m := NewMutation()
m.Set("f", "q", ServerTime, []byte("value"))
if err = table.Apply(ctx, "row1", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}

m = NewMutation()
m.Set("f", "q", ServerTime, []byte("value"))
m.Set("f", "q2", ServerTime, []byte("value2"))
if err = table.Apply(ctx, "row2", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}

m = NewMutation()
m.Set("f", "excluded", ServerTime, []byte("value"))
if err = table.Apply(ctx, "row3", m); err != nil {
t.Fatalf("Apply failed: %v", err)
}

for _, test := range []struct {
desc string
limit *int64
wantRowCount int64
wantErr error
}{
{
desc: "No limit",
wantRowCount: 3,
},
{
desc: "Limit less than number of rows in table",
limit: ptr(int64(2)),
wantRowCount: 2,
},
{
desc: "Limit greater than number of rows in table",
limit: ptr(int64(5)),
wantRowCount: 3,
},
{
desc: "Negative row limit",
limit: ptr(int64(-1)),
wantErr: errNegativeRowLimit,
},
} {
gotRowCount := int64(0)
t.Run(test.desc, func(t *testing.T) {
opts := []ReadOption{}
if test.limit != nil {
opts = append(opts, LimitRows(*test.limit))
}
if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
gotRowCount++
return true
}, opts...); !errors.Is(err, test.wantErr) {
t.Errorf("ReadRows err got: %v, want: %v", err, test.wantErr)
}

if gotRowCount != test.wantRowCount {
t.Errorf("ReadRows returned %d rows, want %d", gotRowCount, test.wantRowCount)
}
})
}
}

// ptr returns a pointer to its argument.
// It can be used to initialize pointer fields:
func ptr[T any](t T) *T { return &t }

// TestHeaderPopulatedWithAppProfile verifies that request params header is populated with table name and app profile
func TestHeaderPopulatedWithAppProfile(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
Expand Down
6 changes: 6 additions & 0 deletions bigtable/internal/testproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,19 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques

var c int32
var rowsPb []*btpb.Row

lim := req.GetCancelAfterRows()

reversed := req.GetRequest().GetReversed()
opts := []bigtable.ReadOption{}
if reversed {
opts = append(opts, bigtable.ReverseScan())
}

rowsLimit := req.GetRequest().GetRowsLimit()
if rowsLimit > 0 {
opts = append(opts, bigtable.LimitRows(rowsLimit))
}
err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool {

c++
Expand Down
56 changes: 56 additions & 0 deletions bigtable/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,62 @@ func TestRetryReadRows(t *testing.T) {
}
}

func TestRetryReadRowsLimit(t *testing.T) {
ctx := context.Background()

// Intercept requests and delegate to an interceptor defined by the test case
errCount := 0
var f func(grpc.ServerStream) error
errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if strings.HasSuffix(info.FullMethod, "ReadRows") {
return f(ss)
}
return handler(ctx, ss)
}

tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector))
defer cleanup()
if err != nil {
t.Fatalf("fake server setup: %v", err)
}

initialRowLimit := int64(3)

errCount = 0
// Test overall request failure and retries
f = func(ss grpc.ServerStream) error {
var err error
req := new(btpb.ReadRowsRequest)
must(ss.RecvMsg(req))
switch errCount {
case 0:
if want, got := initialRowLimit, req.RowsLimit; want != got {
t.Errorf("RowsLimit: got %v, want %v", got, want)
}
must(writeReadRowsResponse(ss, "a", "b"))
err = status.Errorf(codes.Unavailable, "")
case 1:
if want, got := initialRowLimit-2, req.RowsLimit; want != got {
t.Errorf("RowsLimit: got %v, want %v", got, want)
}
must(writeReadRowsResponse(ss, "c"))
err = nil
}
errCount++
return err
}

var got []string
must(tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool {
got = append(got, r.Key())
return true
}, LimitRows(initialRowLimit)))
want := []string{"a", "b", "c"}
if !testutil.Equal(got, want) {
t.Errorf("retry range integration: got %v, want %v", got, want)
}
}

func TestRetryReverseReadRows(t *testing.T) {
ctx := context.Background()

Expand Down

0 comments on commit abb615e

Please sign in to comment.