Skip to content

Commit

Permalink
Implement the ability to pull a single block (#202)
Browse files Browse the repository at this point in the history
* Implement the ability to pull a single block

Add the ability to pull a single block via fx exchange.

* changed Pull to PullBlock in MAnifestStore and correctd unittest

* tidy up TODOs

* Update example_test.go

* Update example_test.go

---------

Co-authored-by: ehsan shariati <[email protected]>
  • Loading branch information
masih and ehsan6sha authored Dec 23, 2023
1 parent 22514b9 commit cddfe22
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 49 deletions.
10 changes: 5 additions & 5 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
for _, provider := range providers {
if provider.ID != p.h.ID() {
//Found a storer, now pull the cid
//TODO: Ideally this should fetch only the cid itseld or the path that is changed with root below it
//TODO: Ideally we should have a mechanism to reserve the pull requests and keep the pulled+requests to a max of replication factor
log.Debugw("Found a storer", "id", provider.ID)

Expand All @@ -143,7 +142,7 @@ func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
}
p.h.Peerstore().AddAddrs(provider.ID, []multiaddr.Multiaddr{addr}, peerstore.ConnectedAddrTTL)
log.Debugw("Started Pull in StoreCid", "from", provider.ID, "l", l)
err = p.ex.Pull(ctx, provider.ID, l)
err = p.ex.PullBlock(ctx, provider.ID, l)
if err != nil {
log.Errorw("Error happened in pulling from provider", "err", err)
continue
Expand Down Expand Up @@ -223,8 +222,10 @@ func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLim

// FetchAvailableManifestsAndStore fetches available manifests and stores them.
func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) error {
childCtx, cancel := context.WithTimeout(ctx, 4*time.Minute)
defer cancel() // It's good practice to call cancel to free resources if the childCtx finishes before the timeout
// Fetch the available manifests for a specific pool_id
availableLinks, err := p.bl.HandleManifestsAvailable(ctx, p.topicName, maxCids)
availableLinks, err := p.bl.HandleManifestsAvailable(childCtx, p.topicName, maxCids)
if err != nil {
return fmt.Errorf("failed to fetch available manifests: %w", err)
}
Expand All @@ -235,11 +236,10 @@ func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int)
}

// Attempt to store the fetched manifests
err = p.StoreManifest(ctx, availableLinks, maxCids)
err = p.StoreManifest(childCtx, availableLinks, maxCids)
if err != nil {
return fmt.Errorf("failed to store manifests: %w", err)
}

return nil
}

