Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigtable): Track number of rows read to set rowsLimit in subsequent requests #10213

Merged
merged 13 commits into from
Jan 22, 2025
Merged
38 changes: 33 additions & 5 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const prodAddr = "bigtable.googleapis.com:443"
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"
const featureFlagsHeaderKey = "bigtable-features"

// errNegativeRowLimit is returned by readRows, before any request is sent,
// when the limit carried by a limitRows read option is negative.
var errNegativeRowLimit = errors.New("bigtable: row limit cannot be negative")

// Client is a client for reading and writing data to tables in an instance.
//
// A Client is safe to use concurrently, except for its Close method.
Expand Down Expand Up @@ -391,7 +393,25 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
var prevRowKey string
attrMap := make(map[string]interface{})

numRowsRead := int64(0)
rowLimitSet := false
intialRowLimit := int64(0)
for _, opt := range opts {
if l, ok := opt.(limitRows); ok {
rowLimitSet = true
intialRowLimit = l.limit
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
}
}
if intialRowLimit < 0 {
return errNegativeRowLimit
}

err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
if rowLimitSet && numRowsRead >= intialRowLimit {
return nil
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
}

req := &btpb.ReadRowsRequest{
AppProfileId: t.c.appProfile,
}
Expand All @@ -410,7 +430,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
}
req.Rows = arg.proto()
}
settings := makeReadSettings(req)
settings := makeReadSettings(req, numRowsRead)
for _, opt := range opts {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
opt.set(&settings)
}
Expand Down Expand Up @@ -473,7 +493,9 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
continue
}
prevRowKey = row.Key()
if !f(row) {
continueReading := f(row)
numRowsRead++
if !continueReading {
// Cancel and drain stream.
cancel()
for {
Expand Down Expand Up @@ -939,14 +961,16 @@ type FullReadStatsFunc func(*FullReadStats)
// readSettings is the state handed to every ReadOption's set method while a
// ReadRows request is being prepared.
//
// NOTE: makeReadSettings fills this struct with an unkeyed literal, so the
// field order here must stay in sync with that constructor.
type readSettings struct {
// req is the request being built; options modify it in place.
req *btpb.ReadRowsRequest
// fullReadStatsFunc is left nil by makeReadSettings.
// NOTE(review): presumably populated by the WithFullReadStats option — confirm.
fullReadStatsFunc FullReadStatsFunc
// numRowsRead is the number of rows already read by the enclosing readRows
// call; limitRows.set subtracts it from the requested limit.
numRowsRead int64
}

func makeReadSettings(req *btpb.ReadRowsRequest) readSettings {
return readSettings{req, nil}
func makeReadSettings(req *btpb.ReadRowsRequest, numRowsRead int64) readSettings {
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
return readSettings{req, nil, numRowsRead}
}

// A ReadOption is an optional argument to ReadRows.
type ReadOption interface {
// set modifies the request stored in the settings.
// readRows invokes it on every option each time it builds a request, so
// an implementation may also consult the other fields of settings (for
// example numRowsRead) to adjust what it writes into the request.
set(settings *readSettings)
}

Expand All @@ -965,7 +989,11 @@ func LimitRows(limit int64) ReadOption { return limitRows{limit} }

// limitRows is the ReadOption returned by LimitRows; limit holds the value
// that was passed to LimitRows.
type limitRows struct{ limit int64 }

func (lr limitRows) set(settings *readSettings) { settings.req.RowsLimit = lr.limit }
func (lr limitRows) set(settings *readSettings) {
// Since 'numRowsRead' out of 'limit' requested rows have already been read,
// the subsequent requests should fetch only the remaining rows.
settings.req.RowsLimit = lr.limit - settings.numRowsRead
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
}

// WithFullReadStats returns a ReadOption that will request FullReadStats
// and invoke the given callback on the resulting FullReadStats.
Expand Down
105 changes: 105 additions & 0 deletions bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -715,6 +716,110 @@ func TestReadRowsRequestStats(t *testing.T) {
}
}

// TestReadRowsLimit checks how ReadRows behaves with and without the LimitRows
// option against an emulated environment holding three rows: no limit, a
// limit below the row count, a limit above the row count, and a negative
// limit (which must fail with errNegativeRowLimit and deliver no rows).
func TestReadRowsLimit(t *testing.T) {
	testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
	if err != nil {
		t.Fatalf("NewEmulatedEnv failed: %v", err)
	}
	conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
	)
	if err != nil {
		t.Fatalf("grpc.Dial failed: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
	if err != nil {
		// Name the constructor that actually failed so the log is not misleading.
		t.Fatalf("NewAdminClient failed: %v", err)
	}
	defer adminClient.Close()
	tableConf := &TableConf{
		TableID: testEnv.config.Table,
		Families: map[string]GCPolicy{
			"f": NoGcPolicy(),
		},
	}
	if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
		t.Fatalf("CreateTableFromConf(%v) failed: %v", testEnv.config.Table, err)
	}

	client, err := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
	if err != nil {
		t.Fatalf("NewClientWithConfig failed: %v", err)
	}
	defer client.Close()
	table := client.Open(testEnv.config.Table)

	// Populate exactly three rows; the expected counts below depend on this.
	m := NewMutation()
	m.Set("f", "q", ServerTime, []byte("value"))
	if err = table.Apply(ctx, "row1", m); err != nil {
		t.Fatalf("Apply failed: %v", err)
	}

	m = NewMutation()
	m.Set("f", "q", ServerTime, []byte("value"))
	m.Set("f", "q2", ServerTime, []byte("value2"))
	if err = table.Apply(ctx, "row2", m); err != nil {
		t.Fatalf("Apply failed: %v", err)
	}

	m = NewMutation()
	m.Set("f", "excluded", ServerTime, []byte("value"))
	if err = table.Apply(ctx, "row3", m); err != nil {
		t.Fatalf("Apply failed: %v", err)
	}

	for _, test := range []struct {
		desc         string
		limit        *int64 // nil means "do not pass LimitRows at all"
		wantRowCount int64
		wantErr      error
	}{
		{
			desc:         "No limit",
			wantRowCount: 3,
		},
		{
			desc:         "Limit less than number of rows in table",
			limit:        ptr(int64(2)),
			wantRowCount: 2,
		},
		{
			desc:         "Limit greater than number of rows in table",
			limit:        ptr(int64(5)),
			wantRowCount: 3,
		},
		{
			desc:    "Negative row limit",
			limit:   ptr(int64(-1)),
			wantErr: errNegativeRowLimit,
		},
	} {
		t.Run(test.desc, func(t *testing.T) {
			// The counter lives inside the subtest so it is scoped to this case.
			var gotRowCount int64

			var opts []ReadOption
			if test.limit != nil {
				opts = append(opts, LimitRows(*test.limit))
			}

			err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
				gotRowCount++
				return true
			}, opts...)
			// errors.Is(err, nil) is true only when err is nil, so this also
			// covers the cases that expect success.
			if !errors.Is(err, test.wantErr) {
				t.Errorf("ReadRows err got: %v, want: %v", err, test.wantErr)
			}

			if gotRowCount != test.wantRowCount {
				t.Errorf("ReadRows returned %d rows, want %d", gotRowCount, test.wantRowCount)
			}
		})
	}
}

// ptr stores a copy of v in freshly allocated memory and returns its address.
// It is convenient for initializing pointer-typed fields from a literal,
// for example ptr(int64(2)).
func ptr[T any](v T) *T {
	p := new(T)
	*p = v
	return p
}

// TestHeaderPopulatedWithAppProfile verifies that request params header is populated with table name and app profile
func TestHeaderPopulatedWithAppProfile(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
Expand Down
6 changes: 6 additions & 0 deletions bigtable/internal/testproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,19 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques

var c int32
var rowsPb []*btpb.Row

lim := req.GetCancelAfterRows()

reversed := req.GetRequest().GetReversed()
opts := []bigtable.ReadOption{}
if reversed {
opts = append(opts, bigtable.ReverseScan())
}

rowsLimit := req.GetRequest().GetRowsLimit()
if rowsLimit > 0 {
opts = append(opts, bigtable.LimitRows(rowsLimit))
}
err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool {

c++
Expand Down
56 changes: 56 additions & 0 deletions bigtable/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,62 @@ func TestRetryReadRows(t *testing.T) {
}
}

// TestRetryReadRowsLimit verifies that when a ReadRows stream fails part-way
// through and is retried, the retry request's RowsLimit is the original limit
// minus the number of rows already delivered to the caller.
//
// The fake server's ReadRows handling is replaced by a stream interceptor: the
// first attempt returns rows "a" and "b" and then fails with Unavailable, the
// second attempt returns row "c" and succeeds.
func TestRetryReadRowsLimit(t *testing.T) {
	ctx := context.Background()

	// Intercept requests and delegate to an interceptor defined by the test case
	errCount := 0
	var f func(grpc.ServerStream) error
	errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		if strings.HasSuffix(info.FullMethod, "ReadRows") {
			return f(ss)
		}
		// A grpc.StreamHandler expects the service implementation as its
		// first argument, so forward srv rather than the test context.
		return handler(srv, ss)
	}

	tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector))
	// Guard against a nil cleanup so a failed setup reports the real error
	// instead of panicking when the deferred call runs.
	if cleanup != nil {
		defer cleanup()
	}
	if err != nil {
		t.Fatalf("fake server setup: %v", err)
	}

	initialRowLimit := int64(3)

	// Test overall request failure and retries
	f = func(ss grpc.ServerStream) error {
		var err error
		req := new(btpb.ReadRowsRequest)
		must(ss.RecvMsg(req))
		switch errCount {
		case 0:
			// First attempt: the full limit is requested.
			if want, got := initialRowLimit, req.RowsLimit; want != got {
				t.Errorf("RowsLimit: got %v, want %v", got, want)
			}
			must(writeReadRowsResponse(ss, "a", "b"))
			err = status.Errorf(codes.Unavailable, "")
		case 1:
			// Retry: two rows were already delivered, so only the
			// remainder may be requested.
			if want, got := initialRowLimit-2, req.RowsLimit; want != got {
				t.Errorf("RowsLimit: got %v, want %v", got, want)
			}
			must(writeReadRowsResponse(ss, "c"))
			err = nil
		}
		errCount++
		return err
	}

	var got []string
	must(tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool {
		got = append(got, r.Key())
		return true
	}, LimitRows(initialRowLimit)))
	want := []string{"a", "b", "c"}
	if !testutil.Equal(got, want) {
		t.Errorf("retry range integration: got %v, want %v", got, want)
	}
}

func TestRetryReverseReadRows(t *testing.T) {
ctx := context.Background()

Expand Down
Loading