Skip to content

Commit

Permalink
Adds operator ejections dataapi endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Oct 16, 2024
1 parent 5620f73 commit 729d1ca
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 3 deletions.
15 changes: 15 additions & 0 deletions disperser/dataapi/queried_operators_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,21 @@ func (s *server) getRegisteredOperatorForDays(ctx context.Context, days int32) (
return RegisteredOperatorMetadata, nil
}

// Function to get ejected operators for given number of days
// Queries subgraph for ejected operators
// Returns list of Operators with their quorum, block number, txn and timestemp they were ejected
func (s *server) getEjectedOperatorForDays(ctx context.Context, days int32) ([]*QueriedOperatorEjections, error) {
startTime := time.Now()

operatorEjections, err := s.subgraphClient.QueryIndexedOperatorEjectionsForTimeWindow(ctx, days)
if err != nil {
return nil, err
}

s.logger.Info("Time taken to get ejected operators for days", "days", days, "duration", time.Since(startTime))
return operatorEjections, nil
}

func processOperatorOnlineCheck(queriedOperatorsInfo *IndexedQueriedOperatorInfo, operatorOnlineStatusresultsChan chan<- *QueriedStateOperatorMetadata, logger logging.Logger) {
operators := queriedOperatorsInfo.Operators
wp := workerpool.New(poolSize)
Expand Down
66 changes: 63 additions & 3 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ const (
maxOperatorsNonsigningPercentageAge = 10
maxOperatorPortCheckAge = 60
maxNonSignerAge = 10
maxDeregisteredOperatorAage = 10
maxDeregisteredOperatorAge = 10
maxEjectedOperatorAge = 10
maxThroughputAge = 10
maxMetricAage = 10
maxFeedBlobsAge = 10
Expand Down Expand Up @@ -145,6 +146,17 @@ type (
Data []*QueriedStateOperatorMetadata `json:"data"`
}

QueriedOperatorEjections struct {
OperatorId string `json:"operator_id"`
Quorum uint8 `json:"quorum"`
BlockNumber uint `json:"block_number"`
BlockTimestamp string `json:"block_timestamp"`
TransactionHash string `json:"transaction_hash"`
}
QueriedOperatorEjectionsResponse struct {
Ejections []*QueriedOperatorEjections `json:"ejections"`
}

ServiceAvailability struct {
ServiceName string `json:"service_name"`
ServiceStatus string `json:"service_status"`
Expand Down Expand Up @@ -264,6 +276,7 @@ func (s *server) Start() error {
operatorsInfo := v1.Group("/operators-info")
{
operatorsInfo.GET("/deregistered-operators", s.FetchDeregisteredOperators)
operatorsInfo.GET("/ejected-operators", s.FetchEjectedOperators)
operatorsInfo.GET("/registered-operators", s.FetchRegisteredOperators)
operatorsInfo.GET("/port-check", s.OperatorPortCheck)
operatorsInfo.GET("/semver-scan", s.SemverScan)
Expand Down Expand Up @@ -801,7 +814,7 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) {
}

s.metrics.IncrementSuccessfulRequestNum("FetchDeregisteredOperators")
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAage))
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAge))
c.JSON(http.StatusOK, QueriedStateOperatorsResponse{
Meta: Meta{
Size: len(operatorMetadatas),
Expand Down Expand Up @@ -850,7 +863,7 @@ func (s *server) FetchRegisteredOperators(c *gin.Context) {
}

s.metrics.IncrementSuccessfulRequestNum("FetchRegisteredOperators")
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAage))
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAge))
c.JSON(http.StatusOK, QueriedStateOperatorsResponse{
Meta: Meta{
Size: len(operatorMetadatas),
Expand All @@ -859,6 +872,53 @@ func (s *server) FetchRegisteredOperators(c *gin.Context) {
})
}

// FetchEjectedOperators godoc
//
// @Summary Fetch list of operators that have been ejected over days. Days is a query parameter with a default value of 1 and max value of 30.
// @Tags OperatorsInfo
// @Produce json
// @Success 200 {object} QueriedStateOperatorsResponse
// @Failure 400 {object} ErrorResponse "error: Bad request"
// @Failure 404 {object} ErrorResponse "error: Not found"
// @Failure 500 {object} ErrorResponse "error: Server error"
// @Router /operators-info/ejected-operators [get]
func (s *server) FetchEjectedOperators(c *gin.Context) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("FetchEjectedOperators", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

// Get query parameters
// Default Value 14 days
days := c.DefaultQuery("days", "1") // If not specified, defaults to 1

// Convert days to integer
daysInt, err := strconv.Atoi(days)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid 'days' parameter"})
return
}

if daysInt > 30 {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid 'days' parameter. Max value is 30"})
return
}

operatorEjections, err := s.getEjectedOperatorForDays(c.Request.Context(), int32(daysInt))
if err != nil {
s.logger.Error("Failed to fetch ejected operators", "error", err)
s.metrics.IncrementFailedRequestNum("FetchEjectedOperators")
errorResponse(c, err)
return
}

s.metrics.IncrementSuccessfulRequestNum("FetchEjectedOperators")
c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxEjectedOperatorAge))
c.JSON(http.StatusOK, QueriedOperatorEjectionsResponse{
Ejections: operatorEjections,
})
}

