Skip to content

Commit

Permalink
refactor(dpos2.0): transaction storage refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
RainFallsSilent committed Jun 10, 2022
1 parent 8ee37f2 commit eedba26
Show file tree
Hide file tree
Showing 20 changed files with 161 additions and 71 deletions.
7 changes: 6 additions & 1 deletion benchmark/tools/generator/chain/datagen.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,13 @@ func (g *DataGen) generateBlock(
func (g *DataGen) storeData(block *types.Block) error {
blockHash := block.Hash()
newNode := blockchain.NewBlockNode(&block.Header, &blockHash)

ps, err := blockchain.GetProcessorsFromBlock(block)
if err != nil {
return err
}
if err := g.chain.GetDB().GetFFLDB().SaveBlock(block, newNode,
nil, time.Unix(int64(block.Timestamp), 0)); err != nil {
nil, time.Unix(int64(block.Timestamp), 0), ps); err != nil {
return err
}

Expand Down
17 changes: 11 additions & 6 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,35 +201,40 @@ func (b *BlockChain) MigrateOldDB(
if start >= params.CRCOnlyDPOSHeight {
confirm, err = b.db.GetConfirm(hash)
if err != nil {
done <- fmt.Errorf("GetConfirm err: %s", err)
done <- fmt.Errorf("get confirm err: %s", err)
break
}
}

node, err := b.LoadBlockNode(&block.Header, &hash)
if err != nil {
done <- fmt.Errorf("LoadBlockNode err: %s", err)
done <- fmt.Errorf("load block node err: %s", err)
break
}
b.SetTip(node)

b.index.SetFlags(&block.Header, statusDataStored)
err = b.index.flushToDB()
if err != nil {
done <- fmt.Errorf("flushToDB err: %s", err)
done <- fmt.Errorf("flusht to DB err: %s", err)
break
}

err = b.db.GetFFLDB().SaveBlock(block, node, confirm, CalcPastMedianTime(node))
ps, err := GetProcessorsFromBlock(block)
if err != nil {
done <- fmt.Errorf("SaveBlock err: %s", err)
done <- fmt.Errorf("get processors err: %s", err)
break
}
err = b.db.GetFFLDB().SaveBlock(block, node, confirm, CalcPastMedianTime(node), ps)
if err != nil {
done <- fmt.Errorf("save block err: %s", err)
break
}

b.index.SetFlags(&block.Header, statusDataStored|statusValid)
err = b.index.flushToDB()
if err != nil {
done <- fmt.Errorf("flushToDB err: %s", err)
done <- fmt.Errorf("flush to DB err: %s", err)
break
}

Expand Down
21 changes: 20 additions & 1 deletion blockchain/chainio.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package blockchain
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
common2 "github.com/elastos/Elastos.ELA/core/types/common"
"math/big"
Expand Down Expand Up @@ -233,7 +234,16 @@ func dbStoreBlock(dbTx database.Tx, block *types.DposBlock) error {
if hasBlock {
return nil
}
return dbTx.StoreBlock(block)

buf := new(bytes.Buffer)
err = block.Serialize(buf)
if err != nil {
str := fmt.Sprintf("failed to get serialized bytes for block %s",
blockHash)
return errors.New(str)
}

return dbTx.StoreBlock(blockHash, buf.Bytes())
}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -283,6 +293,15 @@ func dbPutProposalDraftData(dbTx database.Tx, hash *common.Uint256, draftData []
return draftDataBucket.Put(hash[:], draftData)
}

// dbPutData store data to ffldb.
func DBPutData(dbTx database.Tx, name []byte, key []byte, value []byte) error {
// Add the block hash to height mapping to the index.
meta := dbTx.Metadata()
// Add the block height to hash mapping to the index.
dataBucket := meta.Bucket(name)
return dataBucket.Put(key, value)
}

// dbFetchProposalDraftData get the proposal draft data by draft hash.
func dbFetchProposalDraftData(dbTx database.Tx, hash *common.Uint256) ([]byte, error) {
meta := dbTx.Metadata()
Expand Down
23 changes: 21 additions & 2 deletions blockchain/chainstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"bytes"
"encoding/hex"
"errors"
"github.com/elastos/Elastos.ELA/core/types/functions"
"github.com/elastos/Elastos.ELA/database"
"path/filepath"
"sync"
"sync/atomic"
Expand All @@ -20,6 +20,7 @@ import (
"github.com/elastos/Elastos.ELA/common/log"
. "github.com/elastos/Elastos.ELA/core/types"
"github.com/elastos/Elastos.ELA/core/types/common"
"github.com/elastos/Elastos.ELA/core/types/functions"
"github.com/elastos/Elastos.ELA/core/types/interfaces"
"github.com/elastos/Elastos.ELA/core/types/payload"
_ "github.com/elastos/Elastos.ELA/database/ffldb"
Expand Down Expand Up @@ -215,12 +216,30 @@ func (c *ChainStore) rollback(b *Block, node *BlockNode,
return nil
}

func GetProcessorsFromBlock(b *Block) ([]database.TXProcessor, error) {
ps := make([]database.TXProcessor, 0)
for _, t := range b.Transactions {
processor, err := t.GetProcessor()
if err != nil {
return nil, err
}
if processor != nil {
ps = append(ps, processor)
}
}
return ps, nil
}

func (c *ChainStore) persist(b *Block, node *BlockNode,
confirm *payload.Confirm, medianTimePast time.Time) error {
c.persistMutex.Lock()
defer c.persistMutex.Unlock()

if err := c.fflDB.SaveBlock(b, node, confirm, medianTimePast); err != nil {
ps, err := GetProcessorsFromBlock(b)
if err != nil {
return err
}
if err := c.fflDB.SaveBlock(b, node, confirm, medianTimePast, ps); err != nil {
return err
}
return nil
Expand Down
40 changes: 6 additions & 34 deletions blockchain/chainstoreffldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,37 +133,6 @@ func (c *ChainStoreFFLDB) Close() error {
return c.db.Close()
}

func ProcessProposalDraftData(dbTx database.Tx, Transactions []interfaces.Transaction) (err error) {
for _, tx := range Transactions {
switch tx.TxType() {
case common.CRCProposal:
proposal := tx.Payload().(*payload.CRCProposal)
err = dbPutProposalDraftData(dbTx, &proposal.DraftHash, proposal.DraftData)
if err != nil {
return err
}
case common.CRCProposalTracking:
proposalTracking := tx.Payload().(*payload.CRCProposalTracking)
err = dbPutProposalDraftData(dbTx, &proposalTracking.SecretaryGeneralOpinionHash,
proposalTracking.SecretaryGeneralOpinionData)
if err != nil {
return err
}
err = dbPutProposalDraftData(dbTx, &proposalTracking.MessageHash, proposalTracking.MessageData)
if err != nil {
return err
}
case common.CRCProposalReview:
proposalReview := tx.Payload().(*payload.CRCProposalReview)
err = dbPutProposalDraftData(dbTx, &proposalReview.OpinionHash, proposalReview.OpinionData)
if err != nil {
return err
}
}
}
return err
}

func RollbackProcessProposalDraftData(dbTx database.Tx, Transactions []interfaces.Transaction) (err error) {
for _, tx := range Transactions {
switch tx.TxType() {
Expand Down Expand Up @@ -195,7 +164,7 @@ func RollbackProcessProposalDraftData(dbTx database.Tx, Transactions []interface
}

func (c *ChainStoreFFLDB) SaveBlock(b *Block, node *BlockNode,
confirm *payload.Confirm, medianTimePast time.Time) error {
confirm *payload.Confirm, medianTimePast time.Time, ps []database.TXProcessor) error {

err := c.db.Update(func(dbTx database.Tx) error {
return dbStoreBlock(dbTx, &DposBlock{
Expand Down Expand Up @@ -231,8 +200,11 @@ func (c *ChainStoreFFLDB) SaveBlock(b *Block, node *BlockNode,
return err
}

if b.Height >= c.params.ChangeCommitteeNewCRHeight {
ProcessProposalDraftData(dbTx, b.Transactions)
for _, processor := range ps {
err = processor(dbTx)
if err != nil {
return err
}
}

// Allow the index manager to call each of the currently active
Expand Down
1 change: 1 addition & 0 deletions blockchain/indexers/tx3index.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (idx *Tx3Index) ConnectBlock(dbTx database.Tx, block *types.Block) error {
}
}
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion blockchain/ledgerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type IFFLDBChainStore interface {

// SaveBlock will write block into file DB.
SaveBlock(b *Block, node *BlockNode, confirm *payload.Confirm,
medianTimePast time.Time) error
medianTimePast time.Time, ps []database.TXProcessor) error

// RollbackBlock only remove block state and block index.
RollbackBlock(b *Block, node *BlockNode,
Expand Down
8 changes: 8 additions & 0 deletions core/transaction/bucketname.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package transaction

var (

// proposalDraftDataBucketName is the name of the DB bucket used to house the
// proposal releated draft data and draft hash.
proposalDraftDataBucketName = []byte("proposaldraftdata")
)
9 changes: 9 additions & 0 deletions core/transaction/crcproposalreviewtransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/elastos/Elastos.ELA/database"

"github.com/elastos/Elastos.ELA/blockchain"
"github.com/elastos/Elastos.ELA/common"
Expand Down Expand Up @@ -109,3 +110,11 @@ func (t *CRCProposalReviewTransaction) SpecialContextCheck() (result elaerr.ELAE
}
return nil, false
}

func (t *CRCProposalReviewTransaction) Process() (database.TXProcessor, elaerr.ELAError) {
proposalReview := t.Payload().(*payload.CRCProposalReview)
return func(tx database.Tx) error {
return blockchain.DBPutData(tx, proposalDraftDataBucketName,
proposalReview.OpinionHash[:], proposalReview.OpinionData)
}, nil
}
25 changes: 20 additions & 5 deletions core/transaction/crcproposaltrackingtransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/elastos/Elastos.ELA/database"

"github.com/elastos/Elastos.ELA/blockchain"
"github.com/elastos/Elastos.ELA/common"
Expand Down Expand Up @@ -82,15 +83,15 @@ func (t *CRCProposalTrackingTransaction) SpecialContextCheck() (result elaerr.EL
}
tempMessageHash := common.Hash(cptPayload.MessageData)
if !cptPayload.MessageHash.IsEqual(tempMessageHash) {
return elaerr.Simple(elaerr.ErrTxPayload, errors.New("the message data and message hash of" +
return elaerr.Simple(elaerr.ErrTxPayload, errors.New("the message data and message hash of"+
" proposal tracking are inconsistent")), true
}
if len(cptPayload.SecretaryGeneralOpinionData) >= payload.MaxSecretaryGeneralOpinionDataSize {
return elaerr.Simple(elaerr.ErrTxPayload, errors.New("the opinion data cannot be more than 200K byte")), true
}
tempOpinionHash := common.Hash(cptPayload.SecretaryGeneralOpinionData)
if !cptPayload.SecretaryGeneralOpinionHash.IsEqual(tempOpinionHash) {
return elaerr.Simple(elaerr.ErrTxPayload, errors.New("the opinion data and opinion hash of" +
return elaerr.Simple(elaerr.ErrTxPayload, errors.New("the opinion data and opinion hash of"+
" proposal tracking are inconsistent")), true
}
}
Expand Down Expand Up @@ -151,7 +152,6 @@ func (t *CRCProposalTrackingTransaction) checkCRCProposalCommonTracking(
return t.normalCheckCRCProposalTrackingSignature(t.parameters, cptPayload, pState, payloadVersion)
}


func (t *CRCProposalTrackingTransaction) normalCheckCRCProposalTrackingSignature(
params *TransactionParameters, cptPayload *payload.CRCProposalTracking, pState *crstate.ProposalState,
payloadVersion byte) error {
Expand Down Expand Up @@ -285,7 +285,6 @@ func (t *CRCProposalTrackingTransaction) checkCRCProposalProgressTracking(
return t.normalCheckCRCProposalTrackingSignature(t.parameters, cptPayload, pState, payloadVersion)
}


func (t *CRCProposalTrackingTransaction) checkCRCProposalRejectedTracking(
params *TransactionParameters, cptPayload *payload.CRCProposalTracking, pState *crstate.ProposalState,
blockHeight uint32, payloadVersion byte) error {
Expand Down Expand Up @@ -373,4 +372,20 @@ func (t *CRCProposalTrackingTransaction) checkCRCProposalFinalizedTracking(

// Check signature.
return t.normalCheckCRCProposalTrackingSignature(t.parameters, cptPayload, pState, payloadVersion)
}
}

func (t *CRCProposalTrackingTransaction) Process() (database.TXProcessor, elaerr.ELAError) {
proposalTracking := t.Payload().(*payload.CRCProposalTracking)

return func(tx database.Tx) error {
err := blockchain.DBPutData(tx, proposalDraftDataBucketName,
proposalTracking.SecretaryGeneralOpinionHash[:],
proposalTracking.SecretaryGeneralOpinionData)
if err != nil {
return err
}

return blockchain.DBPutData(tx, proposalDraftDataBucketName,
proposalTracking.MessageHash[:], proposalTracking.MessageData)
}, nil
}
9 changes: 9 additions & 0 deletions core/transaction/crcproposaltransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/elastos/Elastos.ELA/database"
"sort"

"github.com/elastos/Elastos.ELA/blockchain"
Expand Down Expand Up @@ -669,3 +670,11 @@ func (t *CRCProposalTransaction) checkNormalOrELIPProposal(params *TransactionPa
crCouncilMember := t.parameters.BlockChain.GetCRCommittee().GetMember(proposal.CRCouncilMemberDID)
return t.checkOwnerAndCRCouncilMemberSign(proposal, crCouncilMember.Info.Code, PayloadVersion)
}

func (t *CRCProposalTransaction) Process() (database.TXProcessor, elaerr.ELAError) {
proposal := t.Payload().(*payload.CRCProposal)
return func(tx database.Tx) error {
return blockchain.DBPutData(tx, proposalDraftDataBucketName,
proposal.DraftHash[:], proposal.DraftData)
}, nil
}
1 change: 1 addition & 0 deletions core/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (

type BaseTransaction struct {
DefaultChecker
DefaultProcessor

version common2.TransactionVersion // New field added in TxVersion09
txType common2.TxType
Expand Down
20 changes: 20 additions & 0 deletions core/transaction/transactionprocesser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2017-2021 The Elastos Foundation
// Use of this source code is governed by an MIT
// license that can be found in the LICENSE file.
//

package transaction

import (
"github.com/elastos/Elastos.ELA/database"
elaerr "github.com/elastos/Elastos.ELA/errors"
)

type DefaultProcessor struct {
}

func (t *DefaultProcessor) GetProcessor() (database.TXProcessor, elaerr.ELAError) {

// todo process transaction
return nil, nil
}
1 change: 1 addition & 0 deletions core/types/interfaces/basetransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type Transaction interface {
TransactionChecker
TransactionProcessor

// get data
Version() common2.TransactionVersion
Expand Down
15 changes: 15 additions & 0 deletions core/types/interfaces/transactionprocessorinterface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2017-2021 The Elastos Foundation
// Use of this source code is governed by an MIT
// license that can be found in the LICENSE file.
//

package interfaces

import (
"github.com/elastos/Elastos.ELA/database"
elaerr "github.com/elastos/Elastos.ELA/errors"
)

type TransactionProcessor interface {
GetProcessor() (database.TXProcessor, elaerr.ELAError)
}
Loading

0 comments on commit eedba26

Please sign in to comment.