Skip to content

Commit

Permalink
Merge pull request #262 from tigrisdata/main
Browse files Browse the repository at this point in the history
Alpha release
  • Loading branch information
efirs authored May 23, 2022
2 parents db52902 + 12e0308 commit c5e69e2
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 34 deletions.
2 changes: 1 addition & 1 deletion api/proto
Submodule proto updated from 56ba26 to 778137
28 changes: 28 additions & 0 deletions api/server/v1/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (x *ReadResponse) MarshalJSON() ([]byte, error) {
type Metadata struct {
CreatedAt *time.Time `json:"created_at,omitempty"`
UpdatedAt *time.Time `json:"updated_at,omitempty"`
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

func CreateMDFromResponseMD(x *ResponseMetadata) Metadata {
Expand All @@ -81,6 +82,10 @@ func CreateMDFromResponseMD(x *ResponseMetadata) Metadata {
tm := x.UpdatedAt.AsTime()
md.UpdatedAt = &tm
}
if x.DeletedAt != nil {
tm := x.DeletedAt.AsTime()
md.DeletedAt = &tm
}

return md
}
Expand Down Expand Up @@ -348,3 +353,26 @@ func (x *StreamResponse) MarshalJSON() ([]byte, error) {
}
return json.Marshal(resp)
}

// Proper marshal timestamp in metadata
type dmlResponse struct {
Metadata Metadata `json:"metadata,omitempty"`
Status string `json:"status,omitempty"`
ModifiedCount int32 `json:"modified_count,omitempty"`
}

func (x *InsertResponse) MarshalJSON() ([]byte, error) {
return json.Marshal(&dmlResponse{Metadata: CreateMDFromResponseMD(x.Metadata), Status: x.Status})
}

func (x *ReplaceResponse) MarshalJSON() ([]byte, error) {
return json.Marshal(&dmlResponse{Metadata: CreateMDFromResponseMD(x.Metadata), Status: x.Status})
}

func (x *DeleteResponse) MarshalJSON() ([]byte, error) {
return json.Marshal(&dmlResponse{Metadata: CreateMDFromResponseMD(x.Metadata), Status: x.Status})
}

func (x *UpdateResponse) MarshalJSON() ([]byte, error) {
return json.Marshal(&dmlResponse{Metadata: CreateMDFromResponseMD(x.Metadata), Status: x.Status, ModifiedCount: x.ModifiedCount})
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.7.1
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.12
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.14
github.com/ugorji/go/codec v1.2.7
github.com/valyala/bytebufferpool v1.0.0
google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.12 h1:xVnPHH5wpYjcO1AJB8tTUeW/sSsje7X1qAfEbyUEvx0=
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.12/go.mod h1:CfJzMcMH2+hwBowhURVBtHErh3gmvgYZi9mIfLsM7kw=
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.14 h1:l2a4WDGc+GS/buemgS9eI8VOF9MyYcLC2SF9zQ3T2VQ=
github.com/tigrisdata/tigris-client-go v1.0.0-alpha.14/go.mod h1:CfJzMcMH2+hwBowhURVBtHErh3gmvgYZi9mIfLsM7kw=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
Expand Down
19 changes: 19 additions & 0 deletions server/midddleware/header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package middleware

import "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"

const (
// RequestTimeoutHeader is an end-to-end request header that indicates the maximum time that a client is
// prepared to await a response. The value of the header is in seconds. A client is allowed to send fractional
// values. For ex, 0.1 means 100milliseconds.
RequestTimeoutHeader = "Request-Timeout"
)

func CustomMatcher(key string) (string, bool) {
switch key {
case RequestTimeoutHeader:
return key, true
default:
return runtime.DefaultHeaderMatcher(key)
}
}
30 changes: 25 additions & 5 deletions server/midddleware/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package middleware

import (
"context"
"strconv"
"time"

"google.golang.org/grpc"
Expand All @@ -24,8 +25,8 @@ import (
)

var (
DefaultTimeout = 5 * time.Second
MaximumTimeout = 300 * time.Second
DefaultTimeout = 2 * time.Second
MaximumTimeout = 5 * time.Second
)

// TimeoutUnaryServerInterceptor returns a new unary server interceptor
Expand All @@ -34,10 +35,9 @@ func TimeoutUnaryServerInterceptor(timeout time.Duration) grpc.UnaryServerInterc
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (iface interface{}, err error) {
var cancel context.CancelFunc

//FIXME: pass and use HTTP timeout
ctx, cancel = setDeadlineUsingHeader(ctx)

d, ok := ctx.Deadline()

if ok && time.Until(d) > MaximumTimeout {
timeout = MaximumTimeout
ok = false
Expand All @@ -52,10 +52,30 @@ func TimeoutUnaryServerInterceptor(timeout time.Duration) grpc.UnaryServerInterc
cancel()
}
if ctx.Err() == context.DeadlineExceeded {
err = status.Errorf(codes.DeadlineExceeded, "Timeout")
err = status.Errorf(codes.DeadlineExceeded, "context deadline exceeded")
}
}()

return handler(ctx, req)
}
}

func setDeadlineUsingHeader(ctx context.Context) (context.Context, context.CancelFunc) {
value := getHeader(ctx, RequestTimeoutHeader)
if len(value) == 0 {
return ctx, nil
}

// header is set for timeout
parsedV, err := strconv.ParseFloat(value, 64)
if err != nil {
// use the default timeout
return ctx, nil
}

milliseconds := int64(parsedV * 1000)
if _, ok := ctx.Deadline(); !ok {
return context.WithDeadline(ctx, time.Now().Add(time.Duration(milliseconds)*time.Millisecond))
}
return ctx, nil
}
26 changes: 26 additions & 0 deletions server/midddleware/timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package middleware

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)

