Skip to content

Commit

Permalink
wip refactor of tools and apps package
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Dec 5, 2023
1 parent f58cd08 commit efd96c3
Show file tree
Hide file tree
Showing 26 changed files with 407 additions and 365 deletions.
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/streamingfast/firehose-core/cmd/tools"

"github.com/streamingfast/firehose-core/cmd/apps"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
Expand All @@ -24,6 +22,8 @@ import (
dmeteringgrpc "github.com/streamingfast/dmetering/grpc"
dmeteringlogger "github.com/streamingfast/dmetering/logger"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/cmd/apps"

"github.com/streamingfast/logging"
"go.uber.org/zap"
)
Expand Down
37 changes: 20 additions & 17 deletions cmd/tools/tools_check_blocks.go → cmd/tools/check/blocks.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tools
package check

import (
"context"
Expand All @@ -9,12 +9,15 @@ import (
"regexp"
"strconv"

firecore "github.com/streamingfast/firehose-core"
print2 "github.com/streamingfast/firehose-core/cmd/tools/print"

"github.com/streamingfast/firehose-core/types"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"go.uber.org/zap"
)

Expand All @@ -29,7 +32,7 @@ const (
MaxUint64 = ^uint64(0)
)

func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Chain[B], logger *zap.Logger, storeURL string, fileBlockSize uint64, blockRange BlockRange, printDetails PrintDetails) error {
func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Chain[B], logger *zap.Logger, storeURL string, fileBlockSize uint64, blockRange types.BlockRange, printDetails PrintDetails) error {
readAllBlocks := printDetails != PrintNoDetails
fmt.Printf("Checking block holes on %s\n", storeURL)
if readAllBlocks {
Expand All @@ -50,7 +53,7 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch
// }

holeFound := false
expected = RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize)
expected = types.RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize)
currentStartBlk := uint64(blockRange.Start)

blocksStore, err := dstore.NewDBinStore(storeURL)
Expand Down Expand Up @@ -83,11 +86,11 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch
if baseNum != expected {
// There is no previous valid block range if we are at the ever first seen file
if count > 1 {
fmt.Printf("✅ Range %s\n", NewClosedRange(int64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))))
fmt.Printf("✅ Range %s\n", types.NewClosedRange(int64(currentStartBlk), uint64(types.RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize))))
}

// Otherwise, we do not follow last seen element (previous is `100 - 199` but we are `299 - 300`)
missingRange := NewClosedRange(int64(expected), RoundToBundleEndBlock(baseNum-fileBlockSize, fileBlockSize))
missingRange := types.NewClosedRange(int64(expected), types.RoundToBundleEndBlock(baseNum-fileBlockSize, fileBlockSize))
fmt.Printf("❌ Range %s (Missing, [%s])\n", missingRange, missingRange.ReprocRange())
currentStartBlk = baseNum

Expand All @@ -113,11 +116,11 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch
}

if count%10000 == 0 {
fmt.Printf("✅ Range %s\n", NewClosedRange(int64(currentStartBlk), RoundToBundleEndBlock(baseNum, fileBlockSize)))
fmt.Printf("✅ Range %s\n", types.NewClosedRange(int64(currentStartBlk), types.RoundToBundleEndBlock(baseNum, fileBlockSize)))
currentStartBlk = baseNum + fileBlockSize
}

if blockRange.IsClosed() && RoundToBundleEndBlock(baseNum, fileBlockSize) >= *blockRange.Stop-1 {
if blockRange.IsClosed() && types.RoundToBundleEndBlock(baseNum, fileBlockSize) >= *blockRange.Stop-1 {
return dstore.StopIteration
}

Expand All @@ -135,9 +138,9 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch
zap.Uint64("highest_block_seen", highestBlockSeen),
)
if tfdb.lastLinkedBlock != nil && tfdb.lastLinkedBlock.Number < highestBlockSeen {
fmt.Printf("🔶 Range %s has issues with forks, last linkable block number: %d\n", NewClosedRange(int64(currentStartBlk), uint64(highestBlockSeen)), tfdb.lastLinkedBlock.Number)
fmt.Printf("🔶 Range %s has issues with forks, last linkable block number: %d\n", types.NewClosedRange(int64(currentStartBlk), uint64(highestBlockSeen)), tfdb.lastLinkedBlock.Number)
} else {
fmt.Printf("✅ Range %s\n", NewClosedRange(int64(currentStartBlk), uint64(highestBlockSeen)))
fmt.Printf("✅ Range %s\n", types.NewClosedRange(int64(currentStartBlk), uint64(highestBlockSeen)))
}

