Skip to content

Commit

Permalink
make local mode docker composable with single file (standalone)
Browse files Browse the repository at this point in the history
  • Loading branch information
canercidam committed Mar 13, 2023
1 parent 2e84d57 commit 48cade4
Show file tree
Hide file tree
Showing 16 changed files with 366 additions and 91 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ coverage
forta-test
node_modules
test-bot-*

# ignore other files under the docker-compose directory
docker-compose/*/**
# but keep any docker compose files itself
!docker-compose.yml
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ containers:

containers-dev:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o forta-node cmd/node/main.go
DOCKER_BUILDKIT=1 docker build --network=host -t forta-network/forta-node -f Dockerfile.buildkit.dev.node .
DOCKER_BUILDKIT=1 docker build --no-cache --network=host -t forta-network/forta-node -f Dockerfile.buildkit.dev.node .
docker pull nats:2.3.2

main:
Expand Down
18 changes: 18 additions & 0 deletions clients/messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,21 @@ func (client *Client) PublishProto(subject string, payload proto.Message) {
}
logger.Debugf("published: %s", string(data))
}

type nopClient struct{}

func NewNopClient() *nopClient {
return &nopClient{}
}

func (sc *nopClient) Subscribe(subject string, handler interface{}) {

}

func (sc *nopClient) Publish(subject string, payload interface{}) {

}

func (sc *nopClient) PublishProto(subject string, payload proto.Message) {

}
62 changes: 32 additions & 30 deletions cmd/scanner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/forta-network/forta-core-go/clients/health"
"github.com/forta-network/forta-core-go/ethereum"
"github.com/forta-network/forta-core-go/feeds"
"github.com/forta-network/forta-core-go/security"
"github.com/forta-network/forta-core-go/utils"
"github.com/forta-network/forta-node/clients"
"github.com/forta-network/forta-node/clients/messaging"
Expand Down Expand Up @@ -57,7 +56,7 @@ func initTxStream(ctx context.Context, ethClient, traceClient ethereum.Client, c

var maxAgePtr *time.Duration
// support scanning old block ranges in local mode
hasLocalModeBlockRange := cfg.LocalModeConfig.Enable && cfg.LocalModeConfig.RuntimeLimits.StopBlock > 0
hasLocalModeBlockRange := cfg.LocalModeConfig.Enable && cfg.LocalModeConfig.RuntimeLimits.StopBlock != nil
if !hasLocalModeBlockRange && cfg.Scan.BlockMaxAgeSeconds > 0 {
maxAge := time.Duration(cfg.Scan.BlockMaxAgeSeconds) * time.Second
maxAgePtr = &maxAge
Expand All @@ -69,14 +68,18 @@ func initTxStream(ctx context.Context, ethClient, traceClient ethereum.Client, c
)
if cfg.LocalModeConfig.Enable {
runtimeLimits := cfg.LocalModeConfig.RuntimeLimits
if runtimeLimits.StartBlock > 0 {
startBlock = big.NewInt(0).SetUint64(runtimeLimits.StartBlock)
if runtimeLimits.StartBlock != nil {
startBlock = big.NewInt(0).SetUint64(*runtimeLimits.StartBlock)
}
if runtimeLimits.StopBlock > 0 {
stopBlock = big.NewInt(0).SetUint64(runtimeLimits.StopBlock)
if runtimeLimits.StopBlock != nil {
stopBlock = big.NewInt(0).SetUint64(*runtimeLimits.StopBlock)
}
}

if startBlock != nil && stopBlock != nil && !(stopBlock.Cmp(startBlock) > 0) {
log.Fatal("stop block is not greater than the start block - please check the runtime limits")
}

ethClient.SetRetryInterval(time.Second * time.Duration(cfg.Scan.RetryIntervalSeconds))

blockFeed, err := feeds.NewBlockFeed(ctx, ethClient, traceClient, feeds.BlockFeedConfig{
Expand Down Expand Up @@ -148,7 +151,7 @@ func getBlockOffset(cfg config.Config) int {
return chainSettings.DefaultOffset
}

func initCombinationStream(ctx context.Context, msgClient *messaging.Client, cfg config.Config) (*scanner.CombinerAlertStreamService, feeds.AlertFeed, error) {
func initCombinationStream(ctx context.Context, msgClient clients.MessageClient, cfg config.Config) (*scanner.CombinerAlertStreamService, feeds.AlertFeed, error) {
combinerFeed, err := feeds.NewCombinerFeed(
ctx, feeds.CombinerFeedConfig{
APIUrl: cfg.CombinerConfig.AlertAPIURL,
Expand Down Expand Up @@ -258,7 +261,7 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e
cfg.LocalModeConfig.WebhookURL = utils.ConvertToDockerHostURL(cfg.LocalModeConfig.WebhookURL)
msgClient := messaging.NewClient("scanner", fmt.Sprintf("%s:%s", config.DockerNatsContainerName, config.DefaultNatsPort))

key, err := security.LoadKey(config.DefaultContainerKeyDirPath)
key, err := config.LoadKeyInContainer(cfg)
if err != nil {
return nil, err
}
Expand All @@ -268,7 +271,7 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e
return nil, err
}

as, err := initAlertSender(ctx, key, publisherSvc, cfg)
alertSender, err := initAlertSender(ctx, key, publisherSvc, cfg)
if err != nil {
return nil, err
}
Expand All @@ -288,20 +291,10 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e
return nil, err
}

combinationStream, combinationFeed, err := initCombinationStream(ctx, msgClient, cfg)
if err != nil {
return nil, err
}

registryClient, err := ethereum.NewStreamEthClient(ctx, "registry", cfg.Registry.JsonRpc.Url)
if err != nil {
return nil, err
}
registryService := registry.New(cfg, key.Address, msgClient, registryClient, blockFeed)

var waitBots int
if cfg.LocalModeConfig.Enable {
waitBots = len(cfg.LocalModeConfig.BotImages)
waitBots += len(cfg.LocalModeConfig.BotImages)
waitBots += len(cfg.LocalModeConfig.Standalone.BotContainers)
// sharded bots spawn on multiple containers, so total "wait bot" count is shards * target
for _, bot := range cfg.LocalModeConfig.ShardedBots {
if bot != nil {
Expand All @@ -310,24 +303,35 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e
}
}

agentPool := agentpool.NewAgentPool(ctx, cfg.Scan, msgClient, waitBots)
txAnalyzer, err := initTxAnalyzer(ctx, cfg, as, txStream, agentPool, msgClient)
agentPool := agentpool.NewAgentPool(ctx, cfg, msgClient, waitBots)
txAnalyzer, err := initTxAnalyzer(ctx, cfg, alertSender, txStream, agentPool, msgClient)
if err != nil {
return nil, err
}
blockAnalyzer, err := initBlockAnalyzer(ctx, cfg, alertSender, txStream, agentPool, msgClient)
if err != nil {
return nil, err
}
blockAnalyzer, err := initBlockAnalyzer(ctx, cfg, as, txStream, agentPool, msgClient)

// Start the main block feed so all transaction feeds can start consuming.
if !cfg.Scan.DisableAutostart {
blockFeed.Start()
}

combinationStream, combinationFeed, err := initCombinationStream(ctx, msgClient, cfg)
if err != nil {
return nil, err
}

combinationAnalyzer, err := initCombinerAlertAnalyzer(ctx, cfg, as, combinationStream, agentPool, msgClient)
registryClient, err := ethereum.NewStreamEthClient(ctx, "registry", cfg.Registry.JsonRpc.Url)
if err != nil {
return nil, err
}
registryService := registry.New(cfg, key.Address, msgClient, registryClient, blockFeed)

// Start the main block feed so all transaction feeds can start consuming.
if !cfg.Scan.DisableAutostart {
blockFeed.Start()
combinationAnalyzer, err := initCombinerAlertAnalyzer(ctx, cfg, alertSender, combinationStream, agentPool, msgClient)
if err != nil {
return nil, err
}

svcs := []services.Service{
Expand All @@ -341,8 +345,6 @@ func initServices(ctx context.Context, cfg config.Config) ([]services.Service, e
blockAnalyzer,
combinationStream,
combinationAnalyzer,
scanner.NewScannerAPI(ctx, blockFeed),
scanner.NewTxLogger(ctx),
publisherSvc,
}

Expand Down
24 changes: 15 additions & 9 deletions config/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,23 @@ const (
)

type AgentConfig struct {
ID string `yaml:"id" json:"id"`
Image string `yaml:"image" json:"image"`
Manifest string `yaml:"manifest" json:"manifest"`
IsLocal bool `yaml:"isLocal" json:"isLocal"`
StartBlock *uint64 `yaml:"startBlock" json:"startBlock,omitempty"`
StopBlock *uint64 `yaml:"stopBlock" json:"stopBlock,omitempty"`
AlertConfig *protocol.AlertConfig
ShardConfig *ShardConfig
ID string `yaml:"id" json:"id"`
Image string `yaml:"image" json:"image"`
Manifest string `yaml:"manifest" json:"manifest"`
IsLocal bool `yaml:"isLocal" json:"isLocal"`
IsStandalone bool `yaml:"isStandalone" json:"isStandalone"`
StartBlock *uint64 `yaml:"startBlock" json:"startBlock,omitempty"`
StopBlock *uint64 `yaml:"stopBlock" json:"stopBlock,omitempty"`
AlertConfig *protocol.AlertConfig
ShardConfig *ShardConfig
}

type ShardConfig struct {
ShardID uint `yaml:"shardId" json:"shardId"`
Shards uint `yaml:"shards" json:"shards"`
Target uint `yaml:"target" json:"target"`
}

// ToAgentInfo transforms the agent config to the agent info.
func (ac AgentConfig) ToAgentInfo() *protocol.AgentInfo {
return &protocol.AgentInfo{
Expand All @@ -44,10 +46,14 @@ func (ac AgentConfig) ImageHash() string {
}

func (ac AgentConfig) ContainerName() string {
_, digest := utils.SplitImageRef(ac.Image)
if ac.IsStandalone {
// the container is already running - don't mess with the name
return ac.ID
}
if ac.IsLocal {
return fmt.Sprintf("%s-agent-%s", ContainerNamePrefix, utils.ShortenString(ac.ID, 8))
}
_, digest := utils.SplitImageRef(ac.Image)
return fmt.Sprintf(
"%s-agent-%s-%s", ContainerNamePrefix, utils.ShortenString(ac.ID, 8), utils.ShortenString(digest, 4),
)
Expand Down
103 changes: 85 additions & 18 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"errors"
"fmt"
"os"
"path"

Expand Down Expand Up @@ -113,11 +114,11 @@ type ContainerRegistryConfig struct {
}

type RuntimeLimits struct {
StartBlock uint64 `yaml:"startBlock" json:"startBlock"`
StopBlock uint64 `yaml:"stopBlock" json:"stopBlock" validate:"omitempty,gtfield=StartBlock"`
StopTimeoutSeconds int `yaml:"stopTimeoutSeconds" json:"stopTimeoutSeconds" default:"30"`
StartCombiner uint64 `yaml:"startCombiner" json:"startCombiner"`
StopCombiner uint64 `yaml:"stopCombiner" json:"stopCombiner"`
StartBlock *uint64 `yaml:"startBlock" json:"startBlock"`
StopBlock *uint64 `yaml:"stopBlock" json:"stopBlock" validate:"omitempty,gtfield=StartBlock"`
StopTimeoutSeconds int `yaml:"stopTimeoutSeconds" json:"stopTimeoutSeconds" default:"30"`
StartCombiner uint64 `yaml:"startCombiner" json:"startCombiner"`
StopCombiner uint64 `yaml:"stopCombiner" json:"stopCombiner"`
}

type RedisConfig struct {
Expand All @@ -138,18 +139,32 @@ type DeduplicationConfig struct {
RedisCluster *RedisClusterConfig `yaml:"redisCluster" json:"redisCluster"`
}

type StandaloneModeConfig struct {
Enable bool `yaml:"enable" json:"enable"`
BotContainers []string `yaml:"botContainers" json:"botContainers"`
}

type LocalModeConfig struct {
Enable bool `yaml:"enable" json:"enable"`
IncludeMetrics bool `yaml:"includeMetrics" json:"includeMetrics"`
BotIDs []string `yaml:"botIds" json:"botIds"`
BotImages []string `yaml:"botImages" json:"botImages"`
WebhookURL string `yaml:"webhookUrl" json:"webhookUrl"`
LogFileName string `yaml:"logFileName" json:"logFileName"`
LogToStdout bool `yaml:"logToStdout" json:"logToStdout"`
ContainerRegistry *ContainerRegistryConfig `yaml:"containerRegistry" json:"containerRegistry"`
RuntimeLimits RuntimeLimits `yaml:"runtimeLimits" json:"runtimeLimits"`
ForceEnableInspection bool `yaml:"forceEnableInspection" json:"forceEnableInspection"`
Deduplication *DeduplicationConfig `yaml:"deduplication" json:"deduplication"`
ShardedBots []*LocalShardedBot `yaml:"shardedBots" json:"shardedBots"`
PrivateKeyHex string `yaml:"privateKeyHex" json:"privateKeyHex"`
Standalone StandaloneModeConfig `yaml:"standalone" json:"standalone"`
}

// IsStandalone checks if the node is in standalone mode. It should only be available
// as another local mode setting.
func (lmc LocalModeConfig) IsStandalone() bool {
return lmc.Enable && lmc.Standalone.Enable
}

type LocalShardedBot struct {
Expand Down Expand Up @@ -217,12 +232,7 @@ func (cfg *Config) ConfigFilePath() string {

// GetConfigForContainer is how a container gets the forta configuration (file or env var)
func GetConfigForContainer() (Config, error) {
var cfg Config
if _, err := os.Stat(DefaultContainerConfigPath); os.IsNotExist(err) {
return cfg, errors.New("config file not found")
}

cfg, err := getConfigFromFile(DefaultContainerConfigPath)
cfg, err := getConfigFromFile()
if err != nil {
return Config{}, err
}
Expand Down Expand Up @@ -259,13 +269,70 @@ func applyContextDefaults(cfg *Config) {
cfg.CombinerConfig.CombinerCachePath = path.Join(cfg.FortaDir, DefaultCombinerCacheFileName)
}

func getConfigFromFile(filename string) (Config, error) {
var cfg Config
if err := readFile(filename, &cfg); err != nil {
return Config{}, err
func getConfigFromFile() (cfg Config, err error) {
var (
successfullyLoadedTimes int
wrappedErr error
)

// if the default config file exists, load from there
if err = checkIfConfigFileExists(DefaultContainerConfigPath); err == nil {
if err = readYamlFile(DefaultContainerConfigPath, &cfg); err != nil {
return
}
successfullyLoadedTimes++
}
if err := defaults.Set(&cfg); err != nil {
return Config{}, err
if err != nil {
wrappedErr = err
}
return cfg, nil

// if the wrapped config file exists, load from there
if err = checkIfConfigFileExists(DefaultContainerWrappedConfigPath); err == nil {
var wrapped map[string]Config
if err = readYamlFile(DefaultContainerWrappedConfigPath, &wrapped); err != nil {
return
}
var found bool
cfg, found = wrapped[DefaultConfigWrapperKey]
if !found {
err = fmt.Errorf("wrapped config file was found but did not have the config under '%s'", DefaultConfigWrapperKey)
return
}
successfullyLoadedTimes++
}
if err != nil {
wrappedErr = fmt.Errorf("%v, %v", wrappedErr, err)
}

// at this point we expect that at least one of the config files were loaded without errors
switch successfullyLoadedTimes {
case 2:
err = errors.New("multiple config files found in the forta dir - please use only one of them")
return

case 0:
err = fmt.Errorf("failed to load any of the config files: %v", wrappedErr)
return

case 1:
// yay! (ignore)

default:
err = fmt.Errorf("successfully loaded unexpected amount of config files (%d) - errors: %w", successfullyLoadedTimes, wrappedErr)
}

// finally set the defaults
err = defaults.Set(&cfg)
return
}

func checkIfConfigFileExists(configPath string) error {
_, err := os.Stat(configPath)
if os.IsNotExist(err) {
return errors.New("config file not found")
}
if err != nil {
return fmt.Errorf("failed to check if config file exists: %w", err)
}
return nil
}
7 changes: 4 additions & 3 deletions config/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ var (

DockerNetworkName = DockerScannerContainerName

DefaultContainerFortaDirPath = "/.forta"
DefaultContainerConfigPath = path.Join(DefaultContainerFortaDirPath, DefaultConfigFileName)
DefaultContainerKeyDirPath = path.Join(DefaultContainerFortaDirPath, DefaultKeysDirName)
DefaultContainerFortaDirPath = "/.forta"
DefaultContainerConfigPath = path.Join(DefaultContainerFortaDirPath, DefaultConfigFileName)
DefaultContainerWrappedConfigPath = path.Join(DefaultContainerFortaDirPath, DefaultWrappedConfigFileName)
DefaultContainerKeyDirPath = path.Join(DefaultContainerFortaDirPath, DefaultKeysDirName)
)
Loading

0 comments on commit 48cade4

Please sign in to comment.