Skip to content

Commit

Permalink
The firehose, substreams-tier1 and substream-tier2 health endpo…
Browse files Browse the repository at this point in the history
…int now respects the `common-system-shutdown-signal-delay` configuration value

This means that the health endpoint will return `false` now if `SIGINT` has been received but we are still in the shutdown unready period defined by the config value. If you use some sort of load balancer, you should make sure they are configured to use the health endpoint and you should `common-system-shutdown-signal-delay` to something like `15s`.

Fixes #9
  • Loading branch information
maoueh committed Jan 19, 2024
1 parent 405fc14 commit a4e7f79
Show file tree
Hide file tree
Showing 32 changed files with 1,070 additions and 41 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
.envrc
.env
.DS_Store
firehose-data*
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased

* The `firehose`, `substreams-tier1` and `substream-tier2` health endpoint now respects the `common-system-shutdown-signal-delay` configuration value meaning that the health endpoint will return `false` now if `SIGINT` has been received but we are still in the shutdown unready period defined by the config value. If you use some sort of load balancer, you should make sure they are configured to use the health endpoint and you should `common-system-shutdown-signal-delay` to something like `15s`.

* The `firecore.ConsoleReader` gained the ability to print stats as it ingest blocks.

* The `firecore.ConsoleReader` has been made stricter by ensuring Firehose chain exchange protocol is respected.
Expand Down
3 changes: 2 additions & 1 deletion cmd/apps/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"github.com/streamingfast/bstream/transform"
"github.com/streamingfast/dauth"
discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/dmetrics"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/firehose/app/firehose"
"github.com/streamingfast/firehose-core/firehose/server"
"github.com/streamingfast/firehose-core/launcher"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -100,6 +100,7 @@ func RegisterFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *za
HeadTimeDriftMetric: headTimeDriftmetric,
HeadBlockNumberMetric: headBlockNumMetric,
TransformRegistry: registry,
CheckPendingShutdown: runtime.IsPendingShutdown,
}), nil
},
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/index_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
bstransform "github.com/streamingfast/bstream/transform"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
index_builder "github.com/streamingfast/firehose-core/index-builder/app/index-builder"
"github.com/streamingfast/firehose-core/launcher"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/firehose-core/launcher"
"github.com/streamingfast/firehose-core/merger/app/merger"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/reader_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/streamingfast/bstream/blockstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/cli"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
nodeManager "github.com/streamingfast/firehose-core/node-manager"
nodeManagerApp "github.com/streamingfast/firehose-core/node-manager/app/node_manager"
"github.com/streamingfast/firehose-core/node-manager/metrics"
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/reader_node_stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package apps
import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
nodeManager "github.com/streamingfast/firehose-core/node-manager"
nodeReaderStdinApp "github.com/streamingfast/firehose-core/node-manager/app/node_reader_stdin"
"github.com/streamingfast/firehose-core/node-manager/metrics"
Expand Down
2 changes: 1 addition & 1 deletion cmd/apps/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
"github.com/streamingfast/firehose-core/relayer/app/relayer"
"go.uber.org/zap"
)
Expand Down
16 changes: 8 additions & 8 deletions cmd/apps/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,16 @@ import (
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/dmetering"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
tracing "github.com/streamingfast/sf-tracing"
"go.uber.org/zap"
)

var StartCmd = &cobra.Command{Use: "start", Args: cobra.ArbitraryArgs}

func ConfigureStartCmd[B firecore.Block](chain *firecore.Chain[B], binaryName string, rootLog *zap.Logger) {

StartCmd.Short = fmt.Sprintf("Starts `%s` services all at once", binaryName)
StartCmd.RunE = func(cmd *cobra.Command, args []string) (err error) {
cmd.SilenceUsage = true
Expand Down Expand Up @@ -66,10 +65,6 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge
return err
}

modules := &launcher.Runtime{
AbsDataDir: dataDirAbs,
}

bstream.GetProtocolFirstStreamableBlock = sflags.MustGetUint64(cmd, "common-first-streamable-block")

err = bstream.ValidateRegistry()
Expand All @@ -86,7 +81,7 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge
}()
dmetering.SetDefaultEmitter(eventEmitter)

launch := launcher.NewLauncher(rootLog, modules)
launch := launcher.NewLauncher(rootLog, dataDirAbs)
rootLog.Debug("launcher created")

runByDefault := func(app string) bool {
Expand All @@ -111,7 +106,12 @@ func start(cmd *cobra.Command, dataDir string, args []string, rootLog *zap.Logge
return err
}

signalHandler, _, _ := cli.SetupSignalHandler(sflags.MustGetDuration(cmd, "common-system-shutdown-signal-delay"), rootLog)
signalHandler, hasBeenSignaled, _ := cli.SetupSignalHandler(sflags.MustGetDuration(cmd, "common-system-shutdown-signal-delay"), rootLog)

// We need to pass the signal handler so that runtime.IsPendingShutdown() is properly
// linked to the signal handler, otherwise, it will always return false.
launch.SwitchHasBeenSignaledAtomic(hasBeenSignaled)

select {
case <-signalHandler:
rootLog.Info("received termination signal, quitting")
Expand Down
3 changes: 2 additions & 1 deletion cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/spf13/viper"
"github.com/streamingfast/dauth"
discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
"github.com/streamingfast/logging"
app "github.com/streamingfast/substreams/app"
"go.uber.org/zap"
Expand Down Expand Up @@ -129,6 +129,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
Authenticator: authenticator,
HeadTimeDriftMetric: ss1HeadTimeDriftmetric,
HeadBlockNumberMetric: ss1HeadBlockNumMetric,
CheckPendingShutDown: runtime.IsPendingShutdown,
}), nil
},
})
Expand Down
4 changes: 3 additions & 1 deletion cmd/apps/substreams_tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service"
"github.com/streamingfast/dlauncher/launcher"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
"github.com/streamingfast/logging"
"github.com/streamingfast/substreams/app"
"go.uber.org/zap"
Expand Down Expand Up @@ -98,6 +98,8 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root

