Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Libp2p Gossipsub Peer Gater #6479

Merged
merged 13 commits into from
Sep 23, 2024
4 changes: 4 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ network-config:
# keep the entire network's size. Otherwise, the local node's view of the network will be incomplete due to cache eviction.
# Recommended size is 10x the number of peers in the network.
cache-size: 10000
peer-gater-enabled: true
peer-gater-topic-delivery-weights-override: |
consensus-committee: 1.5, sync-committee: .75

# Application layer spam prevention
alsp-spam-record-cache-size: 1000
alsp-spam-report-queue-size: 10_000
Expand Down
4 changes: 4 additions & 0 deletions insecure/corruptlibp2p/pubsub_adapter_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (c *CorruptPubSubAdapterConfig) WithRpcInspector(_ p2p.GossipSubRPCInspecto
// CorruptPubSub does not support inspector suite. This is a no-op.
}

func (c *CorruptPubSubAdapterConfig) WithPeerGater(_ map[string]float64) {
// CorruptPubSub does not need peer gater. This is a no-op.
}

func (c *CorruptPubSubAdapterConfig) Build() []corrupt.Option {
return c.options
}
Expand Down
10 changes: 10 additions & 0 deletions network/netconf/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ func AllFlagNames() []string {
BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.MisbehaviourPenaltiesKey, p2pconfig.IWantKey),
BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.MisbehaviourPenaltiesKey, p2pconfig.PublishKey),
BuildFlagName(gossipsubKey, p2pconfig.ScoreParamsKey, p2pconfig.ScoringRegistryKey, p2pconfig.MisbehaviourPenaltiesKey, p2pconfig.ClusterPrefixedReductionFactorKey),

BuildFlagName(gossipsubKey, p2pconfig.PeerGaterKey, p2pconfig.EnabledKey),
BuildFlagName(gossipsubKey, p2pconfig.PeerGaterKey, p2pconfig.TopicDeliveryWeightsKey),
}

for _, scope := range []string{systemScope, transientScope, protocolScope, peerScope, peerProtocolScope} {
Expand Down Expand Up @@ -597,6 +600,12 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
config.GossipSub.ScoringParameters.ScoringRegistryParameters.MisbehaviourPenalties.ClusterPrefixedReductionFactor,
"the factor used to reduce the penalty for control message misbehaviours on cluster prefixed topics")

flags.Bool(BuildFlagName(gossipsubKey, p2pconfig.PeerGaterKey, p2pconfig.EnabledKey),
config.GossipSub.PeerScoringEnabled,
"enable the libp2p peer gater")
flags.String(BuildFlagName(gossipsubKey, p2pconfig.PeerGaterKey, p2pconfig.TopicDeliveryWeightsKey),
config.GossipSub.PeerGaterTopicDeliveryWeightsOverride,
"topic delivery weights override, this is a comma separated with the format topic_1:2.2,topic_2:3.2,topic_3:1.7 these will be used to override the default topic weight of 1.0 for the specified topic.")
}

// LoadLibP2PResourceManagerFlags loads all CLI flags for the libp2p resource manager configuration on the provided pflag set.
Expand Down Expand Up @@ -666,6 +675,7 @@ func SetAliases(conf *viper.Viper) error {
// mapping should be from network-p2pconfig.key1.key2.key3... to network-config-key1-key2-key3...
m[strings.Join(s[1:], "-")] = key
}

// each flag name should correspond to exactly one key in our config store after it is loaded with the default config
for _, flagName := range AllFlagNames() {
fullKey, ok := m[flagName]
Expand Down
8 changes: 8 additions & 0 deletions network/p2p/builder/gossipsub/gossipSubBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ func (g *Builder) Build(ctx irrecoverable.SignalerContext) (p2p.PubSubAdapter, e
})
gossipSubConfigs.WithMessageIdFunction(utils.MessageID)

if g.gossipSubCfg.PeerGaterEnabled {
topicDeliveryWeights, err := g.gossipSubCfg.PeerGaterTopicDeliveryWeights()
if err != nil {
return nil, fmt.Errorf("failed to add peer gater option: %w", err)
}
gossipSubConfigs.WithPeerGater(topicDeliveryWeights)
}

if g.routingSystem != nil {
gossipSubConfigs.WithRoutingDiscovery(g.routingSystem)
}
Expand Down
27 changes: 27 additions & 0 deletions network/p2p/config/gossipsub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package p2pconfig

