From a71b69dfcdbcac9ff299ba12f8d1ebadbfc878d3 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 6 Jan 2025 16:07:07 +1100 Subject: [PATCH] feat(shed): actor state diff stats --- cmd/lotus-shed/block-matcher.go | 270 ++++++++++++++++++++++++++++++++ cmd/lotus-shed/state-stats.go | 211 ++++++++++++++++++++++--- 2 files changed, 460 insertions(+), 21 deletions(-) create mode 100644 cmd/lotus-shed/block-matcher.go diff --git a/cmd/lotus-shed/block-matcher.go b/cmd/lotus-shed/block-matcher.go new file mode 100644 index 0000000000..14b3415bb9 --- /dev/null +++ b/cmd/lotus-shed/block-matcher.go @@ -0,0 +1,270 @@ +package main + +import ( + "bytes" + "context" + "fmt" + + cbor "github.com/ipfs/go-ipld-cbor" + format "github.com/ipfs/go-ipld-format" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/schema" + schemadmt "github.com/ipld/go-ipld-prime/schema/dmt" + schemadsl "github.com/ipld/go-ipld-prime/schema/dsl" + "github.com/ipld/go-ipld-prime/traversal" + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-amt-ipld/v4" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-hamt-ipld/v3" + gstbuiltin "github.com/filecoin-project/go-state-types/builtin" + miner16 "github.com/filecoin-project/go-state-types/builtin/v16/miner" + "github.com/filecoin-project/go-state-types/builtin/v16/util/adt" + + "github.com/filecoin-project/lotus/chain/types" +) + +// matchKnownBlockType attempts to determine the type of a block by inspecting its bytes. First we +// attempt to decode it as part of a HAMT or AMT, and if we get one, we inspect the types of the +// values. Otherwise we attempt to decode it as a known type using matchKnownBlockTypeFromBytes. +func matchKnownBlockType(ctx context.Context, nd format.Node) (string, error) { + if m, err := matchKnownBlockTypeFromBytes(nd.RawData()); err != nil { + return "", err + } else if m != "" { + return m, nil + } + + // block store with just one block in it, for interacting with the hamt and amt libraries + store := cbor.NewMemCborStore() + if err := store.(*cbor.BasicIpldStore).Blocks.Put(ctx, nd); err != nil { + return "", err + } + + // try to load as a HAMT root/node (they are the same thing) + if _, err := hamt.LoadNode(ctx, store, nd.Cid(), append(adt.DefaultHamtOptions, hamt.UseTreeBitWidth(gstbuiltin.DefaultHamtBitwidth))...); err == nil { + // got a HAMT, now inspect it + hamtNode, err := ipld.DecodeUsingPrototype(nd.RawData(), dagcbor.Decode, bindnode.Prototype(nil, knownTypeSystem.TypeByName("HamtNode"))) + if err != nil { + return "", xerrors.Errorf("failed to decode HamtNode: %w", err) + } + typ, err := matchHamtValues(hamtNode) + if err != nil { + return "", err + } + return fmt.Sprintf("HAMTNode{%d}%s", gstbuiltin.DefaultHamtBitwidth, typ), nil + } + + // try to load as an AMT root, we have to try all bitwidths used in the chain + for _, bitwidth := range []uint{2, 3, 4, 5, 6} { + if _, err := amt.LoadAMT(ctx, store, nd.Cid(), append(adt.DefaultAmtOptions, amt.UseTreeBitWidth(bitwidth))...); err == nil { + // got an AMT root, now inspect it + amtRoot, err := ipld.DecodeUsingPrototype(nd.RawData(), dagcbor.Decode, bindnode.Prototype(nil, knownTypeSystem.TypeByName("AMTRoot"))) + if err != nil { + return "", xerrors.Errorf("failed to decode AMTRoot: %w", err) + } + values, err := traversal.Get(amtRoot, datamodel.ParsePath("Node/Values")) + if err != nil { + return "", xerrors.Errorf("failed to get AMTRoot.Node.Values: %w", err) + } + typ, err := matchAmtValues(values) + if err != nil { + return "", err + } + return fmt.Sprintf("AMTRoot{%d}%s", bitwidth, typ), nil + } + } + + // try to load as an AMT intermediate node, which we can't do using the amt package so we'll + // infer by schema + if amtNode, err := ipld.DecodeUsingPrototype(nd.RawData(), dagcbor.Decode, bindnode.Prototype(nil, knownTypeSystem.TypeByName("AMTNode"))); err == nil { + // got an AMT node, now inspect it + values, err := amtNode.LookupByString("Values") + if err != nil { + return "", xerrors.Errorf("failed to get AMTNode.Values: %w", err) + } + typ, err := matchAmtValues(values) + if err != nil { + return "", err + } + return "AMTNode" + typ, nil + } + + return "", nil +} + +// given a datamodel.Node form of the Values array within an AMT node, attempt to determine the +// type of the values by iterating through them all and checking from their bytes. +func matchAmtValues(values datamodel.Node) (string, error) { + var match string + itr := values.ListIterator() + for !itr.Done() { + _, v, err := itr.Next() + if err != nil { + return "", err + } + enc, err := ipld.Encode(v, dagcbor.Encode) + if err != nil { + return "", err + } + if m, _ := matchKnownBlockTypeFromBytes(enc); m != "" { + if match == "" { + match = m + } else if match != m { + return "", xerrors.Errorf("inconsistent types in AMT values") + } + } + } + if match != "" { + return "[" + match + "]", nil + } + return "", nil +} + +// given a datamodel.Node form of a HAMT node, attempt to determine the type of the values, if there +// are any, by iterating through them all and checking from their bytes. +func matchHamtValues(hamtNode datamodel.Node) (string, error) { + pointers, err := hamtNode.LookupByString("Pointers") + if err != nil { + return "", xerrors.Errorf("failed to get HamtNode.Pointers: %w", err) + } + var match string + itr := pointers.ListIterator() + for !itr.Done() { + _, v, err := itr.Next() + if err != nil { + return "", err + } + b, err := v.LookupByString("Bucket") + if err == nil { + bitr := b.ListIterator() + for !bitr.Done() { + _, kv, err := bitr.Next() + if err != nil { + return "", err + } + bval, err := kv.LookupByString("Value") + if err != nil { + return "", err + } + enc, err := ipld.Encode(bval, dagcbor.Encode) + if err != nil { + return "", err + } + if m, _ := matchKnownBlockTypeFromBytes(enc); m != "" { + if match == "" { + match = m + } else if match != m { + return "", xerrors.Errorf("inconsistent types in HAMT values") + } + } + } + } + } + if match != "" { + return "[" + match + "]", nil + } + return "", nil +} + +var wellKnownBlockBytes = map[string][]byte{ + "EmptyArray": {0x80}, + "EmptyBytes": {0x40}, + "EmptyString": {0x60}, // is this used anywhere in the chain? + "Zero": {0x00}, // is this used anywhere in the chain? + "HAMTNode{5}[empty]": {0x82, 0x40, 0x80}, + "AMTRoot{2/3}[empty]": {0x84, 0x02, 0x00, 0x00, 0x83, 0x41, 0x00, 0x80, 0x80}, + "AMTRoot{4}[empty]": {0x84, 0x04, 0x00, 0x00, 0x83, 0x42, 0x00, 0x00, 0x80, 0x80}, + "AMTRoot{5}[empty]": {0x84, 0x05, 0x00, 0x00, 0x83, 0x44, 0x00, 0x00, 0x00, 0x00, 0x80, 0x80}, + "AMTRoot{6}[empty]": {0x84, 0x06, 0x00, 0x00, 0x83, 0x48, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x80, 0x80}, +} + +func matchWellKnownBlockType(b []byte) (string, error) { + for name, wkb := range wellKnownBlockBytes { + if bytes.Equal(b, wkb) { + return name, nil + } + } + return "", nil +} + +// matchKnownBlockTypeFromBytes attempts to determine the type of a block by inspecting its bytes. +// We use a fixed list of known types that have a CBORUnmarshaler that we believe may be possible. +// This list is not exhaustive and should be expanded as unknown types are encountered. +func matchKnownBlockTypeFromBytes(b []byte) (string, error) { + if m, _ := matchWellKnownBlockType(b); m != "" { + return m, nil + } + + if _, err := cbg.ReadCid(bytes.NewReader(b)); err == nil { + return "Cid", nil + } + known := map[string]cbg.CBORUnmarshaler{ + // Fill this out with known types when you see them missing and can identify them + "BlockHeader": &types.BlockHeader{}, + "miner16.State": &miner16.State{}, + "miner16.MinerInfo": &miner16.MinerInfo{}, + "miner16.Deadlines": &miner16.Deadlines{}, + "miner16.Deadline": &miner16.Deadline{}, + "miner16.Partition": &miner16.Partition{}, + "miner16.ExpirationSet": &miner16.ExpirationSet{}, + "miner16.WindowedPoSt": &miner16.WindowedPoSt{}, + "miner16.SectorOnChainInfo": &miner16.SectorOnChainInfo{}, + "miner16.SectorPreCommitOnChainInfo": &miner16.SectorPreCommitOnChainInfo{}, + "Bitfield": &bitfield.BitField{}, + } + for name, v := range known { + if err := v.UnmarshalCBOR(bytes.NewReader(b)); err == nil { + return name, nil + } + } + return "", nil +} + +const knownTypesSchema = ` +type HamtNode struct { + Bitfield Bytes + Pointers [Pointer] +} representation tuple + +type Pointer union { + | Any link # link to HamtNode + | Bucket list +} representation kinded + +type Bucket [KV] + +type KV struct { + Key Bytes + Value Any +} representation tuple + +type AMTNode struct { + Bmap Bytes + Links [Link] + Values [Any] +} representation tuple + +type AMTRoot struct { + BitWidth Int + Height Int + Count Int + Node AMTNode +} representation tuple +` + +var knownTypeSystem schema.TypeSystem + +func init() { + sch, err := schemadsl.ParseBytes([]byte(knownTypesSchema)) + if err != nil { + panic(err) + } + knownTypeSystem.Init() + if err := schemadmt.Compile(&knownTypeSystem, sch); err != nil { + panic(err) + } +} diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go index 88b21f4076..3f07aa3eba 100644 --- a/cmd/lotus-shed/state-stats.go +++ b/cmd/lotus-shed/state-stats.go @@ -19,6 +19,10 @@ import ( "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/codec/dagjson" + "github.com/ipld/go-ipld-prime/printer" "github.com/urfave/cli/v2" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" @@ -50,22 +54,33 @@ type actorStats struct { Actor *types.Actor Fields []fieldItem Stats api.ObjStat + Blocks []blockRepr `json:",omitempty"` } type fieldItem struct { - Name string - Cid cid.Cid - Stats api.ObjStat + Name string + Cid cid.Cid + Stats api.ObjStat + Blocks []blockRepr `json:",omitempty"` +} + +type blockRepr struct { + Cid cid.Cid + Size uint64 + Representation string `json:",omitempty"` + KnownType string `json:",omitempty"` } type job struct { c cid.Cid key string // prefix path for the region being recorded i.e. "/state/mineractor" } + type cidCall struct { c cid.Cid resp chan bool } + type result struct { key string stats api.ObjStat @@ -121,12 +136,38 @@ func (cng *cacheNodeGetter) GetMany(ctx context.Context, list []cid.Cid) <-chan return out } +type representationType string + +const ( + representationTypeNone representationType = "none" + representationTypeCid representationType = "cid" + representationTypePrintable representationType = "printable" + representationTypeDagJson representationType = "dagjson" +) + +func parseRepresentationType(s string) (representationType, error) { + switch s { + case "", "none": + return representationTypeNone, nil + case "cid": + return representationTypeCid, nil + case "printable": + return representationTypePrintable, nil + case "dagjson": + return representationTypeDagJson, nil + default: + return "", xerrors.Errorf("unknown representation type: %s", s) + } +} + type dagStatCollector struct { - ds format.NodeGetter - walk func(format.Node) ([]*format.Link, error) + ds format.NodeGetter + walk func(format.Node) ([]*format.Link, error) + representationType representationType statsLk sync.Mutex stats api.ObjStat + blocks []blockRepr } func (dsc *dagStatCollector) record(ctx context.Context, nd format.Node) error { @@ -140,6 +181,35 @@ func (dsc *dagStatCollector) record(ctx context.Context, nd format.Node) error { dsc.stats.Size = dsc.stats.Size + size dsc.stats.Links = dsc.stats.Links + 1 + if dsc.representationType != representationTypeNone { + br := blockRepr{Cid: nd.Cid(), Size: size} + + if dsc.representationType != representationTypeCid { + node, err := ipld.Decode(nd.RawData(), dagcbor.Decode) + if err != nil { + return xerrors.Errorf("decoding node: %w", err) + } + + switch dsc.representationType { + case representationTypePrintable: + br.Representation = printer.Sprint(node) + case representationTypeDagJson: + dj, err := ipld.Encode(node, dagjson.Encode) + if err != nil { + return xerrors.Errorf("encoding node to dag-json: %w", err) + } + br.Representation = string(dj) + } + } + + if typ, err := matchKnownBlockType(ctx, nd); err != nil { + log.Warnf("failed to match block type: %s", err) + } else { + br.KnownType = typ + } + + dsc.blocks = append(dsc.blocks, br) + } return nil } @@ -360,7 +430,7 @@ var statSnapshotCmd = &cli.Command{ Flags: []cli.Flag{ &cli.StringFlag{ Name: "tipset", - Usage: "specify tipset to call method on (pass comma separated array of cids)", + Usage: "specify tipset to call method on (pass comma separated array of cids or @ to specify tipset by height)", }, &cli.IntFlag{ Name: "workers", @@ -618,11 +688,23 @@ The top level stats reported for an actor is computed independently of all field accounting of the true size of the actor in the state datastore. The calculation of these stats results in the actor state being traversed twice. The dag-cache-size flag can be used -to reduce the number of decode operations performed by caching the decoded object after first access.`, +to reduce the number of decode operations performed by caching the decoded object after first access. + +When using the diff-tipset flag, the stats output will only include the mutated state between the two tipsets, not +the total state of the actor in either tipset. +`, Flags: []cli.Flag{ &cli.StringFlag{ Name: "tipset", - Usage: "specify tipset to call method on (pass comma separated array of cids)", + Usage: "specify tipset to call method on (pass comma separated array of cids or @ to specify tipset by height)", + }, + &cli.StringFlag{ + Name: "diff-tipset", + Usage: "specify tipset to diff against, stat output will include only the mutated state between the two tipsets (pass comma separated array of cids or @ to specify tipset by height)", + }, + &cli.StringFlag{ + Name: "show-blocks", + Usage: "show blocks as one of 'none', 'cid' a 'short' or 'full' representation of the block contents, or 'dagjson' to dump the full block contents as DAG-JSON. In the case of a diff-tipset this will be the blocks that are different between the two tipsets.", }, &cli.IntFlag{ Name: "workers", @@ -687,13 +769,20 @@ to reduce the number of decode operations performed by caching the decoded objec numWorkers := cctx.Int("workers") dagCacheSize := cctx.Int("dag-cache-size") - - eg, egctx := errgroup.WithContext(ctx) + reprType, err := parseRepresentationType(cctx.String("show-blocks")) + if err != nil { + return err + } + if cctx.IsSet("diff-tipset") { // if diff, don't list on first pass + reprType = representationTypeNone + } jobs := make(chan address.Address, numWorkers) results := make(chan actorStats, numWorkers) - worker := func(ctx context.Context, id int) error { + sc := &statCollector{representationType: reprType} + + worker := func(ctx context.Context, id int, ts *types.TipSet) error { completed := 0 defer func() { log.Infow("worker done", "id", id, "completed", completed) @@ -720,7 +809,7 @@ to reduce the number of decode operations performed by caching the decoded objec } } - actStats, err := collectStats(ctx, addr, actor, dag) + actStats, err := sc.collectStats(ctx, addr, actor, dag) if err != nil { return err } @@ -738,20 +827,70 @@ to reduce the number of decode operations performed by caching the decoded objec } } + eg, egctx := errgroup.WithContext(ctx) for w := 0; w < numWorkers; w++ { id := w eg.Go(func() error { - return worker(egctx, id) + return worker(egctx, id, ts) }) } + done := make(chan struct{}) go func() { - defer close(jobs) + defer func() { + close(jobs) + close(done) + }() for _, addr := range addrs { jobs <- addr } }() + // if diff-tipset is set, we need to load the actors from the diff tipset and compare, so we'll + // discard the results for this run, then run the workers again with a new set of jobs and take + // the results from the second run which should just include the diff. + + if diffTs := cctx.String("diff-tipset"); diffTs != "" { + // read and discard results + go func() { + for range results { // nolint:revive + } + }() + + _ = eg.Wait() + log.Infow("done with first pass, starting diff") + close(results) + + <-done + + dts, err := lcli.ParseTipSetRef(ctx, tsr, diffTs) + if err != nil { + return err + } + // TODO: if anyone cares for the "all" case, re-load actors here + log.Infow("diff tipset", "parentstate", dts.ParentState()) + + jobs = make(chan address.Address, numWorkers) + results = make(chan actorStats, numWorkers) + reprType, _ := parseRepresentationType(cctx.String("show-blocks")) + sc.representationType = reprType + + eg, egctx = errgroup.WithContext(ctx) + for w := 0; w < numWorkers; w++ { + id := w + eg.Go(func() error { + return worker(egctx, id, dts) + }) + } + + go func() { + defer close(jobs) + for _, addr := range addrs { + jobs <- addr + } + }() + } + go func() { // error is check later _ = eg.Wait() @@ -866,7 +1005,18 @@ func collectSnapshotJobStats(ctx context.Context, in job, dag format.NodeGetter, return results, nil } -func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) { +type statCollector struct { + rootCidSet *cid.Set + representationType representationType + fieldCidSets map[string]*cid.Set +} + +func (sc *statCollector) collectStats( + ctx context.Context, + addr address.Address, + actor *types.Actor, + dag format.NodeGetter, +) (actorStats, error) { log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code)) nd, err := dag.Get(ctx, actor.Head) @@ -903,33 +1053,52 @@ func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, } } + if sc.rootCidSet == nil { + sc.rootCidSet = cid.NewSet() + sc.fieldCidSets = make(map[string]*cid.Set) + for _, field := range fields { + sc.fieldCidSets[field.Name] = cid.NewSet() + } + } + actStats := actorStats{ Address: addr, Actor: actor, } dsc := &dagStatCollector{ - ds: dag, - walk: carWalkFunc, + ds: dag, + walk: carWalkFunc, + representationType: sc.representationType, } - if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, cid.NewSet().Visit, merkledag.Concurrent()); err != nil { + if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, sc.rootCidSet.Visit, merkledag.Concurrent()); err != nil { return actorStats{}, err } actStats.Stats = dsc.stats + if sc.representationType != representationTypeNone { + actStats.Blocks = dsc.blocks + } for _, field := range fields { dsc := &dagStatCollector{ - ds: dag, - walk: carWalkFunc, + ds: dag, + walk: carWalkFunc, + representationType: sc.representationType, } - if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, cid.NewSet().Visit, merkledag.Concurrent()); err != nil { + if err := merkledag.Walk(ctx, func(ctx context.Context, c cid.Cid) ([]*format.Link, error) { + links, err := dsc.walkLinks(ctx, c) + return links, err + }, field.Cid, sc.fieldCidSets[field.Name].Visit, merkledag.Concurrent()); err != nil { return actorStats{}, err } field.Stats = dsc.stats + if sc.representationType != representationTypeNone { + field.Blocks = dsc.blocks + } actStats.Fields = append(actStats.Fields, field) }