Skip to content

Commit

Permalink
feat: mine new overlay (#4685)
Browse files Browse the repository at this point in the history
  • Loading branch information
nugaon authored Jul 9, 2024
1 parent 4369afe commit 98eb3e0
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 36 deletions.
11 changes: 6 additions & 5 deletions cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer/migration"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/util/ioutil"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -159,7 +160,7 @@ func dbCompactCmd(cmd *cobra.Command) {
time.Sleep(10 * time.Second)
logger.Warning("proceeding with database compaction...")

localstorePath := path.Join(dataDir, "localstore")
localstorePath := path.Join(dataDir, ioutil.DataPathLocalstore)

err = storer.Compact(context.Background(), localstorePath, &storer.Options{
Logger: logger,
Expand Down Expand Up @@ -214,7 +215,7 @@ func dbValidatePinsCmd(cmd *cobra.Command) {
return fmt.Errorf("read location option: %w", err)
}

localstorePath := path.Join(dataDir, "localstore")
localstorePath := path.Join(dataDir, ioutil.DataPathLocalstore)

err = storer.ValidatePinCollectionChunks(context.Background(), localstorePath, providedPin, outputLoc, &storer.Options{
Logger: logger,
Expand Down Expand Up @@ -340,7 +341,7 @@ func dbValidateCmd(cmd *cobra.Command) {
logger.Warning(" Progress logged at Info level.")
logger.Warning(" SOC chunks logged at Debug level.")

localstorePath := path.Join(dataDir, "localstore")
localstorePath := path.Join(dataDir, ioutil.DataPathLocalstore)

err = storer.ValidateRetrievalIndex(context.Background(), localstorePath, &storer.Options{
Logger: logger,
Expand Down Expand Up @@ -768,8 +769,8 @@ func dbNukeCmd(cmd *cobra.Command) {
optionNameForgetOverlay = "forget-overlay"
optionNameForgetStamps = "forget-stamps"

localstore = "localstore"
kademlia = "kademlia-metrics"
localstore = ioutil.DataPathLocalstore
kademlia = ioutil.DataPathKademlia
statestore = "statestore"
stamperstore = "stamperstore"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/node/devnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (b *DevBee) Shutdown() error {
tryClose(b.pssCloser, "pss")
tryClose(b.tracerCloser, "tracer")
tryClose(b.stateStoreCloser, "statestore")
tryClose(b.localstoreCloser, "localstore")
tryClose(b.localstoreCloser, ioutil.DataPathLocalstore)

return mErr
}
Expand Down
78 changes: 50 additions & 28 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,6 @@ func NewBee(
if err != nil {
return nil, err
}
b.stateStoreCloser = stateStore

// Check if the batchstore exists. If not, we can assume it's missing
// due to a migration or it's a fresh install.
batchStoreExists, err := batchStoreExists(stateStore)
if err != nil {
return nil, fmt.Errorf("batchstore: exists: %w", err)
}

addressbook := addressbook.New(stateStore)

pubKey, err := signer.PublicKey()
if err != nil {
Expand All @@ -274,37 +264,69 @@ func NewBee(
return nil, fmt.Errorf("compute overlay address: %w", err)
}

if nonceExists && o.TargetNeighborhood != "" {
logger.Warning("an overlay has already been created before, skipping targeting the selected neighborhood")
targetNeighborhood := o.TargetNeighborhood
if targetNeighborhood == "" && !nonceExists && o.NeighborhoodSuggester != "" {
logger.Info("fetching target neighborhood from suggester", "url", o.NeighborhoodSuggester)
targetNeighborhood, err = nbhdutil.FetchNeighborhood(&http.Client{}, o.NeighborhoodSuggester)
if err != nil {
return nil, fmt.Errorf("neighborhood suggestion: %w", err)
}
}

if !nonceExists {
// mine the overlay
targetNeighborhood := o.TargetNeighborhood
if o.TargetNeighborhood == "" && o.NeighborhoodSuggester != "" {
logger.Info("fetching target neighborhood from suggester", "url", o.NeighborhoodSuggester)
targetNeighborhood, err = nbhdutil.FetchNeighborhood(&http.Client{}, o.NeighborhoodSuggester)
if err != nil {
return nil, fmt.Errorf("neighborhood suggestion: %w", err)
}
if targetNeighborhood != "" {
neighborhood, err := swarm.ParseBitStrAddress(targetNeighborhood)
if err != nil {
return nil, fmt.Errorf("invalid neighborhood. %s", targetNeighborhood)
}

if targetNeighborhood != "" {
if swarm.Proximity(swarmAddress.Bytes(), neighborhood.Bytes()) < uint8(len(targetNeighborhood)) {
// mine the overlay
logger.Info("mining an overlay address for the fresh node to target the selected neighborhood", "target", targetNeighborhood)
swarmAddress, nonce, err = nbhdutil.MineOverlay(ctx, *pubKey, networkID, targetNeighborhood)
newSwarmAddress, newNonce, err := nbhdutil.MineOverlay(ctx, *pubKey, networkID, targetNeighborhood)
if err != nil {
return nil, fmt.Errorf("mine overlay address: %w", err)
}
}

err = setOverlay(stateStore, swarmAddress, nonce)
if err != nil {
return nil, fmt.Errorf("statestore: save new overlay: %w", err)
if nonceExists {
logger.Info("Override nonce %d to %d and clean state for neighborhood %s", nonce, newNonce, targetNeighborhood)
logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
time.Sleep(10 * time.Second)

dirsToNuke := []string{ioutil.DataPathLocalstore, ioutil.DataPathKademlia}
for _, dir := range dirsToNuke {
err := ioutil.RemoveContent(filepath.Join(o.DataDir, dir))
if err != nil {
return nil, fmt.Errorf("delete %s: %w", dir, err)
}
}

if err := stateStore.ClearForHopping(); err != nil {
return nil, fmt.Errorf("clearing stateStore %w", err)
}
}

swarmAddress = newSwarmAddress
nonce = newNonce
err = setOverlay(stateStore, swarmAddress, nonce)
if err != nil {
return nil, fmt.Errorf("statestore: save new overlay: %w", err)
}
}
}

b.stateStoreCloser = stateStore
// Check if the batchstore exists. If not, we can assume it's missing
// due to a migration or it's a fresh install.
batchStoreExists, err := batchStoreExists(stateStore)
if err != nil {
return nil, fmt.Errorf("batchstore: exists: %w", err)
}

addressbook := addressbook.New(stateStore)

logger.Info("using overlay address", "address", swarmAddress)

// this will set overlay if it was not set before
if err = checkOverlay(stateStore, swarmAddress); err != nil {
return nil, fmt.Errorf("check overlay address: %w", err)
}
Expand Down Expand Up @@ -672,7 +694,7 @@ func NewBee(

if o.DataDir != "" {
logger.Info("using datadir", "path", o.DataDir)
path = filepath.Join(o.DataDir, "localstore")
path = filepath.Join(o.DataDir, ioutil.DataPathLocalstore)
}

lo := &storer.Options{
Expand Down
2 changes: 1 addition & 1 deletion pkg/node/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// InitStateStore will initialize the stateStore with the given path to the
// data directory. When given an empty directory path, the function will instead
// initialize an in-memory state store that will not be persisted.
func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (storage.StateStorer, metrics.Collector, error) {
func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (storage.StateStorerManager, metrics.Collector, error) {
if dataDir == "" {
logger.Warning("using in-mem state store, no node state will be persisted")
} else {
Expand Down
18 changes: 18 additions & 0 deletions pkg/statestore/storeadapter/storeadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,24 @@ func (s *StateStorerAdapter) Nuke() error {
return s.deleteKeys(keys)
}

func (s *StateStorerAdapter) ClearForHopping() error {
var (
prefixesToPreserve = []string{
"swap_chequebook", // to not redeploy chequebook contract
"batchstore", // avoid unnecessary syncing
"transaction", // to not resync blockchain transactions
}
keys []string
err error
)

keys, err = s.collectKeysExcept(prefixesToPreserve)
if err != nil {
return fmt.Errorf("collect keys except: %w", err)
}
return s.deleteKeys(keys)
}

func (s *StateStorerAdapter) collectKeysExcept(prefixesToPreserve []string) (keys []string, err error) {
if err := s.Iterate("", func(k, v []byte) (bool, error) {
stk := string(k)
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,12 @@ type StateStorer interface {
type StateStorerCleaner interface {
// Nuke the store so that only the bare essential entries are left.
Nuke() error
// ClearForHopping removes all data not required in a new neighborhood
ClearForHopping() error
}

// StateStorerManager defines all external methods of the state storage
type StateStorerManager interface {
StateStorer
StateStorerCleaner
}
3 changes: 2 additions & 1 deletion pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
im "github.com/ethersphere/bee/v2/pkg/topology/kademlia/internal/metrics"
"github.com/ethersphere/bee/v2/pkg/topology/kademlia/internal/waitnext"
"github.com/ethersphere/bee/v2/pkg/topology/pslice"
"github.com/ethersphere/bee/v2/pkg/util/ioutil"
ma "github.com/multiformats/go-multiaddr"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -209,7 +210,7 @@ func New(
if o.DataDir == "" {
logger.Warning("using in-mem store for kademlia metrics, no state will be persisted")
} else {
o.DataDir = filepath.Join(o.DataDir, "kademlia-metrics")
o.DataDir = filepath.Join(o.DataDir, ioutil.DataPathKademlia)
}
sdb, err := shed.NewDB(o.DataDir, nil)
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/ioutil/ioutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@

package ioutil

import (
"errors"
"os"
"path/filepath"
)

// DB folders paths from bee datadir
const (
DataPathLocalstore = "localstore"
DataPathKademlia = "kademlia-metrics"
)

// The WriterFunc type is an adapter to allow the use of
// ordinary functions as io.Writer Write method. If f is
// a function with the appropriate signature, WriterFunc(f)
Expand All @@ -14,3 +26,28 @@ type WriterFunc func([]byte) (int, error)
func (f WriterFunc) Write(p []byte) (n int, err error) {
return f(p)
}

// RemoveContent removes all files in path. Copied function from cmd/db.go
func RemoveContent(path string) error {
dir, err := os.Open(path)
if errors.Is(err, os.ErrNotExist) {
return nil
}
if err != nil {
return err
}
defer dir.Close()

subpaths, err := dir.Readdirnames(0)
if err != nil {
return err
}

for _, sub := range subpaths {
err = os.RemoveAll(filepath.Join(path, sub))
if err != nil {
return err
}
}
return nil
}

0 comments on commit 98eb3e0

Please sign in to comment.