func TestTimeout(t *testing.T) {
ctx := context.Background()
ctx = metadata.NewIncomingContext(ctx, metadata.New(map[string]string{
RequestTimeoutHeader: "0.2",
}))
ctx, _ = setDeadlineUsingHeader(ctx)
deadline, ok := ctx.Deadline()
require.True(t, ok)
require.WithinDuration(t, time.Now().Add(200*time.Millisecond), deadline, 50*time.Millisecond)

ctx = context.Background()
ctx, _ = setDeadlineUsingHeader(ctx)
_, ok = ctx.Deadline()
require.False(t, ok)
}
10 changes: 6 additions & 4 deletions server/services/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/tigrisdata/tigris/cdc"
"github.com/tigrisdata/tigris/internal"
"github.com/tigrisdata/tigris/server/metadata"
middleware "github.com/tigrisdata/tigris/server/midddleware"
"github.com/tigrisdata/tigris/server/transaction"
"github.com/tigrisdata/tigris/store/kv"
"github.com/tigrisdata/tigris/util"
Expand Down Expand Up @@ -89,9 +90,10 @@ func newApiService(kv kv.KeyValueStore) *apiService {
}

func (s *apiService) RegisterHTTP(router chi.Router, inproc *inprocgrpc.Channel) error {
mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &api.CustomMarshaler{
JSONBuiltin: &runtime.JSONBuiltin{},
}))
mux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, &api.CustomMarshaler{JSONBuiltin: &runtime.JSONBuiltin{}}),
runtime.WithIncomingHeaderMatcher(middleware.CustomMatcher),
)