Expand Down
35 changes: 21 additions & 14 deletions blox/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func startMockServer(addr string) *http.Server {
"manifest_metadata": map[string]interface{}{
"job": map[string]string{
"engine": "IPFS",
"uri": "bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34",
"uri": "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji",
"work": "Storage",
},
},
Expand All @@ -161,7 +161,7 @@ func startMockServer(addr string) *http.Server {
"manifest_metadata": map[string]interface{}{
"job": map[string]string{
"engine": "IPFS",
"uri": "bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy",
"uri": "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4",
"work": "Storage",
},
},
Expand Down Expand Up @@ -217,9 +217,9 @@ func startMockServer(addr string) *http.Server {
// Prepare the ContextID based on the CID
var contextID string
switch cid {
case "bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy":
case "bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4":
contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX"))
case "bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34":
case "bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji":
contextID = base64.StdEncoding.EncodeToString([]byte("12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM"))
default:
http.Error(w, "Not Found", http.StatusNotFound)
Expand Down Expand Up @@ -412,10 +412,12 @@ func Example_poolDiscoverPeersViaPubSub() {

// Wait until the nodes discover each other
for {
if len(h1.Peerstore().Peers()) == 3 &&
len(h2.Peerstore().Peers()) == 3 &&
len(h3.Peerstore().Peers()) == 3 {
if len(h1.Peerstore().Peers()) == 4 &&
len(h2.Peerstore().Peers()) == 4 &&
len(h3.Peerstore().Peers()) == 4 {
break
} else {
log.Infow("h1.Peerstore().Peers() is waitting", "h1.Peerstore().Peers()", h1.Peerstore().Peers())
}
select {
case <-ctx.Done():
Expand Down Expand Up @@ -447,18 +449,21 @@ func Example_poolDiscoverPeersViaPubSub() {
// Instantiated node in pool 1 with ID: 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// Instantiated node in pool 1 with ID: 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// Instantiated node in pool 1 with ID: 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
// 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains 3 nodes:
// 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM peerstore contains 4 nodes:
// - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
// - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains 3 nodes:
// - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
// 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX peerstore contains 4 nodes:
// - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
// 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains 3 nodes:
// - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
// 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX peerstore contains 4 nodes:
// - 12D3KooWH9swjeCyuR6utzKU1UspiW5RDGzAFvNDwqkT5bUHwuxX
// - 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// - 12D3KooWRde3N9rHE8vEyzTiPMVBvs1RpjS4oaWjVkfAt17412vX
// - 12D3KooWFmfEsXjWotvqJ6B3ASXx1w3p6udj8R9f344a9JTu2k4R
}

func updatePoolName(newPoolName string) error {
Expand Down Expand Up @@ -741,7 +746,7 @@ func Example_testMockserver() {
}
}()
// Define the URL
url := "http://127.0.0.1:4000/cid/bafyreibzsetfhqrayathm5tkmm7axuljxcas3pbqrncrosx2fiky4wj5gy"
url := "http://127.0.0.1:4000/cid/bafyr4iauqnsshryxfg2262z6mqev5fyef7gmgjk54skmtggnplehusyno4"

// Send a GET request to the server
resp, err := http.Get(url)
Expand Down Expand Up @@ -802,7 +807,7 @@ func Example_storeManifest() {
}()

const poolName = "1"
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()

// Elevate log level to show internal communications.
Expand Down Expand Up @@ -1154,8 +1159,10 @@ func Example_storeManifest() {
// from 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// content: {"that":false}
// Found bafyr4iaab3lel4ykjcyzqajx5np2uluetwvfyv3ujupxt5qs57owhpo6ty on 12D3KooWQfGkPUkoLDEeJE3H3ZTmu9BZvAdbJpmhha8WpjeSLKMM
// Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"],"pool_id":1}
// Stored manifest: {"cid":["bafyreidulpo7on77a6pkq7c6da5mlj4n2p3av2zjomrpcpeht5zqgafc34"],"pool_id":1}
// Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
// Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
// Voted on 12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q {"pool_id":1,"account":"12D3KooWRTzN7HfmjoUBHokyRZuKdyohVVSGqKBMF24ZC3tGK78Q","vote_value":true}
// Stored manifest: {"cid":["bafyr4ifwexg2ka3kueem7wp36diai4wzqswkdiqscw2su4llkhgwcmq2ji"],"pool_id":1}
}

type Config struct {
Expand Down
111 changes: 89 additions & 22 deletions exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
Expand All @@ -37,7 +38,7 @@ import (
)

const (
FxExchangeProtocolID = "/fx.land/exchange/0.0.2"
FxExchangeProtocolID = "/fx.land/exchange/0.0.3"

actionPull = "pull"
actionPush = "push"
Expand Down Expand Up @@ -280,7 +281,7 @@ func (e *FxExchange) Start(ctx context.Context) error {
defer e.wg.Done() // Decrement the counter when the goroutine completes
}
defer log.Debug("After HandlerFunc go routine is ending")
if err := e.s.Serve(listen); err != http.ErrServerClosed {
if err := e.s.Serve(listen); !errors.Is(err, http.ErrServerClosed) {
log.Errorw("HTTP server stopped with error", "err", err)
}
}()
Expand Down Expand Up @@ -342,6 +343,45 @@ func (e *FxExchange) Pull(ctx context.Context, from peer.ID, l ipld.Link) error
return nil
}
}
func (e *FxExchange) PullBlock(ctx context.Context, from peer.ID, l ipld.Link) error {
if e.allowTransientConnection {
ctx = network.WithUseTransient(ctx, "fx.exchange")
}

cl := l.(cidlink.Link).Cid

e.tempAuthsLock.Lock()
e.tempAuths[cl.String()] = tempAuthEntry{peerID: from, timestamp: time.Now()}
e.tempAuthsLock.Unlock()

log.Debugw("Setting a temporary auth for Push in Pull", "from", from, "cid", cl.String(), "e.tempAuths", e.tempAuths)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "libp2p://"+from.String()+"/"+actionPull+"/"+cl.String(), nil)
if err != nil {
log.Errorw("Failed to instantiate pull request", "err", err)
return err
}
switch resp, err := e.c.Do(req); {
case err != nil:
log.Errorw("Failed to read pull response", "err", err)
return err
case resp.StatusCode != http.StatusOK:
defer resp.Body.Close()
log.Errorw("Expected status accepted on pull response", "got", resp.StatusCode)
if b, err := io.ReadAll(resp.Body); err != nil {
log.Errorw("Failed to read response in non-OK status", "got", resp.StatusCode, "err", err)
return fmt.Errorf("unexpected response: %d", resp.StatusCode)
} else {
return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(b))
}
default:
if err := e.decodeAndStoreBlock(ctx, resp.Body, l); err != nil {
log.Errorw("Failed to pull block", "err", err)
return err
}
log.Debug("Successfully sent push request")
return nil
}
}

func (e *FxExchange) Push(ctx context.Context, to peer.ID, l ipld.Link) error {
if e.allowTransientConnection {
Expand Down Expand Up @@ -445,7 +485,7 @@ func (e *FxExchange) serve(w http.ResponseWriter, r *http.Request) {
}
switch action {
case actionPull:
e.handlePull(from, w, r)
e.handlePull(from, w, r, c)
case actionPush:
e.handlePush(from, w, r, c)
case actionAuth:
Expand All @@ -468,43 +508,70 @@ func (e *FxExchange) handlePush(from peer.ID, w http.ResponseWriter, r *http.Req
http.Error(w, "invalid cid", http.StatusBadRequest)
return
}
defer r.Body.Close()
if err := e.decodeAndStoreBlock(r.Context(), r.Body, cidlink.Link{Cid: cidInPath}); err != nil {
http.Error(w, "invalid cid", http.StatusBadRequest)
}
}

codec := cidInPath.Prefix().Codec
func (e *FxExchange) decodeAndStoreBlock(ctx context.Context, r io.ReadCloser, link ipld.Link) error {
defer r.Close()
codec := link.(cidlink.Link).Cid.Prefix().Codec
decoder, err := ipldmc.LookupDecoder(codec)
if err != nil {
log.Errorw("No decoder found", "codec", codec)
http.Error(w, "invalid cid", http.StatusBadRequest)
return
return err
}
nb := basicnode.Prototype.Any.NewBuilder()
if err = decoder(nb, r.Body); err != nil {
if err = decoder(nb, r); err != nil {
log.Errorw("Failed to decode pushed node as dagcbor", "err", err)
http.Error(w, "failed to decode node", http.StatusBadRequest)
return
return err
}
l, err := e.ls.Store(
ipld.LinkContext{Ctx: r.Context()}, cidlink.LinkPrototype{
Prefix: cidInPath.Prefix(),
}, nb.Build())
storedLink, err := e.ls.Store(
ipld.LinkContext{Ctx: ctx}, link.Prototype(), nb.Build())
if err != nil {
log.Errorw("Failed to store pushed node", "err", err)
http.Error(w, "failed to store node", http.StatusInternalServerError)
return
return err
}
log.Debugw("Successfully stored pushed node", "cid", l.(cidlink.Link).Cid)
log.Debugw("Notifying stored pushed link to IPNI publisher", "link", l)
e.pub.notifyReceivedLink(l)
log.Debugw("Successfully notified stored pushed link to IPNI publisher", "link", l)
log.Debugw("Successfully stored pushed node", "cid", storedLink.(cidlink.Link).Cid)
log.Debugw("Notifying stored pushed link to IPNI publisher", "link", storedLink)
e.pub.notifyReceivedLink(storedLink)
log.Debugw("Successfully notified stored pushed link to IPNI publisher", "link", storedLink)
return nil
}

func (e *FxExchange) handlePull(from peer.ID, w http.ResponseWriter, r *http.Request) {
func (e *FxExchange) handlePull(from peer.ID, w http.ResponseWriter, r *http.Request, c string) {
log := log.With("action", actionPull, "from", from)
if r.Method != http.MethodPost {
switch r.Method {
case http.MethodGet:
cc, err := cid.Decode(c)
if err != nil {
log.Errorw("Cannot decode CID while handking GET /pull", "cid", c, "err", err)
http.Error(w, "", http.StatusBadRequest)
return
}

raw, err := e.ls.LoadRaw(ipld.LinkContext{Ctx: r.Context()}, cidlink.Link{Cid: cc})
if err != nil {
if errors.Is(err, datastore.ErrNotFound) {
log.Errorw("Cannot find link locally while handing GET /pull", "cid", c)
http.Error(w, "", http.StatusNotFound)
return
}
log.Errorw("Failed to load link while handing GET /pull", "cid", c)
http.Error(w, "", http.StatusInternalServerError)
return
}
if _, err := w.Write(raw); err != nil {
log.Errorw("Failed to write response while handing GET /pull", "cid", c)
}
return
case http.MethodPost: // Proceed; POST is allowed.
default:
log.Errorw("Only POST is allowed on pull", "got", r.Method)
http.Error(w, "", http.StatusMethodNotAllowed)
return
}

defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions exchange/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
type Exchange interface {
Start(context.Context) error
Push(context.Context, peer.ID, ipld.Link) error
// Pull recursively traverses the given link and syncs all its associated blocks and itself from the given peer ID.
Pull(context.Context, peer.ID, ipld.Link) error
// PullBlock Pulls a single block associated to the given link from the given peer ID.
PullBlock(context.Context, peer.ID, ipld.Link) error
SetAuth(context.Context, peer.ID, peer.ID, bool) error
Shutdown(context.Context) error
IpniNotifyLink(link ipld.Link)
Expand Down
5 changes: 5 additions & 0 deletions exchange/noop_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ func (n NoopExchange) Pull(_ context.Context, from peer.ID, l ipld.Link) error {
return nil
}

func (n NoopExchange) PullBlock(_ context.Context, from peer.ID, l ipld.Link) error {
log.Debugw("Pulled block noop exchange.", "from", from, "link", l)
return nil
}

func (n NoopExchange) SetAuth(_ context.Context, on peer.ID, subject peer.ID, allow bool) error {
log.Debugw("Set auth noop exchange.", "on", on, "subject", subject, "allow", allow)
return nil
Expand Down
2 changes: 0 additions & 2 deletions mobile/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (c *Client) PoolJoin(poolID int) ([]byte, error) {
// PoolJoin requests blox at Config.BloxAddr to cancel a join request for a pool with the id.
// the addr must be a valid multiaddr that includes peer ID.
// Note that this call is only allowed on a user's own blox
// TODO: This still needs rethink as someone should not be able to put another person PeerID in request
func (c *Client) PoolCancelJoin(poolID int) ([]byte, error) {
ctx := context.TODO()
return c.bl.PoolCancelJoin(ctx, c.bloxPid, blockchain.PoolCancelJoinRequest{PoolID: poolID})
Expand Down Expand Up @@ -75,7 +74,6 @@ func (c *Client) PoolUserList(poolID int) ([]byte, error) {
// PoolLeave requests blox at Config.BloxAddr to leave a pool with the id.
// the addr must be a valid multiaddr that includes peer ID.
// Note that this call is only allowed on a user's own blox
// TODO: This still needs rethink as someone should not be able to put another person PeerID in request
func (c *Client) PoolLeave(poolID int) ([]byte, error) {
ctx := context.TODO()
return c.bl.PoolLeave(ctx, c.bloxPid, blockchain.PoolLeaveRequest{PoolID: poolID})
Expand Down
15 changes: 9 additions & 6 deletions wap/pkg/wifi/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,31 @@ func RunCommand(ctx context.Context, commands string) (stdout, stderr string, er
func runCommand(ctx context.Context, commands string) (stdout, stderr string, err error) {
log.Infow("running", "commands", commands)
command := strings.Fields(commands)
// TODO: consider exec.CommandContext as an alternative to enable context timeout/deadline
cmd := exec.Command(command[0])
cmd := exec.CommandContext(ctx, command[0])
if len(command) > 0 {
cmd = exec.Command(command[0], command[1:]...)
cmd = exec.CommandContext(ctx, command[0], command[1:]...)
}
var outb, errb bytes.Buffer
cmd.Stdout = &outb
cmd.Stderr = &errb
err = cmd.Start()
if err != nil {
return
return stdout, stderr, err
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
select {
case <-ctx.Done():
err = cmd.Process.Kill()
if killErr := cmd.Process.Kill(); killErr != nil {
log.Errorw("Failed to kill process", "error", killErr)
stderr = errb.String()
return outb.String(), stderr, killErr
}
case err = <-done:
stdout = outb.String()
stderr = errb.String()
}
return
return stdout, stderr, err
}

0 comments on commit cddfe22

Please sign in to comment.