Skip to content

Commit

Permalink
Added DHT Provide to Pull (#186)
Browse files Browse the repository at this point in the history
* Added DHT Provide to Pull

After Pull the node announces that it can provide the link on dht

* Added method to store cid

* updated tests

* Added storeManifests and tests

StoreManifest first calls the blockchain to get the available manifests, then fetches and stores them

* Added dht to store and get the number of replications

* corrected a go check

* Update blox.go

* Added Fetch available and Store to a time when blox starts

* Update example_test.go

* Update example_test.go

* Update example_test.go

* Added check to make sure we are not storing an already stored link

* Added mechanism to make sure we do not get stuck on limit
  • Loading branch information
ehsan6sha authored Dec 3, 2023
1 parent c0ce3d0 commit 69155ad
Show file tree
Hide file tree
Showing 9 changed files with 1,561 additions and 643 deletions.
76 changes: 76 additions & 0 deletions blockchain/bl_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import (
"fmt"
"io"
"net/http"
"strconv"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)
Expand Down Expand Up @@ -80,6 +84,78 @@ func (bl *FxBlockchain) ManifestStore(ctx context.Context, to peer.ID, r Manifes
}
}

// HandleManifestBatchStore reports to the blockchain that this node stores the
// given links in the pool identified by poolIDString.
//
// poolIDString must be the decimal representation of the pool ID; a
// non-numeric value is rejected before any request is made, and the returned
// error wraps the strconv error so callers can inspect it with errors.Is/As.
//
// Each link is sent as its String() form in a ManifestBatchStoreRequest via
// the actionManifestBatchStore action. On success the Cid list from the decoded
// ManifestBatchStoreResponse is returned.
func (bl *FxBlockchain) HandleManifestBatchStore(ctx context.Context, poolIDString string, links []ipld.Link) ([]string, error) {
	// Validate the pool ID first so a bad topic fails fast.
	poolID, err := strconv.Atoi(poolIDString)
	if err != nil {
		return nil, fmt.Errorf("invalid topic, not an integer: %w", err)
	}

	// Keep the slice nil for empty input (as before); only allocate when
	// there is something to send, and then allocate exactly once.
	var linksString []string
	if n := len(links); n > 0 {
		linksString = make([]string, 0, n)
		for _, link := range links {
			linksString = append(linksString, link.String())
		}
	}

	manifestBatchStoreRequest := ManifestBatchStoreRequest{
		PoolID: poolID,
		Cid:    linksString,
	}

	// Submit the batch-store request to the chain.
	responseBody, err := bl.callBlockchain(ctx, "POST", actionManifestBatchStore, manifestBatchStoreRequest)
	if err != nil {
		return nil, err
	}

	// Decode the response and hand back the CIDs it lists.
	var manifestBatchStoreResponse ManifestBatchStoreResponse
	if err := json.Unmarshal(responseBody, &manifestBatchStoreResponse); err != nil {
		return nil, fmt.Errorf("failed to unmarshal manifestBatchStore response: %w", err)
	}
	return manifestBatchStoreResponse.Cid, nil
}
// HandleManifestsAvailable asks the blockchain which manifests are available
// in the pool identified by poolIDString and returns at most limit of them as
// links, in the order the response lists them.
//
// poolIDString must be the decimal representation of the pool ID; otherwise an
// error wrapping the strconv error is returned without contacting the chain.
//
// For every returned entry, Link is the CID decoded from the manifest's
// ManifestMetadata.Job.Uri and Limit is the manifest's ReplicationAvailable.
// A manifest whose URI is not a valid CID aborts the call with an error, but
// only manifests that would actually be returned are decoded: entries past
// limit are never inspected, and limit <= 0 yields an empty result.
func (bl *FxBlockchain) HandleManifestsAvailable(ctx context.Context, poolIDString string, limit int) ([]LinkWithLimit, error) {
	poolID, err := strconv.Atoi(poolIDString)
	if err != nil {
		return nil, fmt.Errorf("invalid topic, not an integer: %w", err)
	}
	manifestAvailableRequest := ManifestAvailableRequest{
		PoolID: poolID,
	}

	// Query the chain for the manifests available in this pool.
	responseBody, err := bl.callBlockchain(ctx, "POST", actionManifestAvailable, manifestAvailableRequest)
	if err != nil {
		return nil, err
	}

	// Decode the response.
	var manifestAvailableResponse ManifestAvailableResponse
	if err := json.Unmarshal(responseBody, &manifestAvailableResponse); err != nil {
		return nil, fmt.Errorf("failed to unmarshal manifestAvailable response: %w", err)
	}

	var linksWithLimits []LinkWithLimit

	// Convert manifests to links until the requested limit is reached.
	for _, manifest := range manifestAvailableResponse.Manifests {
		// Check the limit before decoding so that a malformed entry we would
		// not return anyway cannot fail the whole call.
		if len(linksWithLimits) >= limit {
			break
		}

		c, err := cid.Decode(manifest.ManifestMetadata.Job.Uri)
		if err != nil {
			return nil, fmt.Errorf("failed to decode CID: %w", err)
		}

		linksWithLimits = append(linksWithLimits, LinkWithLimit{
			Link:  cidlink.Link{Cid: c},
			Limit: manifest.ReplicationAvailable,
		})
	}

	return linksWithLimits, nil
}

