Skip to content

Commit

Permalink
feat: resend failed certificate (#172)
Browse files Browse the repository at this point in the history
- The `aggsender` periodically checks the status of certificates (every `AggSender.CheckStatusCertificateInterval`) and rebuilds the certificate as soon as an `InError` status is detected.
- To avoid spamming the Agglayer, it implements a rate limiter.
- To avoid submitting a certificate too close to the end of an epoch, certificates are not sent once a configured percentage of the epoch has been reached.

## Configuration
```
[AggSender]
CheckStatusCertificateInterval = "5m"
```
- Rate limit configuration:
```
[AggSender.MaxSubmitCertificateRate]
		NumRequests = 20
		Interval = "1h"
```
It can be disabled by setting `NumRequests` to 0.

- Configuration of the epoch percentage after which a new certificate can no longer be submitted to the agglayer:
```
[AggSender]
MaxEpochPercentageAllowedToSendCertificate=80
```
A value of 0 or > 100 disables this check.
  • Loading branch information
joanestebanr authored Feb 5, 2025
1 parent 17dca96 commit cea7306
Show file tree
Hide file tree
Showing 20 changed files with 589 additions and 42 deletions.
8 changes: 8 additions & 0 deletions agglayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ type Certificate struct {
Metadata common.Hash `json:"metadata"`
}

// ID returns a short identifier for this certificate, built from its height
// and network ID. A nil receiver is safe and yields a placeholder that wraps
// nilStr instead of panicking.
func (c *Certificate) ID() string {
	if c != nil {
		return fmt.Sprintf("cert{height:%d, networkID:%d}", c.Height, c.NetworkID)
	}
	return "cert{" + nilStr + "}"
}

// Brief returns a string with a brief cert
func (c *Certificate) Brief() string {
if c == nil {
Expand Down
10 changes: 10 additions & 0 deletions agglayer/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,3 +1201,13 @@ func Test_UnmarshalClaimFromRollup(t *testing.T) {
require.NoError(t, err)
require.Equal(t, claim, unmarshalled)
}

// TestCertificate_ID checks the identifier produced by Certificate.ID for both
// a nil certificate and a certificate with height and network ID set.
func TestCertificate_ID(t *testing.T) {
	var nilCert *Certificate
	populated := &Certificate{
		Height:    2,
		NetworkID: 1,
	}

	// A nil receiver must not panic and must render the nil placeholder.
	require.Equal(t, "cert{"+nilStr+"}", nilCert.ID())
	require.Equal(t, "cert{height:2, networkID:1}", populated.ID())
}
115 changes: 96 additions & 19 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ var (
zeroLER = common.HexToHash("0x27ae5ba08d7291c96c8cbddcc148bf48a6d68c7974b94356f53754ef6171d757")
)

// RateLimiter limits how often an operation may be performed; AggSender uses it
// to throttle certificate submissions.
type RateLimiter interface {
// Call registers an attempt identified by msg. The caller in this file treats a nil
// result as "allowed now" and a non-nil result as the time to wait before proceeding.
// NOTE(review): allowToSleep presumably lets the implementation block internally
// instead of returning the wait time — confirm against the implementation.
Call(msg string, allowToSleep bool) *time.Duration
// String returns a description of the limiter; it is used in log messages.
String() string
}

// AggSender is a component that will send certificates to the aggLayer
type AggSender struct {
log types.Logger
Expand All @@ -48,10 +53,11 @@ type AggSender struct {

sequencerKey *ecdsa.PrivateKey

status types.AggsenderStatus
status types.AggsenderStatus
rateLimiter RateLimiter
}

// New returns a new AggSender
// New returns a new AggSender instance
func New(
ctx context.Context,
logger *log.Logger,
Expand All @@ -73,6 +79,7 @@ func New(
if err != nil {
return nil, err
}
rateLimit := aggkitcommon.NewRateLimit(cfg.MaxSubmitCertificateRate)

logger.Infof("Aggsender Config: %s.", cfg.String())

Expand All @@ -86,6 +93,7 @@ func New(
sequencerKey: sequencerPrivateKey,
epochNotifier: epochNotifier,
status: types.AggsenderStatus{Status: types.StatusNone},
rateLimiter: rateLimit,
}, nil
}

Expand Down Expand Up @@ -119,7 +127,7 @@ func (a *AggSender) Start(ctx context.Context) {
a.log.Info("AggSender started")
a.status.Start(time.Now().UTC())
a.checkInitialStatus(ctx)
a.sendCertificates(ctx)
a.sendCertificates(ctx, 0)
}

// checkInitialStatus check local status vs agglayer status
Expand All @@ -146,15 +154,48 @@ func (a *AggSender) checkInitialStatus(ctx context.Context) {
}

// sendCertificates sends certificates to the aggLayer
func (a *AggSender) sendCertificates(ctx context.Context) {
func (a *AggSender) sendCertificates(ctx context.Context, returnAfterNIterations int) {
var checkCertChannel <-chan time.Time
if a.cfg.CheckStatusCertificateInterval.Duration > 0 {
checkCertTicker := time.NewTicker(a.cfg.CheckStatusCertificateInterval.Duration)
defer checkCertTicker.Stop()
checkCertChannel = checkCertTicker.C
} else {
a.log.Infof("CheckStatusCertificateInterval is 0, so we are not going to check the certificate status")
checkCertChannel = make(chan time.Time)
}

chEpoch := a.epochNotifier.Subscribe("aggsender")
a.status.Status = types.StatusCertificateStage
iteration := 0
for {
select {
case <-checkCertChannel:
iteration++
a.log.Debugf("Checking perodical certificates status (%s)",
a.cfg.CheckCertConfigBriefString())
checkResult := a.checkPendingCertificatesStatus(ctx)
if !checkResult.existPendingCerts && checkResult.existNewInErrorCert {
if a.cfg.RetryCertAfterInError {
a.log.Infof("An InError cert exists. Sending a new one (%s)", a.cfg.CheckCertConfigBriefString())
_, err := a.sendCertificate(ctx)
a.status.SetLastError(err)
if err != nil {
a.log.Error(err)
}
} else {
a.log.Infof("An InError cert exists but skipping send cert because RetryCertInmediatlyAfterInError is false")
}
}
if returnAfterNIterations > 0 && iteration >= returnAfterNIterations {
a.log.Warnf("reached number of iterations, so we are going to return")
return
}
case epoch := <-chEpoch:
iteration++
a.log.Infof("Epoch received: %s", epoch.String())
thereArePendingCerts := a.checkPendingCertificatesStatus(ctx)
if !thereArePendingCerts {
checkResult := a.checkPendingCertificatesStatus(ctx)
if !checkResult.existPendingCerts {
_, err := a.sendCertificate(ctx)
a.status.SetLastError(err)
if err != nil {
Expand All @@ -164,6 +205,11 @@ func (a *AggSender) sendCertificates(ctx context.Context) {
log.Infof("Skipping epoch %s because there are pending certificates",
epoch.String())
}

if returnAfterNIterations > 0 && iteration >= returnAfterNIterations {
a.log.Warnf("reached number of iterations, so we are going to return")
return
}
case <-ctx.Done():
a.log.Info("AggSender stopped")
return
Expand Down Expand Up @@ -245,6 +291,16 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
return nil, fmt.Errorf("error signing certificate: %w", err)
}

if rateLimitSleepTime := a.rateLimiter.Call("sendCertificate", false); rateLimitSleepTime != nil {
a.log.Warnf("rate limit reached , next cert %s can be submitted after %s so sleeping. Rate:%s",
certificate.ID(),
rateLimitSleepTime.String(), a.rateLimiter.String())
time.Sleep(*rateLimitSleepTime)
}
if !a.isAllowedSendCertificateEpochPercent() {
return nil, fmt.Errorf("forbidden to send certificate due epoch percentage")
}

a.saveCertificateToFile(signedCertificate)
a.log.Infof("certificate ready to be send to AggLayer: %s", signedCertificate.Brief())
if a.cfg.DryRun {
Expand Down Expand Up @@ -288,6 +344,18 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif

return signedCertificate, nil
}
// isAllowedSendCertificateEpochPercent reports whether a certificate may be sent
// at the current point of the epoch.
// The check is disabled (always true) when MaxEpochPercentageAllowedToSendCertificate
// is 0 or >= maxPercent. Otherwise it returns false, and logs a warning, once the
// epoch progress reported by the epoch notifier has reached the configured
// percentage (the configured value is divided by 100 before comparing).
func (a *AggSender) isAllowedSendCertificateEpochPercent() bool {
	limit := a.cfg.MaxEpochPercentageAllowedToSendCertificate
	checkDisabled := limit == 0 || limit >= maxPercent
	if checkDisabled {
		return true
	}

	epochStatus := a.epochNotifier.GetEpochStatus()
	threshold := float64(limit) / 100.0
	if epochStatus.PercentEpoch >= threshold {
		a.log.Warnf("forbidden to send certificate after epoch percentage: %f", epochStatus.PercentEpoch)
		return false
	}
	return true
}

// saveCertificateToStorage saves the certificate to the storage
// it retries if it fails. if param retries == 0 it retries indefinitely
Expand Down Expand Up @@ -658,45 +726,54 @@ func (a *AggSender) signCertificate(certificate *agglayer.Certificate) (*agglaye
}, nil
}

// checkCertResult is the outcome returned by checkPendingCertificatesStatus.
type checkCertResult struct {
// existPendingCerts means that there are still pending (not closed) certificates.
// It is also set to true when the check itself fails.
existPendingCerts bool
// existNewInErrorCert means that during this run at least one certificate that was
// not InError locally is now reported as InError by agglayer
existNewInErrorCert bool
}

// checkPendingCertificatesStatus checks the status of the locally stored non-settled
// certificates against agglayer and updates the storage if the status changed.
// It returns a checkCertResult:
//   - existPendingCerts: true if at least one certificate is still not closed, or if
//     any storage/agglayer call failed (callers only send a new certificate when
//     this is false)
//   - existNewInErrorCert: true if a certificate that was not InError locally is now
//     reported as InError by agglayer
func (a *AggSender) checkPendingCertificatesStatus(ctx context.Context) bool {
func (a *AggSender) checkPendingCertificatesStatus(ctx context.Context) checkCertResult {
pendingCertificates, err := a.storage.GetCertificatesByStatus(agglayer.NonSettledStatuses)
if err != nil {
// NOTE(review): %w is only defined for fmt.Errorf; if the logger formats with
// fmt.Sprintf these messages will contain "%!w(...)" — consider %v. Confirm
// against the logger implementation.
a.log.Errorf("error getting pending certificates: %w", err)
return true
return checkCertResult{existPendingCerts: true, existNewInErrorCert: false}
}

a.log.Debugf("checkPendingCertificatesStatus num of pendingCertificates: %d", len(pendingCertificates))
thereArePendingCerts := false

for _, certificate := range pendingCertificates {
certificateHeader, err := a.aggLayerClient.GetCertificateHeader(certificate.CertificateID)
appearsNewInErrorCert := false
for _, certificateLocal := range pendingCertificates {
certificateHeader, err := a.aggLayerClient.GetCertificateHeader(certificateLocal.CertificateID)
if err != nil {
a.log.Errorf("error getting certificate header of %s from agglayer: %w",
certificate.ID(), err)
return true
certificateLocal.ID(), err)
return checkCertResult{existPendingCerts: true, existNewInErrorCert: false}
}

a.log.Debugf("aggLayerClient.GetCertificateHeader status [%s] of certificate %s elapsed time:%s",
certificateHeader.Status,
certificateHeader.ID(),
certificate.ElapsedTimeSinceCreation())
certificateLocal.ElapsedTimeSinceCreation())
// Detect a transition into InError: the local copy is not InError yet but the
// header fetched from agglayer is. This is evaluated before the local status is updated.
appearsNewInErrorCert = appearsNewInErrorCert ||
(!certificateLocal.Status.IsInError() && certificateHeader.Status.IsInError())

if err := a.updateCertificateStatus(ctx, certificate, certificateHeader); err != nil {
if err := a.updateCertificateStatus(ctx, certificateLocal, certificateHeader); err != nil {
a.log.Errorf("error updating certificate %s status in storage: %w", certificateHeader.String(), err)
return true
return checkCertResult{existPendingCerts: true, existNewInErrorCert: false}
}

if !certificate.IsClosed() {
if !certificateLocal.IsClosed() {
a.log.Infof("certificate %s is still pending, elapsed time:%s ",
certificateHeader.ID(), certificate.ElapsedTimeSinceCreation())
certificateHeader.ID(), certificateLocal.ElapsedTimeSinceCreation())
thereArePendingCerts = true
}
}
return thereArePendingCerts
return checkCertResult{existPendingCerts: thereArePendingCerts, existNewInErrorCert: appearsNewInErrorCert}
}

// updateCertificate updates the certificate status in the storage
Expand Down
Loading

0 comments on commit cea7306

Please sign in to comment.