From 74770652a2287e4d6cf48681c5ccc5830e5d0942 Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Thu, 30 Nov 2023 11:49:38 +0000 Subject: [PATCH 1/5] experiment --- itests/framework/framework.go | 24 ++++++++---- piecedirectory/piecedirectory.go | 66 ++++++++++++++++++++++++++++++-- 2 files changed, 79 insertions(+), 11 deletions(-) diff --git a/itests/framework/framework.go b/itests/framework/framework.go index 70acf1b61..f7bb624ff 100644 --- a/itests/framework/framework.go +++ b/itests/framework/framework.go @@ -77,9 +77,10 @@ import ( var Log = logging.Logger("boosttest") type TestFrameworkConfig struct { - Ensemble *kit.Ensemble - EnableLegacy bool - MaxStagingBytes int64 + Ensemble *kit.Ensemble + EnableLegacy bool + MaxStagingBytes int64 + ProvisionalWalletBalances int64 } type TestFramework struct { @@ -117,8 +118,17 @@ func WithEnsemble(e *kit.Ensemble) FrameworkOpts { } } +func SetProvisionalWalletBalances(balance int64) FrameworkOpts { + return func(tmc *TestFrameworkConfig) { + tmc.ProvisionalWalletBalances = balance + } +} + func NewTestFramework(ctx context.Context, t *testing.T, opts ...FrameworkOpts) *TestFramework { - fmc := &TestFrameworkConfig{} + fmc := &TestFrameworkConfig{ + // default provisional balance + ProvisionalWalletBalances: 1e18, + } for _, opt := range opts { opt(fmc) } @@ -224,7 +234,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error { clientAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) - amt := abi.NewTokenAmount(1e18) + amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances) _ = sendFunds(f.ctx, fullnodeApi, clientAddr, amt) Log.Infof("Created client wallet %s with %d attoFil", clientAddr, amt) wg.Done() @@ -239,7 +249,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error { Log.Info("Creating publish storage deals wallet") psdWalletAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) - amt := abi.NewTokenAmount(1e18) + amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances) _ = sendFunds(f.ctx, fullnodeApi, psdWalletAddr, amt) Log.Infof("Created publish storage deals wallet %s with %d attoFil", psdWalletAddr, amt) wg.Done() @@ -248,7 +258,7 @@ func (f *TestFramework) Start(opts ...ConfigOpt) error { Log.Info("Creating deal collateral wallet") dealCollatAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) - amt := abi.NewTokenAmount(1e18) + amt := abi.NewTokenAmount(f.config.ProvisionalWalletBalances) _ = sendFunds(f.ctx, fullnodeApi, dealCollatAddr, amt) Log.Infof("Created deal collateral wallet %s with %d attoFil", dealCollatAddr, amt) wg.Done() diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index c10249f1a..ada3b4890 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -7,7 +7,10 @@ import ( "errors" "fmt" "io" + "os" + "strconv" "sync" + "sync/atomic" "time" carutil "github.com/filecoin-project/boost/car" @@ -384,13 +387,51 @@ func parseRecordsFromCar(reader io.Reader) ([]model.Record, error) { return recs, nil } +type countingReader struct { + io.Reader + + cnt *int32 +} + +func (cr *countingReader) Read(p []byte) (n int, err error) { + ncnt := atomic.AddInt32(cr.cnt, 1) + if ncnt%10000 == 0 { + log.Infof("podsi: performed %d read operations", ncnt) + } + return cr.Reader.Read(p) +} + func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { + ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() dsis := datasegment.DataSegmentIndexStartOffset(ps) + + var readsCnt int32 + cr := &countingReader{ + Reader: r, + cnt: &readsCnt, + } + + useBufferedReader := os.Getenv("PODSI_USE_BUFFERED_READER") == "true" + bufferSize, err := strconv.Atoi(os.Getenv("PODSI_BUFFER_SIZE")) + if err != nil { + bufferSize = int(4e6) + } + log.Infow("podsi: ", "userBufferedReader", useBufferedReader, "bufferSize", bufferSize) + if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { return nil, fmt.Errorf("could not seek to data segment index: %w", err) } - dataSegments, err := datasegment.ParseDataSegmentIndex(r) + + var rr io.Reader + if useBufferedReader { + rr = bufio.NewReaderSize(cr, bufferSize) + } else { + rr = cr + } + + start := time.Now() + dataSegments, err := datasegment.ParseDataSegmentIndex(rr) if err != nil { return nil, fmt.Errorf("could not parse data segment index: %w", err) } @@ -401,14 +442,29 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type if len(segments) == 0 { return nil, fmt.Errorf("no data segments found") } - + log.Infow("podsi: parsed and validated data segment index", "reads", readsCnt, "time", time.Since(start).String()) + recs := make([]model.Record, 0) + + readsCnt = 0 + start = time.Now() for _, s := range segments { segOffset := s.UnpaddedOffest() segSize := s.UnpaddedLength() lr := io.NewSectionReader(r, int64(segOffset), int64(segSize)) - subRecs, err := parseRecordsFromCar(lr) + + cr = &countingReader{ + Reader: lr, + cnt: &readsCnt, + } + + if useBufferedReader { + rr = bufio.NewReaderSize(cr, bufferSize) + } else { + rr = cr + } + subRecs, err := parseRecordsFromCar(rr) if err != nil { // revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments. return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err) @@ -419,6 +475,8 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type recs = append(recs, subRecs...) } + log.Infow("podsi: parsed records from data segments", "reads", readsCnt, "time", time.Since(start).String()) + return recs, nil } @@ -849,7 +907,7 @@ func (ps *PieceDirectory) GetBlockstore(ctx context.Context, pieceCid cid.Cid) ( bsR = carutil.NewMultiReaderAt( bytes.NewReader(headerBuf.Bytes()), // payload (CARv1) header bytes.NewReader(make([]byte, dataOffset)), // padding to account for the CARv2 wrapper - sectionReader, // payload (CARv1) data + sectionReader, // payload (CARv1) data ) } else { bsR = reader From 2f2bac2128bb59eb8854afd652bf7201df6c2dae Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Thu, 30 Nov 2023 12:24:38 +0000 Subject: [PATCH 2/5] add test --- itests/dummydeal_podsi_test.go | 242 +++++++++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 itests/dummydeal_podsi_test.go diff --git a/itests/dummydeal_podsi_test.go b/itests/dummydeal_podsi_test.go new file mode 100644 index 000000000..6f7d9328e --- /dev/null +++ b/itests/dummydeal_podsi_test.go @@ -0,0 +1,242 @@ +package itests + +import ( + "bytes" + "context" + "fmt" + "io" + "math/bits" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/filecoin-project/boost/itests/framework" + "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/go-data-segment/datasegment" + commcid "github.com/filecoin-project/go-fil-commcid" + commp "github.com/filecoin-project/go-fil-commp-hashhash" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + "github.com/ipfs/go-unixfsnode/data/builder" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/multiformats/go-multicodec" + multihash "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +func TestDummyPodsiDealOnline(t *testing.T) { + os.Setenv("PODSI_USE_BUFFERED_READER", "true") + os.Setenv("PODSI_BUFFER_SIZE", "10000") + + randomFileSize := int(4e6) + + ctx := context.Background() + log := framework.Log + + kit.QuietMiningLogs() + framework.SetLogLevel() + var opts []framework.FrameworkOpts + opts = append(opts, framework.EnableLegacyDeals(true), framework.SetMaxStagingBytes(10e9), framework.SetProvisionalWalletBalances(9e18)) + f := framework.NewTestFramework(ctx, t, opts...) + err := f.Start() + require.NoError(t, err) + defer f.Stop() + + err = f.AddClientProviderBalance(abi.NewTokenAmount(5e18)) + require.NoError(t, err) + + tempdir := t.TempDir() + log.Debugw("using tempdir", "dir", tempdir) + + // create a random file + randomFilepath, err := testutil.CreateRandomFile(tempdir, 5, randomFileSize) + require.NoError(t, err) + + carFile := filepath.Join(tempdir, "test.car") + dataSegmentFile := filepath.Join(tempdir, "datasegment.dat") + + // pack it into the car + rootCid, err := createCar(t, carFile, []string{randomFilepath}) + require.NoError(t, err) + + // pack the car into data segement piece twice so that we have two segments + makeDataSegmentPiece(t, dataSegmentFile, []string{carFile, carFile}) + + // Start a web server to serve the car files + log.Debug("starting webserver") + server, err := testutil.HttpTestFileServer(t, tempdir) + require.NoError(t, err) + defer server.Close() + + // Create a new dummy deal + log.Debug("creating dummy deal") + dealUuid := uuid.New() + + // Make a deal + res, err := f.MakeDummyDeal(dealUuid, dataSegmentFile, rootCid, server.URL+"/"+filepath.Base(dataSegmentFile), false) + require.NoError(t, err) + require.True(t, res.Result.Accepted) + log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res)) + + time.Sleep(2 * time.Second) + + // Wait for the first deal to be added to a sector and cleaned up so space is made + err = f.WaitForDealAddedToSector(dealUuid) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + +} + +func makeDataSegmentPiece(t *testing.T, dataSegmentFile string, subPieces []string) { + readers := make([]io.Reader, 0) + deals := make([]abi.PieceInfo, 0) + for _, sp := range subPieces { + arg, err := os.Open(sp) + require.NoError(t, err) + + readers = append(readers, arg) + cp := new(commp.Calc) + io.Copy(cp, arg) + rawCommP, size, err := cp.Digest() + require.NoError(t, err) + + arg.Seek(0, io.SeekStart) + c, _ := commcid.DataCommitmentV1ToCID(rawCommP) + subdeal := abi.PieceInfo{ + Size: abi.PaddedPieceSize(size), + PieceCID: c, + } + deals = append(deals, subdeal) + } + require.NotEqual(t, 0, len(deals)) + + _, size, err := datasegment.ComputeDealPlacement(deals) + require.NoError(t, err) + + overallSize := abi.PaddedPieceSize(size) + // we need to make this the 'next' power of 2 in order to have space for the index + next := 1 << (64 - bits.LeadingZeros64(uint64(overallSize+256))) + + a, err := datasegment.NewAggregate(abi.PaddedPieceSize(next), deals) + require.NoError(t, err) + out, err := a.AggregateObjectReader(readers) + require.NoError(t, err) + + // open output file + fo, err := os.Create(dataSegmentFile) + require.NoError(t, err) + defer fo.Close() + + written, err := io.Copy(fo, out) + require.NoError(t, err) + require.NotZero(t, written) +} + +func createCar(t *testing.T, carFile string, files []string) (cid.Cid, error) { + // make a cid with the right length that we eventually will patch with the root. + hasher, err := multihash.GetHasher(multihash.SHA2_256) + if err != nil { + return cid.Undef, err + } + digest := hasher.Sum([]byte{}) + hash, err := multihash.Encode(digest, multihash.SHA2_256) + if err != nil { + return cid.Undef, err + } + proxyRoot := cid.NewCidV1(uint64(multicodec.DagPb), hash) + + options := []car.Option{} + + cdest, err := blockstore.OpenReadWrite(carFile, []cid.Cid{proxyRoot}, options...) + + if err != nil { + return cid.Undef, err + } + + // Write the unixfs blocks into the store. + root, err := writeFiles(context.Background(), false, cdest, files...) + if err != nil { + return cid.Undef, err + } + + if err := cdest.Finalize(); err != nil { + return cid.Undef, err + } + // re-open/finalize with the final root. + return root, car.ReplaceRootsInFile(carFile, []cid.Cid{root}) +} + +func writeFiles(ctx context.Context, noWrap bool, bs *blockstore.ReadWrite, paths ...string) (cid.Cid, error) { + ls := cidlink.DefaultLinkSystem() + ls.TrustedStorage = true + ls.StorageReadOpener = func(_ ipld.LinkContext, l ipld.Link) (io.Reader, error) { + cl, ok := l.(cidlink.Link) + if !ok { + return nil, fmt.Errorf("not a cidlink") + } + blk, err := bs.Get(ctx, cl.Cid) + if err != nil { + return nil, err + } + return bytes.NewBuffer(blk.RawData()), nil + } + ls.StorageWriteOpener = func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) { + buf := bytes.NewBuffer(nil) + return buf, func(l ipld.Link) error { + cl, ok := l.(cidlink.Link) + if !ok { + return fmt.Errorf("not a cidlink") + } + blk, err := blocks.NewBlockWithCid(buf.Bytes(), cl.Cid) + if err != nil { + return err + } + bs.Put(ctx, blk) + return nil + }, nil + } + + topLevel := make([]dagpb.PBLink, 0, len(paths)) + for _, p := range paths { + l, size, err := builder.BuildUnixFSRecursive(p, &ls) + if err != nil { + return cid.Undef, err + } + if noWrap { + rcl, ok := l.(cidlink.Link) + if !ok { + return cid.Undef, fmt.Errorf("could not interpret %s", l) + } + return rcl.Cid, nil + } + name := path.Base(p) + entry, err := builder.BuildUnixFSDirectoryEntry(name, int64(size), l) + if err != nil { + return cid.Undef, err + } + topLevel = append(topLevel, entry) + } + + // make a directory for the file(s). + + root, _, err := builder.BuildUnixFSDirectory(topLevel, &ls) + if err != nil { + return cid.Undef, nil + } + rcl, ok := root.(cidlink.Link) + if !ok { + return cid.Undef, fmt.Errorf("could not interpret %s", root) + } + + return rcl.Cid, nil +} From 7d0595b028cabe6739bc093c75bc72cac5f9906e Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Thu, 30 Nov 2023 17:33:23 +0000 Subject: [PATCH 3/5] parallelise entries validation --- itests/dummydeal_podsi_test.go | 3 +- piecedirectory/piecedirectory.go | 106 +++++++++++++++++++++++++------ 2 files changed, 87 insertions(+), 22 deletions(-) diff --git a/itests/dummydeal_podsi_test.go b/itests/dummydeal_podsi_test.go index 6f7d9328e..c343650a8 100644 --- a/itests/dummydeal_podsi_test.go +++ b/itests/dummydeal_podsi_test.go @@ -37,8 +37,9 @@ import ( func TestDummyPodsiDealOnline(t *testing.T) { os.Setenv("PODSI_USE_BUFFERED_READER", "true") os.Setenv("PODSI_BUFFER_SIZE", "10000") + os.Setenv("PODSI_VALIDATION_CONCURRENCY", "4") - randomFileSize := int(4e6) + randomFileSize := int(1e6) ctx := context.Background() log := framework.Log diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index ada3b4890..fea476bfc 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "os" + "runtime" "strconv" "sync" "sync/atomic" @@ -40,6 +41,7 @@ import ( mh "github.com/multiformats/go-multihash" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" ) var log = logging.Logger("piecedirectory") @@ -403,52 +405,98 @@ func (cr *countingReader) Read(p []byte) (n int, err error) { func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { - ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() - dsis := datasegment.DataSegmentIndexStartOffset(ps) - - var readsCnt int32 - cr := &countingReader{ - Reader: r, - cnt: &readsCnt, - } - useBufferedReader := os.Getenv("PODSI_USE_BUFFERED_READER") == "true" bufferSize, err := strconv.Atoi(os.Getenv("PODSI_BUFFER_SIZE")) if err != nil { bufferSize = int(4e6) } - log.Infow("podsi: ", "userBufferedReader", useBufferedReader, "bufferSize", bufferSize) + concurrency, err := strconv.Atoi(os.Getenv("PODSI_VALIDATION_CONCURRENCY")) + if err != nil { + concurrency = runtime.NumCPU() + } + if concurrency < 1 { + concurrency = 1 + } + if concurrency > 16 { + concurrency = 16 + } + + log.Infow("podsi: ", "userBufferedReader", useBufferedReader, "bufferSize", bufferSize, "validationConcurrency", concurrency) + start := time.Now() + ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() + dsis := datasegment.DataSegmentIndexStartOffset(ps) if _, err := r.Seek(int64(dsis), io.SeekStart); err != nil { return nil, fmt.Errorf("could not seek to data segment index: %w", err) } var rr io.Reader + var readsCnt int32 + cr := &countingReader{ + Reader: r, + cnt: &readsCnt, + } if useBufferedReader { rr = bufio.NewReaderSize(cr, bufferSize) } else { rr = cr } - start := time.Now() - dataSegments, err := datasegment.ParseDataSegmentIndex(rr) + indexData, err := datasegment.ParseDataSegmentIndex(rr) if err != nil { return nil, fmt.Errorf("could not parse data segment index: %w", err) } - segments, err := dataSegments.ValidEntries() - if err != nil { + + log.Infow("podsi: parsed data segment index", "segments", len(indexData.Entries), "reads", readsCnt, "time", time.Since(start).String()) + start = time.Now() + + if len(indexData.Entries) < concurrency { + concurrency = len(indexData.Entries) + } + + chunkSize := len(indexData.Entries) / concurrency + results := make([][]datasegment.SegmentDesc, concurrency) + + var eg errgroup.Group + for i := 0; i < concurrency; i++ { + i := i + eg.Go(func() error { + start := i * chunkSize + end := start + chunkSize + if i == concurrency-1 { + end = len(indexData.Entries) + } + + res, err := validateEntries(indexData.Entries[start:end]) + if err != nil { + return err + } + + results[i] = res + + return nil + }) + } + + if err := eg.Wait(); err != nil { return nil, fmt.Errorf("could not calculate valid entries: %w", err) } - if len(segments) == 0 { - return nil, fmt.Errorf("no data segments found") + + validSegments := make([]datasegment.SegmentDesc, 0, len(indexData.Entries)) + for _, res := range results { + validSegments = append(validSegments, res...) } - log.Infow("podsi: parsed and validated data segment index", "reads", readsCnt, "time", time.Since(start).String()) - recs := make([]model.Record, 0) + if len(validSegments) == 0 { + return nil, fmt.Errorf("no data segments found") + } - readsCnt = 0 + log.Infow("podsi: validated data segment index", "validSegments", len(validSegments), "time", time.Since(start).String()) start = time.Now() - for _, s := range segments { + readsCnt = 0 + + recs := make([]model.Record, 0) + for _, s := range validSegments { segOffset := s.UnpaddedOffest() segSize := s.UnpaddedLength() @@ -475,11 +523,27 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type recs = append(recs, subRecs...) } - log.Infow("podsi: parsed records from data segments", "reads", readsCnt, "time", time.Since(start).String()) + log.Infow("podsi: parsed records from data segments", "recs", len(recs), "reads", readsCnt, "time", time.Since(start).String()) return recs, nil } +func validateEntries(entries []datasegment.SegmentDesc) ([]datasegment.SegmentDesc, error) { + res := make([]datasegment.SegmentDesc, 0, len(entries)) + for i, e := range entries { + + if err := e.Validate(); err != nil { + if errors.Is(err, datasegment.ErrValidation) { + continue + } else { + return nil, xerrors.Errorf("got unknown error for entry %d: %w", i, err) + } + } + res = append(res, e) + } + return res, nil +} + // BuildIndexForPiece builds indexes for a given piece CID. The piece must contain a valid deal // corresponding to an unsealed sector for this method to work. It will try to build index // using all available deals and will exit as soon as it succeeds for one of the deals From 39b5be542c0dd8e81ca2b128ae1e0193ea522cef Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Fri, 1 Dec 2023 09:04:26 +0000 Subject: [PATCH 4/5] brush up code --- itests/dummydeal_podsi_test.go | 4 --- piecedirectory/piecedirectory.go | 56 ++++++++++---------------------- 2 files changed, 18 insertions(+), 42 deletions(-) diff --git a/itests/dummydeal_podsi_test.go b/itests/dummydeal_podsi_test.go index c343650a8..f74bc000e 100644 --- a/itests/dummydeal_podsi_test.go +++ b/itests/dummydeal_podsi_test.go @@ -35,10 +35,6 @@ import ( ) func TestDummyPodsiDealOnline(t *testing.T) { - os.Setenv("PODSI_USE_BUFFERED_READER", "true") - os.Setenv("PODSI_BUFFER_SIZE", "10000") - os.Setenv("PODSI_VALIDATION_CONCURRENCY", "4") - randomFileSize := int(1e6) ctx := context.Background() diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index fea476bfc..2f3c0486a 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -7,9 +7,7 @@ import ( "errors" "fmt" "io" - "os" "runtime" - "strconv" "sync" "sync/atomic" "time" @@ -48,6 +46,12 @@ var log = logging.Logger("piecedirectory") const ( MaxCachedReaders = 128 + // 20 MiB x 4 parallel deals is just 80MiB RAM overhead required + PodsiBuffesrSize = 20e6 + // Concurrency is driven by the number of available cores. Set reasonable max and mins + // to support multiple concurrent AddIndex operations + PodsiMaxConcurrency = 32 + PodsiMinConcurrency = 4 ) type settings struct { @@ -396,32 +400,20 @@ type countingReader struct { } func (cr *countingReader) Read(p []byte) (n int, err error) { - ncnt := atomic.AddInt32(cr.cnt, 1) - if ncnt%10000 == 0 { - log.Infof("podsi: performed %d read operations", ncnt) - } + atomic.AddInt32(cr.cnt, 1) return cr.Reader.Read(p) } func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r types.SectionReader) ([]model.Record, error) { - - useBufferedReader := os.Getenv("PODSI_USE_BUFFERED_READER") == "true" - bufferSize, err := strconv.Atoi(os.Getenv("PODSI_BUFFER_SIZE")) - if err != nil { - bufferSize = int(4e6) - } - concurrency, err := strconv.Atoi(os.Getenv("PODSI_VALIDATION_CONCURRENCY")) - if err != nil { - concurrency = runtime.NumCPU() - } - if concurrency < 1 { - concurrency = 1 + concurrency := runtime.NumCPU() + if concurrency < PodsiMinConcurrency { + concurrency = PodsiMinConcurrency } - if concurrency > 16 { - concurrency = 16 + if concurrency > PodsiMaxConcurrency { + concurrency = PodsiMaxConcurrency } - log.Infow("podsi: ", "userBufferedReader", useBufferedReader, "bufferSize", bufferSize, "validationConcurrency", concurrency) + log.Debugw("podsi: ", "bufferSize", PodsiBuffesrSize, "validationConcurrency", concurrency) start := time.Now() ps := abi.UnpaddedPieceSize(unpaddedSize).Padded() @@ -430,24 +422,17 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type return nil, fmt.Errorf("could not seek to data segment index: %w", err) } - var rr io.Reader var readsCnt int32 cr := &countingReader{ Reader: r, cnt: &readsCnt, } - if useBufferedReader { - rr = bufio.NewReaderSize(cr, bufferSize) - } else { - rr = cr - } - - indexData, err := datasegment.ParseDataSegmentIndex(rr) + indexData, err := datasegment.ParseDataSegmentIndex(bufio.NewReaderSize(cr, PodsiBuffesrSize)) if err != nil { return nil, fmt.Errorf("could not parse data segment index: %w", err) } - log.Infow("podsi: parsed data segment index", "segments", len(indexData.Entries), "reads", readsCnt, "time", time.Since(start).String()) + log.Debugw("podsi: parsed data segment index", "segments", len(indexData.Entries), "reads", readsCnt, "time", time.Since(start).String()) start = time.Now() if len(indexData.Entries) < concurrency { @@ -491,7 +476,7 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type return nil, fmt.Errorf("no data segments found") } - log.Infow("podsi: validated data segment index", "validSegments", len(validSegments), "time", time.Since(start).String()) + log.Debugw("podsi: validated data segment index", "validSegments", len(validSegments), "time", time.Since(start).String()) start = time.Now() readsCnt = 0 @@ -507,12 +492,7 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type cnt: &readsCnt, } - if useBufferedReader { - rr = bufio.NewReaderSize(cr, bufferSize) - } else { - rr = cr - } - subRecs, err := parseRecordsFromCar(rr) + subRecs, err := parseRecordsFromCar(bufio.NewReaderSize(cr, PodsiBuffesrSize)) if err != nil { // revisit when non-car files supported: one corrupt segment shouldn't translate into an error in other segments. return nil, fmt.Errorf("could not parse data segment #%d at offset %d: %w", len(recs), segOffset, err) @@ -523,7 +503,7 @@ func parsePieceWithDataSegmentIndex(pieceCid cid.Cid, unpaddedSize int64, r type recs = append(recs, subRecs...) } - log.Infow("podsi: parsed records from data segments", "recs", len(recs), "reads", readsCnt, "time", time.Since(start).String()) + log.Debugw("podsi: parsed records from data segments", "recs", len(recs), "reads", readsCnt, "time", time.Since(start).String()) return recs, nil } From c5c68bdc76f37526a08117dae17e66deec62c6ea Mon Sep 17 00:00:00 2001 From: Ivan Schasny Date: Fri, 1 Dec 2023 09:05:43 +0000 Subject: [PATCH 5/5] mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 3d51753f8..c6edf2f6c 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/ipfs/go-ipfs-files v0.3.0 // indirect github.com/ipfs/go-ipld-format v0.6.0 github.com/ipfs/go-ipld-legacy v0.2.1 - github.com/ipfs/go-libipfs v0.7.0 // indirect + github.com/ipfs/go-libipfs v0.7.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/go-merkledag v0.11.0 // indirect github.com/ipfs/go-metrics-interface v0.0.1