diff --git a/api/clients/node_client.go b/api/clients/node_client.go index f2284620f4..4a86d50ece 100644 --- a/api/clients/node_client.go +++ b/api/clients/node_client.go @@ -41,7 +41,7 @@ func (c client) GetBlobHeader( blobIndex uint32, ) (*core.BlobHeader, *merkletree.Proof, error) { conn, err := grpc.NewClient( - core.OperatorSocket(socket).GetRetrievalSocket(), + core.OperatorSocket(socket).GetV1RetrievalSocket(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { @@ -86,7 +86,7 @@ func (c client) GetChunks( chunksChan chan RetrievedChunks, ) { conn, err := grpc.NewClient( - core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(), + core.OperatorSocket(opInfo.Socket).GetV1RetrievalSocket(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { diff --git a/api/clients/v2/retrieval_client.go b/api/clients/v2/retrieval_client.go index 8304274d42..c9046b566e 100644 --- a/api/clients/v2/retrieval_client.go +++ b/api/clients/v2/retrieval_client.go @@ -157,7 +157,8 @@ func (r *retrievalClient) getChunksFromOperator( chunksChan chan clients.RetrievedChunks, ) { conn, err := grpc.NewClient( - core.OperatorSocket(opInfo.Socket).GetRetrievalSocket(), + //TODO: Verify if this should point to V2RetrievalSocket + core.OperatorSocket(opInfo.Socket).GetV2RetrievalSocket(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) defer func() { diff --git a/core/mock/state.go b/core/mock/state.go index 136f193e85..ffdd14fdaa 100644 --- a/core/mock/state.go +++ b/core/mock/state.go @@ -32,6 +32,7 @@ type PrivateOperatorInfo struct { DispersalPort string RetrievalPort string V2DispersalPort string + V2RetrievalPort string } type PrivateOperatorState struct { @@ -140,7 +141,8 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl dispersalPort := fmt.Sprintf("3%03v", 2*i) retrievalPort := fmt.Sprintf("3%03v", 2*i+1) v2DispersalPort := fmt.Sprintf("3%03v", 2*i+2) - socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort) + v2RetrievalPort := fmt.Sprintf("3%03v", 2*i+3) + socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort) indexed := &core.IndexedOperatorInfo{ Socket: string(socket), @@ -161,6 +163,7 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl DispersalPort: dispersalPort, RetrievalPort: retrievalPort, V2DispersalPort: v2DispersalPort, + V2RetrievalPort: v2RetrievalPort, } indexedOperators[id] = indexed diff --git a/core/serialization.go b/core/serialization.go index d69d03c462..5c06d682f2 100644 --- a/core/serialization.go +++ b/core/serialization.go @@ -528,7 +528,7 @@ func decode(data []byte, obj any) error { } func (s OperatorSocket) GetV1DispersalSocket() string { - ip, v1DispersalPort, _, _, err := ParseOperatorSocket(string(s)) + ip, v1DispersalPort, _, _, _, err := ParseOperatorSocket(string(s)) if err != nil { return "" } @@ -536,17 +536,25 @@ func (s OperatorSocket) GetV1DispersalSocket() string { } func (s OperatorSocket) GetV2DispersalSocket() string { - ip, _, _, v2DispersalPort, err := ParseOperatorSocket(string(s)) + ip, _, _, v2DispersalPort, _, err := ParseOperatorSocket(string(s)) if err != nil || v2DispersalPort == "" { return "" } return fmt.Sprintf("%s:%s", ip, v2DispersalPort) } -func (s OperatorSocket) GetRetrievalSocket() string { - ip, _, retrievalPort, _, err := ParseOperatorSocket(string(s)) +func (s OperatorSocket) GetV1RetrievalSocket() string { + ip, _, v1retrievalPort, _, _, err := ParseOperatorSocket(string(s)) if err != nil { return "" } - return fmt.Sprintf("%s:%s", ip, retrievalPort) + return fmt.Sprintf("%s:%s", ip, v1retrievalPort) +} + +func (s OperatorSocket) GetV2RetrievalSocket() string { + ip, _, _, _, v2RetrievalPort, err := ParseOperatorSocket(string(s)) + if err != nil || v2RetrievalPort == "" { + return "" + } + return fmt.Sprintf("%s:%s", ip, v2RetrievalPort) } diff --git a/core/serialization_test.go b/core/serialization_test.go index 727933d150..45592b37aa 100644 --- a/core/serialization_test.go +++ b/core/serialization_test.go @@ -195,36 +195,37 @@ func TestHashPubKeyG1(t *testing.T) { } func TestParseOperatorSocket(t *testing.T) { - operatorSocket := "localhost:1234;5678;9999" - host, dispersalPort, retrievalPort, v2DispersalPort, err := core.ParseOperatorSocket(operatorSocket) + operatorSocket := "localhost:1234;5678;9999;10001" + host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err := core.ParseOperatorSocket(operatorSocket) assert.NoError(t, err) assert.Equal(t, "localhost", host) - assert.Equal(t, "1234", dispersalPort) - assert.Equal(t, "5678", retrievalPort) + assert.Equal(t, "1234", v1DispersalPort) + assert.Equal(t, "5678", v1RetrievalPort) assert.Equal(t, "9999", v2DispersalPort) + assert.Equal(t, "10001", v2RetrievalPort) - host, dispersalPort, retrievalPort, v2DispersalPort, err = core.ParseOperatorSocket("localhost:1234;5678") + host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, _, err = core.ParseOperatorSocket("localhost:1234;5678") assert.NoError(t, err) assert.Equal(t, "localhost", host) - assert.Equal(t, "1234", dispersalPort) - assert.Equal(t, "5678", retrievalPort) + assert.Equal(t, "1234", v1DispersalPort) + assert.Equal(t, "5678", v1RetrievalPort) assert.Equal(t, "", v2DispersalPort) - _, _, _, _, err = core.ParseOperatorSocket("localhost;1234;5678") + _, _, _, _, _, err = core.ParseOperatorSocket("localhost;1234;5678") assert.NotNil(t, err) - assert.ErrorContains(t, err, "invalid socket address format") + assert.ErrorContains(t, err, "invalid host address format") - _, _, _, _, err = core.ParseOperatorSocket("localhost:12345678") + _, _, _, _, _, err = core.ParseOperatorSocket("localhost:12345678") assert.NotNil(t, err) - assert.ErrorContains(t, err, "invalid socket address format") + assert.ErrorContains(t, err, "invalid v1 dispersal port format") - _, _, _, _, err = core.ParseOperatorSocket("localhost1234;5678") + _, _, _, _, _, err = core.ParseOperatorSocket("localhost1234;5678") assert.NotNil(t, err) - assert.ErrorContains(t, err, "invalid socket address format") + assert.ErrorContains(t, err, "invalid host address format") } func TestGetV1DispersalSocket(t *testing.T) { - operatorSocket := core.OperatorSocket("localhost:1234;5678;9999") + operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;1025") socket := operatorSocket.GetV1DispersalSocket() assert.Equal(t, "localhost:1234", socket) @@ -234,28 +235,84 @@ func TestGetV1DispersalSocket(t *testing.T) { operatorSocket = core.OperatorSocket("localhost:1234;5678;") socket = operatorSocket.GetV1DispersalSocket() - assert.Equal(t, "localhost:1234", socket) + assert.Equal(t, "", socket) operatorSocket = core.OperatorSocket("localhost:1234") socket = operatorSocket.GetV1DispersalSocket() assert.Equal(t, "", socket) } -func TestGetRetrievalSocket(t *testing.T) { - operatorSocket := core.OperatorSocket("localhost:1234;5678;9999") - socket := operatorSocket.GetRetrievalSocket() +func TestGetV1RetrievalSocket(t *testing.T) { + // Valid v1/v2 socket + operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;10001") + socket := operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "localhost:5678", socket) + // Valid v1 socket operatorSocket = core.OperatorSocket("localhost:1234;5678") - socket = operatorSocket.GetRetrievalSocket() + socket = operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "localhost:5678", socket) + // Invalid socket testcases + operatorSocket = core.OperatorSocket("localhost:1234;5678;9999;10001;") + socket = operatorSocket.GetV1RetrievalSocket() + assert.Equal(t, "", socket) + operatorSocket = core.OperatorSocket("localhost:1234;5678;") - socket = operatorSocket.GetRetrievalSocket() - assert.Equal(t, "localhost:5678", socket) + socket = operatorSocket.GetV1RetrievalSocket() + assert.Equal(t, "", socket) + + operatorSocket = core.OperatorSocket("localhost:;1234;5678;") + socket = operatorSocket.GetV1RetrievalSocket() + assert.Equal(t, "", socket) + + operatorSocket = core.OperatorSocket("localhost:1234;:;5678;") + socket = operatorSocket.GetV1RetrievalSocket() + assert.Equal(t, "", socket) + + operatorSocket = core.OperatorSocket("localhost:;;;") + socket = operatorSocket.GetV1RetrievalSocket() + assert.Equal(t, "", socket) + + operatorSocket = core.OperatorSocket("localhost:1234") + socket = operatorSocket.GetV1RetrievalSocket() + assert.Equal(t, "", socket) +} + +func TestGetV2RetrievalSocket(t *testing.T) { + // Valid v1/v2 socket + operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;10001") + socket := operatorSocket.GetV2RetrievalSocket() + assert.Equal(t, "localhost:10001", socket) + + // Invalid v2 socket + operatorSocket = core.OperatorSocket("localhost:1234;5678") + socket = operatorSocket.GetV2RetrievalSocket() + assert.Equal(t, "", socket) + + // Invalid socket testcases + operatorSocket = core.OperatorSocket("localhost:1234;5678;9999;10001;") + socket = operatorSocket.GetV2RetrievalSocket() + assert.Equal(t, "", socket) + + operatorSocket = core.OperatorSocket("localhost:1234;5678;") + socket = operatorSocket.GetV2RetrievalSocket() + assert.Equal(t, "", socket) + + operatorSocket = core.OperatorSocket("localhost:;1234;5678;") + socket = operatorSocket.GetV2RetrievalSocket() + assert.Equal(t, "", socket) + + operatorSocket = core.OperatorSocket("localhost:1234;:;5678;") + socket = operatorSocket.GetV2RetrievalSocket() + assert.Equal(t, "", socket) + + operatorSocket = core.OperatorSocket("localhost:;;;") + socket = operatorSocket.GetV2RetrievalSocket() + assert.Equal(t, "", socket) operatorSocket = core.OperatorSocket("localhost:1234") - socket = operatorSocket.GetRetrievalSocket() + socket = operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "", socket) } diff --git a/core/state.go b/core/state.go index 6993fca175..1e02bad95e 100644 --- a/core/state.go +++ b/core/state.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "math/big" + "net" "slices" "strings" ) @@ -19,48 +20,66 @@ func (s OperatorSocket) String() string { return string(s) } -func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort string) OperatorSocket { - if v2DispersalPort == "" { +func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort string) OperatorSocket { + //TODO: Add config checks for invalid v1/v2 configs -- for v1 both v2 ports must be empty and for v2 both ports must be valid, reject any other combinations + if v2DispersalPort == "" && v2RetrievalPort == "" { return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort)) } - return OperatorSocket(fmt.Sprintf("%s:%s;%s;%s", nodeIP, dispersalPort, retrievalPort, v2DispersalPort)) + return OperatorSocket(fmt.Sprintf("%s:%s;%s;%s;%s", nodeIP, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort)) } type StakeAmount = *big.Int -func ParseOperatorSocket(socket string) (host string, dispersalPort string, retrievalPort string, v2DispersalPort string, err error) { - s := strings.Split(socket, ";") +func ParseOperatorSocket(socket string) (host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort string, err error) { - if len(s) == 2 { - // no v2 dispersal port - retrievalPort = s[1] - s = strings.Split(s[0], ":") - if len(s) != 2 { - err = fmt.Errorf("invalid socket address format: %s", socket) - return - } - host = s[0] - dispersalPort = s[1] + s := strings.Split(socket, ";") + host, v1DispersalPort, err = net.SplitHostPort(s[0]) + if _, err = net.LookupHost(host); err != nil { + //Invalid host + host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = + "", "", "", "", "", + fmt.Errorf("invalid host address format in %s: it must specify valid IP or host name (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + return + } + if err = ValidatePort(v1DispersalPort); err != nil { + host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = + "", "", "", "", "", + fmt.Errorf("invalid v1 dispersal port format in %s: it must specify valid v1 dispersal port (ex. 0.0.0.0:32004;32005;32006;32007)", socket) return } - if len(s) == 3 { - // all ports specified + switch len(s) { + case 4: v2DispersalPort = s[2] - retrievalPort = s[1] + if err = ValidatePort(v2DispersalPort); err != nil { + host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = + "", "", "", "", "", + fmt.Errorf("invalid v2 dispersal port format in %s: it must specify valid v2 dispersal port (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + } - s = strings.Split(s[0], ":") - if len(s) != 2 { - err = fmt.Errorf("invalid socket address format: %s", socket) - return + v2RetrievalPort = s[3] + if err = ValidatePort(v2RetrievalPort); err != nil { + host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = + "", "", "", "", "", + fmt.Errorf("invalid v2 retrieval port format in %s: it must specify valid v2 retrieval port (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + } + fallthrough + case 2: + // V1 Parsing + v1RetrievalPort = s[1] + if err = ValidatePort(v1RetrievalPort); err != nil { + host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = + "", "", "", "", "", + fmt.Errorf("invalid v1 retrieval port format in %s: it must specify valid v1 retrieval port (ex. 0.0.0.0:32004;32005;32006;32007)", socket) } - host = s[0] - dispersalPort = s[1] + return + default: + host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = + "", "", "", "", "", + fmt.Errorf("invalid socket address format %s: it must specify v1 dispersal/retrieval ports, or v2 dispersal/retrieval ports (ex. 0.0.0.0:32004;32005;32006;32007)", socket) return } - - return "", "", "", "", fmt.Errorf("invalid socket address format %s: it must specify dispersal port, retrieval port, and/or v2 dispersal port (ex. 0.0.0.0:32004;32005;32006)", socket) } // OperatorInfo contains information about an operator which is stored on the blockchain state, diff --git a/core/utils.go b/core/utils.go index 479e61a872..f2254067a2 100644 --- a/core/utils.go +++ b/core/utils.go @@ -1,8 +1,10 @@ package core import ( + "fmt" "math" "math/big" + "strconv" "golang.org/x/exp/constraints" ) @@ -23,3 +25,15 @@ func NextPowerOf2[T constraints.Integer](d T) T { nextPower := math.Ceil(math.Log2(float64(d))) return T(math.Pow(2.0, nextPower)) } + +func ValidatePort(portStr string) error { + port, err := strconv.Atoi(portStr) + if err != nil { + return fmt.Errorf("port is not a valid number: %v", err) + } + + if port < 1 || port > 65535 { + return fmt.Errorf("port number out of valid range (1-65535)") + } + return err +} diff --git a/disperser/common/semver/semver.go b/disperser/common/semver/semver.go index 2e7c80b3e1..b3720361b1 100644 --- a/disperser/common/semver/semver.go +++ b/disperser/common/semver/semver.go @@ -31,7 +31,7 @@ func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, oper operatorSocket := core.OperatorSocket(operators[operatorId].Socket) var socket string if useRetrievalSocket { - socket = operatorSocket.GetRetrievalSocket() + socket = operatorSocket.GetV1RetrievalSocket() } else { socket = operatorSocket.GetV1DispersalSocket() } diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index c584d55d08..31c9396268 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -148,7 +148,7 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, for opID, op := range state.IndexedOperators { opID := opID op := op - host, _, _, v2DispersalPort, err := core.ParseOperatorSocket(op.Socket) + host, _, _, v2DispersalPort, _, err := core.ParseOperatorSocket(op.Socket) if err != nil { return nil, nil, fmt.Errorf("failed to parse operator socket (%s): %w", op.Socket, err) } diff --git a/disperser/dataapi/operator_handler.go b/disperser/dataapi/operator_handler.go index 94f70cd3a1..47927db7f4 100644 --- a/disperser/dataapi/operator_handler.go +++ b/disperser/dataapi/operator_handler.go @@ -92,11 +92,11 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st } operatorSocket := core.OperatorSocket(operatorInfo.Socket) - retrievalSocket := operatorSocket.GetRetrievalSocket() - retrievalPortOpen := checkIsOperatorPortOpen(retrievalSocket, 3, oh.logger) - retrievalOnline, retrievalStatus := false, "port closed or unreachable" - if retrievalPortOpen { - retrievalOnline, retrievalStatus = checkServiceOnline(ctx, "node.Retrieval", retrievalSocket, 3*time.Second) + v1RetrievalSocket := operatorSocket.GetV1RetrievalSocket() + v1RetrievalPortOpen := checkIsOperatorPortOpen(v1RetrievalSocket, 3, oh.logger) + v1RetrievalOnline, v1RetrievalStatus := false, "port closed or unreachable" + if v1RetrievalPortOpen { + v1RetrievalOnline, v1RetrievalStatus = checkServiceOnline(ctx, "node.Retrieval", v1RetrievalSocket, 3*time.Second) } v1DispersalSocket := operatorSocket.GetV1DispersalSocket() @@ -119,6 +119,19 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st } } + v2RetrievalOnline, v2RetrievalStatus := false, "" + v2RetrievalSocket := operatorSocket.GetV2RetrievalSocket() + if v2RetrievalSocket == "" { + v2RetrievalStatus = "v2 retrieval port is not registered" + } else { + v2RetrievalPortOpen := checkIsOperatorPortOpen(v2RetrievalSocket, 3, oh.logger) + if !v2RetrievalPortOpen { + v2RetrievalStatus = "port closed or unreachable" + } else { + v2RetrievalOnline, v2RetrievalStatus = checkServiceOnline(ctx, "node.v2.Retrieval", v2RetrievalSocket, 3*time.Second) + } + } + // Create the metadata regardless of online status portCheckResponse := &OperatorPortCheckResponse{ OperatorId: operatorId, @@ -128,9 +141,12 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st V2DispersalSocket: v2DispersalSocket, V2DispersalOnline: v2DispersalOnline, V2DispersalStatus: v2DispersalStatus, - RetrievalSocket: retrievalSocket, - RetrievalOnline: retrievalOnline, - RetrievalStatus: retrievalStatus, + RetrievalSocket: v1RetrievalSocket, + RetrievalOnline: v1RetrievalOnline, + RetrievalStatus: v1RetrievalStatus, + V2RetrievalSocket: v2RetrievalSocket, + V2RetrievalOnline: v2RetrievalOnline, + V2RetrievalStatus: v2RetrievalStatus, } // Log the online status diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index f905328e4c..b282f0ff10 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -198,7 +198,7 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat var isOnline bool var socket string if operatorStatus.IndexedOperatorInfo != nil { - socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetRetrievalSocket() + socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetV1RetrievalSocket() isOnline = checkIsOperatorPortOpen(socket, 10, logger) } diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index c28a6ead7f..2875699061 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -185,6 +185,9 @@ type ( V2DispersalSocket string `json:"v2_dispersal_socket"` V2DispersalOnline bool `json:"v2_dispersal_online"` V2DispersalStatus string `json:"v2_dispersal_status"` + V2RetrievalSocket string `json:"v2_retrieval_socket"` + V2RetrievalOnline bool `json:"v2_retrieval_online"` + V2RetrievalStatus string `json:"v2_retrieval_status"` } SemverReportResponse struct { Semver map[string]*semver.SemverMetrics `json:"semver"` diff --git a/disperser/dataapi/subgraph_client_test.go b/disperser/dataapi/subgraph_client_test.go index 9f7e4a3322..4b4e81cf3b 100644 --- a/disperser/dataapi/subgraph_client_test.go +++ b/disperser/dataapi/subgraph_client_test.go @@ -330,7 +330,7 @@ var ( }, SocketUpdates: []subgraph.SocketUpdates{ { - Socket: "localhost:32008;32009;32010", + Socket: "localhost:32008;32009;32010;32011", }, }, } diff --git a/go.mod b/go.mod index 46ecc44b2b..43ce8f8edf 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/kms v1.31.0 github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6 github.com/consensys/gnark-crypto v0.12.1 + github.com/docker/go-units v0.5.0 github.com/emirpasic/gods v1.18.1 github.com/ethereum/go-ethereum v1.14.8 github.com/fxamacker/cbor/v2 v2.5.0 @@ -85,7 +86,6 @@ require ( github.com/docker/cli v25.0.3+incompatible // indirect github.com/docker/docker v25.0.6+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect - github.com/docker/go-units v0.5.0 // indirect github.com/ethereum/c-kzg-4844 v1.0.0 // indirect github.com/ethereum/go-verkle v0.1.1-0.20240306133620-7d920df305f0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 299c99321b..37cfe72bbb 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -385,7 +385,7 @@ func (env *Config) generateRelayVars(ind int, graphUrl, grpcPort string) RelayVa } // Generates DA node .env -func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, metricsPort, nodeApiPort string) OperatorVars { +func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort, metricsPort, nodeApiPort string) OperatorVars { max, _ := new(big.Int).SetString("21888242871839275222246405745257275088548364400416034343698204186575808495617", 10) // max.Exp(big.NewInt(2), big.NewInt(130), nil).Sub(max, big.NewInt(1)) @@ -412,6 +412,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, NODE_INTERNAL_DISPERSAL_PORT: dispersalPort, NODE_INTERNAL_RETRIEVAL_PORT: retrievalPort, NODE_V2_DISPERSAL_PORT: v2DispersalPort, + NODE_V2_RETRIEVAL_PORT: v2RetrievalPort, NODE_ENABLE_METRICS: "true", NODE_METRICS_PORT: metricsPort, NODE_ENABLE_NODE_API: "true", @@ -653,8 +654,9 @@ func (env *Config) GenerateAllVariables() { dispersalPort := fmt.Sprint(port + 2) retrievalPort := fmt.Sprint(port + 3) v2DispersalPort := fmt.Sprint(port + 4) - nodeApiPort := fmt.Sprint(port + 5) - port += 6 + v2RetrievalPort := fmt.Sprint(port + 5) + nodeApiPort := fmt.Sprint(port + 6) + port += 7 name := fmt.Sprintf("opr%v", i) logPath, dbPath, filename, envFile := env.getPaths(name) @@ -662,7 +664,7 @@ func (env *Config) GenerateAllVariables() { // Convert key to address - operatorConfig := env.generateOperatorVars(i, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, fmt.Sprint(metricsPort), nodeApiPort) + operatorConfig := env.generateOperatorVars(i, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort, fmt.Sprint(metricsPort), nodeApiPort) writeEnv(operatorConfig.getEnvMap(), envFile) env.Operators = append(env.Operators, operatorConfig) diff --git a/inabox/deploy/deploy.go b/inabox/deploy/deploy.go index db492e11e3..3e0c2a0e48 100644 --- a/inabox/deploy/deploy.go +++ b/inabox/deploy/deploy.go @@ -365,7 +365,7 @@ func (env *Config) StopAnvil() { func (env *Config) RunNodePluginBinary(operation string, operator OperatorVars) { changeDirectory(filepath.Join(env.rootPath, "inabox")) - socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT, operator.NODE_V2_DISPERSAL_PORT)) + socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT, operator.NODE_V2_DISPERSAL_PORT, operator.NODE_V2_RETRIEVAL_PORT)) envVars := []string{ "NODE_OPERATION=" + operation, diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index ea2f94c694..252f27ded8 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -346,6 +346,8 @@ type OperatorVars struct { NODE_V2_DISPERSAL_PORT string + NODE_V2_RETRIEVAL_PORT string + NODE_ENABLE_METRICS string NODE_METRICS_PORT string diff --git a/node/config.go b/node/config.go index c1580fa52c..8afadcdd57 100644 --- a/node/config.go +++ b/node/config.go @@ -49,6 +49,7 @@ type Config struct { InternalRetrievalPort string InternalDispersalPort string V2DispersalPort string + V2RetrievalPort string EnableNodeApi bool NodeApiPort string EnableMetrics bool @@ -233,13 +234,13 @@ func NewConfig(ctx *cli.Context) (*Config, error) { // check if the ports are valid integers dispersalPort := ctx.GlobalString(flags.DispersalPortFlag.Name) - _, err = strconv.Atoi(dispersalPort) + err = core.ValidatePort(dispersalPort) if err != nil { return nil, fmt.Errorf("invalid dispersal port: %s", dispersalPort) } retrievalPort := ctx.GlobalString(flags.RetrievalPortFlag.Name) - _, err = strconv.Atoi(retrievalPort) + err = core.ValidatePort(retrievalPort) if err != nil { return nil, fmt.Errorf("invalid retrieval port: %s", retrievalPort) } @@ -252,12 +253,25 @@ func NewConfig(ctx *cli.Context) (*Config, error) { return nil, fmt.Errorf("v2 dispersal port (NODE_V2_DISPERSAL_PORT) must be specified if v2 is enabled") } } else { - _, err = strconv.Atoi(v2DispersalPort) + // TODO: should we mandate v2Enabled == true + err = core.ValidatePort(v2DispersalPort) if err != nil { return nil, fmt.Errorf("invalid v2 dispersal port: %s", v2DispersalPort) } } + v2RetrievalPort := ctx.GlobalString(flags.V2RetrievalPortFlag.Name) + if v2RetrievalPort == "" { + if v2Enabled { + return nil, fmt.Errorf("v2 retrieval port (NODE_V2_RETRIEVAL_PORT) must be specified if v2 is enabled") + } + } else { + err = core.ValidatePort(v2RetrievalPort) + if err != nil { + return nil, fmt.Errorf("invalid v2 retrieval port: %s", v2RetrievalPort) + } + } + return &Config{ Hostname: ctx.GlobalString(flags.HostnameFlag.Name), DispersalPort: dispersalPort, @@ -265,6 +279,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { InternalDispersalPort: internalDispersalFlag, InternalRetrievalPort: internalRetrievalFlag, V2DispersalPort: v2DispersalPort, + V2RetrievalPort: v2RetrievalPort, EnableNodeApi: ctx.GlobalBool(flags.EnableNodeApiFlag.Name), NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), diff --git a/node/flags/flags.go b/node/flags/flags.go index da61e9aaff..40340f40d7 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -54,6 +54,12 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "V2_DISPERSAL_PORT"), } + V2RetrievalPortFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "v2-retrieval-port"), + Usage: "Port at which node registers to listen for v2 retrieval calls", + Required: true, + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "V2_RETRIEVAL_PORT"), + } EnableNodeApiFlag = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "enable-node-api"), Usage: "enable node-api to serve eigenlayer-cli node-api calls", @@ -438,6 +444,7 @@ var optionalFlags = []cli.Flag{ BLSSignerAPIKeyFlag, EnableV2Flag, V2DispersalPortFlag, + V2RetrievalPortFlag, OnchainStateRefreshIntervalFlag, ChunkDownloadTimeoutFlag, GRPCMsgSizeLimitV2Flag, diff --git a/node/grpc/run.go b/node/grpc/run.go index dd888a7284..f9fcd7fc4d 100644 --- a/node/grpc/run.go +++ b/node/grpc/run.go @@ -54,7 +54,7 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge // V2 dispersal service go func() { if !config.EnableV2 { - logger.Warn("V2 is not enabled, skipping V2 server startup") + logger.Warn("V2 is not enabled, skipping V2 dispersal server startup") return } for { @@ -109,5 +109,34 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge } }() + go func() { + if !config.EnableV2 { + logger.Warn("V2 is not enabled, skipping V2 retrieval server startup") + return + } + for { + addr := fmt.Sprintf("%s:%s", localhost, config.V2RetrievalPort) + listener, err := net.Listen("tcp", addr) + if err != nil { + logger.Fatalf("Could not start tcp listener: %v", err) + } + opt := grpc.MaxRecvMsgSize(config.GRPCMsgSizeLimitV2) + gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption()) + + // Register reflection service on gRPC server + // This makes "grpcurl -plaintext localhost:9000 list" command work + reflection.Register(gs) + + validator.RegisterRetrievalServer(gs, serverV2) + + healthcheck.RegisterHealthServer("node.v2.Retrieval", gs) + + logger.Info("port", config.V2RetrievalPort, "address", listener.Addr().String(), "GRPC Listening") + if err := gs.Serve(listener); err != nil { + logger.Error("retrieval v2 server failed; restarting.", "err", err) + } + } + }() + return nil } diff --git a/node/node.go b/node/node.go index fa1f8106ad..abc1829dd1 100644 --- a/node/node.go +++ b/node/node.go @@ -184,7 +184,7 @@ func NewNode( } nodeLogger.Info("Creating node", "chainID", chainID.String(), "operatorID", config.ID.Hex(), - "dispersalPort", config.DispersalPort, "v2DispersalPort", config.V2DispersalPort, "retrievalPort", config.RetrievalPort, "churnerUrl", config.ChurnerUrl, + "dispersalPort", config.DispersalPort, "v2DispersalPort", config.V2DispersalPort, "retrievalPort", config.RetrievalPort, "v2RetrievalPort", config.V2RetrievalPort, "churnerUrl", config.ChurnerUrl, "quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval, "eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding) @@ -286,12 +286,12 @@ func (n *Node) Start(ctx context.Context) error { } // Build the socket based on the hostname/IP provided in the CLI - socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort)) + socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort, n.Config.V2RetrievalPort)) var operator *Operator if n.Config.RegisterNodeAtStart { n.Logger.Info("Registering node on chain with the following parameters:", "operatorId", n.Config.ID.Hex(), "hostname", n.Config.Hostname, "dispersalPort", n.Config.DispersalPort, "v2DispersalPort", n.Config.V2DispersalPort, - "retrievalPort", n.Config.RetrievalPort, "churnerUrl", n.Config.ChurnerUrl, "quorumIds", fmt.Sprint(n.Config.QuorumIDList)) + "retrievalPort", n.Config.RetrievalPort, "v2RetrievalPort", n.Config.V2RetrievalPort, "churnerUrl", n.Config.ChurnerUrl, "quorumIds", fmt.Sprint(n.Config.QuorumIDList)) privateKey, err := crypto.HexToECDSA(n.Config.EthClientConfig.PrivateKeyString) if err != nil { return fmt.Errorf("NewClient: cannot parse private key: %w", err) @@ -649,7 +649,7 @@ func (n *Node) checkCurrentNodeIp(ctx context.Context) { case <-ctx.Done(): return case <-t.C: - newSocketAddr, err := SocketAddress(ctx, n.PubIPProvider, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort) + newSocketAddr, err := SocketAddress(ctx, n.PubIPProvider, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort, n.Config.V2RetrievalPort) if err != nil { n.Logger.Error("failed to get socket address", "err", err) continue diff --git a/node/plugin/cmd/main.go b/node/plugin/cmd/main.go index 5d58c1ee17..f1619d7200 100644 --- a/node/plugin/cmd/main.go +++ b/node/plugin/cmd/main.go @@ -135,7 +135,7 @@ func pluginOps(ctx *cli.Context) { return } - _, dispersalPort, retrievalPort, v2DispersalPort, err := core.ParseOperatorSocket(config.Socket) + _, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort, err := core.ParseOperatorSocket(config.Socket) if err != nil { log.Printf("Error: failed to parse operator socket: %v", err) return @@ -144,7 +144,7 @@ func pluginOps(ctx *cli.Context) { socket := config.Socket if isLocalhost(socket) { pubIPProvider := pubip.ProviderOrDefault(logger, config.PubIPProvider) - socket, err = node.SocketAddress(context.Background(), pubIPProvider, dispersalPort, retrievalPort, v2DispersalPort) + socket, err = node.SocketAddress(context.Background(), pubIPProvider, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort) if err != nil { log.Printf("Error: failed to get socket address from ip provider: %v", err) return diff --git a/node/utils.go b/node/utils.go index ce842e6936..394ba4115b 100644 --- a/node/utils.go +++ b/node/utils.go @@ -121,12 +121,12 @@ func ValidatePointsFromBlobHeader(h *pb.BlobHeader) error { return nil } -func SocketAddress(ctx context.Context, provider pubip.Provider, dispersalPort, retrievalPort, v2DispersalPort string) (string, error) { +func SocketAddress(ctx context.Context, provider pubip.Provider, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort string) (string, error) { ip, err := provider.PublicIPAddress(ctx) if err != nil { return "", fmt.Errorf("failed to get public ip address from IP provider: %w", err) } - socket := core.MakeOperatorSocket(ip, dispersalPort, retrievalPort, v2DispersalPort) + socket := core.MakeOperatorSocket(ip, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort) return socket.String(), nil } diff --git a/test/integration_test.go b/test/integration_test.go index 1299fec53a..aa8de6e67c 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -359,6 +359,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging InternalRetrievalPort: op.RetrievalPort, InternalDispersalPort: op.DispersalPort, V2DispersalPort: op.V2DispersalPort, + V2RetrievalPort: op.V2RetrievalPort, EnableMetrics: false, Timeout: 10, ExpirationPollIntervalSec: 10, @@ -383,7 +384,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging tx.On("GetBlockStaleMeasure").Return(nil) tx.On("GetStoreDurationBlocks").Return(nil) tx.On("OperatorIDToAddress").Return(gethcommon.Address{1}, nil) - socket := core.MakeOperatorSocket(config.Hostname, config.DispersalPort, config.RetrievalPort, config.V2DispersalPort) + socket := core.MakeOperatorSocket(config.Hostname, config.DispersalPort, config.RetrievalPort, config.V2DispersalPort, config.V2RetrievalPort) tx.On("GetOperatorSocket", mock.Anything, mock.Anything).Return(socket.String(), nil) noopMetrics := metrics.NewNoopMetrics()