From efd96c34861e439b40f5e56809f97c4dc3175202 Mon Sep 17 00:00:00 2001 From: billettc Date: Tue, 5 Dec 2023 13:02:55 -0500 Subject: [PATCH] wip refactor of tools and apps package --- cmd/main.go | 4 +- .../blocks.go} | 37 ++--- cmd/tools/{tools_check.go => check/check.go} | 50 +++---- .../merged_batch.go} | 21 +-- .../mergedbatch.go} | 23 +-- .../client.go} | 10 +- cmd/tools/firehose/firehose.go | 123 ++++++++++++++++ .../prometheus_exporter.go} | 4 +- .../single_block_client.go} | 4 +- .../tools_download_from_firehose.go | 13 +- cmd/tools/{ => print}/tools_print.go | 54 ++++---- cmd/tools/{ => print}/tools_print_enum.go | 2 +- cmd/tools/tools.go | 131 ++---------------- cmd/tools/tools_compare_blocks.go | 16 ++- cmd/tools/tools_fix_bloated_merged_blocks.go | 18 +-- cmd/tools/tools_unmerge_blocks.go | 7 +- cmd/tools/tools_upgrade_merged_blocks.go | 114 +-------------- jsonencoder/encoder.go | 6 +- mergedblockswriter.go | 113 +++++++++++++++ {cmd/tools => types}/block_range.go | 2 +- {cmd/tools => types}/block_range_enum.go | 2 +- {cmd/tools => types}/flags.go | 6 +- {cmd/tools => types}/types.go | 2 +- {cmd/tools => types}/types_test.go | 2 +- {cmd/tools => types}/utils.go | 4 +- {cmd/tools => types}/utils_test.go | 4 +- 26 files changed, 407 insertions(+), 365 deletions(-) rename cmd/tools/{tools_check_blocks.go => check/blocks.go} (83%) rename cmd/tools/{tools_check.go => check/check.go} (81%) rename cmd/tools/{tools_check_merged_batch.go => check/merged_batch.go} (86%) rename cmd/tools/{tools_checkmergedbatch.go => check/mergedbatch.go} (76%) rename cmd/tools/{tools_firehose_client.go => firehose/client.go} (94%) create mode 100644 cmd/tools/firehose/firehose.go rename cmd/tools/{tools_firehose_prometheus_exporter.go => firehose/prometheus_exporter.go} (97%) rename cmd/tools/{tools_firehose_single_block_client.go => firehose/single_block_client.go} (96%) rename cmd/tools/{ => firehose}/tools_download_from_firehose.go (95%) rename cmd/tools/{ => print}/tools_print.go (88%) rename cmd/tools/{ => print}/tools_print_enum.go (99%) create mode 100644 mergedblockswriter.go rename {cmd/tools => types}/block_range.go (99%) rename {cmd/tools => types}/block_range_enum.go (99%) rename {cmd/tools => types}/flags.go (82%) rename {cmd/tools => types}/types.go (96%) rename {cmd/tools => types}/types_test.go (97%) rename {cmd/tools => types}/utils.go (97%) rename {cmd/tools => types}/utils_test.go (97%) diff --git a/cmd/main.go b/cmd/main.go index e308e52..e5558c9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,8 +8,6 @@ import ( "github.com/streamingfast/firehose-core/cmd/tools" - "github.com/streamingfast/firehose-core/cmd/apps" - "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/spf13/viper" @@ -24,6 +22,8 @@ import ( dmeteringgrpc "github.com/streamingfast/dmetering/grpc" dmeteringlogger "github.com/streamingfast/dmetering/logger" firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/cmd/apps" + "github.com/streamingfast/logging" "go.uber.org/zap" ) diff --git a/cmd/tools/tools_check_blocks.go b/cmd/tools/check/blocks.go similarity index 83% rename from cmd/tools/tools_check_blocks.go rename to cmd/tools/check/blocks.go index 1f44507..8b86091 100644 --- a/cmd/tools/tools_check_blocks.go +++ b/cmd/tools/check/blocks.go @@ -1,4 +1,4 @@ -package tools +package check import ( "context" @@ -9,12 +9,15 @@ import ( "regexp" "strconv" - firecore "github.com/streamingfast/firehose-core" + print2 "github.com/streamingfast/firehose-core/cmd/tools/print" + + "github.com/streamingfast/firehose-core/types" "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/dstore" + firecore "github.com/streamingfast/firehose-core" "go.uber.org/zap" ) @@ -29,7 +32,7 @@ const ( MaxUint64 = ^uint64(0) ) -func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Chain[B], logger *zap.Logger, storeURL string, fileBlockSize uint64, blockRange BlockRange, printDetails PrintDetails) error { +func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Chain[B], logger *zap.Logger, storeURL string, fileBlockSize uint64, blockRange types.BlockRange, printDetails PrintDetails) error { readAllBlocks := printDetails != PrintNoDetails fmt.Printf("Checking block holes on %s\n", storeURL) if readAllBlocks { @@ -50,7 +53,7 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch // } holeFound := false - expected = RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize) + expected = types.RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize) currentStartBlk := uint64(blockRange.Start) blocksStore, err := dstore.NewDBinStore(storeURL) @@ -83,11 +86,11 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch if baseNum != expected { // There is no previous valid block range if we are at the ever first seen file if count > 1 { - fmt.Printf("✅ Range %s\n", NewClosedRange(int64(currentStartBlk), uint64(RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize)))) + fmt.Printf("✅ Range %s\n", types.NewClosedRange(int64(currentStartBlk), uint64(types.RoundToBundleEndBlock(expected-fileBlockSize, fileBlockSize)))) } // Otherwise, we do not follow last seen element (previous is `100 - 199` but we are `299 - 300`) - missingRange := NewClosedRange(int64(expected), RoundToBundleEndBlock(baseNum-fileBlockSize, fileBlockSize)) + missingRange := types.NewClosedRange(int64(expected), types.RoundToBundleEndBlock(baseNum-fileBlockSize, fileBlockSize)) fmt.Printf("❌ Range %s (Missing, [%s])\n", missingRange, missingRange.ReprocRange()) currentStartBlk = baseNum @@ -113,11 +116,11 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch } if count%10000 == 0 { - fmt.Printf("✅ Range %s\n", NewClosedRange(int64(currentStartBlk), RoundToBundleEndBlock(baseNum, fileBlockSize))) + fmt.Printf("✅ Range %s\n", types.NewClosedRange(int64(currentStartBlk), types.RoundToBundleEndBlock(baseNum, fileBlockSize))) currentStartBlk = baseNum + fileBlockSize } - if blockRange.IsClosed() && RoundToBundleEndBlock(baseNum, fileBlockSize) >= *blockRange.Stop-1 { + if blockRange.IsClosed() && types.RoundToBundleEndBlock(baseNum, fileBlockSize) >= *blockRange.Stop-1 { return dstore.StopIteration } @@ -135,9 +138,9 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch zap.Uint64("highest_block_seen", highestBlockSeen), ) if tfdb.lastLinkedBlock != nil && tfdb.lastLinkedBlock.Number < highestBlockSeen { - fmt.Printf("🔶 Range %s has issues with forks, last linkable block number: %d\n", NewClosedRange(int64(currentStartBlk), uint64(highestBlockSeen)), tfdb.lastLinkedBlock.Number) + fmt.Printf("🔶 Range %s has issues with forks, last linkable block number: %d\n", types.NewClosedRange(int64(currentStartBlk), uint64(highestBlockSeen)), tfdb.lastLinkedBlock.Number) } else { - fmt.Printf("✅ Range %s\n", NewClosedRange(int64(currentStartBlk), uint64(highestBlockSeen))) + fmt.Printf("✅ Range %s\n", types.NewClosedRange(int64(currentStartBlk), uint64(highestBlockSeen))) } fmt.Println() @@ -146,7 +149,7 @@ func CheckMergedBlocks[B firecore.Block](ctx context.Context, chain *firecore.Ch if blockRange.IsClosed() && (highestBlockSeen < uint64(*blockRange.Stop-1) || (lowestBlockSeen > uint64(blockRange.Start) && lowestBlockSeen > bstream.GetProtocolFirstStreamableBlock)) { - fmt.Printf("> 🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, PrettyBlockNum(lowestBlockSeen), PrettyBlockNum(highestBlockSeen)) + fmt.Printf("> 🔶 Incomplete range %s, started at block %s and stopped at block: %s\n", blockRange, types.PrettyBlockNum(lowestBlockSeen), types.PrettyBlockNum(highestBlockSeen)) } if holeFound { @@ -171,7 +174,7 @@ func validateBlockSegment[B firecore.Block]( store dstore.Store, segment string, fileBlockSize uint64, - blockRange BlockRange, + blockRange types.BlockRange, printDetails PrintDetails, tfdb *trackedForkDB, ) (lowestBlockSeen, highestBlockSeen uint64) { @@ -197,7 +200,7 @@ func validateBlockSegment[B firecore.Block]( continue } - if blockRange.IsClosed() && block.Number > uint64(*blockRange.Stop) { + if blockRange.IsClosed() && block.Number > *blockRange.Stop { return } @@ -239,7 +242,7 @@ func validateBlockSegment[B firecore.Block]( seenBlockCount++ if printDetails == PrintStats { - err := printBStreamBlock(block, false, os.Stdout) + err := print2.PrintBStreamBlock(block, false, os.Stdout) if err != nil { fmt.Printf("❌ Unable to print block %s: %s\n", block.AsRef(), err) continue @@ -288,13 +291,13 @@ func validateBlockSegment[B firecore.Block]( return } -func WalkBlockPrefix(blockRange BlockRange, fileBlockSize uint64) string { +func WalkBlockPrefix(blockRange types.BlockRange, fileBlockSize uint64) string { if blockRange.IsOpen() { return "" } - startString := fmt.Sprintf("%010d", RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize)) - endString := fmt.Sprintf("%010d", RoundToBundleEndBlock(uint64(*blockRange.Stop-1), fileBlockSize)+1) + startString := fmt.Sprintf("%010d", types.RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize)) + endString := fmt.Sprintf("%010d", types.RoundToBundleEndBlock(*blockRange.Stop-1, fileBlockSize)+1) offset := 0 for i := 0; i < len(startString); i++ { diff --git a/cmd/tools/tools_check.go b/cmd/tools/check/check.go similarity index 81% rename from cmd/tools/tools_check.go rename to cmd/tools/check/check.go index 74014df..97bf7e4 100644 --- a/cmd/tools/tools_check.go +++ b/cmd/tools/check/check.go @@ -12,45 +12,47 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tools +package check import ( "fmt" "strings" - "go.uber.org/zap" - - firecore "github.com/streamingfast/firehose-core" - "github.com/dustin/go-humanize" "github.com/spf13/cobra" "github.com/streamingfast/bstream" "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" "github.com/streamingfast/dstore" + firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/types" + "go.uber.org/zap" "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) -var toolsCheckCmd = &cobra.Command{Use: "check", Short: "Various checks for deployment, data integrity & debugging"} +func NewCheckCommand[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) *cobra.Command { -var toolsCheckForksCmd = &cobra.Command{ - Use: "forks ", - Short: "Reads all forked blocks you have and print longest linkable segments for each fork", - Args: cobra.ExactArgs(1), -} + toolsCheckCmd := &cobra.Command{Use: "check", Short: "Various checks for deployment, data integrity & debugging"} -var toolsCheckMergedBlocksCmd = &cobra.Command{ - // TODO: Not sure, it's now a required thing, but we could probably use the same logic as `start` - // and avoid altogether passing the args. If this would also load the config and everything else, - // that would be much more seamless! - Use: "merged-blocks ", - Short: "Checks for any holes in merged blocks as well as ensuring merged blocks integrity", - Args: cobra.ExactArgs(1), -} + toolsCheckForksCmd := &cobra.Command{ + Use: "forks ", + Short: "Reads all forked blocks you have and print longest linkable segments for each fork", + Args: cobra.ExactArgs(1), + } + + var ( + toolsCheckMergedBlocksCmd = &cobra.Command{ + // TODO: Not sure, it's now a required thing, but we could probably use the same logic as `start` + // and avoid altogether passing the args. If this would also load the config and everything else, + // that would be much more seamless! + Use: "merged-blocks ", + Short: "Checks for any holes in merged blocks as well as ensuring merged blocks integrity", + Args: cobra.ExactArgs(1), + } + ) -func init() { - ToolsCmd.AddCommand(toolsCheckCmd) + toolsCheckCmd.AddCommand(newCheckMergedBlockBatchCmd()) toolsCheckCmd.AddCommand(toolsCheckForksCmd) toolsCheckCmd.AddCommand(toolsCheckMergedBlocksCmd) @@ -61,9 +63,7 @@ func init() { toolsCheckForksCmd.Flags().Uint64("min-depth", 1, "Only show forks that are at least this deep") toolsCheckForksCmd.Flags().Uint64("after-block", 0, "Only show forks that happened after this block number, if value is not 0") -} -func configureToolsCheckCmd[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) { toolsCheckMergedBlocksCmd.RunE = createToolsCheckMergedBlocksE(chain, rootLog) toolsCheckMergedBlocksCmd.Example = firecore.ExamplePrefixed(chain, "tools check merged-blocks", ` "./sf-data/storage/merged-blocks" @@ -73,6 +73,8 @@ func configureToolsCheckCmd[B firecore.Block](chain *firecore.Chain[B], rootLog `) toolsCheckForksCmd.RunE = toolsCheckForksE + + return toolsCheckCmd } func createToolsCheckMergedBlocksE[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) firecore.CommandExecutor { @@ -80,7 +82,7 @@ func createToolsCheckMergedBlocksE[B firecore.Block](chain *firecore.Chain[B], r storeURL := args[0] fileBlockSize := uint64(100) - blockRange, err := GetBlockRangeFromFlag(cmd, "range") + blockRange, err := types.GetBlockRangeFromFlag(cmd, "range") if err != nil { return err } diff --git a/cmd/tools/tools_check_merged_batch.go b/cmd/tools/check/merged_batch.go similarity index 86% rename from cmd/tools/tools_check_merged_batch.go rename to cmd/tools/check/merged_batch.go index 268c467..309d364 100644 --- a/cmd/tools/tools_check_merged_batch.go +++ b/cmd/tools/check/merged_batch.go @@ -1,4 +1,4 @@ -package tools +package check import ( "context" @@ -7,10 +7,10 @@ import ( "strconv" "strings" - pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" - "github.com/streamingfast/bstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/dstore" + "github.com/streamingfast/firehose-core/types" ) type blockRef struct { @@ -41,13 +41,13 @@ func CheckMergedBlocksBatch( sourceStoreURL string, destStoreURL string, fileBlockSize uint64, - blockRange BlockRange, + blockRange types.BlockRange, ) error { if !blockRange.IsResolved() { return fmt.Errorf("check merged blocks can only work with fully resolved range, got %s", blockRange) } - expected := RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize) + expected := types.RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize) fileBlockSize64 := uint64(fileBlockSize) blocksStore, err := dstore.NewDBinStore(sourceStoreURL) @@ -62,7 +62,7 @@ func CheckMergedBlocksBatch( } } - var firstFilename = fmt.Sprintf("%010d", RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize)) + var firstFilename = fmt.Sprintf("%010d", types.RoundToBundleStartBlock(uint64(blockRange.Start), fileBlockSize)) lastSeenBlock := &blockRef{} @@ -103,12 +103,15 @@ func CheckMergedBlocksBatch( destStore.WriteObject(ctx, outputFile, strings.NewReader("")) } } else { - brokenSince := RoundToBundleStartBlock(uint64(lastSeenBlock.num+1), 100) + brokenSince := types.RoundToBundleStartBlock(uint64(lastSeenBlock.num+1), 100) for i := brokenSince; i <= baseNum; i += fileBlockSize64 { fmt.Printf("found broken file %q, %s\n", filename, details) if destStore != nil { outputFile := fmt.Sprintf("%010d.broken", i) - destStore.WriteObject(ctx, outputFile, strings.NewReader("")) + err := destStore.WriteObject(ctx, outputFile, strings.NewReader("")) + if err != nil { + return fmt.Errorf("unable to write broken file %q: %w", outputFile, err) + } } } } @@ -120,7 +123,7 @@ func CheckMergedBlocksBatch( return err } - if blockRange.IsClosed() && RoundToBundleEndBlock(baseNum, fileBlockSize) >= *blockRange.Stop-1 { + if blockRange.IsClosed() && types.RoundToBundleEndBlock(baseNum, fileBlockSize) >= *blockRange.Stop-1 { return dstore.StopIteration } expected = baseNum + fileBlockSize64 diff --git a/cmd/tools/tools_checkmergedbatch.go b/cmd/tools/check/mergedbatch.go similarity index 76% rename from cmd/tools/tools_checkmergedbatch.go rename to cmd/tools/check/mergedbatch.go index 946d5b1..3b484eb 100644 --- a/cmd/tools/tools_checkmergedbatch.go +++ b/cmd/tools/check/mergedbatch.go @@ -12,25 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tools +package check import ( "strconv" "github.com/spf13/cobra" "github.com/streamingfast/cli/sflags" + "github.com/streamingfast/firehose-core/types" ) -var toolsCheckMergedBlocksBatchCmd = &cobra.Command{ - Use: "merged-blocks-batch ", - Short: "Checks for any missing, disordered or duplicate blocks in merged blocks files", - Args: cobra.ExactArgs(3), - RunE: checkMergedBlocksBatchRunE, -} - -func init() { - toolsCheckCmd.AddCommand(toolsCheckMergedBlocksBatchCmd) +func newCheckMergedBlockBatchCmd() *cobra.Command { + var toolsCheckMergedBlocksBatchCmd = &cobra.Command{ + Use: "merged-blocks-batch ", + Short: "Checks for any missing, disordered or duplicate blocks in merged blocks files", + Args: cobra.ExactArgs(3), + RunE: checkMergedBlocksBatchRunE, + } toolsCheckMergedBlocksBatchCmd.PersistentFlags().String("output-to-store", "", "If non-empty, an empty file called .broken will be created for every problematic merged-blocks-file. This is a convenient way to gather the results from multiple parallel processes.") + return toolsCheckMergedBlocksBatchCmd + } func checkMergedBlocksBatchRunE(cmd *cobra.Command, args []string) error { @@ -45,7 +46,7 @@ func checkMergedBlocksBatchRunE(cmd *cobra.Command, args []string) error { } fileBlockSize := uint64(100) - blockRange := BlockRange{ + blockRange := types.BlockRange{ Start: int64(start), Stop: &stop, } diff --git a/cmd/tools/tools_firehose_client.go b/cmd/tools/firehose/client.go similarity index 94% rename from cmd/tools/tools_firehose_client.go rename to cmd/tools/firehose/client.go index 02eda5b..d416ff3 100644 --- a/cmd/tools/tools_firehose_client.go +++ b/cmd/tools/firehose/client.go @@ -1,18 +1,20 @@ -package tools +package firehose import ( "context" "fmt" + "io" + "github.com/spf13/cobra" "github.com/streamingfast/cli/sflags" firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/types" "github.com/streamingfast/jsonpb" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" - "io" ) -func newToolsFirehoseClientCmd[B firecore.Block](chain *firecore.Chain[B], logger *zap.Logger) *cobra.Command { +func NewToolsFirehoseClientCmd[B firecore.Block](chain *firecore.Chain[B], logger *zap.Logger) *cobra.Command { cmd := &cobra.Command{ Use: "firehose-client ", Short: "Connects to a Firehose endpoint over gRPC and print block stream as JSON to terminal", @@ -43,7 +45,7 @@ func getFirehoseClientE[B firecore.Block](chain *firecore.Chain[B], rootLog *zap } defer connClose() - blockRange, err := GetBlockRangeFromArg(args[1]) + blockRange, err := types.GetBlockRangeFromArg(args[1]) if err != nil { return fmt.Errorf("invalid range %q: %w", args[1], err) } diff --git a/cmd/tools/firehose/firehose.go b/cmd/tools/firehose/firehose.go new file mode 100644 index 0000000..6c81705 --- /dev/null +++ b/cmd/tools/firehose/firehose.go @@ -0,0 +1,123 @@ +package firehose + +import ( + "fmt" + "os" + + "github.com/mostynb/go-grpc-compression/zstd" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/streamingfast/cli/sflags" + firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/firehose/client" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + "google.golang.org/protobuf/types/known/anypb" +) + +type firehoseRequestInfo struct { + GRPCCallOpts []grpc.CallOption + Cursor string + FinalBlocksOnly bool + Transforms []*anypb.Any +} + +func getFirehoseFetchClientFromCmd[B firecore.Block](cmd *cobra.Command, logger *zap.Logger, endpoint string, chain *firecore.Chain[B]) ( + firehoseClient pbfirehose.FetchClient, + connClose func() error, + requestInfo *firehoseRequestInfo, + err error, +) { + return getFirehoseClientFromCmd[B, pbfirehose.FetchClient](cmd, logger, "fetch-client", endpoint, chain) +} + +func getFirehoseStreamClientFromCmd[B firecore.Block](cmd *cobra.Command, logger *zap.Logger, endpoint string, chain *firecore.Chain[B]) ( + firehoseClient pbfirehose.StreamClient, + connClose func() error, + requestInfo *firehoseRequestInfo, + err error, +) { + return getFirehoseClientFromCmd[B, pbfirehose.StreamClient](cmd, logger, "stream-client", endpoint, chain) +} + +func getFirehoseClientFromCmd[B firecore.Block, C any](cmd *cobra.Command, logger *zap.Logger, kind string, endpoint string, chain *firecore.Chain[B]) ( + firehoseClient C, + connClose func() error, + requestInfo *firehoseRequestInfo, + err error, +) { + requestInfo = &firehoseRequestInfo{} + + jwt := os.Getenv(sflags.MustGetString(cmd, "api-token-env-var")) + plaintext := sflags.MustGetBool(cmd, "plaintext") + insecure := sflags.MustGetBool(cmd, "insecure") + + if sflags.FlagDefined(cmd, "cursor") { + requestInfo.Cursor = sflags.MustGetString(cmd, "cursor") + } + + if sflags.FlagDefined(cmd, "final-blocks-only") { + requestInfo.FinalBlocksOnly = sflags.MustGetBool(cmd, "final-blocks-only") + } + + var rawClient any + if kind == "stream-client" { + rawClient, connClose, requestInfo.GRPCCallOpts, err = client.NewFirehoseClient(endpoint, jwt, insecure, plaintext) + } else if kind == "fetch-client" { + rawClient, connClose, err = client.NewFirehoseFetchClient(endpoint, jwt, insecure, plaintext) + } else { + panic(fmt.Errorf("unsupported Firehose client kind: %s", kind)) + } + + if err != nil { + return firehoseClient, nil, nil, err + } + + firehoseClient = rawClient.(C) + + compression := sflags.MustGetString(cmd, "compression") + var compressor grpc.CallOption + switch compression { + case "gzip": + compressor = grpc.UseCompressor(gzip.Name) + case "zstd": + compressor = grpc.UseCompressor(zstd.Name) + case "none": + // Valid value but nothing to do + default: + return firehoseClient, nil, nil, fmt.Errorf("invalid value for compression: only 'gzip', 'zstd' or 'none' are accepted") + + } + + if compressor != nil { + requestInfo.GRPCCallOpts = append(requestInfo.GRPCCallOpts, compressor) + } + + if chain.Tools.TransformFlags != nil { + requestInfo.Transforms, err = chain.Tools.TransformFlags.Parse(cmd, logger) + } + + if err != nil { + return firehoseClient, nil, nil, fmt.Errorf("unable to parse transforms flags: %w", err) + } + + return +} + +func addFirehoseStreamClientFlagsToSet[B firecore.Block](flags *pflag.FlagSet, chain *firecore.Chain[B]) { + addFirehoseFetchClientFlagsToSet(flags, chain) + + flags.String("cursor", "", "Use this cursor with the request to resume your stream at the following block pointed by the cursor") +} + +func addFirehoseFetchClientFlagsToSet[B firecore.Block](flags *pflag.FlagSet, chain *firecore.Chain[B]) { + flags.StringP("api-token-env-var", "a", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint") + flags.String("compression", "none", "The HTTP compression: use either 'none', 'gzip' or 'zstd'") + flags.BoolP("plaintext", "p", false, "Use plaintext connection to Firehose") + flags.BoolP("insecure", "k", false, "Use SSL connection to Firehose but skip SSL certificate validation") + if chain.Tools.TransformFlags != nil { + chain.Tools.TransformFlags.Register(flags) + } +} diff --git a/cmd/tools/tools_firehose_prometheus_exporter.go b/cmd/tools/firehose/prometheus_exporter.go similarity index 97% rename from cmd/tools/tools_firehose_prometheus_exporter.go rename to cmd/tools/firehose/prometheus_exporter.go index 5cc8a00..10b9971 100644 --- a/cmd/tools/tools_firehose_prometheus_exporter.go +++ b/cmd/tools/firehose/prometheus_exporter.go @@ -1,4 +1,4 @@ -package tools +package firehose import ( "context" @@ -22,7 +22,7 @@ var lastBlockReceived time.Time var driftSec = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "firehose_healthcheck_drift", Help: "Time since the most recent block received (seconds)"}, []string{"endpoint"}) // You should add your custom 'transforms' flags to this command in your init(), then parse them in transformsSetter -func newToolsFirehosePrometheusExporterCmd[B firecore.Block](chain *firecore.Chain[B], zlog *zap.Logger, tracer logging.Tracer) *cobra.Command { +func NewToolsFirehosePrometheusExporterCmd[B firecore.Block](chain *firecore.Chain[B], zlog *zap.Logger, tracer logging.Tracer) *cobra.Command { cmd := &cobra.Command{ Use: "firehose-prometheus-exporter ", Short: "stream blocks near the chain HEAD and report to prometheus", diff --git a/cmd/tools/tools_firehose_single_block_client.go b/cmd/tools/firehose/single_block_client.go similarity index 96% rename from cmd/tools/tools_firehose_single_block_client.go rename to cmd/tools/firehose/single_block_client.go index bf2dbee..b6b0a5a 100644 --- a/cmd/tools/tools_firehose_single_block_client.go +++ b/cmd/tools/firehose/single_block_client.go @@ -1,4 +1,4 @@ -package tools +package firehose import ( "context" @@ -15,7 +15,7 @@ import ( ) // You should add your custom 'transforms' flags to this command in your init(), then parse them in transformsSetter -func newToolsFirehoseSingleBlockClientCmd[B firecore.Block](chain *firecore.Chain[B], zlog *zap.Logger, tracer logging.Tracer) *cobra.Command { +func NewToolsFirehoseSingleBlockClientCmd[B firecore.Block](chain *firecore.Chain[B], zlog *zap.Logger, tracer logging.Tracer) *cobra.Command { cmd := &cobra.Command{ Use: "firehose-single-block-client {endpoint} {block_num|block_num:block_id|cursor}", Short: "fetch a single block from firehose and print as JSON", diff --git a/cmd/tools/tools_download_from_firehose.go b/cmd/tools/firehose/tools_download_from_firehose.go similarity index 95% rename from cmd/tools/tools_download_from_firehose.go rename to cmd/tools/firehose/tools_download_from_firehose.go index 5d4995e..0ff8562 100644 --- a/cmd/tools/tools_download_from_firehose.go +++ b/cmd/tools/firehose/tools_download_from_firehose.go @@ -1,8 +1,9 @@ -package tools +package firehose import ( "context" "fmt" + "io" "strconv" "time" @@ -18,7 +19,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) -func newToolsDownloadFromFirehoseCmd[B firecore.Block](chain *firecore.Chain[B], zlog *zap.Logger) *cobra.Command { +func NewToolsDownloadFromFirehoseCmd[B firecore.Block](chain *firecore.Chain[B], zlog *zap.Logger) *cobra.Command { cmd := &cobra.Command{ Use: "download-from-firehose ", Short: "Download blocks from Firehose and save them to merged-blocks", @@ -68,10 +69,10 @@ func createToolsDownloadFromFirehoseE[B firecore.Block](chain *firecore.Chain[B] return err } - mergeWriter := &mergedBlocksWriter{ - store: store, - tweakBlock: func(b *pbbstream.Block) (*pbbstream.Block, error) { return b, nil }, - logger: zlog, + mergeWriter := &firecore.MergedBlocksWriter{ + Store: store, + TweakBlock: func(b *pbbstream.Block) (*pbbstream.Block, error) { return b, nil }, + Logger: zlog, } approximateLIBWarningIssued := false diff --git a/cmd/tools/tools_print.go b/cmd/tools/print/tools_print.go similarity index 88% rename from cmd/tools/tools_print.go rename to cmd/tools/print/tools_print.go index 0ac63d0..04aa29b 100644 --- a/cmd/tools/tools_print.go +++ b/cmd/tools/print/tools_print.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tools +package print import ( "fmt" @@ -20,6 +20,8 @@ import ( "os" "strconv" + "github.com/streamingfast/firehose-core/types" + "github.com/spf13/cobra" "github.com/streamingfast/bstream" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" @@ -31,25 +33,23 @@ import ( "google.golang.org/protobuf/proto" ) -var toolsPrintCmd = &cobra.Command{ - Use: "print", - Short: "Prints of one block or merged blocks file", -} - -var toolsPrintOneBlockCmd = &cobra.Command{ - Use: "one-block ", - Short: "Prints a block from a one-block file", - Args: cobra.ExactArgs(2), -} +func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command { + toolsPrintCmd := &cobra.Command{ + Use: "print", + Short: "Prints of one block or merged blocks file", + } -var toolsPrintMergedBlocksCmd = &cobra.Command{ - Use: "merged-blocks ", - Short: "Prints the content summary of a merged blocks file.", - Args: cobra.ExactArgs(2), -} + toolsPrintOneBlockCmd := &cobra.Command{ + Use: "one-block ", + Short: "Prints a block from a one-block file", + Args: cobra.ExactArgs(2), + } -func init() { - ToolsCmd.AddCommand(toolsPrintCmd) + toolsPrintMergedBlocksCmd := &cobra.Command{ + Use: "merged-blocks ", + Short: "Prints the content summary of a merged blocks file.", + Args: cobra.ExactArgs(2), + } toolsPrintCmd.AddCommand(toolsPrintOneBlockCmd) toolsPrintCmd.AddCommand(toolsPrintMergedBlocksCmd) @@ -57,11 +57,11 @@ func init() { toolsPrintCmd.PersistentFlags().StringP("output", "o", "text", "Output mode for block printing, either 'text', 'json' or 'jsonl'") toolsPrintCmd.PersistentFlags().StringSlice("proto-paths", []string{"~/.proto"}, "Paths to proto files to use for dynamic decoding of blocks") toolsPrintCmd.PersistentFlags().Bool("transactions", false, "When in 'text' output mode, also print transactions summary") -} -func configureToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) { toolsPrintOneBlockCmd.RunE = createToolsPrintOneBlockE(chain) toolsPrintMergedBlocksCmd.RunE = createToolsPrintMergedBlocksE(chain) + + return toolsPrintCmd } func createToolsPrintMergedBlocksE[B firecore.Block](chain *firecore.Chain[B]) firecore.CommandExecutor { @@ -85,7 +85,7 @@ func createToolsPrintMergedBlocksE[B firecore.Block](chain *firecore.Chain[B]) f if err != nil { return fmt.Errorf("invalid base block %q: %w", args[1], err) } - blockBoundary := RoundToBundleStartBlock(startBlock, 100) + blockBoundary := types.RoundToBundleStartBlock(startBlock, 100) filename := fmt.Sprintf("%010d", blockBoundary) reader, err := store.OpenObject(ctx, filename) @@ -215,13 +215,13 @@ func toolsPrintCmdGetOutputMode(cmd *cobra.Command) (PrintOutputMode, error) { return out, nil } -func displayBlock[B firecore.Block](pbBlock *pbbstream.Block, chain *firecore.Chain[B], outputMode PrintOutputMode, printTransactions bool, jencoder *jsonencoder.Encoder) error { +func displayBlock[B firecore.Block](pbBlock *pbbstream.Block, chain *firecore.Chain[B], outputMode PrintOutputMode, printTransactions bool, encoder *jsonencoder.Encoder) error { if pbBlock == nil { return fmt.Errorf("block is nil") } if outputMode == PrintOutputModeText { - if err := printBStreamBlock(pbBlock, printTransactions, os.Stdout); err != nil { + if err := PrintBStreamBlock(pbBlock, printTransactions, os.Stdout); err != nil { return fmt.Errorf("pbBlock text printing: %w", err) } return nil @@ -241,7 +241,7 @@ func displayBlock[B firecore.Block](pbBlock *pbbstream.Block, chain *firecore.Ch } } - err := jencoder.Marshal(marshallableBlock) + err := encoder.Marshal(marshallableBlock) if err != nil { return fmt.Errorf("pbBlock JSON printing: json marshal: %w", err) } @@ -250,13 +250,13 @@ func displayBlock[B firecore.Block](pbBlock *pbbstream.Block, chain *firecore.Ch // since we are running directly the firecore binary we will *NOT* use the BlockFactory if isLegacyBlock { - return jencoder.MarshalLegacy(pbBlock.GetPayloadKind(), pbBlock.GetPayloadBuffer()) + return encoder.MarshalLegacy(pbBlock.GetPayloadKind(), pbBlock.GetPayloadBuffer()) } - return jencoder.Marshal(pbBlock.Payload) + return encoder.Marshal(pbBlock.Payload) } -func printBStreamBlock(b *pbbstream.Block, printTransactions bool, out io.Writer) error { +func PrintBStreamBlock(b *pbbstream.Block, printTransactions bool, out io.Writer) error { _, err := out.Write( []byte( fmt.Sprintf( diff --git a/cmd/tools/tools_print_enum.go b/cmd/tools/print/tools_print_enum.go similarity index 99% rename from cmd/tools/tools_print_enum.go rename to cmd/tools/print/tools_print_enum.go index 5749abf..1c70e04 100644 --- a/cmd/tools/tools_print_enum.go +++ b/cmd/tools/print/tools_print_enum.go @@ -4,7 +4,7 @@ // Build Date: // Built By: -package tools +package print import ( "fmt" diff --git a/cmd/tools/tools.go b/cmd/tools/tools.go index ff13269..9d20cf3 100644 --- a/cmd/tools/tools.go +++ b/cmd/tools/tools.go @@ -16,37 +16,33 @@ package tools import ( "fmt" - "os" - "github.com/mostynb/go-grpc-compression/zstd" "github.com/spf13/cobra" - "github.com/spf13/pflag" - "github.com/streamingfast/cli/sflags" firecore "github.com/streamingfast/firehose-core" - "github.com/streamingfast/firehose-core/firehose/client" + "github.com/streamingfast/firehose-core/cmd/tools/check" + "github.com/streamingfast/firehose-core/cmd/tools/firehose" + print2 "github.com/streamingfast/firehose-core/cmd/tools/print" "github.com/streamingfast/logging" - pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/encoding/gzip" - "google.golang.org/protobuf/types/known/anypb" ) var ToolsCmd = &cobra.Command{Use: "tools", Short: "Developer tools for operators and developers"} +var MaxUint64 = ^uint64(0) func ConfigureToolsCmd[B firecore.Block]( chain *firecore.Chain[B], logger *zap.Logger, tracer logging.Tracer, ) error { - configureToolsCheckCmd(chain, logger) - configureToolsPrintCmd(chain) + + ToolsCmd.AddCommand(check.NewCheckCommand(chain, logger)) + ToolsCmd.AddCommand(print2.NewToolsPrintCmd(chain)) ToolsCmd.AddCommand(newToolsCompareBlocksCmd(chain)) - ToolsCmd.AddCommand(newToolsDownloadFromFirehoseCmd(chain, logger)) - ToolsCmd.AddCommand(newToolsFirehoseClientCmd(chain, logger)) - ToolsCmd.AddCommand(newToolsFirehoseSingleBlockClientCmd(chain, logger, tracer)) - ToolsCmd.AddCommand(newToolsFirehosePrometheusExporterCmd(chain, logger, tracer)) + ToolsCmd.AddCommand(firehose.NewToolsDownloadFromFirehoseCmd(chain, logger)) + ToolsCmd.AddCommand(firehose.NewToolsFirehoseClientCmd(chain, logger)) + ToolsCmd.AddCommand(firehose.NewToolsFirehoseSingleBlockClientCmd(chain, logger, tracer)) + ToolsCmd.AddCommand(firehose.NewToolsFirehosePrometheusExporterCmd(chain, logger, tracer)) ToolsCmd.AddCommand(newToolsUnmergeBlocksCmd(chain, logger)) ToolsCmd.AddCommand(newToolsFixBloatedMergedBlocks(chain, logger)) @@ -71,108 +67,3 @@ func ConfigureToolsCmd[B firecore.Block]( return nil } - -func addFirehoseStreamClientFlagsToSet[B firecore.Block](flags *pflag.FlagSet, chain *firecore.Chain[B]) { - addFirehoseFetchClientFlagsToSet(flags, chain) - - flags.String("cursor", "", "Use this cursor with the request to resume your stream at the following block pointed by the cursor") -} - -func addFirehoseFetchClientFlagsToSet[B firecore.Block](flags *pflag.FlagSet, chain *firecore.Chain[B]) { - flags.StringP("api-token-env-var", "a", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint") - flags.String("compression", "none", "The HTTP compression: use either 'none', 'gzip' or 'zstd'") - flags.BoolP("plaintext", "p", false, "Use plaintext connection to Firehose") - flags.BoolP("insecure", "k", false, "Use SSL connection to Firehose but skip SSL certificate validation") - if chain.Tools.TransformFlags != nil { - chain.Tools.TransformFlags.Register(flags) - } -} - -type firehoseRequestInfo struct { - GRPCCallOpts []grpc.CallOption - Cursor string - FinalBlocksOnly bool - Transforms []*anypb.Any -} - -func getFirehoseFetchClientFromCmd[B firecore.Block](cmd *cobra.Command, logger *zap.Logger, endpoint string, chain *firecore.Chain[B]) ( - firehoseClient pbfirehose.FetchClient, - connClose func() error, - requestInfo *firehoseRequestInfo, - err error, -) { - return getFirehoseClientFromCmd[B, pbfirehose.FetchClient](cmd, logger, "fetch-client", endpoint, chain) -} - -func getFirehoseStreamClientFromCmd[B firecore.Block](cmd *cobra.Command, logger *zap.Logger, endpoint string, chain *firecore.Chain[B]) ( - firehoseClient pbfirehose.StreamClient, - connClose func() error, - requestInfo *firehoseRequestInfo, - err error, -) { - return getFirehoseClientFromCmd[B, pbfirehose.StreamClient](cmd, logger, "stream-client", endpoint, chain) -} - -func getFirehoseClientFromCmd[B firecore.Block, C any](cmd *cobra.Command, logger *zap.Logger, kind string, endpoint string, chain *firecore.Chain[B]) ( - firehoseClient C, - connClose func() error, - requestInfo *firehoseRequestInfo, - err error, -) { - requestInfo = &firehoseRequestInfo{} - - jwt := os.Getenv(sflags.MustGetString(cmd, "api-token-env-var")) - plaintext := sflags.MustGetBool(cmd, "plaintext") - insecure := sflags.MustGetBool(cmd, "insecure") - - if sflags.FlagDefined(cmd, "cursor") { - requestInfo.Cursor = sflags.MustGetString(cmd, "cursor") - } - - if sflags.FlagDefined(cmd, "final-blocks-only") { - requestInfo.FinalBlocksOnly = sflags.MustGetBool(cmd, "final-blocks-only") - } - - var rawClient any - if kind == "stream-client" { - rawClient, connClose, requestInfo.GRPCCallOpts, err = client.NewFirehoseClient(endpoint, jwt, insecure, plaintext) - } else if kind == "fetch-client" { - rawClient, connClose, err = client.NewFirehoseFetchClient(endpoint, jwt, insecure, plaintext) - } else { - panic(fmt.Errorf("unsupported Firehose client kind: %s", kind)) - } - - if err != nil { - return firehoseClient, nil, nil, err - } - - firehoseClient = rawClient.(C) - - compression := sflags.MustGetString(cmd, "compression") - var compressor grpc.CallOption - switch compression { - case "gzip": - compressor = grpc.UseCompressor(gzip.Name) - case "zstd": - compressor = grpc.UseCompressor(zstd.Name) - case "none": - // Valid value but nothing to do - default: - return firehoseClient, nil, nil, fmt.Errorf("invalid value for compression: only 'gzip', 'zstd' or 'none' are accepted") - - } - - if compressor != nil { - requestInfo.GRPCCallOpts = append(requestInfo.GRPCCallOpts, compressor) - } - - if chain.Tools.TransformFlags != nil { - requestInfo.Transforms, err = chain.Tools.TransformFlags.Parse(cmd, logger) - } - - if err != nil { - return firehoseClient, nil, nil, fmt.Errorf("unable to parse transforms flags: %w", err) - } - - return -} diff --git a/cmd/tools/tools_compare_blocks.go b/cmd/tools/tools_compare_blocks.go index 3997b38..d38c009 100644 --- a/cmd/tools/tools_compare_blocks.go +++ b/cmd/tools/tools_compare_blocks.go @@ -31,6 +31,8 @@ import ( "github.com/streamingfast/cli/sflags" "github.com/streamingfast/dstore" firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/cmd/tools/check" + "github.com/streamingfast/firehose-core/types" "go.uber.org/multierr" "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" @@ -82,7 +84,7 @@ func runCompareBlocksE[B firecore.Block](chain *firecore.Chain[B]) firecore.Comm warnAboutExtraBlocks := sync.Once{} ctx := cmd.Context() - blockRange, err := GetBlockRangeFromArg(args[2]) + blockRange, err := types.GetBlockRangeFromArg(args[2]) if err != nil { return fmt.Errorf("parsing range: %w", err) } @@ -103,7 +105,7 @@ func runCompareBlocksE[B firecore.Block](chain *firecore.Chain[B]) firecore.Comm return fmt.Errorf("unable to create store at path %q: %w", args[1], err) } - segments, err := blockRange.Split(segmentSize, EndBoundaryExclusive) + segments, err := blockRange.Split(segmentSize, types.EndBoundaryExclusive) if err != nil { return fmt.Errorf("unable to split blockrage in segments: %w", err) } @@ -111,7 +113,7 @@ func runCompareBlocksE[B firecore.Block](chain *firecore.Chain[B]) firecore.Comm segments: segments, } - err = storeReference.Walk(ctx, WalkBlockPrefix(blockRange, 100), func(filename string) (err error) { + err = storeReference.Walk(ctx, check.WalkBlockPrefix(blockRange, 100), func(filename string) (err error) { fileStartBlock, err := strconv.Atoi(filename) if err != nil { return fmt.Errorf("parsing filename: %w", err) @@ -122,7 +124,7 @@ func runCompareBlocksE[B firecore.Block](chain *firecore.Chain[B]) firecore.Comm return dstore.StopIteration } - if blockRange.Contains(uint64(fileStartBlock), EndBoundaryExclusive) { + if blockRange.Contains(uint64(fileStartBlock), types.EndBoundaryExclusive) { var wg sync.WaitGroup var bundleErrLock sync.Mutex var bundleReadErr error @@ -247,7 +249,7 @@ func readBundle[B firecore.Block]( } type state struct { - segments []BlockRange + segments []types.BlockRange currentSegmentIdx int blocksCountedInThisSegment int differencesFound int @@ -256,10 +258,10 @@ type state struct { } func (s *state) process(blockNum uint64, isDifferent bool, isMissing bool) { - if !s.segments[s.currentSegmentIdx].Contains(blockNum, EndBoundaryExclusive) { // moving forward + if !s.segments[s.currentSegmentIdx].Contains(blockNum, types.EndBoundaryExclusive) { // moving forward s.print() for i := s.currentSegmentIdx; i < len(s.segments); i++ { - if s.segments[i].Contains(blockNum, EndBoundaryExclusive) { + if s.segments[i].Contains(blockNum, types.EndBoundaryExclusive) { s.currentSegmentIdx = i s.totalBlocksCounted += s.blocksCountedInThisSegment s.differencesFound = 0 diff --git a/cmd/tools/tools_fix_bloated_merged_blocks.go b/cmd/tools/tools_fix_bloated_merged_blocks.go index 063a78c..6be4194 100644 --- a/cmd/tools/tools_fix_bloated_merged_blocks.go +++ b/cmd/tools/tools_fix_bloated_merged_blocks.go @@ -9,6 +9,8 @@ import ( pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/dstore" firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/cmd/tools/check" + "github.com/streamingfast/firehose-core/types" "go.uber.org/zap" ) @@ -35,12 +37,12 @@ func runFixBloatedMergedBlocksE(zlog *zap.Logger) firecore.CommandExecutor { return fmt.Errorf("unable to create destination store: %w", err) } - blockRange, err := GetBlockRangeFromArg(args[2]) + blockRange, err := types.GetBlockRangeFromArg(args[2]) if err != nil { return fmt.Errorf("parsing block range: %w", err) } - err = srcStore.Walk(ctx, WalkBlockPrefix(blockRange, 100), func(filename string) error { + err = srcStore.Walk(ctx, check.WalkBlockPrefix(blockRange, 100), func(filename string) error { zlog.Debug("checking merged block file", zap.String("filename", filename)) startBlock := mustParseUint64(filename) @@ -66,10 +68,10 @@ func runFixBloatedMergedBlocksE(zlog *zap.Logger) firecore.CommandExecutor { return fmt.Errorf("creating block reader: %w", err) } - mergeWriter := &mergedBlocksWriter{ - store: destStore, - tweakBlock: func(b *pbbstream.Block) (*pbbstream.Block, error) { return b, nil }, - logger: zlog, + mergeWriter := &firecore.MergedBlocksWriter{ + Store: destStore, + TweakBlock: func(b *pbbstream.Block) (*pbbstream.Block, error) { return b, nil }, + Logger: zlog, } seen := make(map[string]bool) @@ -84,11 +86,11 @@ func runFixBloatedMergedBlocksE(zlog *zap.Logger) firecore.CommandExecutor { break } - if block.Number < uint64(startBlock) { + if block.Number < startBlock { continue } - if block.Number > uint64(blockRange.GetStopBlockOr(MaxUint64)) { + if block.Number > blockRange.GetStopBlockOr(MaxUint64) { break } diff --git a/cmd/tools/tools_unmerge_blocks.go b/cmd/tools/tools_unmerge_blocks.go index 8d90e5b..f448aa5 100644 --- a/cmd/tools/tools_unmerge_blocks.go +++ b/cmd/tools/tools_unmerge_blocks.go @@ -5,6 +5,9 @@ import ( "io" "strconv" + "github.com/streamingfast/firehose-core/cmd/tools/check" + "github.com/streamingfast/firehose-core/types" + "github.com/spf13/cobra" "github.com/streamingfast/bstream" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" @@ -37,12 +40,12 @@ func runUnmergeBlocksE(zlog *zap.Logger) firecore.CommandExecutor { return fmt.Errorf("unable to create destination store: %w", err) } - blockRange, err := GetBlockRangeFromArg(args[2]) + blockRange, err := types.GetBlockRangeFromArg(args[2]) if err != nil { return fmt.Errorf("parsing block range: %w", err) } - err = srcStore.Walk(ctx, WalkBlockPrefix(blockRange, 100), func(filename string) error { + err = srcStore.Walk(ctx, check.WalkBlockPrefix(blockRange, 100), func(filename string) error { zlog.Debug("checking merged block file", zap.String("filename", filename)) startBlock := mustParseUint64(filename) diff --git a/cmd/tools/tools_upgrade_merged_blocks.go b/cmd/tools/tools_upgrade_merged_blocks.go index f0025ae..e277329 100644 --- a/cmd/tools/tools_upgrade_merged_blocks.go +++ b/cmd/tools/tools_upgrade_merged_blocks.go @@ -8,7 +8,6 @@ import ( "strconv" "github.com/spf13/cobra" - "github.com/streamingfast/bstream" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/bstream/stream" "github.com/streamingfast/dstore" @@ -49,12 +48,12 @@ func getMergedBlockUpgrader(tweakFunc func(block *pbbstream.Block) (*pbbstream.B } rootLog.Info("starting block upgrader process", zap.Uint64("start", start), zap.Uint64("stop", stop), zap.String("source", source), zap.String("dest", dest)) - writer := &mergedBlocksWriter{ - cmd: cmd, - store: destStore, - lowBlockNum: lowBoundary(start), - stopBlockNum: stop, - tweakBlock: tweakFunc, + writer := &firecore.MergedBlocksWriter{ + Cmd: cmd, + Store: destStore, + LowBlockNum: firecore.LowBoundary(start), + StopBlockNum: stop, + TweakBlock: tweakFunc, } stream := stream.New(nil, sourceStore, nil, int64(start), writer, stream.WithFinalBlocksOnly()) @@ -66,104 +65,3 @@ func getMergedBlockUpgrader(tweakFunc func(block *pbbstream.Block) (*pbbstream.B return err } } - -type mergedBlocksWriter struct { - store dstore.Store - lowBlockNum uint64 - stopBlockNum uint64 - - blocks []*pbbstream.Block - logger *zap.Logger - cmd *cobra.Command - - tweakBlock func(*pbbstream.Block) (*pbbstream.Block, error) -} - -func (w *mergedBlocksWriter) ProcessBlock(blk *pbbstream.Block, obj interface{}) error { - if w.tweakBlock != nil { - b, err := w.tweakBlock(blk) - if err != nil { - return fmt.Errorf("tweaking block: %w", err) - } - blk = b - } - - if w.lowBlockNum == 0 && blk.Number > 99 { // initial block - if blk.Number%100 != 0 && blk.Number != bstream.GetProtocolFirstStreamableBlock { - return fmt.Errorf("received unexpected block %s (not a boundary, not the first streamable block %d)", blk, bstream.GetProtocolFirstStreamableBlock) - } - w.lowBlockNum = lowBoundary(blk.Number) - w.logger.Debug("setting initial boundary to %d upon seeing block %s", zap.Uint64("low_boundary", w.lowBlockNum), zap.Stringer("blk", blk)) - } - - if blk.Number > w.lowBlockNum+99 { - w.logger.Debug("bundling because we saw block %s from next bundle (%d was not seen, it must not exist on this chain)", zap.Stringer("blk", blk), zap.Uint64("last_bundle_block", w.lowBlockNum+99)) - if err := w.writeBundle(); err != nil { - return err - } - } - - if w.stopBlockNum > 0 && blk.Number >= w.stopBlockNum { - return io.EOF - } - - w.blocks = append(w.blocks, blk) - - if blk.Number == w.lowBlockNum+99 { - w.logger.Debug("bundling on last bundle block", zap.Uint64("last_bundle_block", w.lowBlockNum+99)) - if err := w.writeBundle(); err != nil { - return err - } - return nil - } - - return nil -} - -func filename(num uint64) string { - return fmt.Sprintf("%010d", num) -} - -func (w *mergedBlocksWriter) writeBundle() error { - file := filename(w.lowBlockNum) - w.logger.Info("writing merged file to store (suffix: .dbin.zst)", zap.String("filename", file), zap.Uint64("lowBlockNum", w.lowBlockNum)) - - if len(w.blocks) == 0 { - return fmt.Errorf("no blocks to write to bundle") - } - - pr, pw := io.Pipe() - - go func() { - var err error - defer func() { - pw.CloseWithError(err) - }() - - blockWriter, err := bstream.NewDBinBlockWriter(pw) - if err != nil { - return - } - - for _, blk := range w.blocks { - err = blockWriter.Write(blk) - if err != nil { - return - } - } - }() - - err := w.store.WriteObject(context.Background(), file, pr) - if err != nil { - w.logger.Error("writing to store", zap.Error(err)) - } - - w.lowBlockNum += 100 - w.blocks = nil - - return err -} - -func lowBoundary(i uint64) uint64 { - return i - (i % 100) -} diff --git a/jsonencoder/encoder.go b/jsonencoder/encoder.go index 15094fb..13515b7 100644 --- a/jsonencoder/encoder.go +++ b/jsonencoder/encoder.go @@ -4,12 +4,10 @@ import ( "fmt" "os" - pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" - - "github.com/streamingfast/firehose-core/protoregistry" - "github.com/go-json-experiment/json" "github.com/go-json-experiment/json/jsontext" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/firehose-core/protoregistry" ) type Encoder struct { diff --git a/mergedblockswriter.go b/mergedblockswriter.go new file mode 100644 index 0000000..d141846 --- /dev/null +++ b/mergedblockswriter.go @@ -0,0 +1,113 @@ +package firecore + +import ( + "context" + "fmt" + "io" + + "github.com/spf13/cobra" + "github.com/streamingfast/bstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dstore" + "go.uber.org/zap" +) + +type MergedBlocksWriter struct { + Store dstore.Store + LowBlockNum uint64 + StopBlockNum uint64 + + blocks []*pbbstream.Block + Logger *zap.Logger + Cmd *cobra.Command + + TweakBlock func(*pbbstream.Block) (*pbbstream.Block, error) +} + +func (w *MergedBlocksWriter) ProcessBlock(blk *pbbstream.Block, obj interface{}) error { + if w.TweakBlock != nil { + b, err := w.TweakBlock(blk) + if err != nil { + return fmt.Errorf("tweaking block: %w", err) + } + blk = b + } + + if w.LowBlockNum == 0 && blk.Number > 99 { // initial block + if blk.Number%100 != 0 && blk.Number != bstream.GetProtocolFirstStreamableBlock { + return fmt.Errorf("received unexpected block %s (not a boundary, not the first streamable block %d)", blk, bstream.GetProtocolFirstStreamableBlock) + } + w.LowBlockNum = LowBoundary(blk.Number) + w.Logger.Debug("setting initial boundary to %d upon seeing block %s", zap.Uint64("low_boundary", w.LowBlockNum), zap.Stringer("blk", blk)) + } + + if blk.Number > w.LowBlockNum+99 { + w.Logger.Debug("bundling because we saw block %s from next bundle (%d was not seen, it must not exist on this chain)", zap.Stringer("blk", blk), zap.Uint64("last_bundle_block", w.LowBlockNum+99)) + if err := w.writeBundle(); err != nil { + return err + } + } + + if w.StopBlockNum > 0 && blk.Number >= w.StopBlockNum { + return io.EOF + } + + w.blocks = append(w.blocks, blk) + + if blk.Number == w.LowBlockNum+99 { + w.Logger.Debug("bundling on last bundle block", zap.Uint64("last_bundle_block", w.LowBlockNum+99)) + if err := w.writeBundle(); err != nil { + return err + } + return nil + } + + return nil +} + +func (w *MergedBlocksWriter) writeBundle() error { + file := filename(w.LowBlockNum) + w.Logger.Info("writing merged file to store (suffix: .dbin.zst)", zap.String("filename", file), zap.Uint64("lowBlockNum", w.LowBlockNum)) + + if len(w.blocks) == 0 { + return fmt.Errorf("no blocks to write to bundle") + } + + pr, pw := io.Pipe() + + go func() { + var err error + defer func() { + pw.CloseWithError(err) + }() + + blockWriter, err := bstream.NewDBinBlockWriter(pw) + if err != nil { + return + } + + for _, blk := range w.blocks { + err = blockWriter.Write(blk) + if err != nil { + return + } + } + }() + + err := w.Store.WriteObject(context.Background(), file, pr) + if err != nil { + w.Logger.Error("writing to store", zap.Error(err)) + } + + w.LowBlockNum += 100 + w.blocks = nil + + return err +} +func filename(num uint64) string { + return fmt.Sprintf("%010d", num) +} + +func LowBoundary(i uint64) uint64 { + return i - (i % 100) +} diff --git a/cmd/tools/block_range.go b/types/block_range.go similarity index 99% rename from cmd/tools/block_range.go rename to types/block_range.go index d386e98..a51d3da 100644 --- a/cmd/tools/block_range.go +++ b/types/block_range.go @@ -1,4 +1,4 @@ -package tools +package types import ( "fmt" diff --git a/cmd/tools/block_range_enum.go b/types/block_range_enum.go similarity index 99% rename from cmd/tools/block_range_enum.go rename to types/block_range_enum.go index 7a1ec49..c7d4c86 100644 --- a/cmd/tools/block_range_enum.go +++ b/types/block_range_enum.go @@ -4,7 +4,7 @@ // Build Date: // Built By: -package tools +package types import ( "fmt" diff --git a/cmd/tools/flags.go b/types/flags.go similarity index 82% rename from cmd/tools/flags.go rename to types/flags.go index 7a35858..06b51b6 100644 --- a/cmd/tools/flags.go +++ b/types/flags.go @@ -1,4 +1,4 @@ -package tools +package types import ( "fmt" @@ -10,7 +10,7 @@ import ( ) func GetBlockRangeFromArg(in string) (out BlockRange, err error) { - return parseBlockRange(in, bstream.GetProtocolFirstStreamableBlock) + return ParseBlockRange(in, bstream.GetProtocolFirstStreamableBlock) } func GetBlockRangeFromFlag(cmd *cobra.Command, flagName string) (out BlockRange, err error) { @@ -25,7 +25,7 @@ func GetBlockRangeFromFlag(cmd *cobra.Command, flagName string) (out BlockRange, return out, fmt.Errorf("accepting a single range for now, got %d", len(rawRanges)) } - out, err = parseBlockRange(rawRanges[0], bstream.GetProtocolFirstStreamableBlock) + out, err = ParseBlockRange(rawRanges[0], bstream.GetProtocolFirstStreamableBlock) if err != nil { return out, fmt.Errorf("decode range: %w", err) } diff --git a/cmd/tools/types.go b/types/types.go similarity index 96% rename from cmd/tools/types.go rename to types/types.go index c1c5a63..e872b13 100644 --- a/cmd/tools/types.go +++ b/types/types.go @@ -1,4 +1,4 @@ -package tools +package types import ( "fmt" diff --git a/cmd/tools/types_test.go b/types/types_test.go similarity index 97% rename from cmd/tools/types_test.go rename to types/types_test.go index 1844009..01afea7 100644 --- a/cmd/tools/types_test.go +++ b/types/types_test.go @@ -1,4 +1,4 @@ -package tools +package types import ( "testing" diff --git a/cmd/tools/utils.go b/types/utils.go similarity index 97% rename from cmd/tools/utils.go rename to types/utils.go index 64a3313..c241b08 100644 --- a/cmd/tools/utils.go +++ b/types/utils.go @@ -1,4 +1,4 @@ -package tools +package types import ( "fmt" @@ -26,7 +26,7 @@ func PrettyBlockNum(b uint64) string { return "#" + strings.ReplaceAll(humanize.Comma(int64(b)), ",", " ") } -func parseBlockRange(input string, firstStreamableBlock uint64) (out BlockRange, err error) { +func ParseBlockRange(input string, firstStreamableBlock uint64) (out BlockRange, err error) { if input == "" || input == "-1" { return NewOpenRange(-1), nil } diff --git a/cmd/tools/utils_test.go b/types/utils_test.go similarity index 97% rename from cmd/tools/utils_test.go rename to types/utils_test.go index 6373d0a..25f4c8b 100644 --- a/cmd/tools/utils_test.go +++ b/types/utils_test.go @@ -1,4 +1,4 @@ -package tools +package types import ( "testing" @@ -57,7 +57,7 @@ func Test_readBlockRange(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseBlockRange(tt.args.blockRangeArg, tt.args.chainFirstStreamableBlock) + got, err := ParseBlockRange(tt.args.blockRangeArg, tt.args.chainFirstStreamableBlock) if tt.assertion == nil { tt.assertion = require.NoError