From 4d86d1b231dd01ab01157c7035c25b951fc7dfdf Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Sun, 10 Mar 2024 10:40:57 -0400 Subject: [PATCH] Pushrate fix (#218) * corrected ipfs requirements for testing * removed systemresourcemanager * Update main.go * gobreaker --- blockchain/bl_pool.go | 4 +- blockchain/blockchain.go | 20 +++---- blox/blox.go | 15 +++-- cmd/blox/main.go | 31 +++++++++- exchange/fx_exchange.go | 119 +++++++++++++++++++++++++++++++++------ go.mod | 1 + go.sum | 2 + mobile/config.go | 6 +- mobile/example_test.go | 19 ++----- 9 files changed, 165 insertions(+), 52 deletions(-) diff --git a/blockchain/bl_pool.go b/blockchain/bl_pool.go index 22f381f..50e5afe 100644 --- a/blockchain/bl_pool.go +++ b/blockchain/bl_pool.go @@ -658,7 +658,9 @@ func (bl *FxBlockchain) HandlePoolJoinRequest(ctx context.Context, from peer.ID, // Set up the request with context for timeout ctxPing, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - + if bl.rpc == nil { + return fmt.Errorf("IPFS rpc is not defined") + } // Send the ping request res, err := bl.rpc.Request("ping", from.String()).Send(ctxPing) if err != nil { diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index a4c6743..520a2d6 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -862,17 +862,16 @@ func (bl *FxBlockchain) SetAuth(ctx context.Context, on peer.ID, subject peer.ID func (bl *FxBlockchain) authorized(pid peer.ID, action string) bool { log.Debugw("Checking authorization", "action", action, "pid", pid, "bl.authorizer", bl.authorizer, "h.ID", bl.h.ID()) - if bl.authorizer == bl.h.ID() || bl.authorizer == "" { //to cover the cases where in poolHost mode - return action == actionReplicateInPool - } switch action { + case actionReplicateInPool: + return (bl.authorizer == bl.h.ID() || bl.authorizer == "") case actionBloxFreeSpace, actionAccountFund, actionManifestBatchUpload, actionAssetsBalance, actionGetDatastoreSize, actionGetFolderSize, 
actionFetchContainerLogs, actionEraseBlData, actionWifiRemoveall, actionReboot, actionPartition, actionDeleteWifi, actionDisconnectWifi, actionDeleteFulaConfig, actionGetAccount, actionSeeded, actionAccountExists, actionPoolCreate, actionPoolJoin, actionPoolCancelJoin, actionPoolRequests, actionPoolList, actionPoolVote, actionPoolLeave, actionManifestUpload, actionManifestStore, actionManifestAvailable, actionManifestRemove, actionManifestRemoveStorer, actionManifestRemoveStored: bl.authorizedPeersLock.RLock() _, ok := bl.authorizedPeers[pid] bl.authorizedPeersLock.RUnlock() return ok case actionAuth: - return pid == bl.authorizer + return pid == bl.authorizer && bl.authorizer != "" default: return false } @@ -1113,15 +1112,16 @@ func (bl *FxBlockchain) FetchUsersAndPopulateSets(ctx context.Context, topicStri reqCtx, cancelReqCtx := context.WithTimeout(ctx, 2*time.Second) defer cancelReqCtx() // Ensures resources are cleaned up after the Stat call poolHostAddrString := "/dns4/" + clusterEndpoint + "/tcp/4001/p2p/" + poolHostPeerID - bl.rpc.Request("bootstrap/add", poolHostAddrString).Send(reqCtx) - poolHostAddr, err := ma.NewMultiaddr(poolHostAddrString) - if err == nil { - poolHostAddrInfos, err := peer.AddrInfosFromP2pAddrs(poolHostAddr) + if bl.rpc != nil { + bl.rpc.Request("bootstrap/add", poolHostAddrString).Send(reqCtx) + poolHostAddr, err := ma.NewMultiaddr(poolHostAddrString) if err == nil { - bl.rpc.Swarm().Connect(reqCtx, poolHostAddrInfos[0]) + poolHostAddrInfos, err := peer.AddrInfosFromP2pAddrs(poolHostAddr) + if err == nil { + bl.rpc.Swarm().Connect(reqCtx, poolHostAddrInfos[0]) + } } } - } else { // Handle the error: Endpoint didn't match the pattern fmt.Println("Error: Could not extract peerID from endpoint") diff --git a/blox/blox.go b/blox/blox.go index 6e283aa..cdf26da 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/kubo/client/rpc" + iface 
"github.com/ipfs/kubo/core/coreiface" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -126,6 +127,9 @@ func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Mess func (p *Blox) storeCidIPFS(ctx context.Context, c path.Path) error { getCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() + if p.rpc == nil { + return fmt.Errorf("IPFS rpc is undefined") + } _, err := p.rpc.Block().Get(getCtx, c) if err != nil { log.Errorw("It seems that the link is not found", "c", c, "err", err) @@ -185,10 +189,13 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error { } statCtx, cancel := context.WithTimeout(ctx, 1*time.Second) defer cancel() // Ensures resources are cleaned up after the Stat call - stat, err := p.rpc.Block().Stat(statCtx, cidPath) - if err != nil { - log.Errorw("It seems that the link is not stored", "l", l, "err", err) - continue + var stat iface.BlockStat + if p.rpc != nil { + stat, err = p.rpc.Block().Stat(statCtx, cidPath) + if err != nil { + log.Errorw("It seems that the link is not stored", "l", l, "err", err) + continue + } } p.ex.IpniNotifyLink(l) log.Debugw("link might be successfully stored", "l", l, "from", provider.ID, "size", stat.Size()) diff --git a/cmd/blox/main.go b/cmd/blox/main.go index f172efd..379908e 100644 --- a/cmd/blox/main.go +++ b/cmd/blox/main.go @@ -19,6 +19,7 @@ import ( "path" "path/filepath" "strconv" + "strings" "sync" "syscall" "time" @@ -1043,7 +1044,6 @@ func action(ctx *cli.Context) error { listenAddrs = append(listenAddrs, relayAddr2) hopts := []libp2p.Option{ - libp2p.ListenAddrs(listenAddrs...), libp2p.EnableNATService(), libp2p.NATPortMap(), libp2p.EnableRelay(), @@ -1078,12 +1078,33 @@ func action(ctx *cli.Context) error { )) } - bopts := append(hopts, libp2p.Identity(k)) + bopts := append(hopts, libp2p.Identity(k), libp2p.ListenAddrs(listenAddrs...)) h, err := libp2p.New(bopts...) 
if err != nil { return err } - iopts := append(hopts, libp2p.Identity(ipnik)) + + // Create a new slice for updated listen addresses + ipniListenAddrs := make([]ma.Multiaddr, len(listenAddrs)) + // Update each address in the slice + for i, addr := range listenAddrs { + // Convert the multiaddr to a string for manipulation + addrStr := addr.String() + + // Replace "40001" with "40002" in the string representation + updatedAddrStr := strings.Replace(addrStr, "40001", "40002", -1) + + // Parse the updated string back into a multiaddr.Multiaddr + updatedAddr, err := ma.NewMultiaddr(updatedAddrStr) + if err != nil { + log.Fatalf("Failed to parse updated multiaddr '%s': %v", updatedAddrStr, err) + } + + // Store the updated multiaddr in the slice + ipniListenAddrs[i] = updatedAddr + } + + iopts := append(hopts, libp2p.Identity(ipnik), libp2p.ListenAddrs(ipniListenAddrs...)) ipnih, err := libp2p.New(iopts...) if err != nil { return err @@ -1102,6 +1123,10 @@ func action(ctx *cli.Context) error { panic(fmt.Errorf("invalid multiaddress: %w", err)) } node, err := rpc.NewApi(nodeMultiAddr) + if err != nil { + logger.Fatal(err) + return err + } const useIPFSServer = "none" //internal: runs local ipfs instance, none requires an external one and fula runs the mock server on 5001 if useIPFSServer == "internal" { diff --git a/exchange/fx_exchange.go b/exchange/fx_exchange.go index fc94822..976cd5a 100644 --- a/exchange/fx_exchange.go +++ b/exchange/fx_exchange.go @@ -8,6 +8,8 @@ import ( "errors" "fmt" "io" + "math" + "math/rand" "net/http" "strings" "sync" @@ -34,6 +36,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/sony/gobreaker" "golang.org/x/sync/errgroup" ) @@ -51,6 +54,7 @@ var ( log = logging.Logger("fula/exchange") errUnauthorized = errors.New("not authorized") exploreAllRecursivelySelector selector.Selector + breakTime = 10 * time.Second ) type tempAuthEntry struct { @@ 
-58,6 +62,16 @@ type tempAuthEntry struct { timestamp time.Time } +var cbSettings = gobreaker.Settings{ + Name: "Push Circuit Breaker", + MaxRequests: 3, // Requests allowed through while half-open; the trip threshold is ReadyToTrip (unset here, so the library default applies) + Interval: breakTime, // Closed-state window (10s) after which counts are cleared; the open duration is Timeout (unset here, so the library default applies) + OnStateChange: func(name string, from gobreaker.State, to gobreaker.State) { + log.Errorf("Circuit breaker '%s' changed state from '%s' to '%s'", name, from, to) + }, +} +var pushCircuitBreaker = gobreaker.NewCircuitBreaker(cbSettings) + func init() { var err error ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) @@ -101,7 +115,13 @@ func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, e if err != nil { return nil, err } - tr := &http.Transport{} + tr := &http.Transport{ + DisableKeepAlives: true, // Ensure connections are not reused + MaxIdleConns: 500, + MaxConnsPerHost: 2000, + IdleConnTimeout: 20 * time.Second, + // Include any other necessary transport configuration here + } tr.RegisterProtocol("libp2p", p2phttp.NewTransport(h, p2phttp.ProtocolOption(FxExchangeProtocolID))) client := &http.Client{Transport: tr} @@ -407,6 +427,7 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error { // Recursively traverse the node and push all its leaves. 
err = progress.WalkMatching(node, exploreAllRecursivelySelector, func(progress traversal.Progress, node datamodel.Node) error { eg.Go(func() error { + // Take from the push rate limiter before computing and pushing this node's link e.pushRateLimiter.Take() link, err := e.ls.ComputeLink(l.Prototype(), node) if err != nil { @@ -424,7 +445,10 @@ func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error { } func (e *FxExchange) pushOneNode(ctx context.Context, node ipld.Node, to peer.ID, link datamodel.Link) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() var buf bytes.Buffer + c := link.(cidlink.Link).Cid encoder, err := ipldmc.LookupEncoder(c.Prefix().Codec) if err != nil { @@ -441,24 +465,84 @@ func (e *FxExchange) pushOneNode(ctx context.Context, node ipld.Node, to peer.ID log.Errorw("Failed to instantiate push request", "err", err) return err } - resp, err := e.c.Do(req) + _, err = pushCircuitBreaker.Execute(func() (interface{}, error) { + resp, err := e.c.Do(req) + if err != nil { + log.Errorw("Failed to do push request1", "err", err) + return nil, err + } + defer resp.Body.Close() + + // Handle successful response with your existing logic + b, err := io.ReadAll(resp.Body) + switch { + case err != nil: + log.Errorw("Failed to read the response from push", "err", err) + return nil, err // Signal an error to the circuit breaker + case resp.StatusCode != http.StatusOK: + log.Errorw("Received non-OK response from push", "err", err) + return nil, fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b)) + default: + log.Debug("Successfully pushed traversed node") + return nil, nil // Signal success to the circuit breaker + } + }) + // Handle potential errors and circuit breaker state if err != nil { - log.Errorw("Failed to send push request", "err", err) - return err + switch { + case errors.Is(err, gobreaker.ErrOpenState): + // Circuit breaker is open (likely due to previous errors) + return fmt.Errorf("circuit breaker open: %w", err) + 
default: + // Potentially retry or return the error based on its nature + return retryWithBackoff(ctx, func() error { + // Attempt to execute the request within the circuit breaker again + _, err := pushCircuitBreaker.Execute(func() (interface{}, error) { + resp, err := e.c.Do(req) + if err != nil { + log.Errorw("Failed to do push request on retry", "err", err) + return nil, err // Signal failure to the circuit breaker + } + defer resp.Body.Close() + + // Handle successful response with your existing logic + b, err := io.ReadAll(resp.Body) + switch { + case err != nil: + log.Errorw("Failed to read the response from push", "err", err) + return nil, err // Signal an error to the circuit breaker + case resp.StatusCode != http.StatusOK: + log.Errorw("Received non-OK response from push", "err", err) + return nil, fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b)) + default: + log.Debug("Successfully pushed traversed node") + return nil, nil // Signal success to the circuit breaker + } + }) + return err // Return potential errors after circuit breaker execution attempt + }) + } } - defer resp.Body.Close() - b, err := io.ReadAll(resp.Body) - switch { - case err != nil: - log.Errorw("Failed to read the response from push", "err", err) - return err - case resp.StatusCode != http.StatusOK: - log.Errorw("Received non-OK response from push", "err", err) - return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b)) - default: - log.Debug("Successfully pushed traversed node") - return nil + return nil +} +func retryWithBackoff(ctx context.Context, fn func() error) error { + maxRetries := 5 + baseDelay := 1 * time.Second + + for attempt := 0; attempt < maxRetries; attempt++ { + err := fn() + if err == nil { + return nil + } + delay := baseDelay * time.Duration(math.Pow(2, float64(attempt))) + jitter := time.Duration(rand.Float64() * float64(delay)) + select { + case <-time.After(delay + jitter): + case <-ctx.Done(): + return ctx.Err() + } } + 
return fmt.Errorf("failed after %d retries", maxRetries) } func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) { @@ -510,7 +594,10 @@ func (e *FxExchange) handlePush(from peer.ID, w http.ResponseWriter, r *http.Req } if err := e.decodeAndStoreBlock(r.Context(), r.Body, cidlink.Link{Cid: cidInPath}); err != nil { http.Error(w, "invalid cid", http.StatusBadRequest) + return } + + w.WriteHeader(http.StatusOK) } func (e *FxExchange) decodeAndStoreBlock(ctx context.Context, r io.ReadCloser, link ipld.Link) error { diff --git a/go.mod b/go.mod index bfcf1c5..3bd77c7 100644 --- a/go.mod +++ b/go.mod @@ -233,6 +233,7 @@ require ( github.com/rs/cors v1.10.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/samber/lo v1.39.0 // indirect + github.com/sony/gobreaker v0.5.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/testify v1.8.4 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect diff --git a/go.sum b/go.sum index e9b233d..b73aa33 100644 --- a/go.sum +++ b/go.sum @@ -966,6 +966,8 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9 github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= +github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg= +github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod 
h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= diff --git a/mobile/config.go b/mobile/config.go index 6381dcc..cb345dd 100644 --- a/mobile/config.go +++ b/mobile/config.go @@ -85,6 +85,7 @@ func NewConfig() *Config { AllowTransientConnection: true, PoolName: "0", BlockchainEndpoint: "api.node3.functionyard.fula.network", + DisableResourceManger: true, } } @@ -96,6 +97,7 @@ func (cfg *Config) init(mc *Client) error { libp2p.EnableRelay(), libp2p.EnableHolePunching(), } + cfg.DisableResourceManger = true if cfg.DisableResourceManger { hopts = append(hopts, libp2p.ResourceManager(&network.NullResourceManager{})) } @@ -196,6 +198,7 @@ func (cfg *Config) init(mc *Client) error { exchange.WithAuthorizer(mc.h.ID()), exchange.WithAllowTransientConnection(cfg.AllowTransientConnection), exchange.WithIpniPublishDisabled(true), + exchange.WithMaxPushRate(50), exchange.WithDhtProviderOptions( dht.Datastore(namespace.Wrap(mc.ds, datastore.NewKey("dht"))), dht.ProtocolExtension(protocol.ID("/"+cfg.PoolName)), @@ -223,9 +226,6 @@ func (cfg *Config) init(mc *Client) error { return err } } - if err != nil { - return err - } } return mc.ex.Start(context.TODO()) } diff --git a/mobile/example_test.go b/mobile/example_test.go index 025d75a..539e99b 100644 --- a/mobile/example_test.go +++ b/mobile/example_test.go @@ -554,7 +554,7 @@ func Example_poolExchangeLargeDagBetweenClientBlox() { defer cancel() // Elevate log level to show internal communications. 
- if err := logging.SetLogLevel("*", "info"); err != nil { + if err := logging.SetLogLevel("*", "debug"); err != nil { log.Error("Error happened in logging.SetLogLevel") panic(err) } @@ -603,6 +603,9 @@ func Example_poolExchangeLargeDagBetweenClientBlox() { mcfg.BloxAddr = bloxAddrString + "/p2p/" + h1.ID().String() mcfg.PoolName = "1" mcfg.Exchange = bloxAddrString + mcfg.AllowTransientConnection = true + mcfg.DisableResourceManger = false + mcfg.StaticRelays = []string{} mcfg.BlockchainEndpoint = "127.0.0.1:4004" log.Infow("bloxAdd string created", "addr", bloxAddrString+"/p2p/"+h1.ID().String()) @@ -662,12 +665,6 @@ func Example_poolExchangeLargeDagBetweenClientBlox() { links = append(links, linkBytes) chunkedData = append(chunkedData, chunk) - c, err := cid.Cast(linkBytes) - if err != nil { - log.Errorf("Error casting bytes to CID: %v\n", err) - return - } - log.Debugf("Stored chunk link: %s\n", c.String()) } fmt.Printf("Stored %d chunks\n", len(links)) @@ -681,14 +678,6 @@ func Example_poolExchangeLargeDagBetweenClientBlox() { var count = 0 for recentCids.HasNext() { count = count + 1 - cid, err := recentCids.Next() - if err != nil { - fmt.Printf("Error retrieving next CID: %v", err) - log.Errorf("Error retrieving next CID: %v", err) - // Decide if you want to break or continue based on your error handling strategy - break - } - log.Debugf("recentCid link: %s", cid) } log.Infof("recentCids count: %d", count) fmt.Printf("recentCids count: %d\n", count) // Print each CID