From 69155ad56c4109de4f39c467dba07faeab660755 Mon Sep 17 00:00:00 2001
From: ehsan shariati
Date: Sun, 3 Dec 2023 12:31:11 -0500
Subject: [PATCH] Added DHT Provide to Pull (#186)

* Added DHT Provide to Pull

After Pull the node announces that it can provide the link on dht

* Added method to store cid

* updated tests

* Added storeManifests and tests

StoreManifest first calls the blockchain to get the available manifests and then fetches and stores them

* Added dht to store and get the number of replications

* corrected a go check

* Update blox.go

* Added Fetch available and Store to run when blox starts

* Update example_test.go

* Update example_test.go

* Update example_test.go

* Added check to make sure we are not storing an already stored link

* Added mechanism to make sure we do not get stuck on limit
---
 blockchain/bl_manifest.go                 |  76 ++
 blockchain/interface.go                   |  26 +-
 blox/blox.go                              | 159 ++++
 blox/example_test.go                      | 893 ++++++++++------------
 exchange/dht.go                           |   8 +-
 exchange/example_test.go                  | 807 +++++++++++++++++++
 exchange/fx_exchange.go                   |  42 +-
 mobile/blockchain.go                      |  76 --
 exchange/dht_test.go => ping/example_test | 117 +--
 9 files changed, 1561 insertions(+), 643 deletions(-)
 create mode 100644 exchange/example_test.go
 rename exchange/dht_test.go => ping/example_test (74%)

diff --git a/blockchain/bl_manifest.go b/blockchain/bl_manifest.go
index 42a4cf3b..a66c060e 100644
--- a/blockchain/bl_manifest.go
+++ b/blockchain/bl_manifest.go
@@ -7,7 +7,11 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"strconv"
 
+	"github.com/ipfs/go-cid"
+	"github.com/ipld/go-ipld-prime"
+	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
 )
@@ -80,6 +84,78 @@ func (bl *FxBlockchain) ManifestStore(ctx context.Context, to peer.ID, r Manifes
 	}
 }
 
+func (bl *FxBlockchain) HandleManifestBatchStore(ctx context.Context, poolIDString string, links []ipld.Link) ([]string, error) {
+	var linksString []string
+	for _, link := range links {
+		linksString = append(linksString, link.String())
+	}
+	poolID, err := strconv.Atoi(poolIDString)
+	if err != nil {
+		// Handle the error if the conversion fails
+		return nil, fmt.Errorf("invalid topic, not an integer: %s", err)
+	}
+	manifestBatchStoreRequest := ManifestBatchStoreRequest{
+		PoolID: poolID,
+		Cid:    linksString,
+	}
+
+	// Call manifestBatchStore method
+	responseBody, err := bl.callBlockchain(ctx, "POST", actionManifestBatchStore, manifestBatchStoreRequest)
+	if err != nil {
+		return nil, err
+	}
+	// Interpret the response
+	var manifestBatchStoreResponse ManifestBatchStoreResponse
+	if err := json.Unmarshal(responseBody, &manifestBatchStoreResponse); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal manifestBatchStore response: %w", err)
+	}
+	return manifestBatchStoreResponse.Cid, nil
+
+}
+func (bl *FxBlockchain) HandleManifestsAvailable(ctx context.Context, poolIDString string, limit int) ([]LinkWithLimit, error) {
+
+	poolID, err := strconv.Atoi(poolIDString)
+	if err != nil {
+		// Handle the error if the conversion fails
+		return nil, fmt.Errorf("invalid topic, not an integer: %s", err)
+	}
+	manifestAvailableRequest := ManifestAvailableRequest{
+		PoolID: poolID,
+	}
+
+	// Call manifestAvailable method
+	responseBody, err := bl.callBlockchain(ctx, "POST", actionManifestAvailable, manifestAvailableRequest)
+	if err != nil {
+		return nil, err
+	}
+	// Interpret the response
+	var manifestAvailableResponse ManifestAvailableResponse
+	if err := json.Unmarshal(responseBody,
&manifestAvailableResponse); err != nil { + return nil, fmt.Errorf("failed to unmarshal manifestAvailable response: %w", err) + } + + var linksWithLimits []LinkWithLimit + + // Group links by uploader + for _, manifest := range manifestAvailableResponse.Manifests { + c, err := cid.Decode(manifest.ManifestMetadata.Job.Uri) + if err != nil { + return nil, fmt.Errorf("failed to decode CID: %w", err) + } + + if len(linksWithLimits) < limit { + linksWithLimits = append(linksWithLimits, LinkWithLimit{ + Link: cidlink.Link{Cid: c}, + Limit: manifest.ReplicationAvailable, // Assuming you have the replication limit in the manifest + }) + } else { + break + } + } + + return linksWithLimits, nil +} + func (bl *FxBlockchain) ManifestAvailable(ctx context.Context, to peer.ID, r ManifestAvailableRequest) ([]byte, error) { if bl.allowTransientConnection { diff --git a/blockchain/interface.go b/blockchain/interface.go index 15e77825..6f88860e 100644 --- a/blockchain/interface.go +++ b/blockchain/interface.go @@ -5,6 +5,7 @@ import ( "reflect" wifi "github.com/functionland/go-fula/wap/pkg/wifi" + "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p/core/peer" ) @@ -24,6 +25,7 @@ const ( actionPoolLeave = "fula-pool-leave" actionManifestUpload = "fula-manifest-upload" actionManifestStore = "fula-manifest-storage" + actionManifestBatchStore = "fula-manifest-batch_storage" actionManifestAvailable = "fula-manifest-available" actionManifestRemove = "fula-manifest-remove" actionManifestRemoveStorer = "fula-manifest-remove_storer" @@ -37,6 +39,11 @@ const ( actionDeleteFulaConfig = "delete-fula-config" ) +type LinkWithLimit struct { + Link ipld.Link + Limit int +} + type SeededRequest struct { Seed string `json:"seed"` } @@ -210,6 +217,17 @@ type ManifestStoreResponse struct { Cid string `json:"cid"` } +type ManifestBatchStoreRequest struct { + Cid []string `json:"cid"` + PoolID int `json:"pool_id"` +} + +type ManifestBatchStoreResponse struct { + PoolID int `json:"pool_id"` + Storer string `json:"storer"` + Cid []string `json:"cid"` +} + type ManifestAvailableRequest struct { PoolID int `json:"pool_id"` } @@ -220,9 +238,9 @@ type ManifestData struct { } type Manifest struct { - PoolID int `json:"pool_id"` - ReplicationAvailable int `json:"replication_available"` - ManifestData ManifestData `json:"manifest_data"` + PoolID int `json:"pool_id"` + ReplicationAvailable int `json:"replication_available"` + ManifestMetadata ManifestMetadata `json:"manifest_metadata"` } type ManifestAvailableResponse struct { @@ -311,6 +329,7 @@ var requestTypes = map[string]reflect.Type{ actionPoolLeave: reflect.TypeOf(PoolLeaveRequest{}), actionManifestUpload: reflect.TypeOf(ManifestUploadRequest{}), actionManifestStore: reflect.TypeOf(ManifestStoreRequest{}), + actionManifestBatchStore: reflect.TypeOf(ManifestBatchStoreRequest{}), actionManifestAvailable: reflect.TypeOf(ManifestAvailableRequest{}), actionManifestRemove: reflect.TypeOf(ManifestRemoveRequest{}), actionManifestRemoveStorer: reflect.TypeOf(ManifestRemoveStorerRequest{}), @@ -339,6 +358,7 @@ var responseTypes = map[string]reflect.Type{ actionPoolLeave: reflect.TypeOf(PoolLeaveResponse{}), actionManifestUpload: reflect.TypeOf(ManifestUploadResponse{}), actionManifestStore: reflect.TypeOf(ManifestStoreResponse{}), + actionManifestBatchStore: reflect.TypeOf(ManifestBatchStoreResponse{}), actionManifestAvailable: reflect.TypeOf(ManifestAvailableResponse{}), actionManifestRemove: reflect.TypeOf(ManifestRemoveResponse{}), actionManifestRemoveStorer: 
reflect.TypeOf(ManifestRemoveStorerResponse{}), diff --git a/blox/blox.go b/blox/blox.go index 9813b3db..38d9826a 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -2,8 +2,12 @@ package blox import ( "context" + "errors" + "fmt" "net/http" + "strconv" "sync" + "time" "github.com/functionland/go-fula/announcements" "github.com/functionland/go-fula/blockchain" @@ -105,6 +109,128 @@ func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Mess return p.an.ValidateAnnouncement(ctx, id, msg, status, exists) } +func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error { + exists, err := p.Has(ctx, l) + if err != nil { + log.Errorw("And Error happened in checking Has", "err", err) + } + if exists { + log.Warnw("link already exists in datastore", "l", l.String()) + return fmt.Errorf("link already exists in datastore %s", l.String()) + } + providers, err := p.ex.FindProvidersDht(l) + if err != nil { + log.Errorw("And Error happened in StoreCid", "err", err) + return err + } + // Iterate over the providers and ping + for _, provider := range providers { + if provider.ID != p.h.ID() { + log.Debugw("Pinging storer", "peerID", provider.ID) + err := p.ex.PingDht(provider.ID) + if err == nil { + //Found a storer, now pull the cid + //TODO: Ideally this should fetch only the cid itseld or the path that is changed with root below it + //TODO: Ideally we should have a mechanism to reserve the pull requests and keep the pulled+requests to a max of replication factor + log.Debugw("Found a storer", "id", provider.ID) + replicasStr := "" + replicasStr, err = p.ex.SearchValueDht(ctx, l.String()) + if err != nil { + log.Warnw("SearchValue returned an error", "err", err) + } + log.Debugw("SearchValue returned value", "val", replicasStr, "for", l.String()) + replicas := 0 + replicas, err = strconv.Atoi(replicasStr) + if err != nil { + log.Warn(err) + replicas = 0 + } + log.Debugw("Checking replicas vs limit", "replicas", replicas, "limit", limit) + if replicas < limit { + newReplicas := replicas + 1 + p.ex.PutValueDht(ctx, l.String(), strconv.Itoa(newReplicas)) + err = p.ex.Pull(ctx, provider.ID, l) + if err != nil { + log.Errorw("Error happened in pulling from provider", "err", err) + continue + } + return nil + } else { + return fmt.Errorf("limit of %d is reached for %s", limit, l.String()) + } + } + } else { + log.Warnw("provider is the same as requestor", "l", l.String()) + return fmt.Errorf("provider is the same as requestor for link %s", l.String()) + } + } + return errors.New("no provider found") +} + +func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLimit, maxCids int) error { + log.Debugw("StoreManifest", "links", links) + var storedLinks []ipld.Link // Initialize an empty slice for successful storage + + for _, l := range links { + if len(storedLinks) >= maxCids { + break + } + exists, err := p.Has(ctx, l.Link) + if err != nil { + continue + } + if exists { + continue + } + err = p.StoreCid(ctx, l.Link, l.Limit) // Assuming StoreCid is a function that stores the link and returns an error if it fails + if err != nil { + // If there's an error, log it and continue with the next link + log.Errorw("Error storing CID", "link", l, "err", err) + continue // Skip appending this link to the storedLinks slice + } + // Append to storedLinks only if StoreCid is successful + storedLinks = append(storedLinks, l.Link) + } + log.Debugw("StoreManifest", "storedLinks", storedLinks) + // Handle the successfully stored links with the blockchain + if 
len(storedLinks) > 0 { + _, err := p.bl.HandleManifestBatchStore(ctx, p.topicName, storedLinks) + if err != nil { + log.Errorw("Error happened in storing manifest", "err", err) + return err + } + } + + // If all links failed to store, return an error + if len(storedLinks) == 0 { + return errors.New("all links failed to store") + } + + return nil +} + +// FetchAvailableManifestsAndStore fetches available manifests and stores them. +func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) error { + // Fetch the available manifests for a specific pool_id + availableLinks, err := p.bl.HandleManifestsAvailable(ctx, p.topicName, maxCids) + if err != nil { + return fmt.Errorf("failed to fetch available manifests: %w", err) + } + log.Debugw("FetchAvailableManifestsAndStore", "availableLinks", availableLinks) + // Check if there are enough manifests available + if len(availableLinks) == 0 { + return errors.New("no available manifests to store") + } + + // Attempt to store the fetched manifests + err = p.StoreManifest(ctx, availableLinks, maxCids) + if err != nil { + return fmt.Errorf("failed to store manifests: %w", err) + } + + return nil +} + func (p *Blox) Start(ctx context.Context) error { // implemented topic validators with chain integration. validator := p.PubsubValidator @@ -154,6 +280,34 @@ func (p *Blox) Start(ctx context.Context) error { } p.ctx, p.cancel = context.WithCancel(context.Background()) + // Starting a new goroutine for periodic task + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer log.Debug("Start blox FetchAvailableManifestsAndStore go routine is ending") + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // This block will execute every 5 minutes + if err := p.FetchAvailableManifestsAndStore(ctx, 10); err != nil { + log.Errorw("Error in FetchAvailableManifestsAndStore", "err", err) + // Handle the error or continue based on your requirement + } + case <-ctx.Done(): + // This will handle the case where the parent context is canceled + log.Info("Stopping periodic FetchAvailableManifestsAndStore due to context cancellation") + return + case <-p.ctx.Done(): + // This will handle the case where the parent context is canceled + log.Info("Stopping periodic FetchAvailableManifestsAndStore due to context cancellation") + return + } + } + }() + return nil } @@ -175,6 +329,11 @@ func (p *Blox) Ping(ctx context.Context, to peer.ID) (int, int, error) { return p.pn.Ping(ctx, to) } +func (p *Blox) PingDht(to peer.ID) error { + //This is for unit testing and no need to call directly + return p.ex.PingDht(to) +} + func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus { //This is for unit testing and no need to call directly return p.bl.GetMembers() diff --git a/blox/example_test.go b/blox/example_test.go index 55b47a3b..77d50456 100644 --- a/blox/example_test.go +++ b/blox/example_test.go @@ -11,14 +11,17 @@ import ( "time" "github.com/functionland/go-fula/blox" + "github.com/functionland/go-fula/exchange" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime/codec/dagjson" "github.com/ipld/go-ipld-prime/fluent" "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/autorelay" 
"github.com/multiformats/go-multiaddr" ) @@ -33,6 +36,8 @@ func requestLoggerMiddleware(next http.Handler) http.Handler { log.Debugw("Received request", "url", r.URL.Path, "method", r.Method, "body", string(body)) if r.URL.Path == "/fula/pool/vote" { fmt.Printf("Voted on QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe %s", string(body)) + } else if r.URL.Path == "/fula/manifest/batch_storage" { + fmt.Printf("Stored manifest: %s", string(body)) } // Create a new io.Reader from the read body as the original body is now drained @@ -136,6 +141,38 @@ func startMockServer(addr string) *http.Server { json.NewEncoder(w).Encode(response) }) + handler.HandleFunc("/fula/manifest/available", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "manifests": []map[string]interface{}{ + { + "pool_id": 1, + "manifest_metadata": map[string]interface{}{ + "job": map[string]string{ + "engine": "IPFS", + "uri": "bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34", + "work": "Storage", + }, + }, + "replication_available": 2, + }, + { + "pool_id": 1, + "manifest_metadata": map[string]interface{}{ + "job": map[string]string{ + "engine": "IPFS", + "uri": "bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy", + "work": "Storage", + }, + }, + "replication_available": 1, + }, + }, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + }) + handler.HandleFunc("/fula/pool/leave", func(w http.ResponseWriter, r *http.Request) { response := map[string]interface{}{ "pool_id": 1, @@ -144,6 +181,34 @@ func startMockServer(addr string) *http.Server { json.NewEncoder(w).Encode(response) }) + handler.HandleFunc("/fula/manifest/batch_storage", func(w http.ResponseWriter, r *http.Request) { + var reqBody struct { + CIDs []string `json:"cid"` + PoolID int `json:"pool_id"` + } + + // Decode the JSON body of the request + if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Make sure to close the request body + defer r.Body.Close() + + // Use the CIDs from the request in the response + response := map[string]interface{}{ + "pool_id": reqBody.PoolID, + "cid": reqBody.CIDs, + } + + // Encode the response as JSON + if err := json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }) + // Wrap the handlers with the logging middleware loggedHandler := requestLoggerMiddleware(handler) @@ -311,9 +376,12 @@ func Example_poolDiscoverPeersViaPubSub() { // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA } -// Example_poolExchangeDagBetweenPoolNodes starts up a pool with 2 nodes, stores a sample DAG in -// one node and fetches it via GraphSync from the other node. -func Example_poolExchangeDagBetweenPoolNodes() { +func updatePoolName(newPoolName string) error { + return nil +} + +/*func Example_announcements() { + fmt.Println("*********test Started******") server := startMockServer("127.0.0.1:4000") defer func() { // Shutdown the server after test @@ -321,16 +389,16 @@ func Example_poolExchangeDagBetweenPoolNodes() { panic(err) // Handle the error as you see fit } }() - + fmt.Println("*********server Started******") const poolName = "1" ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - + fmt.Print("*********HERE1******") // Elevate log level to show internal communications. 
if err := logging.SetLogLevel("*", "info"); err != nil { panic(err) } - + fmt.Print("*********HERE2******") // Use a deterministic random generator to generate deterministic // output for the example. rng := rand.New(rand.NewSource(42)) @@ -344,10 +412,18 @@ func Example_poolExchangeDagBetweenPoolNodes() { if err != nil { panic(err) } - n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1)) + n1, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h1), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + ) if err != nil { panic(err) } + fmt.Print("*********HERE3******") if err := n1.Start(ctx); err != nil { panic(err) } @@ -363,7 +439,14 @@ func Example_poolExchangeDagBetweenPoolNodes() { if err != nil { panic(err) } - n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2)) + n2, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h2), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + ) if err != nil { panic(err) } @@ -373,112 +456,124 @@ func Example_poolExchangeDagBetweenPoolNodes() { defer n2.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) - // Connect n1 to n2. - h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) - if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { + // Instantiate the third node in the pool + pid3, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { panic(err) } - h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) - if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { + h3, err := libp2p.New(libp2p.Identity(pid3)) + if err != nil { panic(err) } - - // Authorize exchange between the two nodes - if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil { + n3, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h3), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + ) + if err != nil { panic(err) } - if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil { + if err := n3.Start(ctx); err != nil { panic(err) } + defer n3.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) - // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 - n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { - na.AssembleEntry("this").AssignBool(true) - }) - n1leafLink, err := n1.Store(ctx, n1leaf) - if err != nil { + // Connect n1 to n2 and n3 so that there is a path for gossip propagation. + // Note that we are not connecting n2 to n3 as they should discover + // each other via pool's iexist announcements. 
+ h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { panic(err) } - n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { - na.AssembleEntry("that").AssignInt(42) - na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) - }) - n1RootLink, err := n1.Store(ctx, n1Root) - if err != nil { + h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { panic(err) } - fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) - // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 - n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { - na.AssembleEntry("that").AssignBool(false) - }) - n2leafLink, err := n2.Store(ctx, n2leaf) + // Wait until the nodes discover each other + for { + if len(h1.Peerstore().Peers()) == 5 && + len(h2.Peerstore().Peers()) == 5 && + len(h3.Peerstore().Peers()) == 5 { + break + } else { + fmt.Printf("%s peerstore contains %d nodes:\n", h1.ID(), len(h1.Peerstore().Peers())) + fmt.Printf("%s peerstore contains %d nodes:\n", h2.ID(), len(h2.Peerstore().Peers())) + fmt.Printf("%s peerstore contains %d nodes:\n", h3.ID(), len(h3.Peerstore().Peers())) + } + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(time.Second) + } + } + + h1Peers := h1.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) + for _, id := range h1Peers { + fmt.Printf("- %s\n", id) + } + + h2Peers := h2.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) + for _, id := range h2Peers { + fmt.Printf("- %s\n", id) + } + + h3Peers := h3.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) + for _, id := range h3Peers { + fmt.Printf("- %s\n", id) + } + + // Instantiate the fourth node not in the pool + pid4, _, err := crypto.GenerateECDSAKeyPair(rng) if err != nil { panic(err) } - n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { - na.AssembleEntry("this").AssignInt(24) - na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) - }) - n2RootLink, err := n2.Store(ctx, n2Root) + h4, err := libp2p.New(libp2p.Identity(pid4)) if err != nil { panic(err) } - fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) - - fmt.Println("exchanging by Pull...") - // Pull the sample DAG stored on node 1 from node 2 by only asking for the root link. - // Because fetch implementation is recursive, it should fetch the leaf link too. 
- if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil { + n4, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h4), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + ) + if err != nil { + panic(err) + } + if err := n4.Start(ctx); err != nil { panic(err) } + defer n4.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String()) - // Assert that n2 now has both root and leaf links - if exists, err := n2.Has(ctx, n1RootLink); err != nil { + //Manually adding h4 as it is not in the same pool + h4.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) + if err = h4.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { panic(err) - } else if !exists { - panic("expected n2 to have fetched the entire sample DAG") - } else { - fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) - n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any) - if err != nil { - panic(err) - } - var buf bytes.Buffer - if err := dagjson.Encode(n, &buf); err != nil { - panic(err) - } - fmt.Printf(" content: %s\n", buf.String()) } - if exists, err := n2.Has(ctx, n1leafLink); err != nil { + h4.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + if err = h4.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { panic(err) - } else if !exists { - panic("expected n2 to have fetched the entire sample DAG") - } else { - fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) - n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any) - if err != nil { - panic(err) - } - var buf bytes.Buffer - if err := dagjson.Encode(n, &buf); err != nil { - panic(err) - } - fmt.Printf(" content: %s\n", buf.String()) } - - fmt.Println("exchanging by Push...") - // Push the sample DAG stored on node 2 to node 1 by only pushing the root link. - // Because Push implementation is recursive, it should push the leaf link too. - if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil { + h4.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) + if err = h4.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { panic(err) } - // Since push is an asynchronous operation, wait until background push is finished - // by periodically checking if link is present on node 1. 
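// The examples in this file wait on conditions (peer discovery, asynchronous push completion) with
// hand-rolled polling loops like the ones around this point. A small helper, shown only as an
// illustration and not defined anywhere in this patch, captures the pattern:
func waitFor(ctx context.Context, interval time.Duration, done func() bool) error {
	for !done() {
		select {
		case <-ctx.Done():
			return ctx.Err() // give up when the example's timeout context expires
		case <-time.After(interval):
			// poll again
		}
	}
	return nil
}
// Example use: waitFor(ctx, time.Second, func() bool { return len(h4.Peerstore().Peers()) == 5 })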
+ // Wait until the fourth node discover others for { - if exists, _ := n1.Has(ctx, n2RootLink); exists { + if len(h4.Peerstore().Peers()) == 5 { break } select { @@ -489,74 +584,70 @@ func Example_poolExchangeDagBetweenPoolNodes() { } } - // Assert that n1 now has both root and leaf links - if exists, err := n1.Has(ctx, n2RootLink); err != nil { - panic(err) - } else if !exists { - panic("expected n2 to have pushed the entire sample DAG") - } else { - fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) - n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any) - if err != nil { - panic(err) + h4Peers := h4.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h4.ID(), len(h4Peers)) + for _, id := range h4Peers { + fmt.Printf("- %s\n", id) + } + + n4.AnnounceJoinPoolRequestPeriodically(ctx) + + // Wait until the fourth node discover others + for { + members := n4.GetBlMembers() + if len(members) == 4 { + for id, status := range members { + memberInfo := fmt.Sprintf("Member ID: %s, Status: %v", id.String(), status) + fmt.Println(memberInfo) + } + break + } else { + fmt.Println(members) } - var buf bytes.Buffer - if err := dagjson.Encode(n, &buf); err != nil { - panic(err) + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(2 * time.Second) } - fmt.Printf(" content: %s\n", buf.String()) - } - if exists, err := n1.Has(ctx, n2leafLink); err != nil { - panic(err) - } else if !exists { - panic("expected n2 to have pushed the entire sample DAG") - } else { - fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) - n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any) - if err != nil { - panic(err) - } - var buf bytes.Buffer - if err := dagjson.Encode(n, &buf); err != nil { - panic(err) - } - fmt.Printf(" content: %s\n", buf.String()) } - // Output: - // Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // Unordered output: + //Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT // Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: - // root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy - // leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 - // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: - // root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy - // leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 - // exchanging by Pull... - // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: - // link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy - // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42} - // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: - // link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 - // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // content: {"this":true} - // exchanging by Push... 
- // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed: - // link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy - // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // content: {"anotherLeafLink":{"/":"bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4"},"this":24} - // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed: - // link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 - // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // content: {"that":false} -} - -func updatePoolName(newPoolName string) error { - return nil -} + // Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 5 nodes: + // - 12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835 + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 5 nodes: + // - 12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835 + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 5 nodes: + // - 12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835 + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Instantiated node in pool 1 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe peerstore contains 5 nodes: + // - 12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835 + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Member ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT, Status: 2 + // Member ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF, Status: 2 + // Member ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA, Status: 2 + // Member ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe, Status: 1 +}*/ -func Example_ping() { +func Example_storeManifest() { server := startMockServer("127.0.0.1:4000") defer func() { // Shutdown the server after test @@ -570,7 +661,7 @@ func Example_ping() { defer cancel() // Elevate log level to show internal communications. 
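// This example ultimately exercises FetchAvailableManifestsAndStore, which walks
// StoreManifest -> StoreCid (blox/blox.go in this patch). StoreCid's per-link replication
// bookkeeping reduces to roughly the sketch below (names match the patch; the condensed form
// itself is illustrative only). Note the DHT counter is read and then rewritten, so two storers
// racing on the same link can both pass the check.
replicasStr, _ := p.ex.SearchValueDht(ctx, l.String()) // current replica count stored under the CID
replicas, err := strconv.Atoi(replicasStr)
if err != nil {
	replicas = 0 // a missing or malformed value counts as zero replicas
}
if replicas >= limit {
	return fmt.Errorf("limit of %d is reached for %s", limit, l.String())
}
p.ex.PutValueDht(ctx, l.String(), strconv.Itoa(replicas+1)) // announce the incremented replica count
return p.ex.Pull(ctx, provider.ID, l)                       // then pull the DAG from the chosen provider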
- if err := logging.SetLogLevel("*", "info"); err != nil { + if err := logging.SetLogLevel("*", "debug"); err != nil { panic(err) } @@ -593,6 +684,15 @@ func Example_ping() { blox.WithHost(h1), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), ) if err != nil { panic(err) @@ -618,6 +718,15 @@ func Example_ping() { blox.WithHost(h2), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), ) if err != nil { panic(err) @@ -643,6 +752,15 @@ func Example_ping() { blox.WithHost(h3), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), ) if err != nil { panic(err) @@ -653,7 +771,7 @@ func Example_ping() { defer n3.Shutdown(ctx) fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) - // Instantiate the fourth node not in the pool + // Instantiate the third node in the pool pid4, _, err := crypto.GenerateECDSAKeyPair(rng) if err != nil { panic(err) @@ -663,11 +781,20 @@ func Example_ping() { panic(err) } n4, err := blox.New( - blox.WithPoolName("0"), - blox.WithTopicName("0"), + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), blox.WithHost(h4), blox.WithUpdatePoolName(updatePoolName), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithPingCount(5), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), ) if err != nil { panic(err) @@ -676,72 +803,31 @@ func Example_ping() { panic(err) } defer n4.Shutdown(ctx) - fmt.Printf("Instantiated node in pool %s with ID: %s\n", "0", h4.ID().String()) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String()) - // Connect n1 to n2 and n3 so that there is a path for gossip propagation. - // Note that we are not connecting n2 to n3 as they should discover - // each other via pool's iexist announcements. + // Connect n1 to n2. 
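// The four nodes in this example are configured with an identical option set. A helper along these
// lines (hypothetical, not defined in this patch; host.Host is libp2p's core host type, and blox.New
// is assumed to return the *blox.Blox shown in blox/blox.go) would keep the example shorter and make
// the shared DHT configuration explicit:
func newPoolMember(h host.Host, poolName string) (*blox.Blox, error) {
	return blox.New(
		blox.WithPoolName(poolName),
		blox.WithTopicName(poolName),
		blox.WithHost(h),
		blox.WithUpdatePoolName(updatePoolName),
		blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}),
		blox.WithPingCount(5),
		blox.WithExchangeOpts(
			exchange.WithDhtProviderOptions(
				dht.ProtocolExtension(protocol.ID("/"+poolName)),
				dht.ProtocolPrefix("/fula"),
				dht.Resiliency(1),
				dht.Mode(dht.ModeAutoServer),
			),
		),
	)
}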
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { panic(err) } - h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) - if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { - panic(err) - } - - // Wait until the nodes discover each other - for { - if len(h1.Peerstore().Peers()) == 4 && - len(h2.Peerstore().Peers()) == 4 && - len(h3.Peerstore().Peers()) == 4 { - break - } - select { - case <-ctx.Done(): - panic(ctx.Err()) - default: - time.Sleep(time.Second) - } - } - - h1Peers := h1.Peerstore().Peers() - fmt.Printf("Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) - for _, id := range h1Peers { - fmt.Printf("- %s\n", id) - } - - h2Peers := h2.Peerstore().Peers() - fmt.Printf("Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) - for _, id := range h2Peers { - fmt.Printf("- %s\n", id) - } - - h3Peers := h3.Peerstore().Peers() - fmt.Printf("Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) - for _, id := range h3Peers { - fmt.Printf("- %s\n", id) - } - - //Manually adding h4 as it is not in the same pool - h1.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) - if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) + if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { panic(err) } - //Manually adding h4 as it is not in the same pool - h2.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) - if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + h3.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) + if err = h3.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { panic(err) } - //Manually adding h4 as it is not in the same pool - h3.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) - if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + h4.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) + if err = h4.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { panic(err) } - // Wait until the fourth node discover others for { - if len(h4.Peerstore().Peers()) >= 4 { + if len(h1.Peerstore().Peers()) >= 3 && + len(h2.Peerstore().Peers()) >= 3 && + len(h3.Peerstore().Peers()) >= 3 && + len(h4.Peerstore().Peers()) >= 3 { break } select { @@ -752,190 +838,103 @@ func Example_ping() { } } - if err = n4.StartPingServer(ctx); err != nil { + // Authorize exchange between the two nodes + if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil { panic(err) - } else { - fmt.Print("Node 4 ping server started\n") } - - average, rate, err := n1.Ping(ctx, h4.ID()) - if err != nil { - fmt.Println("Error occured in Ping", "err", err) + if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil { panic(err) } - fmt.Printf("%s ping results success_count: %d:\n", h1.ID(), rate) - if average < 0 { - panic("average is 0 for first ping") - } - average, rate, err = n2.Ping(ctx, h4.ID()) + // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 + n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignBool(true) + }) + n1leafLink, err := n1.Store(ctx, n1leaf) if err 
!= nil { panic(err) } - fmt.Printf("%s ping results success_count: %d:\n", h2.ID(), rate) - if average < 0 { - panic("average is 0 for second ping") - } - - average, rate, err = n3.Ping(ctx, h4.ID()) + n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignInt(42) + na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) + }) + n1RootLink, err := n1.Store(ctx, n1Root) if err != nil { panic(err) } - fmt.Printf("%s ping results success_count: %d:", h3.ID(), rate) - if average < 0 { - panic("average is 0 for third ping") - } - - // Unordered output: - // Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // Instantiated node in pool 0 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 4 nodes: - // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 4 nodes: - // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 4 nodes: - // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // Node 4 ping server started - // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT ping results success_count: 5: - // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF ping results success_count: 5: - // QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA ping results success_count: 5: -} - -/*func Example_announcements() { - fmt.Println("*********test Started******") - server := startMockServer("127.0.0.1:4000") - defer func() { - // Shutdown the server after test - if err := server.Shutdown(context.Background()); err != nil { - panic(err) // Handle the error as you see fit - } - }() - fmt.Println("*********server Started******") - const poolName = "1" - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - fmt.Print("*********HERE1******") - // Elevate log level to show internal communications. - if err := logging.SetLogLevel("*", "info"); err != nil { - panic(err) - } - fmt.Print("*********HERE2******") - // Use a deterministic random generator to generate deterministic - // output for the example. 
- rng := rand.New(rand.NewSource(42)) + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) - // Instantiate the first node in the pool - pid1, _, err := crypto.GenerateECDSAKeyPair(rng) - if err != nil { - panic(err) - } - h1, err := libp2p.New(libp2p.Identity(pid1)) + // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 + n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignBool(false) + }) + n2leafLink, err := n2.Store(ctx, n2leaf) if err != nil { panic(err) } - n1, err := blox.New( - blox.WithPoolName(poolName), - blox.WithTopicName(poolName), - blox.WithHost(h1), - blox.WithUpdatePoolName(updatePoolName), - blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), - blox.WithPingCount(5), - ) + n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignInt(24) + na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) + }) + n2RootLink, err := n2.Store(ctx, n2Root) if err != nil { panic(err) } - fmt.Print("*********HERE3******") - if err := n1.Start(ctx); err != nil { - panic(err) - } - defer n1.Shutdown(ctx) - fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) - // Instantiate the second node in the pool - pid2, _, err := crypto.GenerateECDSAKeyPair(rng) - if err != nil { - panic(err) - } - h2, err := libp2p.New(libp2p.Identity(pid2)) - if err != nil { - panic(err) - } - n2, err := blox.New( - blox.WithPoolName(poolName), - blox.WithTopicName(poolName), - blox.WithHost(h2), - blox.WithUpdatePoolName(updatePoolName), - blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), - blox.WithPingCount(5), - ) - if err != nil { - panic(err) - } - if err := n2.Start(ctx); err != nil { + fmt.Println("exchanging by Pull...") + // Pull the sample DAG stored on node 1 from node 2 by only asking for the root link. + // Because fetch implementation is recursive, it should fetch the leaf link too. 
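// "Recursive" here means the underlying exchange walks the entire DAG below the requested root, so
// pulling n1RootLink also transfers n1leafLink. A graphsync request with an exhaustive selector
// behaves like the sketch below; the exact selector go-fula's exchange uses may differ, and pullAll
// is purely illustrative.
//
//	import (
//		"github.com/ipfs/go-graphsync"
//		"github.com/ipld/go-ipld-prime"
//		selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
//		"github.com/libp2p/go-libp2p/core/peer"
//	)
func pullAll(ctx context.Context, gx graphsync.GraphExchange, from peer.ID, root ipld.Link) error {
	progress, errs := gx.Request(ctx, from, root, selectorparse.CommonSelector_ExploreAllRecursively)
	for range progress {
		// drain per-block progress events
	}
	for err := range errs {
		return err // surface the first transfer error, if any
	}
	return nil
}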
+ if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil { panic(err) } - defer n2.Shutdown(ctx) - fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) - // Instantiate the third node in the pool - pid3, _, err := crypto.GenerateECDSAKeyPair(rng) - if err != nil { - panic(err) - } - h3, err := libp2p.New(libp2p.Identity(pid3)) - if err != nil { - panic(err) - } - n3, err := blox.New( - blox.WithPoolName(poolName), - blox.WithTopicName(poolName), - blox.WithHost(h3), - blox.WithUpdatePoolName(updatePoolName), - blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), - blox.WithPingCount(5), - ) - if err != nil { + // Assert that n2 now has both root and leaf links + if exists, err := n2.Has(ctx, n1RootLink); err != nil { panic(err) + } else if !exists { + panic("expected n2 to have fetched the entire sample DAG") + } else { + fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) + n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) } - if err := n3.Start(ctx); err != nil { + if exists, err := n2.Has(ctx, n1leafLink); err != nil { panic(err) + } else if !exists { + panic("expected n2 to have fetched the entire sample DAG") + } else { + fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) + n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) } - defer n3.Shutdown(ctx) - fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) - // Connect n1 to n2 and n3 so that there is a path for gossip propagation. - // Note that we are not connecting n2 to n3 as they should discover - // each other via pool's iexist announcements. - h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) - if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { - panic(err) - } - h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) - if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { + fmt.Println("exchanging by Push...") + // Push the sample DAG stored on node 2 to node 1 by only pushing the root link. + // Because Push implementation is recursive, it should push the leaf link too. + if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil { panic(err) } - // Wait until the nodes discover each other + // Since push is an asynchronous operation, wait until background push is finished + // by periodically checking if link is present on node 1. 
for { - if len(h1.Peerstore().Peers()) == 5 && - len(h2.Peerstore().Peers()) == 5 && - len(h3.Peerstore().Peers()) == 5 { + if exists, _ := n1.Has(ctx, n2RootLink); exists { break - } else { - fmt.Printf("%s peerstore contains %d nodes:\n", h1.ID(), len(h1.Peerstore().Peers())) - fmt.Printf("%s peerstore contains %d nodes:\n", h2.ID(), len(h2.Peerstore().Peers())) - fmt.Printf("%s peerstore contains %d nodes:\n", h3.ID(), len(h3.Peerstore().Peers())) } select { case <-ctx.Done(): @@ -945,139 +944,87 @@ func Example_ping() { } } - h1Peers := h1.Peerstore().Peers() - fmt.Printf("Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) - for _, id := range h1Peers { - fmt.Printf("- %s\n", id) - } - - h2Peers := h2.Peerstore().Peers() - fmt.Printf("Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) - for _, id := range h2Peers { - fmt.Printf("- %s\n", id) - } - - h3Peers := h3.Peerstore().Peers() - fmt.Printf("Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) - for _, id := range h3Peers { - fmt.Printf("- %s\n", id) - } - - // Instantiate the fourth node not in the pool - pid4, _, err := crypto.GenerateECDSAKeyPair(rng) - if err != nil { + // Assert that n1 now has both root and leaf links + if exists, err := n1.Has(ctx, n2RootLink); err != nil { panic(err) + } else if !exists { + panic("expected n2 to have pushed the entire sample DAG") + } else { + fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) + n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) } - h4, err := libp2p.New(libp2p.Identity(pid4)) - if err != nil { + if exists, err := n1.Has(ctx, n2leafLink); err != nil { panic(err) + } else if !exists { + panic("expected n2 to have pushed the entire sample DAG") + } else { + fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) + n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) } - n4, err := blox.New( - blox.WithPoolName(poolName), - blox.WithTopicName(poolName), - blox.WithHost(h4), - blox.WithUpdatePoolName(updatePoolName), - blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), - blox.WithPingCount(5), - ) + peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink) if err != nil { + fmt.Print("Error happened in FindLinkProvidersByDht3") panic(err) } - if err := n4.Start(ctx); err != nil { - panic(err) - } - defer n4.Shutdown(ctx) - fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h4.ID().String()) - - //Manually adding h4 as it is not in the same pool - h4.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) - if err = h4.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { - panic(err) - } - h4.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) - if err = h4.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { - panic(err) - } - h4.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) - if err = h4.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { - panic(err) - } - - // Wait until the fourth node discover others - for 
{ - if len(h4.Peerstore().Peers()) == 5 { - break - } - select { - case <-ctx.Done(): - panic(ctx.Err()) - default: - time.Sleep(time.Second) - } - } - h4Peers := h4.Peerstore().Peers() - fmt.Printf("Finally %s peerstore contains %d nodes:\n", h4.ID(), len(h4Peers)) - for _, id := range h4Peers { - fmt.Printf("- %s\n", id) + // Iterate over the slice and print the peer ID of each AddrInfo + for _, addrInfo := range peerlist3 { + fmt.Printf("Found %s on %s\n", n2leafLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string } - n4.AnnounceJoinPoolRequestPeriodically(ctx) + n3.FetchAvailableManifestsAndStore(ctx, 2) + time.Sleep(1 * time.Second) + n4.FetchAvailableManifestsAndStore(ctx, 2) - // Wait until the fourth node discover others - for { - members := n4.GetBlMembers() - if len(members) == 4 { - for id, status := range members { - memberInfo := fmt.Sprintf("Member ID: %s, Status: %v", id.String(), status) - fmt.Println(memberInfo) - } - break - } else { - fmt.Println(members) - } - select { - case <-ctx.Done(): - panic(ctx.Err()) - default: - time.Sleep(2 * time.Second) - } - } - - // Unordered output: - //Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // Output: + // Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT // Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF // Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 5 nodes: - // - 12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835 - // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 5 nodes: - // - 12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835 - // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 5 nodes: - // - 12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835 - // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe // Instantiated node in pool 1 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // Finally QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe peerstore contains 5 nodes: - // - 12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835 - // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF - // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA - // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT - // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // Member ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT, Status: 2 - // Member ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF, Status: 2 - // Member ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA, Status: 2 - // Member ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe, Status: 1 -}*/ + // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: + // root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // 
leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: + // root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // exchanging by Pull... + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: + // link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42} + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: + // link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"this":true} + // exchanging by Push... + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed: + // link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"anotherLeafLink":{"/":"bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4"},"this":24} + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed: + // link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"that":false} + // Found bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4 on QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34","bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy"],"pool_id":1} + // Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34","bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy"],"pool_id":1} +} type Config struct { StaticRelays []string diff --git a/exchange/dht.go b/exchange/dht.go index 0ec21a31..103ca936 100644 --- a/exchange/dht.go +++ b/exchange/dht.go @@ -36,10 +36,14 @@ func newDhtProvider(h host.Host, opts *options) (*fulaDht, error) { return d, nil } +func (d *fulaDht) PingDht(p peer.ID) error { + err := d.dh.Ping(d.ctx, p) + return err +} + func (d *fulaDht) AddPeer(p peer.ID) { d.dh.RoutingTable().PeerAdded(p) - err := d.dh.Ping(d.ctx, p) - log.Infow("AddPeer", "self", d.dh.PeerID(), "added", p, "err", err) + log.Infow("AddPeer", "self", d.dh.PeerID(), "added", p) d.dh.RefreshRoutingTable() } diff --git a/exchange/example_test.go b/exchange/example_test.go new file mode 100644 index 00000000..b84c3035 --- /dev/null +++ b/exchange/example_test.go @@ -0,0 +1,807 @@ +package exchange_test + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "net/http" + "time" + + "github.com/functionland/go-fula/blox" + "github.com/functionland/go-fula/exchange" + logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime/codec/dagjson" + "github.com/ipld/go-ipld-prime/fluent" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" +) + +var log = logging.Logger("fula/dhttest") + +// requestLoggerMiddleware logs the details of each request +func requestLoggerMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r 
*http.Request) { + // Log the request + body, _ := io.ReadAll(r.Body) + log.Debugw("Received request", "url", r.URL.Path, "method", r.Method, "body", string(body)) + if r.URL.Path == "/fula/pool/vote" { + fmt.Printf("Voted on QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe %s", string(body)) + } + + // Create a new io.Reader from the read body as the original body is now drained + r.Body = io.NopCloser(bytes.NewBuffer(body)) + + // Call the next handler + next.ServeHTTP(w, r) + }) +} +func startMockServer(addr string) *http.Server { + handler := http.NewServeMux() + + handler.HandleFunc("/fula/pool/join", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/cancel_join", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/poolrequests", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "poolrequests": []map[string]interface{}{ + { + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + "voted": []string{}, + "positive_votes": 0, + "peer_id": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/all", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pools": []map[string]interface{}{ + { + "pool_id": 1, + "creator": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY", + "pool_name": "PoolTest1", + "region": "Ontario", + "parent": nil, + "participants": []string{ + "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + }, + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/users", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "users": []map[string]interface{}{ + { + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + "pool_id": nil, + "request_pool_id": 1, + "peer_id": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + }, + { + "account": "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + }, + { + "account": "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + }, + { + "account": "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/vote", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/leave", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + // Wrap the handlers with the logging 
middleware + loggedHandler := requestLoggerMiddleware(handler) + + // Create an HTTP server + server := &http.Server{ + Addr: addr, + Handler: loggedHandler, + } + // Start the server in a new goroutine + go func() { + if err := server.ListenAndServe(); err != http.ErrServerClosed { + panic(err) // Handle the error as you see fit + } + }() + + // Give the server a moment to start + time.Sleep(time.Millisecond * 100) + + return server +} + +func updatePoolName(newPoolName string) error { + return nil +} + +func Example_provideAfterPull() { + server := startMockServer("127.0.0.1:4001") + defer func() { + // Shutdown the server after test + if err := server.Shutdown(context.Background()); err != nil { + panic(err) // Handle the error as you see fit + } + }() + + const poolName = "1" + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Elevate log level to show internal communications. + if err := logging.SetLogLevel("*", "info"); err != nil { + panic(err) + } + + // Use a deterministic random generator to generate deterministic + // output for the example. + rng := rand.New(rand.NewSource(42)) + + // Instantiate the first node in the pool + pid1, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h1, err := libp2p.New(libp2p.Identity(pid1)) + if err != nil { + panic(err) + } + n1, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h1), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n1.Start(ctx); err != nil { + panic(err) + } + defer n1.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) + + // Instantiate the second node in the pool + pid2, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h2, err := libp2p.New(libp2p.Identity(pid2)) + if err != nil { + panic(err) + } + n2, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h2), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n2.Start(ctx); err != nil { + panic(err) + } + defer n2.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) + + // Instantiate the third node in the pool + pid3, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h3, err := libp2p.New(libp2p.Identity(pid3)) + if err != nil { + panic(err) + } + n3, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h3), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4001"), + 
blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n3.Start(ctx); err != nil { + panic(err) + } + defer n3.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) + + // Instantiate the fourth node not in the pool + pid4, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h4, err := libp2p.New(libp2p.Identity(pid4)) + if err != nil { + panic(err) + } + n4, err := blox.New( + blox.WithPoolName("0"), + blox.WithTopicName("0"), + blox.WithHost(h4), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n4.Start(ctx); err != nil { + panic(err) + } + defer n4.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", "0", h4.ID().String()) + + // Connect n1 to n2 and n3 so that there is a path for gossip propagation. + // Note that we are not connecting n2 to n3 as they should discover + // each other via pool's iexist announcements. + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { + panic(err) + } + h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { + panic(err) + } + + // Wait until the nodes discover each other + for { + if len(h1.Peerstore().Peers()) == 4 && + len(h2.Peerstore().Peers()) == 4 && + len(h3.Peerstore().Peers()) == 4 { + break + } + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(time.Second) + } + } + + h1Peers := h1.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) + for _, id := range h1Peers { + fmt.Printf("- %s\n", id) + } + + h2Peers := h2.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) + for _, id := range h2Peers { + fmt.Printf("- %s\n", id) + } + + h3Peers := h3.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) + for _, id := range h3Peers { + fmt.Printf("- %s\n", id) + } + + //Manually adding h4 as it is not in the same pool + h1.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + //Manually adding h4 as it is not in the same pool + h2.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + //Manually adding h4 as it is not in the same pool + h3.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + 
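+	// Note that h4 is configured with the same DHT protocol extension
+	// ("/"+poolName) as the pool members even though it joined pool "0", so
+	// the manual connections above are enough for it to reach the pool nodes.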
+ // Wait until the fourth node discover others + for { + if len(h4.Peerstore().Peers()) >= 4 { + break + } else { + fmt.Printf("%s peerstore contains %d nodes:\n", h4.ID(), len(h4.Peerstore().Peers())) + } + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(time.Second) + } + } + + //Store a link in h1 and find providers from h2 + + // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 + n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignBool(true) + }) + n1leafLink, err := n1.Store(ctx, n1leaf) + if err != nil { + panic(err) + } + n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignInt(42) + na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) + }) + n1RootLink, err := n1.Store(ctx, n1Root) + if err != nil { + panic(err) + } + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) + + // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 + n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignBool(false) + }) + n2leafLink, err := n2.Store(ctx, n2leaf) + if err != nil { + panic(err) + } + n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignInt(24) + na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) + }) + n2RootLink, err := n2.Store(ctx, n2Root) + if err != nil { + panic(err) + } + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n2RootLink, n2leafLink) + + //n1.UpdateDhtPeers(h2.Peerstore().Peers()) + //n2.UpdateDhtPeers(h1.Peerstore().Peers()) + + err = n1.ProvideLinkByDht(n1RootLink) + if err != nil { + fmt.Print("Error happened in ProvideLinkByDht") + panic(err) + } + peerlist1, err := n2.FindLinkProvidersByDht(n1RootLink) + if err != nil { + fmt.Print("Error happened in FindLinkProvidersByDht") + panic(err) + } + // Iterate over the slice and print the peer ID of each AddrInfo + for _, addrInfo := range peerlist1 { + fmt.Printf("Found %s on %s\n", n1RootLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string + } + + err = n1.PingDht(h3.ID()) + if err != nil { + fmt.Print("Error happened in PingDht") + panic(err) + } + + fmt.Println("exchanging by Pull...") + // Pull the sample DAG stored on node 1 from node 2 by only asking for the root link. + // Because fetch implementation is recursive, it should fetch the leaf link too. 
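+	// With this patch, Pull also announces the pulled link and the last block
+	// of each received response on the DHT via Provide, so other nodes can
+	// later discover this node as a provider of the fetched content.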
+ if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil { + panic(err) + } + + // Assert that n2 now has both root and leaf links + if exists, err := n2.Has(ctx, n1RootLink); err != nil { + panic(err) + } else if !exists { + panic("expected n2 to have fetched the entire sample DAG") + } else { + fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) + n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) + } + if exists, err := n2.Has(ctx, n1leafLink); err != nil { + panic(err) + } else if !exists { + panic("expected n2 to have fetched the entire sample DAG") + } else { + fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) + n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) + } + + fmt.Println("exchanging by Push...") + // Push the sample DAG stored on node 2 to node 1 by only pushing the root link. + // Because Push implementation is recursive, it should push the leaf link too. + if err := n2.Push(ctx, h1.ID(), n2leafLink); err != nil { + panic(err) + } + + peerlist3, err := n3.FindLinkProvidersByDht(n2leafLink) + if err != nil { + fmt.Print("Error happened in FindLinkProvidersByDht3") + panic(err) + } + + // Iterate over the slice and print the peer ID of each AddrInfo + for _, addrInfo := range peerlist3 { + fmt.Printf("Found %s on %s", n2leafLink, addrInfo.ID.String()) // ID.String() converts the peer ID to a string + } + + // Unordered output: + // Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // Instantiated node in pool 0 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 4 nodes: + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 4 nodes: + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 4 nodes: + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: + // root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: + // root: bafyreiekrzm6lpylw7hhrgvmzqfsek7og6ucqgpzns3ysbc5wj3imfrsge + // leaf:bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4 + // Found 
bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy on QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // exchanging by Pull... + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: + // link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42} + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: + // link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"this":true} + // exchanging by Push... + // Found bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4 on QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT +} + +// Example_poolExchangeDagBetweenPoolNodes starts up a pool with 2 nodes, stores a sample DAG in +// one node and fetches it via GraphSync from the other node. +func Example_poolExchangeDagBetweenPoolNodes() { + server := startMockServer("127.0.0.1:4001") + defer func() { + // Shutdown the server after test + if err := server.Shutdown(context.Background()); err != nil { + panic(err) // Handle the error as you see fit + } + }() + + const poolName = "1" + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Elevate log level to show internal communications. + if err := logging.SetLogLevel("*", "info"); err != nil { + panic(err) + } + + // Use a deterministic random generator to generate deterministic + // output for the example. + rng := rand.New(rand.NewSource(42)) + + // Instantiate the first node in the pool + pid1, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h1, err := libp2p.New(libp2p.Identity(pid1)) + if err != nil { + panic(err) + } + n1, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h1)) + if err != nil { + panic(err) + } + if err := n1.Start(ctx); err != nil { + panic(err) + } + defer n1.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) + + // Instantiate the second node in the pool + pid2, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h2, err := libp2p.New(libp2p.Identity(pid2)) + if err != nil { + panic(err) + } + n2, err := blox.New(blox.WithPoolName(poolName), blox.WithHost(h2)) + if err != nil { + panic(err) + } + if err := n2.Start(ctx); err != nil { + panic(err) + } + defer n2.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) + + // Connect n1 to n2. 
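+	// Unlike Example_provideAfterPull above, these nodes are created with the
+	// default exchange options (no DHT provider options), so the DAG exchange
+	// below happens over the direct connections established here.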
+ h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { + panic(err) + } + h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) + if err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil { + panic(err) + } + + // Authorize exchange between the two nodes + if err := n1.SetAuth(ctx, h1.ID(), h2.ID(), true); err != nil { + panic(err) + } + if err := n2.SetAuth(ctx, h2.ID(), h1.ID(), true); err != nil { + panic(err) + } + + // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 + n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignBool(true) + }) + n1leafLink, err := n1.Store(ctx, n1leaf) + if err != nil { + panic(err) + } + n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignInt(42) + na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) + }) + n1RootLink, err := n1.Store(ctx, n1Root) + if err != nil { + panic(err) + } + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) + + // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 + n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignBool(false) + }) + n2leafLink, err := n2.Store(ctx, n2leaf) + if err != nil { + panic(err) + } + n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignInt(24) + na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) + }) + n2RootLink, err := n2.Store(ctx, n2Root) + if err != nil { + panic(err) + } + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) + + fmt.Println("exchanging by Pull...") + // Pull the sample DAG stored on node 1 from node 2 by only asking for the root link. + // Because fetch implementation is recursive, it should fetch the leaf link too. + if err := n2.Pull(ctx, h1.ID(), n1RootLink); err != nil { + panic(err) + } + + // Assert that n2 now has both root and leaf links + if exists, err := n2.Has(ctx, n1RootLink); err != nil { + panic(err) + } else if !exists { + panic("expected n2 to have fetched the entire sample DAG") + } else { + fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) + n, err := n2.Load(ctx, n1RootLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) + } + if exists, err := n2.Has(ctx, n1leafLink); err != nil { + panic(err) + } else if !exists { + panic("expected n2 to have fetched the entire sample DAG") + } else { + fmt.Printf("%s successfully fetched:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) + n, err := n2.Load(ctx, n1leafLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) + } + + fmt.Println("exchanging by Push...") + // Push the sample DAG stored on node 2 to node 1 by only pushing the root link. + // Because Push implementation is recursive, it should push the leaf link too. 
+ if err := n2.Push(ctx, h1.ID(), n2RootLink); err != nil { + panic(err) + } + + // Since push is an asynchronous operation, wait until background push is finished + // by periodically checking if link is present on node 1. + for { + if exists, _ := n1.Has(ctx, n2RootLink); exists { + break + } + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(time.Second) + } + } + + // Assert that n1 now has both root and leaf links + if exists, err := n1.Has(ctx, n2RootLink); err != nil { + panic(err) + } else if !exists { + panic("expected n2 to have pushed the entire sample DAG") + } else { + fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1RootLink, h1.ID()) + n, err := n1.Load(ctx, n2RootLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) + } + if exists, err := n1.Has(ctx, n2leafLink); err != nil { + panic(err) + } else if !exists { + panic("expected n2 to have pushed the entire sample DAG") + } else { + fmt.Printf("%s successfully pushed:\n link: %s\n from %s\n", h2.ID(), n1leafLink, h1.ID()) + n, err := n1.Load(ctx, n2leafLink, basicnode.Prototype.Any) + if err != nil { + panic(err) + } + var buf bytes.Buffer + if err := dagjson.Encode(n, &buf); err != nil { + panic(err) + } + fmt.Printf(" content: %s\n", buf.String()) + } + + // Output: + // Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: + // root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT stored IPLD data with links: + // root: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // leaf:bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // exchanging by Pull... + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: + // link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"oneLeafLink":{"/":"bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"},"that":42} + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully fetched: + // link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"this":true} + // exchanging by Push... 
+ // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed: + // link: bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"anotherLeafLink":{"/":"bafyreibzxn3zdk6e53h7cvx2sfbbroozp5e3kuvz6t4jfo2hfu4ic2ooc4"},"this":24} + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF successfully pushed: + // link: bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34 + // from QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // content: {"that":false} +} diff --git a/exchange/fx_exchange.go b/exchange/fx_exchange.go index 83c24e3a..7bffc66a 100644 --- a/exchange/fx_exchange.go +++ b/exchange/fx_exchange.go @@ -121,16 +121,40 @@ func (e *FxExchange) GetAuth(ctx context.Context) (peer.ID, error) { return e.authorizer, nil } -// The below method is exposed for unit testing only func (e *FxExchange) ProvideDht(l ipld.Link) error { return e.dht.Provide(l) } -// The below method is exposed for unit testing only +func (e *FxExchange) PingDht(p peer.ID) error { + return e.dht.PingDht(p) +} + func (e *FxExchange) FindProvidersDht(l ipld.Link) ([]peer.AddrInfo, error) { return e.dht.FindProviders(l) } +func (e *FxExchange) PutValueDht(ctx context.Context, key string, val string) error { + return e.dht.dh.PutValue(ctx, key, []byte(val)) +} + +func (e *FxExchange) SearchValueDht(ctx context.Context, key string) (string, error) { + valueStream, err := e.dht.dh.SearchValue(ctx, key) + if err != nil { + return "", fmt.Errorf("error searching value in DHT: %w", err) + } + + var mostAccurateValue []byte + for val := range valueStream { + mostAccurateValue = val // The last value received before the channel closes is the most accurate + } + + if mostAccurateValue == nil { + return "", errors.New("no value found for the key") + } + + return string(mostAccurateValue), nil +} + func (e *FxExchange) GetAuthorizedPeers(ctx context.Context) ([]peer.ID, error) { var peerList []peer.ID for peerId := range e.authorizedPeers { @@ -187,6 +211,13 @@ func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error if e.allowTransientConnection { ctx = network.WithUseTransient(ctx, "fx.exchange") } + // Call Provide for the last block link of each response + if err := e.dht.Provide(l); err != nil { + log.Warnw("Failed to provide link via DHT", "link", l, "err", err) + // Decide how to handle the error, e.g., continue, return, etc. + } else { + log.Debug("Success provide link via DHT") + } resps, errs := e.gx.Request(ctx, from, l, selectorparse.CommonSelector_ExploreAllRecursively) for { select { @@ -197,6 +228,13 @@ func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error return nil } log.Infow("synced node", "node", resp.Node) + // Call Provide for the last block link of each response + if err := e.dht.Provide(resp.LastBlock.Link); err != nil { + log.Warnw("Failed to provide link via DHT", "link", resp.LastBlock.Link, "err", err) + // Decide how to handle the error, e.g., continue, return, etc. + } else { + log.Debug("Success provide link via DHT") + } case err, ok := <-errs: if !ok { return nil diff --git a/mobile/blockchain.go b/mobile/blockchain.go index 3d413199..0352d921 100644 --- a/mobile/blockchain.go +++ b/mobile/blockchain.go @@ -6,13 +6,6 @@ import ( "github.com/functionland/go-fula/blockchain" ) -// Seeded requests blox at Config.BloxAddr to create a seeded account. -// The seed must start with "/", and the addr must be a valid multiaddr that includes peer ID. 
-func (c *Client) Seeded(seed string) ([]byte, error) { - ctx := context.TODO() - return c.bl.Seeded(ctx, c.bloxPid, blockchain.SeededRequest{Seed: seed}) -} - // AccountExists requests blox at Config.BloxAddr to check if the account exists or not. // the addr must be a valid multiaddr that includes peer ID. func (c *Client) AccountExists(account string) ([]byte, error) { @@ -26,14 +19,6 @@ func (c *Client) AccountCreate() ([]byte, error) { return c.bl.AccountCreate(ctx, c.bloxPid) } -// AccountFund requests blox at Config.BloxAddr to fund the account. -// the addr must be a valid multiaddr that includes peer ID. -// TODO: This still needs rethink as someone should not be able to put another person PeerID in request -func (c *Client) AccountFund(seed string, amount blockchain.BigInt, to string) ([]byte, error) { - ctx := context.TODO() - return c.bl.AccountFund(ctx, c.bloxPid, blockchain.AccountFundRequest{Amount: amount, To: to}) -} - // AccountBalance requests blox at Config.BloxAddr to get the balance of the account. // the addr must be a valid multiaddr that includes peer ID. func (c *Client) AccountBalance(account string) ([]byte, error) { @@ -41,15 +26,6 @@ func (c *Client) AccountBalance(account string) ([]byte, error) { return c.bl.AccountBalance(ctx, c.bloxPid, blockchain.AccountBalanceRequest{Account: account}) } -// PoolCreate requests blox at Config.BloxAddr to creates a pool with the name. -// the addr must be a valid multiaddr that includes peer ID. -// Note that this call is only allowed on a user's own blox -// TODO: This still needs rethink as someone should not be able to put another person PeerID in request -func (c *Client) PoolCreate(seed string, poolName string) ([]byte, error) { - ctx := context.TODO() - return c.bl.PoolCreate(ctx, c.bloxPid, blockchain.PoolCreateRequest{PoolName: poolName, PeerID: c.bloxPid.String()}) -} - // PoolJoin requests blox at Config.BloxAddr to join a pool with the id. // the addr must be a valid multiaddr that includes peer ID. // Note that this call is only allowed on a user's own blox @@ -88,15 +64,6 @@ func (c *Client) PoolUserList(poolID int) ([]byte, error) { return c.bl.PoolUserList(ctx, c.bloxPid, blockchain.PoolUserListRequest{PoolID: poolID}) } -// PoolVote requests blox at Config.BloxAddr to vote for a join request. -// the addr must be a valid multiaddr that includes peer ID. -// Note that this call is only allowed on a user's own blox -// TODO: This still needs rethink as someone should not be able to put another person PeerID in request -func (c *Client) PoolVote(seed string, poolID int, account string, voteValue bool) ([]byte, error) { - ctx := context.TODO() - return c.bl.PoolVote(ctx, c.bloxPid, blockchain.PoolVoteRequest{PoolID: poolID, Account: account, VoteValue: voteValue}) -} - // PoolLeave requests blox at Config.BloxAddr to leave a pool with the id. // the addr must be a valid multiaddr that includes peer ID. // Note that this call is only allowed on a user's own blox @@ -106,28 +73,6 @@ func (c *Client) PoolLeave(poolID int) ([]byte, error) { return c.bl.PoolLeave(ctx, c.bloxPid, blockchain.PoolLeaveRequest{PoolID: poolID}) } -// ManifestUpload requests blox at Config.BloxAddr to add a manifest(upload request) -// the addr must be a valid multiaddr that includes peer ID. 
-func (c *Client) ManifestUpload(seed string, poolID int, ReplicationFactor int, uri string) ([]byte, error) { - ctx := context.TODO() - manifestJob := blockchain.ManifestJob{ - Work: "IPFS", - Engine: "Storage", - Uri: uri, - } - manifestMetadata := blockchain.ManifestMetadata{ - Job: manifestJob, - } - return c.bl.ManifestUpload(ctx, c.bloxPid, blockchain.ManifestUploadRequest{PoolID: poolID, ReplicationFactor: ReplicationFactor, ManifestMetadata: manifestMetadata}) -} - -// ManifestStore requests blox at Config.BloxAddr to store a manifest(store request) -// the addr must be a valid multiaddr that includes peer ID. -func (c *Client) ManifestStore(seed string, poolID int, uploader string, cid string) ([]byte, error) { - ctx := context.TODO() - return c.bl.ManifestStore(ctx, c.bloxPid, blockchain.ManifestStoreRequest{PoolID: poolID, Uploader: uploader, Cid: cid}) -} - // ManifestAvailable requests blox at Config.BloxAddr to list manifests // the addr must be a valid multiaddr that includes peer ID. func (c *Client) ManifestAvailable(poolID int) ([]byte, error) { @@ -135,27 +80,6 @@ func (c *Client) ManifestAvailable(poolID int) ([]byte, error) { return c.bl.ManifestAvailable(ctx, c.bloxPid, blockchain.ManifestAvailableRequest{PoolID: poolID}) } -// ManifestRemove requests blox at Config.BloxAddr to remove a manifest -// the addr must be a valid multiaddr that includes peer ID. -func (c *Client) ManifestRemove(seed string, poolID int, cid string) ([]byte, error) { - ctx := context.TODO() - return c.bl.ManifestRemove(ctx, c.bloxPid, blockchain.ManifestRemoveRequest{Cid: cid, PoolID: poolID}) -} - -// The uploader or admin can remove an account that is storing a given manifest. -// the addr must be a valid multiaddr that includes peer ID. -func (c *Client) ManifestRemoveStorer(seed string, storage string, poolID int, cid string) ([]byte, error) { - ctx := context.TODO() - return c.bl.ManifestRemoveStorer(ctx, c.bloxPid, blockchain.ManifestRemoveStorerRequest{Storage: storage, Cid: cid, PoolID: poolID}) -} - -// The storer can stop storing a given manifest -// the addr must be a valid multiaddr that includes peer ID. 
-func (c *Client) ManifestRemoveStored(seed string, uploader string, poolID int, cid string) ([]byte, error) { - ctx := context.TODO() - return c.bl.ManifestRemoveStored(ctx, c.bloxPid, blockchain.ManifestRemoveStoredRequest{Uploader: uploader, Cid: cid, PoolID: poolID}) -} - ////////////////////////////////////////////////// /////////////////////HARDWARE///////////////////// ////////////////////////////////////////////////// diff --git a/exchange/dht_test.go b/ping/example_test similarity index 74% rename from exchange/dht_test.go rename to ping/example_test index 0c16cb41..2047a32f 100644 --- a/exchange/dht_test.go +++ b/ping/example_test @@ -1,4 +1,4 @@ -package exchange_test +package ping_test import ( "bytes" @@ -8,23 +8,17 @@ import ( "io" "math/rand" "net/http" - "testing" "time" "github.com/functionland/go-fula/blox" - "github.com/functionland/go-fula/exchange" logging "github.com/ipfs/go-log/v2" - "github.com/ipld/go-ipld-prime/fluent" - "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/libp2p/go-libp2p" - dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/core/protocol" ) -var log = logging.Logger("fula/dhttest") +var log = logging.Logger("fula/mockserver") // requestLoggerMiddleware logs the details of each request func requestLoggerMiddleware(next http.Handler) http.Handler { @@ -165,13 +159,11 @@ func startMockServer(addr string) *http.Server { return server } - func updatePoolName(newPoolName string) error { return nil } - -func TestDHTFunctionality(t *testing.T) { - server := startMockServer("127.0.0.1:4001") +func Example_ping() { + server := startMockServer("127.0.0.1:4002") defer func() { // Shutdown the server after test if err := server.Shutdown(context.Background()); err != nil { @@ -206,16 +198,8 @@ func TestDHTFunctionality(t *testing.T) { blox.WithTopicName(poolName), blox.WithHost(h1), blox.WithUpdatePoolName(updatePoolName), - blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithBlockchainEndPoint("127.0.0.1:4002"), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), - blox.WithExchangeOpts( - exchange.WithDhtProviderOptions( - dht.ProtocolExtension(protocol.ID("/"+poolName)), - dht.ProtocolPrefix("/fula"), - dht.Resiliency(1), - dht.Mode(dht.ModeAutoServer), - ), - ), ) if err != nil { panic(err) @@ -240,16 +224,8 @@ func TestDHTFunctionality(t *testing.T) { blox.WithTopicName(poolName), blox.WithHost(h2), blox.WithUpdatePoolName(updatePoolName), - blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithBlockchainEndPoint("127.0.0.1:4002"), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), - blox.WithExchangeOpts( - exchange.WithDhtProviderOptions( - dht.ProtocolExtension(protocol.ID("/"+poolName)), - dht.ProtocolPrefix("/fula"), - dht.Resiliency(1), - dht.Mode(dht.ModeAutoServer), - ), - ), ) if err != nil { panic(err) @@ -274,16 +250,8 @@ func TestDHTFunctionality(t *testing.T) { blox.WithTopicName(poolName), blox.WithHost(h3), blox.WithUpdatePoolName(updatePoolName), - blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithBlockchainEndPoint("127.0.0.1:4002"), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), - blox.WithExchangeOpts( - exchange.WithDhtProviderOptions( - 
dht.ProtocolExtension(protocol.ID("/"+poolName)), - dht.ProtocolPrefix("/fula"), - dht.Resiliency(1), - dht.Mode(dht.ModeAutoServer), - ), - ), ) if err != nil { panic(err) @@ -308,16 +276,8 @@ func TestDHTFunctionality(t *testing.T) { blox.WithTopicName("0"), blox.WithHost(h4), blox.WithUpdatePoolName(updatePoolName), - blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithBlockchainEndPoint("127.0.0.1:4002"), blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), - blox.WithExchangeOpts( - exchange.WithDhtProviderOptions( - dht.ProtocolExtension(protocol.ID("/"+poolName)), - dht.ProtocolPrefix("/fula"), - dht.Resiliency(1), - dht.Mode(dht.ModeAutoServer), - ), - ), ) if err != nil { panic(err) @@ -393,8 +353,6 @@ func TestDHTFunctionality(t *testing.T) { for { if len(h4.Peerstore().Peers()) >= 4 { break - } else { - fmt.Printf("%s peerstore contains %d nodes:\n", h4.ID(), len(h4.Peerstore().Peers())) } select { case <-ctx.Done(): @@ -404,57 +362,39 @@ func TestDHTFunctionality(t *testing.T) { } } - //Store a link in h1 and find providers from h2 - - // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 - n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { - na.AssembleEntry("this").AssignBool(true) - }) - n1leafLink, err := n1.Store(ctx, n1leaf) - if err != nil { + if err = n4.StartPingServer(ctx); err != nil { panic(err) + } else { + fmt.Print("Node 4 ping server started\n") } - n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { - na.AssembleEntry("that").AssignInt(42) - na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) - }) - n1RootLink, err := n1.Store(ctx, n1Root) + + average, rate, err := n1.Ping(ctx, h4.ID()) if err != nil { + fmt.Println("Error occured in Ping", "err", err) panic(err) } - fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) + fmt.Printf("%s ping results success_count: %d:\n", h1.ID(), rate) + if average < 0 { + panic("average is 0 for first ping") + } - // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 - n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { - na.AssembleEntry("that").AssignBool(false) - }) - n2leafLink, err := n2.Store(ctx, n2leaf) + average, rate, err = n2.Ping(ctx, h4.ID()) if err != nil { panic(err) } - n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { - na.AssembleEntry("this").AssignInt(24) - na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) - }) - n2RootLink, err := n2.Store(ctx, n2Root) - if err != nil { - panic(err) + fmt.Printf("%s ping results success_count: %d:\n", h2.ID(), rate) + if average < 0 { + panic("average is 0 for second ping") } - fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n2RootLink, n2leafLink) - //n1.UpdateDhtPeers(h2.Peerstore().Peers()) - //n2.UpdateDhtPeers(h1.Peerstore().Peers()) - - err = n1.ProvideLinkByDht(n1RootLink) + average, rate, err = n3.Ping(ctx, h4.ID()) if err != nil { - fmt.Print("Error happened here") panic(err) } - peerlist1, err := n2.FindLinkProvidersByDht(n1RootLink) - if err != nil { - panic(err) + fmt.Printf("%s ping results success_count: %d:", h3.ID(), rate) + if average < 0 { + panic("average is 0 for third ping") } - fmt.Println(peerlist1) // Unordered output: // Instantiated node in pool 1 with ID: 
QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT @@ -476,5 +416,8 @@ func TestDHTFunctionality(t *testing.T) { // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe - // [{QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT: [/ip4/127.0.0.1/udp/62495/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip4/192.168.2.14/tcp/42433 /ip4/192.168.2.14/udp/62494/quic-v1 /ip6/::1/udp/62497/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip4/127.0.0.1/tcp/42433 /ip4/127.0.0.1/udp/62494/quic-v1 /ip4/192.168.2.14/udp/62495/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip6/::1/tcp/42434 /ip6/::1/udp/62496/quic /ip6/::1/udp/62496/quic-v1 /dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835/p2p-circuit /ip4/127.0.0.1/udp/62494/quic /ip4/192.168.2.14/udp/62494/quic]}] + // Node 4 ping server started + // QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT ping results success_count: 5: + // QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF ping results success_count: 5: + // QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA ping results success_count: 5: }
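
As a usage illustration for the PutValueDht and SearchValueDht methods added to FxExchange above, the sketch below shows how a caller might publish a value and read it back. It is not part of this patch: it assumes an already constructed and started exchange instance, and the "/fula/example-key" key is a hypothetical example that would need to be accepted by the DHT record validator.

package exchangesketch // hypothetical package, for illustration only

import (
	"context"

	"github.com/functionland/go-fula/exchange"
)

// storeAndLookup publishes a value under a key on the DHT and reads it back.
// Both the key namespace and the assumption that ex is already started are
// illustrative, not taken from this change.
func storeAndLookup(ctx context.Context, ex *exchange.FxExchange) (string, error) {
	// Write the value under the key.
	if err := ex.PutValueDht(ctx, "/fula/example-key", "example-value"); err != nil {
		return "", err
	}
	// SearchValueDht returns the last value observed before the result stream
	// closes, which the implementation treats as the most accurate one.
	return ex.SearchValueDht(ctx, "/fula/example-key")
}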