diff --git a/go.mod b/go.mod index 5e8e02008..f62c7e55a 100644 --- a/go.mod +++ b/go.mod @@ -213,7 +213,7 @@ require ( github.com/ipfs/go-ipld-cbor v0.1.0 github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-peertaskqueue v0.8.1 // indirect - github.com/ipfs/go-unixfsnode v1.8.0 + github.com/ipfs/go-unixfsnode v1.9.0 github.com/ipfs/go-verifcid v0.0.2 // indirect github.com/ipld/go-codec-dagpb v1.6.0 github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect @@ -327,6 +327,7 @@ require ( github.com/filecoin-project/lotus v1.23.4-rc1 github.com/ipfs/boxo v0.12.0 github.com/ipfs/kubo v0.22.0 + github.com/ipld/go-trustless-utils v0.4.1 github.com/ipni/go-libipni v0.5.1 github.com/ipni/ipni-cli v0.1.1 github.com/schollz/progressbar/v3 v3.13.1 diff --git a/go.sum b/go.sum index 2b9f602af..6c8a506aa 100644 --- a/go.sum +++ b/go.sum @@ -890,8 +890,8 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul github.com/ipfs/go-unixfs v0.3.1/go.mod h1:h4qfQYzghiIc8ZNFKiLMFWOTzrWIAtzYQ59W/pCFf1o= github.com/ipfs/go-unixfs v0.4.5 h1:wj8JhxvV1G6CD7swACwSKYa+NgtdWC1RUit+gFnymDU= github.com/ipfs/go-unixfsnode v1.4.0/go.mod h1:qc7YFFZ8tABc58p62HnIYbUMwj9chhUuFWmxSokfePo= -github.com/ipfs/go-unixfsnode v1.8.0 h1:yCkakzuE365glu+YkgzZt6p38CSVEBPgngL9ZkfnyQU= -github.com/ipfs/go-unixfsnode v1.8.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8= +github.com/ipfs/go-unixfsnode v1.9.0 h1:ubEhQhr22sPAKO2DNsyVBW7YB/zA8Zkif25aBvz8rc8= +github.com/ipfs/go-unixfsnode v1.9.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8= github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= @@ -926,6 +926,9 @@ github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= github.com/ipld/go-ipld-selector-text-lite v0.0.1 h1:lNqFsQpBHc3p5xHob2KvEg/iM5dIFn6iw4L/Hh+kS1Y= github.com/ipld/go-ipld-selector-text-lite v0.0.1/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM= +github.com/ipld/go-trustless-utils v0.4.1 h1:puA14381Hg2LzH724mZ5ZFKFx+FFjjT5fPFs01vwlgM= +github.com/ipld/go-trustless-utils v0.4.1/go.mod h1:DgGuyfJ33goYwYVisjnxrlra0HVmZuHWVisVIkzVo1o= +github.com/ipld/ipld/specs v0.0.0-20231012031213-54d3b21deda4 h1:0VXv637/xpI0Pb5J8K+K8iRtTw4DOcxs0MB1HMzfwNY= github.com/ipni/go-libipni v0.5.1 h1:HumuJtKmV8RoDpBakLgxCSl5QPiD2ljTZl/NOyXO6nM= github.com/ipni/go-libipni v0.5.1/go.mod h1:UnrhEqjVI2Z2HXlaieOBONJmtW557nZkYpB4IIsMD+s= github.com/ipni/index-provider v0.14.2 h1:daA3IFnI2n2x/mL0K91SQHNLq6Vvfp5q4uFX9G4glvE= diff --git a/itests/dummydeal_offline_test.go b/itests/dummydeal_offline_test.go index 296d02600..fb11e06f6 100644 --- a/itests/dummydeal_offline_test.go +++ b/itests/dummydeal_offline_test.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/itests/kit" "github.com/google/uuid" + trustlessutils "github.com/ipld/go-trustless-utils" "github.com/stretchr/testify/require" ) @@ -46,6 +47,11 @@ func TestDummydealOffline(t *testing.T) { err = f.WaitForDealAddedToSector(offlineDealUuid) require.NoError(t, err) - outFile := f.Retrieve(ctx, t, tempdir, rootCid, dealRes.DealParams.ClientDealProposal.Proposal.PieceCID, true, nil) + outFile := f.Retrieve( + ctx, + t, + trustlessutils.Request{Root: rootCid, Scope: trustlessutils.DagScopeAll}, + true, + ) kit.AssertFilesEqual(t, randomFilepath, outFile) } diff --git a/itests/dummydeal_test.go b/itests/dummydeal_test.go index 2a7c9a692..b24ea3c10 100644 --- a/itests/dummydeal_test.go +++ b/itests/dummydeal_test.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/itests/kit" "github.com/google/uuid" + trustlessutils "github.com/ipld/go-trustless-utils" "github.com/stretchr/testify/require" ) @@ -94,6 +95,11 @@ func TestDummydealOnline(t *testing.T) { require.NoError(t, err) // rootCid is an identity CID - outFile := f.Retrieve(ctx, t, tempdir, rootCid, res.DealParams.ClientDealProposal.Proposal.PieceCID, true, nil) + outFile := f.Retrieve( + ctx, + t, + trustlessutils.Request{Root: rootCid, Scope: trustlessutils.DagScopeAll}, + true, + ) kit.AssertFilesEqual(t, randomFilepath, outFile) } diff --git a/itests/framework/framework.go b/itests/framework/framework.go index f7bb51525..ac96e5e57 100644 --- a/itests/framework/framework.go +++ b/itests/framework/framework.go @@ -1,10 +1,12 @@ package framework import ( + "bytes" "context" "encoding/json" "errors" "fmt" + "io" "math/rand" "os" "path" @@ -71,10 +73,15 @@ import ( ipldformat "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-car" + carv2 "github.com/ipld/go-car/v2" + storagecar "github.com/ipld/go-car/v2/storage" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagjson" "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/ipld/go-ipld-prime/traversal" + trustless "github.com/ipld/go-trustless-utils" + traversal "github.com/ipld/go-trustless-utils/traversal" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -798,9 +805,15 @@ func (f *TestFramework) ExtractFileFromCAR(ctx context.Context, t *testing.T, fi return tmpFile } -func (f *TestFramework) Retrieve(ctx context.Context, t *testing.T, tempdir string, root cid.Cid, pieceCid cid.Cid, extractCar bool, selectorNode datamodel.Node) string { +func (f *TestFramework) Retrieve(ctx context.Context, t *testing.T, request trustless.Request, extractCar bool) string { + tempdir := t.TempDir() + + var out string + retPath := path.Join(tempdir, "retrievals") + require.NoError(t, os.Mkdir(retPath, 0755)) + clientPath := path.Join(tempdir, "client") - _ = os.Mkdir(clientPath, 0755) + require.NoError(t, os.Mkdir(clientPath, 0755)) clientNode, err := clinode.Setup(clientPath) require.NoError(t, err) @@ -812,8 +825,7 @@ func (f *TestFramework) Retrieve(ctx context.Context, t *testing.T, tempdir stri bstore := blockstore.NewBlockstore(bstoreDatastore, blockstore.NoPrefix()) require.NoError(t, err) - //ds, err := levelds.NewDatastore(path.Join(clientPath, "dstore"), nil) - ds, err := levelds.NewDatastore("", nil) + ds, err := levelds.NewDatastore(path.Join(clientPath, "dstore"), nil) require.NoError(t, err) // Create the retrieval client @@ -823,11 +835,18 @@ func (f *TestFramework) Retrieve(ctx context.Context, t *testing.T, tempdir stri baddrs, err := f.Boost.NetAddrsListen(ctx) require.NoError(t, err) - query, err := RetrievalQuery(ctx, t, clientNode, &baddrs, pieceCid) + // Query the remote to find out the retrieval parameters + query, err := RetrievalQuery(ctx, t, clientNode, &baddrs, request.Root) + require.NoError(t, err) + + // Create a matching proposal for the query + proposal, err := rc.RetrievalProposalForAsk(query, request.Root, request.Selector()) require.NoError(t, err) - proposal, err := rc.RetrievalProposalForAsk(query, root, selectorNode) + // Let's see the selector we're working with + encoded, err := ipld.Encode(request.Selector(), dagjson.Encode) require.NoError(t, err) + t.Logf("Retrieving with selector: %s", string(encoded)) // Retrieve the data _, err = fc.RetrieveContentWithProgressCallback( @@ -840,56 +859,52 @@ func (f *TestFramework) Retrieve(ctx context.Context, t *testing.T, tempdir stri ) require.NoError(t, err) + // Validate the data + dservOffline := dag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))) + lsys := utils.CreateLinkSystem(dservOffline) - // if we used a selector - need to find the sub-root the user actually wanted to retrieve - if selectorNode != nil { - if !selectorNode.IsNull() { - var subRootFound bool - err := utils.TraverseDag( - ctx, - dservOffline, - root, - selectorNode, - func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { - if r == traversal.VisitReason_SelectionMatch { - - require.Equal(t, p.LastBlock.Path.String(), p.Path.String()) - - cidLnk, castOK := p.LastBlock.Link.(cidlink.Link) - require.True(t, castOK) - - root = cidLnk.Cid - subRootFound = true - } - return nil - }, - ) - require.NoError(t, err) - require.True(t, subRootFound) + if !extractCar { + // If the caller wants a CAR, we create it and then when we run our check traversal over the DAG + // each load will trigger a write to the CAR + file, err := os.CreateTemp(retPath, "*"+request.Root.String()+".car") + require.NoError(t, err) + out = file.Name() + storage, err := storagecar.NewWritable(file, []cid.Cid{request.Root}, carv2.WriteAsCarV1(true)) + require.NoError(t, err) + sro := lsys.StorageReadOpener + lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + r, err := sro(lc, l) + if err != nil { + return nil, err + } + buf, err := io.ReadAll(r) + if err != nil { + return nil, err + } + if err := storage.Put(lc.Ctx, l.(cidlink.Link).Cid.KeyString(), buf); err != nil { + return nil, err + } + return bytes.NewReader(buf), nil } } - dnode, err := dservOffline.Get(ctx, root) + // Check that we got what we expected by executing the same selector over our + // retrieved DAG + _, err = traversal.Config{ + Root: request.Root, + Selector: request.Selector(), + }.Traverse(ctx, lsys, nil) require.NoError(t, err) - var out string - retPath := path.Join(tempdir, "retrievals") - _ = os.Mkdir(retPath, 0755) - - if !extractCar { - // Write file as car file - file, err := os.CreateTemp(retPath, "*"+root.String()+".car") - require.NoError(t, err) - out = file.Name() - err = car.WriteCar(ctx, dservOffline, []cid.Cid{root}, file) + if extractCar { + // Caller doesn't want the raw blocks, so extract the file as UnixFS and + // assume that we've fetched the right blocks to be able to do this. + dnode, err := dservOffline.Get(ctx, request.Root) require.NoError(t, err) - - } else { - // Otherwise write file as UnixFS File ufsFile, err := unixfile.NewUnixfsFile(ctx, dservOffline, dnode) require.NoError(t, err) - file, err := os.CreateTemp(retPath, "*"+root.String()) + file, err := os.CreateTemp(retPath, "*"+request.Root.String()) require.NoError(t, err) err = file.Close() require.NoError(t, err) diff --git a/itests/graphsync_identity_cid_test.go b/itests/graphsync_identity_cid_test.go index daa6b9ca7..a0e9a80d1 100644 --- a/itests/graphsync_identity_cid_test.go +++ b/itests/graphsync_identity_cid_test.go @@ -24,7 +24,7 @@ import ( "github.com/ipld/go-ipld-prime/fluent/qp" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" - selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" + trustlessutils "github.com/ipld/go-trustless-utils" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" ) @@ -110,7 +110,6 @@ func TestDealAndRetrievalWithIdentityCID(t *testing.T) { log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res)) dealCid, err := res.DealParams.ClientDealProposal.Proposal.Cid() require.NoError(t, err) - pieceCid := res.DealParams.ClientDealProposal.Proposal.PieceCID log.Infof("deal ID is : %s", dealCid.String()) // Wait for the first deal to be added to a sector and cleaned up so space is made err = f.WaitForDealAddedToSector(dealUuid) @@ -121,7 +120,7 @@ func TestDealAndRetrievalWithIdentityCID(t *testing.T) { // Deal is stored and sealed, attempt different retrieval forms log.Debugw("deal is sealed, starting retrieval", "cid", dealCid.String(), "root", root.String()) - outPath := f.Retrieve(ctx, t, tempdir, root, pieceCid, false, selectorparse.CommonSelector_ExploreAllRecursively) + outPath := f.Retrieve(ctx, t, trustlessutils.Request{Root: root, Scope: trustlessutils.DagScopeAll}, false) // Inspect what we got gotCids, err := testutil.CidsInCar(outPath) diff --git a/itests/graphsync_retrieval_test.go b/itests/graphsync_retrieval_test.go index 8e1810687..5d3c59506 100644 --- a/itests/graphsync_retrieval_test.go +++ b/itests/graphsync_retrieval_test.go @@ -2,7 +2,6 @@ package itests import ( "context" - "math" "path/filepath" "testing" "time" @@ -14,10 +13,7 @@ import ( "github.com/filecoin-project/lotus/itests/kit" "github.com/google/uuid" "github.com/ipfs/go-cid" - "github.com/ipfs/go-unixfsnode" - "github.com/ipld/go-ipld-prime/datamodel" - "github.com/ipld/go-ipld-prime/node/basicnode" - "github.com/ipld/go-ipld-prime/traversal/selector/builder" + trustless "github.com/ipld/go-trustless-utils" "github.com/stretchr/testify/require" ) @@ -114,51 +110,71 @@ func TestDealRetrieval(t *testing.T) { // Deal is stored and sealed, attempt different retrieval forms retrievalCases := []struct { - name string - selector datamodel.Node - matcherFrom, matcherTo int64 - expectCids []cid.Cid + name string + request trustless.Request + expectCids []cid.Cid }{ { - name: "full file, explore-all", - selector: unixfsnode.UnixFSPathSelectorBuilder("", unixfsnode.ExploreAllRecursivelySelector, false), + name: "full file, explore-all", + request: trustless.Request{ + Root: root, + Scope: trustless.DagScopeAll, + }, expectCids: append([]cid.Cid{root}, leaves...), }, { - name: "slice: 0 to 7MiB", - matcherFrom: 0, - matcherTo: 7 << 20, - expectCids: append([]cid.Cid{root}, leaves...), + name: "slice: 0 to 7MiB", + request: trustless.Request{ + Root: root, + Scope: trustless.DagScopeEntity, + Bytes: &trustless.ByteRange{From: 0, To: ptrInt(7 << 20)}, + }, + expectCids: append([]cid.Cid{root}, leaves...), }, { - name: "slice: 1MiB to 2MiB", - matcherFrom: 1 << 20, - matcherTo: 2 << 20, - expectCids: append([]cid.Cid{root}, leaves[4:9]...), + name: "slice: 1MiB to 2MiB", + request: trustless.Request{ + Root: root, + Scope: trustless.DagScopeEntity, + Bytes: &trustless.ByteRange{From: 1 << 20, To: ptrInt(2 << 20)}, + }, + expectCids: append([]cid.Cid{root}, leaves[4:9]...), }, { - name: "slice: first byte", - matcherFrom: 0, - matcherTo: 1, - expectCids: append([]cid.Cid{root}, leaves[0]), + name: "slice: first byte", + request: trustless.Request{ + Root: root, + Scope: trustless.DagScopeEntity, + Bytes: &trustless.ByteRange{From: 0, To: ptrInt(1)}, + }, + expectCids: append([]cid.Cid{root}, leaves[0]), }, { - name: "slice: last byte", - matcherFrom: 7340031, - matcherTo: 7340032, - expectCids: append([]cid.Cid{root}, leaves[len(leaves)-1]), + name: "slice: last byte", + request: trustless.Request{ + Root: root, + Scope: trustless.DagScopeEntity, + Bytes: &trustless.ByteRange{From: 7340031, To: ptrInt(7340032)}, + }, + expectCids: append([]cid.Cid{root}, leaves[len(leaves)-1]), }, { - name: "slice: last two blocks, negative range, boundary", - matcherFrom: -168000 - 1, - matcherTo: math.MaxInt64, - expectCids: append([]cid.Cid{root}, leaves[len(leaves)-2:]...), + name: "slice: last two blocks, negative range, boundary", + request: trustless.Request{ + Root: root, + Scope: trustless.DagScopeEntity, + Bytes: &trustless.ByteRange{From: -168000 - 1}, + }, + expectCids: append([]cid.Cid{root}, leaves[len(leaves)-2:]...), }, { - name: "slice: last block, negative range, boundary", - matcherFrom: -168000, - matcherTo: math.MaxInt64, - expectCids: append([]cid.Cid{root}, leaves[len(leaves)-1]), + name: "slice: last block, negative range, boundary", + request: trustless.Request{ + Root: root, + Scope: trustless.DagScopeEntity, + Bytes: &trustless.ByteRange{From: -168000}, + }, + expectCids: append([]cid.Cid{root}, leaves[len(leaves)-1]), }, { // In this case we are attempting to traverse beyond the file to a @@ -166,24 +182,26 @@ func TestDealRetrieval(t *testing.T) { // return that. This is not strictly an error case, it's up to the // consumer of this data to verify the path doesn't resolve in the // data they get back. - name: "path beyond file", - selector: unixfsnode.UnixFSPathSelectorBuilder("not/a/path", unixfsnode.ExploreAllRecursivelySelector, false), + name: "path beyond file", + request: trustless.Request{ + Root: root, + Scope: trustless.DagScopeAll, + Path: "not/a/path", + }, expectCids: []cid.Cid{root}, }, } for _, tc := range retrievalCases { t.Run(tc.name, func(t *testing.T) { - selNode := tc.selector - if selNode == nil { - // build a selector from the specified slice matcher range - ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) - ss := ssb.ExploreInterpretAs("unixfs", ssb.MatcherSubset(tc.matcherFrom, tc.matcherTo)) - selNode = ss.Node() - } - log.Debugw("deal is sealed, starting retrieval", "cid", dealCid.String(), "root", root) - outPath := f.Retrieve(ctx, t, tempdir, root, res.DealParams.ClientDealProposal.Proposal.PieceCID, false, selNode) + + outPath := f.Retrieve( + ctx, + t, + tc.request, + false, + ) // Inspect what we got gotCids, err := testutil.CidsInCar(outPath) @@ -202,3 +220,7 @@ func TestDealRetrieval(t *testing.T) { }) } } + +func ptrInt(i int64) *int64 { + return &i +} diff --git a/itests/multiminer_retrieval_graphsync_test.go b/itests/multiminer_retrieval_graphsync_test.go index c79b9f8be..81633074b 100644 --- a/itests/multiminer_retrieval_graphsync_test.go +++ b/itests/multiminer_retrieval_graphsync_test.go @@ -6,6 +6,7 @@ import ( "github.com/filecoin-project/boost/itests/shared" "github.com/filecoin-project/lotus/itests/kit" + trustlessutils "github.com/ipld/go-trustless-utils" ) func TestMultiMinerRetrievalGraphsync(t *testing.T) { @@ -17,7 +18,12 @@ func TestMultiMinerRetrievalGraphsync(t *testing.T) { // - recognize that the deal is for a sector on the first miner // - read the data for the deal from the first miner t.Logf("deal is added to piece, starting retrieval of root %s", rt.RootCid) - outPath := rt.BoostAndMiner2.Retrieve(ctx, t, rt.TempDir, rt.RootCid, rt.PieceCid, true, nil) + outPath := rt.BoostAndMiner2.Retrieve( + ctx, + t, + trustlessutils.Request{Root: rt.RootCid, Scope: trustlessutils.DagScopeAll}, + true, + ) t.Logf("retrieval is done, compare in- and out- files in: %s, out: %s", rt.SampleFilePath, outPath) kit.AssertFilesEqual(t, rt.SampleFilePath, outPath) diff --git a/markets/utils/selectors.go b/markets/utils/selectors.go index e1009d1ff..ec6a0426d 100644 --- a/markets/utils/selectors.go +++ b/markets/utils/selectors.go @@ -7,20 +7,41 @@ import ( "io" // must be imported to init() raw-codec support + dagpb "github.com/ipld/go-codec-dagpb" _ "github.com/ipld/go-ipld-prime/codec/raw" + "github.com/ipld/go-ipld-prime/linking" + "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/ipfs/go-cid" mdagipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-unixfsnode" - dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" ) +func CreateLinkSystem(ds mdagipld.DAGService) linking.LinkSystem { + // this is how we implement GETs + linkSystem := cidlink.DefaultLinkSystem() + linkSystem.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + cl, isCid := lnk.(cidlink.Link) + if !isCid { + return nil, fmt.Errorf("unexpected link type %#v", lnk) + } + + node, err := ds.Get(lctx.Ctx, cl.Cid) + if err != nil { + return nil, err + } + + return bytes.NewBuffer(node.RawData()), nil + } + unixfsnode.AddUnixFSReificationToLinkSystem(&linkSystem) + return linkSystem +} + func TraverseDag( ctx context.Context, ds mdagipld.DAGService, @@ -38,9 +59,8 @@ func TraverseDag( return err } - // not sure what this is for TBH: we also provide ctx in &traversal.Config{} linkContext := ipld.LinkContext{Ctx: ctx} - + linkSystem := CreateLinkSystem(ds) // this is what allows us to understand dagpb nodePrototypeChooser := dagpb.AddSupportToChooser( func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) { @@ -48,23 +68,6 @@ func TraverseDag( }, ) - // this is how we implement GETs - linkSystem := cidlink.DefaultLinkSystem() - linkSystem.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { - cl, isCid := lnk.(cidlink.Link) - if !isCid { - return nil, fmt.Errorf("unexpected link type %#v", lnk) - } - - node, err := ds.Get(lctx.Ctx, cl.Cid) - if err != nil { - return nil, err - } - - return bytes.NewBuffer(node.RawData()), nil - } - unixfsnode.AddUnixFSReificationToLinkSystem(&linkSystem) - // this is how we pull the start node out of the DS startLink := cidlink.Link{Cid: startFrom} startNodePrototype, err := nodePrototypeChooser(startLink, linkContext) diff --git a/retrievalmarket/client/client.go b/retrievalmarket/client/client.go index 78ca1f28e..d9aa1fa10 100644 --- a/retrievalmarket/client/client.go +++ b/retrievalmarket/client/client.go @@ -31,6 +31,7 @@ import ( "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagcbor" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p/core/host" inet "github.com/libp2p/go-libp2p/core/network" @@ -537,7 +538,15 @@ func (c *Client) retrieveContentFromPeerWithProgressCallback( defer unsubscribe() // Submit the retrieval deal proposal to the miner - newchid, err := c.dataTransfer.OpenPullDataChannel(ctx, peerID, proposal, proposal.PayloadCID, selectorparse.CommonSelector_ExploreAllRecursively) + selector := selectorparse.CommonSelector_ExploreAllRecursively + if proposal.SelectorSpecified() { + var err error + selector, err = ipld.Decode(proposal.Selector.Raw, dagcbor.Decode) + if err != nil { + return nil, fmt.Errorf("failed to decode selector from proposal: %w", err) + } + } + newchid, err := c.dataTransfer.OpenPullDataChannel(ctx, peerID, proposal, proposal.PayloadCID, selector) if err != nil { // We could fail before a successful proposal // publish event failure