Skip to content

Commit

Permalink
nodejs: int64 => uin32
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Apr 19, 2024
1 parent f66baf0 commit ba089d9
Show file tree
Hide file tree
Showing 21 changed files with 291 additions and 460 deletions.
22 changes: 11 additions & 11 deletions balancers/balancers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package balancers

import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"sort"
"strings"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)

Expand All @@ -28,8 +28,8 @@ func SingleConn() *balancerConfig.Config {

type filterLocalDC struct{}

func (filterLocalDC) Allow(info balancerConfig.Info, c conn.Conn) bool {
return c.Endpoint().Location() == info.SelfLocation
func (filterLocalDC) Allow(info balancerConfig.Info, e endpoint.Info) bool {
return e.Location() == info.SelfLocation
}

func (filterLocalDC) String() string {
Expand Down Expand Up @@ -58,8 +58,8 @@ func PreferLocalDCWithFallBack(balancer *balancerConfig.Config) *balancerConfig.

type filterLocations []string

func (locations filterLocations) Allow(_ balancerConfig.Info, c conn.Conn) bool {
location := strings.ToUpper(c.Endpoint().Location())
func (locations filterLocations) Allow(_ balancerConfig.Info, e endpoint.Info) bool {
location := strings.ToUpper(e.Location())
for _, l := range locations {
if location == l {
return true
Expand Down Expand Up @@ -111,7 +111,7 @@ func PreferLocationsWithFallback(balancer *balancerConfig.Config, locations ...s
}

type Endpoint interface {
NodeID() int64
NodeID() uint32
Address() string
Location() string

Expand All @@ -122,10 +122,10 @@ type Endpoint interface {
LocalDC() bool
}

type filterFunc func(info balancerConfig.Info, c conn.Conn) bool
type filterFunc func(info balancerConfig.Info, e endpoint.Info) bool

func (p filterFunc) Allow(info balancerConfig.Info, c conn.Conn) bool {
return p(info, c)
func (p filterFunc) Allow(info balancerConfig.Info, e endpoint.Info) bool {
return p(info, e)
}

func (p filterFunc) String() string {
Expand All @@ -135,8 +135,8 @@ func (p filterFunc) String() string {
// Prefer creates balancer which use endpoints by filter
// Balancer "balancer" defines balancing algorithm between endpoints selected with filter
func Prefer(balancer *balancerConfig.Config, filter func(endpoint Endpoint) bool) *balancerConfig.Config {
balancer.Filter = filterFunc(func(_ balancerConfig.Info, c conn.Conn) bool {
return filter(c.Endpoint())
balancer.Filter = filterFunc(func(_ balancerConfig.Info, e endpoint.Info) bool {
return filter(e)
})

return balancer
Expand Down
82 changes: 56 additions & 26 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Balancer struct {
onApplyDiscoveredEndpoints []func(ctx context.Context, endpoints []endpoint.Info)
}

func (b *Balancer) HasNode(id int64) bool {
func (b *Balancer) HasNode(id uint32) bool {
if b.config.SingleConn {
return true
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
}
}

b.applyDiscoveredEndpoints(ctx, endpoints, localDC)
b.applyEndpoints(ctx, endpoints, localDC)

return nil
}
Expand All @@ -142,40 +142,40 @@ func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Con
added = make([]trace.EndpointInfo, 0, len(previousConns))
dropped = make([]trace.EndpointInfo, 0, len(previousConns))
var (
newestMap = make(map[string]struct{}, len(newestEndpoints))
newestMap = make(map[string]endpoint.Endpoint, len(newestEndpoints))
previousMap = make(map[string]struct{}, len(previousConns))
)
sort.Slice(newestEndpoints, func(i, j int) bool {
return newestEndpoints[i].Address() < newestEndpoints[j].Address()
})
sort.Slice(previousConns, func(i, j int) bool {
return previousConns[i].Endpoint().Address() < previousConns[j].Endpoint().Address()
return previousConns[i].Address() < previousConns[j].Address()
})
for _, e := range previousConns {
previousMap[e.Endpoint().Address()] = struct{}{}
previousMap[e.Address()] = struct{}{}
}
for _, e := range newestEndpoints {
nodes = append(nodes, e.Copy())
newestMap[e.Address()] = struct{}{}
nodes = append(nodes, e)
newestMap[e.Address()] = e
if _, has := previousMap[e.Address()]; !has {
added = append(added, e.Copy())
added = append(added, e)
}
}
for _, c := range previousConns {
if _, has := newestMap[c.Endpoint().Address()]; !has {
dropped = append(dropped, c.Endpoint().Copy())
if e, has := newestMap[c.Address()]; !has {
dropped = append(dropped, e)
}
}

return nodes, added, dropped
}

func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
func (b *Balancer) applyEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
var (
onDone = trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(), &ctx,
stack.FunctionID(
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"),
"github.com/ydb-platform/ydb-go-sdk/3/internal/balancer.(*Balancer).applyEndpoints"),
b.config.DetectLocalDC,
)
previousConns []conn.Conn
Expand All @@ -186,10 +186,9 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
}()

connections := endpointsToConnections(b.pool, endpoints)
for _, c := range connections {
connections.Each(func(c conn.Conn, e endpoint.Endpoint) {
b.pool.Allow(ctx, c)
c.Endpoint().Touch()
}
})

info := balancerConfig.Info{SelfLocation: localDC}
state := newConnectionsState(connections, b.config.Filter, info, b.config.AllowFallback)
Expand Down Expand Up @@ -260,7 +259,7 @@ func New(
localDCDetector: detectLocalDC,
}
d := internalDiscovery.New(ctx, pool.Get(
endpoint.New(driverConfig.Endpoint()),
driverConfig.Endpoint(),
), discoveryConfig)

b.discoveryClient = d
Expand All @@ -272,8 +271,8 @@ func New(
}

if b.config.SingleConn {
b.applyDiscoveredEndpoints(ctx, []endpoint.Endpoint{
endpoint.New(driverConfig.Endpoint()),
b.applyEndpoints(ctx, []endpoint.Endpoint{
endpoint.New(0, driverConfig.Endpoint(), ""),
}, "")
} else {
// initialization of balancer state
Expand Down Expand Up @@ -348,8 +347,7 @@ func (b *Balancer) wrapCall(ctx context.Context, f func(ctx context.Context, cc
if conn.UseWrapping(ctx) {
if credentials.IsAccessError(err) {
err = credentials.AccessError("no access", err,
credentials.WithAddress(cc.Endpoint().String()),
credentials.WithNodeID(cc.Endpoint().NodeID()),
credentials.WithAddress(cc.Address()),
credentials.WithCredentials(b.driverConfig.Credentials()),
)
}
Expand Down Expand Up @@ -377,9 +375,9 @@ func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
)
defer func() {
if err == nil {
onDone(c.Endpoint(), nil)
onDone(c.Address(), nil)
} else {
onDone(nil, err)
onDone("", err)
}
}()

Expand Down Expand Up @@ -408,11 +406,43 @@ func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
return c, nil
}

func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) []conn.Conn {
conns := make([]conn.Conn, 0, len(endpoints))
for _, e := range endpoints {
conns = append(conns, p.Get(e))
type connByEndpoint struct {
conns map[string]conn.Conn
endpoints map[string]endpoint.Endpoint
}

func (m *connByEndpoint) Get(address string) (conn.Conn, endpoint.Endpoint) {
return m.conns[address], m.endpoints[address]
}

func (m *connByEndpoint) Each(visitor func(c conn.Conn, e endpoint.Endpoint)) {
for _, e := range m.endpoints {
visitor(m.conns[e.Address()], e)
}
}

func (m *connByEndpoint) Len() int {
return len(m.endpoints)
}

func (m *connByEndpoint) Conns() (conns []conn.Conn) {
conns = make([]conn.Conn, 0, len(m.conns))
for _, c := range m.conns {
conns = append(conns, c)
}
return conns
}

func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) *connByEndpoint {
m := &connByEndpoint{
conns: make(map[string]conn.Conn, len(endpoints)),
endpoints: make(map[string]endpoint.Endpoint, len(endpoints)),
}

for _, e := range endpoints {
m.conns[e.Address()] = p.Get(e.Address())
m.endpoints[e.Address()] = e
}

return m
}
4 changes: 2 additions & 2 deletions internal/balancer/config/routerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package config

import (
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring"
)

Expand Down Expand Up @@ -47,6 +47,6 @@ type Info struct {
}

type Filter interface {
Allow(info Info, c conn.Conn) bool
Allow(info Info, e endpoint.Info) bool
String() string
}
33 changes: 17 additions & 16 deletions internal/balancer/connections_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package balancer

import (
"context"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
)

type connectionsState struct {
connByNodeID map[int64]conn.Conn
connByNodeID map[uint32]conn.Conn

prefer []conn.Conn
fallback []conn.Conn
Expand All @@ -19,7 +20,7 @@ type connectionsState struct {
}

func newConnectionsState(
conns []conn.Conn,
conns *connByEndpoint,
filter balancerConfig.Filter,
info balancerConfig.Info,
allowFallback bool,
Expand All @@ -31,7 +32,7 @@ func newConnectionsState(

res.prefer, res.fallback = sortPreferConnections(conns, filter, info, allowFallback)
if allowFallback {
res.all = conns
res.all = conns.Conns()
} else {
res.all = res.prefer
}
Expand Down Expand Up @@ -115,40 +116,40 @@ func (s *connectionsState) selectRandomConnection(conns []conn.Conn, allowBanned
return nil, failedConns
}

func connsToNodeIDMap(conns []conn.Conn) (nodes map[int64]conn.Conn) {
if len(conns) == 0 {
func connsToNodeIDMap(conns *connByEndpoint) (nodes map[uint32]conn.Conn) {
if conns.Len() == 0 {
return nil
}
nodes = make(map[int64]conn.Conn, len(conns))
for _, c := range conns {
nodes[c.Endpoint().NodeID()] = c
}
nodes = make(map[uint32]conn.Conn, conns.Len())
conns.Each(func(c conn.Conn, e endpoint.Endpoint) {
nodes[e.NodeID()] = c
})

return nodes
}

func sortPreferConnections(
conns []conn.Conn,
conns *connByEndpoint,
filter balancerConfig.Filter,
info balancerConfig.Info,
allowFallback bool,
) (prefer, fallback []conn.Conn) {
if filter == nil {
return conns, nil
return conns.Conns(), nil
}

prefer = make([]conn.Conn, 0, len(conns))
prefer = make([]conn.Conn, 0, conns.Len())
if allowFallback {
fallback = make([]conn.Conn, 0, len(conns))
fallback = make([]conn.Conn, 0, conns.Len())
}

for _, c := range conns {
if filter.Allow(info, c) {
conns.Each(func(c conn.Conn, e endpoint.Endpoint) {
if filter.Allow(info, e) {
prefer = append(prefer, c)
} else if allowFallback {
fallback = append(fallback, c)
}
}
})

return prefer, fallback
}
Expand Down
Loading

0 comments on commit ba089d9

Please sign in to comment.