From 513ffa7e0c8252424b69df4070053e8075210bd1 Mon Sep 17 00:00:00 2001 From: Svarog Date: Wed, 30 Sep 2020 17:07:40 +0300 Subject: [PATCH] [NOD-1420] Restructure main (#942) * [NOD-1420] Moved setting limits to executor * [NOD-1420] Moved all code dealing with windows service to separate package * [NOD-1420] Move practically all main to restructured app package * [NOD-1420] Check for running as interactive only after checking if we are doing any service operation * [NOD-1420] Add comments * [NOD-1420] Add a comment --- app/app.go | 347 +++++++----------- app/component_manager.go | 260 +++++++++++++ cmd/addblock/addblock.go | 5 +- infrastructure/config/config.go | 97 ++--- infrastructure/logger/logger.go | 6 +- infrastructure/os/execenv/initialize.go | 22 ++ infrastructure/os/limits/desired_limits.go | 7 + infrastructure/os/limits/limits_plan9.go | 2 +- infrastructure/os/limits/limits_unix.go | 22 +- infrastructure/os/limits/limits_windows.go | 2 +- infrastructure/os/winservice/common.go | 17 + infrastructure/os/winservice/log.go | 13 + .../os/winservice/service_command_windows.go | 178 +++++++++ .../os/winservice/service_main_windows.go | 40 ++ .../os/winservice/service_windows.go | 119 ++++++ log.go | 2 - main.go | 200 +--------- service_windows.go | 309 ---------------- testing/integration/setup_test.go | 4 +- 19 files changed, 859 insertions(+), 793 deletions(-) create mode 100644 app/component_manager.go create mode 100644 infrastructure/os/execenv/initialize.go create mode 100644 infrastructure/os/limits/desired_limits.go create mode 100644 infrastructure/os/winservice/common.go create mode 100644 infrastructure/os/winservice/log.go create mode 100644 infrastructure/os/winservice/service_command_windows.go create mode 100644 infrastructure/os/winservice/service_main_windows.go create mode 100644 infrastructure/os/winservice/service_windows.go delete mode 100644 service_windows.go diff --git a/app/app.go b/app/app.go index bfa396a846..0ef15b42f0 100644 --- a/app/app.go +++ b/app/app.go @@ -2,259 +2,194 @@ package app import ( "fmt" - "sync/atomic" + "os" + "path/filepath" + "runtime" + "time" - "github.com/kaspanet/kaspad/infrastructure/network/addressmanager" - - "github.com/kaspanet/kaspad/infrastructure/network/netadapter/id" + "github.com/kaspanet/kaspad/infrastructure/db/dbaccess" - "github.com/kaspanet/kaspad/app/appmessage" - "github.com/kaspanet/kaspad/app/protocol" - "github.com/kaspanet/kaspad/app/rpc" - "github.com/kaspanet/kaspad/domain/blockdag" "github.com/kaspanet/kaspad/domain/blockdag/indexers" - "github.com/kaspanet/kaspad/domain/mempool" - "github.com/kaspanet/kaspad/domain/mining" - "github.com/kaspanet/kaspad/domain/txscript" - "github.com/kaspanet/kaspad/infrastructure/config" - "github.com/kaspanet/kaspad/infrastructure/db/dbaccess" - "github.com/kaspanet/kaspad/infrastructure/network/connmanager" - "github.com/kaspanet/kaspad/infrastructure/network/dnsseed" - "github.com/kaspanet/kaspad/infrastructure/network/netadapter" + "github.com/kaspanet/kaspad/infrastructure/os/signal" + "github.com/kaspanet/kaspad/util/profiling" + "github.com/kaspanet/kaspad/version" + "github.com/kaspanet/kaspad/util/panics" -) -// App is a wrapper for all the kaspad services -type App struct { - cfg *config.Config - addressManager *addressmanager.AddressManager - protocolManager *protocol.Manager - rpcManager *rpc.Manager - connectionManager *connmanager.ConnectionManager - netAdapter *netadapter.NetAdapter + "github.com/kaspanet/kaspad/infrastructure/config" + "github.com/kaspanet/kaspad/infrastructure/os/execenv" + "github.com/kaspanet/kaspad/infrastructure/os/limits" + "github.com/kaspanet/kaspad/infrastructure/os/winservice" +) - started, shutdown int32 +var desiredLimits = &limits.DesiredLimits{ + FileLimitWant: 2048, + FileLimitMin: 1024, } -// Start launches all the kaspad services. -func (a *App) Start() { - // Already started? - if atomic.AddInt32(&a.started, 1) != 1 { - return - } - - log.Trace("Starting kaspad") - - err := a.netAdapter.Start() - if err != nil { - panics.Exit(log, fmt.Sprintf("Error starting the net adapter: %+v", err)) - } - - a.maybeSeedFromDNS() - - a.connectionManager.Start() +var serviceDescription = &winservice.ServiceDescription{ + Name: "kaspadsvc", + DisplayName: "Kaspad Service", + Description: "Downloads and stays synchronized with the Kaspa blockDAG and " + + "provides DAG services to applications.", } -// Stop gracefully shuts down all the kaspad services. -func (a *App) Stop() { - // Make sure this only happens once. - if atomic.AddInt32(&a.shutdown, 1) != 1 { - log.Infof("Kaspad is already in the process of shutting down") - return - } - - log.Warnf("Kaspad shutting down") +type kaspadApp struct { + cfg *config.Config +} - a.connectionManager.Stop() +// StartApp starts the kaspad app, and blocks until it finishes running +func StartApp() error { + execenv.Initialize(desiredLimits) - err := a.netAdapter.Stop() + // Load configuration and parse command line. This function also + // initializes logging and configures it accordingly. + cfg, err := config.LoadConfig() if err != nil { - log.Errorf("Error stopping the net adapter: %+v", err) + fmt.Fprint(os.Stderr, err) + return err } + defer panics.HandlePanic(log, "MAIN", nil) - err = a.addressManager.Stop() - if err != nil { - log.Errorf("Error stopping address manager: %s", err) + app := &kaspadApp{cfg: cfg} + + // Call serviceMain on Windows to handle running as a service. When + // the return isService flag is true, exit now since we ran as a + // service. Otherwise, just fall through to normal operation. + if runtime.GOOS == "windows" { + isService, err := winservice.WinServiceMain(app.main, serviceDescription, cfg) + if err != nil { + return err + } + if isService { + return nil + } } - return + return app.main(nil) } -// New returns a new App instance configured to listen on addr for the -// kaspa network type specified by dagParams. Use start to begin accepting -// connections from peers. -func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt chan<- struct{}) (*App, error) { - indexManager, acceptanceIndex := setupIndexes(cfg) +func (app *kaspadApp) main(startedChan chan<- struct{}) error { + // Get a channel that will be closed when a shutdown signal has been + // triggered either from an OS signal such as SIGINT (Ctrl+C) or from + // another subsystem such as the RPC server. + interrupt := signal.InterruptListener() + defer log.Info("Shutdown complete") - sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize) + // Show version at startup. + log.Infof("Version %s", version.Version()) - // Create a new block DAG instance with the appropriate configuration. - dag, err := setupDAG(cfg, databaseContext, sigCache, indexManager) - if err != nil { - return nil, err + // Enable http profiling server if requested. + if app.cfg.Profile != "" { + profiling.Start(app.cfg.Profile, log) } - txMempool := setupMempool(cfg, dag, sigCache) - - netAdapter, err := netadapter.NewNetAdapter(cfg) - if err != nil { - return nil, err + // Perform upgrades to kaspad as new versions require it. + if err := doUpgrades(); err != nil { + log.Error(err) + return err } - addressManager, err := addressmanager.New(cfg, databaseContext) - if err != nil { - return nil, err + + // Return now if an interrupt signal was triggered. + if signal.InterruptRequested(interrupt) { + return nil } - connectionManager, err := connmanager.New(cfg, netAdapter, addressManager) - if err != nil { - return nil, err + + if app.cfg.ResetDatabase { + err := removeDatabase(app.cfg) + if err != nil { + log.Error(err) + return err + } } - protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager) + + // Open the database + databaseContext, err := openDB(app.cfg) if err != nil { - return nil, err + log.Error(err) + return err } - rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex, interrupt) - - return &App{ - cfg: cfg, - protocolManager: protocolManager, - rpcManager: rpcManager, - connectionManager: connectionManager, - netAdapter: netAdapter, - addressManager: addressManager, - }, nil - -} -func setupRPC( - cfg *config.Config, - txMempool *mempool.TxPool, - dag *blockdag.BlockDAG, - sigCache *txscript.SigCache, - netAdapter *netadapter.NetAdapter, - protocolManager *protocol.Manager, - connectionManager *connmanager.ConnectionManager, - addressManager *addressmanager.AddressManager, - acceptanceIndex *indexers.AcceptanceIndex, - shutDownChan chan<- struct{}, -) *rpc.Manager { - - blockTemplateGenerator := mining.NewBlkTmplGenerator(&mining.Policy{BlockMaxMass: cfg.BlockMaxMass}, txMempool, dag, sigCache) - rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex, shutDownChan) - protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG) - protocolManager.SetOnTransactionAddedToMempoolHandler(rpcManager.NotifyTransactionAddedToMempool) - dag.Subscribe(func(notification *blockdag.Notification) { - err := handleBlockDAGNotifications(notification, acceptanceIndex, rpcManager) + defer func() { + log.Infof("Gracefully shutting down the database...") + err := databaseContext.Close() if err != nil { - panic(err) + log.Errorf("Failed to close the database: %s", err) } - }) - return rpcManager -} + }() -func handleBlockDAGNotifications(notification *blockdag.Notification, - acceptanceIndex *indexers.AcceptanceIndex, rpcManager *rpc.Manager) error { + // Return now if an interrupt signal was triggered. + if signal.InterruptRequested(interrupt) { + return nil + } - switch notification.Type { - case blockdag.NTChainChanged: - if acceptanceIndex == nil { - return nil - } - chainChangedNotificationData := notification.Data.(*blockdag.ChainChangedNotificationData) - err := rpcManager.NotifyChainChanged(chainChangedNotificationData.RemovedChainBlockHashes, - chainChangedNotificationData.AddedChainBlockHashes) - if err != nil { - return err - } - case blockdag.NTFinalityConflict: - finalityConflictNotificationData := notification.Data.(*blockdag.FinalityConflictNotificationData) - err := rpcManager.NotifyFinalityConflict(finalityConflictNotificationData.ViolatingBlockHash.String()) - if err != nil { + // Drop indexes and exit if requested. + if app.cfg.DropAcceptanceIndex { + if err := indexers.DropAcceptanceIndex(databaseContext); err != nil { + log.Errorf("%s", err) return err } - case blockdag.NTFinalityConflictResolved: - finalityConflictResolvedNotificationData := notification.Data.(*blockdag.FinalityConflictResolvedNotificationData) - err := rpcManager.NotifyFinalityConflictResolved(finalityConflictResolvedNotificationData.FinalityBlockHash.String()) - if err != nil { - return err - } - } - return nil -} -func (a *App) maybeSeedFromDNS() { - if !a.cfg.DisableDNSSeed { - dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, appmessage.SFNodeNetwork, false, nil, - a.cfg.Lookup, func(addresses []*appmessage.NetAddress) { - // Kaspad uses a lookup of the dns seeder here. Since seeder returns - // IPs of nodes and not its own IP, we can not know real IP of - // source. So we'll take first returned address as source. - a.addressManager.AddAddresses(addresses, addresses[0], nil) - }) + return nil } - if a.cfg.GRPCSeed != "" { - dnsseed.SeedFromGRPC(a.cfg.NetParams(), a.cfg.GRPCSeed, appmessage.SFNodeNetwork, false, nil, - func(addresses []*appmessage.NetAddress) { - a.addressManager.AddAddresses(addresses, addresses[0], nil) - }) + // Create componentManager and start it. + componentManager, err := NewComponentManager(app.cfg, databaseContext, interrupt) + if err != nil { + log.Errorf("Unable to start kaspad: %+v", err) + return err } -} -func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, - sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) { - - dag, err := blockdag.New(&blockdag.Config{ - DatabaseContext: databaseContext, - DAGParams: cfg.NetParams(), - TimeSource: blockdag.NewTimeSource(), - SigCache: sigCache, - IndexManager: indexManager, - SubnetworkID: cfg.SubnetworkID, - MaxUTXOCacheSize: cfg.MaxUTXOCacheSize, - }) - return dag, err -} -func setupIndexes(cfg *config.Config) (blockdag.IndexManager, *indexers.AcceptanceIndex) { - // Create indexes if needed. - var indexes []indexers.Indexer - var acceptanceIndex *indexers.AcceptanceIndex - if cfg.AcceptanceIndex { - log.Info("acceptance index is enabled") - acceptanceIndex = indexers.NewAcceptanceIndex() - indexes = append(indexes, acceptanceIndex) - } + defer func() { + log.Infof("Gracefully shutting down kaspad...") + + shutdownDone := make(chan struct{}) + go func() { + componentManager.Stop() + shutdownDone <- struct{}{} + }() + + const shutdownTimeout = 2 * time.Minute - // Create an index manager if any of the optional indexes are enabled. - if len(indexes) < 0 { - return nil, nil + select { + case <-shutdownDone: + case <-time.After(shutdownTimeout): + log.Criticalf("Graceful shutdown timed out %s. Terminating...", shutdownTimeout) + } + log.Infof("Kaspad shutdown complete") + }() + + componentManager.Start() + + if startedChan != nil { + startedChan <- struct{}{} } - indexManager := indexers.NewManager(indexes) - return indexManager, acceptanceIndex + + // Wait until the interrupt signal is received from an OS signal or + // shutdown is requested through one of the subsystems such as the RPC + // server. + <-interrupt + return nil } -func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript.SigCache) *mempool.TxPool { - mempoolConfig := mempool.Config{ - Policy: mempool.Policy{ - AcceptNonStd: cfg.RelayNonStd, - MaxOrphanTxs: cfg.MaxOrphanTxs, - MaxOrphanTxSize: config.DefaultMaxOrphanTxSize, - MinRelayTxFee: cfg.MinRelayTxFee, - MaxTxVersion: 1, - }, - CalcTxSequenceLockFromReferencedUTXOEntries: dag.CalcTxSequenceLockFromReferencedUTXOEntries, - SigCache: sigCache, - DAG: dag, - } +// doUpgrades performs upgrades to kaspad as new versions require it. +// currently it's a placeholder we got from kaspad upstream, that does nothing +func doUpgrades() error { + return nil +} - return mempool.New(&mempoolConfig) +// dbPath returns the path to the block database given a database type. +func databasePath(cfg *config.Config) string { + return filepath.Join(cfg.DataDir, "db") } -// P2PNodeID returns the network ID associated with this App -func (a *App) P2PNodeID() *id.ID { - return a.netAdapter.ID() +func removeDatabase(cfg *config.Config) error { + dbPath := databasePath(cfg) + return os.RemoveAll(dbPath) } -// AddressManager returns the AddressManager associated with this App -func (a *App) AddressManager() *addressmanager.AddressManager { - return a.addressManager +func openDB(cfg *config.Config) (*dbaccess.DatabaseContext, error) { + dbPath := databasePath(cfg) + log.Infof("Loading database from '%s'", dbPath) + return dbaccess.New(dbPath) } diff --git a/app/component_manager.go b/app/component_manager.go new file mode 100644 index 0000000000..d99c930b22 --- /dev/null +++ b/app/component_manager.go @@ -0,0 +1,260 @@ +package app + +import ( + "fmt" + "sync/atomic" + + "github.com/kaspanet/kaspad/infrastructure/network/addressmanager" + + "github.com/kaspanet/kaspad/infrastructure/network/netadapter/id" + + "github.com/kaspanet/kaspad/app/appmessage" + "github.com/kaspanet/kaspad/app/protocol" + "github.com/kaspanet/kaspad/app/rpc" + "github.com/kaspanet/kaspad/domain/blockdag" + "github.com/kaspanet/kaspad/domain/blockdag/indexers" + "github.com/kaspanet/kaspad/domain/mempool" + "github.com/kaspanet/kaspad/domain/mining" + "github.com/kaspanet/kaspad/domain/txscript" + "github.com/kaspanet/kaspad/infrastructure/config" + "github.com/kaspanet/kaspad/infrastructure/db/dbaccess" + "github.com/kaspanet/kaspad/infrastructure/network/connmanager" + "github.com/kaspanet/kaspad/infrastructure/network/dnsseed" + "github.com/kaspanet/kaspad/infrastructure/network/netadapter" + "github.com/kaspanet/kaspad/util/panics" +) + +// ComponentManager is a wrapper for all the kaspad services +type ComponentManager struct { + cfg *config.Config + addressManager *addressmanager.AddressManager + protocolManager *protocol.Manager + rpcManager *rpc.Manager + connectionManager *connmanager.ConnectionManager + netAdapter *netadapter.NetAdapter + + started, shutdown int32 +} + +// Start launches all the kaspad services. +func (a *ComponentManager) Start() { + // Already started? + if atomic.AddInt32(&a.started, 1) != 1 { + return + } + + log.Trace("Starting kaspad") + + err := a.netAdapter.Start() + if err != nil { + panics.Exit(log, fmt.Sprintf("Error starting the net adapter: %+v", err)) + } + + a.maybeSeedFromDNS() + + a.connectionManager.Start() +} + +// Stop gracefully shuts down all the kaspad services. +func (a *ComponentManager) Stop() { + // Make sure this only happens once. + if atomic.AddInt32(&a.shutdown, 1) != 1 { + log.Infof("Kaspad is already in the process of shutting down") + return + } + + log.Warnf("Kaspad shutting down") + + a.connectionManager.Stop() + + err := a.netAdapter.Stop() + if err != nil { + log.Errorf("Error stopping the net adapter: %+v", err) + } + + err = a.addressManager.Stop() + if err != nil { + log.Errorf("Error stopping address manager: %s", err) + } + + return +} + +// NewComponentManager returns a new ComponentManager instance. +// Use Start() to begin all services within this ComponentManager +func NewComponentManager(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt chan<- struct{}) (*ComponentManager, error) { + indexManager, acceptanceIndex := setupIndexes(cfg) + + sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize) + + // Create a new block DAG instance with the appropriate configuration. + dag, err := setupDAG(cfg, databaseContext, sigCache, indexManager) + if err != nil { + return nil, err + } + + txMempool := setupMempool(cfg, dag, sigCache) + + netAdapter, err := netadapter.NewNetAdapter(cfg) + if err != nil { + return nil, err + } + addressManager, err := addressmanager.New(cfg, databaseContext) + if err != nil { + return nil, err + } + connectionManager, err := connmanager.New(cfg, netAdapter, addressManager) + if err != nil { + return nil, err + } + protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager) + if err != nil { + return nil, err + } + rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex, interrupt) + + return &ComponentManager{ + cfg: cfg, + protocolManager: protocolManager, + rpcManager: rpcManager, + connectionManager: connectionManager, + netAdapter: netAdapter, + addressManager: addressManager, + }, nil + +} + +func setupRPC( + cfg *config.Config, + txMempool *mempool.TxPool, + dag *blockdag.BlockDAG, + sigCache *txscript.SigCache, + netAdapter *netadapter.NetAdapter, + protocolManager *protocol.Manager, + connectionManager *connmanager.ConnectionManager, + addressManager *addressmanager.AddressManager, + acceptanceIndex *indexers.AcceptanceIndex, + shutDownChan chan<- struct{}, +) *rpc.Manager { + + blockTemplateGenerator := mining.NewBlkTmplGenerator(&mining.Policy{BlockMaxMass: cfg.BlockMaxMass}, txMempool, dag, sigCache) + rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex, shutDownChan) + protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG) + protocolManager.SetOnTransactionAddedToMempoolHandler(rpcManager.NotifyTransactionAddedToMempool) + dag.Subscribe(func(notification *blockdag.Notification) { + err := handleBlockDAGNotifications(notification, acceptanceIndex, rpcManager) + if err != nil { + panic(err) + } + }) + return rpcManager +} + +func handleBlockDAGNotifications(notification *blockdag.Notification, + acceptanceIndex *indexers.AcceptanceIndex, rpcManager *rpc.Manager) error { + + switch notification.Type { + case blockdag.NTChainChanged: + if acceptanceIndex == nil { + return nil + } + chainChangedNotificationData := notification.Data.(*blockdag.ChainChangedNotificationData) + err := rpcManager.NotifyChainChanged(chainChangedNotificationData.RemovedChainBlockHashes, + chainChangedNotificationData.AddedChainBlockHashes) + if err != nil { + return err + } + case blockdag.NTFinalityConflict: + finalityConflictNotificationData := notification.Data.(*blockdag.FinalityConflictNotificationData) + err := rpcManager.NotifyFinalityConflict(finalityConflictNotificationData.ViolatingBlockHash.String()) + if err != nil { + return err + } + case blockdag.NTFinalityConflictResolved: + finalityConflictResolvedNotificationData := notification.Data.(*blockdag.FinalityConflictResolvedNotificationData) + err := rpcManager.NotifyFinalityConflictResolved(finalityConflictResolvedNotificationData.FinalityBlockHash.String()) + if err != nil { + return err + } + } + return nil +} + +func (a *ComponentManager) maybeSeedFromDNS() { + if !a.cfg.DisableDNSSeed { + dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, appmessage.SFNodeNetwork, false, nil, + a.cfg.Lookup, func(addresses []*appmessage.NetAddress) { + // Kaspad uses a lookup of the dns seeder here. Since seeder returns + // IPs of nodes and not its own IP, we can not know real IP of + // source. So we'll take first returned address as source. + a.addressManager.AddAddresses(addresses, addresses[0], nil) + }) + } + + if a.cfg.GRPCSeed != "" { + dnsseed.SeedFromGRPC(a.cfg.NetParams(), a.cfg.GRPCSeed, appmessage.SFNodeNetwork, false, nil, + func(addresses []*appmessage.NetAddress) { + a.addressManager.AddAddresses(addresses, addresses[0], nil) + }) + } +} + +func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, + sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) { + + dag, err := blockdag.New(&blockdag.Config{ + DatabaseContext: databaseContext, + DAGParams: cfg.NetParams(), + TimeSource: blockdag.NewTimeSource(), + SigCache: sigCache, + IndexManager: indexManager, + SubnetworkID: cfg.SubnetworkID, + MaxUTXOCacheSize: cfg.MaxUTXOCacheSize, + }) + return dag, err +} + +func setupIndexes(cfg *config.Config) (blockdag.IndexManager, *indexers.AcceptanceIndex) { + // Create indexes if needed. + var indexes []indexers.Indexer + var acceptanceIndex *indexers.AcceptanceIndex + if cfg.AcceptanceIndex { + log.Info("acceptance index is enabled") + acceptanceIndex = indexers.NewAcceptanceIndex() + indexes = append(indexes, acceptanceIndex) + } + + // Create an index manager if any of the optional indexes are enabled. + if len(indexes) < 0 { + return nil, nil + } + indexManager := indexers.NewManager(indexes) + return indexManager, acceptanceIndex +} + +func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript.SigCache) *mempool.TxPool { + mempoolConfig := mempool.Config{ + Policy: mempool.Policy{ + AcceptNonStd: cfg.RelayNonStd, + MaxOrphanTxs: cfg.MaxOrphanTxs, + MaxOrphanTxSize: config.DefaultMaxOrphanTxSize, + MinRelayTxFee: cfg.MinRelayTxFee, + MaxTxVersion: 1, + }, + CalcTxSequenceLockFromReferencedUTXOEntries: dag.CalcTxSequenceLockFromReferencedUTXOEntries, + SigCache: sigCache, + DAG: dag, + } + + return mempool.New(&mempoolConfig) +} + +// P2PNodeID returns the network ID associated with this ComponentManager +func (a *ComponentManager) P2PNodeID() *id.ID { + return a.netAdapter.ID() +} + +// AddressManager returns the AddressManager associated with this ComponentManager +func (a *ComponentManager) AddressManager() *addressmanager.AddressManager { + return a.addressManager +} diff --git a/cmd/addblock/addblock.go b/cmd/addblock/addblock.go index 7fd0d9552d..704fbf18c5 100644 --- a/cmd/addblock/addblock.go +++ b/cmd/addblock/addblock.go @@ -5,10 +5,11 @@ package main import ( - "github.com/kaspanet/kaspad/infrastructure/logger" "os" "runtime" + "github.com/kaspanet/kaspad/infrastructure/logger" + "github.com/kaspanet/kaspad/infrastructure/os/limits" "github.com/kaspanet/kaspad/util/panics" ) @@ -77,7 +78,7 @@ func realMain() error { func main() { // Use all processor cores and up some limits. runtime.GOMAXPROCS(runtime.NumCPU()) - if err := limits.SetLimits(); err != nil { + if err := limits.SetLimits(nil); err != nil { os.Exit(1) } diff --git a/infrastructure/config/config.go b/infrastructure/config/config.go index a9715b9419..05c7e5161f 100644 --- a/infrastructure/config/config.go +++ b/infrastructure/config/config.go @@ -107,7 +107,6 @@ type Flags struct { ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"` DbType string `long:"dbtype" description:"Database backend to use for the Block DAG"` Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"` - CPUProfile string `long:"cpuprofile" description:"Write CPU profile to the specified file"` DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify =,=,... to set the log level for individual subsystems -- Use show to list available subsystems"` Upnp bool `long:"upnp" description:"Use UPnP to map our listening port outside of NAT"` MinRelayTxFee float64 `long:"minrelaytxfee" description:"The minimum transaction fee in KAS/kB to be considered a non-zero fee."` @@ -124,6 +123,7 @@ type Flags struct { ResetDatabase bool `long:"reset-db" description:"Reset database before starting node. It's needed when switching between subnetworks."` MaxUTXOCacheSize uint64 `long:"maxutxocachesize" description:"Max size of loaded UTXO into ram from the disk in bytes"` NetworkFlags + ServiceOptions *ServiceOptions } // Config defines the configuration options for kaspad. @@ -139,9 +139,9 @@ type Config struct { SubnetworkID *subnetworkid.SubnetworkID // nil in full nodes } -// serviceOptions defines the configuration options for the daemon as a service on +// ServiceOptions defines the configuration options for the daemon as a service on // Windows. -type serviceOptions struct { +type ServiceOptions struct { ServiceCommand string `short:"s" long:"service" description:"Service command {install, remove, start, stop}"` } @@ -160,10 +160,10 @@ func cleanAndExpandPath(path string) string { } // newConfigParser returns a new command line flags parser. -func newConfigParser(cfgFlags *Flags, so *serviceOptions, options flags.Options) *flags.Parser { +func newConfigParser(cfgFlags *Flags, options flags.Options) *flags.Parser { parser := flags.NewParser(cfgFlags, options) if runtime.GOOS == "windows" { - parser.AddGroup("Service Options", "Service Options", so) + parser.AddGroup("Service Options", "Service Options", cfgFlags.ServiceOptions) } return parser } @@ -189,6 +189,7 @@ func defaultFlags() *Flags { MinRelayTxFee: defaultMinRelayTxFee, AcceptanceIndex: defaultAcceptanceIndex, MaxUTXOCacheSize: defaultMaxUTXOCacheSize, + ServiceOptions: &ServiceOptions{}, } } @@ -211,24 +212,20 @@ func DefaultConfig() *Config { // The above results in kaspad functioning properly without any config settings // while still allowing the user to override settings with config files and // command line options. Command line options always take precedence. -func LoadConfig() (cfg *Config, remainingArgs []string, err error) { +func LoadConfig() (*Config, error) { cfgFlags := defaultFlags() - // Service options which are only added on Windows. - serviceOpts := serviceOptions{} - // Pre-parse the command line options to see if an alternative config // file or the version flag was specified. Any errors aside from the // help message error can be ignored here since they will be caught by // the final parse below. preCfg := cfgFlags - preParser := newConfigParser(preCfg, &serviceOpts, flags.HelpFlag) - _, err = preParser.Parse() + preParser := newConfigParser(preCfg, flags.HelpFlag) + _, err := preParser.Parse() if err != nil { var flagsErr *flags.Error if ok := errors.As(err, &flagsErr); ok && flagsErr.Type == flags.ErrHelp { - fmt.Fprintln(os.Stderr, err) - return nil, nil, err + return nil, err } } @@ -242,21 +239,10 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { os.Exit(0) } - // Perform service command and exit if specified. Invalid service - // commands show an appropriate error. Only runs on Windows since - // the RunServiceCommand function will be nil when not on Windows. - if serviceOpts.ServiceCommand != "" && RunServiceCommand != nil { - err := RunServiceCommand(serviceOpts.ServiceCommand) - if err != nil { - fmt.Fprintln(os.Stderr, err) - } - os.Exit(0) - } - // Load additional config from file. var configFileError error - parser := newConfigParser(cfgFlags, &serviceOpts, flags.Default) - cfg = &Config{ + parser := newConfigParser(cfgFlags, flags.Default) + cfg := &Config{ Flags: cfgFlags, } if !preCfg.Simnet || preCfg.ConfigFile != @@ -265,31 +251,27 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) { err := createDefaultConfigFile(preCfg.ConfigFile) if err != nil { - fmt.Fprintf(os.Stderr, "Error creating a "+ - "default config file: %s\n", err) + return nil, errors.Wrap(err, "Error creating a default config file") } } err := flags.NewIniParser(parser).ParseFile(preCfg.ConfigFile) if err != nil { if pErr := &(os.PathError{}); !errors.As(err, &pErr) { - fmt.Fprintf(os.Stderr, "Error parsing config "+ - "file: %s\n", err) - fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, errors.Wrapf(err, "Error parsing config file: %s\n\n%s", err, usageMessage) } configFileError = err } } // Parse command line options again to ensure they take precedence. - remainingArgs, err = parser.Parse() + _, err = parser.Parse() if err != nil { var flagsErr *flags.Error if ok := errors.As(err, &flagsErr); !ok || flagsErr.Type != flags.ErrHelp { - fmt.Fprintln(os.Stderr, usageMessage) + return nil, errors.Wrapf(err, "Error parsing command line arguments: %s\n\n%s", err, usageMessage) } - return nil, nil, err + return nil, err } // Create the home directory if it doesn't already exist. @@ -309,13 +291,12 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { str := "%s: Failed to create home directory: %s" err := errors.Errorf(str, funcName, err) - fmt.Fprintln(os.Stderr, err) - return nil, nil, err + return nil, err } err = cfg.ResolveNetwork(parser) if err != nil { - return nil, nil, err + return nil, err } // Set the default policy for relaying non-standard transactions @@ -330,7 +311,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err case cfg.RejectNonStd: relayNonStd = false case cfg.RelayNonStd: @@ -367,7 +348,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf("%s: %s", funcName, err.Error()) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Validate profile port number @@ -378,7 +359,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } } @@ -388,7 +369,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName, cfg.BanDuration) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Validate any given whitelisted IP addresses and networks. @@ -405,7 +386,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err = errors.Errorf(str, funcName, addr) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } var bits int if ip.To4() == nil { @@ -430,7 +411,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // --proxy or --connect without --listen disables listening. @@ -473,7 +454,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName, cfg.RPCMaxConcurrentReqs) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Validate the the minrelaytxfee. @@ -483,7 +464,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName, err) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Disallow 0 and negative min tx fees. @@ -492,7 +473,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName, cfg.MinRelayTxFee) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Limit the max block mass to a sane value. @@ -505,7 +486,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { blockMaxMassMax, cfg.BlockMaxMass) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Limit the max orphan count to a sane value. @@ -515,7 +496,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName, cfg.MaxOrphanTxs) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Look for illegal characters in the user agent comments. @@ -526,7 +507,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { funcName) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } } @@ -537,7 +518,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { funcName) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Add default port to all listener addresses if needed and remove @@ -545,7 +526,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { cfg.Listeners, err = network.NormalizeAddresses(cfg.Listeners, cfg.NetParams().DefaultPort) if err != nil { - return nil, nil, err + return nil, err } // Add default port to all rpc listener addresses if needed and remove @@ -553,7 +534,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { cfg.RPCListeners, err = network.NormalizeAddresses(cfg.RPCListeners, cfg.NetParams().RPCPort) if err != nil { - return nil, nil, err + return nil, err } // Disallow --addpeer and --connect used together @@ -562,7 +543,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } // Add default port to all added peer addresses if needed and remove @@ -570,13 +551,13 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { cfg.AddPeers, err = network.NormalizeAddresses(cfg.AddPeers, cfg.NetParams().DefaultPort) if err != nil { - return nil, nil, err + return nil, err } cfg.ConnectPeers, err = network.NormalizeAddresses(cfg.ConnectPeers, cfg.NetParams().DefaultPort) if err != nil { - return nil, nil, err + return nil, err } // Setup dial and DNS resolution (lookup) functions depending on the @@ -593,7 +574,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { err := errors.Errorf(str, funcName, cfg.Proxy, err) fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, usageMessage) - return nil, nil, err + return nil, err } proxy := &socks.Proxy{ @@ -611,7 +592,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) { log.Warnf("%s", configFileError) } - return cfg, remainingArgs, nil + return cfg, nil } // createDefaultConfig copies the file sample-kaspad.conf to the given destination path, diff --git a/infrastructure/logger/logger.go b/infrastructure/logger/logger.go index 280816cfd3..71887d3d79 100644 --- a/infrastructure/logger/logger.go +++ b/infrastructure/logger/logger.go @@ -53,6 +53,7 @@ var ( dnssLog = BackendLog.Logger("DNSS") snvrLog = BackendLog.Logger("SNVR") ibdsLog = BackendLog.Logger("IBDS") + wsvcLog = BackendLog.Logger("WSVC") ) // SubsystemTags is an enum of all sub system tags @@ -83,7 +84,8 @@ var SubsystemTags = struct { NTAR, DNSS, SNVR, - IBDS string + IBDS, + WSVC string }{ ADXR: "ADXR", AMGR: "AMGR", @@ -112,6 +114,7 @@ var SubsystemTags = struct { DNSS: "DNSS", SNVR: "SNVR", IBDS: "IBDS", + WSVC: "WSVC", } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -143,6 +146,7 @@ var subsystemLoggers = map[string]*Logger{ SubsystemTags.DNSS: dnssLog, SubsystemTags.SNVR: snvrLog, SubsystemTags.IBDS: ibdsLog, + SubsystemTags.WSVC: wsvcLog, } // InitLog attaches log file and error log file to the backend log. diff --git a/infrastructure/os/execenv/initialize.go b/infrastructure/os/execenv/initialize.go new file mode 100644 index 0000000000..f6ebd3f0ce --- /dev/null +++ b/infrastructure/os/execenv/initialize.go @@ -0,0 +1,22 @@ +package execenv + +import ( + "fmt" + "os" + "runtime" + + "github.com/kaspanet/kaspad/infrastructure/os/limits" +) + +// Initialize initializes the execution environment required to run kaspad +func Initialize(desiredLimits *limits.DesiredLimits) { + // Use all processor cores. + runtime.GOMAXPROCS(runtime.NumCPU()) + + // Up some limits. + if err := limits.SetLimits(desiredLimits); err != nil { + fmt.Fprintf(os.Stderr, "failed to set limits: %s\n", err) + os.Exit(1) + } + +} diff --git a/infrastructure/os/limits/desired_limits.go b/infrastructure/os/limits/desired_limits.go new file mode 100644 index 0000000000..94e4a10362 --- /dev/null +++ b/infrastructure/os/limits/desired_limits.go @@ -0,0 +1,7 @@ +package limits + +// DesiredLimits is a structure that specifies the limits desired by a running application +type DesiredLimits struct { + FileLimitWant uint64 + FileLimitMin uint64 +} diff --git a/infrastructure/os/limits/limits_plan9.go b/infrastructure/os/limits/limits_plan9.go index 9c4699d6c0..9897e7d873 100644 --- a/infrastructure/os/limits/limits_plan9.go +++ b/infrastructure/os/limits/limits_plan9.go @@ -5,6 +5,6 @@ package limits // SetLimits is a no-op on Plan 9 due to the lack of process accounting. -func SetLimits() error { +func SetLimits(*DesiredLimits) error { return nil } diff --git a/infrastructure/os/limits/limits_unix.go b/infrastructure/os/limits/limits_unix.go index 9b53876722..fc6ccbbd90 100644 --- a/infrastructure/os/limits/limits_unix.go +++ b/infrastructure/os/limits/limits_unix.go @@ -7,41 +7,39 @@ package limits import ( - "github.com/pkg/errors" "syscall" -) -const ( - fileLimitWant = 2048 - fileLimitMin = 1024 + "github.com/pkg/errors" ) +const () + // SetLimits raises some process limits to values which allow kaspad and // associated utilities to run. -func SetLimits() error { +func SetLimits(desiredLimits *DesiredLimits) error { var rLimit syscall.Rlimit err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) if err != nil { return err } - if rLimit.Cur > fileLimitWant { + if rLimit.Cur > desiredLimits.FileLimitWant { return nil } - if rLimit.Max < fileLimitMin { + if rLimit.Max < desiredLimits.FileLimitMin { err = errors.Errorf("need at least %d file descriptors", - fileLimitMin) + desiredLimits.FileLimitMin) return err } - if rLimit.Max < fileLimitWant { + if rLimit.Max < desiredLimits.FileLimitWant { rLimit.Cur = rLimit.Max } else { - rLimit.Cur = fileLimitWant + rLimit.Cur = desiredLimits.FileLimitWant } err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit) if err != nil { // try min value - rLimit.Cur = fileLimitMin + rLimit.Cur = desiredLimits.FileLimitMin err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit) if err != nil { return err diff --git a/infrastructure/os/limits/limits_windows.go b/infrastructure/os/limits/limits_windows.go index 62655d739e..e0ef3da9b1 100644 --- a/infrastructure/os/limits/limits_windows.go +++ b/infrastructure/os/limits/limits_windows.go @@ -5,6 +5,6 @@ package limits // SetLimits is a no-op on Windows since it's not required there. -func SetLimits() error { +func SetLimits(*DesiredLimits) error { return nil } diff --git a/infrastructure/os/winservice/common.go b/infrastructure/os/winservice/common.go new file mode 100644 index 0000000000..d25e86b611 --- /dev/null +++ b/infrastructure/os/winservice/common.go @@ -0,0 +1,17 @@ +package winservice + +import "github.com/kaspanet/kaspad/infrastructure/config" + +// ServiceDescription contains information about a service, needed to administer it +type ServiceDescription struct { + Name string + DisplayName string + Description string +} + +// MainFunc specifies the signature of an application's main function to be able to run as a windows service +type MainFunc func(startedChan chan<- struct{}) error + +// WinServiceMain is only invoked on Windows. It detects when kaspad is running +// as a service and reacts accordingly. +var WinServiceMain = func(MainFunc, *ServiceDescription, *config.Config) (bool, error) { return false, nil } diff --git a/infrastructure/os/winservice/log.go b/infrastructure/os/winservice/log.go new file mode 100644 index 0000000000..b9cac8388a --- /dev/null +++ b/infrastructure/os/winservice/log.go @@ -0,0 +1,13 @@ +// Copyright (c) 2013-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package winservice + +import ( + "github.com/kaspanet/kaspad/infrastructure/logger" + "github.com/kaspanet/kaspad/util/panics" +) + +var log, _ = logger.Get(logger.SubsystemTags.CNFG) +var spawn = panics.GoroutineWrapperFunc(log) diff --git a/infrastructure/os/winservice/service_command_windows.go b/infrastructure/os/winservice/service_command_windows.go new file mode 100644 index 0000000000..955df7fe00 --- /dev/null +++ b/infrastructure/os/winservice/service_command_windows.go @@ -0,0 +1,178 @@ +package winservice + +import ( + "os" + "path/filepath" + "time" + + "github.com/btcsuite/winsvc/eventlog" + + "github.com/btcsuite/winsvc/mgr" + + "github.com/btcsuite/winsvc/svc" + "github.com/pkg/errors" +) + +// performServiceCommand attempts to run one of the supported service commands +// provided on the command line via the service command flag. An appropriate +// error is returned if an invalid command is specified. +func (s *Service) performServiceCommand() error { + var err error + command := s.cfg.ServiceOptions.ServiceCommand + switch command { + case "install": + err = s.installService() + + case "remove": + err = s.removeService() + + case "start": + err = s.startService() + + case "stop": + err = s.controlService(svc.Stop, svc.Stopped) + + default: + err = errors.Errorf("invalid service command [%s]", command) + } + + return err +} + +// installService attempts to install the kaspad service. Typically this should +// be done by the msi installer, but it is provided here since it can be useful +// for development. +func (s *Service) installService() error { + // Get the path of the current executable. This is needed because + // os.Args[0] can vary depending on how the application was launched. + // For example, under cmd.exe it will only be the name of the app + // without the path or extension, but under mingw it will be the full + // path including the extension. + exePath, err := filepath.Abs(os.Args[0]) + if err != nil { + return err + } + if filepath.Ext(exePath) == "" { + exePath += ".exe" + } + + // Connect to the windows service manager. + serviceManager, err := mgr.Connect() + if err != nil { + return err + } + defer serviceManager.Disconnect() + + // Ensure the service doesn't already exist. + service, err := serviceManager.OpenService(s.description.Name) + if err == nil { + service.Close() + return errors.Errorf("service %s already exists", s.description.Name) + } + + // Install the service. + service, err = serviceManager.CreateService(s.description.Name, exePath, mgr.Config{ + DisplayName: s.description.DisplayName, + Description: s.description.Description, + }) + if err != nil { + return err + } + defer service.Close() + + // Support events to the event log using the standard "standard" Windows + // EventCreate.exe message file. This allows easy logging of custom + // messges instead of needing to create our own message catalog. + err = eventlog.Remove(s.description.Name) + if err != nil { + return err + } + eventsSupported := uint32(eventlog.Error | eventlog.Warning | eventlog.Info) + return eventlog.InstallAsEventCreate(s.description.Name, eventsSupported) +} + +// removeService attempts to uninstall the kaspad service. Typically this should +// be done by the msi uninstaller, but it is provided here since it can be +// useful for development. Not the eventlog entry is intentionally not removed +// since it would invalidate any existing event log messages. +func (s *Service) removeService() error { + // Connect to the windows service manager. + serviceManager, err := mgr.Connect() + if err != nil { + return err + } + defer serviceManager.Disconnect() + + // Ensure the service exists. + service, err := serviceManager.OpenService(s.description.Name) + if err != nil { + return errors.Errorf("service %s is not installed", s.description.Name) + } + defer service.Close() + + // Remove the service. + return service.Delete() +} + +// startService attempts to Start the kaspad service. +func (s *Service) startService() error { + // Connect to the windows service manager. + serviceManager, err := mgr.Connect() + if err != nil { + return err + } + defer serviceManager.Disconnect() + + service, err := serviceManager.OpenService(s.description.Name) + if err != nil { + return errors.Errorf("could not access service: %s", err) + } + defer service.Close() + + err = service.Start(os.Args) + if err != nil { + return errors.Errorf("could not start service: %s", err) + } + + return nil +} + +// controlService allows commands which change the status of the service. It +// also waits for up to 10 seconds for the service to change to the passed +// state. +func (s *Service) controlService(c svc.Cmd, to svc.State) error { + // Connect to the windows service manager. + serviceManager, err := mgr.Connect() + if err != nil { + return err + } + defer serviceManager.Disconnect() + + service, err := serviceManager.OpenService(s.description.Name) + if err != nil { + return errors.Errorf("could not access service: %s", err) + } + defer service.Close() + + status, err := service.Control(c) + if err != nil { + return errors.Errorf("could not send control=%d: %s", c, err) + } + + // Send the control message. + timeout := time.Now().Add(10 * time.Second) + for status.State != to { + if timeout.Before(time.Now()) { + return errors.Errorf("timeout waiting for service to go "+ + "to state=%d", to) + } + time.Sleep(300 * time.Millisecond) + status, err = service.Query() + if err != nil { + return errors.Errorf("could not retrieve service "+ + "status: %s", err) + } + } + + return nil +} diff --git a/infrastructure/os/winservice/service_main_windows.go b/infrastructure/os/winservice/service_main_windows.go new file mode 100644 index 0000000000..362489f5ea --- /dev/null +++ b/infrastructure/os/winservice/service_main_windows.go @@ -0,0 +1,40 @@ +package winservice + +import ( + "github.com/btcsuite/winsvc/svc" + "github.com/kaspanet/kaspad/infrastructure/config" +) + +// serviceMain checks whether we're being invoked as a service, and if so uses +// the service control manager to start the long-running server. A flag is +// returned to the caller so the application can determine whether to exit (when +// running as a service) or launch in normal interactive mode. +func serviceMain(main MainFunc, description *ServiceDescription, cfg *config.Config) (bool, error) { + service := newService(main, description, cfg) + + if cfg.ServiceOptions.ServiceCommand != "" { + return true, service.performServiceCommand() + } + + // Don't run as a service if we're running interactively (or that can't + // be determined due to an error). + isInteractive, err := svc.IsAnInteractiveSession() + if err != nil { + return false, err + } + if isInteractive { + return false, nil + } + + err = service.Start() + if err != nil { + return true, err + } + + return true, nil +} + +// Set windows specific functions to real functions. +func init() { + WinServiceMain = serviceMain +} diff --git a/infrastructure/os/winservice/service_windows.go b/infrastructure/os/winservice/service_windows.go new file mode 100644 index 0000000000..4b88c7124a --- /dev/null +++ b/infrastructure/os/winservice/service_windows.go @@ -0,0 +1,119 @@ +// Copyright (c) 2013-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package winservice + +import ( + "fmt" + + "github.com/btcsuite/winsvc/eventlog" + "github.com/btcsuite/winsvc/svc" + "github.com/kaspanet/kaspad/infrastructure/config" + "github.com/kaspanet/kaspad/infrastructure/os/signal" + "github.com/kaspanet/kaspad/version" +) + +// Service houses the main service handler which handles all service +// updates and launching the application's main. +type Service struct { + main MainFunc + description *ServiceDescription + cfg *config.Config + eventLog *eventlog.Log +} + +func newService(main MainFunc, description *ServiceDescription, cfg *config.Config) *Service { + return &Service{ + main: main, + description: description, + cfg: cfg, + } +} + +// Start starts the srevice +func (s *Service) Start() error { + elog, err := eventlog.Open(s.description.Name) + if err != nil { + return err + } + s.eventLog = elog + defer s.eventLog.Close() + + err = svc.Run(s.description.Name, &Service{}) + if err != nil { + s.eventLog.Error(1, fmt.Sprintf("Service start failed: %s", err)) + return err + } + + return nil +} + +// Execute is the main entry point the winsvc package calls when receiving +// information from the Windows service control manager. It launches the +// long-running kaspadMain (which is the real meat of kaspad), handles service +// change requests, and notifies the service control manager of changes. +func (s *Service) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) { + // Service start is pending. + const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown + changes <- svc.Status{State: svc.StartPending} + + // Start kaspadMain in a separate goroutine so the service can start + // quickly. Shutdown (along with a potential error) is reported via + // doneChan. startedChan is notified once kaspad is started so this can + // be properly logged + doneChan := make(chan error) + startedChan := make(chan struct{}) + spawn("kaspadMain-windows", func() { + err := s.main(startedChan) + doneChan <- err + }) + + // Service is now started. + changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted} +loop: + for { + select { + case c := <-r: + switch c.Cmd { + case svc.Interrogate: + changes <- c.CurrentStatus + + case svc.Stop, svc.Shutdown: + // Service stop is pending. Don't accept any + // more commands while pending. + changes <- svc.Status{State: svc.StopPending} + + // Signal the main function to exit. + signal.ShutdownRequestChannel <- struct{}{} + + default: + s.eventLog.Error(1, fmt.Sprintf("Unexpected control "+ + "request #%d.", c)) + } + + case <-startedChan: + s.logServiceStart() + + case err := <-doneChan: + if err != nil { + s.eventLog.Error(1, err.Error()) + } + break loop + } + } + + // Service is now stopped. + changes <- svc.Status{State: svc.Stopped} + return false, 0 +} + +// logServiceStart logs information about kaspad when the main server has +// been started to the Windows event log. +func (s *Service) logServiceStart() { + var message string + message += fmt.Sprintf("%s version %s\n", s.description.DisplayName, version.Version()) + message += fmt.Sprintf("Configuration file: %s\n", s.cfg.ConfigFile) + message += fmt.Sprintf("Data directory: %s\n", s.cfg.DataDir) + message += fmt.Sprintf("Logs directory: %s\n", s.cfg.LogDir) +} diff --git a/log.go b/log.go index 6c7cd34ab7..500a870bf9 100644 --- a/log.go +++ b/log.go @@ -7,8 +7,6 @@ package main import ( "github.com/kaspanet/kaspad/infrastructure/logger" - "github.com/kaspanet/kaspad/util/panics" ) var log, _ = logger.Get(logger.SubsystemTags.KASD) -var spawn = panics.GoroutineWrapperFunc(log) diff --git a/main.go b/main.go index c259bc00b1..3d7c65df03 100644 --- a/main.go +++ b/main.go @@ -5,212 +5,14 @@ package main import ( - "fmt" _ "net/http/pprof" "os" - "path/filepath" - "runtime" - "runtime/pprof" - "time" "github.com/kaspanet/kaspad/app" - - "github.com/kaspanet/kaspad/infrastructure/db/dbaccess" - - "github.com/kaspanet/kaspad/domain/blockdag/indexers" - "github.com/kaspanet/kaspad/infrastructure/config" - "github.com/kaspanet/kaspad/infrastructure/os/limits" - "github.com/kaspanet/kaspad/infrastructure/os/signal" - "github.com/kaspanet/kaspad/util/panics" - "github.com/kaspanet/kaspad/util/profiling" - "github.com/kaspanet/kaspad/version" -) - -const ( - // blockDbNamePrefix is the prefix for the block database name. The - // database type is appended to this value to form the full block - // database name. - blockDbNamePrefix = "blocks" ) -// winServiceMain is only invoked on Windows. It detects when kaspad is running -// as a service and reacts accordingly. -var winServiceMain func() (bool, error) - -// kaspadMain is the real main function for kaspad. It is necessary to work -// around the fact that deferred functions do not run when os.Exit() is called. -// The optional startedChan writes once all services has started. -func kaspadMain(startedChan chan<- struct{}) error { - interrupt := signal.InterruptListener() - - // Load configuration and parse command line. This function also - // initializes logging and configures it accordingly. - cfg, _, err := config.LoadConfig() - if err != nil { - return err - } - defer panics.HandlePanic(log, "MAIN", nil) - - // Get a channel that will be closed when a shutdown signal has been - // triggered either from an OS signal such as SIGINT (Ctrl+C) or from - // another subsystem such as the RPC server. - defer log.Info("Shutdown complete") - - // Show version at startup. - log.Infof("Version %s", version.Version()) - - // Enable http profiling server if requested. - if cfg.Profile != "" { - profiling.Start(cfg.Profile, log) - } - - // Write cpu profile if requested. - if cfg.CPUProfile != "" { - f, err := os.Create(cfg.CPUProfile) - if err != nil { - log.Errorf("Unable to create cpu profile: %s", err) - return err - } - pprof.StartCPUProfile(f) - defer f.Close() - defer pprof.StopCPUProfile() - } - - // Perform upgrades to kaspad as new versions require it. - if err := doUpgrades(); err != nil { - log.Errorf("%s", err) - return err - } - - // Return now if an interrupt signal was triggered. - if signal.InterruptRequested(interrupt) { - return nil - } - - if cfg.ResetDatabase { - err := removeDatabase(cfg) - if err != nil { - log.Errorf("%s", err) - return err - } - } - - // Open the database - databaseContext, err := openDB(cfg) - if err != nil { - log.Errorf("%s", err) - return err - } - defer func() { - log.Infof("Gracefully shutting down the database...") - err := databaseContext.Close() - if err != nil { - log.Errorf("Failed to close the database: %s", err) - } - }() - - // Return now if an interrupt signal was triggered. - if signal.InterruptRequested(interrupt) { - return nil - } - - // Drop indexes and exit if requested. - if cfg.DropAcceptanceIndex { - if err := indexers.DropAcceptanceIndex(databaseContext); err != nil { - log.Errorf("%s", err) - return err - } - - return nil - } - - // Create app and start it. - app, err := app.New(cfg, databaseContext, interrupt) - if err != nil { - log.Errorf("Unable to start kaspad: %+v", err) - return err - } - defer func() { - log.Infof("Gracefully shutting down kaspad...") - - shutdownDone := make(chan struct{}) - go func() { - app.Stop() - shutdownDone <- struct{}{} - }() - - const shutdownTimeout = 2 * time.Minute - - select { - case <-shutdownDone: - case <-time.After(shutdownTimeout): - log.Criticalf("Graceful shutdown timed out %s. Terminating...", shutdownTimeout) - } - log.Infof("Kaspad shutdown complete") - }() - app.Start() - if startedChan != nil { - startedChan <- struct{}{} - } - - // Wait until the interrupt signal is received from an OS signal or - // shutdown is requested through one of the subsystems such as the RPC - // server. - <-interrupt - return nil -} - -func removeDatabase(cfg *config.Config) error { - dbPath := blockDbPath(cfg) - return os.RemoveAll(dbPath) -} - -// dbPath returns the path to the block database given a database type. -func blockDbPath(cfg *config.Config) string { - // The database name is based on the database type. - dbName := blockDbNamePrefix + "_" + cfg.DbType - dbPath := filepath.Join(cfg.DataDir, dbName) - return dbPath -} - -func openDB(cfg *config.Config) (*dbaccess.DatabaseContext, error) { - dbPath := filepath.Join(cfg.DataDir, "db") - log.Infof("Loading database from '%s'", dbPath) - return dbaccess.New(dbPath) -} - func main() { - // Use all processor cores. - runtime.GOMAXPROCS(runtime.NumCPU()) - - // Up some limits. - if err := limits.SetLimits(); err != nil { - fmt.Fprintf(os.Stderr, "failed to set limits: %s\n", err) - os.Exit(1) - } - - // Call serviceMain on Windows to handle running as a service. When - // the return isService flag is true, exit now since we ran as a - // service. Otherwise, just fall through to normal operation. - if runtime.GOOS == "windows" { - isService, err := winServiceMain() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - if isService { - os.Exit(0) - } - } - - // Work around defer not working after os.Exit() - if err := kaspadMain(nil); err != nil { + if err := app.StartApp(); err != nil { os.Exit(1) } } - -// doUpgrades performs upgrades to kaspad as new versions require it. -// currently it's a placeholder we got from kaspad upstream, that does nothing -func doUpgrades() error { - return nil -} diff --git a/service_windows.go b/service_windows.go deleted file mode 100644 index 620ffa48d9..0000000000 --- a/service_windows.go +++ /dev/null @@ -1,309 +0,0 @@ -// Copyright (c) 2013-2016 The btcsuite developers -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - -package main - -import ( - "fmt" - "os" - "path/filepath" - "time" - - "github.com/pkg/errors" - - "github.com/btcsuite/winsvc/eventlog" - "github.com/btcsuite/winsvc/mgr" - "github.com/btcsuite/winsvc/svc" - "github.com/kaspanet/kaspad/infrastructure/config" - "github.com/kaspanet/kaspad/infrastructure/os/signal" - "github.com/kaspanet/kaspad/version" -) - -const ( - // svcName is the name of kaspad service. - svcName = "kaspadsvc" - - // svcDisplayName is the service name that will be shown in the windows - // services list. Not the svcName is the "real" name which is used - // to control the service. This is only for display purposes. - svcDisplayName = "Kaspad Service" - - // svcDesc is the description of the service. - svcDesc = "Downloads and stays synchronized with the Kaspa block " + - "DAG and provides DAG services to applications." -) - -// elog is used to send messages to the Windows event log. -var elog *eventlog.Log - -// logServiceStart logs information about kaspad when the main server has -// been started to the Windows event log. -func logServiceStart() { - var message string - message += fmt.Sprintf("Version %s\n", version.Version()) - message += fmt.Sprintf("Configuration directory: %s\n", config.DefaultHomeDir) - message += fmt.Sprintf("Configuration file: %s\n", cfg.ConfigFile) - message += fmt.Sprintf("Data directory: %s\n", cfg.DataDir) - - elog.Info(1, message) -} - -// kaspadService houses the main service handler which handles all service -// updates and launching kaspadMain. -type kaspadService struct{} - -// Execute is the main entry point the winsvc package calls when receiving -// information from the Windows service control manager. It launches the -// long-running kaspadMain (which is the real meat of kaspad), handles service -// change requests, and notifies the service control manager of changes. -func (s *kaspadService) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) { - // Service start is pending. - const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown - changes <- svc.Status{State: svc.StartPending} - - // Start kaspadMain in a separate goroutine so the service can start - // quickly. Shutdown (along with a potential error) is reported via - // doneChan. startedChan is notified once kaspad is started so this can - // be properly logged - doneChan := make(chan error) - startedChan := make(chan struct{}) - spawn("kaspadMain-windows", func() { - err := kaspadMain(startedChan) - doneChan <- err - }) - - // Service is now started. - changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted} -loop: - for { - select { - case c := <-r: - switch c.Cmd { - case svc.Interrogate: - changes <- c.CurrentStatus - - case svc.Stop, svc.Shutdown: - // Service stop is pending. Don't accept any - // more commands while pending. - changes <- svc.Status{State: svc.StopPending} - - // Signal the main function to exit. - signal.ShutdownRequestChannel <- struct{}{} - - default: - elog.Error(1, fmt.Sprintf("Unexpected control "+ - "request #%d.", c)) - } - - case <-startedChan: - logServiceStart() - - case err := <-doneChan: - if err != nil { - elog.Error(1, err.Error()) - } - break loop - } - } - - // Service is now stopped. - changes <- svc.Status{State: svc.Stopped} - return false, 0 -} - -// installService attempts to install the kaspad service. Typically this should -// be done by the msi installer, but it is provided here since it can be useful -// for development. -func installService() error { - // Get the path of the current executable. This is needed because - // os.Args[0] can vary depending on how the application was launched. - // For example, under cmd.exe it will only be the name of the app - // without the path or extension, but under mingw it will be the full - // path including the extension. - exePath, err := filepath.Abs(os.Args[0]) - if err != nil { - return err - } - if filepath.Ext(exePath) == "" { - exePath += ".exe" - } - - // Connect to the windows service manager. - serviceManager, err := mgr.Connect() - if err != nil { - return err - } - defer serviceManager.Disconnect() - - // Ensure the service doesn't already exist. - service, err := serviceManager.OpenService(svcName) - if err == nil { - service.Close() - return errors.Errorf("service %s already exists", svcName) - } - - // Install the service. - service, err = serviceManager.CreateService(svcName, exePath, mgr.Config{ - DisplayName: svcDisplayName, - Description: svcDesc, - }) - if err != nil { - return err - } - defer service.Close() - - // Support events to the event log using the standard "standard" Windows - // EventCreate.exe message file. This allows easy logging of custom - // messges instead of needing to create our own message catalog. - eventlog.Remove(svcName) - eventsSupported := uint32(eventlog.Error | eventlog.Warning | eventlog.Info) - return eventlog.InstallAsEventCreate(svcName, eventsSupported) -} - -// removeService attempts to uninstall the kaspad service. Typically this should -// be done by the msi uninstaller, but it is provided here since it can be -// useful for development. Not the eventlog entry is intentionally not removed -// since it would invalidate any existing event log messages. -func removeService() error { - // Connect to the windows service manager. - serviceManager, err := mgr.Connect() - if err != nil { - return err - } - defer serviceManager.Disconnect() - - // Ensure the service exists. - service, err := serviceManager.OpenService(svcName) - if err != nil { - return errors.Errorf("service %s is not installed", svcName) - } - defer service.Close() - - // Remove the service. - return service.Delete() -} - -// startService attempts to Start the kaspad service. -func startService() error { - // Connect to the windows service manager. - serviceManager, err := mgr.Connect() - if err != nil { - return err - } - defer serviceManager.Disconnect() - - service, err := serviceManager.OpenService(svcName) - if err != nil { - return errors.Errorf("could not access service: %s", err) - } - defer service.Close() - - err = service.Start(os.Args) - if err != nil { - return errors.Errorf("could not start service: %s", err) - } - - return nil -} - -// controlService allows commands which change the status of the service. It -// also waits for up to 10 seconds for the service to change to the passed -// state. -func controlService(c svc.Cmd, to svc.State) error { - // Connect to the windows service manager. - serviceManager, err := mgr.Connect() - if err != nil { - return err - } - defer serviceManager.Disconnect() - - service, err := serviceManager.OpenService(svcName) - if err != nil { - return errors.Errorf("could not access service: %s", err) - } - defer service.Close() - - status, err := service.Control(c) - if err != nil { - return errors.Errorf("could not send control=%d: %s", c, err) - } - - // Send the control message. - timeout := time.Now().Add(10 * time.Second) - for status.State != to { - if timeout.Before(time.Now()) { - return errors.Errorf("timeout waiting for service to go "+ - "to state=%d", to) - } - time.Sleep(300 * time.Millisecond) - status, err = service.Query() - if err != nil { - return errors.Errorf("could not retrieve service "+ - "status: %s", err) - } - } - - return nil -} - -// performServiceCommand attempts to run one of the supported service commands -// provided on the command line via the service command flag. An appropriate -// error is returned if an invalid command is specified. -func performServiceCommand(command string) error { - var err error - switch command { - case "install": - err = installService() - - case "remove": - err = removeService() - - case "start": - err = startService() - - case "stop": - err = controlService(svc.Stop, svc.Stopped) - - default: - err = errors.Errorf("invalid service command [%s]", command) - } - - return err -} - -// serviceMain checks whether we're being invoked as a service, and if so uses -// the service control manager to start the long-running server. A flag is -// returned to the caller so the application can determine whether to exit (when -// running as a service) or launch in normal interactive mode. -func serviceMain() (bool, error) { - // Don't run as a service if we're running interactively (or that can't - // be determined due to an error). - isInteractive, err := svc.IsAnInteractiveSession() - if err != nil { - return false, err - } - if isInteractive { - return false, nil - } - - elog, err = eventlog.Open(svcName) - if err != nil { - return false, err - } - defer elog.Close() - - err = svc.Run(svcName, &kaspadService{}) - if err != nil { - elog.Error(1, fmt.Sprintf("Service start failed: %s", err)) - return true, err - } - - return true, nil -} - -// Set windows specific functions to real functions. -func init() { - config.RunServiceCommand = performServiceCommand - winServiceMain = serviceMain -} diff --git a/testing/integration/setup_test.go b/testing/integration/setup_test.go index e2f018c9e6..86430f9beb 100644 --- a/testing/integration/setup_test.go +++ b/testing/integration/setup_test.go @@ -10,7 +10,7 @@ import ( ) type appHarness struct { - app *app.App + app *app.ComponentManager rpcClient *testRPCClient p2pAddress string rpcAddress string @@ -108,7 +108,7 @@ func teardownHarness(t *testing.T, harness *appHarness) { func setApp(t *testing.T, harness *appHarness) { var err error - harness.app, err = app.New(harness.config, harness.databaseContext, make(chan struct{})) + harness.app, err = app.NewComponentManager(harness.config, harness.databaseContext, make(chan struct{})) if err != nil { t.Fatalf("Error creating app: %+v", err) }