diff --git a/.circleci/config.yml b/.circleci/config.yml index 0aca44827..1a8a69b79 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -363,4 +363,9 @@ workflows: suite: booster-bitswap target: "./cmd/booster-bitswap" + - test: + name: test-itest-lid-cleanup + suite: itest-lid-cleanup + target: "./itests/lid_cleanup_test.go" + - lid-docker-compose diff --git a/.github/workflows/container-build.yml b/.github/workflows/container-build.yml index fa9fbc559..57abeb1cc 100644 --- a/.github/workflows/container-build.yml +++ b/.github/workflows/container-build.yml @@ -16,7 +16,7 @@ jobs: contents: read packages: write env: - LOTUS_VERSION: 'v1.23.3' + LOTUS_VERSION: 'v1.26.0-rc1' LOTUS_SOURCE_IMAGE: 'ghcr.io/filecoin-shipyard/lotus-containers:lotus' NETWORK_NAME: 'devnet' FFI_BUILD_FROM_SOURCE: '0' diff --git a/api/api.go b/api/api.go index 327fb1866..96f794b44 100644 --- a/api/api.go +++ b/api/api.go @@ -27,7 +27,7 @@ type Boost interface { // MethodGroup: Boost BoostIndexerAnnounceAllDeals(ctx context.Context) error //perm:admin - BoostIndexerListMultihashes(ctx context.Context, proposalCid cid.Cid) ([]multihash.Multihash, error) //perm:admin + BoostIndexerListMultihashes(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) //perm:admin BoostIndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) //perm:admin BoostIndexerAnnounceLatestHttp(ctx context.Context, urls []string) (cid.Cid, error) //perm:admin BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*ProviderDealRejectionInfo, error) //perm:admin @@ -48,6 +48,7 @@ type Boost interface { // MethodGroup: PieceDirectory PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Cid) error //perm:admin PdRemoveDealForPiece(ctx context.Context, piececid cid.Cid, dealID string) error //perm:admin + PdCleanup(ctx context.Context) error //perm:admin // MethodGroup: Misc OnlineBackup(context.Context, string) error //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 372c6ee3b..1488c7f80 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -58,7 +58,7 @@ type BoostStruct struct { BoostIndexerAnnounceLegacyDeal func(p0 context.Context, p1 cid.Cid) (cid.Cid, error) `perm:"admin"` - BoostIndexerListMultihashes func(p0 context.Context, p1 cid.Cid) ([]multihash.Multihash, error) `perm:"admin"` + BoostIndexerListMultihashes func(p0 context.Context, p1 []byte) ([]multihash.Multihash, error) `perm:"admin"` BoostLegacyDealByProposalCid func(p0 context.Context, p1 cid.Cid) (legacytypes.MinerDeal, error) `perm:"admin"` @@ -68,6 +68,8 @@ type BoostStruct struct { PdBuildIndexForPieceCid func(p0 context.Context, p1 cid.Cid) error `perm:"admin"` + PdCleanup func(p0 context.Context) error `perm:"admin"` + PdRemoveDealForPiece func(p0 context.Context, p1 cid.Cid, p2 string) error `perm:"admin"` } } @@ -340,14 +342,14 @@ func (s *BoostStub) BoostIndexerAnnounceLegacyDeal(p0 context.Context, p1 cid.Ci return *new(cid.Cid), ErrNotSupported } -func (s *BoostStruct) BoostIndexerListMultihashes(p0 context.Context, p1 cid.Cid) ([]multihash.Multihash, error) { +func (s *BoostStruct) BoostIndexerListMultihashes(p0 context.Context, p1 []byte) ([]multihash.Multihash, error) { if s.Internal.BoostIndexerListMultihashes == nil { return *new([]multihash.Multihash), ErrNotSupported } return s.Internal.BoostIndexerListMultihashes(p0, p1) } -func (s *BoostStub) BoostIndexerListMultihashes(p0 context.Context, p1 cid.Cid) ([]multihash.Multihash, error) { +func (s *BoostStub) BoostIndexerListMultihashes(p0 context.Context, p1 []byte) ([]multihash.Multihash, error) { return *new([]multihash.Multihash), ErrNotSupported } @@ -395,6 +397,17 @@ func (s *BoostStub) PdBuildIndexForPieceCid(p0 context.Context, p1 cid.Cid) erro return ErrNotSupported } +func (s *BoostStruct) PdCleanup(p0 context.Context) error { + if s.Internal.PdCleanup == nil { + return ErrNotSupported + } + return s.Internal.PdCleanup(p0) +} + +func (s *BoostStub) PdCleanup(p0 context.Context) error { + return ErrNotSupported +} + func (s *BoostStruct) PdRemoveDealForPiece(p0 context.Context, p1 cid.Cid, p2 string) error { if s.Internal.PdRemoveDealForPiece == nil { return ErrNotSupported diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index ecddcb396..ba3fd28c0 100644 Binary files a/build/openrpc/boost.json.gz and b/build/openrpc/boost.json.gz differ diff --git a/cmd/boostd/index.go b/cmd/boostd/index.go index e0c575a33..0e4b0fd69 100644 --- a/cmd/boostd/index.go +++ b/cmd/boostd/index.go @@ -8,6 +8,7 @@ import ( lcli "github.com/filecoin-project/lotus/cli" "github.com/google/uuid" "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" "github.com/urfave/cli/v2" ) @@ -45,16 +46,11 @@ var indexProvAnnounceAllCmd = &cli.Command{ var indexProvListMultihashesCmd = &cli.Command{ Name: "list-multihashes", - Usage: "list-multihashes ", - UsageText: "List multihashes for a deal by proposal cid", + Usage: "list-multihashes ", + UsageText: "List multihashes for a deal by proposal cid or deal UUID", Action: func(cctx *cli.Context) error { if cctx.NArg() != 1 { - return fmt.Errorf("must supply proposal cid") - } - - propCid, err := cid.Parse(cctx.Args().First()) - if err != nil { - return fmt.Errorf("parsing proposal cid %s: %w", cctx.Args().First(), err) + return fmt.Errorf("must supply a proposal cid or deal UUID") } ctx := lcli.ReqContext(cctx) @@ -66,13 +62,46 @@ var indexProvListMultihashesCmd = &cli.Command{ } defer closer() - // get list of multihashes - mhs, err := napi.BoostIndexerListMultihashes(ctx, propCid) + if cctx.Args().Len() != 1 { + return fmt.Errorf("must specify only one proposal CID / deal UUID") + } + + id := cctx.Args().Get(0) + + var proposalCid cid.Cid + var mhs []multihash.Multihash + dealUuid, err := uuid.Parse(id) + if err != nil { + propCid, err := cid.Decode(id) + if err != nil { + return fmt.Errorf("could not parse '%s' as deal uuid or proposal cid", id) + } + proposalCid = propCid + } + + if !proposalCid.Defined() { + contextID, err := dealUuid.MarshalBinary() + if err != nil { + return fmt.Errorf("parsing UUID to bytes: %w", err) + } + mhs, err = napi.BoostIndexerListMultihashes(ctx, contextID) + if err != nil { + return err + } + fmt.Printf("Found %d multihashes for deal with ID %s:\n", len(mhs), id) + for _, mh := range mhs { + fmt.Println(" " + mh.String()) + } + + return nil + } + + mhs, err = napi.BoostIndexerListMultihashes(ctx, proposalCid.Bytes()) if err != nil { return err } - fmt.Printf("Found %d multihashes for deal with proposal cid %s:\n", len(mhs), propCid) + fmt.Printf("Found %d multihashes for deal with ID %s:\n", len(mhs), id) for _, mh := range mhs { fmt.Println(" " + mh.String()) } diff --git a/cmd/boostd/piecedir.go b/cmd/boostd/piecedir.go index f508af37d..c8c1cf448 100644 --- a/cmd/boostd/piecedir.go +++ b/cmd/boostd/piecedir.go @@ -19,6 +19,7 @@ var pieceDirCmd = &cli.Command{ pdIndexGenerate, recoverCmd, removeDealCmd, + lidCleanupCmd, }, } @@ -109,3 +110,24 @@ var removeDealCmd = &cli.Command{ }, } + +var lidCleanupCmd = &cli.Command{ + Name: "cleanup", + Usage: "Triggers a cleanup for LID. Command will wait for existing cleanup jobs to finish if there are any", + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + napi, closer, err := bcli.GetBoostAPI(cctx) + if err != nil { + return err + } + defer closer() + + err = napi.PdCleanup(ctx) + if err != nil { + return fmt.Errorf("clean up failed: %w", err) + } + fmt.Println("LID clean up complete") + return nil + }, +} diff --git a/docker/devnet/Dockerfile.source b/docker/devnet/Dockerfile.source index f3f15a467..e5f6d7e99 100644 --- a/docker/devnet/Dockerfile.source +++ b/docker/devnet/Dockerfile.source @@ -4,7 +4,7 @@ ARG LOTUS_TEST_IMAGE=filecoin/lotus-test:latest FROM ${LOTUS_TEST_IMAGE} as lotus-test ######################################################################################### -FROM node:16.16-alpine3.15 AS react-builder +FROM node:20.11.1-alpine3.19 AS react-builder WORKDIR /src COPY react /src/react @@ -13,7 +13,7 @@ COPY gql /src/gql RUN npm_config_legacy_peer_deps=yes npm ci --no-audit --prefix react&& \ npm run --prefix react build ######################################################################################### -FROM golang:1.20-bullseye as builder +FROM golang:1.21-bullseye as builder RUN apt update && apt install -y \ build-essential \ @@ -31,16 +31,18 @@ RUN apt update && apt install -y \ ENV RUSTUP_HOME=/usr/local/rustup \ CARGO_HOME=/usr/local/cargo \ PATH=/usr/local/cargo/bin:$PATH \ - RUST_VERSION=1.63.0 + RUST_VERSION=1.76.0 RUN set -eux; \ dpkgArch="$(dpkg --print-architecture)"; \ case "${dpkgArch##*-}" in \ - amd64) rustArch='x86_64-unknown-linux-gnu'; rustupSha256='5cc9ffd1026e82e7fb2eec2121ad71f4b0f044e88bca39207b3f6b769aaa799c' ;; \ - arm64) rustArch='aarch64-unknown-linux-gnu'; rustupSha256='e189948e396d47254103a49c987e7fb0e5dd8e34b200aa4481ecc4b8e41fb929' ;; \ + amd64) rustArch='x86_64-unknown-linux-gnu'; rustupSha256='0b2f6c8f85a3d02fde2efc0ced4657869d73fccfce59defb4e8d29233116e6db' ;; \ + armhf) rustArch='armv7-unknown-linux-gnueabihf'; rustupSha256='f21c44b01678c645d8fbba1e55e4180a01ac5af2d38bcbd14aa665e0d96ed69a' ;; \ + arm64) rustArch='aarch64-unknown-linux-gnu'; rustupSha256='673e336c81c65e6b16dcdede33f4cc9ed0f08bde1dbe7a935f113605292dc800' ;; \ + i386) rustArch='i686-unknown-linux-gnu'; rustupSha256='e7b0f47557c1afcd86939b118cbcf7fb95a5d1d917bdd355157b63ca00fc4333' ;; \ *) echo >&2 "unsupported architecture: ${dpkgArch}"; exit 1 ;; \ esac; \ - url="https://static.rust-lang.org/rustup/archive/1.25.1/${rustArch}/rustup-init"; \ + url="https://static.rust-lang.org/rustup/archive/1.26.0/${rustArch}/rustup-init"; \ wget "$url"; \ echo "${rustupSha256} *rustup-init" | sha256sum -c -; \ chmod +x rustup-init; \ diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index 065025d50..6406b3486 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -55,6 +55,7 @@ * [OnlineBackup](#onlinebackup) * [Pd](#pd) * [PdBuildIndexForPieceCid](#pdbuildindexforpiececid) + * [PdCleanup](#pdcleanup) * [PdRemoveDealForPiece](#pdremovedealforpiece) ## @@ -554,9 +555,7 @@ Perms: admin Inputs: ```json [ - { - "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" - } + "Ynl0ZSBhcnJheQ==" ] ``` @@ -1245,6 +1244,15 @@ Inputs: Response: `{}` +### PdCleanup + + +Perms: admin + +Inputs: `null` + +Response: `{}` + ### PdRemoveDealForPiece diff --git a/itests/ddo_test.go b/itests/ddo_test.go index fc3f4449e..f51170c1a 100644 --- a/itests/ddo_test.go +++ b/itests/ddo_test.go @@ -59,7 +59,7 @@ func TestDirectDeal(t *testing.T) { require.NoError(t, err) defer f.Stop() - // Send funs to PSD wallet as it is being used for POST + // Send funds to PSD wallet as it is being used for POST info, err := f.FullNode.StateMinerInfo(ctx, f.MinerAddr, types.EmptyTSK) require.NoError(t, err) addresses := []address.Address{info.Owner, info.Worker} diff --git a/itests/lid_cleanup_test.go b/itests/lid_cleanup_test.go new file mode 100644 index 000000000..a9af2dafd --- /dev/null +++ b/itests/lid_cleanup_test.go @@ -0,0 +1,308 @@ +package itests + +import ( + "context" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/filecoin-project/boost/cmd/boost/util" + "github.com/filecoin-project/boost/itests/framework" + "github.com/filecoin-project/boost/storagemarket" + smtypes "github.com/filecoin-project/boost/storagemarket/types" + "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-state-types/abi" + verifregst "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors/adt" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/builtin/verifreg" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/wallet/key" + "github.com/filecoin-project/lotus/itests/kit" + sealing "github.com/filecoin-project/lotus/storage/pipeline" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestLIDCleanup(t *testing.T) { + ctx := context.Background() + log := framework.Log + + kit.QuietMiningLogs() + framework.SetLogLevel() + + // Setup datacap wallet and initialise a new ensemble with datacap keys + rootKey, err := key.GenerateKey(types.KTSecp256k1) + require.NoError(t, err) + + verifier1Key, err := key.GenerateKey(types.KTSecp256k1) + require.NoError(t, err) + + bal, err := types.ParseFIL("10000fil") + require.NoError(t, err) + + var eopts []kit.EnsembleOpt + eopts = append(eopts, kit.RootVerifier(rootKey, abi.NewTokenAmount(bal.Int64()))) + eopts = append(eopts, kit.Account(verifier1Key, abi.NewTokenAmount(bal.Int64()))) + esemble := kit.NewEnsemble(t, eopts...) + + var opts []framework.FrameworkOpts + opts = append(opts, framework.WithEnsemble(esemble)) + opts = append(opts, framework.SetProvisionalWalletBalances(int64(9e18))) + opts = append(opts, framework.WithStartEpochSealingBuffer(30)) + opts = append(opts, framework.WithMaxStagingDealsBytes(100000000)) + f := framework.NewTestFramework(ctx, t, opts...) + esemble.Start() + blockTime := 100 * time.Millisecond + esemble.BeginMining(blockTime) + + err = f.Start() + require.NoError(t, err) + defer f.Stop() + + // Send funds to PSD wallet as it is being used for POST + info, err := f.FullNode.StateMinerInfo(ctx, f.MinerAddr, types.EmptyTSK) + require.NoError(t, err) + addresses := []address.Address{info.Owner, info.Worker} + addresses = append(addresses, info.ControlAddresses...) + eg := errgroup.Group{} + eg.SetLimit(4) + for i := 0; i < 6; i++ { + for _, a := range addresses { + addr := a + eg.Go(func() error { + return framework.SendFunds(ctx, f.FullNode, addr, abi.NewTokenAmount(int64(9e18))) + }) + } + } + err = eg.Wait() + require.NoError(t, err) + + // Give the boost client's address enough datacap to make the deal + err = f.AddClientDataCap(t, ctx, rootKey, verifier1Key) + require.NoError(t, err) + + err = f.AddClientProviderBalance(abi.NewTokenAmount(1e15)) + require.NoError(t, err) + + // Create a CAR file + tempdir := t.TempDir() + log.Debugw("using tempdir", "dir", tempdir) + + fileSize := 7048576 + randomFilepath1, err := testutil.CreateRandomFile(tempdir, 5, fileSize) + require.NoError(t, err) + + randomFilepath2, err := testutil.CreateRandomFile(tempdir, 6, fileSize) + require.NoError(t, err) + + // NOTE: these calls to CreateDenseCARv2 have the identity CID builder enabled so will + // produce a root identity CID for this case. So we're testing deal-making and retrieval + // where a DAG has an identity CID root + rootCid1, carFilepath1, err := testutil.CreateDenseCARv2(tempdir, randomFilepath1) + require.NoError(t, err) + + rootCid2, carFilepath2, err := testutil.CreateDenseCARv2(tempdir, randomFilepath2) + require.NoError(t, err) + + // Start a web server to serve the car files + log.Debug("starting webserver") + server, err := testutil.HttpTestFileServer(t, tempdir) + require.NoError(t, err) + defer server.Close() + + // Create a new dummy deal + log.Debug("creating dummy deal") + dealUuid1 := uuid.New() + + // Make a deal + res1, err := f.MakeDummyDeal(dealUuid1, carFilepath1, rootCid1, server.URL+"/"+filepath.Base(carFilepath1), false) + require.NoError(t, err) + require.True(t, res1.Result.Accepted) + log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res1)) + + time.Sleep(2 * time.Second) + + dealUuid2 := uuid.New() + res2, err := f.MakeDummyDeal(dealUuid2, carFilepath2, rootCid2, server.URL+"/"+filepath.Base(carFilepath2), false) + require.NoError(t, err) + require.True(t, res2.Result.Accepted) + log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res2)) + + time.Sleep(2 * time.Second) + + // Create a CAR file for DDO deal + randomFilepath, err := testutil.CreateRandomFile(tempdir, 5, fileSize) + require.NoError(t, err) + _, carFilepath, err := testutil.CreateDenseCARv2(tempdir, randomFilepath) + require.NoError(t, err) + commp, err := storagemarket.GenerateCommPLocally(carFilepath) + require.NoError(t, err) + t.Logf("Piece CID: %s, Piece Size: %d", commp.PieceCID.String(), commp.Size) + + pieceInfos := []string{fmt.Sprintf("%s=%d", commp.PieceCID, commp.Size)} + minerIds := []string{"f01000"} + allocateMsg, err := util.CreateAllocationMsg(ctx, f.FullNode, pieceInfos, minerIds, f.ClientAddr, + verifregst.MinimumVerifiedAllocationTerm, + verifregst.MaximumVerifiedAllocationTerm, + verifregst.MaximumVerifiedAllocationExpiration) + require.NoError(t, err) + + sm, err := f.FullNode.MpoolPushMessage(ctx, allocateMsg, nil) + require.NoError(t, err) + + _, err = f.FullNode.StateWaitMsg(ctx, sm.Cid(), 1, 1e10, true) + require.NoError(t, err) + + allocations, err := f.FullNode.StateGetAllocations(ctx, f.ClientAddr, types.EmptyTSK) + require.NoError(t, err) + require.Len(t, allocations, 1) + + var allocationId uint64 + for id := range allocations { + allocationId = uint64(id) + } + + head, err := f.FullNode.ChainHead(ctx) + require.NoError(t, err) + + startEpoch := head.Height() + 200 + endEpoch := head.Height() + +2880*400 + + dealUuid := uuid.New() + ddParams := smtypes.DirectDealParams{ + DealUUID: dealUuid, + AllocationID: verifreg.AllocationId(allocationId), + PieceCid: commp.PieceCID, + ClientAddr: f.ClientAddr, + StartEpoch: startEpoch, + EndEpoch: endEpoch, + FilePath: carFilepath, + DeleteAfterImport: false, + RemoveUnsealedCopy: false, + SkipIPNIAnnounce: false, + } + + rej, err := f.Boost.BoostDirectDeal(ctx, ddParams) + require.NoError(t, err) + if rej != nil && rej.Reason != "" { + require.Fail(t, "direct data import rejected: %s", rej.Reason) + } + t.Log("Direct data import scheduled for execution") + + // Wait for sector to start sealing + time.Sleep(2 * time.Second) + + // Wait for all 5 sectors to get to proving state + states := []lapi.SectorState{lapi.SectorState(sealing.Proving)} + require.Eventuallyf(t, func() bool { + stateList, err := f.LotusMiner.SectorsListInStates(ctx, states) + require.NoError(t, err) + return len(stateList) == 5 + }, time.Minute, 2*time.Second, "sectors are still not proving after a minute") + + // Verify that LID has entries for all deals + prop1, err := cborutil.AsIpld(&res1.DealParams.ClientDealProposal) + require.NoError(t, err) + + prop2, err := cborutil.AsIpld(&res2.DealParams.ClientDealProposal) + require.NoError(t, err) + + mhs, err := f.Boost.BoostIndexerListMultihashes(ctx, prop1.Cid().Bytes()) + require.NoError(t, err) + require.Greater(t, len(mhs), 0) + + mhs, err = f.Boost.BoostIndexerListMultihashes(ctx, prop2.Cid().Bytes()) + require.NoError(t, err) + require.Greater(t, len(mhs), 0) + + ddo, err := dealUuid.MarshalBinary() + require.NoError(t, err) + + mhs, err = f.Boost.BoostIndexerListMultihashes(ctx, ddo) + require.NoError(t, err) + require.Greater(t, len(mhs), 0) + + // Wait for wdPost + require.Eventuallyf(t, func() bool { + mActor, err := f.FullNode.StateGetActor(ctx, f.MinerAddr, types.EmptyTSK) + require.NoError(t, err) + store := adt.WrapStore(ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(f.FullNode))) + mas, err := miner.Load(store, mActor) + require.NoError(t, err) + unproven, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors) + require.NoError(t, err) + count, err := unproven.Count() + require.NoError(t, err) + return count == 0 + }, blockTime*(2880), 3*time.Second, "timeout waiting for wdPost") + + // Terminate DDO sector and a deal sector + err = f.LotusMiner.SectorTerminate(ctx, abi.SectorNumber(2)) + require.NoError(t, err) + err = f.LotusMiner.SectorTerminate(ctx, abi.SectorNumber(4)) + require.NoError(t, err) + time.Sleep(2 * time.Second) + _, err = f.LotusMiner.SectorTerminateFlush(ctx) + require.NoError(t, err) + + // Keep trying to terminate in case of deadline issues + require.Eventually(t, func() bool { + tpending, err := f.LotusMiner.SectorTerminatePending(ctx) + require.NoError(t, err) + if len(tpending) == 0 { + return true + } + _, err = f.LotusMiner.SectorTerminateFlush(ctx) + require.NoError(t, err) + return false + }, blockTime*(60*2), 1*time.Second, "timeout waiting for sectors to terminate") + + // Wait for terminate message to be processed + states = []lapi.SectorState{lapi.SectorState(sealing.TerminateFinality)} + require.Eventuallyf(t, func() bool { + stateList, err := f.LotusMiner.SectorsListInStates(ctx, states) + require.NoError(t, err) + return len(stateList) == 2 + }, time.Second*15, 1*time.Second, "timeout waiting for sectors to reach TerminateFinality") + + // Clean up LID + err = f.Boost.PdCleanup(ctx) + require.NoError(t, err) + + // Listing multihashes for DDO deal should fail + _, err = f.Boost.BoostIndexerListMultihashes(ctx, ddo) + require.ErrorContains(t, err, " key not found") + + st, err := f.LotusMiner.SectorsStatus(ctx, abi.SectorNumber(3), true) + require.NoError(t, err) + require.Len(t, st.Pieces, 1) + var removedProp cid.Cid + var remainingProp cid.Cid + + if res1.DealParams.ClientDealProposal.Proposal.PieceCID.Equals(st.Pieces[0].Piece.PieceCID) { + removedProp = prop2.Cid() + remainingProp = prop1.Cid() + } else { + removedProp = prop1.Cid() + remainingProp = prop2.Cid() + } + + // Listing multihashes for removed Boost deal should fail + _, err = f.Boost.BoostIndexerListMultihashes(ctx, removedProp.Bytes()) + require.ErrorContains(t, err, " key not found") + + // Listing multihashes for remaining deal should succeed + mhs, err = f.Boost.BoostIndexerListMultihashes(ctx, remainingProp.Bytes()) + require.NoError(t, err) + require.GreaterOrEqual(t, len(mhs), 1) +} diff --git a/lib/pdcleaner/pdcleaner.go b/lib/pdcleaner/pdcleaner.go index 4efca4d56..7c898a7ae 100644 --- a/lib/pdcleaner/pdcleaner.go +++ b/lib/pdcleaner/pdcleaner.go @@ -2,7 +2,6 @@ package pdcleaner import ( "fmt" - "strconv" "sync" "time" @@ -13,24 +12,24 @@ import ( "github.com/filecoin-project/boost/storagemarket/types" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" - "github.com/filecoin-project/go-state-types/abi" - verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/actors/builtin/verifreg" + chaintypes "github.com/filecoin-project/lotus/chain/types" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" "go.uber.org/fx" "golang.org/x/net/context" + "golang.org/x/sync/errgroup" ) var log = logging.Logger("pdcleaner") type PieceDirectoryCleanup interface { Start(ctx context.Context) - CleanOnce() error + CleanOnce(ctx context.Context) error + getActiveUnprovenSectors(ctx context.Context, tskey chaintypes.TipSetKey) (bitfield.BitField, error) } type pdcleaner struct { @@ -107,7 +106,7 @@ func (p *pdcleaner) clean() { select { case <-ticker.C: log.Infof("Starting LID clean up") - err := p.CleanOnce() + err := p.CleanOnce(p.ctx) if err != nil { log.Errorf("Failed to cleanup LID: %s", err) continue @@ -122,25 +121,21 @@ func (p *pdcleaner) clean() { // CleanOnce generates a list of all Expired-Boost, Legacy and Direct deals. It then attempts to clean up these deals. // It also generated a list of all pieces in LID and tries to find any pieceMetadata with no deals in Boost, Direct or Legacy DB. // If such a deal is found, it is cleaned up as well -func (p *pdcleaner) CleanOnce() error { +func (p *pdcleaner) CleanOnce(ctx context.Context) error { p.lk.Lock() defer p.lk.Unlock() - head, err := p.full.ChainHead(p.ctx) + head, err := p.full.ChainHead(ctx) if err != nil { return fmt.Errorf("getting chain head: %w", err) } tskey := head.Key() - deals, err := p.full.StateMarketDeals(p.ctx, tskey) - if err != nil { - return fmt.Errorf("getting market deals: %w", err) - } - boostCompleteDeals, err := p.dealsDB.ListCompleted(p.ctx) + boostCompleteDeals, err := p.dealsDB.ListCompleted(ctx) if err != nil { return fmt.Errorf("getting complete boost deals: %w", err) } - boostActiveDeals, err := p.dealsDB.ListActive(p.ctx) + boostActiveDeals, err := p.dealsDB.ListActive(ctx) if err != nil { return fmt.Errorf("getting active boost deals: %w", err) } @@ -154,212 +149,171 @@ func (p *pdcleaner) CleanOnce() error { if err != nil { return fmt.Errorf("getting legacy deals: %w", err) } - completeDirectDeals, err := p.directDealsDB.ListCompleted(p.ctx) + completeDirectDeals, err := p.directDealsDB.ListCompleted(ctx) if err != nil { return fmt.Errorf("getting complete direct deals: %w", err) } - // Clean up completed/slashed Boost deals - for _, d := range boostDeals { - // Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up - if d.ChainDealID > abi.DealID(0) { - // If deal exists online - md, ok := deals[strconv.FormatInt(int64(d.ChainDealID), 10)] - if ok { - // If deal is slashed or end epoch has passed. No other reason for deal to reach termination - // Same is true for verified deals. We rely on EndEpoch/SlashEpoch for verified deals created by f05 - toCheck := termOrSlash(md.Proposal.EndEpoch, md.State.SlashEpoch) - if toCheck < head.Height() { - err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.DealUuid.String()) - if err != nil { - // Don't return if cleaning up a deal results in error. Try them all. - log.Errorf("cleaning up boost deal %s for piece %s: %s", d.DealUuid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error()) - } - } - } - } + activeSectors, err := p.getActiveUnprovenSectors(ctx, tskey) + if err != nil { + return err } - // Clean up completed/slashed legacy deals - for _, d := range legacyDeals { - // Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up - if d.DealID > abi.DealID(0) { - // If deal exists online - md, ok := deals[strconv.FormatInt(int64(d.DealID), 10)] - if ok { - // If deal is slashed or end epoch has passed. No other reason for deal to reach termination - toCheck := termOrSlash(md.Proposal.EndEpoch, md.State.SlashEpoch) - if toCheck < head.Height() { - err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.ProposalCid.String()) - if err != nil { - // Don't return if cleaning up a deal results in error. Try them all. - log.Errorf("cleaning up legacy deal %s for piece %s: %s", d.ProposalCid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error()) - } - } + // Clean up Boost deals where sector does not exist anymore + boosteg := errgroup.Group{} + boosteg.SetLimit(20) + for _, d := range boostDeals { + deal := d + boosteg.Go(func() error { + present, err := activeSectors.IsSet(uint64(deal.SectorID)) + if err != nil { + return fmt.Errorf("checking if bitfield is set: %w", err) } - } - } - - // Clean up direct deals if there are any otherwise skip this step - if len(completeDirectDeals) > 0 { - claims, err := p.full.StateGetClaims(p.ctx, p.miner, tskey) - if err != nil { - return fmt.Errorf("getting claims for the miner %s: %w", p.miner, err) - } - - // Loading miner actor locally is preferred to avoid getting unnecessary data from full.StateMinerActiveSectors() - mActor, err := p.full.StateGetActor(p.ctx, p.miner, tskey) - if err != nil { - return fmt.Errorf("getting actor for the miner %s: %w", p.miner, err) - } - store := adt.WrapStore(p.ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(p.full))) - mas, err := miner.Load(store, mActor) - if err != nil { - return fmt.Errorf("loading miner actor state %s: %w", p.miner, err) - } - activeSectors, err := miner.AllPartSectors(mas, miner.Partition.ActiveSectors) - if err != nil { - return fmt.Errorf("getting active sector sets for miner %s: %w", p.miner, err) - } - unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors) - if err != nil { - return fmt.Errorf("getting unproven sector sets for miner %s: %w", p.miner, err) - } - finalSectors, err := bitfield.MergeBitFields(activeSectors, unProvenSectors) - if err != nil { - return fmt.Errorf("merging bitfields to generate all deal sectors on miner %s: %w", p.miner, err) - } - - // Load verifreg actor locally - verifregActor, err := p.full.StateGetActor(p.ctx, verifreg.Address, tskey) - if err != nil { - return fmt.Errorf("getting verified registry actor state: %w", err) - } - verifregState, err := verifreg.Load(store, verifregActor) - if err != nil { - return fmt.Errorf("loading verified registry actor state: %w", err) - } - - for _, d := range completeDirectDeals { - cID := verifregtypes.ClaimId(d.AllocationID) - c, ok := claims[cID] - // If claim found - if ok { - // Claim Sector number and Deal Sector number should match(regardless of how DDO works) - // If they don't match and older sector is removed, then we can't use the metadata - // This check can be removed once Curio has resealing enabled, and it can provide - // new replacement sector details to Boost before deal reached "Complete" state. - if c.Sector != d.SectorID { - err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) - if err != nil { - // Don't return if cleaning up a deal results in error. Try them all. - log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) - } - continue - } - present, err := finalSectors.IsSet(uint64(c.Sector)) + if !present { + err = p.pd.RemoveDealForPiece(ctx, deal.ClientDealProposal.Proposal.PieceCID, deal.DealUuid.String()) if err != nil { - return fmt.Errorf("checking if bitfield is set: %w", err) - } - // Each claim is created with ProveCommit message. So, a sector in claim cannot be unproven. - // it must be either Active(Proving, Faulty, Recovering) or terminated. If bitfield is not set - // then sector must have been terminated. This method will also account for future change in sector numbers - // of a claim. Even if the sector is changed then it must be Active as this change will require a - // ProveCommit message. - if !present { - err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) - if err != nil { - // Don't return if cleaning up a deal results in error. Try them all. - log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) - } + // Don't return if cleaning up a deal results in error. Try them all. + return fmt.Errorf("cleaning up boost deal %s for piece %s: %s", deal.DealUuid.String(), deal.ClientDealProposal.Proposal.PieceCID.String(), err.Error()) } - continue } + return nil + }) + } + err = boosteg.Wait() + if err != nil { + return err + } - // If no claim found - alloc, ok, err := verifregState.GetAllocation(d.Client, d.AllocationID) + // Clean up legacy deals where sector does not exist anymore + legacyeg := errgroup.Group{} + legacyeg.SetLimit(20) + for _, d := range legacyDeals { + deal := d + legacyeg.Go(func() error { + present, err := activeSectors.IsSet(uint64(deal.SectorNumber)) if err != nil { - return fmt.Errorf("getting allocation %d for client %s: %w", d.AllocationID, d.Client, err) + return fmt.Errorf("checking if bitfield is set: %w", err) } - if !ok || alloc.Expiration < head.Height() { - // If allocation is expired, clean up the deal. If the allocation does not exist anymore. - // Either it was claimed and then claim was cleaned up after TermMax - // or allocation expired before it could be claimed and was cleaned up - // Deal should be cleaned up in either case - err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if !present { + err = p.pd.RemoveDealForPiece(ctx, deal.ClientDealProposal.Proposal.PieceCID, deal.ProposalCid.String()) if err != nil { // Don't return if cleaning up a deal results in error. Try them all. - log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + return fmt.Errorf("cleaning up legacy deal %s for piece %s: %s", deal.ProposalCid.String(), deal.ClientDealProposal.Proposal.PieceCID.String(), err.Error()) } - continue } + return nil + }) + } + err = legacyeg.Wait() + if err != nil { + return err + } - if alloc.Expiration < head.Height() { - err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + // Clean up Direct deals where sector does not exist anymore + // TODO: Refactor for Curio sealing redundancy + ddoeg := errgroup.Group{} + ddoeg.SetLimit(20) + for _, d := range completeDirectDeals { + deal := d + ddoeg.Go(func() error { + present, err := activeSectors.IsSet(uint64(deal.SectorID)) + if err != nil { + return fmt.Errorf("checking if bitfield is set: %w", err) + } + if !present { + err = p.pd.RemoveDealForPiece(ctx, deal.PieceCID, deal.ID.String()) if err != nil { // Don't return if cleaning up a deal results in error. Try them all. - log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + return fmt.Errorf("cleaning up direct deal %s for piece %s: %s", deal.ID.String(), deal.PieceCID, err.Error()) } - continue } - } + return nil + }) + } + err = ddoeg.Wait() + if err != nil { + return err } // Clean up dangling LID deals with no Boost, Direct or Legacy deals attached to them - plist, err := p.pd.ListPieces(p.ctx) + plist, err := p.pd.ListPieces(ctx) if err != nil { return fmt.Errorf("getting piece list from LID: %w", err) } - for _, piece := range plist { - pdeals, err := p.pd.GetPieceDeals(p.ctx, piece) - if err != nil { - return fmt.Errorf("getting piece deals from LID: %w", err) - } - for _, deal := range pdeals { - // Remove only if the miner ID matches to avoid removing for other miners in case of shared LID - if deal.MinerAddr == p.miner { + lideg := errgroup.Group{} + lideg.SetLimit(50) + for _, pi := range plist { + piece := pi + lideg.Go(func() error { + pdeals, err := p.pd.GetPieceDeals(ctx, piece) + if err != nil { + return fmt.Errorf("getting piece deals from LID: %w", err) + } + for _, deal := range pdeals { + // Remove only if the miner ID matches to avoid removing for other miners in case of shared LID + if deal.MinerAddr == p.miner { - bd, err := p.dealsDB.ByPieceCID(p.ctx, piece) - if err != nil { - return err - } - if len(bd) > 0 { - continue - } + bd, err := p.dealsDB.ByPieceCID(ctx, piece) + if err != nil { + return err + } + if len(bd) > 0 { + continue + } - ld, err := p.legacyDeals.ByPieceCid(p.ctx, piece) - if err != nil { - return err - } - if len(ld) > 0 { - continue - } + ld, err := p.legacyDeals.ByPieceCid(ctx, piece) + if err != nil { + return err + } + if len(ld) > 0 { + continue + } - dd, err := p.directDealsDB.ByPieceCID(p.ctx, piece) - if err != nil { - return err - } - if len(dd) > 0 { - continue - } + dd, err := p.directDealsDB.ByPieceCID(ctx, piece) + if err != nil { + return err + } + if len(dd) > 0 { + continue + } - err = p.pd.RemoveDealForPiece(p.ctx, piece, deal.DealUuid) - if err != nil { - // Don't return if cleaning up a deal results in error. Try them all. - log.Errorf("cleaning up dangling deal %s for piece %s: %s", deal.DealUuid, piece, err.Error()) + err = p.pd.RemoveDealForPiece(ctx, piece, deal.DealUuid) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up dangling deal %s for piece %s: %s", deal.DealUuid, piece, err.Error()) + } } } - } + return nil + }) } - return nil + return lideg.Wait() } -func termOrSlash(term, slash abi.ChainEpoch) abi.ChainEpoch { - if term > slash && slash > 0 { - return slash +func (p *pdcleaner) getActiveUnprovenSectors(ctx context.Context, tskey chaintypes.TipSetKey) (bitfield.BitField, error) { + mActor, err := p.full.StateGetActor(ctx, p.miner, tskey) + if err != nil { + return bitfield.BitField{}, fmt.Errorf("getting actor for the miner %s: %w", p.miner, err) } - return term + store := adt.WrapStore(ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(p.full))) + mas, err := miner.Load(store, mActor) + if err != nil { + return bitfield.BitField{}, fmt.Errorf("loading miner actor state %s: %w", p.miner, err) + } + liveSectors, err := miner.AllPartSectors(mas, miner.Partition.LiveSectors) + if err != nil { + return bitfield.BitField{}, fmt.Errorf("getting live sector sets for miner %s: %w", p.miner, err) + } + unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors) + if err != nil { + return bitfield.BitField{}, fmt.Errorf("getting unproven sector sets for miner %s: %w", p.miner, err) + } + activeSectors, err := bitfield.MergeBitFields(liveSectors, unProvenSectors) + if err != nil { + return bitfield.BitField{}, fmt.Errorf("merging bitfields to generate all sealed sectors on miner %s: %w", p.miner, err) + } + return activeSectors, nil } diff --git a/lib/pdcleaner/pdcleaner_test.go b/lib/pdcleaner/pdcleaner_test.go deleted file mode 100644 index bad5d52df..000000000 --- a/lib/pdcleaner/pdcleaner_test.go +++ /dev/null @@ -1,192 +0,0 @@ -package pdcleaner - -import ( - "context" - "fmt" - "os" - "strconv" - "testing" - - bdb "github.com/filecoin-project/boost/db" - "github.com/filecoin-project/boost/db/migrations" - "github.com/filecoin-project/boost/extern/boostd-data/client" - "github.com/filecoin-project/boost/extern/boostd-data/model" - "github.com/filecoin-project/boost/extern/boostd-data/svc" - mocks_legacy "github.com/filecoin-project/boost/lib/legacy/mocks" - "github.com/filecoin-project/boost/piecedirectory" - "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/api" - lotusmocks "github.com/filecoin-project/lotus/api/mocks" - test "github.com/filecoin-project/lotus/chain/events/state/mock" - chaintypes "github.com/filecoin-project/lotus/chain/types" - "github.com/golang/mock/gomock" - "github.com/google/uuid" - "github.com/ipfs/go-cid" - "github.com/ipld/go-car/v2" - "github.com/stretchr/testify/require" -) - -func TestPieceDirectoryCleaner(t *testing.T) { - ctx := context.Background() - - sqldb := bdb.CreateTestTmpDB(t) - require.NoError(t, bdb.CreateAllBoostTables(ctx, sqldb, sqldb)) - require.NoError(t, migrations.Migrate(sqldb)) - - dealsDB := bdb.NewDealsDB(sqldb) - directDB := bdb.NewDirectDealsDB(sqldb) - - bdsvc, err := svc.NewLevelDB("") - require.NoError(t, err) - ln, err := bdsvc.Start(ctx, "localhost:0") - require.NoError(t, err) - - cl := client.NewStore() - err = cl.Dial(ctx, fmt.Sprintf("ws://%s", ln)) - require.NoError(t, err) - defer cl.Close(ctx) - - pieceCount := 5 - readers := make(map[abi.SectorNumber]car.SectionReader) - for i := 0; i < pieceCount; i++ { - // Create a random CAR file - _, carFilePath := piecedirectory.CreateCarFile(t, i+1) - carFile, err := os.Open(carFilePath) - require.NoError(t, err) - defer carFile.Close() - - carReader, err := car.OpenReader(carFilePath) - require.NoError(t, err) - defer carReader.Close() - carv1Reader, err := carReader.DataReader() - require.NoError(t, err) - - readers[abi.SectorNumber(i+1)] = carv1Reader - } - - // Any calls to get a reader over data should return a reader over the random CAR file - pr := piecedirectory.CreateMockPieceReaders(t, readers) - - pm := piecedirectory.NewPieceDirectory(cl, pr, 1) - pm.Start(ctx) - - type dealData struct { - sector abi.SectorNumber - chainDealID abi.DealID - piece cid.Cid - used bool - } - - deals, err := bdb.GenerateDeals() - require.NoError(t, err) - - // Create and update a map to keep track of chainDealID and UUID bindings - dealMap := make(map[uuid.UUID]*dealData) - for _, deal := range deals { - dealMap[deal.DealUuid] = &dealData{chainDealID: deal.ChainDealID, used: false} - } - - // Add deals to LID and note down details to update SQL DB - for sectorNumber, reader := range readers { - pieceCid := piecedirectory.CalculateCommp(t, reader).PieceCID - - var uid uuid.UUID - var cdid abi.DealID - - for id, data := range dealMap { - // If this value from deals list has not be used - if !data.used { - uid = id // Use the UUID from deals list - cdid = data.chainDealID - data.used = true - data.sector = sectorNumber // Use the sector number from deals list - data.piece = pieceCid - break - } - } - - // Add deal info for each piece - di := model.DealInfo{ - DealUuid: uid.String(), - ChainDealID: cdid, - SectorID: sectorNumber, - PieceOffset: 0, - PieceLength: 0, - } - err := pm.AddDealForPiece(ctx, pieceCid, di) - require.NoError(t, err) - } - - // Setup Full node, legacy manager - ctrl := gomock.NewController(t) - fn := lotusmocks.NewMockFullNode(ctrl) - legacyProv := mocks_legacy.NewMockLegacyDealManager(ctrl) - provAddr, err := address.NewIDAddress(1523) - require.NoError(t, err) - - // Start a new PieceDirectoryCleaner - pdc := newPDC(dealsDB, directDB, legacyProv, pm, fn, 1) - pdc.ctx = ctx - - chainHead, err := test.MockTipset(provAddr, 1) - require.NoError(t, err) - chainHeadFn := func(ctx context.Context) (*chaintypes.TipSet, error) { - return chainHead, nil - } - - // Add deals to SQL DB - cDealMap := make(map[string]*api.MarketDeal) - for i, deal := range deals { - data, ok := dealMap[deal.DealUuid] - require.True(t, ok) - deal.SectorID = data.sector - deal.ClientDealProposal.Proposal.PieceCID = data.piece - deal.ClientDealProposal.Proposal.EndEpoch = 3 // because chain head is always 5 - deal.Checkpoint = dealcheckpoints.Complete - p, err := deal.SignedProposalCid() - require.NoError(t, err) - t.Logf("signed p %s", p.String()) - // Test a slashed deal - if i == 0 { - deal.Checkpoint = dealcheckpoints.Accepted - deal.ClientDealProposal.Proposal.EndEpoch = 6 - cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{ - Proposal: deal.ClientDealProposal.Proposal, - State: api.MarketDealState{ - SlashEpoch: 3, // Slash this deal - }, - } - err = dealsDB.Insert(ctx, &deal) - require.NoError(t, err) - continue - } - cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{ - Proposal: deal.ClientDealProposal.Proposal, - State: api.MarketDealState{ - SlashEpoch: -1, - }, - } - err = dealsDB.Insert(ctx, &deal) - require.NoError(t, err) - } - - // Confirm we have 5 pieces in LID - pl, err := pm.ListPieces(ctx) - require.NoError(t, err) - require.Len(t, pl, 5) - - fn.EXPECT().ChainHead(gomock.Any()).DoAndReturn(chainHeadFn).AnyTimes() - fn.EXPECT().StateMarketDeals(gomock.Any(), gomock.Any()).Return(cDealMap, nil).AnyTimes() - legacyProv.EXPECT().ListDeals().Return(nil, nil).AnyTimes() - legacyProv.EXPECT().ByPieceCid(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() - - err = pdc.CleanOnce() - require.NoError(t, err) - - // Confirm we have 0 pieces in LID after clean up - pl, err = pm.ListPieces(ctx) - require.NoError(t, err) - require.Len(t, pl, 0) -} diff --git a/node/impl/boost.go b/node/impl/boost.go index 305b833de..3b1cf67a4 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -8,6 +8,7 @@ import ( "net/http" "github.com/filecoin-project/boost/lib/legacy" + "github.com/filecoin-project/boost/lib/pdcleaner" "github.com/filecoin-project/boost/node/impl/backupmgr" "github.com/filecoin-project/boost/piecedirectory" "github.com/filecoin-project/boost/storagemarket/types/legacytypes" @@ -65,7 +66,8 @@ type BoostAPI struct { Sps sealingpipeline.API // Piece Directory - Pd *piecedirectory.PieceDirectory + Pd *piecedirectory.PieceDirectory + Pdc pdcleaner.PieceDirectoryCleanup // GraphSQL server GraphqlServer *gql.Server @@ -136,8 +138,8 @@ func (sm *BoostAPI) BoostIndexerAnnounceAllDeals(ctx context.Context) error { } // BoostIndexerListMultihashes calls the index provider multihash lister for a given proposal cid -func (sm *BoostAPI) BoostIndexerListMultihashes(ctx context.Context, proposalCid cid.Cid) ([]multihash.Multihash, error) { - it, err := sm.IndexProvider.MultihashLister(ctx, "", proposalCid.Bytes()) +func (sm *BoostAPI) BoostIndexerListMultihashes(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) { + it, err := sm.IndexProvider.MultihashLister(ctx, "", contextID) if err != nil { return nil, err } @@ -221,3 +223,7 @@ func (sm *BoostAPI) PdRemoveDealForPiece(ctx context.Context, piececid cid.Cid, return sm.Pd.RemoveDealForPiece(ctx, piececid, dealID) } + +func (sm *BoostAPI) PdCleanup(ctx context.Context) error { + return sm.Pdc.CleanOnce(ctx) +} diff --git a/storagemarket/direct_deals_provider.go b/storagemarket/direct_deals_provider.go index 91a15e5f3..c88999e9a 100644 --- a/storagemarket/direct_deals_provider.go +++ b/storagemarket/direct_deals_provider.go @@ -487,9 +487,9 @@ func (ddp *DirectDealsProvider) watchSealingUpdates(entry *smtypes.DirectDeal) * return nil } - // Check status every couple of minutes + // Check status every 10 seconds ddp.dealLogger.Infow(entry.ID, "watching deal sealing state changes") - ticker := time.NewTicker(2 * time.Minute) + ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select {