if err := api.RegisterTigrisHandlerClient(context.TODO(), mux, api.NewTigrisClient(inproc)); err != nil {
return err
Expand Down Expand Up @@ -252,7 +254,7 @@ func (s *apiService) Delete(ctx context.Context, r *api.DeleteRequest) (*api.Del
return &api.DeleteResponse{
Status: resp.status,
Metadata: &api.ResponseMetadata{
UpdatedAt: resp.updatedAt.GetProtoTS(),
DeletedAt: resp.deletedAt.GetProtoTS(),
},
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/services/v1/query_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (runner *DeleteQueryRunner) Run(ctx context.Context, tx transaction.Tx, ten

return &Response{
status: DeletedStatus,
updatedAt: ts,
deletedAt: ts,
}, ctx, nil
}

Expand Down
1 change: 1 addition & 0 deletions server/services/v1/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@ type Response struct {
status string
createdAt *internal.Timestamp
updatedAt *internal.Timestamp
deletedAt *internal.Timestamp
modifiedCount int32
}
22 changes: 18 additions & 4 deletions server/services/v1/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/rs/zerolog/log"
api "github.com/tigrisdata/tigris/api/server/v1"
"github.com/tigrisdata/tigris/server/metadata"
"github.com/tigrisdata/tigris/server/midddleware"
"github.com/tigrisdata/tigris/server/transaction"
"github.com/tigrisdata/tigris/store/kv"
ulog "github.com/tigrisdata/tigris/util/log"
Expand Down Expand Up @@ -133,6 +134,7 @@ func (sessMgr *SessionManager) Execute(ctx context.Context, req *ReqOptions) (*R
}

func (sessMgr *SessionManager) executeWithRetry(ctx context.Context, req *ReqOptions) (resp *Response, err error) {
delta := time.Duration(50) * time.Millisecond
start := time.Now()
for {
var session *QuerySession
Expand All @@ -150,12 +152,24 @@ func (sessMgr *SessionManager) executeWithRetry(ctx context.Context, req *ReqOpt
if err != kv.ErrConflictingTransaction {
return
}
if time.Since(start) > 2*time.Second {

select {
case <-ctx.Done():
return
default:
d, ok := ctx.Deadline()
if ok && time.Until(d) <= delta {
// if remaining is less than delta then probably not worth retrying
return
}
if !ok && time.Since(start) > (middleware.DefaultTimeout-delta) {
// this should not happen, adding a safeguard
return
}

log.Debug().Msgf("retrying transactions id: %s, since: %v", session.txCtx.Id, time.Since(start))
time.Sleep(time.Duration(rand.Intn(25)) * time.Millisecond)
}

log.Debug().Msgf("retrying transactions id: %s, since: %v", session.txCtx.Id, time.Since(start))
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}

Expand Down
21 changes: 10 additions & 11 deletions test/v1/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tigrisdata/tigris-client-go/config"
"github.com/tigrisdata/tigris-client-go/fields"
"github.com/tigrisdata/tigris-client-go/filter"
"github.com/tigrisdata/tigris-client-go/projection"
"github.com/tigrisdata/tigris-client-go/tigris"
"github.com/tigrisdata/tigris-client-go/update"
)

func TestClientCollectionBasic(t *testing.T) {
Expand Down Expand Up @@ -72,14 +71,14 @@ func TestClientCollectionBasic(t *testing.T) {
_, err = c.Update(ctx, filter.Or(
filter.Eq("Key1", "aaa"),
filter.Eq("Key1", "bbb")),
update.Set("Field1", 345),
fields.Set("Field1", 345),
)
require.NoError(t, err)

it, err := c.Read(ctx, filter.Or(
filter.Eq("Key1", "aaa"),
filter.Eq("Key1", "ccc")),
projection.Exclude("Key1").
fields.Exclude("Key1").
Include("Field1"),
)
require.NoError(t, err)
Expand All @@ -93,7 +92,7 @@ func TestClientCollectionBasic(t *testing.T) {
require.NoError(t, it.Err())
it.Close()

it, err = c.ReadAll(ctx, projection.All)
it, err = c.ReadAll(ctx, fields.All)
require.NoError(t, err)
it.Close()

Expand Down Expand Up @@ -137,8 +136,8 @@ func TestClientCollectionTx(t *testing.T) {
_ = db.Drop(ctx)
}()

err = db.Tx(ctx, func(ctx context.Context, tx *tigris.Tx) error {
c := tigris.GetTxCollection[Coll1](tx)
err = db.Tx(ctx, func(ctx context.Context) error {
c := tigris.GetCollection[Coll1](db)

d1 := &Coll1{Key1: "aaa", Field1: 123}
d2 := &Coll1{Key1: "bbb", Field1: 123}
Expand All @@ -152,14 +151,14 @@ func TestClientCollectionTx(t *testing.T) {
_, err = c.Update(ctx, filter.Or(
filter.Eq("Key1", "aaa"),
filter.Eq("Key1", "bbb")),
update.Set("Field1", 345),
fields.Set("Field1", 345),
)
require.NoError(t, err)

it, err := c.Read(ctx, filter.Or(
filter.Eq("Key1", "aaa"),
filter.Eq("Key1", "ccc")),
projection.Exclude("Key1").
fields.Exclude("Key1").
Include("Field1"),
)
require.NoError(t, err)
Expand All @@ -173,7 +172,7 @@ func TestClientCollectionTx(t *testing.T) {
require.NoError(t, it.Err())
it.Close()

it, err = c.ReadAll(ctx, projection.All)
it, err = c.ReadAll(ctx, fields.All)
require.NoError(t, err)
it.Close()

Expand All @@ -200,7 +199,7 @@ func TestClientCollectionTx(t *testing.T) {
require.NoError(t, err)

c := tigris.GetCollection[Coll1](db)
it, err := c.ReadAll(ctx, projection.All)
it, err := c.ReadAll(ctx, fields.All)
require.NoError(t, err)

var d Coll1
Expand Down
Loading

0 comments on commit c5e69e2

Please sign in to comment.