// OperatorPortCheck godoc
//
// @Summary Operator node reachability port check
Expand Down
33 changes: 33 additions & 0 deletions disperser/dataapi/subgraph/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type (
QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, operatorId string, blockNumber uint32) (*IndexedOperatorInfo, error)
QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error)
QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error)
QueryOperatorEjectionsByOperatorId(ctx context.Context, operatorId string, blockTimestamp uint64) ([]*OperatorEjection, error)
QueryOperatorEjectionsGteBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*OperatorEjection, error)
}

api struct {
Expand Down Expand Up @@ -209,6 +211,37 @@ func (a *api) QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, op
return &query.Operator, nil
}

func (a *api) QueryOperatorEjectionsByOperatorId(ctx context.Context, operatorId string, blockTimestamp uint64) ([]*OperatorEjection, error) {
var (
query queryOperatorEjectedsByOperatorID
variables = map[string]any{
"blockTimestamp_gte": graphql.Int(blockTimestamp),
"operatorId": graphql.String(fmt.Sprintf("0x%s", operatorId)),
}
)
err := a.operatorStateGql.Query(context.Background(), &query, variables)
if err != nil {
return nil, err
}

return query.OperatorEjections, nil
}

func (a *api) QueryOperatorEjectionsGteBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*OperatorEjection, error) {
var (
query queryOperatorEjectedsGteBlockTimestamp
variables = map[string]any{
"blockTimestamp_gte": graphql.Int(blockTimestamp),
}
)
err := a.operatorStateGql.Query(context.Background(), &query, variables)
if err != nil {
return nil, err
}

return query.OperatorEjections, nil
}