func (bl *FxBlockchain) ManifestAvailable(ctx context.Context, to peer.ID, r ManifestAvailableRequest) ([]byte, error) {

if bl.allowTransientConnection {
Expand Down
26 changes: 23 additions & 3 deletions blockchain/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"

wifi "github.com/functionland/go-fula/wap/pkg/wifi"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -24,6 +25,7 @@ const (
actionPoolLeave = "fula-pool-leave"
actionManifestUpload = "fula-manifest-upload"
actionManifestStore = "fula-manifest-storage"
actionManifestBatchStore = "fula-manifest-batch_storage"
actionManifestAvailable = "fula-manifest-available"
actionManifestRemove = "fula-manifest-remove"
actionManifestRemoveStorer = "fula-manifest-remove_storer"
Expand All @@ -37,6 +39,11 @@ const (
actionDeleteFulaConfig = "delete-fula-config"
)

// LinkWithLimit pairs a link with an integer limit for that link.
type LinkWithLimit struct {
// Link identifies the content.
Link ipld.Link
// Limit is the cap associated with Link.
// NOTE(review): presumably the number of replicas still allowed for the
// link — confirm against the code that fills and reads this field.
Limit int
}

// SeededRequest is a request body that carries a single seed string.
type SeededRequest struct {
// Seed is serialized under the JSON key "seed".
Seed string `json:"seed"`
}
Expand Down Expand Up @@ -210,6 +217,17 @@ type ManifestStoreResponse struct {
Cid string `json:"cid"`
}

// ManifestBatchStoreRequest is the request body for the manifest batch-store
// action: a list of CIDs together with the pool they belong to.
type ManifestBatchStoreRequest struct {
// Cid holds the CIDs in string form; serialized under the JSON key "cid".
Cid []string `json:"cid"`
// PoolID is the numeric pool identifier; serialized as "pool_id".
PoolID int `json:"pool_id"`
}

// ManifestBatchStoreResponse is the decoded reply to a manifest batch-store
// request.
type ManifestBatchStoreResponse struct {
// PoolID is the numeric pool identifier; JSON key "pool_id".
PoolID int `json:"pool_id"`
// Storer is read from the JSON key "storer".
// NOTE(review): presumably the account that stored the CIDs — confirm
// against the chain API.
Storer string `json:"storer"`
// Cid lists CIDs in string form; JSON key "cid".
Cid []string `json:"cid"`
}

// ManifestAvailableRequest is the request body for the manifest-available
// action; it selects the pool to query.
type ManifestAvailableRequest struct {
// PoolID is the numeric pool identifier; serialized as "pool_id".
PoolID int `json:"pool_id"`
}
Expand All @@ -220,9 +238,9 @@ type ManifestData struct {
}

// Manifest describes one manifest; presumably it is the element type of the
// manifest-available response — TODO confirm.
//
// NOTE(review): this listing contains two sets of field lines, which looks
// like both sides of a diff shown without +/- markers: the first three fields
// (ending in ManifestData) and the last three (ending in ManifestMetadata).
// Confirm which set the real file contains; as written, PoolID and
// ReplicationAvailable are declared twice.
type Manifest struct {
PoolID int `json:"pool_id"`
ReplicationAvailable int `json:"replication_available"`
ManifestData ManifestData `json:"manifest_data"`
PoolID int `json:"pool_id"`
ReplicationAvailable int `json:"replication_available"`
ManifestMetadata ManifestMetadata `json:"manifest_metadata"`
}

type ManifestAvailableResponse struct {
Expand Down Expand Up @@ -311,6 +329,7 @@ var requestTypes = map[string]reflect.Type{
actionPoolLeave: reflect.TypeOf(PoolLeaveRequest{}),
actionManifestUpload: reflect.TypeOf(ManifestUploadRequest{}),
actionManifestStore: reflect.TypeOf(ManifestStoreRequest{}),
actionManifestBatchStore: reflect.TypeOf(ManifestBatchStoreRequest{}),
actionManifestAvailable: reflect.TypeOf(ManifestAvailableRequest{}),
actionManifestRemove: reflect.TypeOf(ManifestRemoveRequest{}),
actionManifestRemoveStorer: reflect.TypeOf(ManifestRemoveStorerRequest{}),
Expand Down Expand Up @@ -339,6 +358,7 @@ var responseTypes = map[string]reflect.Type{
actionPoolLeave: reflect.TypeOf(PoolLeaveResponse{}),
actionManifestUpload: reflect.TypeOf(ManifestUploadResponse{}),
actionManifestStore: reflect.TypeOf(ManifestStoreResponse{}),
actionManifestBatchStore: reflect.TypeOf(ManifestBatchStoreResponse{}),
actionManifestAvailable: reflect.TypeOf(ManifestAvailableResponse{}),
actionManifestRemove: reflect.TypeOf(ManifestRemoveResponse{}),
actionManifestRemoveStorer: reflect.TypeOf(ManifestRemoveStorerResponse{}),
Expand Down
159 changes: 159 additions & 0 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package blox

import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"sync"
"time"

"github.com/functionland/go-fula/announcements"
"github.com/functionland/go-fula/blockchain"
Expand Down Expand Up @@ -105,6 +109,128 @@ func (p *Blox) PubsubValidator(ctx context.Context, id peer.ID, msg *pubsub.Mess
return p.an.ValidateAnnouncement(ctx, id, msg, status, exists)
}

// StoreCid pulls the content behind l from a reachable DHT provider, as long
// as the replica counter recorded on the DHT for l is still below limit.
//
// The steps are:
//  1. Fail if l is already in the local datastore, or if that cannot be
//     determined.
//  2. Look up the providers of l on the DHT.
//  3. For each provider other than this host that answers a DHT ping, read the
//     replica counter stored under l.String(). If it is below limit, write
//     counter+1 back and pull l from that provider.
//
// It returns nil after the first successful pull. It returns an error when the
// link already exists locally, the datastore check or provider lookup fails,
// the replica limit has been reached, or no provider could be used.
//
// A missing or unparsable replica counter is treated as zero. Providers that
// are this host itself, do not answer the ping, or fail the pull are skipped
// so the remaining providers still get a chance.
func (p *Blox) StoreCid(ctx context.Context, l ipld.Link, limit int) error {
	exists, err := p.Has(ctx, l)
	if err != nil {
		// Without a reliable answer we must not pull and bump the replica
		// counter: we might already hold the link.
		log.Errorw("And Error happened in checking Has", "err", err)
		return fmt.Errorf("checking datastore for link %s: %w", l.String(), err)
	}
	if exists {
		log.Warnw("link already exists in datastore", "l", l.String())
		return fmt.Errorf("link already exists in datastore %s", l.String())
	}

	providers, err := p.ex.FindProvidersDht(l)
	if err != nil {
		log.Errorw("And Error happened in StoreCid", "err", err)
		return err
	}

	key := l.String()
	self := p.h.ID()
	for _, provider := range providers {
		if provider.ID == self {
			// We cannot pull from ourselves; keep looking at the remaining
			// providers instead of giving up on the link.
			log.Warnw("provider is the same as requestor", "l", key)
			continue
		}

		log.Debugw("Pinging storer", "peerID", provider.ID)
		if err := p.ex.PingDht(provider.ID); err != nil {
			log.Debugw("Storer did not answer ping", "peerID", provider.ID, "err", err)
			continue
		}

		// Found a storer, now pull the cid.
		// TODO: Ideally this should fetch only the cid itself or the path that is changed with root below it
		// TODO: Ideally we should have a mechanism to reserve the pull requests and keep the pulled+requests to a max of replication factor
		log.Debugw("Found a storer", "id", provider.ID)

		replicasStr, err := p.ex.SearchValueDht(ctx, key)
		if err != nil {
			log.Warnw("SearchValue returned an error", "err", err)
		}
		log.Debugw("SearchValue returned value", "val", replicasStr, "for", key)

		replicas, err := strconv.Atoi(replicasStr)
		if err != nil {
			// No usable counter on the DHT yet: start from zero.
			log.Warn(err)
			replicas = 0
		}

		log.Debugw("Checking replicas vs limit", "replicas", replicas, "limit", limit)
		if replicas >= limit {
			return fmt.Errorf("limit of %d is reached for %s", limit, key)
		}

		// The counter is bumped before the pull; it is not rolled back if the
		// pull then fails.
		p.ex.PutValueDht(ctx, key, strconv.Itoa(replicas+1))
		if err := p.ex.Pull(ctx, provider.ID, l); err != nil {
			log.Errorw("Error happened in pulling from provider", "err", err)
			continue
		}
		return nil
	}
	return errors.New("no provider found")
}

// StoreManifest tries to store up to maxCids of the given links locally and
// then registers the ones that succeeded with the blockchain in one batch.
//
// A link is skipped when the local datastore check errors or reports it as
// already present, or when StoreCid fails for it (that failure is logged).
// Iteration stops once maxCids links have been stored.
//
// It returns the batch-store error if the blockchain call fails, and
// "all links failed to store" whenever nothing was stored, which includes the
// cases where every link was skipped or maxCids is not positive.
func (p *Blox) StoreManifest(ctx context.Context, links []blockchain.LinkWithLimit, maxCids int) error {
	log.Debugw("StoreManifest", "links", links)

	var pulled []ipld.Link
	for i := 0; i < len(links) && len(pulled) < maxCids; i++ {
		candidate := links[i]

		// Skip links we already hold, or whose presence cannot be checked.
		if present, err := p.Has(ctx, candidate.Link); err != nil || present {
			continue
		}

		if err := p.StoreCid(ctx, candidate.Link, candidate.Limit); err != nil {
			// A failed link must not be reported to the chain.
			log.Errorw("Error storing CID", "link", candidate, "err", err)
			continue
		}
		pulled = append(pulled, candidate.Link)
	}
	log.Debugw("StoreManifest", "storedLinks", pulled)

	// Nothing stored means there is nothing to report.
	if len(pulled) == 0 {
		return errors.New("all links failed to store")
	}

	// Tell the blockchain which links this node now stores.
	if _, err := p.bl.HandleManifestBatchStore(ctx, p.topicName, pulled); err != nil {
		log.Errorw("Error happened in storing manifest", "err", err)
		return err
	}
	return nil
}

// FetchAvailableManifestsAndStore asks the blockchain for up to maxCids
// available manifests in this node's pool (p.topicName) and passes them to
// StoreManifest.
//
// It returns an error when the fetch fails, when no manifests are available,
// or when StoreManifest fails; fetch and store errors are wrapped.
func (p *Blox) FetchAvailableManifestsAndStore(ctx context.Context, maxCids int) error {
	candidates, fetchErr := p.bl.HandleManifestsAvailable(ctx, p.topicName, maxCids)
	if fetchErr != nil {
		return fmt.Errorf("failed to fetch available manifests: %w", fetchErr)
	}
	log.Debugw("FetchAvailableManifestsAndStore", "availableLinks", candidates)

	// Nothing on offer: report it rather than calling StoreManifest with no work.
	if len(candidates) == 0 {
		return errors.New("no available manifests to store")
	}

	if storeErr := p.StoreManifest(ctx, candidates, maxCids); storeErr != nil {
		return fmt.Errorf("failed to store manifests: %w", storeErr)
	}
	return nil
}

func (p *Blox) Start(ctx context.Context) error {
// implemented topic validators with chain integration.
validator := p.PubsubValidator
Expand Down Expand Up @@ -154,6 +280,34 @@ func (p *Blox) Start(ctx context.Context) error {
}
p.ctx, p.cancel = context.WithCancel(context.Background())

// Starting a new goroutine for periodic task
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer log.Debug("Start blox FetchAvailableManifestsAndStore go routine is ending")
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// This block will execute every 5 minutes
if err := p.FetchAvailableManifestsAndStore(ctx, 10); err != nil {
log.Errorw("Error in FetchAvailableManifestsAndStore", "err", err)
// Handle the error or continue based on your requirement
}
case <-ctx.Done():
// This will handle the case where the parent context is canceled
log.Info("Stopping periodic FetchAvailableManifestsAndStore due to context cancellation")
return
case <-p.ctx.Done():
// This will handle the case where the parent context is canceled
log.Info("Stopping periodic FetchAvailableManifestsAndStore due to context cancellation")
return
}
}
}()

return nil
}

Expand All @@ -175,6 +329,11 @@ func (p *Blox) Ping(ctx context.Context, to peer.ID) (int, int, error) {
return p.pn.Ping(ctx, to)
}

// PingDht forwards to the exchange's PingDht for the given peer and returns
// its error unchanged.
func (p *Blox) PingDht(to peer.ID) error {
// Exposed for unit testing; there is no need to call this directly.
return p.ex.PingDht(to)
}

func (p *Blox) GetBlMembers() map[peer.ID]common.MemberStatus {
//This is for unit testing and no need to call directly
return p.bl.GetMembers()
Expand Down
Loading

0 comments on commit 69155ad

Please sign in to comment.