Skip to content

Commit

Permalink
Cherry pick data stream modifications from 0.7.0 into develop (0xPoly…
Browse files Browse the repository at this point in the history
…gonHermez#3671)

* Change data stream format (0xPolygonHermez#3597)

* protobuf datastream

* Update DS Format (0xPolygonHermez#3608)

* protobuf datastream

* proto batch end (0xPolygonHermez#3612)

* fix genesis DS (0xPolygonHermez#3615)

* Fix DSSendL2Block batch number (0xPolygonHermez#3617)

* Fix DSSendL2Block batch number

* latest proto (0xPolygonHermez#3620)

* Use Eth block hash for l2 blocks in data stream (0xPolygonHermez#3661)

* Use Eth block hash for l2 blocks in data stream

* handle minTimestamp

* empty imStateRoot (0xPolygonHermez#3663)

* empty imStateRoot

* fix comment

* fix docker-compose

* remove commented code
  • Loading branch information
ToniRamirezM authored Jun 4, 2024
1 parent 83ccc8e commit d34c824
Show file tree
Hide file tree
Showing 25 changed files with 2,033 additions and 939 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ generate-code-from-proto: ## Generates code from proto files
cd proto/src/proto/hashdb/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../merkletree/hashdb --go-grpc_out=../../../../../merkletree/hashdb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative hashdb.proto
cd proto/src/proto/executor/v1 && protoc --proto_path=. --go_out=../../../../../state/runtime/executor --go-grpc_out=../../../../../state/runtime/executor --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative executor.proto
cd proto/src/proto/aggregator/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../aggregator/prover --go-grpc_out=../../../../../aggregator/prover --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative aggregator.proto
cd proto/src/proto/datastream/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../state/datastream --go-grpc_out=../../../../../state/datastream --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative datastream.proto

## Help display.
## Pulls comments from beside commands and prints a nicely formatted
Expand Down
89 changes: 89 additions & 0 deletions proto/src/proto/datastream/v1/datastream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
syntax = "proto3";

package datastream.v1;

option go_package = "github.com/0xPolygonHermez/zkevm-node/state/datastream";

message BatchStart {
uint64 number = 1;
BatchType type = 2;
uint64 fork_id = 3;
uint64 chain_id = 4;
Debug debug = 5;
}

message BatchEnd {
uint64 number = 1;
bytes local_exit_root = 2;
bytes state_root = 3;
Debug debug = 4;
}

message L2Block {
uint64 number = 1;
uint64 batch_number = 2;
uint64 timestamp = 3;
uint32 delta_timestamp = 4;
uint64 min_timestamp = 5;
bytes l1_blockhash = 6;
uint32 l1_infotree_index = 7;
bytes hash = 8;
bytes state_root = 9;
bytes global_exit_root = 10;
bytes coinbase = 11;
uint64 block_gas_limit = 12;
bytes block_info_root = 13;
Debug debug = 14;
}

message Transaction {
uint64 l2block_number = 1;
uint64 index = 2;
bool is_valid = 3;
bytes encoded = 4;
uint32 effective_gas_price_percentage = 5;
bytes im_state_root = 6;
Debug debug = 7;
}

message UpdateGER {
uint64 batch_number = 1;
uint64 timestamp = 2;
bytes global_exit_root = 3;
bytes coinbase = 4;
uint64 fork_id = 5;
uint64 chain_id = 6;
bytes state_root = 7;
Debug debug = 8;
}

message BookMark {
BookmarkType type = 1;
uint64 value = 2;
}

message Debug {
string message = 1;
}

enum BookmarkType {
BOOKMARK_TYPE_UNSPECIFIED = 0;
BOOKMARK_TYPE_BATCH = 1;
BOOKMARK_TYPE_L2_BLOCK = 2;
}

enum EntryType {
ENTRY_TYPE_UNSPECIFIED = 0;
ENTRY_TYPE_BATCH_START = 1;
ENTRY_TYPE_L2_BLOCK = 2;
ENTRY_TYPE_TRANSACTION = 3;
ENTRY_TYPE_BATCH_END = 4;
ENTRY_TYPE_UPDATE_GER = 5;
}

enum BatchType {
BATCH_TYPE_UNSPECIFIED = 0;
BATCH_TYPE_REGULAR = 1;
BATCH_TYPE_FORCED = 2;
BATCH_TYPE_INJECTED = 3;
}
8 changes: 8 additions & 0 deletions sequencer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Batch struct {
finalRemainingResources state.BatchResources // remaining batch resources when a L2 block is processed
finalHighReservedZKCounters state.ZKCounters
closingReason state.ClosingReason
finalLocalExitRoot common.Hash
}

func (b *Batch) isEmpty() bool {
Expand Down Expand Up @@ -99,6 +100,7 @@ func (f *finalizer) setWIPBatch(ctx context.Context, wipStateBatch *state.Batch)
finalRemainingResources: remainingResources,
imHighReservedZKCounters: wipStateBatch.HighReservedZKCounters,
finalHighReservedZKCounters: wipStateBatch.HighReservedZKCounters,
finalLocalExitRoot: wipStateBatch.LocalExitRoot,
}

return wipBatch, nil
Expand Down Expand Up @@ -312,6 +314,7 @@ func (f *finalizer) openNewWIPBatch(batchNumber uint64, stateRoot common.Hash) *
imRemainingResources: maxRemainingResources,
finalRemainingResources: maxRemainingResources,
closingReason: state.EmptyClosingReason,
finalLocalExitRoot: state.ZeroHash,
}
}

Expand All @@ -336,6 +339,8 @@ func (f *finalizer) insertSIPBatch(ctx context.Context, batchNumber uint64, stat

// Send batch bookmark to the datastream
f.DSSendBatchBookmark(batchNumber)
// Send batch start to the datastream
f.DSSendBatchStart(batchNumber, false)

// Check if synchronizer is up-to-date
//TODO: review if this is needed
Expand Down Expand Up @@ -400,6 +405,9 @@ func (f *finalizer) closeSIPBatch(ctx context.Context, dbTx pgx.Tx) error {
}()
}

// Sent batch to DS
f.DSSendBatchEnd(f.wipBatch.batchNumber, f.wipBatch.finalStateRoot, f.wipBatch.finalLocalExitRoot)

log.Infof("sip batch %d closed in statedb, closing reason: %s", f.sipBatch.batchNumber, f.sipBatch.closingReason)

f.sipBatch = nil
Expand Down
60 changes: 53 additions & 7 deletions sequencer/datastreamer.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
package sequencer

import (
"context"

"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ethereum/go-ethereum/common"
)

func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
func (f *finalizer) DSSendL2Block(ctx context.Context, batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32, minTimestamp uint64, blockHash common.Hash) error {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

// Send data to streamer
if f.streamServer != nil {
l2Block := state.DSL2Block{
BatchNumber: batchNumber,
L2BlockNumber: blockResponse.BlockNumber,
Timestamp: int64(blockResponse.Timestamp),
Timestamp: blockResponse.Timestamp,
MinTimestamp: minTimestamp,
L1InfoTreeIndex: l1InfoTreeIndex,
L1BlockHash: blockResponse.BlockHashL1,
GlobalExitRoot: blockResponse.GlobalExitRoot,
Coinbase: f.sequencerAddress,
ForkID: uint16(forkID),
BlockHash: blockResponse.BlockHash,
ForkID: forkID,
BlockHash: blockHash,
StateRoot: blockResponse.BlockHash, //From etrog, the blockhash is the block root
BlockInfoRoot: blockResponse.BlockInfoRoot,
}

if l2Block.ForkID >= state.FORKID_ETROG && l2Block.L1InfoTreeIndex == 0 {
l2Block.MinTimestamp = 0
}

l2Transactions := []state.DSL2Transaction{}

for _, txResponse := range blockResponse.TransactionResponses {
for i, txResponse := range blockResponse.TransactionResponses {
binaryTxData, err := txResponse.Tx.MarshalBinary()
if err != nil {
return err
Expand All @@ -34,12 +44,17 @@ func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.Proce
l2Transaction := state.DSL2Transaction{
L2BlockNumber: blockResponse.BlockNumber,
EffectiveGasPricePercentage: uint8(txResponse.EffectivePercentage),
Index: uint64(i),
IsValid: 1,
EncodedLength: uint32(len(binaryTxData)),
Encoded: binaryTxData,
StateRoot: txResponse.StateRoot,
}

if txResponse.Logs != nil && len(txResponse.Logs) > 0 {
l2Transaction.Index = uint64(txResponse.Logs[0].TxIndex)
}

l2Transactions = append(l2Transactions, l2Transaction)
}

Expand All @@ -57,9 +72,40 @@ func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) {
// Check if stream server enabled
if f.streamServer != nil {
// Send batch bookmark to the streamer
f.dataToStream <- state.DSBookMark{
Type: state.BookMarkTypeBatch,
f.dataToStream <- datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: batchNumber,
}
}
}

func (f *finalizer) DSSendBatchStart(batchNumber uint64, isForced bool) {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

batchStart := datastream.BatchStart{
Number: batchNumber,
ForkId: forkID,
}

if isForced {
batchStart.Type = datastream.BatchType_BATCH_TYPE_FORCED
} else {
batchStart.Type = datastream.BatchType_BATCH_TYPE_REGULAR
}

if f.streamServer != nil {
// Send batch start to the streamer
f.dataToStream <- batchStart
}
}

func (f *finalizer) DSSendBatchEnd(batchNumber uint64, stateRoot common.Hash, localExitRoot common.Hash) {
if f.streamServer != nil {
// Send batch end to the streamer
f.dataToStream <- datastream.BatchEnd{
Number: batchNumber,
StateRoot: stateRoot.Bytes(),
LocalExitRoot: localExitRoot.Bytes(),
}
}
}
3 changes: 2 additions & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,9 @@ func TestFinalizer_finalizeSIPBatch(t *testing.T) {

// arrange
stateMock.On("BeginStateTransaction", ctx).Return(dbTxMock, nilErr).Once()
stateMock.On("GetForkIDByBatchNumber", mock.Anything).Return(uint64(state.FORKID_BLUEBERRY))
stateMock.On("CloseWIPBatch", ctx, receipt, mock.Anything).Return(tc.managerErr).Once()

stateMock.On("GetForkIDByBatchNumber", mock.Anything).Return(uint64(state.FORKID_BLUEBERRY))
if tc.managerErr == nil {
stateMock.On("GetBatchByNumber", ctx, f.sipBatch.batchNumber, nil).Return(&state.Batch{BatchNumber: f.sipBatch.batchNumber}, nilErr).Once()
stateMock.On("GetForkIDByBatchNumber", f.wipBatch.batchNumber).Return(uint64(9)).Once()
Expand Down
4 changes: 2 additions & 2 deletions sequencer/forcedbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat
// process L2 blocks responses for the forced batch
for _, forcedL2BlockResponse := range batchResponse.BlockResponses {
// Store forced L2 blocks in the state
err := f.stateIntf.StoreL2Block(ctx, newBatchNumber, forcedL2BlockResponse, nil, dbTx)
blockHash, err := f.stateIntf.StoreL2Block(ctx, newBatchNumber, forcedL2BlockResponse, nil, dbTx)
if err != nil {
return fmt.Errorf("database error on storing L2 block %d, error: %v", forcedL2BlockResponse.BlockNumber, err)
}
Expand All @@ -198,7 +198,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat
}

// Send L2 block to data streamer
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0)
err = f.DSSendL2Block(ctx, newBatchNumber, forcedL2BlockResponse, 0, forcedL2BlockResponse.Timestamp, blockHash)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type stateInterface interface {
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error)
GetStorageAt(ctx context.Context, address common.Address, position *big.Int, root common.Hash) (*big.Int, error)
StoreL2Block(ctx context.Context, batchNumber uint64, l2Block *state.ProcessBlockResponse, txsEGPLog []*state.EffectiveGasPriceLog, dbTx pgx.Tx) error
StoreL2Block(ctx context.Context, batchNumber uint64, l2Block *state.ProcessBlockResponse, txsEGPLog []*state.EffectiveGasPriceLog, dbTx pgx.Tx) (common.Hash, error)
BuildChangeL2Block(deltaTimestamp uint32, l1InfoTreeIndex uint32) []byte
GetL1InfoTreeDataFromBatchL2Data(ctx context.Context, batchL2Data []byte, dbTx pgx.Tx) (map[uint32]state.L1DataV2, common.Hash, common.Hash, error)
GetBlockByNumber(ctx context.Context, blockNumber uint64, dbTx pgx.Tx) (*state.Block, error)
Expand Down
7 changes: 4 additions & 3 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
return fmt.Errorf(overflowLog)
}

// Update finalStateRoot of the batch to the newStateRoot for the L2 block
// Update finalStateRoot/finalLocalExitRoot of the batch to the newStateRoot/newLocalExitRoot for the L2 block
l2Block.batch.finalStateRoot = l2Block.batchResponse.NewStateRoot
l2Block.batch.finalLocalExitRoot = l2Block.batchResponse.NewLocalExitRoot

f.updateFlushIDs(batchResponse.FlushID, batchResponse.StoredFlushID)

Expand Down Expand Up @@ -418,7 +419,7 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
}

// Store L2 block in the state
err = f.stateIntf.StoreL2Block(ctx, l2Block.batch.batchNumber, blockResponse, txsEGPLog, dbTx)
blockHash, err := f.stateIntf.StoreL2Block(ctx, l2Block.batch.batchNumber, blockResponse, txsEGPLog, dbTx)
if err != nil {
return rollbackOnError(fmt.Errorf("database error on storing L2 block %d [%d], error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err))
}
Expand Down Expand Up @@ -485,7 +486,7 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
}

// Send L2 block to data streamer
err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
err = f.DSSendL2Block(ctx, l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex(), l2Block.timestamp, blockHash)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
Expand Down
22 changes: 17 additions & 5 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d34c824

Please sign in to comment.