From 729d1ca3adb9c60e6dc0dbae17332b54992fbc71 Mon Sep 17 00:00:00 2001 From: Patrick Schork <354473+pschork@users.noreply.github.com> Date: Tue, 15 Oct 2024 20:55:27 -0700 Subject: [PATCH] Adds operator ejections dataapi endpoint --- .../dataapi/queried_operators_handlers.go | 15 +++++ disperser/dataapi/server.go | 66 ++++++++++++++++++- disperser/dataapi/subgraph/api.go | 33 ++++++++++ disperser/dataapi/subgraph/queries.go | 13 ++++ disperser/dataapi/subgraph_client.go | 51 ++++++++++++++ 5 files changed, 175 insertions(+), 3 deletions(-) diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index d1856641f1..9f3779b4b1 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -96,6 +96,21 @@ func (s *server) getRegisteredOperatorForDays(ctx context.Context, days int32) ( return RegisteredOperatorMetadata, nil } +// Function to get ejected operators for given number of days +// Queries subgraph for ejected operators +// Returns list of Operators with their quorum, block number, txn and timestemp they were ejected +func (s *server) getEjectedOperatorForDays(ctx context.Context, days int32) ([]*QueriedOperatorEjections, error) { + startTime := time.Now() + + operatorEjections, err := s.subgraphClient.QueryIndexedOperatorEjectionsForTimeWindow(ctx, days) + if err != nil { + return nil, err + } + + s.logger.Info("Time taken to get ejected operators for days", "days", days, "duration", time.Since(startTime)) + return operatorEjections, nil +} + func processOperatorOnlineCheck(queriedOperatorsInfo *IndexedQueriedOperatorInfo, operatorOnlineStatusresultsChan chan<- *QueriedStateOperatorMetadata, logger logging.Logger) { operators := queriedOperatorsInfo.Operators wp := workerpool.New(poolSize) diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 63f6870e20..500d6c9b09 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -42,7 +42,8 @@ const ( maxOperatorsNonsigningPercentageAge = 10 maxOperatorPortCheckAge = 60 maxNonSignerAge = 10 - maxDeregisteredOperatorAage = 10 + maxDeregisteredOperatorAge = 10 + maxEjectedOperatorAge = 10 maxThroughputAge = 10 maxMetricAage = 10 maxFeedBlobsAge = 10 @@ -145,6 +146,17 @@ type ( Data []*QueriedStateOperatorMetadata `json:"data"` } + QueriedOperatorEjections struct { + OperatorId string `json:"operator_id"` + Quorum uint8 `json:"quorum"` + BlockNumber uint `json:"block_number"` + BlockTimestamp string `json:"block_timestamp"` + TransactionHash string `json:"transaction_hash"` + } + QueriedOperatorEjectionsResponse struct { + Ejections []*QueriedOperatorEjections `json:"ejections"` + } + ServiceAvailability struct { ServiceName string `json:"service_name"` ServiceStatus string `json:"service_status"` @@ -264,6 +276,7 @@ func (s *server) Start() error { operatorsInfo := v1.Group("/operators-info") { operatorsInfo.GET("/deregistered-operators", s.FetchDeregisteredOperators) + operatorsInfo.GET("/ejected-operators", s.FetchEjectedOperators) operatorsInfo.GET("/registered-operators", s.FetchRegisteredOperators) operatorsInfo.GET("/port-check", s.OperatorPortCheck) operatorsInfo.GET("/semver-scan", s.SemverScan) @@ -801,7 +814,7 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) { } s.metrics.IncrementSuccessfulRequestNum("FetchDeregisteredOperators") - c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAage)) + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAge)) c.JSON(http.StatusOK, QueriedStateOperatorsResponse{ Meta: Meta{ Size: len(operatorMetadatas), @@ -850,7 +863,7 @@ func (s *server) FetchRegisteredOperators(c *gin.Context) { } s.metrics.IncrementSuccessfulRequestNum("FetchRegisteredOperators") - c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAage)) + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxDeregisteredOperatorAge)) c.JSON(http.StatusOK, QueriedStateOperatorsResponse{ Meta: Meta{ Size: len(operatorMetadatas), @@ -859,6 +872,53 @@ func (s *server) FetchRegisteredOperators(c *gin.Context) { }) } +// FetchEjectedOperators godoc +// +// @Summary Fetch list of operators that have been ejected over days. Days is a query parameter with a default value of 1 and max value of 30. +// @Tags OperatorsInfo +// @Produce json +// @Success 200 {object} QueriedStateOperatorsResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /operators-info/ejected-operators [get] +func (s *server) FetchEjectedOperators(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchEjectedOperators", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + // Get query parameters + // Default Value 14 days + days := c.DefaultQuery("days", "1") // If not specified, defaults to 1 + + // Convert days to integer + daysInt, err := strconv.Atoi(days) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid 'days' parameter"}) + return + } + + if daysInt > 30 { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid 'days' parameter. Max value is 30"}) + return + } + + operatorEjections, err := s.getEjectedOperatorForDays(c.Request.Context(), int32(daysInt)) + if err != nil { + s.logger.Error("Failed to fetch ejected operators", "error", err) + s.metrics.IncrementFailedRequestNum("FetchEjectedOperators") + errorResponse(c, err) + return + } + + s.metrics.IncrementSuccessfulRequestNum("FetchEjectedOperators") + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxEjectedOperatorAge)) + c.JSON(http.StatusOK, QueriedOperatorEjectionsResponse{ + Ejections: operatorEjections, + }) +} + // OperatorPortCheck godoc // // @Summary Operator node reachability port check diff --git a/disperser/dataapi/subgraph/api.go b/disperser/dataapi/subgraph/api.go index 0b4f645c9b..0efad1bfeb 100644 --- a/disperser/dataapi/subgraph/api.go +++ b/disperser/dataapi/subgraph/api.go @@ -27,6 +27,8 @@ type ( QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, operatorId string, blockNumber uint32) (*IndexedOperatorInfo, error) QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error) QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error) + QueryOperatorEjectionsByOperatorId(ctx context.Context, operatorId string, blockTimestamp uint64) ([]*OperatorEjection, error) + QueryOperatorEjectionsGteBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*OperatorEjection, error) } api struct { @@ -209,6 +211,37 @@ func (a *api) QueryOperatorInfoByOperatorIdAtBlockNumber(ctx context.Context, op return &query.Operator, nil } +func (a *api) QueryOperatorEjectionsByOperatorId(ctx context.Context, operatorId string, blockTimestamp uint64) ([]*OperatorEjection, error) { + var ( + query queryOperatorEjectedsByOperatorID + variables = map[string]any{ + "blockTimestamp_gte": graphql.Int(blockTimestamp), + "operatorId": graphql.String(fmt.Sprintf("0x%s", operatorId)), + } + ) + err := a.operatorStateGql.Query(context.Background(), &query, variables) + if err != nil { + return nil, err + } + + return query.OperatorEjections, nil +} + +func (a *api) QueryOperatorEjectionsGteBlockTimestamp(ctx context.Context, blockTimestamp uint64) ([]*OperatorEjection, error) { + var ( + query queryOperatorEjectedsGteBlockTimestamp + variables = map[string]any{ + "blockTimestamp_gte": graphql.Int(blockTimestamp), + } + ) + err := a.operatorStateGql.Query(context.Background(), &query, variables) + if err != nil { + return nil, err + } + + return query.OperatorEjections, nil +} + // QueryOperatorAddedToQuorum finds operators' quorum opt-in history in range [startBlock, endBlock]. func (a *api) QueryOperatorAddedToQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error) { if startBlock > endBlock { diff --git a/disperser/dataapi/subgraph/queries.go b/disperser/dataapi/subgraph/queries.go index 9072f6c78b..6d73a95d05 100644 --- a/disperser/dataapi/subgraph/queries.go +++ b/disperser/dataapi/subgraph/queries.go @@ -35,6 +35,13 @@ type ( BlockNumber graphql.String BlockTimestamp graphql.String } + OperatorEjection struct { + OperatorId graphql.String `graphql:"operatorId"` + QuorumNumber graphql.String `graphql:"quorumNumber"` + BlockNumber graphql.String `graphql:"blockNumber"` + BlockTimestamp graphql.String `graphql:"blockTimestamp"` + TransactionHash graphql.String `graphql:"transactionHash"` + } BatchNonSigningOperatorIds struct { NonSigning struct { NonSigners []struct { @@ -105,4 +112,10 @@ type ( queryOperatorRemovedFromQuorum struct { OperatorRemovedFromQuorum []*OperatorQuorum `graphql:"operatorRemovedFromQuorums(first: $first, skip: $skip, orderBy: blockTimestamp, where: {and: [{blockNumber_gt: $blockNumber_gt}, {blockNumber_lt: $blockNumber_lt}]})"` } + queryOperatorEjectedsGteBlockTimestamp struct { + OperatorEjections []*OperatorEjection `graphql:"operatorEjecteds(orderBy: blockTimestamp, where: {blockTimestamp_gte: $blockTimestamp_gte})"` + } + queryOperatorEjectedsByOperatorID struct { + OperatorEjections []*OperatorEjection `graphql:"operatorEjecteds(orderBy: blockTimestamp, where: {and: [{blockTimestamp_gte: $blockTimestamp_gte}, {operatorId: $operatorId}]})"` + } ) diff --git a/disperser/dataapi/subgraph_client.go b/disperser/dataapi/subgraph_client.go index 5665641503..534cded45a 100644 --- a/disperser/dataapi/subgraph_client.go +++ b/disperser/dataapi/subgraph_client.go @@ -36,6 +36,7 @@ type ( QueryOperatorQuorumEvent(ctx context.Context, startBlock, endBlock uint32) (*OperatorQuorumEvents, error) QueryIndexedOperatorsWithStateForTimeWindow(ctx context.Context, days int32, state OperatorState) (*IndexedQueriedOperatorInfo, error) QueryOperatorInfoByOperatorId(ctx context.Context, operatorId string) (*core.IndexedOperatorInfo, error) + QueryIndexedOperatorEjectionsForTimeWindow(ctx context.Context, days int32) ([]*QueriedOperatorEjections, error) } Batch struct { Id []byte @@ -84,6 +85,14 @@ type ( IndexedQueriedOperatorInfo struct { Operators map[core.OperatorID]*QueriedOperatorInfo } + + //IndexedQueriedOperatorEjections struct { + // OperatorId string + // QuorumNumber uint8 + // BlockNumber uint + // BlockTimestamp string + // TransactionHash string + //} NonSigner struct { OperatorId string Count int @@ -274,6 +283,48 @@ func (sc *subgraphClient) QueryIndexedOperatorsWithStateForTimeWindow(ctx contex }, nil } +func (sc *subgraphClient) QueryIndexedOperatorEjectionsForTimeWindow(ctx context.Context, days int32) ([]*QueriedOperatorEjections, error) { + // Query all operators in the last N days. + lastNDayInSeconds := uint64(time.Now().Add(-time.Duration(days) * 24 * time.Hour).Unix()) + + ejections, err := sc.api.QueryOperatorEjectionsGteBlockTimestamp(ctx, lastNDayInSeconds) + if err != nil { + return nil, err + } + + queriedEjections := make([]*QueriedOperatorEjections, len(ejections)) + for i, ejection := range ejections { + quorumNumber, err := strconv.ParseUint(string(ejection.QuorumNumber), 10, 8) + if err != nil { + fmt.Println("Error parsing quorumNumber:", err) + return nil, err + } + blockNumber, err := strconv.ParseUint(string(ejection.BlockNumber), 10, 32) + if err != nil { + fmt.Println("Error parsing blockNumber:", err) + return nil, err + } + + timestamp, err := strconv.ParseInt(string(ejection.BlockTimestamp), 10, 64) + if err != nil { + fmt.Println("Error parsing timestamp:", err) + return nil, err + } + + t := time.Unix(timestamp, 0) + blockTimestamp := t.Format(time.RFC3339) + queriedEjections[i] = &QueriedOperatorEjections{ + OperatorId: string(ejection.OperatorId), + Quorum: uint8(quorumNumber), + BlockNumber: uint(blockNumber), + BlockTimestamp: blockTimestamp, + TransactionHash: string(ejection.TransactionHash), + } + } + + return queriedEjections, nil +} + func (sc *subgraphClient) QueryIndexedDeregisteredOperatorsForTimeWindow(ctx context.Context, days int32) (*IndexedQueriedOperatorInfo, error) { // Query all deregistered operators in the last N days. lastNDayInSeconds := uint64(time.Now().Add(-time.Duration(days) * 24 * time.Hour).Unix())