Skip to content

Commit

Permalink
[NOD-1420] Restructure main (#942)
Browse files Browse the repository at this point in the history
* [NOD-1420] Moved setting limits to executor

* [NOD-1420] Moved all code dealing with windows service to separate package

* [NOD-1420] Move practically all main to restructured app package

* [NOD-1420] Check for running as interactive only after checking if we are doing any service operation

* [NOD-1420] Add comments

* [NOD-1420] Add a comment
  • Loading branch information
svarogg committed Oct 1, 2020
1 parent ef6c46a commit 513ffa7
Show file tree
Hide file tree
Showing 19 changed files with 859 additions and 793 deletions.
347 changes: 141 additions & 206 deletions app/app.go

Large diffs are not rendered by default.

260 changes: 260 additions & 0 deletions app/component_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package app

import (
"fmt"
"sync/atomic"

"github.com/kaspanet/kaspad/infrastructure/network/addressmanager"

"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"

"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol"
"github.com/kaspanet/kaspad/app/rpc"
"github.com/kaspanet/kaspad/domain/blockdag"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/domain/mempool"
"github.com/kaspanet/kaspad/domain/mining"
"github.com/kaspanet/kaspad/domain/txscript"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/db/dbaccess"
"github.com/kaspanet/kaspad/infrastructure/network/connmanager"
"github.com/kaspanet/kaspad/infrastructure/network/dnsseed"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter"
"github.com/kaspanet/kaspad/util/panics"
)

// ComponentManager is a wrapper for all the kaspad services
type ComponentManager struct {
cfg *config.Config
addressManager *addressmanager.AddressManager
protocolManager *protocol.Manager
rpcManager *rpc.Manager
connectionManager *connmanager.ConnectionManager
netAdapter *netadapter.NetAdapter

started, shutdown int32
}

// Start launches all the kaspad services.
func (a *ComponentManager) Start() {
// Already started?
if atomic.AddInt32(&a.started, 1) != 1 {
return
}

log.Trace("Starting kaspad")

err := a.netAdapter.Start()
if err != nil {
panics.Exit(log, fmt.Sprintf("Error starting the net adapter: %+v", err))
}

a.maybeSeedFromDNS()

a.connectionManager.Start()
}

// Stop gracefully shuts down all the kaspad services.
func (a *ComponentManager) Stop() {
// Make sure this only happens once.
if atomic.AddInt32(&a.shutdown, 1) != 1 {
log.Infof("Kaspad is already in the process of shutting down")
return
}

log.Warnf("Kaspad shutting down")

a.connectionManager.Stop()

err := a.netAdapter.Stop()
if err != nil {
log.Errorf("Error stopping the net adapter: %+v", err)
}

err = a.addressManager.Stop()
if err != nil {
log.Errorf("Error stopping address manager: %s", err)
}

return
}

// NewComponentManager returns a new ComponentManager instance.
// Use Start() to begin all services within this ComponentManager
func NewComponentManager(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt chan<- struct{}) (*ComponentManager, error) {
indexManager, acceptanceIndex := setupIndexes(cfg)

sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize)

// Create a new block DAG instance with the appropriate configuration.
dag, err := setupDAG(cfg, databaseContext, sigCache, indexManager)
if err != nil {
return nil, err
}

txMempool := setupMempool(cfg, dag, sigCache)

netAdapter, err := netadapter.NewNetAdapter(cfg)
if err != nil {
return nil, err
}
addressManager, err := addressmanager.New(cfg, databaseContext)
if err != nil {
return nil, err
}
connectionManager, err := connmanager.New(cfg, netAdapter, addressManager)
if err != nil {
return nil, err
}
protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager)
if err != nil {
return nil, err
}
rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex, interrupt)

return &ComponentManager{
cfg: cfg,
protocolManager: protocolManager,
rpcManager: rpcManager,
connectionManager: connectionManager,
netAdapter: netAdapter,
addressManager: addressManager,
}, nil

}

func setupRPC(
cfg *config.Config,
txMempool *mempool.TxPool,
dag *blockdag.BlockDAG,
sigCache *txscript.SigCache,
netAdapter *netadapter.NetAdapter,
protocolManager *protocol.Manager,
connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager,
acceptanceIndex *indexers.AcceptanceIndex,
shutDownChan chan<- struct{},
) *rpc.Manager {

blockTemplateGenerator := mining.NewBlkTmplGenerator(&mining.Policy{BlockMaxMass: cfg.BlockMaxMass}, txMempool, dag, sigCache)
rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex, shutDownChan)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnTransactionAddedToMempoolHandler(rpcManager.NotifyTransactionAddedToMempool)
dag.Subscribe(func(notification *blockdag.Notification) {
err := handleBlockDAGNotifications(notification, acceptanceIndex, rpcManager)
if err != nil {
panic(err)
}
})
return rpcManager
}

