diff --git a/auctioneer.go b/auctioneer.go index c35cb6bc2..511990450 100644 --- a/auctioneer.go +++ b/auctioneer.go @@ -250,6 +250,10 @@ type AuctioneerConfig struct { // automatically clear the above TraderRejected map. TraderRejectResetInterval time.Duration + // TraderOnline is an order filter that is used to filter out offline + // traders before we start match making. + TraderOnline matching.OrderFilter + // RatingsAgency if non-nil, will be used as an extract matching // predicate when doing match making. RatingsAgency ratings.Agency @@ -1554,7 +1558,7 @@ func (a *Auctioneer) stateStep(currentState AuctionState, // nolint:gocyclo ) filterChain := []matching.OrderFilter{ matching.NewBatchFeeRateFilter(s.batchFeeRate), - accountFilter, + accountFilter, a.cfg.TraderOnline, } // We pass in our two conflict handlers that also act as match diff --git a/rpcserver.go b/rpcserver.go index 7c2e59dc8..5f29c095f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -165,6 +165,10 @@ type rpcServer struct { snapshotCache map[orderT.BatchID]*subastadb.BatchSnapshot snapshotCacheMtx sync.Mutex + // activeTraders is a map where we'll add/remove traders as they come + // and go. + activeTraders *activeTradersMap + // connectedStreams is the list of all currently connected // bi-directional update streams. Each trader has exactly one stream // but can subscribe to updates for multiple accounts through the same @@ -183,7 +187,7 @@ func newRPCServer(store subastadb.Store, signer lndclient.SignerClient, ratingAgency ratings.Agency, ratingsDB ratings.NodeRatingsDatabase, listener, restListener net.Listener, serverOpts []grpc.ServerOption, restProxyCertOpt grpc.DialOption, - subscribeTimeout time.Duration) *rpcServer { + subscribeTimeout time.Duration, activeTraders *activeTradersMap) *rpcServer { return &rpcServer{ grpcServer: grpc.NewServer(serverOpts...), @@ -204,6 +208,7 @@ func newRPCServer(store subastadb.Store, signer lndclient.SignerClient, subscribeTimeout: subscribeTimeout, ratingAgency: ratingAgency, ratingsDB: ratingsDB, + activeTraders: activeTraders, } } @@ -1392,7 +1397,7 @@ func (s *rpcServer) addStreamSubscription(traderID lsat.TokenID, // There's no subscription for that account yet, notify our batch // executor that the trader for a certain account is now connected. trader.Subscriptions[newSub.AccountKey] = newSub - err := s.batchExecutor.RegisterTrader(newSub) + err := s.activeTraders.RegisterTrader(newSub) if err != nil { return fmt.Errorf("error registering trader at venue: %v", err) } @@ -1421,7 +1426,7 @@ func (s *rpcServer) disconnectTrader(traderID lsat.TokenID) error { for acctKey, trader := range subscriptions { monitoring.ObserveFailedConnection(acctKey) - err := s.batchExecutor.UnregisterTrader(trader) + err := s.activeTraders.UnregisterTrader(trader) if err != nil { return fmt.Errorf("error unregistering"+ "trader at venue: %v", err) diff --git a/rpcserver_test.go b/rpcserver_test.go index c9aa80b7d..6ec2b1bb5 100644 --- a/rpcserver_test.go +++ b/rpcserver_test.go @@ -450,6 +450,9 @@ func TestRPCServerBatchAuctionStreamInitialTimeout(t *testing.T) { } func newServer(store subastadb.Store) *rpcServer { + activeTraders := &activeTradersMap{ + activeTraders: make(map[matching.AccountID]*venue.ActiveTrader), + } batchExecutor := venue.NewBatchExecutor(&venue.ExecutorConfig{ Store: &executorStore{ Store: store, @@ -457,6 +460,7 @@ func newServer(store subastadb.Store) *rpcServer { Signer: mockSigner, BatchStorer: venue.NewExeBatchStorer(store), TraderMsgTimeout: time.Second * 15, + ActiveTraders: activeTraders.GetTraders, }) return newRPCServer( @@ -465,7 +469,7 @@ func newServer(store subastadb.Store) *rpcServer { OrderExecBaseFee: 1, OrderExecFeeRate: 100, }, nil, nil, bufconn.Listen(100), bufconn.Listen(100), nil, nil, - defaultTimeout, + defaultTimeout, activeTraders, ) } diff --git a/server.go b/server.go index 590dbfe10..a9ae06d31 100644 --- a/server.go +++ b/server.go @@ -126,6 +126,63 @@ func (e *executorStore) UpdateExecutionState(newState venue.ExecutionState) erro return nil } +type activeTradersMap struct { + activeTraders map[matching.AccountID]*venue.ActiveTrader + sync.RWMutex +} + +// RegisterTrader registers a new trader as being active. An active traders is +// eligible to join execution of a batch that they're a part of. +func (a *activeTradersMap) RegisterTrader(t *venue.ActiveTrader) error { + a.Lock() + defer a.Unlock() + + _, ok := a.activeTraders[t.AccountKey] + if ok { + return fmt.Errorf("trader %x already registered", + t.AccountKey) + } + a.activeTraders[t.AccountKey] = t + + log.Infof("Registering new trader: %x", t.AccountKey[:]) + + return nil +} + +// UnregisterTrader removes a registered trader from the batch. +func (a *activeTradersMap) UnregisterTrader(t *venue.ActiveTrader) error { + a.Lock() + defer a.Unlock() + + delete(a.activeTraders, t.AccountKey) + + log.Infof("Disconnecting trader: %x", t.AccountKey[:]) + return nil +} + +// IsActive returns true if the given key is among the active traders. +func (a *activeTradersMap) IsActive(acctKey [33]byte) bool { + a.RLock() + defer a.RUnlock() + + _, ok := a.activeTraders[acctKey] + return ok +} + +// GetTrades returns the current set of active traders. +func (a *activeTradersMap) GetTraders() map[matching.AccountID]*venue.ActiveTrader { + a.RLock() + defer a.RUnlock() + + c := make(map[matching.AccountID]*venue.ActiveTrader, len(a.activeTraders)) + + for k, v := range a.activeTraders { + c[k] = v + } + + return c +} + var _ venue.ExecutorStore = (*executorStore)(nil) // Server is the main auction auctioneer server. @@ -146,6 +203,11 @@ type Server struct { batchExecutor *venue.BatchExecutor + // activeTraders is a map of all the current active traders. An active + // trader is one that's online and has a live communication channel + // with the BatchExecutor. + activeTraders *activeTradersMap + auctioneer *Auctioneer channelEnforcer *chanenforcement.ChannelEnforcer @@ -232,12 +294,16 @@ func NewServer(cfg *Config) (*Server, error) { exeStore := &executorStore{ Store: store, } + activeTraders := &activeTradersMap{ + activeTraders: make(map[matching.AccountID]*venue.ActiveTrader), + } batchExecutor := venue.NewBatchExecutor(&venue.ExecutorConfig{ Store: exeStore, Signer: lnd.Signer, BatchStorer: venue.NewExeBatchStorer(store), AccountWatcher: accountManager, TraderMsgTimeout: defaultMsgTimeout, + ActiveTraders: activeTraders.GetTraders, }) durationBuckets := order.NewDurationBuckets() @@ -297,6 +363,7 @@ func NewServer(cfg *Config) (*Server, error) { accountManager: accountManager, orderBook: orderBook, batchExecutor: batchExecutor, + activeTraders: activeTraders, auctioneer: NewAuctioneer(AuctioneerConfig{ DB: newAuctioneerStore(store), ChainNotifier: lnd.ChainNotifier, @@ -340,6 +407,7 @@ func NewServer(cfg *Config) (*Server, error) { FundingConflictsResetInterval: cfg.FundingConflictResetInterval, TraderRejected: traderRejected, TraderRejectResetInterval: cfg.TraderRejectResetInterval, + TraderOnline: matching.NewTraderOnlineFilter(activeTraders.IsActive), RatingsAgency: ratingsAgency, }), channelEnforcer: channelEnforcer, @@ -425,6 +493,7 @@ func NewServer(cfg *Config) (*Server, error) { server.orderBook, batchExecutor, server.auctioneer, auctionTerms, ratingsAgency, ratingsDB, grpcListener, restListener, serverOpts, clientCertOpt, cfg.SubscribeTimeout, + activeTraders, ) server.rpcServer = auctioneerServer cfg.Prometheus.PublicRPCServer = auctioneerServer.grpcServer diff --git a/venue/batch_executor.go b/venue/batch_executor.go index 78c2c2024..61867dee4 100644 --- a/venue/batch_executor.go +++ b/venue/batch_executor.go @@ -322,6 +322,11 @@ type ExecutorConfig struct { // takes longer than that it is timed out and removed from the current // batch. TraderMsgTimeout time.Duration + + // ActiveTraders is a closure that returns a map of all the current + // active traders. An active trader is one that's online and has a live + // communication channel with the BatchExecutor. + ActiveTraders func() map[matching.AccountID]*ActiveTrader } // BatchExecutor is the primary state machine that executes a cleared batch. @@ -332,11 +337,6 @@ type BatchExecutor struct { started uint32 // To be used atomically. stopped uint32 // To be used atomically. - // activeTraders is a map of all the current active traders. An active - // trader is one that's online and has a live communication channel - // with the BatchExecutor. - activeTraders map[matching.AccountID]*ActiveTrader - // newBatches is a channel used to accept new incoming requests to // execute a batch. newBatches chan *executionReq @@ -347,8 +347,6 @@ type BatchExecutor struct { cfg *ExecutorConfig - sync.RWMutex - quit chan struct{} wg sync.WaitGroup } @@ -357,11 +355,10 @@ type BatchExecutor struct { // configuration. func NewBatchExecutor(cfg *ExecutorConfig) *BatchExecutor { return &BatchExecutor{ - cfg: cfg, - quit: make(chan struct{}), - activeTraders: make(map[matching.AccountID]*ActiveTrader), - newBatches: make(chan *executionReq), - venueEvents: make(chan EventTrigger), + cfg: cfg, + quit: make(chan struct{}), + newBatches: make(chan *executionReq), + venueEvents: make(chan EventTrigger), } } @@ -397,7 +394,10 @@ func (b *BatchExecutor) Stop() error { // validateTradersOnline ensures that all the traders included in this batch // are currently online within the venue. If not, then the batch will be failed // with ErrMissingTraders. -func (b *BatchExecutor) validateTradersOnline(batch *matching.OrderBatch) error { +func (b *BatchExecutor) validateTradersOnline( + activeTraders map[matching.AccountID]*ActiveTrader, + batch *matching.OrderBatch) error { + offlineTraders := make(map[matching.AccountID]struct{}) offlineNonces := make(map[orderT.Nonce]struct{}) @@ -406,12 +406,12 @@ func (b *BatchExecutor) validateTradersOnline(batch *matching.OrderBatch) error // trader. for _, order := range batch.Orders { - if _, ok := b.activeTraders[order.Asker.AccountKey]; !ok { + if _, ok := activeTraders[order.Asker.AccountKey]; !ok { offlineTraders[order.Asker.AccountKey] = struct{}{} offlineNonces[order.Details.Ask.Nonce()] = struct{}{} } - if _, ok := b.activeTraders[order.Bidder.AccountKey]; !ok { + if _, ok := activeTraders[order.Bidder.AccountKey]; !ok { offlineTraders[order.Bidder.AccountKey] = struct{}{} offlineNonces[order.Details.Bid.Nonce()] = struct{}{} } @@ -526,19 +526,20 @@ func (b *BatchExecutor) stateStep(currentState ExecutionState, // nolint:gocyclo // all the trader's in the bach are actually online and // register. If one or more of them aren't, then we'll need to // reject this batch with the proper error. - err := b.validateTradersOnline(env.exeCtx.OrderBatch) + activeTraders := b.cfg.ActiveTraders() + err := b.validateTradersOnline( + activeTraders, env.exeCtx.OrderBatch, + ) if err != nil { env.tempErr = err return BatchTempError, env, nil } - b.Lock() - // To ensure that we have a fully up to date view of the state // of each of the trader's, we'll sync what we have here, with // the set of traders on disk. - if err := b.syncTraderState(); err != nil { - b.Unlock() + activeTraders, err = b.syncTraderState(activeTraders) + if err != nil { return 0, env, err } @@ -546,11 +547,9 @@ func (b *BatchExecutor) stateStep(currentState ExecutionState, // nolint:gocyclo // of active traders in the environment so we can begin our // message passing phase. for trader := range env.exeCtx.OrderBatch.FeeReport.AccountDiffs { - env.traders[trader] = b.activeTraders[trader] + env.traders[trader] = activeTraders[trader] } - b.Unlock() - // If there are no traders part of this batch, it means the // batch is empty and there is nothing to execute. In this case // we skip straight ahead to the finalization stage. @@ -793,10 +792,6 @@ func (b *BatchExecutor) stateStep(currentState ExecutionState, // nolint:gocyclo return BatchTempError, env, nil } - b.RLock() - trader := b.activeTraders[src] - b.RUnlock() - signMsg := msgRecv.msg.(*TraderSignMsg) acctSig, ok := signMsg.Sigs[hex.EncodeToString(src[:])] if !ok { @@ -808,6 +803,7 @@ func (b *BatchExecutor) stateStep(currentState ExecutionState, // nolint:gocyclo // generate our own signature for his input to ensure // we can properly spend it with broadcast of the batch // transaction. + trader := env.traders[src] traderAcctInput, ok := env.exeCtx.AcctInputForTrader( trader.AccountKey, ) @@ -1267,41 +1263,6 @@ func (b *BatchExecutor) Submit(exeCtx *batchtx.ExecutionContext) ( return exeReq.Result, nil } -// RegisterTrader registers a new trader as being active. An active traders is -// eligible to join execution of a batch that they're a part of. -func (b *BatchExecutor) RegisterTrader(t *ActiveTrader) error { - b.Lock() - defer b.Unlock() - - _, ok := b.activeTraders[t.AccountKey] - if ok { - return fmt.Errorf("trader %x already registered", - t.AccountKey) - } - b.activeTraders[t.AccountKey] = t - - log.Infof("Registering new trader: %x", t.AccountKey[:]) - - return nil -} - -// UnregisterTrader removes a registered trader from the batch. -// -// TODO(roasbeef): job of the caller to unregister the traders to ensure we -// don't loop in the state machine -func (b *BatchExecutor) UnregisterTrader(t *ActiveTrader) error { - b.Lock() - defer b.Unlock() - - delete(b.activeTraders, t.AccountKey) - - log.Infof("Disconnecting trader: %x", t.AccountKey[:]) - - // TODO(roasbeef): client always removes traders? - - return nil -} - // HandleTraderMsg sends a new message from the target to the main batch // executor. func (b *BatchExecutor) HandleTraderMsg(m TraderMsg) error { @@ -1318,11 +1279,12 @@ func (b *BatchExecutor) HandleTraderMsg(m TraderMsg) error { // syncTraderState syncs the passed state with the resulting account state // after the batch has been applied for all traders involved in the executed -// batch. -// -// NOTE: The write lock MUST be held when calling this method. -func (b *BatchExecutor) syncTraderState() error { - log.Debugf("Syncing account state for %v traders", len(b.activeTraders)) +// batch. The set of refreshed traders is returned. +func (b *BatchExecutor) syncTraderState( + activeTraders map[matching.AccountID]*ActiveTrader) ( + map[matching.AccountID]*ActiveTrader, error) { + + log.Debugf("Syncing account state for %v traders", len(activeTraders)) // For each active trader, we'll attempt to sync the state of our // in-memory representation with the resulting state after the batch @@ -1330,16 +1292,17 @@ func (b *BatchExecutor) syncTraderState() error { // // TODO(roasbeef): optimize by only refreshing traders that were in a // recent batch? for account updates send them to the executor - for acctID := range b.activeTraders { + refreshed := make(map[matching.AccountID]*ActiveTrader, len(activeTraders)) + for acctID, trader := range activeTraders { acctKey, err := btcec.ParsePubKey(acctID[:], btcec.S256()) if err != nil { - return err + return nil, err } diskTraderAcct, err := b.cfg.Store.Account( context.Background(), acctKey, false, ) if err != nil { - return err + return nil, err } // Now that we have the fresh trader state from disk, we'll @@ -1347,8 +1310,13 @@ func (b *BatchExecutor) syncTraderState() error { refreshedTrader := matching.NewTraderFromAccount( diskTraderAcct, ) - b.activeTraders[acctID].Trader = &refreshedTrader + refreshed[acctID] = &ActiveTrader{ + Trader: &refreshedTrader, + TokenID: trader.TokenID, + CommLine: trader.CommLine, + } + } - return nil + return refreshed, nil } diff --git a/venue/batch_executor_test.go b/venue/batch_executor_test.go index b4408f7d1..1a94e0d82 100644 --- a/venue/batch_executor_test.go +++ b/venue/batch_executor_test.go @@ -128,6 +128,8 @@ type executorTestHarness struct { outgoingChans map[matching.AccountID]chan ExecutionMsg + activeTraders map[matching.AccountID]*ActiveTrader + batchTx *wire.MsgTx } @@ -137,17 +139,23 @@ func newExecutorTestHarness(t *testing.T, msgTimeout time.Duration) *executorTes PrivKeys: []*btcec.PrivateKey{batchPriv}, } + activeTraders := make(map[matching.AccountID]*ActiveTrader) + watcher := &mockAccountWatcher{} return &executorTestHarness{ t: t, store: store, outgoingChans: make(map[matching.AccountID]chan ExecutionMsg), + activeTraders: activeTraders, executor: NewBatchExecutor(&ExecutorConfig{ Store: store, Signer: signer, BatchStorer: NewExeBatchStorer(store), AccountWatcher: watcher, TraderMsgTimeout: msgTimeout, + ActiveTraders: func() map[matching.AccountID]*ActiveTrader { + return activeTraders + }, }), watcher: watcher, } @@ -203,10 +211,7 @@ func (e *executorTestHarness) RegisterTrader(acct *account.Account) { TokenID: randomTokenID(), } - err := e.executor.RegisterTrader(activeTrader) - if err != nil { - e.t.Fatalf("unable to register trader: %v", err) - } + e.activeTraders[trader.AccountKey] = activeTrader } func (e *executorTestHarness) SubmitBatch(batch *matching.OrderBatch, diff --git a/venue/matching/order_filter.go b/venue/matching/order_filter.go index bccba3189..67919f845 100644 --- a/venue/matching/order_filter.go +++ b/venue/matching/order_filter.go @@ -212,3 +212,35 @@ func isAccountReady(acct *account.Account, expiryHeightCutoff uint32) bool { return false } } + +// TraderOnlineFilter is a filter that filters out orders for offline traders. +type TraderOnlineFilter struct { + isOnline func([33]byte) bool +} + +// A compile time check to make sure TraderOnlineFilter implements the +// OrderFilter interface. +var _ OrderFilter = (*TraderOnlineFilter)(nil) + +// NewTraderOnlineFilter creates a new order filter using the passed method +// checking whether a trader is online. +func NewTraderOnlineFilter(isOnline func([33]byte) bool) *TraderOnlineFilter { + return &TraderOnlineFilter{ + isOnline: isOnline, + } +} + +// IsSuitable returns true if this specific predicate doesn't have any objection +// about an order being included in the matchmaking process. +// +// NOTE: This is part of the OrderFilter interface. +func (p *TraderOnlineFilter) IsSuitable(o order.ServerOrder) bool { + if !p.isOnline(o.Details().AcctKey) { + log.Debugf("Filtered out order %v with offline trader ("+ + "node=%x, acct=%x)", o.Nonce(), + o.ServerDetails().NodeKey[:], o.Details().AcctKey[:]) + return false + } + + return true +} diff --git a/venue/matching/order_filter_test.go b/venue/matching/order_filter_test.go index 7f77c7fad..f5a4b914b 100644 --- a/venue/matching/order_filter_test.go +++ b/venue/matching/order_filter_test.go @@ -70,6 +70,22 @@ func TestLeaseDurationFilter(t *testing.T) { require.True(t, filter.IsSuitable(order3)) } +func TestTraderOnlineFilter(t *testing.T) { + t.Parallel() + + online := false + filter := NewTraderOnlineFilter(func(_ [33]byte) bool { + return online + }) + + require.False(t, filter.IsSuitable(order1)) + require.False(t, filter.IsSuitable(order2)) + + online = true + require.True(t, filter.IsSuitable(order1)) + require.True(t, filter.IsSuitable(order2)) +} + func TestAccountPredicate(t *testing.T) { t.Parallel()