Skip to content

Commit

Permalink
refactor of tools and apps package
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Dec 5, 2023
1 parent b639760 commit f58cd08
Show file tree
Hide file tree
Showing 43 changed files with 276 additions and 229 deletions.
3 changes: 2 additions & 1 deletion chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
"github.com/streamingfast/firehose-core/node-manager/operator"
"github.com/streamingfast/firehose-core/substreams"
"github.com/streamingfast/logging"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -149,7 +150,7 @@ type Chain[B Block] struct {
//
BlockEncoder BlockEncoder

RegisterSubstreamsExtensions func(chain *Chain[B]) ([]SubstreamsExtension, error)
RegisterSubstreamsExtensions func(chain *Chain[B]) ([]substreams.Extension, error)

// CoreBinaryEnabled is a flag that when set to true indicates that `firecore` binary is being run directly? (not through firexxx)
CoreBinaryEnabled bool
Expand Down
14 changes: 8 additions & 6 deletions firehose.go → cmd/apps/firehose.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package firecore
package apps

import (
"fmt"
Expand All @@ -12,24 +12,26 @@ import (
discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/dmetrics"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/firehose/app/firehose"
"github.com/streamingfast/firehose-core/firehose/server"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

var metricset = dmetrics.NewSet()
var headBlockNumMetric = metricset.NewHeadBlockNumber("firehose")
var headTimeDriftmetric = metricset.NewHeadTimeDrift("firehose")

func registerFirehoseApp[B Block](chain *Chain[B]) {
appLogger, _ := logging.PackageLogger("firehose", chain.LoggerPackageID("firehose"))
func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) {
appLogger, _ := logging.PackageLogger("firehose", "firehose")

launcher.RegisterApp(rootLog, &launcher.AppDef{
ID: "firehose",
Title: "Block Firehose",
Description: "Provides on-demand filtered blocks, depends on common-merged-blocks-store-url and common-live-blocks-addr",
RegisterFlags: func(cmd *cobra.Command) error {
cmd.Flags().String("firehose-grpc-listen-addr", FirehoseGRPCServingAddr, "Address on which the firehose will listen")
cmd.Flags().String("firehose-grpc-listen-addr", firecore.FirehoseGRPCServingAddr, "Address on which the firehose will listen")
cmd.Flags().String("firehose-discovery-service-url", "", "Url to configure the gRPC discovery service") //traffic-director://xds?vpc_network=vpc-global&use_xds_reds=true
cmd.Flags().Int("firehose-rate-limit-bucket-size", -1, "Rate limit bucket size (default: no rate limit)")
cmd.Flags().Duration("firehose-rate-limit-bucket-fill-rate", 10*time.Second, "Rate limit bucket refill rate (default: 10s)")
Expand All @@ -43,7 +45,7 @@ func registerFirehoseApp[B Block](chain *Chain[B]) {
return nil, fmt.Errorf("unable to initialize authenticator: %w", err)
}

mergedBlocksStoreURL, oneBlocksStoreURL, forkedBlocksStoreURL, err := GetCommonStoresURLs(runtime.AbsDataDir)
mergedBlocksStoreURL, oneBlocksStoreURL, forkedBlocksStoreURL, err := firecore.GetCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return nil, err
}
Expand All @@ -61,7 +63,7 @@ func registerFirehoseApp[B Block](chain *Chain[B]) {
}
}

indexStore, possibleIndexSizes, err := GetIndexStore(runtime.AbsDataDir)
indexStore, possibleIndexSizes, err := firecore.GetIndexStore(runtime.AbsDataDir)
if err != nil {
return nil, fmt.Errorf("unable to initialize indexes: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions index_builder.go → cmd/apps/index_builder.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package firecore
package apps

import (
"context"
Expand All @@ -10,16 +10,18 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
bstransform "github.com/streamingfast/bstream/transform"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
index_builder "github.com/streamingfast/firehose-core/index-builder/app/index-builder"
"go.uber.org/zap"
)

func registerIndexBuilderApp[B Block](chain *Chain[B]) {
func RegisterIndexBuilderApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) {
launcher.RegisterApp(rootLog, &launcher.AppDef{
ID: "index-builder",
Title: "Index Builder",
Description: "App the builds indexes out of Firehose blocks",
RegisterFlags: func(cmd *cobra.Command) error {
cmd.Flags().String("index-builder-grpc-listen-addr", IndexBuilderServiceAddr, "Address to listen for grpc-based healthz check")
cmd.Flags().String("index-builder-grpc-listen-addr", firecore.IndexBuilderServiceAddr, "Address to listen for grpc-based healthz check")
cmd.Flags().Uint64("index-builder-index-size", 10000, "Size of index bundles that will be created")
cmd.Flags().Uint64("index-builder-start-block", 0, "Block number to start indexing")
cmd.Flags().Uint64("index-builder-stop-block", 0, "Block number to stop indexing")
Expand All @@ -29,12 +31,12 @@ func registerIndexBuilderApp[B Block](chain *Chain[B]) {
return nil
},
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
mergedBlocksStoreURL, _, _, err := GetCommonStoresURLs(runtime.AbsDataDir)
mergedBlocksStoreURL, _, _, err := firecore.GetCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return nil, err
}

indexStore, lookupIdxSizes, err := GetIndexStore(runtime.AbsDataDir)
indexStore, lookupIdxSizes, err := firecore.GetIndexStore(runtime.AbsDataDir)
if err != nil {
return nil, err
}
Expand Down
11 changes: 7 additions & 4 deletions merger.go → cmd/apps/merger.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
package firecore
package apps

import (
"time"

firecore "github.com/streamingfast/firehose-core"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/firehose-core/merger/app/merger"
"go.uber.org/zap"
)

func registerMergerApp() {
func RegisterMergerApp(rootLog *zap.Logger) {
launcher.RegisterApp(rootLog, &launcher.AppDef{
ID: "merger",
Title: "Merger",
Description: "Produces merged block files from single-block files",
RegisterFlags: func(cmd *cobra.Command) error {
cmd.Flags().String("merger-grpc-listen-addr", MergerServingAddr, "Address to listen for incoming gRPC requests")
cmd.Flags().String("merger-grpc-listen-addr", firecore.MergerServingAddr, "Address to listen for incoming gRPC requests")
cmd.Flags().Uint64("merger-prune-forked-blocks-after", 50000, "Number of blocks that must pass before we delete old forks (one-block-files lingering)")
cmd.Flags().Uint64("merger-stop-block", 0, "If non-zero, merger will trigger shutdown when blocks have been merged up to this block")
cmd.Flags().Duration("merger-time-between-store-lookups", 1*time.Second, "Delay between source store polling (should be higher for remote storage)")
cmd.Flags().Duration("merger-time-between-store-pruning", time.Minute, "Delay between source store pruning loops")
return nil
},
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
mergedBlocksStoreURL, oneBlocksStoreURL, forkedBlocksStoreURL, err := GetCommonStoresURLs(runtime.AbsDataDir)
mergedBlocksStoreURL, oneBlocksStoreURL, forkedBlocksStoreURL, err := firecore.GetCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return nil, err
}
Expand Down
40 changes: 24 additions & 16 deletions reader_node.go → cmd/apps/reader_node.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package firecore
package apps

import (
"context"
Expand All @@ -14,6 +14,7 @@ import (
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/cli"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
nodeManager "github.com/streamingfast/firehose-core/node-manager"
nodeManagerApp "github.com/streamingfast/firehose-core/node-manager/app/node_manager"
"github.com/streamingfast/firehose-core/node-manager/metrics"
Expand All @@ -27,8 +28,8 @@ import (
"google.golang.org/grpc"
)

func registerReaderNodeApp[B Block](chain *Chain[B]) {
appLogger, appTracer := logging.PackageLogger("reader", chain.LoggerPackageID("reader"))
func RegisterReaderNodeApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) {
appLogger, appTracer := logging.PackageLogger("reader", "reader")

launcher.RegisterApp(rootLog, &launcher.AppDef{
ID: "reader-node",
Expand All @@ -41,7 +42,7 @@ func registerReaderNodeApp[B Block](chain *Chain[B]) {
`))
cmd.Flags().String("reader-node-data-dir", "{data-dir}/reader/data", "Directory for node data")
cmd.Flags().Bool("reader-node-debug-firehose-logs", false, "[DEV] Prints firehose instrumentation logs to standard output, should be use for debugging purposes only")
cmd.Flags().String("reader-node-manager-api-addr", ReaderNodeManagerAPIAddr, "Acme node manager API address")
cmd.Flags().String("reader-node-manager-api-addr", firecore.ReaderNodeManagerAPIAddr, "Acme node manager API address")
cmd.Flags().Duration("reader-node-readiness-max-latency", 30*time.Second, "Determine the maximum head block latency at which the instance will be determined healthy. Some chains have more regular block production than others.")
cmd.Flags().String("reader-node-arguments", "", string(cli.Description(`
Defines the node arguments that will be passed to the node on execution. Supports templating, where we will replace certain sub-string with the appropriate value
Expand All @@ -55,7 +56,7 @@ func registerReaderNodeApp[B Block](chain *Chain[B]) {
Example: 'run blockchain -start {start-block-num} -end {stop-block-num}' may yield 'run blockchain -start 200 -end 500'
`)))
cmd.Flags().StringSlice("reader-node-backups", []string{}, "Repeatable, space-separated key=values definitions for backups. Example: 'type=gke-pvc-snapshot prefix= tag=v1 freq-blocks=1000 freq-time= project=myproj'")
cmd.Flags().String("reader-node-grpc-listen-addr", ReaderNodeGRPCAddr, "The gRPC listening address to use for serving real-time blocks")
cmd.Flags().String("reader-node-grpc-listen-addr", firecore.ReaderNodeGRPCAddr, "The gRPC listening address to use for serving real-time blocks")
cmd.Flags().Bool("reader-node-discard-after-stop-num", false, "Ignore remaining blocks being processed after stop num (only useful if we discard the reader data after reprocessing a chunk of blocks)")
cmd.Flags().String("reader-node-working-dir", "{data-dir}/reader/work", "Path where reader will stores its files")
cmd.Flags().Uint("reader-node-start-block-num", 0, "Blocks that were produced with smaller block number then the given block num are skipped")
Expand All @@ -75,7 +76,7 @@ func registerReaderNodeApp[B Block](chain *Chain[B]) {
sfDataDir := runtime.AbsDataDir

nodePath := viper.GetString("reader-node-path")
nodeDataDir := MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-data-dir"))
nodeDataDir := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-data-dir"))

readinessMaxLatency := viper.GetDuration("reader-node-readiness-max-latency")
debugFirehose := viper.GetBool("reader-node-debug-firehose-logs")
Expand All @@ -93,10 +94,19 @@ func registerReaderNodeApp[B Block](chain *Chain[B]) {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Minute)
defer cancel()

resolveStartBlockNum, err := UnsafeResolveReaderNodeStartBlock(ctx, startCmd, runtime, rootLog)
if err != nil {
return nil, fmt.Errorf("resolve start block: %w", err)
userDefined := viper.IsSet("reader-node-start-block-num")
startBlockNum := viper.GetUint64("reader-node-start-block-num")
firstStreamableBlock := viper.GetUint64("common-first-streamable-block")

resolveStartBlockNum := startBlockNum
if !userDefined {
resolveStartBlockNum, err = firecore.UnsafeResolveReaderNodeStartBlock(ctx, startBlockNum, firstStreamableBlock, runtime, rootLog)
if err != nil {
return nil, fmt.Errorf("resolve start block: %w", err)
}

}

stopBlockNum := viper.GetUint64("reader-node-stop-block-num")

hostname, _ := os.Hostname()
Expand All @@ -123,7 +133,7 @@ func registerReaderNodeApp[B Block](chain *Chain[B]) {

var bootstrapper operator.Bootstrapper
if chain.ReaderNodeBootstrapperFactory != nil {
bootstrapper, err = chain.ReaderNodeBootstrapperFactory(startCmd.Context(), appLogger, startCmd, nodeArguments, nodeArgumentResolver)
bootstrapper, err = chain.ReaderNodeBootstrapperFactory(StartCmd.Context(), appLogger, StartCmd, nodeArguments, nodeArgumentResolver)
if err != nil {
return nil, fmt.Errorf("new bootstrapper: %w", err)
}
Expand Down Expand Up @@ -157,8 +167,8 @@ func registerReaderNodeApp[B Block](chain *Chain[B]) {
}

blockStreamServer := blockstream.NewUnmanagedServer(blockstream.ServerOptionWithLogger(appLogger))
oneBlocksStoreURL := MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url"))
workingDir := MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir"))
oneBlocksStoreURL := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url"))
workingDir := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir"))
gprcListenAddr := viper.GetString("reader-node-grpc-listen-addr")
oneBlockFileSuffix := viper.GetString("reader-node-one-block-suffix")
blocksChanCapacity := viper.GetInt("reader-node-blocks-chan-capacity")
Expand Down Expand Up @@ -210,7 +220,7 @@ var variablesRegex = regexp.MustCompile(`\{(data-dir|node-data-dir|hostname|star
// buildNodeArguments will resolve and split the given string into arguments, replacing the variables with the appropriate values.
//
// We are using a function for testing purposes, so that we can test arguments resolving and splitting correctly.
func buildNodeArguments(in string, resolver ReaderNodeArgumentResolver) ([]string, error) {
func buildNodeArguments(in string, resolver firecore.ReaderNodeArgumentResolver) ([]string, error) {
// Split arguments according to standard shell rules
nodeArguments, err := shellquote.Split(resolver(in))
if err != nil {
Expand All @@ -220,9 +230,7 @@ func buildNodeArguments(in string, resolver ReaderNodeArgumentResolver) ([]strin
return nodeArguments, nil
}

type ReaderNodeArgumentResolver = func(in string) string

func createNodeArgumentsResolver(dataDir, nodeDataDir, hostname string, startBlockNum, stopBlockNum uint64) ReaderNodeArgumentResolver {
func createNodeArgumentsResolver(dataDir, nodeDataDir, hostname string, startBlockNum, stopBlockNum uint64) firecore.ReaderNodeArgumentResolver {
return func(in string) string {
return variablesRegex.ReplaceAllStringFunc(in, func(match string) string {
switch match {
Expand Down
10 changes: 6 additions & 4 deletions reader_node_stdin.go → cmd/apps/reader_node_stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package firecore
package apps

import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
nodeManager "github.com/streamingfast/firehose-core/node-manager"
nodeReaderStdinApp "github.com/streamingfast/firehose-core/node-manager/app/node_reader_stdin"
"github.com/streamingfast/firehose-core/node-manager/metrics"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func registerReaderNodeStdinApp[B Block](chain *Chain[B]) {
func RegisterReaderNodeStdinApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) {
appLogger, appTracer := logging.PackageLogger("reader-node-stdin", chain.LoggerPackageID("reader-node-stdin"))

launcher.RegisterApp(rootLog, &launcher.AppDef{
Expand All @@ -35,7 +37,7 @@ func registerReaderNodeStdinApp[B Block](chain *Chain[B]) {
RegisterFlags: func(cmd *cobra.Command) error { return nil },
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
sfDataDir := runtime.AbsDataDir
archiveStoreURL := MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url"))
archiveStoreURL := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url"))
consoleReaderFactory := func(lines chan string) (mindreader.ConsolerReader, error) {
return chain.ConsoleReaderFactory(lines, chain.BlockEncoder, appLogger, appTracer)
}
Expand All @@ -52,7 +54,7 @@ func registerReaderNodeStdinApp[B Block](chain *Chain[B]) {
MindReadBlocksChanCapacity: viper.GetInt("reader-node-blocks-chan-capacity"),
StartBlockNum: viper.GetUint64("reader-node-start-block-num"),
StopBlockNum: viper.GetUint64("reader-node-stop-block-num"),
WorkingDir: MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")),
WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")),
OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"),
}, &nodeReaderStdinApp.Modules{
ConsoleReaderFactory: consoleReaderFactory,
Expand Down
2 changes: 1 addition & 1 deletion reader_node_test.go → cmd/apps/reader_node_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package firecore
package apps

import (
"testing"
Expand Down
12 changes: 7 additions & 5 deletions relayer.go → cmd/apps/relayer.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package firecore
package apps

import (
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/relayer/app/relayer"
"go.uber.org/zap"
)

func registerRelayerApp() {
func RegisterRelayerApp(rootLog *zap.Logger) {
launcher.RegisterApp(rootLog, &launcher.AppDef{
ID: "relayer",
Title: "Relayer",
Description: "Serves blocks as a stream, with a buffer",
RegisterFlags: func(cmd *cobra.Command) error {
cmd.Flags().String("relayer-grpc-listen-addr", RelayerServingAddr, "Address to listen for incoming gRPC requests")
cmd.Flags().StringSlice("relayer-source", []string{ReaderNodeGRPCAddr}, "List of live sources (reader(s)) to connect to for live block feeds (repeat flag as needed)")
cmd.Flags().String("relayer-grpc-listen-addr", firecore.RelayerServingAddr, "Address to listen for incoming gRPC requests")
cmd.Flags().StringSlice("relayer-source", []string{firecore.ReaderNodeGRPCAddr}, "List of live sources (reader(s)) to connect to for live block feeds (repeat flag as needed)")
cmd.Flags().Duration("relayer-max-source-latency", 999999*time.Hour, "Max latency tolerated to connect to a source. A performance optimization for when you have redundant sources and some may not have caught up")
return nil
},
Expand All @@ -25,7 +27,7 @@ func registerRelayerApp() {

return relayer.New(&relayer.Config{
SourcesAddr: viper.GetStringSlice("relayer-source"),
OneBlocksURL: MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")),
OneBlocksURL: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")),
GRPCListenAddr: viper.GetString("relayer-grpc-listen-addr"),
MaxSourceLatency: viper.GetDuration("relayer-max-source-latency"),
}), nil
Expand Down
Loading

0 comments on commit f58cd08

Please sign in to comment.