From 6d443ad2bafcb1aa2a3c6481b8d45ff75faa9552 Mon Sep 17 00:00:00 2001 From: curryxbo Date: Fri, 13 Sep 2024 11:16:01 +0800 Subject: [PATCH 1/2] refactor oracle rollup epoch --- oracle/config/config.go | 3 +- oracle/db/config.go | 41 +++++ oracle/db/iterator.go | 1 + oracle/db/keys.go | 8 + oracle/db/store.go | 85 +++++++++ oracle/db/store_test.go | 61 +++++++ oracle/oracle/change_point.go | 326 ++++++++++++++++++++++++++++++++++ oracle/oracle/oracle.go | 31 +++- oracle/oracle/rollup.go | 276 ++++++++++------------------ oracle/types/types.go | 54 ++++++ 10 files changed, 697 insertions(+), 189 deletions(-) create mode 100644 oracle/db/config.go create mode 100644 oracle/db/iterator.go create mode 100644 oracle/db/keys.go create mode 100644 oracle/db/store.go create mode 100644 oracle/db/store_test.go create mode 100644 oracle/oracle/change_point.go create mode 100644 oracle/types/types.go diff --git a/oracle/config/config.go b/oracle/config/config.go index 071cab7aa..e841830a6 100644 --- a/oracle/config/config.go +++ b/oracle/config/config.go @@ -59,7 +59,8 @@ type Config struct { // MetricsPort is the port at which the metrics server is running. MetricsPort uint64 - RollupAddr common.Address + RollupAddr common.Address + L1StakingAddr common.Address MaxSize uint64 MinSize uint64 diff --git a/oracle/db/config.go b/oracle/db/config.go new file mode 100644 index 000000000..e88c46b7d --- /dev/null +++ b/oracle/db/config.go @@ -0,0 +1,41 @@ +package db + +import ( + "github.com/urfave/cli" + + "morph-l2/node/flags" +) + +type Config struct { + DBPath string `json:"db_path"` + Namespace string `json:"namespace"` + DatabaseHandles int `json:"database_handles"` + DatabaseCache int `json:"database_cache"` + DatabaseFreezer string `json:"database_freezer"` +} + +func DefaultConfig() *Config { + return &Config{ + Namespace: "staking-oracle", + DatabaseHandles: 256, + DatabaseCache: 256, + } +} + +func (c *Config) SetCliContext(ctx *cli.Context) { + if ctx.GlobalIsSet(flags.DBDataDir.Name) { + c.DBPath = ctx.GlobalString(flags.DBDataDir.Name) + } + if ctx.GlobalIsSet(flags.DBNamespace.Name) { + c.Namespace = ctx.GlobalString(flags.DBNamespace.Name) + } + if ctx.GlobalIsSet(flags.DBHandles.Name) { + c.DatabaseHandles = ctx.GlobalInt(flags.DBHandles.Name) + } + if ctx.GlobalIsSet(flags.DBCache.Name) { + c.DatabaseCache = ctx.GlobalInt(flags.DBCache.Name) + } + if ctx.GlobalIsSet(flags.DBFreezer.Name) { + c.DatabaseFreezer = ctx.GlobalString(flags.DBFreezer.Name) + } +} diff --git a/oracle/db/iterator.go b/oracle/db/iterator.go new file mode 100644 index 000000000..3a49c63e3 --- /dev/null +++ b/oracle/db/iterator.go @@ -0,0 +1 @@ +package db diff --git a/oracle/db/keys.go b/oracle/db/keys.go new file mode 100644 index 000000000..dd1309358 --- /dev/null +++ b/oracle/db/keys.go @@ -0,0 +1,8 @@ +package db + +var ( + syncedL1HeightKey = []byte("LastSyncedL1Height") + latestL1ChangePointKey = []byte("LastSyncedL1Height") + syncedL2HeightKey = []byte("LastSyncedL1Height") + changePointsKey = []byte("ChangePoints") +) diff --git a/oracle/db/store.go b/oracle/db/store.go new file mode 100644 index 000000000..842b1e650 --- /dev/null +++ b/oracle/db/store.go @@ -0,0 +1,85 @@ +package db + +import ( + "fmt" + + "path/filepath" + "sync" + + "github.com/morph-l2/go-ethereum/core/rawdb" + "github.com/morph-l2/go-ethereum/ethdb" + "github.com/morph-l2/go-ethereum/rlp" + "github.com/syndtr/goleveldb/leveldb" + + "morph-l2/oracle/types" +) + +type Store struct { + db ethdb.Database + ChainPointSync sync.Mutex +} + +func NewMemoryStore() *Store { + return &Store{ + db: rawdb.NewMemoryDatabase(), + } +} + +func NewStore(config *Config, home string) (*Store, error) { + var ( + db ethdb.Database + err error + dbPath = config.DBPath + freezer = config.DatabaseFreezer + ) + + if dbPath == "" { + if home == "" { + return nil, fmt.Errorf("either Home or DB path has to be provided") + } + dbPath = filepath.Join(home, "node-data") + } + + if config.DatabaseFreezer == "" { + freezer = filepath.Join(dbPath, "ancient") + } + db, err = rawdb.NewLevelDBDatabaseWithFreezer(dbPath, config.DatabaseCache, config.DatabaseHandles, freezer, config.Namespace, false) + if err != nil { + return nil, err + } + + return &Store{ + db: db, + ChainPointSync: sync.Mutex{}, + }, nil +} + +func (s *Store) WriteLatestChangeContext(changePoints types.ChangeContext) error { + data, err := rlp.EncodeToBytes(changePoints) + if err != nil { + return err + } + if err := s.db.Put(changePointsKey, data); err != nil { + panic(fmt.Sprintf("failed to update change points failed, err: %v", err)) + } + return nil +} + +func (s *Store) ReadLatestChangePoints() types.ChangeContext { + data, err := s.db.Get(changePointsKey) + if err != nil && !isNotFoundErr(err) { + panic(fmt.Sprintf("failed to read change points, err: %v", err)) + } + if err != nil { + panic(fmt.Sprintf("failed to sync change points, err: %v", err)) + } + var changeCtx types.ChangeContext + if err := rlp.DecodeBytes(data, &changeCtx); err != nil { + panic(fmt.Sprintf("decode data to changepoint error:%v", err)) + } + return changeCtx +} + +func isNotFoundErr(err error) bool { + return err.Error() == leveldb.ErrNotFound.Error() || err.Error() == types.ErrMemoryDBNotFound.Error() +} diff --git a/oracle/db/store_test.go b/oracle/db/store_test.go new file mode 100644 index 000000000..4fd5044fc --- /dev/null +++ b/oracle/db/store_test.go @@ -0,0 +1,61 @@ +package db + +import ( + "testing" + + "morph-l2/oracle/types" + + "github.com/morph-l2/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func TestLatestSyncedL1Height(t *testing.T) { + db := NewMemoryStore() + err := db.WriteLatestChangeContext(types.ChangeContext{ + L2Sequencers: []types.L2Sequencer{ + { + TimeStamp: 1627891200, + EpochInterval: 10, + Addresses: []common.Address{common.HexToAddress("0x123")}, + }, + { + TimeStamp: 1627891260, + EpochInterval: 10, + Addresses: []common.Address{}, + }, + }, + ActiveStakersByTime: []types.ActiveStakers{ + { + TimeStamp: 1627891200, + BlockNumber: 100, + Addresses: []common.Address{}, + }, + { + TimeStamp: 1627891260, + BlockNumber: 101, + Addresses: []common.Address{}, + }, + }, + ChangePoints: []types.ChangePoint{ + { + TimeStamp: 1627891200, + BlockNumber: 100, + EpochInterval: 10, + Submitters: []common.Address{}, + ChangeType: 1, + }, + { + TimeStamp: 1627891260, + BlockNumber: 101, + EpochInterval: 10, + Submitters: []common.Address{}, + ChangeType: 2, + }, + }, + L1Synced: 1, + L2synced: 1, + }) + require.NoError(t, err) + changePoints := db.ReadLatestChangePoints() + t.Log(changePoints) +} diff --git a/oracle/oracle/change_point.go b/oracle/oracle/change_point.go new file mode 100644 index 000000000..c8ed8b825 --- /dev/null +++ b/oracle/oracle/change_point.go @@ -0,0 +1,326 @@ +package oracle + +import ( + "fmt" + "math/big" + "sort" + "time" + + "morph-l2/oracle/types" + + "github.com/morph-l2/go-ethereum/accounts/abi/bind" + "github.com/morph-l2/go-ethereum/common" + "github.com/morph-l2/go-ethereum/log" +) + +//func (o *Oracle) queryActiveStakersByTime(timestamp uint64) ([]common.Address, error) { +// for _, activeStakers := range o.ChangeCtx.ActiveStakersByTime { +// if timestamp >= activeStakers.TimeStamp { +// return activeStakers.Addresses, nil +// } +// } +// return nil, fmt.Errorf("not found,invalid timestamp,expect bigger or equal than %v but have :%v ", o.ChangeCtx.ActiveStakersByTime[0].TimeStamp, timestamp) +//} + +func (o *Oracle) syncL1ChangePoint(start, end, startTime, endTime uint64) error { + var epochBlock []int + stakerAddedPoint, err := o.fetchL1StakerAdded(o.ctx, start, end) + if err != nil { + return err + } + epochBlock = append(epochBlock, stakerAddedPoint...) + stakerRemoved, err := o.fetchL1StakerRemoved(o.ctx, start, end) + if err != nil { + return err + } + epochBlock = append(epochBlock, stakerRemoved...) + sortedBlocks := removeDuplicatesAndSort(epochBlock) + sort.Ints(sortedBlocks) + for _, sortBlock := range sortedBlocks { + header, err := o.l1Client.HeaderByNumber(o.ctx, big.NewInt(int64(sortBlock))) + if err != nil { + return err + } + activeStakers, err := o.l1Staking.GetActiveStakers(&bind.CallOpts{ + BlockNumber: header.Number, + }) + if err != nil { + return err + } + l1Staker := types.ActiveStakers{ + TimeStamp: header.Time, + Addresses: activeStakers, + BlockNumber: uint64(sortBlock), + } + // TODO + if header.Time > endTime { + break + } + if header.Time > o.ChangeCtx.ActiveStakersByTime[len(o.ChangeCtx.ActiveStakersByTime)-1].TimeStamp { + o.ChangeCtx.ActiveStakersByTime = append(o.ChangeCtx.ActiveStakersByTime, l1Staker) + o.ChangeCtx.L1Synced = uint64(sortBlock) + } + + } + var changePoints []types.ChangePoint + for _, eb := range stakerRemoved { + header, err := o.l1Client.HeaderByNumber(o.ctx, big.NewInt(int64(eb))) + if err != nil { + return err + } + if header.Time < startTime { + continue + } + if header.Time > endTime { + break + } + changePoint := types.ChangePoint{ + //Submitters: activeSequencerSet, + TimeStamp: header.Time, + BlockNumber: header.Number.Uint64(), + ChangeType: types.L1ChangePoint, + } + if header.Time > endTime { + break + } + if len(changePoints) == 0 || header.Time > changePoints[len(changePoints)-1].TimeStamp { + changePoints = append(changePoints, changePoint) + } + changePoints = append(changePoints, changePoint) + } + if len(stakerRemoved) == 0 { + o.ChangeCtx.L1Synced = end + } + o.insertL1AndMerge(changePoints) + return nil +} + +func (o *Oracle) syncL2ChangePoint(start, end uint64) ([]types.ChangePoint, error) { + var epochBlock []int + rollupEpochUpdated, err := o.fetchRollupEpochUpdated(o.ctx, start, end) + if err != nil { + return nil, err + } + epochBlock = append(epochBlock, rollupEpochUpdated...) + sequencerSetUpdated, err := o.fetchSequencerSetUpdated(o.ctx, start, end) + if err != nil { + return nil, err + } + epochBlock = append(epochBlock, sequencerSetUpdated...) + sortedBlocks := removeDuplicatesAndSort(epochBlock) + sort.Ints(sortedBlocks) + var changePoints []types.ChangePoint + for _, eb := range sortedBlocks { + header, err := o.l2Client.HeaderByNumber(o.ctx, big.NewInt(int64(eb))) + if err != nil { + return nil, err + } + sequencerSets, err := o.sequencer.GetCurrentSequencerSet(&bind.CallOpts{ + BlockNumber: big.NewInt(int64(eb)), + }) + if err != nil { + return nil, err + } + epochInterval, err := o.gov.RollupEpoch(&bind.CallOpts{ + BlockNumber: header.Number, + }) + if err != nil { + return nil, err + } + changePoint := types.ChangePoint{ + Submitters: sequencerSets, + EpochInterval: epochInterval.Uint64(), + TimeStamp: header.Time, + ChangeType: types.L2ChangePoint, + } + changePoints = append(changePoints, changePoint) + o.ChangeCtx.L2Sequencers = append(o.ChangeCtx.L2Sequencers, types.L2Sequencer{ + Addresses: sequencerSets, + EpochInterval: epochInterval.Uint64(), + TimeStamp: header.Time, + }) + } + return changePoints, nil +} + +func (o *Oracle) recordRollupEpoch() error { + l2Start := o.ChangeCtx.L2synced + 1 + l2Latest, err := o.l2Client.BlockNumber(o.ctx) + if err != nil { + return err + } + l2End := l2Latest + if l2Start+o.rollupEpochMaxBlock < l2Latest { + l2End = l2Start + o.rollupEpochMaxBlock - 1 + } + epochIndex, err := o.record.NextRollupEpochIndex(nil) + if err != nil { + return err + } + lastEpoch, err := o.record.RollupEpochs(nil, epochIndex.Sub(epochIndex, big.NewInt(1))) + if err != nil { + return err + } + //o.PruneChangeCtx() + changePointIndex := 0 + for i, cps := range o.ChangeCtx.ChangePoints { + if cps.TimeStamp > lastEpoch.EndTime.Uint64() { + changePointIndex = i + break + } + } + + // Check if a valid index was found + if changePointIndex > 0 { + // Trim the slice to keep only the points after the found index + o.ChangeCtx.ChangePoints = o.ChangeCtx.ChangePoints[changePointIndex-1:] + } + + startHeader, err := o.l2Client.HeaderByNumber(o.ctx, big.NewInt(int64(l2Start))) + if err != nil { + return err + } + endHeader, err := o.l2Client.HeaderByNumber(o.ctx, big.NewInt(int64(l2End))) + if err != nil { + return err + } + startTime := startHeader.Time + endTime := endHeader.Time + // clean fake point + if o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].ChangeType != types.L1ChangePoint && o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].ChangeType != types.L2ChangePoint { + o.ChangeCtx.ChangePoints = o.ChangeCtx.ChangePoints[:len(o.ChangeCtx.ChangePoints)-1] + } + _, err = o.syncL2ChangePoint(l2Start, l2End) + if err != nil { + return err + } + l1Start := o.ChangeCtx.L1Synced + 1 + l1Latest, err := o.l1Client.BlockNumber(o.ctx) + if err != nil { + return err + } + l1End := l1Latest + if l1Start+o.rollupEpochMaxBlock < l1Latest { + l1End = l1Start + o.rollupEpochMaxBlock - 1 + } + if err = o.syncL1ChangePoint(l1Start, l1End, startTime, endTime); err != nil { + return err + } + // insert a fake change point + if o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].TimeStamp < endTime { + o.ChangeCtx.ChangePoints = append(o.ChangeCtx.ChangePoints, types.ChangePoint{TimeStamp: endTime}) + } + // TODO + + if err = o.db.WriteLatestChangeContext(o.ChangeCtx); err != nil { + return err + } + epochs, err := o.generateEpochs(lastEpoch, endTime) + if err != nil { + return err + } + if len(epochs) == 0 { + time.Sleep(defaultSleepTime) + log.Info("rollup epoch count too small", "startTime", lastEpoch.StartTime, "index", lastEpoch.Index) + return nil + } + log.Info("submit rollup epoch infos", "l1Start", l1Start, "l1End", l1End, "l2Start", l2Start, "l2End", l2End, "infoLength", len(epochs)) + err = o.submitRollupEpoch(epochs) + if err != nil { + if len(epochs) > 50 { + if o.cfg.MinSize*2 <= o.rollupEpochMaxBlock { + o.rollupEpochMaxBlock -= o.cfg.MinSize + } else { + o.rollupEpochMaxBlock = o.rollupEpochMaxBlock / 2 + } + } + return fmt.Errorf("submit rollup epoch info error:%v,rollupEpochMaxBlock:%v", err, o.rollupEpochMaxBlock) + } + if o.rollupEpochMaxBlock+o.cfg.MinSize <= o.cfg.MaxSize { + o.rollupEpochMaxBlock += o.cfg.MinSize + } + + log.Info("submit rollup epoch info success", "rollupEpochMaxBlock", o.rollupEpochMaxBlock) + return nil + +} + +func (o *Oracle) GetSequencerSetsByTime(t uint64) ([]common.Address, error) { + for i, p := range o.ChangeCtx.L2Sequencers { + if i+1 > len(o.ChangeCtx.L2Sequencers) { + break + } + if t > p.TimeStamp && t < o.ChangeCtx.L2Sequencers[i+1].TimeStamp { + return p.Addresses, nil + } + } + return nil, fmt.Errorf("not found") +} + +func (o *Oracle) GetActiveStakersByTime(t uint64) ([]common.Address, error) { + for i, as := range o.ChangeCtx.ActiveStakersByTime { + if i+1 > len(o.ChangeCtx.ActiveStakersByTime) { + break + } + if t > as.TimeStamp && t < o.ChangeCtx.ActiveStakersByTime[i+1].TimeStamp { + return as.Addresses, nil + } + } + return nil, fmt.Errorf("not found") +} + +func findIntersection(arr1, arr2 []common.Address) []common.Address { + addressMap := make(map[common.Address]struct{}) + for _, addr := range arr1 { + addressMap[addr] = struct{}{} + } + intersection := []common.Address{} + for _, addr := range arr2 { + if _, exists := addressMap[addr]; exists { + intersection = append(intersection, addr) + } + } + + return intersection +} + +// InsertL1AndMerge inserts new ChangePoints into the old array while maintaining order +func (o *Oracle) insertL1AndMerge(newPoints []types.ChangePoint) { + for _, newPoint := range newPoints { + // Flag to check if the new element has been inserted + inserted := false + for i := 0; i <= len(o.ChangeCtx.ChangePoints); i++ { + // If we've reached the end of the array or the new element's timestamp is less than the current element + if i == len(o.ChangeCtx.ChangePoints) || newPoint.TimeStamp < o.ChangeCtx.ChangePoints[i].TimeStamp { + // If ChangeType is 1, assign the previous element's Submitters + if newPoint.ChangeType == 1 && i > 0 { + newPoint.Submitters = o.ChangeCtx.ChangePoints[i-1].Submitters + newPoint.EpochInterval = o.ChangeCtx.ChangePoints[i-1].EpochInterval + } + // Insert the new element + o.ChangeCtx.ChangePoints = append(o.ChangeCtx.ChangePoints[:i], append([]types.ChangePoint{newPoint}, o.ChangeCtx.ChangePoints[i:]...)...) + inserted = true + break + } + } + // If the new element wasn't inserted, it means it's the largest element, so append it to the end + if !inserted { + newPoint.Submitters = o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].Submitters + newPoint.EpochInterval = o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].EpochInterval + o.ChangeCtx.ChangePoints = append(o.ChangeCtx.ChangePoints, newPoint) + } + } +} + +func (o *Oracle) insertL2AndMerge(newPoints []types.ChangePoint) error { + if len(newPoints) == 0 { + return nil + } + if newPoints[0].TimeStamp <= o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].TimeStamp { + return fmt.Errorf("invalid l2 newPoint,expect bigger than %v but have %v", + o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].TimeStamp, + newPoints[0].TimeStamp) + } + o.ChangeCtx.ChangePoints = append(o.ChangeCtx.ChangePoints, newPoints...) + return nil +} diff --git a/oracle/oracle/oracle.go b/oracle/oracle/oracle.go index 4ce941bb1..bd2c8f4ab 100644 --- a/oracle/oracle/oracle.go +++ b/oracle/oracle/oracle.go @@ -16,13 +16,15 @@ import ( "morph-l2/bindings/bindings" "morph-l2/bindings/predeploys" "morph-l2/oracle/config" + "morph-l2/oracle/db" "morph-l2/oracle/metrics" + "morph-l2/oracle/types" "github.com/morph-l2/externalsign" "github.com/morph-l2/go-ethereum" "github.com/morph-l2/go-ethereum/accounts/abi" "github.com/morph-l2/go-ethereum/common" - "github.com/morph-l2/go-ethereum/core/types" + coretypes "github.com/morph-l2/go-ethereum/core/types" "github.com/morph-l2/go-ethereum/crypto" "github.com/morph-l2/go-ethereum/ethclient" "github.com/morph-l2/go-ethereum/log" @@ -72,6 +74,7 @@ type Oracle struct { ctx context.Context l1Client *ethclient.Client l2Client *ethclient.Client + l1Staking *bindings.L1Staking l2Staking *bindings.L2Staking sequencer *bindings.Sequencer gov *bindings.Gov @@ -84,12 +87,14 @@ type Oracle struct { cfg *config.Config privKey *ecdsa.PrivateKey externalRsaPriv *rsa.PrivateKey - signer types.Signer + signer coretypes.Signer chainId *big.Int isFinalized bool enable bool rollupEpochMaxBlock uint64 metrics *metrics.Metrics + db *db.Store + ChangeCtx types.ChangeContext } func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { @@ -130,6 +135,11 @@ func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { } log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler)) + + store, err := db.NewStore(nil, "staking-oracle") + if err != nil { + return nil, err + } l1Client, err := ethclient.Dial(cfg.L1EthRpc) if err != nil { return nil, err @@ -155,6 +165,10 @@ func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { if err != nil { return nil, err } + l1Staking, err := bindings.NewL1Staking(cfg.L1StakingAddr, l1Client) + if err != nil { + return nil, err + } l2Staking, err := bindings.NewL2Staking(predeploys.L2StakingAddr, l2Client) if err != nil { return nil, err @@ -163,7 +177,7 @@ func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { if err != nil { return nil, err } - abi, err := bindings.RecordMetaData.GetAbi() + recordAbi, err := bindings.RecordMetaData.GetAbi() if err != nil { return nil, err } @@ -191,17 +205,18 @@ func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { if err != nil { return nil, fmt.Errorf("parse privkey err:%w", err) } - } - return &Oracle{ l1Client: l1Client, l2Client: l2Client, + db: store, rollup: rollup, + l1Staking: l1Staking, l2Staking: l2Staking, record: record, recordAddr: predeploys.RecordAddr, - recordAbi: abi, + recordAbi: recordAbi, + ChangeCtx: store.ReadLatestChangePoints(), sequencer: sequencer, gov: gov, TmClient: tmClient, @@ -209,7 +224,7 @@ func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { rewardEpoch: defaultRewardEpoch, privKey: privKey, externalRsaPriv: rsaPriv, - signer: types.LatestSignerForChainID(chainId), + signer: coretypes.LatestSignerForChainID(chainId), chainId: chainId, ctx: context.TODO(), rollupEpochMaxBlock: cfg.MaxSize, @@ -250,7 +265,7 @@ func (o *Oracle) Start() { } -func (o *Oracle) waitReceiptWithCtx(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { +func (o *Oracle) waitReceiptWithCtx(ctx context.Context, txHash common.Hash) (*coretypes.Receipt, error) { t := time.NewTicker(time.Second) for { select { diff --git a/oracle/oracle/rollup.go b/oracle/oracle/rollup.go index 6c9256b09..a928147e6 100644 --- a/oracle/oracle/rollup.go +++ b/oracle/oracle/rollup.go @@ -3,170 +3,83 @@ package oracle import ( "container/list" "context" + "errors" "fmt" - "math" "math/big" - "sort" - "time" "morph-l2/bindings/bindings" "morph-l2/oracle/backoff" + "morph-l2/oracle/types" "github.com/morph-l2/go-ethereum/accounts/abi/bind" - "github.com/morph-l2/go-ethereum/common" - "github.com/morph-l2/go-ethereum/core/types" + coretypes "github.com/morph-l2/go-ethereum/core/types" "github.com/morph-l2/go-ethereum/log" ) -var ( - MaxEpochCount = 50 -) +func (o *Oracle) getLatestUsingL1ChangePoint() types.ChangePoint { + return types.ChangePoint{} +} -type SequencerSetUpdateEpoch struct { - Submitters []common.Address - StartTime *big.Int - EndTime *big.Int - EndBlock *big.Int +func (o *Oracle) getLatestUsingL2ChangePoint() types.ChangePoint { + return types.ChangePoint{} } -func (o *Oracle) generateRollupEpoch(index, startTime, rollupEpoch, updateTime, endBlock, endBlockTime, nextUpdateTime int64, sequencerSets []common.Address) ([]bindings.IRecordRollupEpochInfo, error) { - var rollupEpochInfos []bindings.IRecordRollupEpochInfo - if startTime == 0 { - startTime = updateTime +func (o *Oracle) generateEpochs(lastEpoch bindings.IRecordRollupEpochInfo, syncedEndTime uint64) ([]bindings.IRecordRollupEpochInfo, error) { + // Check that lastEpoch.EndTime is valid + // TODO + if len(o.ChangeCtx.ChangePoints) < 2 { + return nil, fmt.Errorf("invalid change points length,expect >= 2,have %v", len(o.ChangeCtx.ChangePoints)) } - epochsStart := startTime - for { - endTime := startTime + rollupEpoch - if endTime > nextUpdateTime { - endTime = nextUpdateTime - } - rollupEpochInfo := bindings.IRecordRollupEpochInfo{ - Index: big.NewInt(index), - Submitter: sequencerSets[(endTime-updateTime)/rollupEpoch%int64(len(sequencerSets))], - StartTime: big.NewInt(startTime), - EndTime: big.NewInt(endTime), - EndBlock: big.NewInt(endBlock), - } - if endTime > endBlockTime { - break - } - // TODO - if o.rollupEpochMaxBlock == 1 && len(rollupEpochInfos) >= MaxEpochCount { - rollupEpochInfo.EndBlock = big.NewInt(endBlock - 1) - rollupEpochInfos = append(rollupEpochInfos, rollupEpochInfo) - break - } - rollupEpochInfos = append(rollupEpochInfos, rollupEpochInfo) - if endTime == endBlockTime { + if lastEpoch.EndTime.Uint64() < o.ChangeCtx.ChangePoints[0].TimeStamp || lastEpoch.EndTime.Uint64() >= o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].TimeStamp { + return nil, errors.New("lastEpoch.EndTime must be greater than or equal to changePoints[0].TimeStamp") + } + var epochs []bindings.IRecordRollupEpochInfo + startTime := lastEpoch.EndTime.Uint64() + + // Initialize epochIndex starting from lastEpoch.Index + 1 + epochIndex := lastEpoch.Index.Uint64() + 1 + + for i, point := range o.ChangeCtx.ChangePoints { + // Check if the next ChangePoint exists + if i+1 == len(o.ChangeCtx.ChangePoints) { break } - startTime = endTime - index++ - } - log.Info("generate rollup epoch", "startTime", epochsStart, "endBlockTime", endBlockTime, "epochLength", len(rollupEpochInfos)) - return rollupEpochInfos, nil -} + nextPoint := o.ChangeCtx.ChangePoints[i+1] + epochDuration := point.EpochInterval -func (o *Oracle) recordRollupEpoch() error { - epochIndex, err := o.record.NextRollupEpochIndex(nil) - if err != nil { - return err - } - o.metrics.SetRollupEpoch(epochIndex.Uint64() - 1) - rollupEpoch, err := o.record.RollupEpochs(nil, new(big.Int).Sub(epochIndex, big.NewInt(1))) - if err != nil { - return err - } - startBlock := rollupEpoch.EndBlock.Uint64() - blockNumber, err := o.l2Client.BlockNumber(o.ctx) - if err != nil { - return err - } - if startBlock+o.cfg.MinSize >= blockNumber { - log.Info("too few blocks are newer than startBlock", "startBlock", startBlock, "latestBlock", blockNumber, "minSize", o.cfg.MinSize) - time.Sleep(defaultSleepTime) - return nil - } - endBlock := startBlock + o.rollupEpochMaxBlock - if endBlock > blockNumber { - endBlock = blockNumber - } - log.Info("record rollup epoch info start", "startBlock", startBlock, "endBlock", endBlock, "nextEpochIndex", epochIndex, "lastEpochInfo", rollupEpoch) - setsEpochs, err := o.GetSequencerSetsEpoch(startBlock, endBlock) - if err != nil { - return err - } - var rollupEpochInfos []bindings.IRecordRollupEpochInfo - var epochTime *big.Int - if len(setsEpochs) != 0 { - for _, setsEpoch := range setsEpochs { - log.Info("received new sets change", "startTime", setsEpoch.StartTime, "endTime", setsEpoch.EndTime, "endBlock", setsEpoch.EndBlock) - updateTime, err := o.GetUpdateTime(setsEpoch.EndBlock.Int64() - 1) - if err != nil { - return err + // Create epochs until the nextPoint's timestamp is less than endTime + for { + if startTime >= nextPoint.TimeStamp { + break } - epochTime, err = o.gov.RollupEpoch(&bind.CallOpts{ - BlockNumber: big.NewInt(setsEpoch.EndBlock.Int64() - 1), - }) - if err != nil { - return err + endTime := startTime + epochDuration + if endTime > nextPoint.TimeStamp { + if nextPoint.ChangeType == types.L1ChangePoint || nextPoint.ChangeType == types.L2ChangePoint { + endTime = nextPoint.TimeStamp + } else { + break + } } - epochs, err := o.generateRollupEpoch(epochIndex.Int64()+int64(len(rollupEpochInfos)), rollupEpoch.EndTime.Int64(), epochTime.Int64(), updateTime, setsEpoch.EndBlock.Int64(), setsEpoch.EndTime.Int64(), setsEpoch.EndTime.Int64(), setsEpoch.Submitters) - if err != nil { - return err + + // Ensure StartTime and EndTime are valid + epochInfo := bindings.IRecordRollupEpochInfo{ + Index: new(big.Int).SetUint64(epochIndex), // Set the current epoch index + Submitter: point.Submitters[int((startTime-point.TimeStamp)/point.EpochInterval)%len(point.Submitters)], + StartTime: new(big.Int).SetUint64(startTime), // Use SetUint64 for a copy + EndTime: new(big.Int).SetUint64(endTime), // Use SetUint64 for a copy + EndBlock: big.NewInt(0), // Can set the corresponding EndBlock } - rollupEpochInfos = append(rollupEpochInfos, epochs...) - } - } else { - updateTime, err := o.GetUpdateTime(int64(endBlock)) - if err != nil { - return fmt.Errorf("get update time error:%v", err) - } - epochTime, err = o.gov.RollupEpoch(&bind.CallOpts{ - BlockNumber: big.NewInt(int64(endBlock)), - }) - if err != nil { - return fmt.Errorf("get rollup epoch time error:%v", err) - } - header, err := o.l2Client.HeaderByNumber(o.ctx, big.NewInt(int64(endBlock))) - if err != nil { - return fmt.Errorf("get header by number error:%v", err) - } - sets, err := o.sequencer.GetSequencerSet2(&bind.CallOpts{ - BlockNumber: big.NewInt(int64(endBlock)), - }) - if err != nil { - return fmt.Errorf("get sequencer set error:%v", err) - } - epochs, err := o.generateRollupEpoch(epochIndex.Int64(), rollupEpoch.EndTime.Int64(), epochTime.Int64(), updateTime, int64(endBlock), int64(header.Time), math.MaxInt64, sets) - if err != nil { - return fmt.Errorf("gernerate rollup epoch info error:%v", err) - } - rollupEpochInfos = append(rollupEpochInfos, epochs...) - } - if len(rollupEpochInfos) == 0 { - log.Info("rollup epoch infos length is zero", "startBlock", startBlock, "endBlock", endBlock, "rollupEpochMaxBlock", o.rollupEpochMaxBlock, "epochTime", epochTime) - time.Sleep(defaultSleepTime) - return nil - } - log.Info("submit rollup epoch infos", "startBlock", startBlock, "endBlock", endBlock, "infoLength", len(rollupEpochInfos)) - err = o.submitRollupEpoch(rollupEpochInfos) - if err != nil { - if len(rollupEpochInfos) > 50 { - if o.cfg.MinSize*2 <= o.rollupEpochMaxBlock { - o.rollupEpochMaxBlock -= o.cfg.MinSize - } else { - o.rollupEpochMaxBlock = o.rollupEpochMaxBlock / 2 + epochs = append(epochs, epochInfo) + if len(epochs) > types.MaxEpochCount { + return epochs, nil } + // Increment epochIndex for the next epoch + epochIndex++ + // Update startTime for the next epoch + startTime = endTime } - return fmt.Errorf("submit rollup epoch info error:%v,rollupEpochMaxBlock:%v", err, o.rollupEpochMaxBlock) } - if o.rollupEpochMaxBlock+o.cfg.MinSize <= o.cfg.MaxSize { - o.rollupEpochMaxBlock += o.cfg.MinSize - } - - log.Info("submit rollup epoch info success", "rollupEpochMaxBlock", o.rollupEpochMaxBlock) - return nil + return epochs, nil } func (o *Oracle) submitRollupEpoch(epochs []bindings.IRecordRollupEpochInfo) error { @@ -179,7 +92,7 @@ func (o *Oracle) submitRollupEpoch(epochs []bindings.IRecordRollupEpochInfo) err return err } log.Info("send record rollup epoch tx success", "txHash", tx.Hash().Hex(), "nonce", tx.Nonce()) - var receipt *types.Receipt + var receipt *coretypes.Receipt err = backoff.Do(30, backoff.Exponential(), func() error { var err error receipt, err = o.waitReceiptWithCtx(o.ctx, tx.Hash()) @@ -188,7 +101,7 @@ func (o *Oracle) submitRollupEpoch(epochs []bindings.IRecordRollupEpochInfo) err if err != nil { return fmt.Errorf("receipt record rollup epochs error:%v", err) } - if receipt.Status != types.ReceiptStatusSuccessful { + if receipt.Status != coretypes.ReceiptStatusSuccessful { return fmt.Errorf("record rollup epochs not success") } log.Info("wait receipt success", "txHash", tx.Hash()) @@ -221,60 +134,63 @@ func (o *Oracle) GetUpdateTime(blockNumber int64) (int64, error) { return updateTime.Int64(), nil } -func (o *Oracle) GetSequencerSetsEpoch(start, end uint64) ([]SequencerSetUpdateEpoch, error) { - var epochBlock []int - rollupEpochUpdated, err := o.fetchRollupEpochUpdated(o.ctx, start, end) +func (o *Oracle) fetchRollupEpochUpdated(ctx context.Context, start, end uint64) ([]int, error) { + opts := &bind.FilterOpts{ + Context: ctx, + Start: start, + End: &end, + } + iter, err := o.gov.FilterRollupEpochUpdated(opts) if err != nil { return nil, err } - epochBlock = append(epochBlock, rollupEpochUpdated...) - sequencerSetUpdated, err := o.fetchSequencerSetUpdated(o.ctx, start, end) + defer func() { + if err := iter.Close(); err != nil { + log.Info("GovRollupEpochUpdatedIterator close failed", "error", err) + } + }() + var blocks []int + for iter.Next() { + blocks = append(blocks, int(iter.Event.Raw.BlockNumber)) + } + return blocks, nil +} + +func (o *Oracle) fetchSequencerSetUpdated(ctx context.Context, start, end uint64) ([]int, error) { + opts := &bind.FilterOpts{ + Context: ctx, + Start: start, + End: &end, + } + iter, err := o.sequencer.FilterSequencerSetUpdated(opts) if err != nil { return nil, err } - epochBlock = append(epochBlock, sequencerSetUpdated...) - sortedBlocks := removeDuplicatesAndSort(epochBlock) - sort.Ints(sortedBlocks) - var setsEpochInfos []SequencerSetUpdateEpoch - for _, eb := range sortedBlocks { - header, err := o.l2Client.HeaderByNumber(o.ctx, big.NewInt(int64(eb))) - if err != nil { - return nil, err - } - sequencerSets, err := o.sequencer.GetSequencerSet2(&bind.CallOpts{ - BlockNumber: big.NewInt(int64(eb - 1)), - }) - if err != nil { - return nil, err - } - lastTime, err := o.GetUpdateTime(header.Number.Int64() - 1) - if err != nil { - return nil, err - } - epochInfo := SequencerSetUpdateEpoch{ - Submitters: sequencerSets, - StartTime: big.NewInt(lastTime), - EndTime: big.NewInt(int64(header.Time)), - EndBlock: header.Number, + defer func() { + if err := iter.Close(); err != nil { + log.Info("SequencerSequencerSetUpdatedIterator close failed", "error", err) } - setsEpochInfos = append(setsEpochInfos, epochInfo) + }() + var blocks []int + for iter.Next() { + blocks = append(blocks, int(iter.Event.Raw.BlockNumber)) } - return setsEpochInfos, nil + return blocks, nil } -func (o *Oracle) fetchRollupEpochUpdated(ctx context.Context, start, end uint64) ([]int, error) { +func (o *Oracle) fetchL1StakerRemoved(ctx context.Context, start, end uint64) ([]int, error) { opts := &bind.FilterOpts{ Context: ctx, Start: start, End: &end, } - iter, err := o.gov.FilterRollupEpochUpdated(opts) + iter, err := o.l1Staking.FilterStakersRemoved(opts) if err != nil { return nil, err } defer func() { if err := iter.Close(); err != nil { - log.Info("GovRollupEpochUpdatedIterator close failed", "error", err) + log.Info("L1StakingStakersRemovedIterator close failed", "error", err) } }() var blocks []int @@ -284,19 +200,19 @@ func (o *Oracle) fetchRollupEpochUpdated(ctx context.Context, start, end uint64) return blocks, nil } -func (o *Oracle) fetchSequencerSetUpdated(ctx context.Context, start, end uint64) ([]int, error) { +func (o *Oracle) fetchL1StakerAdded(ctx context.Context, start, end uint64) ([]int, error) { opts := &bind.FilterOpts{ Context: ctx, Start: start, End: &end, } - iter, err := o.sequencer.FilterSequencerSetUpdated(opts) + iter, err := o.l1Staking.FilterRegistered(opts) if err != nil { return nil, err } defer func() { if err := iter.Close(); err != nil { - log.Info("SequencerSequencerSetUpdatedIterator close failed", "error", err) + log.Info("L1StakingRegisteredIterator close failed", "error", err) } }() var blocks []int diff --git a/oracle/types/types.go b/oracle/types/types.go new file mode 100644 index 000000000..8485f6cfe --- /dev/null +++ b/oracle/types/types.go @@ -0,0 +1,54 @@ +package types + +import ( + "errors" + "github.com/morph-l2/go-ethereum/common" + "math/big" +) + +var ( + MaxEpochCount = 50 +) + +var ErrMemoryDBNotFound = errors.New("not found") + +const ( + L1ChangePoint = iota + 1 + L2ChangePoint +) + +type SequencerSetUpdateEpoch struct { + Submitters []common.Address + StartTime *big.Int + EndTime *big.Int + EndBlock *big.Int +} + +// TODO +type ChangePoint struct { + TimeStamp uint64 + BlockNumber uint64 + EpochInterval uint64 + Submitters []common.Address + ChangeType uint64 +} + +type L2Sequencer struct { + TimeStamp uint64 + EpochInterval uint64 + Addresses []common.Address +} + +type ActiveStakers struct { + TimeStamp uint64 + BlockNumber uint64 + Addresses []common.Address +} + +type ChangeContext struct { + L2Sequencers []L2Sequencer + ActiveStakersByTime []ActiveStakers + ChangePoints []ChangePoint + L1Synced uint64 + L2synced uint64 +} From c393ab490f040de7f16196ba87c7420528a69131 Mon Sep 17 00:00:00 2001 From: curryxbo Date: Thu, 19 Sep 2024 16:15:25 +0800 Subject: [PATCH 2/2] fix --- oracle/config/config.go | 4 + oracle/flags/flags.go | 7 + oracle/oracle/batch.go | 31 +--- oracle/oracle/change_point.go | 43 ++--- oracle/oracle/manager.go | 314 ++++++++++++++++++++++++++++++++++ oracle/oracle/oracle.go | 30 +++- oracle/oracle/reward.go | 27 +-- oracle/oracle/rollup.go | 30 +--- 8 files changed, 379 insertions(+), 107 deletions(-) create mode 100644 oracle/oracle/manager.go diff --git a/oracle/config/config.go b/oracle/config/config.go index 45405e4c6..fa43ff8ff 100644 --- a/oracle/config/config.go +++ b/oracle/config/config.go @@ -72,6 +72,8 @@ type Config struct { ExternalSignChain string ExternalSignUrl string ExternalSignRsaPriv string + + MockRecord bool } // NewConfig parses the Config from the provided flags or environment variables. @@ -99,6 +101,8 @@ func NewConfig(ctx *cli.Context) (Config, error) { ExternalSignChain: ctx.GlobalString(flags.ExternalSignChain.Name), ExternalSignUrl: ctx.GlobalString(flags.ExternalSignUrl.Name), ExternalSignRsaPriv: ctx.GlobalString(flags.ExternalSignRsaPriv.Name), + // mock flag + MockRecord: ctx.GlobalBool(flags.MockRecordFlag.Name), } if ctx.GlobalIsSet(flags.LogFilenameFlag.Name) { diff --git a/oracle/flags/flags.go b/oracle/flags/flags.go index daaef255f..554a658a2 100644 --- a/oracle/flags/flags.go +++ b/oracle/flags/flags.go @@ -148,6 +148,11 @@ var ( Usage: "The rsa private key of the external sign", EnvVar: prefixEnvVar("EXTERNAL_SIGN_RSA_PRIV"), } + MockRecordFlag = cli.BoolFlag{ + Name: "MOCK_RECORD", + Usage: "mock record client", + EnvVar: prefixEnvVar("MOCK_RECORD"), + } ) var requiredFlags = []cli.Flag{ @@ -179,6 +184,8 @@ var optionalFlags = []cli.Flag{ ExternalSignChain, ExternalSignUrl, ExternalSignRsaPriv, + // mock record client + MockRecordFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/oracle/oracle/batch.go b/oracle/oracle/batch.go index 1e83c37d6..f2dc0e3c9 100644 --- a/oracle/oracle/batch.go +++ b/oracle/oracle/batch.go @@ -8,16 +8,14 @@ import ( "math/big" "time" - "morph-l2/bindings/bindings" - "morph-l2/node/derivation" - "morph-l2/oracle/backoff" - "github.com/morph-l2/go-ethereum/accounts/abi/bind" "github.com/morph-l2/go-ethereum/common" "github.com/morph-l2/go-ethereum/common/hexutil" "github.com/morph-l2/go-ethereum/core/types" "github.com/morph-l2/go-ethereum/eth" "github.com/morph-l2/go-ethereum/log" + "morph-l2/bindings/bindings" + "morph-l2/node/derivation" ) type BatchInfoMap map[common.Hash][]BatchInfo @@ -195,7 +193,7 @@ func (o *Oracle) LastBatchIndex(opts *bind.CallOpts) (*big.Int, error) { } func (o *Oracle) submitRecord() error { - nextBatchSubmissionIndex, err := o.GetNextBatchSubmissionIndex() + nextBatchSubmissionIndex, err := o.rm.NextBatchEpochIndex() if err != nil { return fmt.Errorf("get next batch submission index failed:%v", err) } @@ -218,26 +216,5 @@ func (o *Oracle) submitRecord() error { if err != nil { return fmt.Errorf("get batch submission error:%v", err) } - callData, err := o.recordAbi.Pack("recordFinalizedBatchSubmissions", batchSubmissions) - if err != nil { - return err - } - tx, err := o.newRecordTxAndSign(callData) - if err != nil { - return fmt.Errorf("record finalized batch error:%v,batchLength:%v", err, len(batchSubmissions)) - } - log.Info("record finalized batch success", "txHash", tx.Hash(), "batchLength", len(batchSubmissions)) - var receipt *types.Receipt - err = backoff.Do(30, backoff.Exponential(), func() error { - var err error - receipt, err = o.waitReceiptWithCtx(o.ctx, tx.Hash()) - return err - }) - if err != nil { - return fmt.Errorf("wait tx receipt error:%v,txHash:%v", err, tx.Hash()) - } - if receipt.Status != types.ReceiptStatusSuccessful { - return fmt.Errorf("record batch receipt failed,txHash:%v", tx.Hash()) - } - return nil + return o.rm.UploadBatchEpoch(batchSubmissions) } diff --git a/oracle/oracle/change_point.go b/oracle/oracle/change_point.go index c8ed8b825..6ca49a06a 100644 --- a/oracle/oracle/change_point.go +++ b/oracle/oracle/change_point.go @@ -13,15 +13,6 @@ import ( "github.com/morph-l2/go-ethereum/log" ) -//func (o *Oracle) queryActiveStakersByTime(timestamp uint64) ([]common.Address, error) { -// for _, activeStakers := range o.ChangeCtx.ActiveStakersByTime { -// if timestamp >= activeStakers.TimeStamp { -// return activeStakers.Addresses, nil -// } -// } -// return nil, fmt.Errorf("not found,invalid timestamp,expect bigger or equal than %v but have :%v ", o.ChangeCtx.ActiveStakersByTime[0].TimeStamp, timestamp) -//} - func (o *Oracle) syncL1ChangePoint(start, end, startTime, endTime uint64) error { var epochBlock []int stakerAddedPoint, err := o.fetchL1StakerAdded(o.ctx, start, end) @@ -153,18 +144,23 @@ func (o *Oracle) recordRollupEpoch() error { if l2Start+o.rollupEpochMaxBlock < l2Latest { l2End = l2Start + o.rollupEpochMaxBlock - 1 } - epochIndex, err := o.record.NextRollupEpochIndex(nil) + lastEpoch, err := o.rm.LatestRollupEpoch() if err != nil { return err } - lastEpoch, err := o.record.RollupEpochs(nil, epochIndex.Sub(epochIndex, big.NewInt(1))) - if err != nil { - return err + + if lastEpoch.Index == nil { + header, err := o.l2Client.HeaderByNumber(o.ctx, big.NewInt(1)) + if err != nil { + return fmt.Errorf("quert first block header error:%v", err) + } + lastEpoch.EndTime = big.NewInt(int64(header.Time)) + lastEpoch.Index = big.NewInt(0) } //o.PruneChangeCtx() changePointIndex := 0 - for i, cps := range o.ChangeCtx.ChangePoints { - if cps.TimeStamp > lastEpoch.EndTime.Uint64() { + for i, cp := range o.ChangeCtx.ChangePoints { + if cp.TimeStamp > lastEpoch.EndTime.Uint64() { changePointIndex = i break } @@ -186,6 +182,11 @@ func (o *Oracle) recordRollupEpoch() error { } startTime := startHeader.Time endTime := endHeader.Time + if endTime-startTime < o.ChangeCtx.ChangePoints[0].EpochInterval*uint64(types.MaxEpochCount) { + time.Sleep(time.Duration(o.ChangeCtx.ChangePoints[0].EpochInterval*uint64(types.MaxEpochCount)) * time.Second) + log.Info("Too few epochs,wait... ", "startTime", startTime, "endTime", endTime) + return nil + } // clean fake point if o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].ChangeType != types.L1ChangePoint && o.ChangeCtx.ChangePoints[len(o.ChangeCtx.ChangePoints)-1].ChangeType != types.L2ChangePoint { o.ChangeCtx.ChangePoints = o.ChangeCtx.ChangePoints[:len(o.ChangeCtx.ChangePoints)-1] @@ -225,7 +226,7 @@ func (o *Oracle) recordRollupEpoch() error { return nil } log.Info("submit rollup epoch infos", "l1Start", l1Start, "l1End", l1End, "l2Start", l2Start, "l2End", l2End, "infoLength", len(epochs)) - err = o.submitRollupEpoch(epochs) + err = o.rm.UploadRollupEpoch(epochs) if err != nil { if len(epochs) > 50 { if o.cfg.MinSize*2 <= o.rollupEpochMaxBlock { @@ -234,13 +235,13 @@ func (o *Oracle) recordRollupEpoch() error { o.rollupEpochMaxBlock = o.rollupEpochMaxBlock / 2 } } - return fmt.Errorf("submit rollup epoch info error:%v,rollupEpochMaxBlock:%v", err, o.rollupEpochMaxBlock) - } - if o.rollupEpochMaxBlock+o.cfg.MinSize <= o.cfg.MaxSize { - o.rollupEpochMaxBlock += o.cfg.MinSize + return fmt.Errorf("submit rollup epoch info error:%v", err) } + //if o.rollupEpochMaxBlock+o.cfg.MinSize <= o.cfg.MaxSize { + // o.rollupEpochMaxBlock += o.cfg.MinSize + //} - log.Info("submit rollup epoch info success", "rollupEpochMaxBlock", o.rollupEpochMaxBlock) + log.Info("submit rollup epoch info success") return nil } diff --git a/oracle/oracle/manager.go b/oracle/oracle/manager.go new file mode 100644 index 000000000..f1896f0af --- /dev/null +++ b/oracle/oracle/manager.go @@ -0,0 +1,314 @@ +package oracle + +import ( + "context" + "crypto/ecdsa" + "crypto/rsa" + "errors" + "fmt" + "math/big" + "time" + + "github.com/morph-l2/externalsign" + "github.com/morph-l2/go-ethereum" + "github.com/morph-l2/go-ethereum/accounts/abi" + "github.com/morph-l2/go-ethereum/common" + "github.com/morph-l2/go-ethereum/core/types" + coretypes "github.com/morph-l2/go-ethereum/core/types" + "github.com/morph-l2/go-ethereum/crypto" + "github.com/morph-l2/go-ethereum/ethclient" + "github.com/morph-l2/go-ethereum/log" + + "morph-l2/bindings/bindings" + "morph-l2/bindings/predeploys" + "morph-l2/oracle/backoff" + "morph-l2/oracle/config" +) + +type RecordManager interface { + UploadRollupEpoch(recordRollupEpochInfos []bindings.IRecordRollupEpochInfo) error + LatestRollupEpoch() (*bindings.IRecordRollupEpochInfo, error) + UploadRewardsEpoch(recordRewardsEpochInfos []bindings.IRecordRewardEpochInfo) error + NextRewardEpochIndex() (*big.Int, error) + UploadBatchEpoch(recordBatchSubmissions []bindings.IRecordBatchSubmission) error + NextBatchEpochIndex() (*big.Int, error) +} + +type RecordClient struct { + l2Client *ethclient.Client + record *bindings.Record + recordAddr common.Address + recordAbi *abi.ABI + ctx context.Context + + privKey *ecdsa.PrivateKey + externalRsaPriv *rsa.PrivateKey + signer coretypes.Signer + cfg *config.Config + chainId *big.Int +} + +func NewRecordClient( + l2Client *ethclient.Client, + record *bindings.Record, + recordAddr common.Address, + recordAbi *abi.ABI, + ctx context.Context, + privKey *ecdsa.PrivateKey, + externalRsaPriv *rsa.PrivateKey, + signer coretypes.Signer, + cfg *config.Config, + chainId *big.Int, +) RecordManager { + return &RecordClient{ + l2Client: l2Client, + record: record, + recordAddr: recordAddr, + recordAbi: recordAbi, + ctx: ctx, + privKey: privKey, + externalRsaPriv: externalRsaPriv, + signer: signer, + cfg: cfg, + chainId: chainId, + } +} + +func (r *RecordClient) UploadRollupEpoch(epochs []bindings.IRecordRollupEpochInfo) error { + callData, err := r.recordAbi.Pack("recordRollupEpochs", epochs) + if err != nil { + return err + } + tx, err := r.newRecordTxAndSign(callData) + if err != nil { + return err + } + log.Info("send record rollup epoch tx success", "txHash", tx.Hash().Hex(), "nonce", tx.Nonce()) + var receipt *coretypes.Receipt + err = backoff.Do(30, backoff.Exponential(), func() error { + var err error + receipt, err = r.waitReceiptWithCtx(r.ctx, tx.Hash()) + return err + }) + if err != nil { + return fmt.Errorf("receipt record rollup epochs error:%v", err) + } + if receipt.Status != coretypes.ReceiptStatusSuccessful { + return fmt.Errorf("record rollup epochs not success") + } + log.Info("wait receipt success", "txHash", tx.Hash()) + return nil +} + +func (r *RecordClient) LatestRollupEpoch() (*bindings.IRecordRollupEpochInfo, error) { + epochIndex, err := r.record.NextRollupEpochIndex(nil) + if err != nil { + return nil, err + } + lastEpoch, err := r.record.RollupEpochs(nil, epochIndex.Sub(epochIndex, big.NewInt(1))) + if err != nil { + return nil, err + } + return &bindings.IRecordRollupEpochInfo{ + Index: lastEpoch.Index, + Submitter: lastEpoch.Submitter, + StartTime: lastEpoch.StartTime, + EndTime: lastEpoch.EndTime, + EndBlock: lastEpoch.EndBlock, + }, nil +} + +func (r *RecordClient) UploadRewardsEpoch(recordRewardsEpochInfos []bindings.IRecordRewardEpochInfo) error { + callData, err := r.recordAbi.Pack("recordRewardEpochs", recordRewardsEpochInfos) + if err != nil { + return err + } + tx, err := r.newRecordTxAndSign(callData) + if err != nil { + return fmt.Errorf("record reward epochs error:%v", err) + } + err = r.l2Client.SendTransaction(r.ctx, tx) + if err != nil { + return fmt.Errorf("send transaction error:%v", err) + } + log.Info("send record reward tx success", "txHash", tx.Hash().Hex(), "nonce", tx.Nonce()) + var receipt *types.Receipt + err = backoff.Do(30, backoff.Exponential(), func() error { + var err error + receipt, err = r.waitReceiptWithCtx(r.ctx, tx.Hash()) + return err + }) + if err != nil { + return fmt.Errorf("receipt record reward epochs error:%v", err) + } + if receipt.Status != types.ReceiptStatusSuccessful { + return fmt.Errorf("record reward epochs not success") + } + return nil +} + +func (r *RecordClient) newRecordTxAndSign(callData []byte) (*types.Transaction, error) { + from := common.HexToAddress(r.cfg.ExternalSignAddress) + if !r.cfg.ExternalSign { + from = crypto.PubkeyToAddress(r.privKey.PublicKey) + } + nonce, err := r.l2Client.NonceAt(r.ctx, from, nil) + if err != nil { + return nil, err + } + // tip and cap + tip, err := r.l2Client.SuggestGasTipCap(r.ctx) + if err != nil { + return nil, err + } + head, err := r.l2Client.HeaderByNumber(r.ctx, nil) + if err != nil { + return nil, err + } + var gasFeeCap *big.Int + if head.BaseFee != nil { + gasFeeCap = new(big.Int).Add( + tip, + new(big.Int).Mul(head.BaseFee, big.NewInt(2)), + ) + } else { + gasFeeCap = new(big.Int).Set(tip) + } + gas, err := r.l2Client.EstimateGas(r.ctx, ethereum.CallMsg{ + From: from, + To: &predeploys.RecordAddr, + GasFeeCap: gasFeeCap, + GasTipCap: tip, + Data: callData, + }) + if err != nil { + return nil, err + } + return r.sign(types.NewTx(&types.DynamicFeeTx{ + ChainID: r.chainId, + Nonce: nonce, + GasTipCap: tip, + GasFeeCap: gasFeeCap, + Gas: gas, + To: &r.recordAddr, + Data: callData})) +} + +func (r *RecordClient) sign(tx *types.Transaction) (*types.Transaction, error) { + if r.cfg.ExternalSign { + if externalSigner == nil { + externalSigner = externalsign.NewExternalSign(r.cfg.ExternalSignAppid, r.externalRsaPriv, r.cfg.ExternalSignAddress, r.cfg.ExternalSignChain, r.signer) + } + signedTx, err := externalSigner.RequestSign(r.cfg.ExternalSignUrl, tx) + if err != nil { + return nil, fmt.Errorf("externalsign sign tx error:%v", err) + } + return signedTx, nil + } else { + signedTx, err := types.SignTx(tx, r.signer, r.privKey) + if err != nil { + return nil, fmt.Errorf("sign tx error:%v", err) + } + return signedTx, nil + + } +} + +func (r *RecordClient) waitReceiptWithCtx(ctx context.Context, txHash common.Hash) (*coretypes.Receipt, error) { + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return nil, errors.New("timeout") + case <-t.C: + receipt, err := r.l2Client.TransactionReceipt(r.ctx, txHash) + if errors.Is(err, ethereum.NotFound) { + continue + } + if err != nil { + return nil, err + } + if receipt != nil { + t.Stop() + return receipt, nil + } + } + } +} + +func (r *RecordClient) NextRewardEpochIndex() (*big.Int, error) { + return r.record.NextRewardEpochIndex(nil) + +} + +func (r *RecordClient) UploadBatchEpoch(batchSubmissions []bindings.IRecordBatchSubmission) error { + callData, err := r.recordAbi.Pack("recordFinalizedBatchSubmissions", batchSubmissions) + if err != nil { + return err + } + tx, err := r.newRecordTxAndSign(callData) + if err != nil { + return fmt.Errorf("record finalized batch error:%v,batchLength:%v", err, len(batchSubmissions)) + } + log.Info("record finalized batch success", "txHash", tx.Hash(), "batchLength", len(batchSubmissions)) + var receipt *types.Receipt + err = backoff.Do(30, backoff.Exponential(), func() error { + var err error + receipt, err = r.waitReceiptWithCtx(r.ctx, tx.Hash()) + return err + }) + if err != nil { + return fmt.Errorf("wait tx receipt error:%v,txHash:%v", err, tx.Hash()) + } + if receipt.Status != types.ReceiptStatusSuccessful { + return fmt.Errorf("record batch receipt failed,txHash:%v", tx.Hash()) + } + return nil +} + +func (r *RecordClient) NextBatchEpochIndex() (*big.Int, error) { + nextBatchSubmissionIndex, err := r.record.NextBatchSubmissionIndex(nil) + if err != nil { + return nil, fmt.Errorf("get next batch submission index failed:%v", err) + } + return nextBatchSubmissionIndex, nil +} + +type MockClient struct { + recordRollupEpochInfo bindings.IRecordRollupEpochInfo + nextRewardEpochIndex *big.Int + batchEpochIndex *big.Int +} + +func (m *MockClient) UploadRollupEpoch(recordRollupEpochInfos []bindings.IRecordRollupEpochInfo) error { + m.recordRollupEpochInfo = recordRollupEpochInfos[len(recordRollupEpochInfos)-1] + return nil +} + +func (m *MockClient) LatestRollupEpoch() (*bindings.IRecordRollupEpochInfo, error) { + return nil, nil +} + +func (m *MockClient) UploadRewardsEpoch(recordRewardsEpochInfos []bindings.IRecordRewardEpochInfo) error { + return nil +} + +func (m *MockClient) NextRewardEpochIndex() (*big.Int, error) { + if m.nextRewardEpochIndex == nil { + return big.NewInt(1), nil + } + + return m.nextRewardEpochIndex, nil +} + +func (m *MockClient) UploadBatchEpoch(recordBatchSubmissions []bindings.IRecordBatchSubmission) error { + m.batchEpochIndex = recordBatchSubmissions[len(recordBatchSubmissions)-1].Index + return nil +} + +func (m *MockClient) NextBatchEpochIndex() (*big.Int, error) { + if m.batchEpochIndex == nil { + return big.NewInt(1), nil + } + return m.batchEpochIndex, nil +} diff --git a/oracle/oracle/oracle.go b/oracle/oracle/oracle.go index 733ee89b3..a70861797 100644 --- a/oracle/oracle/oracle.go +++ b/oracle/oracle/oracle.go @@ -6,7 +6,6 @@ import ( "crypto/rsa" "errors" "fmt" - "io" "math/big" "os" @@ -88,6 +87,7 @@ type Oracle struct { privKey *ecdsa.PrivateKey externalRsaPriv *rsa.PrivateKey signer coretypes.Signer + rm RecordManager chainId *big.Int isFinalized bool enable bool @@ -206,6 +206,27 @@ func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { return nil, fmt.Errorf("parse privkey err:%w", err) } } + ctx := context.TODO() + recordAddr := predeploys.RecordAddr + signer := coretypes.LatestSignerForChainID(chainId) + var recordManager RecordManager + if cfg.MockRecord { + + } else { + recordManager = NewRecordClient( + l2Client, + record, + recordAddr, + recordAbi, + ctx, + privKey, + rsaPriv, + signer, + cfg, + chainId, + ) + } + return &Oracle{ l1Client: l1Client, l2Client: l2Client, @@ -214,8 +235,9 @@ func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { l1Staking: l1Staking, l2Staking: l2Staking, record: record, - recordAddr: predeploys.RecordAddr, + recordAddr: recordAddr, recordAbi: recordAbi, + rm: recordManager, ChangeCtx: store.ReadLatestChangePoints(), sequencer: sequencer, gov: gov, @@ -224,9 +246,9 @@ func NewOracle(cfg *config.Config, m *metrics.Metrics) (*Oracle, error) { rewardEpoch: defaultRewardEpoch, privKey: privKey, externalRsaPriv: rsaPriv, - signer: coretypes.LatestSignerForChainID(chainId), + signer: signer, chainId: chainId, - ctx: context.TODO(), + ctx: ctx, rollupEpochMaxBlock: cfg.MaxSize, metrics: m, }, nil diff --git a/oracle/oracle/reward.go b/oracle/oracle/reward.go index f67ad451e..bfed781e6 100644 --- a/oracle/oracle/reward.go +++ b/oracle/oracle/reward.go @@ -72,32 +72,7 @@ func (o *Oracle) syncRewardEpoch() error { if err != nil { return err } - callData, err := o.recordAbi.Pack("recordRewardEpochs", []bindings.IRecordRewardEpochInfo{*recordRewardEpochInfo}) - if err != nil { - return err - } - tx, err := o.newRecordTxAndSign(callData) - if err != nil { - return fmt.Errorf("record reward epochs error:%v", err) - } - err = o.l2Client.SendTransaction(o.ctx, tx) - if err != nil { - return fmt.Errorf("send transaction error:%v", err) - } - log.Info("send record reward tx success", "txHash", tx.Hash().Hex(), "nonce", tx.Nonce()) - var receipt *types.Receipt - err = backoff.Do(30, backoff.Exponential(), func() error { - var err error - receipt, err = o.waitReceiptWithCtx(o.ctx, tx.Hash()) - return err - }) - if err != nil { - return fmt.Errorf("receipt record reward epochs error:%v", err) - } - if receipt.Status != types.ReceiptStatusSuccessful { - return fmt.Errorf("record reward epochs not success") - } - return nil + return o.rm.UploadRewardsEpoch([]bindings.IRecordRewardEpochInfo{*recordRewardEpochInfo}) } func (o *Oracle) getRewardEpochs(startRewardEpochIndex, startHeight *big.Int) (*bindings.IRecordRewardEpochInfo, error) { diff --git a/oracle/oracle/rollup.go b/oracle/oracle/rollup.go index a928147e6..c8ece983c 100644 --- a/oracle/oracle/rollup.go +++ b/oracle/oracle/rollup.go @@ -8,11 +8,9 @@ import ( "math/big" "morph-l2/bindings/bindings" - "morph-l2/oracle/backoff" "morph-l2/oracle/types" "github.com/morph-l2/go-ethereum/accounts/abi/bind" - coretypes "github.com/morph-l2/go-ethereum/core/types" "github.com/morph-l2/go-ethereum/log" ) @@ -24,7 +22,7 @@ func (o *Oracle) getLatestUsingL2ChangePoint() types.ChangePoint { return types.ChangePoint{} } -func (o *Oracle) generateEpochs(lastEpoch bindings.IRecordRollupEpochInfo, syncedEndTime uint64) ([]bindings.IRecordRollupEpochInfo, error) { +func (o *Oracle) generateEpochs(lastEpoch *bindings.IRecordRollupEpochInfo, syncedEndTime uint64) ([]bindings.IRecordRollupEpochInfo, error) { // Check that lastEpoch.EndTime is valid // TODO if len(o.ChangeCtx.ChangePoints) < 2 { @@ -82,32 +80,6 @@ func (o *Oracle) generateEpochs(lastEpoch bindings.IRecordRollupEpochInfo, synce return epochs, nil } -func (o *Oracle) submitRollupEpoch(epochs []bindings.IRecordRollupEpochInfo) error { - callData, err := o.recordAbi.Pack("recordRollupEpochs", epochs) - if err != nil { - return err - } - tx, err := o.newRecordTxAndSign(callData) - if err != nil { - return err - } - log.Info("send record rollup epoch tx success", "txHash", tx.Hash().Hex(), "nonce", tx.Nonce()) - var receipt *coretypes.Receipt - err = backoff.Do(30, backoff.Exponential(), func() error { - var err error - receipt, err = o.waitReceiptWithCtx(o.ctx, tx.Hash()) - return err - }) - if err != nil { - return fmt.Errorf("receipt record rollup epochs error:%v", err) - } - if receipt.Status != coretypes.ReceiptStatusSuccessful { - return fmt.Errorf("record rollup epochs not success") - } - log.Info("wait receipt success", "txHash", tx.Hash()) - return nil -} - func (o *Oracle) GetUpdateTime(blockNumber int64) (int64, error) { updateTime, err := o.sequencer.UpdateTime(&bind.CallOpts{ BlockNumber: big.NewInt(blockNumber),