Skip to content

Commit

Permalink
Merge pull request #14 from decentraland/feat/add-more-endpoints
Browse files Browse the repository at this point in the history
feat: add endpoints for stop and promote
  • Loading branch information
juanmahidalgo authored Nov 25, 2024
2 parents 0a6ed07 + 96c493a commit 4c94ce0
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 74 deletions.
48 changes: 46 additions & 2 deletions src/controllers/handlers/squid-handler.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,58 @@
import { HandlerContextWithPath } from '../../types'
import { isErrorWithMessage } from '../../logic/errors'
import { HandlerContextWithPath, StatusCode } from '../../types'

// handlers arguments only type what they need, to make unit testing easier
export async function squidsHandler(context: Pick<HandlerContextWithPath<'squids', '/squids/list'>, 'url' | 'components'>) {
export async function listSquidsHandler(context: Pick<HandlerContextWithPath<'squids', '/squids/list'>, 'components'>) {
const {
components: { squids }
} = context

const instances = await squids.list()

return {
status: StatusCode.OK,
body: instances
}
}

export async function stopSquidHandler(context: Pick<HandlerContextWithPath<'squids', '/squids/:id/stop'>, 'params' | 'components'>) {
const {
components: { squids },
params: { id }
} = context

try {
await squids.downgrade(id)
return {
status: StatusCode.OK
}
} catch (e) {
return {
status: StatusCode.BAD_REQUEST,
body: {
message: isErrorWithMessage(e) ? e.message : 'Could not stop squid'
}
}
}
}

export async function promoteSquidHandler(context: Pick<HandlerContextWithPath<'squids', '/squids/:id/promote'>, 'params' | 'components'>) {
const {
components: { squids },
params: { id }
} = context

try {
await squids.promote(id)
return {
status: StatusCode.OK
}
} catch (e) {
return {
status: StatusCode.BAD_REQUEST,
body: {
message: isErrorWithMessage(e) ? e.message : 'Could not promote squid'
}
}
}
}
6 changes: 4 additions & 2 deletions src/controllers/routes.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { Router } from '@well-known-components/http-server'
import { GlobalContext } from '../types'
import { squidsHandler } from './handlers/squid-handler'
import { listSquidsHandler, promoteSquidHandler, stopSquidHandler } from './handlers/squid-handler'

// We return the entire router because it will be easier to test than a whole server
export async function setupRouter(): Promise<Router<GlobalContext>> {
const router = new Router<GlobalContext>()

router.get('/list', squidsHandler)
router.get('/list', listSquidsHandler)
router.put('/:id/promote', promoteSquidHandler)
router.put('/:id/stop', stopSquidHandler)

return router
}
3 changes: 3 additions & 0 deletions src/logic/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function isErrorWithMessage(error: unknown): error is Error {
return error !== undefined && error !== null && typeof error === 'object' && 'message' in error
}
151 changes: 87 additions & 64 deletions src/ports/squids/component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import {
import { IConfigComponent, IFetchComponent } from '@well-known-components/interfaces'
import { IPgComponent } from '@well-known-components/pg-component'
import { Network } from '@dcl/schemas'
import { getPromoteQuery } from './queries'
import { getActiveSchemaQuery, getPromoteQuery, getSchemaByServiceNameQuery } from './queries'
import { ISquidComponent, Squid, SquidMetric } from './types'
import { getMetricValue, getSquidsNetworksMapping } from './utils'
import { getMetricValue, getProjectNameFromService, getSquidsNetworksMapping } from './utils'

const AWS_REGION = 'us-east-1'

Expand Down Expand Up @@ -41,82 +41,103 @@ export async function createSubsquidComponent({
const serviceArns = servicesResponse.serviceArns || []
const squidServices = serviceArns.filter(arn => arn.includes('-squid-server'))

// Step 2: Get tasks for each service and fetch task IPs
const results: Squid[] = []

// Step 2: Describe services in parallel
const describeServicesCommand = new DescribeServicesCommand({
cluster,
services: squidServices
})

const describeServicesResponse = await client.send(describeServicesCommand)
const services = describeServicesResponse.services || []

// Process all services in parallel
const results = await Promise.all(
services.map(async squidService => {
const serviceName = squidService.serviceName
const listTasksCommand = new ListTasksCommand({
cluster,
serviceName
})
const taskResponse = await client.send(listTasksCommand)
const taskArns = taskResponse.taskArns || []

if (taskArns.length === 0) return null

const describeTasksCommand = new DescribeTasksCommand({
cluster,
tasks: taskArns
})
const describeResponse = await client.send(describeTasksCommand)
const tasks = describeResponse.tasks || []

const squidName = squidService.serviceName || ''
const schemaName = (await dappsDatabase.query(getSchemaByServiceNameQuery(squidName))).rows[0]?.schema
const projectActiveSchema = (await dappsDatabase.query(getActiveSchemaQuery(squidName))).rows[0]?.schema

const squid: Partial<Squid> = {
name: squidName,
service_name: squidService.serviceName || '',
schema_name: schemaName,
project_active_schema: projectActiveSchema
}

for (const squidService of describeServicesResponse.services || []) {
const serviceName = squidService.serviceName
const listTasksCommand = new ListTasksCommand({
cluster,
serviceName
})
const taskResponse = await client.send(listTasksCommand)
const taskArns = taskResponse.taskArns || []

if (taskArns.length === 0) continue

const describeTasksCommand = new DescribeTasksCommand({
cluster,
tasks: taskArns
})
const describeResponse = await client.send(describeTasksCommand)
const tasks = describeResponse.tasks || []

const squid: Partial<Squid> = {
name: squidService.serviceName || '',
service_name: squidService.serviceName || '',
metrics: {} as Record<Network.ETHEREUM | Network.MATIC, SquidMetric>
}

// there should be just one task per service
for (const task of tasks) {
squid.version = task.version || 0
squid.created_at = task.createdAt
squid.health_status = task.healthStatus
squid.service_status = task.lastStatus

const ip = task.attachments
?.find(att => att.type === 'ElasticNetworkInterface')
?.details?.find(detail => detail.name === 'privateIPv4Address')?.value

if (!ip) continue

// Step 3: Fetch /metrics from each IP for each network
try {
for (const network of getSquidsNetworksMapping()) {
const response = await fetch.fetch(`http://${ip}:${network.port}/metrics`)
const text = await response.text() // Use text() since the response is plain text

if (!squid.metrics) {
squid.metrics = {} as Record<Network.ETHEREUM | Network.MATIC, SquidMetric>
}

squid.metrics[network.name] = {
sqd_processor_sync_eta_seconds: getMetricValue(text, 'sqd_processor_sync_eta_seconds'),
sqd_processor_mapping_blocks_per_second: getMetricValue(text, 'sqd_processor_mapping_blocks_per_second'),
sqd_processor_last_block: getMetricValue(text, 'sqd_processor_last_block'),
sqd_processor_chain_height: getMetricValue(text, 'sqd_processor_chain_height')
// there should be just one task per service
for (const task of tasks) {
squid.version = task.version || 0
squid.created_at = task.createdAt
squid.health_status = task.healthStatus
squid.service_status = task.lastStatus

const ElasticNetworkInterface = 'ElasticNetworkInterface'
const privateIPv4Address = 'privateIPv4Address'

const ip = task.attachments
?.find(att => att.type === ElasticNetworkInterface)
?.details?.find(detail => detail.name === privateIPv4Address)?.value

if (!ip) continue

// Fetch metrics for each network in parallel
try {
const metricsPromises = getSquidsNetworksMapping().map(async network => {
const response = await fetch.fetch(`http://${ip}:${network.port}/metrics`)
const text = await response.text() // Use text() since the response is plain text

return {
networkName: network.name,
metrics: {
sqd_processor_sync_eta_seconds: getMetricValue(text, 'sqd_processor_sync_eta_seconds'),
sqd_processor_mapping_blocks_per_second: getMetricValue(text, 'sqd_processor_mapping_blocks_per_second'),
sqd_processor_last_block: getMetricValue(text, 'sqd_processor_last_block'),
sqd_processor_chain_height: getMetricValue(text, 'sqd_processor_chain_height')
}
}
})

const metricsResults = await Promise.all(metricsPromises)
for (const { networkName, metrics } of metricsResults) {
if (!squid.metrics) {
squid.metrics = {} as Record<Network.ETHEREUM | Network.MATIC, SquidMetric>
}
squid.metrics[networkName] = metrics
}
} catch (error) {
console.error(`Failed to fetch metrics for ${ip}:`, error)
}
} catch (error) {
console.error(`Failed to fetch metrics for ${ip}:`, error)
}

// Only include complete squid objects
if (squid.created_at && squid.health_status && squid.service_status) {
results.push(squid as Squid)
return squid as Squid
} else {
console.warn(`Skipping incomplete squid: ${squid.service_name}`)
return null
}
}
}
})
)

return results
// Filter out null values
return results.filter((squid): squid is Squid => squid !== null)
} catch (error) {
console.error('Error listing services:', error)
return []
Expand All @@ -139,12 +160,14 @@ export async function createSubsquidComponent({

async function promote(serviceName: string): Promise<void> {
try {
const projectName = serviceName.split('-')[0] // e.g: service name is marketplace-squid-server-a-blue-92e812a, project is marketplace
const projectName = getProjectNameFromService(serviceName) // e.g: service name is marketplace-squid-server-a-blue-92e812a, project is marketplace
const schemaName = `squid_${projectName}` // e.g: squid_marketplace
const promoteQuery = getPromoteQuery(serviceName, schemaName, projectName)

// NOTE: in the future, depending on the project we might want to run the promote query in a different db
const result = await dappsDatabase.query(promoteQuery)
console.log('result: ', result) // @TODO implement a proper response
console.log('result.rows: ', result.rows) // @TODO implement a proper response
} catch (error) {
console.log('error: ', error)
}
Expand Down
19 changes: 19 additions & 0 deletions src/ports/squids/queries.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { SQL, SQLStatement } from 'sql-template-strings'
import { getProjectNameFromService } from './utils'

export const getPromoteQuery = (serviceName: string, schemaName: string, project: string): SQLStatement => {
return SQL`
Expand Down Expand Up @@ -35,3 +36,21 @@ export const getPromoteQuery = (serviceName: string, schemaName: string, project
END $$;
`
}

export const getSchemaByServiceNameQuery = (serviceName: string): SQLStatement => {
return SQL`
SELECT schema
FROM public.indexers
WHERE service = ${serviceName};
`
}

export const getActiveSchemaQuery = (serviceName: string): SQLStatement => {
const projectName = getProjectNameFromService(serviceName)

return SQL`
SELECT schema
FROM public.squids
WHERE name = ${projectName};
`
}
4 changes: 3 additions & 1 deletion src/ports/squids/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ export type SquidMetric = {

export type Squid = {
name: string
service_name: string
schema_name: string
project_active_schema: string
created_at: Date | undefined
health_status: HealthStatus | undefined
service_name: string
service_status: string | undefined
version: number
metrics: Record<Network.ETHEREUM | Network.MATIC, SquidMetric>
Expand Down
2 changes: 2 additions & 0 deletions src/ports/squids/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ export const getMetricValue = (metrics: string, metricName: string) => {
return match ? parseFloat(match[1]) : 0
}

export const getProjectNameFromService = (serviceName: string): string => serviceName.split('-squid-server-')[0]

export function getSquidsNetworksMapping(): {
name: Network.ETHEREUM | Network.MATIC
port: number
Expand Down
Loading

0 comments on commit 4c94ce0

Please sign in to comment.