GRPCListenAddr: grpcListenAddr,
ServiceDiscoveryURL: serviceDiscoveryURL,
}, &app.Tier2Modules{
CheckPendingShutDown: runtime.IsPendingShutdown,
}), nil
},
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
dauthnull "github.com/streamingfast/dauth/null"
dauthsecret "github.com/streamingfast/dauth/secret"
dauthtrust "github.com/streamingfast/dauth/trust"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/dmetering"
dmeteringgrpc "github.com/streamingfast/dmetering/grpc"
dmeteringlogger "github.com/streamingfast/dmetering/logger"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/cmd/apps"
"github.com/streamingfast/firehose-core/cmd/tools"
"github.com/streamingfast/firehose-core/launcher"

"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand Down
2 changes: 1 addition & 1 deletion cmd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/firehose-core/launcher"
)

func setupCmd(cmd *cobra.Command, binaryName string) error {
Expand Down
39 changes: 30 additions & 9 deletions consolereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"time"

"github.com/streamingfast/bstream"
Expand All @@ -29,10 +30,11 @@ type ParsingStats struct {
}

type ConsoleReader struct {
lines chan string
done chan interface{}
logger *zap.Logger
tracer logging.Tracer
lines chan string
done chan interface{}
closeOnce sync.Once
logger *zap.Logger
tracer logging.Tracer

// Parsing context
readerProtocolVersion string
Expand Down Expand Up @@ -84,17 +86,36 @@ func (r *ConsoleReader) Done() <-chan interface{} {
}

func (r *ConsoleReader) Close() error {
r.blockRate.SyncNow()
r.printStats()
r.closeOnce.Do(func() {
r.blockRate.SyncNow()
r.printStats()

r.logger.Info("console reader done")
close(r.done)
r.logger.Info("console reader done")
close(r.done)
})

return nil
}

type blockRefView struct {
ref bstream.BlockRef
}

func (v blockRefView) String() string {
if v.ref == nil {
return "<unset>"
}

return v.ref.String()
}

func (r *ConsoleReader) printStats() {
r.logger.Info("console reader stats", zap.Stringer("block_rate", r.blockRate), zap.Stringer("last_block", r.lastBlock), zap.Stringer("last_parent_block", r.lastParentBlock), zap.Uint64("lib", r.lib))
r.logger.Info("console reader stats",
zap.Stringer("block_rate", r.blockRate),
zap.Stringer("last_block", blockRefView{r.lastBlock}),
zap.Stringer("last_parent_block", blockRefView{r.lastParentBlock}),
zap.Uint64("lib", r.lib),
)
}

func (r *ConsoleReader) ReadBlock() (out *pbbstream.Block, err error) {
Expand Down
22 changes: 22 additions & 0 deletions devel/standard/standard.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
start:
args:
- reader-node
- merger
- relayer
- firehose
- substreams-tier1
- substreams-tier2
flags:
# Specifies the path to the binary, we assume you did
# `go install github.com/streamingfast/dummy-blockchain@latest` (and that you have value
# of `go env GOPATH` in your environment).
reader-node-path: "dummy-blockchain"
reader-node-data-dir: "{data-dir}/reader-node"

# Flags that will be added to the dummy chain process command
reader-node-arguments:
start
--tracer=firehose
--store-dir="{node-data-dir}"
--block-rate=1200
--genesis-height=0
49 changes: 49 additions & 0 deletions devel/standard/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env bash

ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

clean=
firecore="$ROOT/../firecore"

main() {
pushd "$ROOT" &> /dev/null

while getopts "hc" opt; do
case $opt in
h) usage && exit 0;;
c) clean=true;;
\?) usage_error "Invalid option: -$OPTARG";;
esac
done
shift $((OPTIND-1))
[[ $1 = "--" ]] && shift

set -e

if [[ $clean == "true" ]]; then
rm -rf firehose-data &> /dev/null || true
fi

exec $firecore -c $(basename $ROOT).yaml start "$@"
}

usage_error() {
message="$1"
exit_code="$2"

echo "ERROR: $message"
echo ""
usage
exit ${exit_code:-1}
}

usage() {
echo "usage: start.sh [-c]"
echo ""
echo "Start $(basename $ROOT) environment."
echo ""
echo "Options"
echo " -c Clean actual data directory first"
}

main "$@"
Loading

0 comments on commit a4e7f79

Please sign in to comment.