Skip to content

Commit

Permalink
wip: turning NodeIDs into GenericNodeIDs
Browse files Browse the repository at this point in the history
  • Loading branch information
abi87 committed Sep 15, 2023
1 parent 4b1944b commit e16b655
Show file tree
Hide file tree
Showing 32 changed files with 224 additions and 234 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/VictoriaMetrics/fastcache v1.10.0
github.com/ava-labs/avalanchego v1.10.10-rc.4
github.com/ava-labs/avalanchego v1.10.10-rc.4.0.20230915153312-765a716b3af5
github.com/cespare/cp v0.1.0
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
github.com/davecgh/go-spew v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.10.10-rc.4 h1:1oxQf1boQDliJspfGBqsYsqg91d4F3qiFTnwnp+EruY=
github.com/ava-labs/avalanchego v1.10.10-rc.4/go.mod h1:BN97sZppDSvIMIfEjrLTjdPTFkGLkb0ISJHEcoxMMNk=
github.com/ava-labs/avalanchego v1.10.10-rc.4.0.20230915153312-765a716b3af5 h1:vC6YhUmsTgGu5T43P6Inth20H4EtKebS4uNpQDenvxA=
github.com/ava-labs/avalanchego v1.10.10-rc.4.0.20230915153312-765a716b3af5/go.mod h1:ktWtdp5jBjXgLOq/Cm3l2IcNGlrcKvFdokABy8NtP/w=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
12 changes: 6 additions & 6 deletions peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ type NetworkClient interface {
// node version greater than or equal to minVersion.
// Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if
// the request should be retried.
SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error)
SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.GenericNodeID, error)

// SendAppRequest synchronously sends request to the selected nodeID
// Returns response bytes, and ErrRequestFailed if the request should be retried.
SendAppRequest(nodeID ids.NodeID, request []byte) ([]byte, error)
SendAppRequest(nodeID ids.GenericNodeID, request []byte) ([]byte, error)

// SendCrossChainRequest sends a request to a specific blockchain running on this node.
// Returns response bytes, and ErrRequestFailed if the request failed.
Expand All @@ -38,7 +38,7 @@ type NetworkClient interface {

// TrackBandwidth should be called for each valid request with the bandwidth
// (length of response divided by request time), and with 0 if the response is invalid.
TrackBandwidth(nodeID ids.NodeID, bandwidth float64)
TrackBandwidth(nodeID ids.GenericNodeID, bandwidth float64)
}