fmt.Println()
Expand All @@ -146,7 +149,7 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch
if blockRange.IsClosed() &&
(highestBlockSeen < uint64(*blockRange.Stop-1) ||
(lowestBlockSeen > uint64(blockRange.Start) && lowestBlockSeen > bstream.GetProtocolFirstStreamableBlock)) {
fmt.Printf("> 🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen))
fmt.Printf("> 🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, types.PrettyBlockNum(lowestBlockSeen), types.PrettyBlockNum(highestBlockSeen))
}

if holeFound {
Expand All @@ -171,7 +174,7 @@ func validateBlockSegment[B firecore.Block](
store dstore.Store,
segment string,
fileBlockSize uint64,
blockRange BlockRange,
blockRange types.BlockRange,
printDetails PrintDetails,
tfdb *trackedForkDB,
) (lowestBlockSeen, highestBlockSeen uint64) {
Expand All @@ -197,7 +200,7 @@ func validateBlockSegment[B firecore.Block](
continue
}

if blockRange.IsClosed() && block.Number > uint64(*blockRange.Stop) {
if blockRange.IsClosed() && block.Number > *blockRange.Stop {
return
}

Expand Down Expand Up @@ -239,7 +242,7 @@ func validateBlockSegment[B firecore.Block](
seenBlockCount++

if printDetails == PrintStats {
err := printBStreamBlock(block, false, os.Stdout)
err := print2.PrintBStreamBlock(block, false, os.Stdout)
if err != nil {
fmt.Printf("❌ Unable to print block %s: %s\n", block.AsRef(), err)
continue
Expand Down Expand Up @@ -288,13 +291,13 @@ func validateBlockSegment[B firecore.Block](
return
}

func WalkBlockPrefix(blockRange BlockRange, fileBlockSize uint64) string {
func WalkBlockPrefix(blockRange types.BlockRange, fileBlockSize uint64) string {
if blockRange.IsOpen() {
return ""
}

startString := fmt.Sprintf("%010d", RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize))
endString := fmt.Sprintf("%010d", RoundToBundleEndBlock(uint64(*blockRange.Stop-1), fileBlockSize)+1)
startString := fmt.Sprintf("%010d", types.RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize))
endString := fmt.Sprintf("%010d", types.RoundToBundleEndBlock(*blockRange.Stop-1, fileBlockSize)+1)

offset := 0
for i := 0; i < len(startString); i++ {
Expand Down
50 changes: 26 additions & 24 deletions cmd/tools/tools_check.go → cmd/tools/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,47 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package tools
package check

import (
"fmt"
"strings"

"go.uber.org/zap"

firecore "github.com/streamingfast/firehose-core"

"github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/types"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

var toolsCheckCmd = &cobra.Command{Use: "check", Short: "Various checks for deployment, data integrity & debugging"}
func NewCheckCommand[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) *cobra.Command {

var toolsCheckForksCmd = &cobra.Command{
Use: "forks <forked-blocks-store-url>",
Short: "Reads all forked blocks you have and print longest linkable segments for each fork",
Args: cobra.ExactArgs(1),
}
toolsCheckCmd := &cobra.Command{Use: "check", Short: "Various checks for deployment, data integrity & debugging"}

var toolsCheckMergedBlocksCmd = &cobra.Command{
// TODO: Not sure, it's now a required thing, but we could probably use the same logic as `start`
// and avoid altogether passing the args. If this would also load the config and everything else,
// that would be much more seamless!
Use: "merged-blocks <store-url>",
Short: "Checks for any holes in merged blocks as well as ensuring merged blocks integrity",
Args: cobra.ExactArgs(1),
}
toolsCheckForksCmd := &cobra.Command{
Use: "forks <forked-blocks-store-url>",
Short: "Reads all forked blocks you have and print longest linkable segments for each fork",
Args: cobra.ExactArgs(1),
}

var (
toolsCheckMergedBlocksCmd = &cobra.Command{
// TODO: Not sure, it's now a required thing, but we could probably use the same logic as `start`
// and avoid altogether passing the args. If this would also load the config and everything else,
// that would be much more seamless!
Use: "merged-blocks <store-url>",
Short: "Checks for any holes in merged blocks as well as ensuring merged blocks integrity",
Args: cobra.ExactArgs(1),
}
)

func init() {
ToolsCmd.AddCommand(toolsCheckCmd)
toolsCheckCmd.AddCommand(newCheckMergedBlockBatchCmd())
toolsCheckCmd.AddCommand(toolsCheckForksCmd)
toolsCheckCmd.AddCommand(toolsCheckMergedBlocksCmd)

Expand All @@ -61,9 +63,7 @@ func init() {

toolsCheckForksCmd.Flags().Uint64("min-depth", 1, "Only show forks that are at least this deep")
toolsCheckForksCmd.Flags().Uint64("after-block", 0, "Only show forks that happened after this block number, if value is not 0")
}

func configureToolsCheckCmd[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) {
toolsCheckMergedBlocksCmd.RunE = createToolsCheckMergedBlocksE(chain, rootLog)
toolsCheckMergedBlocksCmd.Example = firecore.ExamplePrefixed(chain, "tools check merged-blocks", `
"./sf-data/storage/merged-blocks"
Expand All @@ -73,14 +73,16 @@ func configureToolsCheckCmd[B firecore.Block](chain *firecore.Chain[B], rootLog
`)

toolsCheckForksCmd.RunE = toolsCheckForksE

return toolsCheckCmd
}

func createToolsCheckMergedBlocksE[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) error {
storeURL := args[0]
fileBlockSize := uint64(100)

blockRange, err := GetBlockRangeFromFlag(cmd, "range")
blockRange, err := types.GetBlockRangeFromFlag(cmd, "range")
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package tools
package check

import (
"context"
Expand All @@ -7,10 +7,10 @@ import (
"strconv"
"strings"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dstore"
"github.com/streamingfast/firehose-core/types"
)

type blockRef struct {
Expand Down Expand Up @@ -41,13 +41,13 @@ func CheckMergedBlocksBatch(
sourceStoreURL string,
destStoreURL string,
fileBlockSize uint64,
blockRange BlockRange,
blockRange types.BlockRange,
) error {
if !blockRange.IsResolved() {
return fmt.Errorf("check merged blocks can only work with fully resolved range, got %s", blockRange)
}

expected := RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize)
expected := types.RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize)
fileBlockSize64 := uint64(fileBlockSize)

blocksStore, err := dstore.NewDBinStore(sourceStoreURL)
Expand All @@ -62,7 +62,7 @@ func CheckMergedBlocksBatch(
}
}

var firstFilename = fmt.Sprintf("%010d", RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize))
var firstFilename = fmt.Sprintf("%010d", types.RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize))

lastSeenBlock := &blockRef{}

Expand Down Expand Up @@ -103,12 +103,15 @@ func CheckMergedBlocksBatch(
destStore.WriteObject(ctx, outputFile, strings.NewReader(""))
}
} else {
brokenSince := RoundToBundleStartBlock(uint64(lastSeenBlock.num+1), 100)
brokenSince := types.RoundToBundleStartBlock(uint64(lastSeenBlock.num+1), 100)
for i := brokenSince; i <= baseNum; i += fileBlockSize64 {
fmt.Printf("found broken file %q, %s\n", filename, details)
if destStore != nil {
outputFile := fmt.Sprintf("%010d.broken", i)
destStore.WriteObject(ctx, outputFile, strings.NewReader(""))
err := destStore.WriteObject(ctx, outputFile, strings.NewReader(""))
if err != nil {
return fmt.Errorf("unable to write broken file %q: %w", outputFile, err)
}
}
}
}
Expand All @@ -120,7 +123,7 @@ func CheckMergedBlocksBatch(
return err
}

if blockRange.IsClosed() && RoundToBundleEndBlock(baseNum, fileBlockSize) >= *blockRange.Stop-1 {
if blockRange.IsClosed() && types.RoundToBundleEndBlock(baseNum, fileBlockSize) >= *blockRange.Stop-1 {
return dstore.StopIteration
}
expected = baseNum + fileBlockSize64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package tools
package check

import (
"strconv"

"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/firehose-core/types"
)

var toolsCheckMergedBlocksBatchCmd = &cobra.Command{
Use: "merged-blocks-batch <store-url> <start> <stop>",
Short: "Checks for any missing, disordered or duplicate blocks in merged blocks files",
Args: cobra.ExactArgs(3),
RunE: checkMergedBlocksBatchRunE,
}

func init() {
toolsCheckCmd.AddCommand(toolsCheckMergedBlocksBatchCmd)
func newCheckMergedBlockBatchCmd() *cobra.Command {
var toolsCheckMergedBlocksBatchCmd = &cobra.Command{
Use: "merged-blocks-batch <store-url> <start> <stop>",
Short: "Checks for any missing, disordered or duplicate blocks in merged blocks files",
Args: cobra.ExactArgs(3),
RunE: checkMergedBlocksBatchRunE,
}
toolsCheckMergedBlocksBatchCmd.PersistentFlags().String("output-to-store", "", "If non-empty, an empty file called <blocknum>.broken will be created for every problematic merged-blocks-file. This is a convenient way to gather the results from multiple parallel processes.")
return toolsCheckMergedBlocksBatchCmd

}

func checkMergedBlocksBatchRunE(cmd *cobra.Command, args []string) error {
Expand All @@ -45,7 +46,7 @@ func checkMergedBlocksBatchRunE(cmd *cobra.Command, args []string) error {
}
fileBlockSize := uint64(100)

blockRange := BlockRange{
blockRange := types.BlockRange{
Start: int64(start),
Stop: &stop,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package tools
package firehose

import (
"context"
"fmt"
"io"

"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/types"
"github.com/streamingfast/jsonpb"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
"io"
)

func newToolsFirehoseClientCmd[B firecore.Block](chain *firecore.Chain[B], logger *zap.Logger) *cobra.Command {
func NewToolsFirehoseClientCmd[B firecore.Block](chain *firecore.Chain[B], logger *zap.Logger) *cobra.Command {
cmd := &cobra.Command{
Use: "firehose-client <endpoint> <range>",
Short: "Connects to a Firehose endpoint over gRPC and print block stream as JSON to terminal",
Expand Down Expand Up @@ -43,7 +45,7 @@ func getFirehoseClientE[B firecore.Block](chain *firecore.Chain[B], rootLog *zap
}
defer connClose()

blockRange, err := GetBlockRangeFromArg(args[1])
blockRange, err := types.GetBlockRangeFromArg(args[1])
if err != nil {
return fmt.Errorf("invalid range %q: %w", args[1], err)
}
Expand Down
Loading

0 comments on commit efd96c3

Please sign in to comment.