Second round of misc improvements to dataapi (#1245)
jianoaix authored Feb 8, 2025
1 parent ecd8171 commit 36119d7
Showing 10 changed files with 147 additions and 52 deletions.
7 changes: 7 additions & 0 deletions disperser/dataapi/config.go
@@ -8,3 +8,10 @@ type Config struct {
ChurnerHostname string
BatcherHealthEndpt string
}

type DataApiVersion uint

const (
V1 DataApiVersion = 1
V2 DataApiVersion = 2
)
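
Note: the new DataApiVersion enum lets components shared by both API generations select version-specific behavior at construction time instead of being forked per version. For orientation, the wiring added later in this commit looks roughly like this (a sketch; the surrounding constructors are in server.go and v2/server_v2.go below):

    metricsHandler: NewMetricsHandler(promClient, V1),                 // v1 server (server.go)
    metricsHandler: dataapi.NewMetricsHandler(promClient, dataapi.V2), // v2 server (v2/server_v2.go)
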
15 changes: 15 additions & 0 deletions disperser/dataapi/docs/v2/V2_docs.go
@@ -1306,17 +1306,32 @@ const docTemplateV2 = `{
"v2.OperatorsSigningInfoResponse": {
"type": "object",
"properties": {
"end_block": {
"type": "integer"
},
"end_time_unix_sec": {
"type": "integer"
},
"operator_signing_info": {
"type": "array",
"items": {
"$ref": "#/definitions/v2.OperatorSigningInfo"
}
},
"start_block": {
"type": "integer"
},
"start_time_unix_sec": {
"type": "integer"
}
}
},
"v2.OperatorsStakeResponse": {
"type": "object",
"properties": {
"current_block": {
"type": "integer"
},
"stake_ranked_operators": {
"type": "object",
"additionalProperties": {
15 changes: 15 additions & 0 deletions disperser/dataapi/docs/v2/V2_swagger.json
@@ -1303,17 +1303,32 @@
"v2.OperatorsSigningInfoResponse": {
"type": "object",
"properties": {
"end_block": {
"type": "integer"
},
"end_time_unix_sec": {
"type": "integer"
},
"operator_signing_info": {
"type": "array",
"items": {
"$ref": "#/definitions/v2.OperatorSigningInfo"
}
},
"start_block": {
"type": "integer"
},
"start_time_unix_sec": {
"type": "integer"
}
}
},
"v2.OperatorsStakeResponse": {
"type": "object",
"properties": {
"current_block": {
"type": "integer"
},
"stake_ranked_operators": {
"type": "object",
"additionalProperties": {
10 changes: 10 additions & 0 deletions disperser/dataapi/docs/v2/V2_swagger.yaml
@@ -448,13 +448,23 @@ definitions:
type: object
v2.OperatorsSigningInfoResponse:
properties:
end_block:
type: integer
end_time_unix_sec:
type: integer
operator_signing_info:
items:
$ref: '#/definitions/v2.OperatorSigningInfo'
type: array
start_block:
type: integer
start_time_unix_sec:
type: integer
type: object
v2.OperatorsStakeResponse:
properties:
current_block:
type: integer
stake_ranked_operators:
additionalProperties:
items:
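
The three generated doc files above (V2_docs.go, V2_swagger.json, V2_swagger.yaml) describe the same schema additions. For illustration only, a populated v2 signing-info response now carries the block and time window over which the signing rates were computed; all values below are hypothetical, and the struct itself is defined in v2/server_v2.go later in this diff:

    resp := OperatorsSigningInfoResponse{
        StartBlock:          2500000,    // hypothetical reference block range
        EndBlock:            2600000,
        StartTimeUnixSec:    1738368000, // hypothetical time window (unix seconds)
        EndTimeUnixSec:      1739577600,
        OperatorSigningInfo: []*OperatorSigningInfo{ /* one entry per operator */ },
    }
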
22 changes: 19 additions & 3 deletions disperser/dataapi/metrics_handler.go
@@ -14,16 +14,24 @@ const (
type MetricsHandler struct {
// For accessing metrics info
promClient PrometheusClient
version DataApiVersion
}

func NewMetricsHandler(promClient PrometheusClient) *MetricsHandler {
func NewMetricsHandler(promClient PrometheusClient, version DataApiVersion) *MetricsHandler {
return &MetricsHandler{
promClient: promClient,
version: version,
}
}

func (mh *MetricsHandler) GetAvgThroughput(ctx context.Context, startTime int64, endTime int64) (float64, error) {
result, err := mh.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
var result *PrometheusResult
var err error
if mh.version == V1 {
result, err = mh.promClient.QueryDisperserBlobSizeBytesPerSecond(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
} else {
result, err = mh.promClient.QueryDisperserBlobSizeBytesPerSecondV2(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0))
}
if err != nil {
return 0, err
}
@@ -41,7 +49,15 @@ func (mh *MetricsHandler) GetThroughputTimeseries(ctx context.Context, startTime
if endTime-startTime >= 7*24*60*60 {
throughputRateSecs = uint16(sevenDayThroughputRateSecs)
}
result, err := mh.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0), throughputRateSecs)

var result *PrometheusResult
var err error
if mh.version == V1 {
result, err = mh.promClient.QueryDisperserAvgThroughputBlobSizeBytes(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0), throughputRateSecs)
} else {
result, err = mh.promClient.QueryDisperserAvgThroughputBlobSizeBytesV2(ctx, time.Unix(startTime, 0), time.Unix(endTime, 0), throughputRateSecs)
}

if err != nil {
return nil, err
}
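
A rough usage sketch of the version-aware metrics handler, assumed to live inside the dataapi package with context and fmt imported (the function name is hypothetical): the same call now reads either the batcher (V1) or dispatcher (V2) Prometheus series, as selected at construction time.

    func logAvgThroughput(ctx context.Context, promClient PrometheusClient, startTime, endTime int64) error {
        mh := NewMetricsHandler(promClient, V2) // V1 would query the legacy batcher metrics instead
        // Average throughput in bytes/sec over [startTime, endTime] (unix seconds).
        avg, err := mh.GetAvgThroughput(ctx, startTime, endTime)
        if err != nil {
            return err
        }
        fmt.Printf("average throughput: %.2f bytes/sec\n", avg)
        return nil
    }

GetThroughputTimeseries keeps its existing behavior of widening the rate window for ranges of seven days or more; only the underlying Prometheus query changes with the version.
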
27 changes: 10 additions & 17 deletions disperser/dataapi/operator_handler.go
@@ -221,12 +221,8 @@ func checkServiceOnline(ctx context.Context, serviceName string, socket string,
return false, fmt.Sprintf("grpc available but %s service not found at %s", serviceName, socket)
}

func (oh *OperatorHandler) GetOperatorsStake(ctx context.Context, operatorId string) (*OperatorsStakeResponse, error) {
currentBlock, err := oh.indexedChainState.GetCurrentBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to fetch current block number: %w", err)
}
state, err := oh.chainState.GetOperatorState(ctx, currentBlock, []core.QuorumID{0, 1, 2})
func (oh *OperatorHandler) GetOperatorsStakeAtBlock(ctx context.Context, operatorId string, currentBlock uint32) (*OperatorsStakeResponse, error) {
state, err := oh.chainState.GetOperatorState(ctx, uint(currentBlock), []core.QuorumID{0, 1, 2})
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed operator state: %w", err)
}
@@ -249,22 +245,19 @@ func (oh *OperatorHandler) GetOperatorsStake(ctx context.Context, operatorId str
}
}
}
stakeRanked["total"] = make([]*OperatorStake, 0)
for i, op := range tqs {
if len(operatorId) == 0 || operatorId == op.OperatorId.Hex() {
stakeRanked["total"] = append(stakeRanked["total"], &OperatorStake{
QuorumId: "total",
OperatorId: op.OperatorId.Hex(),
StakePercentage: op.StakeShare / 100.0,
Rank: i + 1,
})
}
}
return &OperatorsStakeResponse{
StakeRankedOperators: stakeRanked,
}, nil
}

func (oh *OperatorHandler) GetOperatorsStake(ctx context.Context, operatorId string) (*OperatorsStakeResponse, error) {
currentBlock, err := oh.indexedChainState.GetCurrentBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to fetch current block number: %w", err)
}
return oh.GetOperatorsStakeAtBlock(ctx, operatorId, uint32(currentBlock))
}

func (s *OperatorHandler) ScanOperatorsHostInfo(ctx context.Context) (*SemverReportResponse, error) {
currentBlock, err := s.indexedChainState.GetCurrentBlockNumber()
if err != nil {
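
The refactor above splits stake retrieval into a block-parameterized core, GetOperatorsStakeAtBlock, plus a thin GetOperatorsStake wrapper that resolves the current block; callers that already know the reference block (as the v2 FetchOperatorsStake handler further down does) can call the core directly. A minimal, hedged sketch (oh is an existing *OperatorHandler, blockNumber a hypothetical uint32, and an empty operatorId means all operators):

    resp, err := oh.GetOperatorsStakeAtBlock(ctx, "", blockNumber)
    if err != nil {
        return err
    }
    for quorum, ops := range resp.StakeRankedOperators {
        fmt.Printf("quorum %s: %d operators ranked by stake\n", quorum, len(ops))
    }
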
12 changes: 12 additions & 0 deletions disperser/dataapi/prometheus_client.go
@@ -18,6 +18,8 @@ type (
PrometheusClient interface {
QueryDisperserBlobSizeBytesPerSecond(ctx context.Context, start time.Time, end time.Time) (*PrometheusResult, error)
QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint16) (*PrometheusResult, error)
QueryDisperserBlobSizeBytesPerSecondV2(ctx context.Context, start time.Time, end time.Time) (*PrometheusResult, error)
QueryDisperserAvgThroughputBlobSizeBytesV2(ctx context.Context, start time.Time, end time.Time, windowSizeInSec uint16) (*PrometheusResult, error)
}

PrometheusResultValues struct {
@@ -46,11 +48,21 @@ func (pc *prometheusClient) QueryDisperserBlobSizeBytesPerSecond(ctx context.Con
return pc.queryRange(ctx, query, start, end)
}

func (pc *prometheusClient) QueryDisperserBlobSizeBytesPerSecondV2(ctx context.Context, start time.Time, end time.Time) (*PrometheusResult, error) {
query := fmt.Sprintf("eigenda_dispatcher_completed_blobs_total{state=\"complete\",data=\"size\",cluster=\"%s\"}", pc.cluster)
return pc.queryRange(ctx, query, start, end)
}

func (pc *prometheusClient) QueryDisperserAvgThroughputBlobSizeBytes(ctx context.Context, start time.Time, end time.Time, throughputRateSecs uint16) (*PrometheusResult, error) {
query := fmt.Sprintf("avg_over_time( sum by (job) (rate(eigenda_batcher_blobs_total{state=\"confirmed\",data=\"size\",cluster=\"%s\"}[%ds])) [9m:])", pc.cluster, throughputRateSecs)
return pc.queryRange(ctx, query, start, end)
}

func (pc *prometheusClient) QueryDisperserAvgThroughputBlobSizeBytesV2(ctx context.Context, start time.Time, end time.Time, throughputRateSecs uint16) (*PrometheusResult, error) {
query := fmt.Sprintf("avg_over_time( sum by (job) (rate(eigenda_dispatcher_completed_blobs_total{state=\"complete\",data=\"size\",cluster=\"%s\"}[%ds])) [9m:])", pc.cluster, throughputRateSecs)
return pc.queryRange(ctx, query, start, end)
}

func (pc *prometheusClient) queryRange(ctx context.Context, query string, start time.Time, end time.Time) (*PrometheusResult, error) {
numSecondsInTimeRange := end.Sub(start).Seconds()
step := uint64(numSecondsInTimeRange / maxNumOfDataPoints)
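
For reference, the new V2 query methods read the dispatcher's completed-blob counters rather than the batcher's confirmed-blob counters used by their V1 counterparts. With a hypothetical cluster label of "mainnet" and a 60-second rate window, the format strings above render roughly as:

    # v1 (batcher)
    eigenda_batcher_blobs_total{state="confirmed",data="size",cluster="mainnet"}
    # v2 (dispatcher)
    eigenda_dispatcher_completed_blobs_total{state="complete",data="size",cluster="mainnet"}
    avg_over_time( sum by (job) (rate(eigenda_dispatcher_completed_blobs_total{state="complete",data="size",cluster="mainnet"}[60s])) [9m:])
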
3 changes: 2 additions & 1 deletion disperser/dataapi/server.go
@@ -130,6 +130,7 @@ type (
}

OperatorsStakeResponse struct {
CurrentBlock uint32 `json:"current_block"`
StakeRankedOperators map[string][]*OperatorStake `json:"stake_ranked_operators"`
}

@@ -267,7 +268,7 @@ func NewServer(
eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker,
eigenDAHttpServiceChecker: eigenDAHttpServiceChecker,
operatorHandler: NewOperatorHandler(logger, metrics, transactor, chainState, indexedChainState, subgraphClient),
metricsHandler: NewMetricsHandler(promClient),
metricsHandler: NewMetricsHandler(promClient, V1),
}
}

72 changes: 52 additions & 20 deletions disperser/dataapi/v2/server_v2.go
@@ -128,6 +128,10 @@ type (
StakePercentage float64 `json:"stake_percentage"`
}
OperatorsSigningInfoResponse struct {
StartBlock uint32 `json:"start_block"`
EndBlock uint32 `json:"end_block"`
StartTimeUnixSec int64 `json:"start_time_unix_sec"`
EndTimeUnixSec int64 `json:"end_time_unix_sec"`
OperatorSigningInfo []*OperatorSigningInfo `json:"operator_signing_info"`
}

@@ -139,6 +143,7 @@ type (
}

OperatorsStakeResponse struct {
CurrentBlock uint32 `json:"current_block"`
StakeRankedOperators map[string][]*OperatorStake `json:"stake_ranked_operators"`
}

@@ -214,7 +219,7 @@ func NewServerV2(
indexedChainState: indexedChainState,
metrics: metrics,
operatorHandler: dataapi.NewOperatorHandler(l, metrics, chainReader, chainState, indexedChainState, subgraphClient),
metricsHandler: dataapi.NewMetricsHandler(promClient),
metricsHandler: dataapi.NewMetricsHandler(promClient, dataapi.V2),
}
}

@@ -725,7 +730,12 @@ func (s *ServerV2) FetchOperatorSigningInfo(c *gin.Context) {
errorResponse(c, fmt.Errorf("failed to compute the operators signing info: %w", err))
return
}
startBlock, endBlock := computeBlockRange(attestations)
response := OperatorsSigningInfoResponse{
StartBlock: startBlock,
EndBlock: endBlock,
StartTimeUnixSec: startTime.Unix(),
EndTimeUnixSec: endTime.Unix(),
OperatorSigningInfo: signingInfo,
}

@@ -886,12 +896,19 @@ func (s *ServerV2) FetchOperatorsStake(c *gin.Context) {
operatorId := c.DefaultQuery("operator_id", "")
s.logger.Info("getting operators stake distribution", "operatorId", operatorId)

currentBlock, err := s.indexedChainState.GetCurrentBlockNumber()
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchOperatorsStake")
errorResponse(c, fmt.Errorf("failed to get current block number: %w", err))
return
}
operatorsStakeResponse, err := s.operatorHandler.GetOperatorsStake(c.Request.Context(), operatorId)
if err != nil {
s.metrics.IncrementFailedRequestNum("FetchOperatorsStake")
errorResponse(c, fmt.Errorf("failed to get operator stake - %s", err))
errorResponse(c, fmt.Errorf("failed to get operator stake: %w", err))
return
}
operatorsStakeResponse.CurrentBlock = uint32(currentBlock)

s.metrics.IncrementSuccessfulRequestNum("FetchOperatorsStake")
s.metrics.ObserveLatency("FetchOperatorsStake", time.Since(handlerStart))
Expand Down Expand Up @@ -1111,21 +1128,12 @@ func (s *ServerV2) computeOperatorsSigningInfo(

// Compute the block number range [startBlock, endBlock] (both inclusive) when the
// attestations have happened.
startBlock := attestations[0].ReferenceBlockNumber
endBlock := attestations[0].ReferenceBlockNumber
for i := range attestations {
if startBlock > attestations[i].ReferenceBlockNumber {
startBlock = attestations[i].ReferenceBlockNumber
}
if endBlock < attestations[i].ReferenceBlockNumber {
endBlock = attestations[i].ReferenceBlockNumber
}
}
startBlock, endBlock := computeBlockRange(attestations)

// Get quorum change events in range [startBlock+1, endBlock].
// We don't need the events at startBlock because we'll fetch all active operators and
// quorums at startBlock.
operatorQuorumEvents, err := s.subgraphClient.QueryOperatorQuorumEvent(ctx, uint32(startBlock+1), uint32(endBlock))
operatorQuorumEvents, err := s.subgraphClient.QueryOperatorQuorumEvent(ctx, startBlock+1, endBlock)
if err != nil {
return nil, err
}
@@ -1144,7 +1152,7 @@ func (s *ServerV2) computeOperatorsSigningInfo(
// increasing and non-overlapping block intervals during which the operator "op" is
// registered in quorum "q".
operatorQuorumIntervals, _, err := s.operatorHandler.CreateOperatorQuorumIntervals(
ctx, operatorList, operatorQuorumEvents, uint32(startBlock), uint32(endBlock),
ctx, operatorList, operatorQuorumEvents, startBlock, endBlock,
)
if err != nil {
return nil, err
@@ -1196,17 +1204,24 @@ func (s *ServerV2) computeOperatorsSigningInfo(
s.logger.Error("Internal error: failed to find address for operatorId", "operatorId", operatorId)
}

// Signing percentage with 2 decimal (e.g. 95.75, which means 95.75%)
// Signing percentage with 8 decimal (e.g. 95.75000000, which means 95.75%).
// We need 8 decimal because if there is one attestation per second, then we
// need to have resolution 1/(3600*24*14), which is 8.26719577e-7. At this
// resolution we can capture the signing rate difference caused by 1 unsigned
// batch.
signingPercentage := math.Round(
(float64(numShouldHaveSigned-numFailedToSign)/float64(numShouldHaveSigned))*100*100,
) / 100
(float64(numShouldHaveSigned-numFailedToSign)/float64(numShouldHaveSigned))*100*1e8,
) / 1e8

stakePercentage := float64(0)
if stake, ok := state.Operators[q][op]; ok {
totalStake := new(big.Float).SetInt(state.Totals[q].Stake)
stakePercentage, _ = new(big.Float).Quo(
stakeRatio := new(big.Float).Quo(
new(big.Float).SetInt(stake.Stake),
totalStake).Float64()
totalStake,
)
stakeRatio.Mul(stakeRatio, big.NewFloat(100))
stakePercentage, _ = stakeRatio.Float64()
}

si := &OperatorSigningInfo{
@@ -1244,7 +1259,7 @@ func (s *ServerV2) computeOperatorsSigningInfo(
// - the operators that joined after startBlock
func (s *ServerV2) getOperatorsOfInterest(
ctx context.Context,
startBlock, endBlock uint64,
startBlock, endBlock uint32,
quorumIDs []uint8,
operatorQuorumEvents *dataapi.OperatorQuorumEvents,
) (*dataapi.OperatorList, error) {
@@ -1372,6 +1387,23 @@ func computeTotalNumBatchesPerQuorum(attestations []*corev2.Attestation) map[uin
return numBatchesPerQuorum
}

func computeBlockRange(attestations []*corev2.Attestation) (uint32, uint32) {
if len(attestations) == 0 {
return 0, 0
}
startBlock := attestations[0].ReferenceBlockNumber
endBlock := attestations[0].ReferenceBlockNumber
for i := range attestations {
if startBlock > attestations[i].ReferenceBlockNumber {
startBlock = attestations[i].ReferenceBlockNumber
}
if endBlock < attestations[i].ReferenceBlockNumber {
endBlock = attestations[i].ReferenceBlockNumber
}
}
return uint32(startBlock), uint32(endBlock)
}

func safeAccess(data map[string]map[uint8]int, i string, j uint8) (int, bool) {
innerMap, ok := data[i]
if !ok {
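
A minimal illustration of the new computeBlockRange helper (block numbers hypothetical; this would sit in a test or scratch code inside the v2 package, with fmt imported):

    attns := []*corev2.Attestation{
        {ReferenceBlockNumber: 120},
        {ReferenceBlockNumber: 100},
        {ReferenceBlockNumber: 110},
    }
    start, end := computeBlockRange(attns)
    fmt.Println(start, end) // 100 120
    start, end = computeBlockRange(nil)
    fmt.Println(start, end) // 0 0 for an empty slice
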