Skip to content

Commit

Permalink
* Added table/options.UseQueryServiceExecute() option for redirect …
Browse files Browse the repository at this point in the history
…`table.Session.Execute` call to `query.Execute`
  • Loading branch information
asmyasnikov committed Apr 18, 2024
1 parent 35a861a commit 5e467ce
Show file tree
Hide file tree
Showing 43 changed files with 677 additions and 298 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added `table/options.UseQueryServiceExecute()` option for redirect `table.Session.Execute` call to `query.Execute`

## v3.65.1
* Updated dependency `ydb-go-genproto`
* Added processing of `Ydb.StatusIds_EXTERNAL_ERROR` in `retry.Retry`
Expand Down
2 changes: 1 addition & 1 deletion balancers/balancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...s
}

type Endpoint interface {
NodeID() uint32
NodeID() int64
Address() string
Location() string

Expand Down
2 changes: 1 addition & 1 deletion internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Balancer struct {
onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info)
}

func (b *Balancer) HasNode(id uint32) bool {
func (b *Balancer) HasNode(id int64) bool {
if b.config.SingleConn {
return true
}
Expand Down
6 changes: 3 additions & 3 deletions internal/balancer/connections_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type connectionsState struct {
connByNodeID map[uint32]conn.Conn
connByNodeID map[int64]conn.Conn

prefer []conn.Conn
fallback []conn.Conn
Expand Down Expand Up @@ -115,11 +115,11 @@ func (s *connectionsState) selectRandomConnection(conns []conn.Conn, allowBanned
return nil, failedConns
}

func connsToNodeIDMap(conns []conn.Conn) (nodes map[uint32]conn.Conn) {
func connsToNodeIDMap(conns []conn.Conn) (nodes map[int64]conn.Conn) {
if len(conns) == 0 {
return nil
}
nodes = make(map[uint32]conn.Conn, len(conns))
nodes = make(map[int64]conn.Conn, len(conns))
for _, c := range conns {
nodes[c.Endpoint().NodeID()] = c
}
Expand Down
16 changes: 8 additions & 8 deletions internal/balancer/connections_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestConnsToNodeIDMap(t *testing.T) {
table := []struct {
name string
source []conn.Conn
res map[uint32]conn.Conn
res map[int64]conn.Conn
}{
{
name: "Empty",
Expand All @@ -28,7 +28,7 @@ func TestConnsToNodeIDMap(t *testing.T) {
source: []conn.Conn{
&mock.Conn{NodeIDField: 0},
},
res: map[uint32]conn.Conn{
res: map[int64]conn.Conn{
0: &mock.Conn{NodeIDField: 0},
},
},
Expand All @@ -38,7 +38,7 @@ func TestConnsToNodeIDMap(t *testing.T) {
&mock.Conn{NodeIDField: 1},
&mock.Conn{NodeIDField: 10},
},
res: map[uint32]conn.Conn{
res: map[int64]conn.Conn{
1: &mock.Conn{NodeIDField: 1},
10: &mock.Conn{NodeIDField: 10},
},
Expand All @@ -50,7 +50,7 @@ func TestConnsToNodeIDMap(t *testing.T) {
&mock.Conn{NodeIDField: 0},
&mock.Conn{NodeIDField: 10},
},
res: map[uint32]conn.Conn{
res: map[int64]conn.Conn{
0: &mock.Conn{NodeIDField: 0},
1: &mock.Conn{NodeIDField: 1},
10: &mock.Conn{NodeIDField: 10},
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestNewState(t *testing.T) {
&mock.Conn{AddrField: "2", NodeIDField: 2},
}, nil, balancerConfig.Info{}, false),
res: &connectionsState{
connByNodeID: map[uint32]conn.Conn{
connByNodeID: map[int64]conn.Conn{
1: &mock.Conn{AddrField: "1", NodeIDField: 1},
2: &mock.Conn{AddrField: "2", NodeIDField: 2},
},
Expand All @@ -290,7 +290,7 @@ func TestNewState(t *testing.T) {
return info.SelfLocation == c.Endpoint().Location()
}), balancerConfig.Info{SelfLocation: "t"}, false),
res: &connectionsState{
connByNodeID: map[uint32]conn.Conn{
connByNodeID: map[int64]conn.Conn{
1: &mock.Conn{AddrField: "t1", NodeIDField: 1, LocationField: "t"},
2: &mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"},
3: &mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"},
Expand Down Expand Up @@ -318,7 +318,7 @@ func TestNewState(t *testing.T) {
return info.SelfLocation == c.Endpoint().Location()
}), balancerConfig.Info{SelfLocation: "t"}, true),
res: &connectionsState{
connByNodeID: map[uint32]conn.Conn{
connByNodeID: map[int64]conn.Conn{
1: &mock.Conn{AddrField: "t1", NodeIDField: 1, LocationField: "t"},
2: &mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"},
3: &mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"},
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestNewState(t *testing.T) {
return info.SelfLocation == c.Endpoint().Location()
}), balancerConfig.Info{SelfLocation: "t"}, true),
res: &connectionsState{
connByNodeID: map[uint32]conn.Conn{
connByNodeID: map[int64]conn.Conn{
1: &mock.Conn{AddrField: "t1", NodeIDField: 1, LocationField: "t"},
2: &mock.Conn{AddrField: "f1", NodeIDField: 2, LocationField: "f"},
3: &mock.Conn{AddrField: "t2", NodeIDField: 3, LocationField: "t"},
Expand Down
2 changes: 1 addition & 1 deletion internal/balancer/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type (
)

type Endpoint interface {
NodeID() uint32
NodeID() int64
}

func WithEndpoint(ctx context.Context, endpoint Endpoint) context.Context {
Expand Down
2 changes: 1 addition & 1 deletion internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *conn) IsState(states ...State) bool {
return false
}

func (c *conn) NodeID() uint32 {
func (c *conn) NodeID() int64 {
if c != nil {
return c.endpoint.NodeID()
}
Expand Down
4 changes: 2 additions & 2 deletions internal/conn/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package conn
import "fmt"

type connError struct {
nodeID uint32
nodeID int64
endpoint string
err error
}

func newConnError(id uint32, endpoint string, err error) connError {
func newConnError(id int64, endpoint string, err error) connError {
return connError{
nodeID: id,
endpoint: endpoint,
Expand Down
2 changes: 1 addition & 1 deletion internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

type connsKey struct {
address string
nodeID uint32
nodeID int64
}

type Pool struct {
Expand Down
4 changes: 2 additions & 2 deletions internal/credentials/access_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func WithDatabase(database string) databaseAuthErrorOption {
return databaseAuthErrorOption(database)
}

type nodeIDAuthErrorOption uint32
type nodeIDAuthErrorOption int64

func (id nodeIDAuthErrorOption) applyAuthErrorOption(w io.Writer) {
fmt.Fprint(w, "nodeID:")
fmt.Fprint(w, strconv.FormatUint(uint64(id), 10))
}

func WithNodeID(id uint32) authErrorOption {
func WithNodeID(id int64) authErrorOption {
return nodeIDAuthErrorOption(id)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e
endpoints = append(endpoints, endpoint.New(
net.JoinHostPort(e.GetAddress(), strconv.Itoa(int(e.GetPort()))),
endpoint.WithLocation(e.GetLocation()),
endpoint.WithID(e.GetNodeId()),
endpoint.WithID(int64(e.GetNodeId())),
endpoint.WithLoadFactor(e.GetLoadFactor()),
endpoint.WithLocalDC(e.GetLocation() == location),
endpoint.WithServices(e.GetService()),
Expand Down
8 changes: 4 additions & 4 deletions internal/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type Info interface {
NodeID() uint32
NodeID() int64
Address() string
Location() string
LastUpdated() time.Time
Expand All @@ -30,7 +30,7 @@ type Endpoint interface {

type endpoint struct { //nolint:maligned
mu sync.RWMutex
id uint32
id int64
address string
location string
services []string
Expand Down Expand Up @@ -70,7 +70,7 @@ func (e *endpoint) String() string {
)
}

func (e *endpoint) NodeID() uint32 {
func (e *endpoint) NodeID() int64 {
e.mu.RLock()
defer e.mu.RUnlock()

Expand Down Expand Up @@ -128,7 +128,7 @@ func (e *endpoint) Touch(opts ...Option) {

type Option func(e *endpoint)

func WithID(id uint32) Option {
func WithID(id int64) Option {
return func(e *endpoint) {
e.id = id
}
Expand Down
6 changes: 3 additions & 3 deletions internal/mock/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Conn struct {
PingErr error
AddrField string
LocationField string
NodeIDField uint32
NodeIDField int64
State conn.State
LocalDCField bool
}
Expand Down Expand Up @@ -80,14 +80,14 @@ func (c *Conn) Unban(ctx context.Context) conn.State {
type Endpoint struct {
AddrField string
LocationField string
NodeIDField uint32
NodeIDField int64
LocalDCField bool
}

func (e *Endpoint) Choose(bool) {
}

func (e *Endpoint) NodeID() uint32 {
func (e *Endpoint) NodeID() int64 {
return e.NodeIDField
}

Expand Down
2 changes: 1 addition & 1 deletion internal/params/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *Parameters) String() string {
}

func (p *Parameters) ToYDB(a *allocator.Allocator) map[string]*Ydb.TypedValue {
if p == nil {
if p == nil || len(*p) == 0 {
return nil
}
parameters := make(map[string]*Ydb.TypedValue, len(*p))
Expand Down
4 changes: 2 additions & 2 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//go:generate mockgen -destination grpc_client_mock_test.go -package query -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1 QueryServiceClient,QueryService_AttachSessionClient,QueryService_ExecuteQueryClient

type nodeChecker interface {
HasNode(id uint32) bool
HasNode(id int64) bool
}

type balancer interface {
Expand Down Expand Up @@ -197,7 +197,7 @@ func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client {

s, err := createSession(createCtx, client.grpcClient, cfg,
withSessionCheck(func(s *Session) bool {
return balancer.HasNode(uint32(s.nodeID))
return balancer.HasNode(s.nodeID)
}),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func newTestSession(id string) *Session {
func newTestSessionWithClient(id string, client Ydb_Query_V1.QueryServiceClient) *Session {
return &Session{
id: id,
grpcClient: client,
client: client,
statusCode: statusIdle,
cfg: config.New(),
}
Expand Down
Loading

0 comments on commit 5e467ce

Please sign in to comment.