// QueryOperatorAddedToQuorum finds operators' quorum opt-in history in range [startBlock, endBlock].
func (a *api) QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error) {
if startBlock > endBlock {
Expand Down
13 changes: 13 additions & 0 deletions disperser/dataapi/subgraph/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ type (
BlockNumber graphql.String
BlockTimestamp graphql.String
}
OperatorEjection struct {
OperatorId graphql.String `graphql:"operatorId"`
QuorumNumber graphql.String `graphql:"quorumNumber"`
BlockNumber graphql.String `graphql:"blockNumber"`
BlockTimestamp graphql.String `graphql:"blockTimestamp"`
TransactionHash graphql.String `graphql:"transactionHash"`
}
BatchNonSigningOperatorIds struct {
NonSigning struct {
NonSigners []struct {
Expand Down Expand Up @@ -105,4 +112,10 @@ type (
queryOperatorRemovedFromQuorum struct {
OperatorRemovedFromQuorum []*OperatorQuorum `graphql:"operatorRemovedFromQuorums(first: $first, skip: $skip, orderBy: blockTimestamp, where: {and: [{blockNumber_gt: $blockNumber_gt}, {blockNumber_lt: $blockNumber_lt}]})"`
}
queryOperatorEjectedsGteBlockTimestamp struct {
OperatorEjections []*OperatorEjection `graphql:"operatorEjecteds(orderBy: blockTimestamp, where: {blockTimestamp_gte: $blockTimestamp_gte})"`
}
queryOperatorEjectedsByOperatorID struct {
OperatorEjections []*OperatorEjection `graphql:"operatorEjecteds(orderBy: blockTimestamp, where: {and: [{blockTimestamp_gte: $blockTimestamp_gte}, {operatorId: $operatorId}]})"`
}
)
51 changes: 51 additions & 0 deletions disperser/dataapi/subgraph_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type (
QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error)
QueryIndexedOperatorsWithStateForTimeWindow(ctx context.Context, days int32, state OperatorState) (*IndexedQueriedOperatorInfo, error)
QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error)
QueryIndexedOperatorEjectionsForTimeWindow(ctx context.Context, days int32) ([]*QueriedOperatorEjections, error)
}
Batch struct {
Id []byte
Expand Down Expand Up @@ -84,6 +85,14 @@ type (
IndexedQueriedOperatorInfo struct {
Operators map[core.OperatorID]*QueriedOperatorInfo
}

//IndexedQueriedOperatorEjections struct {
// OperatorId string
// QuorumNumber uint8
// BlockNumber uint
// BlockTimestamp string
// TransactionHash string
//}
NonSigner struct {
OperatorId string
Count int
Expand Down Expand Up @@ -274,6 +283,48 @@ func (sc *subgraphClient) QueryIndexedOperatorsWithStateForTimeWindow(ctx contex
}, nil
}

func (sc *subgraphClient) QueryIndexedOperatorEjectionsForTimeWindow(ctx context.Context, days int32) ([]*QueriedOperatorEjections, error) {
// Query all operators in the last N days.
lastNDayInSeconds := uint64(time.Now().Add(-time.Duration(days) * 24 * time.Hour).Unix())

ejections, err := sc.api.QueryOperatorEjectionsGteBlockTimestamp(ctx, lastNDayInSeconds)
if err != nil {
return nil, err
}

queriedEjections := make([]*QueriedOperatorEjections, len(ejections))
for i, ejection := range ejections {
quorumNumber, err := strconv.ParseUint(string(ejection.QuorumNumber), 10, 8)
if err != nil {
fmt.Println("Error parsing quorumNumber:", err)
return nil, err
}
blockNumber, err := strconv.ParseUint(string(ejection.BlockNumber), 10, 32)
if err != nil {
fmt.Println("Error parsing blockNumber:", err)
return nil, err
}

timestamp, err := strconv.ParseInt(string(ejection.BlockTimestamp), 10, 64)
if err != nil {
fmt.Println("Error parsing timestamp:", err)
return nil, err
}

t := time.Unix(timestamp, 0)
blockTimestamp := t.Format(time.RFC3339)
queriedEjections[i] = &QueriedOperatorEjections{
OperatorId: string(ejection.OperatorId),
Quorum: uint8(quorumNumber),
BlockNumber: uint(blockNumber),
BlockTimestamp: blockTimestamp,
TransactionHash: string(ejection.TransactionHash),
}
}

return queriedEjections, nil
}

func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedQueriedOperatorInfo, error) {
// Query all deregistered operators in the last N days.
lastNDayInSeconds := uint64(time.Now().Add(-time.Duration(days) * 24 * time.Hour).Unix())
Expand Down

0 comments on commit 729d1ca

Please sign in to comment.