From cddfe22a8947c7d4301dc356b2683f8085e44dca Mon Sep 17 00:00:00 2001 From: "Masih H. Derkani" Date: Sat, 23 Dec 2023 17:29:49 +0000 Subject: [PATCH] Implement the ability to pull a single block (#202) * Implement the ability to pull a single block Add the ability to pull a single block via fx exchange. * changed Pull to PullBlock in MAnifestStore and correctd unittest * tidy up TODOs * Update example_test.go * Update example_test.go --------- Co-authored-by: ehsan shariati --- blox/blox.go | 10 ++-- blox/example_test.go | 35 +++++++----- exchange/fx_exchange.go | 111 ++++++++++++++++++++++++++++++-------- exchange/interface.go | 3 ++ exchange/noop_exchange.go | 5 ++ mobile/blockchain.go | 2 - wap/pkg/wifi/init.go | 15 +++--- 7 files changed, 132 insertions(+), 49 deletions(-) diff --git a/blox/blox.go b/blox/blox.go index 17b9b776..94117d3e 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -128,7 +128,6 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error { for _, provider := range providers { if provider.ID != p.h.ID() { //Found a storer, now pull the cid - //TODO: Ideally this should fetch only the cid itseld or the path that is changed with root below it //TODO: Ideally we should have a mechanism to reserve the pull requests and keep the pulled+requests to a max of replication factor log.Debugw("Found a storer", "id", provider.ID) @@ -143,7 +142,7 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error { } p.h.Peerstore().AddAddrs(provider.ID, []multiaddr.Multiaddr{addr}, peerstore.ConnectedAddrTTL) log.Debugw("Started Pull in StoreCid", "from", provider.ID, "l", l) - err = p.ex.Pull(ctx, provider.ID, l) + err = p.ex.PullBlock(ctx, provider.ID, l) if err != nil { log.Errorw("Error happened in pulling from provider", "err", err) continue @@ -223,8 +222,10 @@ func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLim // FetchAvailableManifestsAndStore fetches available manifests and stores them. func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) error { + childCtx, cancel := context.WithTimeout(ctx, 4*time.Minute) + defer cancel() // It's good practice to call cancel to free resources if the childCtx finishes before the timeout // Fetch the available manifests for a specific pool_id - availableLinks, err := p.bl.HandleManifestsAvailable(ctx, p.topicName, maxCids) + availableLinks, err := p.bl.HandleManifestsAvailable(childCtx, p.topicName, maxCids) if err != nil { return fmt.Errorf("failed to fetch available manifests: %w", err) } @@ -235,11 +236,10 @@ func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) } // Attempt to store the fetched manifests - err = p.StoreManifest(ctx, availableLinks, maxCids) + err = p.StoreManifest(childCtx, availableLinks, maxCids) if err != nil { return fmt.Errorf("failed to store manifests: %w", err) } - return nil } diff --git a/blox/example_test.go b/blox/example_test.go index 05ea1f0a..6ad4d5e4 100644 --- a/blox/example_test.go +++ b/blox/example_test.go @@ -150,7 +150,7 @@ func startMockServer(addr string) *http.Server { "manifest_metadata": map[string]interface{}{ "job": map[string]string{ "engine": "IPFS", - "uri": "bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34", + "uri": "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji", "work": "Storage", }, }, @@ -161,7 +161,7 @@ func startMockServer(addr string) *http.Server { "manifest_metadata": map[string]interface{}{ "job": map[string]string{ "engine": "IPFS", - "uri": "bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy", + "uri": "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4", "work": "Storage", }, }, @@ -217,9 +217,9 @@ func startMockServer(addr string) *http.Server { // Prepare the ContextID based on the CID var contextID string switch cid { - case "bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy": + case "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4": contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX")) - case "bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34": + case "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji": contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM")) default: http.Error(w, "Not Found", http.StatusNotFound) @@ -412,10 +412,12 @@ func Example_poolDiscoverPeersViaPubSub() { // Wait until the nodes discover each other for { - if len(h1.Peerstore().Peers()) == 3 && - len(h2.Peerstore().Peers()) == 3 && - len(h3.Peerstore().Peers()) == 3 { + if len(h1.Peerstore().Peers()) == 4 && + len(h2.Peerstore().Peers()) == 4 && + len(h3.Peerstore().Peers()) == 4 { break + } else { + log.Infow("h1.Peerstore().Peers() is waitting", "h1.Peerstore().Peers()", h1.Peerstore().Peers()) } select { case <-ctx.Done(): @@ -447,18 +449,21 @@ func Example_poolDiscoverPeersViaPubSub() { // Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM // Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX // Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX - // 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains 3 nodes: + // 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains 4 nodes: // - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX // - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX // - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM - // 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains 3 nodes: + // - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R + // 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains 4 nodes: // - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX // - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM // - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX - // 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains 3 nodes: + // - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R + // 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains 4 nodes: // - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX // - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM // - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX + // - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R } func updatePoolName(newPoolName string) error { @@ -741,7 +746,7 @@ func Example_testMockserver() { } }() // Define the URL - url := "http://127.0.0.1:4000/cid/bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy" + url := "http://127.0.0.1:4000/cid/bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4" // Send a GET request to the server resp, err := http.Get(url) @@ -802,7 +807,7 @@ func Example_storeManifest() { }() const poolName = "1" - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) defer cancel() // Elevate log level to show internal communications. @@ -1154,8 +1159,10 @@ func Example_storeManifest() { // from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM // content: {"that":false} // Found bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM - // Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"],"pool_id":1} - // Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"],"pool_id":1} + // Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} + // Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} + // Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true} + // Stored manifest: {"cid":["bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji"],"pool_id":1} } type Config struct { diff --git a/exchange/fx_exchange.go b/exchange/fx_exchange.go index 9a6663e0..3c937161 100644 --- a/exchange/fx_exchange.go +++ b/exchange/fx_exchange.go @@ -15,6 +15,7 @@ import ( bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" _ "github.com/ipld/go-ipld-prime/codec/dagcbor" @@ -37,7 +38,7 @@ import ( ) const ( - FxExchangeProtocolID = "/fx.land/exchange/0.0.2" + FxExchangeProtocolID = "/fx.land/exchange/0.0.3" actionPull = "pull" actionPush = "push" @@ -280,7 +281,7 @@ func (e *FxExchange) Start(ctx context.Context) error { defer e.wg.Done() // Decrement the counter when the goroutine completes } defer log.Debug("After HandlerFunc go routine is ending") - if err := e.s.Serve(listen); err != http.ErrServerClosed { + if err := e.s.Serve(listen); !errors.Is(err, http.ErrServerClosed) { log.Errorw("HTTP server stopped with error", "err", err) } }() @@ -342,6 +343,45 @@ func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error return nil } } +func (e *FxExchange) PullBlock(ctx context.Context, from peer.ID, l ipld.Link) error { + if e.allowTransientConnection { + ctx = network.WithUseTransient(ctx, "fx.exchange") + } + + cl := l.(cidlink.Link).Cid + + e.tempAuthsLock.Lock() + e.tempAuths[cl.String()] = tempAuthEntry{peerID: from, timestamp: time.Now()} + e.tempAuthsLock.Unlock() + + log.Debugw("Setting a temporary auth for Push in Pull", "from", from, "cid", cl.String(), "e.tempAuths", e.tempAuths) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "libp2p://"+from.String()+"/"+actionPull+"/"+cl.String(), nil) + if err != nil { + log.Errorw("Failed to instantiate pull request", "err", err) + return err + } + switch resp, err := e.c.Do(req); { + case err != nil: + log.Errorw("Failed to read pull response", "err", err) + return err + case resp.StatusCode != http.StatusOK: + defer resp.Body.Close() + log.Errorw("Expected status accepted on pull response", "got", resp.StatusCode) + if b, err := io.ReadAll(resp.Body); err != nil { + log.Errorw("Failed to read response in non-OK status", "got", resp.StatusCode, "err", err) + return fmt.Errorf("unexpected response: %d", resp.StatusCode) + } else { + return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b)) + } + default: + if err := e.decodeAndStoreBlock(ctx, resp.Body, l); err != nil { + log.Errorw("Failed to pull block", "err", err) + return err + } + log.Debug("Successfully sent push request") + return nil + } +} func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error { if e.allowTransientConnection { @@ -445,7 +485,7 @@ func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) { } switch action { case actionPull: - e.handlePull(from, w, r) + e.handlePull(from, w, r, c) case actionPush: e.handlePush(from, w, r, c) case actionAuth: @@ -468,43 +508,70 @@ func (e *FxExchange) handlePush(from peer.ID, w http.ResponseWriter, r *http.Req http.Error(w, "invalid cid", http.StatusBadRequest) return } - defer r.Body.Close() + if err := e.decodeAndStoreBlock(r.Context(), r.Body, cidlink.Link{Cid: cidInPath}); err != nil { + http.Error(w, "invalid cid", http.StatusBadRequest) + } +} - codec := cidInPath.Prefix().Codec +func (e *FxExchange) decodeAndStoreBlock(ctx context.Context, r io.ReadCloser, link ipld.Link) error { + defer r.Close() + codec := link.(cidlink.Link).Cid.Prefix().Codec decoder, err := ipldmc.LookupDecoder(codec) if err != nil { log.Errorw("No decoder found", "codec", codec) - http.Error(w, "invalid cid", http.StatusBadRequest) - return + return err } nb := basicnode.Prototype.Any.NewBuilder() - if err = decoder(nb, r.Body); err != nil { + if err = decoder(nb, r); err != nil { log.Errorw("Failed to decode pushed node as dagcbor", "err", err) - http.Error(w, "failed to decode node", http.StatusBadRequest) - return + return err } - l, err := e.ls.Store( - ipld.LinkContext{Ctx: r.Context()}, cidlink.LinkPrototype{ - Prefix: cidInPath.Prefix(), - }, nb.Build()) + storedLink, err := e.ls.Store( + ipld.LinkContext{Ctx: ctx}, link.Prototype(), nb.Build()) if err != nil { log.Errorw("Failed to store pushed node", "err", err) - http.Error(w, "failed to store node", http.StatusInternalServerError) - return + return err } - log.Debugw("Successfully stored pushed node", "cid", l.(cidlink.Link).Cid) - log.Debugw("Notifying stored pushed link to IPNI publisher", "link", l) - e.pub.notifyReceivedLink(l) - log.Debugw("Successfully notified stored pushed link to IPNI publisher", "link", l) + log.Debugw("Successfully stored pushed node", "cid", storedLink.(cidlink.Link).Cid) + log.Debugw("Notifying stored pushed link to IPNI publisher", "link", storedLink) + e.pub.notifyReceivedLink(storedLink) + log.Debugw("Successfully notified stored pushed link to IPNI publisher", "link", storedLink) + return nil } -func (e *FxExchange) handlePull(from peer.ID, w http.ResponseWriter, r *http.Request) { +func (e *FxExchange) handlePull(from peer.ID, w http.ResponseWriter, r *http.Request, c string) { log := log.With("action", actionPull, "from", from) - if r.Method != http.MethodPost { + switch r.Method { + case http.MethodGet: + cc, err := cid.Decode(c) + if err != nil { + log.Errorw("Cannot decode CID while handking GET /pull", "cid", c, "err", err) + http.Error(w, "", http.StatusBadRequest) + return + } + + raw, err := e.ls.LoadRaw(ipld.LinkContext{Ctx: r.Context()}, cidlink.Link{Cid: cc}) + if err != nil { + if errors.Is(err, datastore.ErrNotFound) { + log.Errorw("Cannot find link locally while handing GET /pull", "cid", c) + http.Error(w, "", http.StatusNotFound) + return + } + log.Errorw("Failed to load link while handing GET /pull", "cid", c) + http.Error(w, "", http.StatusInternalServerError) + return + } + if _, err := w.Write(raw); err != nil { + log.Errorw("Failed to write response while handing GET /pull", "cid", c) + } + return + case http.MethodPost: // Proceed; POST is allowed. + default: log.Errorw("Only POST is allowed on pull", "got", r.Method) http.Error(w, "", http.StatusMethodNotAllowed) return } + defer r.Body.Close() b, err := io.ReadAll(r.Body) if err != nil { diff --git a/exchange/interface.go b/exchange/interface.go index 074066f5..23ae62f2 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -10,7 +10,10 @@ import ( type Exchange interface { Start(context.Context) error Push(context.Context, peer.ID, ipld.Link) error + // Pull recursively traverses the given link and syncs all its associated blocks and itself from the given peer ID. Pull(context.Context, peer.ID, ipld.Link) error + // PullBlock Pulls a single block associated to the given link from the given peer ID. + PullBlock(context.Context, peer.ID, ipld.Link) error SetAuth(context.Context, peer.ID, peer.ID, bool) error Shutdown(context.Context) error IpniNotifyLink(link ipld.Link) diff --git a/exchange/noop_exchange.go b/exchange/noop_exchange.go index 7f3d1f35..27bfc098 100644 --- a/exchange/noop_exchange.go +++ b/exchange/noop_exchange.go @@ -26,6 +26,11 @@ func (n NoopExchange) Pull(_ context.Context, from peer.ID, l ipld.Link) error { return nil } +func (n NoopExchange) PullBlock(_ context.Context, from peer.ID, l ipld.Link) error { + log.Debugw("Pulled block noop exchange.", "from", from, "link", l) + return nil +} + func (n NoopExchange) SetAuth(_ context.Context, on peer.ID, subject peer.ID, allow bool) error { log.Debugw("Set auth noop exchange.", "on", on, "subject", subject, "allow", allow) return nil diff --git a/mobile/blockchain.go b/mobile/blockchain.go index 6f50e560..518e9507 100644 --- a/mobile/blockchain.go +++ b/mobile/blockchain.go @@ -45,7 +45,6 @@ func (c *Client) PoolJoin(poolID int) ([]byte, error) { // PoolJoin requests blox at Config.BloxAddr to cancel a join request for a pool with the id. // the addr must be a valid multiaddr that includes peer ID. // Note that this call is only allowed on a user's own blox -// TODO: This still needs rethink as someone should not be able to put another person PeerID in request func (c *Client) PoolCancelJoin(poolID int) ([]byte, error) { ctx := context.TODO() return c.bl.PoolCancelJoin(ctx, c.bloxPid, blockchain.PoolCancelJoinRequest{PoolID: poolID}) @@ -75,7 +74,6 @@ func (c *Client) PoolUserList(poolID int) ([]byte, error) { // PoolLeave requests blox at Config.BloxAddr to leave a pool with the id. // the addr must be a valid multiaddr that includes peer ID. // Note that this call is only allowed on a user's own blox -// TODO: This still needs rethink as someone should not be able to put another person PeerID in request func (c *Client) PoolLeave(poolID int) ([]byte, error) { ctx := context.TODO() return c.bl.PoolLeave(ctx, c.bloxPid, blockchain.PoolLeaveRequest{PoolID: poolID}) diff --git a/wap/pkg/wifi/init.go b/wap/pkg/wifi/init.go index acb0ae5e..a791a505 100644 --- a/wap/pkg/wifi/init.go +++ b/wap/pkg/wifi/init.go @@ -79,17 +79,16 @@ func RunCommand(ctx context.Context, commands string) (stdout, stderr string, er func runCommand(ctx context.Context, commands string) (stdout, stderr string, err error) { log.Infow("running", "commands", commands) command := strings.Fields(commands) - // TODO: consider exec.CommandContext as an alternative to enable context timeout/deadline - cmd := exec.Command(command[0]) + cmd := exec.CommandContext(ctx, command[0]) if len(command) > 0 { - cmd = exec.Command(command[0], command[1:]...) + cmd = exec.CommandContext(ctx, command[0], command[1:]...) } var outb, errb bytes.Buffer cmd.Stdout = &outb cmd.Stderr = &errb err = cmd.Start() if err != nil { - return + return stdout, stderr, err } done := make(chan error, 1) go func() { @@ -97,10 +96,14 @@ func runCommand(ctx context.Context, commands string) (stdout, stderr string, er }() select { case <-ctx.Done(): - err = cmd.Process.Kill() + if killErr := cmd.Process.Kill(); killErr != nil { + log.Errorw("Failed to kill process", "error", killErr) + stderr = errb.String() + return outb.String(), stderr, killErr + } case err = <-done: stdout = outb.String() stderr = errb.String() } - return + return stdout, stderr, err }