diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml index f6a654b4..135edc31 100644 --- a/.github/workflows/linting.yml +++ b/.github/workflows/linting.yml @@ -7,7 +7,7 @@ jobs: fail-fast: true matrix: os: ["ubuntu-latest", "macOS-latest"] - go: ["1.20.x", "1.21.x"] + go: ["1.21.x", "1.22.x"] runs-on: ${{ matrix.os }} steps: - name: "checkout" @@ -21,10 +21,7 @@ jobs: - name: "fmt" run: if [ "$(go fmt ./... | wc -l)" -gt 0 ]; then echo "go fmt failed, please run again locally"; exit 1; fi if: matrix.os == 'ubuntu-latest' - - name: "test" - run: "go test ./..." + - name: "test datapoint and origin" + run: "go test github.com/orcfax/oracle-suite/pkg/datapoint/..." - name: "vet" run: "go vet ./..." - - run: "go install honnef.co/go/tools/cmd/staticcheck@latest" - - name: staticcheck - run: "staticcheck ./..." diff --git a/Makefile b/Makefile index e9b7197c..f35582e1 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,7 @@ gofer-release-sign: ## Build a gofer release.. goreleaser release --skip=publish --clean -f cmd/gofer/.goreleaser.yml lint: ## Lint the source code (--ignore-errors to ignore errs) + @echo ignore errors with "--ignore-errors" go fmt ./... staticcheck ./... golint ./... diff --git a/pkg/datapoint/orcfax_test.go b/pkg/datapoint/orcfax_test.go index e5a8e0a3..c397d318 100644 --- a/pkg/datapoint/orcfax_test.go +++ b/pkg/datapoint/orcfax_test.go @@ -10,13 +10,14 @@ import ( func TestReadBuildProperties(t *testing.T) { ldFlags := debug.BuildSetting{ - "-ldflags", "-s -w -X main.version=100.0.0-SNAPSHOT-057f3fc -X main.commit=057f3fc6318d1824148bf91de5ef674fe8b9a504 -X main.date=2024-01-29T19:14:07Z -X main.builtBy=goreleaser", + Key: "-ldflags", + Value: "-s -w -X main.version=100.0.0-SNAPSHOT-057f3fc -X main.commit=057f3fc6318d1824148bf91de5ef674fe8b9a504 -X main.date=2024-01-29T19:14:07Z -X main.builtBy=goreleaser", } res := parseBuildProperties([]debug.BuildSetting{ldFlags}) expected := value.BuildProperties{ - "057f3fc6318d1824148bf91de5ef674fe8b9a504", - "100.0.0-SNAPSHOT-057f3fc", - "2024-01-29T19:14:07Z", + Commit: "057f3fc6318d1824148bf91de5ef674fe8b9a504", + Version: "100.0.0-SNAPSHOT-057f3fc", + Date: "2024-01-29T19:14:07Z", } if res != expected { t.Errorf( diff --git a/pkg/transport/libp2p/crypto/ethkey/key_test.go b/pkg/transport/libp2p/crypto/ethkey/key_test.go deleted file mode 100644 index 6ac2c7ee..00000000 --- a/pkg/transport/libp2p/crypto/ethkey/key_test.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (C) 2021-2023 Chronicle Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ethkey - -import ( - "bytes" - "testing" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/assert" - - "github.com/defiweb/go-eth/types" -) - -var ( - testAddress1 = types.MustAddressFromHex("0x2d800d93b065ce011af83f316cef9f0d005b0aa4") - testAddress2 = types.MustAddressFromHex("0x8eb3daaf5cb4138f5f96711c09c0cfd0288a36e9") - fakeSignature = bytes.Repeat([]byte{0x01}, 65) -) - -func TestAddressToPeerID(t *testing.T) { - t.Run("correct-address", func(t *testing.T) { - assert.Equal( - t, - "1Afqz6rsuyYpr7Dpp12PbftE22nYH3k2Fw5", - HexAddressToPeerID("0x69B352cbE6Fc5C130b6F62cc8f30b9d7B0DC27d0").Pretty(), - ) - }) - t.Run("empty-address", func(t *testing.T) { - assert.Equal( - t, - "", - HexAddressToPeerID("").Pretty(), - ) - }) -} - -func TestPeerIDToAddress(t *testing.T) { - id, _ := peer.Decode("1Afqz6rsuyYpr7Dpp12PbftE22nYH3k2Fw5") - - assert.Equal( - t, - "0x69b352cbe6fc5c130b6f62cc8f30b9d7b0dc27d0", - PeerIDToAddress(id).String(), - ) -} diff --git a/pkg/transport/libp2p/crypto/ethkey/priv_test.go b/pkg/transport/libp2p/crypto/ethkey/priv_test.go deleted file mode 100644 index d1aa20a4..00000000 --- a/pkg/transport/libp2p/crypto/ethkey/priv_test.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (C) 2021-2023 Chronicle Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ethkey - -import ( - "testing" - - "github.com/defiweb/go-eth/types" - "github.com/stretchr/testify/assert" - - "github.com/orcfax/oracle-suite/pkg/ethereum/mocks" -) - -func TestPrivKey_Equals(t *testing.T) { - sig1 := &mocks.Key{} - sig1.On("Address").Return(testAddress1) - prv1 := NewPrivKey(sig1) - - sig2 := &mocks.Key{} - sig2.On("Address").Return(testAddress2) - prv2 := NewPrivKey(sig2) - - assert.True(t, prv1.Equals(prv1)) //nolint:gocritic - assert.False(t, prv1.Equals(prv2)) -} - -func TestPrivKey_GetPublic(t *testing.T) { - sig := &mocks.Key{} - sig.On("Address").Return(testAddress1) - prv := NewPrivKey(sig) - - pub := prv.GetPublic() - assert.Equal(t, &PubKey{address: testAddress1}, pub) -} - -func TestPrivKey_Raw(t *testing.T) { - sig := &mocks.Key{} - sig.On("Address").Return(testAddress1) - prv := NewPrivKey(sig) - - bts, err := prv.Raw() - assert.NoError(t, err) - assert.Equal(t, testAddress1.Bytes(), bts) -} - -func TestPrivKey_Sign(t *testing.T) { - wthSig := types.MustSignatureFromBytes(fakeSignature) - sig := &mocks.Key{} - sig.On("SignMessage", []byte("foo")).Return(&wthSig, nil) - prv := NewPrivKey(sig) - - ethBts, err := prv.Sign([]byte("foo")) - assert.NoError(t, err) - assert.Equal(t, wthSig.Bytes(), ethBts) -} - -func TestPrivKey_Type(t *testing.T) { - assert.Equal(t, KeyTypeID, NewPrivKey(&mocks.Key{}).Type()) -} diff --git a/pkg/transport/libp2p/crypto/ethkey/pub_test.go b/pkg/transport/libp2p/crypto/ethkey/pub_test.go deleted file mode 100644 index 414fc9f4..00000000 --- a/pkg/transport/libp2p/crypto/ethkey/pub_test.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (C) 2021-2023 Chronicle Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ethkey - -import ( - "testing" - - "github.com/defiweb/go-eth/wallet" - "github.com/stretchr/testify/assert" -) - -func TestPubKey_Equals(t *testing.T) { - pub1 := NewPubKey(testAddress1) - pub2 := NewPubKey(testAddress2) - - assert.True(t, pub1.Equals(pub1)) - assert.False(t, pub1.Equals(pub2)) -} - -func TestPubKey_Raw(t *testing.T) { - pub := NewPubKey(testAddress1) - - b, err := pub.Raw() - assert.NoError(t, err) - assert.Equal(t, testAddress1.Bytes(), b) -} - -func TestPubKey_Type(t *testing.T) { - assert.Equal(t, KeyTypeID, NewPubKey(testAddress1).Type()) -} - -func TestPubKey_Verify(t *testing.T) { - key1 := wallet.NewRandomKey() - key2 := wallet.NewRandomKey() - sig1, _ := key1.SignMessage([]byte("foo")) - sig2, _ := key2.SignMessage([]byte("foo")) - - pub := NewPubKey(key1.Address()) - bts := []byte("foo") - - t.Run("valid", func(t *testing.T) { - ok, err := pub.Verify(bts, sig1.Bytes()) - assert.True(t, ok) - assert.NoError(t, err) - }) - t.Run("invalid", func(t *testing.T) { - ok, err := pub.Verify(bts, sig2.Bytes()) - assert.False(t, ok) - assert.NoError(t, err) - }) -} diff --git a/pkg/transport/libp2p/internal/node_test.go b/pkg/transport/libp2p/internal/node_test.go deleted file mode 100644 index ba0d65fe..00000000 --- a/pkg/transport/libp2p/internal/node_test.go +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright (C) 2021-2023 Chronicle Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package internal - -import ( - "bytes" - "context" - "crypto/rand" - "fmt" - "net" - "testing" - "time" - - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const defaultTimeout = 10 * time.Second - -func TestNode_MessagePropagation(t *testing.T) { - // This test checks if messages are propagated between peers correctly. - // - // Topology: - // n0 <--[direct connection]--> n1 <--[direct connection]--> n2 - - peers, err := getNodeInfo(3) - require.NoError(t, err) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n0, err := NewNode( - PeerPrivKey(peers[0].PrivKey), - ListenAddrs(peers[0].ListenAddrs), - DirectPeers(peers[1].PeerAddrs), - ExternalAddr(multiaddr.StringCast("/ip4/10.0.0.1/tcp/5555")), - ) - require.NoError(t, err) - require.NoError(t, n0.Start(ctx)) - time.Sleep(time.Second) - - n1, err := NewNode( - PeerPrivKey(peers[1].PrivKey), - ListenAddrs(peers[1].ListenAddrs), - DirectPeers(append([]multiaddr.Multiaddr{}, peers[0].PeerAddrs[0], peers[2].PeerAddrs[0])), - ) - require.NoError(t, err) - require.NoError(t, n1.Start(ctx)) - time.Sleep(time.Second) - - n2, err := NewNode( - PeerPrivKey(peers[2].PrivKey), - ListenAddrs(peers[2].ListenAddrs), - DirectPeers(peers[1].PeerAddrs), - ) - require.NoError(t, err) - require.NoError(t, n2.Start(ctx)) - time.Sleep(time.Second) - - _, err = n0.Subscribe("test") - require.NoError(t, err) - _, err = n1.Subscribe("test") - require.NoError(t, err) - _, err = n2.Subscribe("test") - require.NoError(t, err) - - // Wait for the peers to connect to each other: - waitFor(t, func() bool { - return len(n0.PubSub().ListPeers("test")) > 0 && - len(n1.PubSub().ListPeers("test")) > 0 && - len(n2.PubSub().ListPeers("test")) > 0 - }) - - s0, err := n0.Subscription("test") - require.NoError(t, err) - s1, err := n1.Subscription("test") - require.NoError(t, err) - s2, err := n2.Subscription("test") - require.NoError(t, err) - - err = s0.Publish([]byte("makerdao")) - assert.NoError(t, err) - - // Message should be received on both nodes: - waitForMessage(t, s0.Next(), []byte("makerdao")) - waitForMessage(t, s1.Next(), []byte("makerdao")) - waitForMessage(t, s2.Next(), []byte("makerdao")) -} - -// message is the simplest implementation of the transport.Message interface. -type message []byte - -func (m *message) String() string { - if m == nil { - return "" - } - return string(*m) -} - -func (m *message) Equal(msg *message) bool { - return bytes.Equal(*m, *msg) -} - -func (m *message) MarshallBinary() ([]byte, error) { - return *m, nil -} - -func (m *message) UnmarshallBinary(bytes []byte) error { - *m = bytes - return nil -} - -type nodeInfo struct { - ID peer.ID - PrivKey crypto.PrivKey - ListenAddrs []multiaddr.Multiaddr - PeerAddrs []multiaddr.Multiaddr -} - -// getNodeInfo returns n nodeInfo structs which can be used to generate -// random test nodes. -func getNodeInfo(n int) ([]nodeInfo, error) { - ps, err := findFreePorts(n) - if err != nil { - return nil, err - } - var pi []nodeInfo - for i := 0; i < n; i++ { - rr := rand.Reader - sk, _, err := crypto.GenerateEd25519Key(rr) - if err != nil { - return nil, err - } - id, err := peer.IDFromPrivateKey(sk) - if err != nil { - return nil, err - } - pi = append(pi, nodeInfo{ - ListenAddrs: []multiaddr.Multiaddr{multiaddr.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ps[i]))}, - PeerAddrs: []multiaddr.Multiaddr{multiaddr.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", ps[i], id.Pretty()))}, - PrivKey: sk, - ID: id, - }) - } - return pi, nil -} - -// findFreePorts returns n random ports available to use. -func findFreePorts(n int) ([]int, error) { - var ports []int - for i := 0; i < n; i++ { - addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") - if err != nil { - return nil, err - } - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return nil, err - } - defer l.Close() - ports = append(ports, l.Addr().(*net.TCPAddr).Port) - } - return ports, nil -} - -// waitFor waits until cond becomes true. -func waitFor(t *testing.T, cond func() bool) { - s := time.Now() - for !cond() { - if time.Since(s) >= defaultTimeout { - assert.Fail(t, "timeout") - return - } - time.Sleep(time.Second) - } -} - -// waitForMessage waits for the expected message. -func waitForMessage(t *testing.T, stat chan *pubsub.Message, expected []byte) { - to := time.After(defaultTimeout) - select { - case received := <-stat: - assert.Equal(t, expected, received.GetData()) - case <-to: - assert.Fail(t, "timeout") - return - } -} - -// countMessages counts asynchronously received messages for specified time -// duration, then returns results in channel. -func countMessages(sub *Subscription, duration time.Duration) chan map[peer.ID]int { - ch := make(chan map[peer.ID]int) - go func() { - count := map[peer.ID]int{} - defer func() { ch <- count }() - for { - select { - case <-time.After(duration): - return - case msg, ok := <-sub.Next(): - if !ok { - return - } - id := msg.GetFrom() - if _, ok := count[id]; !ok { - count[id] = 0 - } - count[id]++ - } - } - }() - return ch -} - -func containsPeerID(ids []peer.ID, id peer.ID) bool { - for _, i := range ids { - if i == id { - return true - } - } - return false -} diff --git a/pkg/transport/libp2p/internal/options_peer_scoring_test.go b/pkg/transport/libp2p/internal/options_peer_scoring_test.go deleted file mode 100644 index ca62f53d..00000000 --- a/pkg/transport/libp2p/internal/options_peer_scoring_test.go +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright (C) 2021-2023 Chronicle Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package internal - -import ( - "bytes" - "context" - "sync" - "testing" - "time" - - pubsub "github.com/libp2p/go-libp2p-pubsub" - pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type scoringEventTracer struct { - prunes int - mu sync.Mutex -} - -func (s *scoringEventTracer) Trace(evt *pubsub_pb.TraceEvent) { - s.mu.Lock() - defer s.mu.Unlock() - if evt.GetType() == pubsub_pb.TraceEvent_PRUNE { - s.prunes++ - } -} - -func (s *scoringEventTracer) Prunes() int { - s.mu.Lock() - defer s.mu.Unlock() - return s.prunes -} - -func TestNode_PeerScoring(t *testing.T) { - // This test verifies that the integration with peer scoring is working - // correctly. - - var peerScoreParams = &pubsub.PeerScoreParams{ - AppSpecificScore: func(id peer.ID) float64 { return 0 }, - AppSpecificWeight: 1, - IPColocationFactorWeight: -1, - IPColocationFactorThreshold: 1, - DecayInterval: 1 * time.Minute, - DecayToZero: 0.01, - RetainScore: 10 * time.Second, - Topics: make(map[string]*pubsub.TopicScoreParams), - } - - var topicScoreParams = &pubsub.TopicScoreParams{ - TimeInMeshCap: 0, - TimeInMeshQuantum: 1 * time.Second, - TopicWeight: 1, - FirstMessageDeliveriesWeight: 100, - FirstMessageDeliveriesDecay: 0.999, - FirstMessageDeliveriesCap: 300, - InvalidMessageDeliveriesWeight: -100, - InvalidMessageDeliveriesDecay: 0.999, - } - - var thresholds = &pubsub.PeerScoreThresholds{ - GossipThreshold: -100, - PublishThreshold: -200, - GraylistThreshold: -300, - AcceptPXThreshold: 0, - } - - peers, err := getNodeInfo(2) - require.NoError(t, err) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - et := &scoringEventTracer{} - n0, err := NewNode( - PeerPrivKey(peers[0].PrivKey), - ListenAddrs(peers[0].ListenAddrs), - PubsubEventTracer(et), - PeerScoring(peerScoreParams, thresholds, func(topic string) *pubsub.TopicScoreParams { - return topicScoreParams - }), - ) - require.NoError(t, err) - require.NoError(t, n0.Start(ctx)) - time.Sleep(time.Second) - - n1, err := NewNode( - PeerPrivKey(peers[1].PrivKey), - ListenAddrs(peers[1].ListenAddrs), - ) - require.NoError(t, err) - require.NoError(t, n1.Start(ctx)) - time.Sleep(time.Second) - - // Add validator to the n0 node which will reject all received messages - // from the second node: - n0.AddValidator(func(ctx context.Context, topic string, id peer.ID, msg *pubsub.Message) pubsub.ValidationResult { - if n0.Host().ID() == id { - return pubsub.ValidationAccept - } - if bytes.Equal(msg.Data, []byte("valid")) { - return pubsub.ValidationAccept - } - return pubsub.ValidationReject - }) - - require.NoError(t, n1.Connect(peers[0].PeerAddrs[0])) - _, err = n0.Subscribe("test") - require.NoError(t, err) - _, err = n1.Subscribe("test") - require.NoError(t, err) - - s0, err := n0.Subscription("test") - require.NoError(t, err) - s1, err := n1.Subscription("test") - require.NoError(t, err) - - // Wait for the peers to connect to each other: - waitFor(t, func() bool { - return len(n0.PubSub().ListPeers("test")) > 0 && len(n1.PubSub().ListPeers("test")) > 0 - }) - - s0wait := countMessages(s0, 2*time.Second) - - // Send a few valid messages to boost a peer score: - for i := 0; i < 5; i++ { - err := s1.Publish([]byte("valid")) - if err != nil { - panic(err) - } - } - - // Now send 4 invalid messages, that should be enough to lower the score below 0: - for i := 0; i < 4; i++ { - err := s1.Publish([]byte("invalid")) - if err != nil { - panic(err) - } - } - <-s0wait - - // Wait 1 second for the "heartbeat". - time.Sleep(1 * time.Second) - - // The n1 node should be pruned: - assert.Equal(t, 1, et.Prunes()) -} diff --git a/pkg/transport/libp2p/internal/options_rate_limiter_test.go b/pkg/transport/libp2p/internal/options_rate_limiter_test.go deleted file mode 100644 index f625b8ff..00000000 --- a/pkg/transport/libp2p/internal/options_rate_limiter_test.go +++ /dev/null @@ -1,150 +0,0 @@ -// Copyright (C) 2021-2023 Chronicle Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package internal - -import ( - "context" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestNode_RateLimiter_PeerLimit(t *testing.T) { - // This test checks if peer limits works correctly. The limit is set to - // 128 bytes/s, we will try to send two messages of 128 bytes each. - // The second one must be rejected because it will exceed the 128 bytes/s - // limit. Then we wait one second and try to send another 128 byte message. - // This time the message should be accepted. - - peers, err := getNodeInfo(2) - require.NoError(t, err) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n0, err := NewNode( - PeerPrivKey(peers[0].PrivKey), - ListenAddrs(peers[0].ListenAddrs), - RateLimiter(RateLimiterConfig{ - BytesPerSecond: 128, - BurstSize: 128, - RelayBytesPerSecond: 128, - RelayBurstSize: 128, - }), - ) - require.NoError(t, err) - require.NoError(t, n0.Start(ctx)) - time.Sleep(time.Second) - - n1, err := NewNode( - PeerPrivKey(peers[1].PrivKey), - ListenAddrs(peers[1].ListenAddrs), - ) - require.NoError(t, err) - require.NoError(t, n1.Start(ctx)) - time.Sleep(time.Second) - - require.NoError(t, n1.Connect(peers[0].PeerAddrs[0])) - _, err = n0.Subscribe("test") - require.NoError(t, err) - _, err = n1.Subscribe("test") - require.NoError(t, err) - - s1, err := n0.Subscription("test") - require.NoError(t, err) - s2, err := n1.Subscription("test") - require.NoError(t, err) - - // Wait for the peers to connect to each other: - waitFor(t, func() bool { - return len(n0.PubSub().ListPeers("test")) > 0 && len(n1.PubSub().ListPeers("test")) > 0 - }) - - // Send messages: - msgsCh := countMessages(s1, 2*time.Second) - msg := []byte(strings.Repeat("a", 128)) - require.NoError(t, s2.Publish(msg)) - require.NoError(t, s2.Publish(msg)) // exceeds limit - time.Sleep(1 * time.Second) - require.NoError(t, s2.Publish(msg)) - - // Only two messages should arrive, rest messages exceed the peer limit: - assert.Equal(t, 2, (<-msgsCh)[n1.Host().ID()]) -} - -func TestNode_RateLimiter_PeerBurst(t *testing.T) { - // This test checks if data burst for a peer works correctly. The value for - // the data limit is smaller than the message size. We will try to send two - // messages. The first one should be accepted because its size is within the - // burst limit. The second one should be rejected because it exceeds the - // burst limit. - - peers, err := getNodeInfo(2) - require.NoError(t, err) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n0, err := NewNode( - PeerPrivKey(peers[0].PrivKey), - ListenAddrs(peers[0].ListenAddrs), - RateLimiter(RateLimiterConfig{ - BytesPerSecond: 1, - BurstSize: 128, - RelayBytesPerSecond: 1, - RelayBurstSize: 128, - }), - ) - require.NoError(t, err) - require.NoError(t, n0.Start(ctx)) - time.Sleep(time.Second) - - n1, err := NewNode( - PeerPrivKey(peers[1].PrivKey), - ListenAddrs(peers[1].ListenAddrs), - ) - require.NoError(t, err) - require.NoError(t, n1.Start(ctx)) - time.Sleep(time.Second) - - require.NoError(t, n1.Connect(peers[0].PeerAddrs[0])) - _, err = n0.Subscribe("test") - require.NoError(t, err) - _, err = n1.Subscribe("test") - require.NoError(t, err) - - s1, err := n0.Subscription("test") - require.NoError(t, err) - s2, err := n1.Subscription("test") - require.NoError(t, err) - - // Wait for the peers to connect to each other: - waitFor(t, func() bool { - return len(n0.PubSub().ListPeers("test")) > 0 && len(n1.PubSub().ListPeers("test")) > 0 - }) - - // Send messages: - msgsCh := countMessages(s1, 2*time.Second) - msg := []byte(strings.Repeat("a", 128)) - require.NoError(t, s2.Publish(msg)) - require.NoError(t, s2.Publish(msg)) - - // Only one message should arrive, second one exceeds the peer limit: - assert.Equal(t, 1, (<-msgsCh)[n1.Host().ID()]) -} diff --git a/pkg/transport/libp2p/internal/options_test.go b/pkg/transport/libp2p/internal/options_test.go deleted file mode 100644 index 16da1b21..00000000 --- a/pkg/transport/libp2p/internal/options_test.go +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright (C) 2021-2023 Chronicle Labs, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package internal - -import ( - "context" - "crypto/rand" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestNode_PeerPrivKey(t *testing.T) { - // This tests checks the PeerPrivKey option. - - sk, _, _ := crypto.GenerateRSAKeyPair(2048, rand.Reader) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n, err := NewNode( - PeerPrivKey(sk), - ) - require.NoError(t, err) - require.NoError(t, n.Start(ctx)) - time.Sleep(time.Second) - - id, _ := peer.IDFromPrivateKey(sk) - assert.Equal(t, id, n.Host().ID()) -} - -func TestNode_MessagePrivKey(t *testing.T) { - // This tests checks the MessagePrivKey option. - - sk, _, _ := crypto.GenerateRSAKeyPair(2048, rand.Reader) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n, err := NewNode( - MessagePrivKey(sk), - ) - require.NoError(t, err) - require.NoError(t, n.Start(ctx)) - time.Sleep(time.Second) - - _, err = n.Subscribe("test") - require.NoError(t, err) - s, err := n.Subscription("test") - require.NoError(t, err) - - err = s.Publish([]byte("makerdao")) - require.NoError(t, err) - - // The public key used to sign the message should be derived from the key - // passed to the MessagePrivKey function: - id, _ := peer.IDFromPrivateKey(sk) - msg := <-s.Next() - assert.Equal(t, id, msg.GetFrom()) - // The public key extracted form a message must be different - // from peer's public key: - assert.NotEqual(t, n.Host().ID(), msg.GetFrom()) -} - -func TestNode_Discovery(t *testing.T) { - // This test checks whether all nodes in the network can discover each - // other when Discovery option is used. - // - // Topology: - // n0 <--[discovery]--> n1 <--[discovery]--> n2 - - peers, err := getNodeInfo(3) - require.NoError(t, err) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n0, err := NewNode( - PeerPrivKey(peers[0].PrivKey), - ListenAddrs(peers[0].ListenAddrs), - Discovery(nil), - ) - require.NoError(t, err) - require.NoError(t, n0.Start(ctx)) - time.Sleep(time.Second) - - n1, err := NewNode( - PeerPrivKey(peers[1].PrivKey), - ListenAddrs(peers[1].ListenAddrs), - Discovery(peers[0].PeerAddrs), - ) - require.NoError(t, err) - require.NoError(t, n1.Start(ctx)) - time.Sleep(time.Second) - - n2, err := NewNode( - PeerPrivKey(peers[2].PrivKey), - ListenAddrs(peers[2].ListenAddrs), - Discovery(peers[0].PeerAddrs), - ) - require.NoError(t, err) - require.NoError(t, n2.Start(ctx)) - time.Sleep(time.Second) - - _, err = n0.Subscribe("test") - require.NoError(t, err) - _, err = n1.Subscribe("test") - require.NoError(t, err) - _, err = n2.Subscribe("test") - require.NoError(t, err) - - // Every peer should see two other peers: - waitFor(t, func() bool { - lp := n0.PubSub().ListPeers("test") - return containsPeerID(lp, peers[1].ID) && containsPeerID(lp, peers[2].ID) - }) - waitFor(t, func() bool { - lp := n1.PubSub().ListPeers("test") - return containsPeerID(lp, peers[0].ID) && containsPeerID(lp, peers[2].ID) - }) - waitFor(t, func() bool { - lp := n2.PubSub().ListPeers("test") - return containsPeerID(lp, peers[0].ID) && containsPeerID(lp, peers[1].ID) - }) -} - -func TestNode_Discovery_AddrNotLeaking(t *testing.T) { - // This test checks that the addresses of nodes that do not use discovery - // are not revealed to other nodes in the network. - // - // Topology: - // n0 <--[discovery]--> n1 <--[direct]--> n2 - // - // - n0 node should only be connected to n1 node (through discovery) - // - n1 node should only be connected to n0 node (through discovery) and n1 (through direct connection) - // - n2 node should only be connected to n1 node (through direct connection) - // - the n0 node's address must not be exposed to n2 node - - peers, err := getNodeInfo(3) - require.NoError(t, err) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n0, err := NewNode( - PeerPrivKey(peers[0].PrivKey), - ListenAddrs(peers[0].ListenAddrs), - Discovery(nil), - ) - require.NoError(t, err) - require.NoError(t, n0.Start(ctx)) - time.Sleep(time.Second) - - n1, err := NewNode( - PeerPrivKey(peers[1].PrivKey), - ListenAddrs(peers[1].ListenAddrs), - DirectPeers(peers[2].PeerAddrs), - Discovery(peers[0].PeerAddrs), - ) - require.NoError(t, err) - require.NoError(t, n1.Start(ctx)) - time.Sleep(time.Second) - - n2, err := NewNode( - PeerPrivKey(peers[2].PrivKey), - ListenAddrs(peers[2].ListenAddrs), - DirectPeers(peers[1].PeerAddrs), - ) - require.NoError(t, err) - require.NoError(t, n2.Start(ctx)) - time.Sleep(time.Second) - - _, err = n0.Subscribe("test") - require.NoError(t, err) - _, err = n1.Subscribe("test") - require.NoError(t, err) - _, err = n2.Subscribe("test") - require.NoError(t, err) - - waitFor(t, func() bool { - lp := n0.PubSub().ListPeers("test") - return len(lp) == 1 && containsPeerID(lp, peers[1].ID) - }) - waitFor(t, func() bool { - lp := n1.PubSub().ListPeers("test") - return len(lp) == 2 && containsPeerID(lp, peers[0].ID) && containsPeerID(lp, peers[2].ID) - }) - waitFor(t, func() bool { - lp := n2.PubSub().ListPeers("test") - return len(lp) == 1 && containsPeerID(lp, peers[1].ID) - }) -} - -func TestNode_ConnectionLimit(t *testing.T) { - // This test checks whether the connection number is properly limited when - // the ConnectionLimit option is used. - - peers, err := getNodeInfo(5) - require.NoError(t, err) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n, err := NewNode( - PeerPrivKey(peers[0].PrivKey), - ListenAddrs(peers[0].ListenAddrs), - ConnectionLimit(1, 1, 0), - ) - require.NoError(t, err) - require.NoError(t, n.Start(ctx)) - time.Sleep(time.Second) - - for i := 2; i < len(peers); i++ { - n, err := NewNode( - PeerPrivKey(peers[i].PrivKey), - ListenAddrs(peers[i].ListenAddrs), - Discovery(nil), - ) - require.NoError(t, err) - require.NoError(t, n.Start(ctx)) - time.Sleep(time.Second) - - require.NoError(t, n.Connect(peers[0].PeerAddrs[0])) - } - - n.Host().ConnManager().TrimOpenConns(context.Background()) - time.Sleep(time.Second) - - conns := 0 - for _, p := range n.Host().Peerstore().Peers() { - if n.Host().Network().Connectedness(p) == network.Connected { - conns++ - } - } - - assert.Equal(t, conns, 1) -} - -func TestNode_DirectPeers(t *testing.T) { - // This test checks whether the direct connection between peers configured - // using the DirectPeers option is always maintained. - - peers, err := getNodeInfo(5) - require.NoError(t, err) - - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - n0, err := NewNode( - PeerPrivKey(peers[0].PrivKey), - ListenAddrs(peers[0].ListenAddrs), - ConnectionLimit(1, 1, 0), - DirectPeers(peers[1].PeerAddrs), - ) - require.NoError(t, err) - require.NoError(t, n0.Start(ctx)) - time.Sleep(time.Second) - - n1, err := NewNode( - PeerPrivKey(peers[1].PrivKey), - ListenAddrs(peers[1].ListenAddrs), - DirectPeers(peers[0].PeerAddrs), - ) - require.NoError(t, err) - require.NoError(t, n1.Start(ctx)) - time.Sleep(time.Second) - - for i := 2; i < len(peers); i++ { - n, err := NewNode( - PeerPrivKey(peers[i].PrivKey), - ListenAddrs(peers[i].ListenAddrs), - ) - require.NoError(t, err) - require.NoError(t, n.Start(ctx)) - - require.NoError(t, n.Connect(peers[0].PeerAddrs[0])) - // Connection with tagged hosts are less likely to be dropped. - // By tagging them we can be sure it wasn't coincidence that - // the connection to n1 host is maintained after call to - // the TrimOpenConns method. - n0.Host().ConnManager().TagPeer(n.Host().ID(), "test", 1) - } - - // The connection between n0 and n1 nodes should be persisted even - // with a connection limit: - n0.Host().ConnManager().TrimOpenConns(context.Background()) - time.Sleep(time.Second) - assert.Equal(t, network.Connected, n0.Host().Network().Connectedness(n1.Host().ID())) -}