Skip to content

Commit

Permalink
add test and fix lint errs
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Feb 28, 2024
1 parent 2e8b90f commit 718b79c
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 28 deletions.
74 changes: 46 additions & 28 deletions lib/pdcleaner/pdcleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/boost/lib/legacy"
"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/boost/piecedirectory"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
Expand All @@ -21,6 +22,8 @@ import (
var log = logging.Logger("pdcleaner")

type PieceDirectoryCleanup interface {
Start(ctx context.Context)
CleanOnce() error
}

type pdcleaner struct {
Expand All @@ -36,24 +39,17 @@ type pdcleaner struct {
func NewPieceDirectoryCleaner(cfg *config.Boost) func(lc fx.Lifecycle, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode) PieceDirectoryCleanup {
return func(lc fx.Lifecycle, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode) PieceDirectoryCleanup {

mid, err := address.NewFromString(cfg.Wallets.Miner)
if err != nil {
return fmt.Errorf("failed to parse the miner ID %s: %w", cfg.Wallets.Miner, err)
}

pdc := &pdcleaner{
miner: mid,
dealsDB: dealsDB,
directDealsDB: directDealsDB,
legacyDeals: legacyDeals,
pd: pd,
full: full,
}
pdc := newPDC(dealsDB, directDealsDB, legacyDeals, pd, full)

ctx, cancel := context.WithCancel(context.Background())

lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
mid, err := address.NewFromString(cfg.Wallets.Miner)
if err != nil {
return fmt.Errorf("failed to parse the miner ID %s: %w", cfg.Wallets.Miner, err)
}
pdc.miner = mid
go pdc.Start(ctx)
return nil
},
Expand All @@ -68,6 +64,16 @@ func NewPieceDirectoryCleaner(cfg *config.Boost) func(lc fx.Lifecycle, dealsDB *
}
}

func newPDC(dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode) *pdcleaner {
return &pdcleaner{
dealsDB: dealsDB,
directDealsDB: directDealsDB,
legacyDeals: legacyDeals,
pd: pd,
full: full,
}
}

func (p *pdcleaner) Start(ctx context.Context) {
p.ctx = ctx
go p.clean()
Expand All @@ -82,7 +88,7 @@ func (p *pdcleaner) clean() {
select {
case <-ticker.C:
log.Infof("Starting LID clean up")
err := p.cleanOnce()
err := p.CleanOnce()
if err != nil {
log.Errorf("Failed to cleanup LID: %s", err)
continue
Expand All @@ -94,10 +100,10 @@ func (p *pdcleaner) clean() {
}
}

// cleanOnce generates a list of all Expired-Boost, Legacy and Direct deals. It then attempts to clean up these deals.
// CleanOnce generates a list of all Expired-Boost, Legacy and Direct deals. It then attempts to clean up these deals.
// It also generated a list of all pieces in LID and tries to find any pieceMetadata with no deals in Boost, Direct or Legacy DB.
// If such a deal is found, it is cleaned up as well
func (p *pdcleaner) cleanOnce() error {
func (p *pdcleaner) CleanOnce() error {
head, err := p.full.ChainHead(p.ctx)
if err != nil {
return fmt.Errorf("getting chain head: %w", err)
Expand All @@ -107,15 +113,32 @@ func (p *pdcleaner) cleanOnce() error {
if err != nil {
return fmt.Errorf("getting market deals: %w", err)
}

var boostDeals []*types.ProviderDealState

boostCompleteDeals, err := p.dealsDB.ListCompleted(p.ctx)
if err != nil {
return fmt.Errorf("getting complete boost deals: %w", err)
}
boostActiveDeals, err := p.dealsDB.ListActive(p.ctx)
if err != nil {
return fmt.Errorf("getting active boost deals: %w", err)
}

boostDeals = append(boostDeals, boostCompleteDeals...)
boostDeals = append(boostDeals, boostActiveDeals...)

legacyDeals, err := p.legacyDeals.ListDeals()
if err != nil {
return fmt.Errorf("getting legacy deals: %w", err)
}
completeDirectDeals, err := p.directDealsDB.ListCompleted(p.ctx)
if err != nil {
return fmt.Errorf("getting complete direct deals: %w", err)
}

// Clean up completed Boost deals
for _, d := range boostCompleteDeals {
// Clean up completed/slashed Boost deals
for _, d := range boostDeals {
// Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up
if d.ChainDealID > abi.DealID(0) {
// If deal exists online
Expand Down Expand Up @@ -162,6 +185,7 @@ func (p *pdcleaner) cleanOnce() error {
cID := verifregtypes.ClaimId(d.AllocationID)
c, ok := claims[cID]
if ok {
// TODO: Figure out slashing mechanism in Direct Deals and add that condition here
if c.TermMax < head.Height() {
err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String())
if err != nil {
Expand All @@ -186,14 +210,12 @@ func (p *pdcleaner) cleanOnce() error {
for _, deal := range pdeals {
// Remove only if the miner ID matches to avoid removing for other miners in case of shared LID
if deal.MinerAddr == p.miner {
remove := true

bd, err := p.dealsDB.ByPieceCID(p.ctx, piece)
if err != nil {
return err
}
if len(bd) > 0 {
remove = false
continue
}

Expand All @@ -202,7 +224,6 @@ func (p *pdcleaner) cleanOnce() error {
return err
}
if len(ld) > 0 {
remove = false
continue
}

Expand All @@ -211,16 +232,13 @@ func (p *pdcleaner) cleanOnce() error {
return err
}
if len(dd) > 0 {
remove = false
continue
}

if remove {
err = p.pd.RemoveDealForPiece(p.ctx, piece, deal.DealUuid)
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up dangling deal %s for piece %s: %s", deal.DealUuid, piece, err.Error())
}
err = p.pd.RemoveDealForPiece(p.ctx, piece, deal.DealUuid)
if err != nil {
// Don't return if cleaning up a deal results in error. Try them all.
log.Errorf("cleaning up dangling deal %s for piece %s: %s", deal.DealUuid, piece, err.Error())
}
}
}
Expand Down
195 changes: 195 additions & 0 deletions lib/pdcleaner/pdcleaner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package pdcleaner

import (
"context"
"fmt"
"os"
"strconv"
"testing"

bdb "github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/db/migrations"
"github.com/filecoin-project/boost/extern/boostd-data/client"
"github.com/filecoin-project/boost/extern/boostd-data/model"
"github.com/filecoin-project/boost/extern/boostd-data/svc"
mocks_legacy "github.com/filecoin-project/boost/lib/legacy/mocks"
"github.com/filecoin-project/boost/piecedirectory"
"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/filecoin-project/lotus/api"
lotusmocks "github.com/filecoin-project/lotus/api/mocks"
test "github.com/filecoin-project/lotus/chain/events/state/mock"
chaintypes "github.com/filecoin-project/lotus/chain/types"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/stretchr/testify/require"
)

func TestPieceDirectoryCleaner(t *testing.T) {
req := require.New(t)
ctx := context.Background()

sqldb := bdb.CreateTestTmpDB(t)
require.NoError(t, bdb.CreateAllBoostTables(ctx, sqldb, sqldb))
require.NoError(t, migrations.Migrate(sqldb))

dealsDB := bdb.NewDealsDB(sqldb)
directDB := bdb.NewDirectDealsDB(sqldb)

bdsvc, err := svc.NewLevelDB("")
require.NoError(t, err)
ln, err := bdsvc.Start(ctx, "localhost:0")
require.NoError(t, err)

cl := client.NewStore()
err = cl.Dial(ctx, fmt.Sprintf("ws://%s", ln))
require.NoError(t, err)
defer cl.Close(ctx)

pieceCount := 5
readers := make(map[abi.SectorNumber]car.SectionReader)
for i := 0; i < pieceCount; i++ {
// Create a random CAR file
_, carFilePath := piecedirectory.CreateCarFile(t, i+1)
carFile, err := os.Open(carFilePath)
require.NoError(t, err)
defer carFile.Close()

carReader, err := car.OpenReader(carFilePath)
require.NoError(t, err)
defer carReader.Close()
carv1Reader, err := carReader.DataReader()
require.NoError(t, err)

readers[abi.SectorNumber(i+1)] = carv1Reader
}

// Any calls to get a reader over data should return a reader over the random CAR file
pr := piecedirectory.CreateMockPieceReaders(t, readers)

pm := piecedirectory.NewPieceDirectory(cl, pr, 1)
pm.Start(ctx)

type dealData struct {
sector abi.SectorNumber
chainDealID abi.DealID
piece cid.Cid
used bool
}

deals, err := bdb.GenerateDeals()
req.NoError(err)

// Create and update a map to keep track of chainDealID and UUID bindings
dealMap := make(map[uuid.UUID]*dealData)
for _, deal := range deals {
dealMap[deal.DealUuid] = &dealData{chainDealID: deal.ChainDealID, used: false}
}

// Add deals to LID and note down details to update SQL DB
for sectorNumber, reader := range readers {
pieceCid := piecedirectory.CalculateCommp(t, reader).PieceCID

var uid uuid.UUID
var cdid abi.DealID

for id, data := range dealMap {
// If this value from deals list has not be used
if !data.used {
uid = id // Use the UUID from deals list
cdid = data.chainDealID
data.used = true
data.sector = sectorNumber // Use the sector number from deals list
data.piece = pieceCid
break
}
}

// Add deal info for each piece
di := model.DealInfo{
DealUuid: uid.String(),
ChainDealID: cdid,
SectorID: sectorNumber,
PieceOffset: 0,
PieceLength: 0,
}
err := pm.AddDealForPiece(ctx, pieceCid, di)
require.NoError(t, err)
}

// Setup Full node, legacy manager
ctrl := gomock.NewController(t)
fn := lotusmocks.NewMockFullNode(ctrl)
legacyProv := mocks_legacy.NewMockLegacyDealManager(ctrl)
provAddr, err := address.NewIDAddress(1523)
require.NoError(t, err)

// Start a new PieceDirectoryCleaner
pdc := newPDC(dealsDB, directDB, legacyProv, pm, fn)
pdc.ctx = ctx

chainHead, err := test.MockTipset(provAddr, 1)
require.NoError(t, err)
chainHeadFn := func(ctx context.Context) (*chaintypes.TipSet, error) {
return chainHead, nil
}

// Add deals to SQL DB
cDealMap := make(map[string]*api.MarketDeal)
for i, deal := range deals {
data, ok := dealMap[deal.DealUuid]
require.True(t, ok)
deal.SectorID = data.sector
deal.ClientDealProposal.Proposal.PieceCID = data.piece
deal.ClientDealProposal.Proposal.EndEpoch = 3 // because chain head is always 5
deal.Checkpoint = dealcheckpoints.Complete
p, err := deal.SignedProposalCid()
require.NoError(t, err)
t.Logf("signed p %s", p.String())
// Test a slashed deal
if i == 0 {
deal.Checkpoint = dealcheckpoints.Accepted
deal.ClientDealProposal.Proposal.EndEpoch = 6
cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{
Proposal: deal.ClientDealProposal.Proposal,
State: market.DealState{
SlashEpoch: 3, // Slash this deal
},
}
err = dealsDB.Insert(ctx, &deal)
req.NoError(err)
continue
}
cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{
Proposal: deal.ClientDealProposal.Proposal,
State: market.DealState{
SlashEpoch: -1,
},
}
err = dealsDB.Insert(ctx, &deal)
req.NoError(err)
}

// Confirm we have 5 pieces in LID
pl, err := pm.ListPieces(ctx)
require.NoError(t, err)
require.Len(t, pl, 5)

fn.EXPECT().ChainHead(gomock.Any()).DoAndReturn(chainHeadFn).AnyTimes()
fn.EXPECT().StateMarketDeals(gomock.Any(), gomock.Any()).Return(cDealMap, nil).AnyTimes()
fn.EXPECT().StateGetClaims(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
legacyProv.EXPECT().ListDeals().Return(nil, nil).AnyTimes()
legacyProv.EXPECT().ByPieceCid(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

err = pdc.CleanOnce()
require.NoError(t, err)

// Confirm we have 5 pieces in LID after clean up
pl, err = pm.ListPieces(ctx)
require.NoError(t, err)
require.Len(t, pl, 0)
}

0 comments on commit 718b79c

Please sign in to comment.