import (
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -61,6 +63,8 @@ const (
PeerScoringEnabledKey = "peer-scoring-enabled"
ScoreParamsKey = "scoring-parameters"
SubscriptionProviderKey = "subscription-provider"
PeerGaterKey = "peer-gater"
TopicDeliveryWeightsKey = "topic-delivery-weights-override"
)

// GossipSubParameters is the configuration for the GossipSub pubsub implementation.
Expand All @@ -76,6 +80,13 @@ type GossipSubParameters struct {
PeerScoringEnabled bool `mapstructure:"peer-scoring-enabled"`
SubscriptionProvider SubscriptionProviderParameters `mapstructure:"subscription-provider"`
ScoringParameters ScoringParameters `mapstructure:"scoring-parameters"`

// PeerGaterEnabled enables the peer gater.
PeerGaterEnabled bool `mapstructure:"peer-gater-enabled"`
// PeerGaterTopicDeliveryWeightsOverride topic delivery weights that will override the default value for the specified channel.
// This is a comma separated list "channel:weight, channel2:weight, channel3:weight".
// i.e: consensus-committee: 1.5, sync-committee: .75
PeerGaterTopicDeliveryWeightsOverride string `mapstructure:"peer-gater-topic-delivery-weights-override"`
}

const (
Expand All @@ -89,6 +100,22 @@ type ScoringParameters struct {
ScoringRegistryParameters ScoringRegistryParameters `validate:"required" mapstructure:"scoring-registry"`
}

// PeerGaterTopicDeliveryWeights returns the topic delivery weights configured on this struct as a map[string]float64 .
// Note: When new topic delivery weights are added to the struct this func should be updated.
func (g *GossipSubParameters) PeerGaterTopicDeliveryWeights() (map[string]float64, error) {
m := make(map[string]float64)
for _, weightConfig := range strings.Split(g.PeerGaterTopicDeliveryWeightsOverride, ",") {
wc := strings.Split(weightConfig, ":")
f, err := strconv.ParseFloat(strings.TrimSpace(wc[1]), 64)
if err != nil {
return nil, err
}
m[strings.TrimSpace(wc[0])] = f
}

return m, nil
}

// SubscriptionProviderParameters keys.
const (
UpdateIntervalKey = "update-interval"
Expand Down
1 change: 1 addition & 0 deletions network/p2p/config/gossipsub_rpc_inspectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
InspectionKey = "inspection"
TruncationKey = "truncation"
EnableKey = "enable"
EnabledKey = "enabled"
DisabledKey = "disabled"
MessageIDKey = "message-id"
RejectUnstakedPeers = "reject-unstaked-peers"
Expand Down
5 changes: 5 additions & 0 deletions network/p2p/mock/pub_sub_adapter_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions network/p2p/node/gossipSubAdapterConfig.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package p2pnode

import (
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -97,6 +99,16 @@ func (g *GossipSubAdapterConfig) WithTracer(tracer p2p.PubSubTracer) {
g.options = append(g.options, pubsub.WithRawTracer(tracer))
}

// WithPeerGater adds a peer gater option to the config.
// Args:
// - params: the topic delivery weights to use
// Returns:
// -None
func (g *GossipSubAdapterConfig) WithPeerGater(topicDeliveryWeights map[string]float64) {
peerGaterParams := pubsub.NewPeerGaterParams(pubsub.DefaultPeerGaterThreshold, pubsub.DefaultPeerGaterGlobalDecay, pubsub.ScoreParameterDecay(10*time.Minute)).WithTopicDeliveryWeights(topicDeliveryWeights)
g.options = append(g.options, pubsub.WithPeerGater(peerGaterParams))
}

// ScoreTracer returns the tracer for the peer score.
// Args:
// - None
Expand Down
1 change: 1 addition & 0 deletions network/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type PubSubAdapterConfig interface {
// This is used to expose the local scoring table of the GossipSub node to its higher level components.
WithScoreTracer(tracer PeerScoreTracer)
WithRpcInspector(GossipSubRPCInspector)
WithPeerGater(topicDeliveryWeights map[string]float64)
}

// GossipSubRPCInspector abstracts the general behavior of an app specific RPC inspector specifically
Expand Down
Loading