Skip to content

Commit

Permalink
feat: add aggsender RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
joanestebanr committed Dec 17, 2024
1 parent e3d6f5e commit 3c7ec15
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 3 deletions.
20 changes: 17 additions & 3 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type AggSender struct {
cfg Config

sequencerKey *ecdsa.PrivateKey

status types.AggsenderStatus
}

// New returns a new AggSender
Expand Down Expand Up @@ -82,9 +84,14 @@ func New(
l1infoTreeSyncer: l1InfoTreeSyncer,
sequencerKey: sequencerPrivateKey,
epochNotifier: epochNotifier,
status: types.AggsenderStatus{Status: types.StatusNone},
}, nil
}

func (a *AggSender) Status() types.AggsenderStatus {
return a.status
}

// GetRPCServices returns the list of services that the RPC provider exposes
func (a *AggSender) GetRPCServices() []jRPC.Service {
if !a.cfg.EnableRPC {
Expand All @@ -95,14 +102,16 @@ func (a *AggSender) GetRPCServices() []jRPC.Service {
return []jRPC.Service{
{
Name: "aggsender",
Service: aggsenderrpc.NewAggsenderRPC(logger),
Service: aggsenderrpc.NewAggsenderRPC(logger, a.storage, a),
},
}
}

// Start starts the AggSender
func (a *AggSender) Start(ctx context.Context) {
a.log.Info("AggSender started")
a.status.Running = true
a.status.StartTime = time.Now().UTC()
a.checkInitialStatus(ctx)
a.sendCertificates(ctx)
}
Expand All @@ -111,11 +120,13 @@ func (a *AggSender) Start(ctx context.Context) {
func (a *AggSender) checkInitialStatus(ctx context.Context) {
ticker := time.NewTicker(a.cfg.DelayBeetweenRetries.Duration)
defer ticker.Stop()

a.status.Status = types.StatusCheckingInitialStage
for {
if err := a.checkLastCertificateFromAgglayer(ctx); err != nil {
a.status.SetLastError(err)
a.log.Errorf("error checking initial status: %w, retrying in %s", err, a.cfg.DelayBeetweenRetries.String())
} else {
a.status.SetLastError(err)
a.log.Info("Initial status checked successfully")
return
}
Expand All @@ -130,13 +141,16 @@ func (a *AggSender) checkInitialStatus(ctx context.Context) {
// sendCertificates sends certificates to the aggLayer
func (a *AggSender) sendCertificates(ctx context.Context) {
chEpoch := a.epochNotifier.Subscribe("aggsender")
a.status.Status = types.StatusCertificateStage
for {
select {
case epoch := <-chEpoch:
a.log.Infof("Epoch received: %s", epoch.String())
thereArePendingCerts := a.checkPendingCertificatesStatus(ctx)
if !thereArePendingCerts {
if _, err := a.sendCertificate(ctx); err != nil {
_, err := a.sendCertificate(ctx)
a.status.SetLastError(err)
if err != nil {
a.log.Error(err)
}
} else {
Expand Down
72 changes: 72 additions & 0 deletions aggsender/rpc/aggsender_rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package aggsenderrpc

import (
"fmt"

zkevm "github.com/0xPolygon/cdk"
"github.com/0xPolygon/cdk-rpc/rpc"
"github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/log"
)

const (
base10 = 10
)

type aggsenderStorer interface {
GetCertificateByHeight(height uint64) (*types.CertificateInfo, error)
GetLastSentCertificate() (*types.CertificateInfo, error)
}

type aggsenderStatuser interface {
Status() types.AggsenderStatus
}

type AggsenderRPC struct {
logger *log.Logger
storage aggsenderStorer
aggsender aggsenderStatuser
}

func NewAggsenderRPC(
logger *log.Logger,
storage aggsenderStorer,
aggsender aggsenderStatuser,
) *AggsenderRPC {
return &AggsenderRPC{
logger: logger,
storage: storage,
aggsender: aggsender,
}
}

// curl -X POST http://localhost:5576/ -H "Con -application/json" \
// -d '{"method":"aggsender_status", "params":[], "id":1}'
func (b *AggsenderRPC) Status() (interface{}, rpc.Error) {
status := b.aggsender.Status()
res := struct {
Version zkevm.FullVersion `json:"version"`
Status types.AggsenderStatus `json:"status"`
EpochNotifier string `json:"epoch_notifier"`
}{
Version: zkevm.GetVersion(),
Status: status,
}
return res, nil
}

// latest: curl -X POST http://localhost:5576/ -H "Con -application/json" \
// -d '{"method":"aggsender_getCertificateHeaderPerHeight", "params":[], "id":1}'
func (b *AggsenderRPC) GetCertificateHeaderPerHeight(height *uint64) (interface{}, rpc.Error) {
var certInfo *types.CertificateInfo
var err error
if height == nil {
certInfo, err = b.storage.GetLastSentCertificate()
} else {
certInfo, err = b.storage.GetCertificateByHeight(*height)
}
if err != nil {
return nil, rpc.NewRPCError(rpc.DefaultErrorCode, fmt.Sprintf("error getting certificate by height: %v", err))
}
return certInfo, nil
}
39 changes: 39 additions & 0 deletions aggsender/rpcclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rpcclient

import (
"encoding/json"
"fmt"

"github.com/0xPolygon/cdk-rpc/rpc"
"github.com/0xPolygon/cdk/aggsender/types"
)

// Client wraps all the available endpoints of the data abailability committee node server
type Client struct {
url string
}

func NewClient(url string) *Client {
return &Client{
url: url,
}
}

func (c *Client) GetCertificateHeaderPerHeight(height *uint64) (*types.CertificateInfo, error) {
response, err := rpc.JSONRPCCall(c.url, "aggsender_getCertificateHeaderPerHeight", height)
if err != nil {
return nil, err
}

// Check if the response is an error
if response.Error != nil {
return nil, fmt.Errorf("error in the response calling aggsender_getCertificateHeaderPerHeight: %v", response.Error)
}
cert := types.CertificateInfo{}
// Get the batch number from the response hex string
err = json.Unmarshal(response.Result, &cert)
if err != nil {
return nil, err
}
return &cert, nil
}
26 changes: 26 additions & 0 deletions aggsender/types/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package types

import "time"

type AggsenderStatusType string

const (
StatusNone AggsenderStatusType = "none"
StatusCheckingInitialStage AggsenderStatusType = "checking_initial_stage"
StatusCertificateStage AggsenderStatusType = "certificate_stage"
)

type AggsenderStatus struct {
Running bool `json:"running"`
StartTime time.Time `json:"start_time"`
Status AggsenderStatusType `json:"status"`
LastError string `json:"last_error"`
}

func (a *AggsenderStatus) SetLastError(err error) {
if err == nil {
a.LastError = ""
} else {
a.LastError = err.Error()
}
}
22 changes: 22 additions & 0 deletions version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,25 @@ func PrintVersion(w io.Writer) {
fmt.Fprintf(w, "Built: %s\n", BuildDate)
fmt.Fprintf(w, "OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
}

type FullVersion struct {
Version string `json:"version"`
GitRev string
GitBranch string
BuildDate string
GoVersion string
OS string
Arch string
}

func GetVersion() FullVersion {
return FullVersion{
Version: Version,
GitRev: GitRev,
GitBranch: GitBranch,
BuildDate: BuildDate,
GoVersion: runtime.Version(),
OS: runtime.GOOS,
Arch: runtime.GOARCH,
}
}

0 comments on commit 3c7ec15

Please sign in to comment.