From 8b115a75e97f7b5d046ad55f6bfd0286d6d60e2d Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 11 Oct 2021 10:09:32 +0300 Subject: [PATCH 1/5] Allow multiple transactions in one block per key --- service/flow_helpers/account.go | 57 ++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/service/flow_helpers/account.go b/service/flow_helpers/account.go index 8e2d13c..d019bf5 100644 --- a/service/flow_helpers/account.go +++ b/service/flow_helpers/account.go @@ -17,6 +17,10 @@ var accounts map[flow.Address]*Account var accountsLock = &sync.Mutex{} // Making sure our "accounts" var is a singleton var keyIndexLock = &sync.Mutex{} +var seqNumLock = &sync.Mutex{} +var lastAccountKeySeqNumber map[flow.Address]map[int]uint64 +var lastAccountKeyBlock map[flow.Address]map[int]flow.Identifier + const GOOGLE_KMS_KEY_TYPE = "google_kms" type Account struct { @@ -74,10 +78,16 @@ func (a *Account) KeyIndex() int { func (a *Account) GetProposalKey(ctx context.Context, flowClient *client.Client) (*flow.AccountKey, error) { account, err := flowClient.GetAccount(ctx, a.Address) - k := account.Keys[a.KeyIndex()] if err != nil { return nil, fmt.Errorf("error in flow_helpers.Account.GetProposalKey: %w", err) } + + k := account.Keys[a.KeyIndex()] + + if latestBlockHeader, err := flowClient.GetLatestBlockHeader(ctx, true); err == nil { + k.SequenceNumber = getSequenceNumber(a.Address, k, latestBlockHeader.ID) + } + return k, nil } @@ -120,3 +130,48 @@ func getGoogleKMSSigner(address flow.Address, resourceId string) (crypto.Signer, return s, nil } + +// getSequenceNumber, is a hack around the fact that GetAccount on Flow Client returns +// the latest SequenceNumber on-chain but it might be outdated as we may be +// sending multiple transactions in the current block +func getSequenceNumber(address flow.Address, accountKey *flow.AccountKey, currentBlockID flow.Identifier) uint64 { + seqNumLock.Lock() + defer seqNumLock.Unlock() + + // Init lastAccountKeySeqNumber + if lastAccountKeySeqNumber == nil { + lastAccountKeySeqNumber = make(map[flow.Address]map[int]uint64) + } + + if lastAccountKeySeqNumber[address] == nil { + lastAccountKeySeqNumber[address] = make(map[int]uint64) + } + + // Init lastAccountKeyBlock + if lastAccountKeyBlock == nil { + lastAccountKeyBlock = make(map[flow.Address]map[int]flow.Identifier) + } + + if lastAccountKeyBlock[address] == nil { + lastAccountKeyBlock[address] = make(map[int]flow.Identifier) + } + + useGiven := true + + // Check if operating in the same block as before + if prevID, ok := lastAccountKeyBlock[address][accountKey.Index]; ok && prevID == currentBlockID { + // Check if we have a previous number stored and if it is larger or equal to new number + if prevNumber, ok := lastAccountKeySeqNumber[address][accountKey.Index]; ok && accountKey.SequenceNumber <= prevNumber { + lastAccountKeySeqNumber[address][accountKey.Index]++ + useGiven = false + } + } + + if useGiven { + lastAccountKeySeqNumber[address][accountKey.Index] = accountKey.SequenceNumber + } + + lastAccountKeyBlock[address][accountKey.Index] = currentBlockID + + return lastAccountKeySeqNumber[address][accountKey.Index] +} From 8ab512ccabea884838af9d2ad3c4f88f3e28114c Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 11 Oct 2021 10:11:01 +0300 Subject: [PATCH 2/5] Check if admin is given too many key indexes --- main.go | 6 +++++- service/app/app.go | 10 +++++++--- service/app/contract_interface.go | 11 +++++++++-- test_lib.go | 5 ++++- 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index 39d21f9..a72e570 100644 --- a/main.go +++ b/main.go @@ -94,7 +94,11 @@ func runServer(cfg *config.Config) error { } // Application - app := app.New(cfg, logger, db, flowClient, true) + app, err := app.New(cfg, logger, db, flowClient, true) + if err != nil { + return err + } + defer app.Close() // HTTP server diff --git a/service/app/app.go b/service/app/app.go index 9efa4fd..eb2c996 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -20,12 +20,16 @@ type App struct { quit chan bool // Chan type does not matter as we only use this to 'close' } -func New(cfg *config.Config, logger *log.Logger, db *gorm.DB, flowClient *client.Client, poll bool) *App { +func New(cfg *config.Config, logger *log.Logger, db *gorm.DB, flowClient *client.Client, poll bool) (*App, error) { if logger == nil { panic("no logger") } - contract := NewContract(cfg, logger, flowClient) + contract, err := NewContract(cfg, logger, flowClient) + if err != nil { + return nil, err + } + quit := make(chan bool) app := &App{cfg, logger, db, flowClient, contract, quit} @@ -33,7 +37,7 @@ func New(cfg *config.Config, logger *log.Logger, db *gorm.DB, flowClient *client go poller(app) } - return app + return app, nil } // Closes allows the poller to close controllably diff --git a/service/app/contract_interface.go b/service/app/contract_interface.go index 504076a..02756cd 100644 --- a/service/app/contract_interface.go +++ b/service/app/contract_interface.go @@ -59,14 +59,21 @@ func minInt(a int, b int) int { return a } -func NewContract(cfg *config.Config, logger *log.Logger, flowClient *client.Client) *Contract { +func NewContract(cfg *config.Config, logger *log.Logger, flowClient *client.Client) (*Contract, error) { pdsAccount := flow_helpers.GetAccount( flow.HexToAddress(cfg.AdminAddress), cfg.AdminPrivateKey, cfg.AdminPrivateKeyType, cfg.AdminPrivateKeyIndexes, ) - return &Contract{cfg, logger, flowClient, pdsAccount} + flowAccount, err := flowClient.GetAccount(context.Background(), pdsAccount.Address) + if err != nil { + return nil, err + } + if len(flowAccount.Keys) < len(pdsAccount.KeyIndexes) { + return nil, fmt.Errorf("too many key indexes given for admin account") + } + return &Contract{cfg, logger, flowClient, pdsAccount}, nil } // StartSettlement sets the given distributions state to 'settling' and starts the settlement diff --git a/test_lib.go b/test_lib.go index d0ce10a..e3d48ff 100644 --- a/test_lib.go +++ b/test_lib.go @@ -79,7 +79,10 @@ func getTestApp(cfg *config.Config, poll bool) (*app.App, func()) { panic(err) } - app := app.New(cfg, testLogger, db, flowClient, poll) + app, err := app.New(cfg, testLogger, db, flowClient, poll) + if err != nil { + panic(err) + } clean := func() { app.Close() From e9b367644b1651ea07f4a545572280a2e4ec1970 Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 11 Oct 2021 10:29:09 +0300 Subject: [PATCH 3/5] Use reference block id to make fewer calls to flow --- service/flow_helpers/account.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/service/flow_helpers/account.go b/service/flow_helpers/account.go index d019bf5..e82e401 100644 --- a/service/flow_helpers/account.go +++ b/service/flow_helpers/account.go @@ -76,7 +76,7 @@ func (a *Account) KeyIndex() int { return i } -func (a *Account) GetProposalKey(ctx context.Context, flowClient *client.Client) (*flow.AccountKey, error) { +func (a *Account) GetProposalKey(ctx context.Context, flowClient *client.Client, referenceBlockID flow.Identifier) (*flow.AccountKey, error) { account, err := flowClient.GetAccount(ctx, a.Address) if err != nil { return nil, fmt.Errorf("error in flow_helpers.Account.GetProposalKey: %w", err) @@ -84,9 +84,7 @@ func (a *Account) GetProposalKey(ctx context.Context, flowClient *client.Client) k := account.Keys[a.KeyIndex()] - if latestBlockHeader, err := flowClient.GetLatestBlockHeader(ctx, true); err == nil { - k.SequenceNumber = getSequenceNumber(a.Address, k, latestBlockHeader.ID) - } + k.SequenceNumber = getSequenceNumber(a.Address, k, referenceBlockID) return k, nil } @@ -134,6 +132,7 @@ func getGoogleKMSSigner(address flow.Address, resourceId string) (crypto.Signer, // getSequenceNumber, is a hack around the fact that GetAccount on Flow Client returns // the latest SequenceNumber on-chain but it might be outdated as we may be // sending multiple transactions in the current block +// NOTE: This breaks if running in a multi-instance setup func getSequenceNumber(address flow.Address, accountKey *flow.AccountKey, currentBlockID flow.Identifier) uint64 { seqNumLock.Lock() defer seqNumLock.Unlock() From 8372d374b950d0484b2987ef69550821e23ffd5c Mon Sep 17 00:00:00 2001 From: Lauri Junkkari Date: Mon, 11 Oct 2021 10:29:27 +0300 Subject: [PATCH 4/5] Use GetLatestBlockHeader instead of GetLatestBlock --- service/app/contract_interface.go | 22 +++++++++++----------- service/flow_helpers/transaction.go | 2 +- service/transactions/transactions.go | 4 ++-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/service/app/contract_interface.go b/service/app/contract_interface.go index 02756cd..f560cde 100644 --- a/service/app/contract_interface.go +++ b/service/app/contract_interface.go @@ -102,7 +102,7 @@ func (c *Contract) StartSettlement(ctx context.Context, db *gorm.DB, dist *Distr return err // rollback } - latestBlock, err := c.flowClient.GetLatestBlock(ctx, true) + latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true) if err != nil { return err // rollback } @@ -131,7 +131,7 @@ func (c *Contract) StartSettlement(ctx context.Context, db *gorm.DB, dist *Distr DistributionID: dist.ID, CurrentCount: 0, TotalCount: uint(len(collectibles)), - StartAtBlock: latestBlock.Height - 1, + StartAtBlock: latestBlockHeader.Height - 1, EscrowAddress: common.FlowAddressFromString(c.cfg.AdminAddress), Collectibles: settlementCollectibles, } @@ -218,7 +218,7 @@ func (c *Contract) StartMinting(ctx context.Context, db *gorm.DB, dist *Distribu return err // rollback } - latestBlock, err := c.flowClient.GetLatestBlock(ctx, true) + latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true) if err != nil { return err // rollback } @@ -227,7 +227,7 @@ func (c *Contract) StartMinting(ctx context.Context, db *gorm.DB, dist *Distribu cpc := CirculatingPackContract{ Name: dist.PackTemplate.PackReference.Name, Address: dist.PackTemplate.PackReference.Address, - StartAtBlock: latestBlock.Height - 1, + StartAtBlock: latestBlockHeader.Height - 1, } // Try to find an existing one (CirculatingPackContract) @@ -271,7 +271,7 @@ func (c *Contract) StartMinting(ctx context.Context, db *gorm.DB, dist *Distribu DistributionID: dist.ID, CurrentCount: 0, TotalCount: uint(len(packs)), - StartAtBlock: latestBlock.Height - 1, + StartAtBlock: latestBlockHeader.Height - 1, } if err := InsertMinting(db, &minting); err != nil { @@ -389,13 +389,13 @@ func (c *Contract) UpdateSettlementStatus(ctx context.Context, db *gorm.DB, dist return err // rollback } - latestBlock, err := c.flowClient.GetLatestBlock(ctx, true) + latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true) if err != nil { return err // rollback } begin := settlement.StartAtBlock + 1 - end := min(latestBlock.Height, begin+MAX_EVENTS_PER_CHECK) + end := min(latestBlockHeader.Height, begin+MAX_EVENTS_PER_CHECK) logger = logger.WithFields(log.Fields{ "blockBegin": begin, @@ -523,13 +523,13 @@ func (c *Contract) UpdateMintingStatus(ctx context.Context, db *gorm.DB, dist *D return err // rollback } - latestBlock, err := c.flowClient.GetLatestBlock(ctx, true) + latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true) if err != nil { return err // rollback } begin := minting.StartAtBlock + 1 - end := min(latestBlock.Height, begin+MAX_EVENTS_PER_CHECK) + end := min(latestBlockHeader.Height, begin+MAX_EVENTS_PER_CHECK) logger = logger.WithFields(log.Fields{ "blockBegin": begin, @@ -680,13 +680,13 @@ func (c *Contract) UpdateCirculatingPack(ctx context.Context, db *gorm.DB, cpc * OPENED, } - latestBlock, err := c.flowClient.GetLatestBlock(ctx, true) + latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true) if err != nil { return err // rollback } begin := cpc.StartAtBlock + 1 - end := min(latestBlock.Height, begin+MAX_EVENTS_PER_CHECK) + end := min(latestBlockHeader.Height, begin+MAX_EVENTS_PER_CHECK) logger = logger.WithFields(log.Fields{ "blockBegin": begin, diff --git a/service/flow_helpers/transaction.go b/service/flow_helpers/transaction.go index f667959..2ec68cc 100644 --- a/service/flow_helpers/transaction.go +++ b/service/flow_helpers/transaction.go @@ -8,7 +8,7 @@ import ( ) func SignProposeAndPayAs(ctx context.Context, flowClient *client.Client, account *Account, tx *flow.Transaction) error { - key, err := account.GetProposalKey(ctx, flowClient) + key, err := account.GetProposalKey(ctx, flowClient, tx.ReferenceBlockID) if err != nil { return err } diff --git a/service/transactions/transactions.go b/service/transactions/transactions.go index 5d13ad6..9dbac28 100644 --- a/service/transactions/transactions.go +++ b/service/transactions/transactions.go @@ -105,12 +105,12 @@ func (t *StorableTransaction) Prepare(ctx context.Context, flowClient *client.Cl } } - latestBlock, err := flowClient.GetLatestBlock(ctx, true) + latestBlockHeader, err := flowClient.GetLatestBlockHeader(ctx, true) if err != nil { return nil, err } - tx.SetReferenceBlockID(latestBlock.ID) + tx.SetReferenceBlockID(latestBlockHeader.ID) if err := flow_helpers.SignProposeAndPayAs(ctx, flowClient, account, tx); err != nil { return nil, err From dee15aa499e0182369da24bc0f7b27c7d5e038d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Klaus=20Dahl=C3=A9n?= Date: Wed, 13 Oct 2021 15:11:13 +0300 Subject: [PATCH 5/5] Update README with note about single instance support --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index f94a9dc..3b471f0 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,10 @@ Dev environment # Needs docker-compose installed make dev +### Deployment notes + +**NOTE:** Currently the PDS backend only supports a single instance setup. This is because of sequence number bookkeeping in `service/flow_helpers/account.go` (see `getSequenceNumber`). + ## Testing cp env.example .env.test