func handleBlockDAGNotifications(notification *blockdag.Notification,
acceptanceIndex *indexers.AcceptanceIndex, rpcManager *rpc.Manager) error {

switch notification.Type {
case blockdag.NTChainChanged:
if acceptanceIndex == nil {
return nil
}
chainChangedNotificationData := notification.Data.(*blockdag.ChainChangedNotificationData)
err := rpcManager.NotifyChainChanged(chainChangedNotificationData.RemovedChainBlockHashes,
chainChangedNotificationData.AddedChainBlockHashes)
if err != nil {
return err
}
case blockdag.NTFinalityConflict:
finalityConflictNotificationData := notification.Data.(*blockdag.FinalityConflictNotificationData)
err := rpcManager.NotifyFinalityConflict(finalityConflictNotificationData.ViolatingBlockHash.String())
if err != nil {
return err
}
case blockdag.NTFinalityConflictResolved:
finalityConflictResolvedNotificationData := notification.Data.(*blockdag.FinalityConflictResolvedNotificationData)
err := rpcManager.NotifyFinalityConflictResolved(finalityConflictResolvedNotificationData.FinalityBlockHash.String())
if err != nil {
return err
}
}
return nil
}

func (a *ComponentManager) maybeSeedFromDNS() {
if !a.cfg.DisableDNSSeed {
dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, appmessage.SFNodeNetwork, false, nil,
a.cfg.Lookup, func(addresses []*appmessage.NetAddress) {
// Kaspad uses a lookup of the dns seeder here. Since seeder returns
// IPs of nodes and not its own IP, we can not know real IP of
// source. So we'll take first returned address as source.
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}

if a.cfg.GRPCSeed != "" {
dnsseed.SeedFromGRPC(a.cfg.NetParams(), a.cfg.GRPCSeed, appmessage.SFNodeNetwork, false, nil,
func(addresses []*appmessage.NetAddress) {
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
}

func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext,
sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) {

dag, err := blockdag.New(&blockdag.Config{
DatabaseContext: databaseContext,
DAGParams: cfg.NetParams(),
TimeSource: blockdag.NewTimeSource(),
SigCache: sigCache,
IndexManager: indexManager,
SubnetworkID: cfg.SubnetworkID,
MaxUTXOCacheSize: cfg.MaxUTXOCacheSize,
})
return dag, err
}

func setupIndexes(cfg *config.Config) (blockdag.IndexManager, *indexers.AcceptanceIndex) {
// Create indexes if needed.
var indexes []indexers.Indexer
var acceptanceIndex *indexers.AcceptanceIndex
if cfg.AcceptanceIndex {
log.Info("acceptance index is enabled")
acceptanceIndex = indexers.NewAcceptanceIndex()
indexes = append(indexes, acceptanceIndex)
}

// Create an index manager if any of the optional indexes are enabled.
if len(indexes) < 0 {
return nil, nil
}
indexManager := indexers.NewManager(indexes)
return indexManager, acceptanceIndex
}

func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript.SigCache) *mempool.TxPool {
mempoolConfig := mempool.Config{
Policy: mempool.Policy{
AcceptNonStd: cfg.RelayNonStd,
MaxOrphanTxs: cfg.MaxOrphanTxs,
MaxOrphanTxSize: config.DefaultMaxOrphanTxSize,
MinRelayTxFee: cfg.MinRelayTxFee,
MaxTxVersion: 1,
},
CalcTxSequenceLockFromReferencedUTXOEntries: dag.CalcTxSequenceLockFromReferencedUTXOEntries,
SigCache: sigCache,
DAG: dag,
}

return mempool.New(&mempoolConfig)
}

// P2PNodeID returns the network ID associated with this ComponentManager
func (a *ComponentManager) P2PNodeID() *id.ID {
return a.netAdapter.ID()
}

// AddressManager returns the AddressManager associated with this ComponentManager
func (a *ComponentManager) AddressManager() *addressmanager.AddressManager {
return a.addressManager
}
5 changes: 3 additions & 2 deletions cmd/addblock/addblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
package main

import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"os"
"runtime"

"github.com/kaspanet/kaspad/infrastructure/logger"

"github.com/kaspanet/kaspad/infrastructure/os/limits"
"github.com/kaspanet/kaspad/util/panics"
)
Expand Down Expand Up @@ -77,7 +78,7 @@ func realMain() error {
func main() {
// Use all processor cores and up some limits.
runtime.GOMAXPROCS(runtime.NumCPU())
if err := limits.SetLimits(); err != nil {
if err := limits.SetLimits(nil); err != nil {
os.Exit(1)
}

Expand Down
Loading

0 comments on commit 513ffa7

Please sign in to comment.