// client implements NetworkClient interface
Expand All @@ -59,7 +59,7 @@ func NewNetworkClient(network Network) NetworkClient {
// node version greater than or equal to minVersion.
// Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if
// the request should be retried.
func (c *client) SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) {
func (c *client) SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.GenericNodeID, error) {
waitingHandler := newWaitingResponseHandler()
nodeID, err := c.network.SendAppRequestAny(minVersion, request, waitingHandler)
if err != nil {
Expand All @@ -74,7 +74,7 @@ func (c *client) SendAppRequestAny(minVersion *version.Application, request []by

// SendAppRequest synchronously sends request to the specified nodeID
// Returns response bytes and ErrRequestFailed if the request should be retried.
func (c *client) SendAppRequest(nodeID ids.NodeID, request []byte) ([]byte, error) {
func (c *client) SendAppRequest(nodeID ids.GenericNodeID, request []byte) ([]byte, error) {
waitingHandler := newWaitingResponseHandler()
if err := c.network.SendAppRequest(nodeID, request, waitingHandler); err != nil {
return nil, err
Expand Down Expand Up @@ -104,6 +104,6 @@ func (c *client) Gossip(gossip []byte) error {
return c.network.Gossip(gossip)
}

func (c *client) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
func (c *client) TrackBandwidth(nodeID ids.GenericNodeID, bandwidth float64) {
c.network.TrackBandwidth(nodeID, bandwidth)
}
38 changes: 19 additions & 19 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type Network interface {
// node version greater than or equal to minVersion.
// Returns the ID of the chosen peer, and an error if the request could not
// be sent to a peer with the desired [minVersion].
SendAppRequestAny(minVersion *version.Application, message []byte, handler message.ResponseHandler) (ids.NodeID, error)
SendAppRequestAny(minVersion *version.Application, message []byte, handler message.ResponseHandler) (ids.GenericNodeID, error)

// SendAppRequest sends message to given nodeID, notifying handler when there's a response or timeout
SendAppRequest(nodeID ids.NodeID, message []byte, handler message.ResponseHandler) error
SendAppRequest(nodeID ids.GenericNodeID, message []byte, handler message.ResponseHandler) error

// Gossip sends given gossip message to peers
Gossip(gossip []byte) error
Expand All @@ -76,14 +76,14 @@ type Network interface {

// TrackBandwidth should be called for each valid request with the bandwidth
// (length of response divided by request time), and with 0 if the response is invalid.
TrackBandwidth(nodeID ids.NodeID, bandwidth float64)
TrackBandwidth(nodeID ids.GenericNodeID, bandwidth float64)
}

// network is an implementation of Network that processes message requests for
// each peer in linear fashion
type network struct {
lock sync.RWMutex // lock for mutating state of this Network struct
self ids.NodeID // NodeID of this node
self ids.GenericNodeID // NodeID of this node
requestIDGen uint32 // requestID counter used to track outbound requests
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
Expand All @@ -110,7 +110,7 @@ type network struct {
closed utils.Atomic[bool]
}

func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.GenericNodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
return &network{
router: router,
appSender: appSender,
Expand All @@ -134,10 +134,10 @@ func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Mana
// the request will be sent to any peer regardless of their version.
// Returns the ID of the chosen peer, and an error if the request could not
// be sent to a peer with the desired [minVersion].
func (n *network) SendAppRequestAny(minVersion *version.Application, request []byte, handler message.ResponseHandler) (ids.NodeID, error) {
func (n *network) SendAppRequestAny(minVersion *version.Application, request []byte, handler message.ResponseHandler) (ids.GenericNodeID, error) {
// Take a slot from total [activeAppRequests] and block until a slot becomes available.
if err := n.activeAppRequests.Acquire(context.Background(), 1); err != nil {
return ids.EmptyNodeID, errAcquiringSemaphore
return ids.EmptyGenericNodeID, errAcquiringSemaphore
}

n.lock.Lock()
Expand All @@ -147,12 +147,12 @@ func (n *network) SendAppRequestAny(minVersion *version.Application, request []b
}

n.activeAppRequests.Release(1)
return ids.EmptyNodeID, fmt.Errorf("no peers found matching version %s out of %d peers", minVersion, n.peers.Size())
return ids.EmptyGenericNodeID, fmt.Errorf("no peers found matching version %s out of %d peers", minVersion, n.peers.Size())
}

// SendAppRequest sends request message bytes to specified nodeID, notifying the responseHandler on response or failure
func (n *network) SendAppRequest(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error {
if nodeID == ids.EmptyNodeID {
func (n *network) SendAppRequest(nodeID ids.GenericNodeID, request []byte, responseHandler message.ResponseHandler) error {
if nodeID == ids.EmptyGenericNodeID {
return fmt.Errorf("cannot send request to empty nodeID, nodeID=%s, requestLen=%d", nodeID, len(request))
}

Expand All @@ -173,7 +173,7 @@ func (n *network) SendAppRequest(nodeID ids.NodeID, request []byte, responseHand
// Releases active requests semaphore if there was an error in sending the request
// Returns an error if [appSender] is unable to make the request.
// Assumes write lock is held
func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error {
func (n *network) sendAppRequest(nodeID ids.GenericNodeID, request []byte, responseHandler message.ResponseHandler) error {
if n.closed.Get() {
n.activeAppRequests.Release(1)
return nil
Expand All @@ -185,7 +185,7 @@ func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHand
requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = responseHandler

nodeIDs := set.NewSet[ids.NodeID](1)
nodeIDs := set.NewSet[ids.GenericNodeID](1)
nodeIDs.Add(nodeID)

// Send app request to [nodeID].
Expand Down Expand Up @@ -317,7 +317,7 @@ func (n *network) CrossChainAppResponse(ctx context.Context, respondingChainID i
// returns error if the requestHandler returns an error
// sends a response back to the sender if length of response returned by the handler is >0
// expects the deadline to not have been passed
func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error {
func (n *network) AppRequest(ctx context.Context, nodeID ids.GenericNodeID, requestID uint32, deadline time.Time, request []byte) error {
if n.closed.Get() {
return nil
}
Expand Down Expand Up @@ -357,7 +357,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
// Error returned by this function is expected to be treated as fatal by the engine
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
func (n *network) AppResponse(ctx context.Context, nodeID ids.GenericNodeID, requestID uint32, response []byte) error {
log.Debug("received AppResponse from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
Expand All @@ -378,7 +378,7 @@ func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID
// - request times out before a response is provided
// error returned by this function is expected to be treated as fatal by the engine
// returns error only when the response handler returns an error
func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.GenericNodeID, requestID uint32) error {
log.Debug("received AppRequestFailed from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
Expand Down Expand Up @@ -444,7 +444,7 @@ func (n *network) Gossip(gossip []byte) error {
// AppGossip is called by avalanchego -> VM when there is an incoming AppGossip from a peer
// error returned by this function is expected to be treated as fatal by the engine
// returns error if request could not be parsed as message.Request or when the requestHandler returns an error
func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
func (n *network) AppGossip(_ context.Context, nodeID ids.GenericNodeID, gossipBytes []byte) error {
var gossipMsg message.GossipMessage
if _, err := n.codec.Unmarshal(gossipBytes, &gossipMsg); err != nil {
log.Debug("could not parse app gossip", "nodeID", nodeID, "gossipLen", len(gossipBytes), "err", err)
Expand All @@ -456,7 +456,7 @@ func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []
}

// Connected adds the given nodeID to the peer list so that it can receive messages
func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error {
func (n *network) Connected(_ context.Context, nodeID ids.GenericNodeID, nodeVersion *version.Application) error {
log.Debug("adding new peer", "nodeID", nodeID)

n.lock.Lock()
Expand All @@ -476,7 +476,7 @@ func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *v
}

// Disconnected removes given [nodeID] from the peer list
func (n *network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
func (n *network) Disconnected(_ context.Context, nodeID ids.GenericNodeID) error {
log.Debug("disconnecting peer", "nodeID", nodeID)
n.lock.Lock()
defer n.lock.Unlock()
Expand Down Expand Up @@ -532,7 +532,7 @@ func (n *network) Size() uint32 {
return uint32(n.peers.Size())
}

func (n *network) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
func (n *network) TrackBandwidth(nodeID ids.GenericNodeID, bandwidth float64) {
n.lock.Lock()
defer n.lock.Unlock()

Expand Down
Loading

0 comments on commit e16b655

Please sign in to comment.