diff --git a/jest.config.js b/jest.config.js index 90bc9fd..47e9393 100644 --- a/jest.config.js +++ b/jest.config.js @@ -1,10 +1,10 @@ module.exports = { transform: { - "^.+\\.(ts|tsx)$": ["ts-jest", {tsconfig: "test/tsconfig.json"}] + '^.+\\.(ts|tsx)$': ['ts-jest', { tsconfig: 'test/tsconfig.json' }] }, - moduleFileExtensions: ["ts", "js"], - coverageDirectory: "coverage", - collectCoverageFrom: ["src/**/*.ts", "src/**/*.js"], - testMatch: ["**/*.spec.(ts)"], - testEnvironment: "node", + moduleFileExtensions: ['ts', 'js'], + coverageDirectory: 'coverage', + collectCoverageFrom: ['src/**/*.ts', 'src/**/*.js', '!src/migrations/**'], + testMatch: ['**/*.spec.(ts)'], + testEnvironment: 'node' } diff --git a/src/adapters/db.ts b/src/adapters/db.ts index 4cdcaff..d9272ea 100644 --- a/src/adapters/db.ts +++ b/src/adapters/db.ts @@ -129,8 +129,6 @@ export function createDBComponent(components: Pick async updateFriendshipStatus(friendshipId, isActive, txClient) { logger.debug(`updating ${friendshipId} - ${isActive}`) const query = SQL`UPDATE friendships SET is_active = ${isActive}, updated_at = now() WHERE id = ${friendshipId}` - console.log(query.text) - console.log(query.values) if (txClient) { const results = await txClient.query(query) diff --git a/src/adapters/rpc-server/constants.ts b/src/adapters/rpc-server/constants.ts new file mode 100644 index 0000000..68d5735 --- /dev/null +++ b/src/adapters/rpc-server/constants.ts @@ -0,0 +1,2 @@ +export const FRIENDSHIPS_COUNT_PAGE_STREAM = 20 +export const INTERNAL_SERVER_ERROR = 'SERVER ERROR' diff --git a/src/adapters/rpc-server/index.ts b/src/adapters/rpc-server/index.ts new file mode 100644 index 0000000..4440b96 --- /dev/null +++ b/src/adapters/rpc-server/index.ts @@ -0,0 +1 @@ +export * from './rpc-server' diff --git a/src/adapters/rpc-server/rpc-server.ts b/src/adapters/rpc-server/rpc-server.ts new file mode 100644 index 0000000..cd7210c --- /dev/null +++ b/src/adapters/rpc-server/rpc-server.ts @@ -0,0 +1,78 @@ +import { Transport, createRpcServer } from '@dcl/rpc' +import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service_v2/social_service.gen' +import { registerService } from '@dcl/rpc/dist/codegen' +import { IBaseComponent } from '@well-known-components/interfaces' +import { AppComponents, RpcServerContext, SubscriptionEventsEmitter } from '../../types' +import { getFriendsService } from './services/get-friends' +import { getMutualFriendsService } from './services/get-mutual-friends' +import { getPendingFriendshipRequestsService } from './services/get-pending-friendship-requests' +import { upsertFriendshipService } from './services/upsert-friendship' +import { subscribeToFriendshipUpdatesService } from './services/subscribe-to-friendship-updates' + +export type IRPCServerComponent = IBaseComponent & { + attachUser(user: { transport: Transport; address: string }): void +} + +export async function createRpcServerComponent( + components: Pick +): Promise { + const { logs, db, pubsub, config, server } = components + + const SHARED_CONTEXT: Pick = { + subscribers: {} + } + + const rpcServer = createRpcServer({ + logger: logs.getLogger('rpcServer') + }) + + const logger = logs.getLogger('rpcServer-handler') + + const rpcServerPort = (await config.getNumber('RPC_SERVER_PORT')) || 8085 + + const getFriends = getFriendsService({ components: { logs, db } }) + const getMutualFriends = getMutualFriendsService({ components: { logs, db } }) + const getPendingFriendshipRequests = getPendingFriendshipRequestsService({ components: { logs, db } }) + const getSentFriendshipRequests = getPendingFriendshipRequestsService({ components: { logs, db } }) + const upsertFriendship = upsertFriendshipService({ components: { logs, db, pubsub } }) + const subscribeToFriendshipUpdates = subscribeToFriendshipUpdatesService({ components: { logs } }) + + rpcServer.setHandler(async function handler(port) { + registerService(port, SocialServiceDefinition, async () => ({ + getFriends, + getMutualFriends, + getPendingFriendshipRequests, + getSentFriendshipRequests, + upsertFriendship, + subscribeToFriendshipUpdates + })) + }) + + return { + async start() { + server.app.listen(rpcServerPort, () => { + logger.info(`[RPC] RPC Server listening on port ${rpcServerPort}`) + }) + + await pubsub.subscribeToFriendshipUpdates((message) => { + try { + const update = JSON.parse(message) as SubscriptionEventsEmitter['update'] + const updateEmitter = SHARED_CONTEXT.subscribers[update.to] + if (updateEmitter) { + updateEmitter.emit('update', update) + } + } catch (error) { + logger.error(error as any) + } + }) + }, + attachUser({ transport, address }) { + transport.on('close', () => { + if (SHARED_CONTEXT.subscribers[address]) { + delete SHARED_CONTEXT.subscribers[address] + } + }) + rpcServer.attachTransport(transport, { subscribers: SHARED_CONTEXT.subscribers, address }) + } + } +} diff --git a/src/adapters/rpc-server/services/get-friends.ts b/src/adapters/rpc-server/services/get-friends.ts new file mode 100644 index 0000000..2b445de --- /dev/null +++ b/src/adapters/rpc-server/services/get-friends.ts @@ -0,0 +1,46 @@ +import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen' +import { Friendship, RpcServerContext, RPCServiceContext } from '../../../types' +import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_COUNT_PAGE_STREAM } from '../constants' +import { UsersResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' + +export function getFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) { + const logger = logs.getLogger('get-friends-service') + + return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator { + let friendsGenerator: AsyncGenerator | undefined + try { + friendsGenerator = db.getFriends(context.address) + } catch (error) { + logger.error(error as any) + // throw an error bc there is no sense to create a generator to send an error + // as it's done in the previous Social Service + throw new Error(INTERNAL_SERVER_ERROR) + } + + let users = [] + + for await (const friendship of friendsGenerator) { + const { address_requested, address_requester } = friendship + if (context.address === address_requested) { + users.push({ address: address_requester }) + } else { + users.push({ address: address_requested }) + } + + if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) { + const response = { + users: [...users] + } + users = [] + yield response + } + } + + if (users.length) { + const response = { + users + } + yield response + } + } +} diff --git a/src/adapters/rpc-server/services/get-mutual-friends.ts b/src/adapters/rpc-server/services/get-mutual-friends.ts new file mode 100644 index 0000000..ebe6357 --- /dev/null +++ b/src/adapters/rpc-server/services/get-mutual-friends.ts @@ -0,0 +1,45 @@ +import { RpcServerContext, RPCServiceContext } from '../../../types' +import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_COUNT_PAGE_STREAM } from '../constants' +import { + MutualFriendsPayload, + UsersResponse +} from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' +import { normalizeAddress } from '../../../utils/address' + +export function getMutualFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) { + const logger = logs.getLogger('get-mutual-friends-service') + + return async function* (request: MutualFriendsPayload, context: RpcServerContext): AsyncGenerator { + logger.debug(`getting mutual friends ${context.address}<>${request.user!.address}`) + let mutualFriends: AsyncGenerator<{ address: string }> | undefined + try { + mutualFriends = db.getMutualFriends(context.address, normalizeAddress(request.user!.address)) + } catch (error) { + logger.error(error as any) + // throw an error bc there is no sense to create a generator to send an error + // as it's done in the previous Social Service + throw new Error(INTERNAL_SERVER_ERROR) + } + + let users = [] + + for await (const friendship of mutualFriends) { + const { address } = friendship + users.push({ address }) + if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) { + const response = { + users + } + yield response + users = [] + } + } + + if (users.length) { + const response = { + users + } + yield response + } + } +} diff --git a/src/adapters/rpc-server/services/get-pending-friendship-requests.ts b/src/adapters/rpc-server/services/get-pending-friendship-requests.ts new file mode 100644 index 0000000..c7775d1 --- /dev/null +++ b/src/adapters/rpc-server/services/get-pending-friendship-requests.ts @@ -0,0 +1,35 @@ +import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen' +import { RpcServerContext, RPCServiceContext } from '../../../types' +import { FriendshipRequestsResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' + +export function getPendingFriendshipRequestsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) { + const logger = logs.getLogger('get-pending-friendship-requests-service') + + return async function (_request: Empty, context: RpcServerContext): Promise { + try { + const pendingRequests = await db.getReceivedFriendshipRequests(context.address) + const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({ + user: { address }, + createdAt: new Date(timestamp).getTime(), + message: metadata?.message || '' + })) + + return { + response: { + $case: 'requests', + requests: { + requests: mappedRequests + } + } + } + } catch (error) { + logger.error(error as any) + return { + response: { + $case: 'internalServerError', + internalServerError: {} + } + } + } + } +} diff --git a/src/adapters/rpc-server/services/get-sent-friendship-requests.ts b/src/adapters/rpc-server/services/get-sent-friendship-requests.ts new file mode 100644 index 0000000..e1a9b22 --- /dev/null +++ b/src/adapters/rpc-server/services/get-sent-friendship-requests.ts @@ -0,0 +1,35 @@ +import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen' +import { RpcServerContext, RPCServiceContext } from '../../../types' +import { FriendshipRequestsResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' + +export function getSentFriendshipRequestsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) { + const logger = logs.getLogger('get-sent-friendship-requests-service') + + return async function (_request: Empty, context: RpcServerContext): Promise { + try { + const pendingRequests = await db.getSentFriendshipRequests(context.address) + const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({ + user: { address }, + createdAt: new Date(timestamp).getTime(), + message: metadata?.message || '' + })) + + return { + response: { + $case: 'requests', + requests: { + requests: mappedRequests + } + } + } + } catch (error) { + logger.error(error as any) + return { + response: { + $case: 'internalServerError', + internalServerError: {} + } + } + } + } +} diff --git a/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts b/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts new file mode 100644 index 0000000..d2cd43d --- /dev/null +++ b/src/adapters/rpc-server/services/subscribe-to-friendship-updates.ts @@ -0,0 +1,30 @@ +import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen' +import { RpcServerContext, RPCServiceContext, SubscriptionEventsEmitter } from '../../../types' +import { FriendshipUpdate } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' +import mitt from 'mitt' +import { parseEmittedUpdateToFriendshipUpdate } from '../../../logic/friendships' +import emitterToAsyncGenerator from '../../../utils/emitterToGenerator' + +export function subscribeToFriendshipUpdatesService({ components: { logs } }: RPCServiceContext<'logs'>) { + const logger = logs.getLogger('subscribe-to-friendship-updates-service') + + return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator { + const eventEmitter = context.subscribers[context.address] || mitt() + + if (!context.subscribers[context.address]) { + context.subscribers[context.address] = eventEmitter + } + + const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'update') + + for await (const update of updatesGenerator) { + logger.debug('> friendship update received, sending: ', { update: update as any }) + const updateToResponse = parseEmittedUpdateToFriendshipUpdate(update) + if (updateToResponse) { + yield updateToResponse + } else { + logger.error('> unable to parse update to FriendshipUpdate > ', { update: update as any }) + } + } + } +} diff --git a/src/adapters/rpc-server/services/upsert-friendship.ts b/src/adapters/rpc-server/services/upsert-friendship.ts new file mode 100644 index 0000000..2c163e4 --- /dev/null +++ b/src/adapters/rpc-server/services/upsert-friendship.ts @@ -0,0 +1,114 @@ +import { Action, FriendshipStatus, RpcServerContext, RPCServiceContext } from '../../../types' +import { + UpsertFriendshipPayload, + UpsertFriendshipResponse +} from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' +import { + parseUpsertFriendshipRequest, + validateNewFriendshipAction, + getNewFriendshipStatus +} from '../../../logic/friendships' + +export function upsertFriendshipService({ + components: { logs, db, pubsub } +}: RPCServiceContext<'logs' | 'db' | 'pubsub'>) { + const logger = logs.getLogger('upsert-friendship-service') + + return async function ( + request: UpsertFriendshipPayload, + context: RpcServerContext + ): Promise { + const parsedRequest = parseUpsertFriendshipRequest(request) + if (!parsedRequest) { + logger.error('upsert friendship received unknown message: ', request as any) + return { + response: { + $case: 'internalServerError', + internalServerError: {} + } + } + } + + logger.debug(`upsert friendship > `, parsedRequest as Record) + + try { + const friendship = await db.getFriendship([context.address, parsedRequest.user!]) + let lastAction = undefined + if (friendship) { + const lastRecordedAction = await db.getLastFriendshipAction(friendship.id) + lastAction = lastRecordedAction + } + + if ( + !validateNewFriendshipAction( + context.address, + { action: parsedRequest.action, user: parsedRequest.user }, + lastAction + ) + ) { + logger.error('invalid action for a friendship') + return { + response: { + $case: 'invalidFriendshipAction', + invalidFriendshipAction: {} + } + } + } + + const friendshipStatus = getNewFriendshipStatus(parsedRequest.action) + const isActive = friendshipStatus === FriendshipStatus.Friends + + logger.debug('friendship status > ', { isActive: JSON.stringify(isActive), friendshipStatus }) + + const id = await db.executeTx(async (tx) => { + let id + if (friendship) { + await db.updateFriendshipStatus(friendship.id, isActive, tx) + id = friendship.id + } else { + const newFriendshipId = await db.createFriendship([context.address, parsedRequest.user!], isActive, tx) + id = newFriendshipId + } + + await db.recordFriendshipAction( + id, + context.address, + parsedRequest.action, + parsedRequest.action === Action.REQUEST ? parsedRequest.metadata : null, + tx + ) + return id + }) + + logger.debug(`${id} friendship was upsert successfully`) + + await pubsub.publishFriendshipUpdate({ + from: context.address, + to: parsedRequest.user, + action: parsedRequest.action, + timestamp: Date.now(), + metadata: + parsedRequest.action === Action.REQUEST + ? parsedRequest.metadata + ? parsedRequest.metadata + : undefined + : undefined + }) + + return { + response: { + $case: 'accepted', + accepted: {} + } + } + } catch (error) { + logger.error(error as any) + return { + response: { + $case: 'internalServerError', + internalServerError: {} + } + } + } + } +} diff --git a/src/adapters/rpcServer.ts b/src/adapters/rpcServer.ts deleted file mode 100644 index ce56de8..0000000 --- a/src/adapters/rpcServer.ts +++ /dev/null @@ -1,323 +0,0 @@ -import { Transport, createRpcServer } from '@dcl/rpc' -import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service_v2/social_service.gen' -import { registerService } from '@dcl/rpc/dist/codegen' -import mitt from 'mitt' -import { IBaseComponent } from '@well-known-components/interfaces' -import { - Action, - AppComponents, - Friendship, - FriendshipStatus, - RpcServerContext, - SubscriptionEventsEmitter -} from '../types' -import { - getNewFriendshipStatus, - parseEmittedUpdateToFriendshipUpdate, - parseUpsertFriendshipRequest, - validateNewFriendshipAction -} from '../logic/friendships' -import emitterToAsyncGenerator from '../utils/emitterToGenerator' -import { normalizeAddress } from '../utils/address' - -export type IRPCServerComponent = IBaseComponent & { - attachUser(user: { transport: Transport; address: string }): void -} - -const FRIENDSHIPS_COUNT_PAGE_STREAM = 20 - -const INTERNAL_SERVER_ERROR = 'SERVER ERROR' - -export default async function createRpcServerComponent( - components: Pick -): Promise { - const { logs, db, pubsub, config, server } = components - - const SHARED_CONTEXT: Pick = { - subscribers: {} - } - - const rpcServer = createRpcServer({ - logger: logs.getLogger('rpcServer') - }) - - const logger = logs.getLogger('rpcServer-handler') - - const rpcPort = (await config.getNumber('RPC_SERVER_PORT')) || 8085 - - rpcServer.setHandler(async function handler(port) { - registerService(port, SocialServiceDefinition, async () => ({ - getFriends(_request, context) { - logger.debug('getting friends for ', { address: context.address }) - let friendsGenerator: AsyncGenerator | undefined - try { - friendsGenerator = db.getFriends(context.address) - } catch (error) { - logger.error(error as any) - // throw an error bc there is no sense to create a generator to send an error - // as it's done in the previous Social Service - throw new Error(INTERNAL_SERVER_ERROR) - } - - const generator = async function* () { - let users = [] - for await (const friendship of friendsGenerator) { - const { address_requested, address_requester } = friendship - if (context.address === address_requested) { - users.push({ address: address_requester }) - } else { - users.push({ address: address_requested }) - } - - if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) { - const response = { - users: [...users] - } - users = [] - yield response - } - } - - if (users.length) { - const response = { - users - } - yield response - } - } - - return generator() - }, - getMutualFriends(request, context) { - logger.debug(`getting mutual friends ${context.address}<>${request.user!.address}`) - let mutualFriends: AsyncGenerator<{ address: string }> | undefined - try { - mutualFriends = db.getMutualFriends(context.address, normalizeAddress(request.user!.address)) - } catch (error) { - logger.error(error as any) - // throw an error bc there is no sense to create a generator to send an error - // as it's done in the previous Social Service - throw new Error(INTERNAL_SERVER_ERROR) - } - - const generator = async function* () { - const users = [] - for await (const friendship of mutualFriends) { - const { address } = friendship - users.push({ address }) - if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) { - const response = { - users - } - yield response - } - } - - if (users.length) { - const response = { - users - } - yield response - } - } - - return generator() - }, - async getPendingFriendshipRequests(_request, context) { - try { - const pendingRequests = await db.getReceivedFriendshipRequests(context.address) - const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({ - user: { address }, - createdAt: new Date(timestamp).getTime(), - message: metadata?.message || '' - })) - - return { - response: { - $case: 'requests', - requests: { - requests: mappedRequests - } - } - } - } catch (error) { - logger.error(error as any) - return { - response: { - $case: 'internalServerError', - internalServerError: {} - } - } - } - }, - async getSentFriendshipRequests(_request, context) { - try { - const pendingRequests = await db.getSentFriendshipRequests(context.address) - const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({ - user: { address }, - createdAt: new Date(timestamp).getTime(), - message: metadata?.message || '' - })) - - return { - response: { - $case: 'requests', - requests: { - requests: mappedRequests - } - } - } - } catch (error) { - logger.error(error as any) - return { - response: { - $case: 'internalServerError', - internalServerError: {} - } - } - } - }, - async upsertFriendship(request, context) { - const parsedRequest = parseUpsertFriendshipRequest(request) - if (!parsedRequest) { - logger.error('upsert friendship received unkwown message: ', request as any) - return { - response: { - $case: 'internalServerError', - internalServerError: {} - } - } - } - - logger.debug(`upsert friendship > `, parsedRequest as Record) - - try { - const friendship = await db.getFriendship([context.address, parsedRequest.user!]) - let lastAction = undefined - if (friendship) { - const lastRecordedAction = await db.getLastFriendshipAction(friendship.id) - lastAction = lastRecordedAction - } - - if ( - !validateNewFriendshipAction( - context.address, - { action: parsedRequest.action, user: parsedRequest.user }, - lastAction - ) - ) { - logger.error('invalid action for a friendship') - return { - response: { - $case: 'invalidFriendshipAction', - invalidFriendshipAction: {} - } - } - } - - const friendshipStatus = getNewFriendshipStatus(parsedRequest.action) - const isActive = friendshipStatus === FriendshipStatus.Friends - - logger.debug('friendshipstatus > ', { isActive: JSON.stringify(isActive), friendshipStatus }) - - const id = await db.executeTx(async (tx) => { - let id - if (friendship) { - await db.updateFriendshipStatus(friendship.id, isActive, tx) - id = friendship.id - } else { - const newFriendshipId = await db.createFriendship([context.address, parsedRequest.user!], isActive, tx) - id = newFriendshipId - } - - await db.recordFriendshipAction( - id, - context.address, - parsedRequest.action, - parsedRequest.action === Action.REQUEST ? parsedRequest.metadata : null, - tx - ) - return id - }) - - logger.debug(`${id} friendship was upserted successfully`) - - await pubsub.publishFriendshipUpdate({ - from: context.address, - to: parsedRequest.user, - action: parsedRequest.action, - timestamp: Date.now(), - metadata: - parsedRequest.action === Action.REQUEST - ? parsedRequest.metadata - ? parsedRequest.metadata - : undefined - : undefined - }) - - return { - response: { - $case: 'accepted', - accepted: {} - } - } - } catch (error) { - logger.error(error as any) - return { - response: { - $case: 'internalServerError', - internalServerError: {} - } - } - } - }, - subscribeToFriendshipUpdates(_request, context) { - const eventEmitter = mitt() - context.subscribers[context.address] = eventEmitter - const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'update') - - const generator = async function* () { - for await (const update of updatesGenerator) { - logger.debug('> friendship update received, sending: ', { update: update as any }) - const updateToResponse = parseEmittedUpdateToFriendshipUpdate(update) - if (updateToResponse) { - yield updateToResponse - } else { - logger.error('> unable to parse update to FriendshipUpdate > ', { update: update as any }) - } - } - } - - return generator() - } - })) - }) - - return { - async start() { - server.app.listen(rpcPort, () => { - logger.info(`[RPC] RPC Server listening on port ${rpcPort}`) - }) - - await pubsub.subscribeToFriendshipUpdates((message) => { - try { - const update = JSON.parse(message) as SubscriptionEventsEmitter['update'] - const updateEmitter = SHARED_CONTEXT.subscribers[update.to] - if (updateEmitter) { - updateEmitter.emit('update', update) - } - } catch (error) { - logger.error(error as any) - } - }) - }, - attachUser({ transport, address }) { - transport.on('close', () => { - if (SHARED_CONTEXT.subscribers[address]) { - delete SHARED_CONTEXT.subscribers[address] - } - }) - rpcServer.attachTransport(transport, { subscribers: SHARED_CONTEXT.subscribers, address }) - } - } -} diff --git a/src/adapters/ws.ts b/src/adapters/ws.ts index da31fe0..7de5a93 100644 --- a/src/adapters/ws.ts +++ b/src/adapters/ws.ts @@ -1,6 +1,7 @@ import { WebSocketServer } from 'ws' import { IWebSocketComponent } from '../types' +// TODO: UNUSED export async function createWsComponent(): Promise { let wss: WebSocketServer | undefined diff --git a/src/components.ts b/src/components.ts index c6713b9..d7d44b2 100644 --- a/src/components.ts +++ b/src/components.ts @@ -7,7 +7,7 @@ import { createPgComponent } from '@well-known-components/pg-component' import { AppComponents } from './types' import { metricDeclarations } from './metrics' import { createDBComponent } from './adapters/db' -import createRpcServerComponent from './adapters/rpcServer' +import { createRpcServerComponent } from './adapters/rpc-server' import createRedisComponent from './adapters/redis' import createPubSubComponent from './adapters/pubsub' import { createUWsComponent } from '@well-known-components/uws-http-server' diff --git a/src/controllers/handlers/ws-handler.ts b/src/controllers/handlers/ws-handler.ts index b372f37..b000c85 100644 --- a/src/controllers/handlers/ws-handler.ts +++ b/src/controllers/handlers/ws-handler.ts @@ -4,6 +4,7 @@ import { verify } from '@dcl/platform-crypto-middleware' import { AppComponents, WsUserData } from '../../types' import { normalizeAddress } from '../../utils/address' import { IUWebSocketEventMap, UWebSocketTransport } from '../../utils/UWebSocketTransport' +import { isNotAuthenticated } from '../../utils/wsUserData' const textDecoder = new TextDecoder() @@ -38,7 +39,7 @@ export async function registerWsHandler( logger.debug('ws open') const data = ws.getUserData() // just for type assertion - if (!data.auth) { + if (isNotAuthenticated(data)) { data.timeout = setTimeout(() => { try { logger.error('closing connection, no authchain received') @@ -53,7 +54,7 @@ export async function registerWsHandler( if (data.auth) { data.eventEmitter.emit('message', message) - } else { + } else if (isNotAuthenticated(data)) { clearTimeout(data.timeout) data.timeout = undefined diff --git a/src/logic/friendships.ts b/src/logic/friendships.ts index 6e0320c..517649c 100644 --- a/src/logic/friendships.ts +++ b/src/logic/friendships.ts @@ -68,7 +68,7 @@ type CommonParsedRequest = { user: string } -type ParsedUpsertFriendshipRequest = +export type ParsedUpsertFriendshipRequest = | (CommonParsedRequest & { metadata: { message: string } | null }) | CommonParsedRequest | CommonParsedRequest diff --git a/src/types.ts b/src/types.ts index 795c491..80bda23 100644 --- a/src/types.ts +++ b/src/types.ts @@ -12,7 +12,7 @@ import { Emitter } from 'mitt' import { metricDeclarations } from './metrics' import { IDatabaseComponent } from './adapters/db' import { IRedisComponent } from './adapters/redis' -import { IRPCServerComponent } from './adapters/rpcServer' +import { IRPCServerComponent } from './adapters/rpc-server/rpc-server' import { IPubSubComponent } from './adapters/pubsub' import { HttpRequest, HttpResponse, IUWsComponent, WebSocket } from '@well-known-components/uws-http-server' import { IUWebSocketEventMap } from './utils/UWebSocketTransport' @@ -58,21 +58,27 @@ export type IHandler = { f: (res: HttpResponse, req: HttpRequest) => Promise } -export type WsUserData = - | { - isConnected: boolean - auth: false - timeout?: NodeJS.Timeout - } - | { - isConnected: boolean - eventEmitter: Emitter - auth: true - address: string - } +export type WsAuthenticatedUserData = { + isConnected: boolean + eventEmitter: Emitter + auth: true + address: string +} + +export type WsNotAuthenticatedUserData = { + isConnected: boolean + auth: false + timeout?: NodeJS.Timeout +} + +export type WsUserData = WsAuthenticatedUserData | WsNotAuthenticatedUserData export type InternalWebSocket = WebSocket +export type RPCServiceContext = { + components: Pick +} + // this type simplifies the typings of http handlers export type HandlerContextWithPath< ComponentNames extends keyof AppComponents, diff --git a/src/utils/UWebSocketTransport.ts b/src/utils/UWebSocketTransport.ts index 04e9848..c93d643 100644 --- a/src/utils/UWebSocketTransport.ts +++ b/src/utils/UWebSocketTransport.ts @@ -1,8 +1,6 @@ import { Transport, TransportEvents } from '@dcl/rpc' import mitt, { Emitter } from 'mitt' -export const defer = Promise.prototype.then.bind(Promise.resolve()) - export type RecognizedString = | string | ArrayBuffer @@ -66,8 +64,12 @@ export function UWebSocketTransport( }) // socket already connected at this point - void defer(() => events.emit('connect', {})) - void defer(() => flush()) + setImmediate(() => { + events.emit('connect', {}) + }) + setImmediate(() => { + flush() + }) const api: Transport = { ...events, diff --git a/src/utils/wsUserData.ts b/src/utils/wsUserData.ts new file mode 100644 index 0000000..cd4e57b --- /dev/null +++ b/src/utils/wsUserData.ts @@ -0,0 +1,5 @@ +import { WsNotAuthenticatedUserData, WsUserData } from '../types' + +export function isNotAuthenticated(data: WsUserData): data is WsNotAuthenticatedUserData { + return !data.auth +} diff --git a/test/mocks/components/config.ts b/test/mocks/components/config.ts new file mode 100644 index 0000000..6c0f638 --- /dev/null +++ b/test/mocks/components/config.ts @@ -0,0 +1,8 @@ +import { IConfigComponent } from '@well-known-components/interfaces' + +export const mockConfig: jest.Mocked = { + getNumber: jest.fn(), + getString: jest.fn(), + requireNumber: jest.fn(), + requireString: jest.fn() +} diff --git a/test/mocks/components/db.ts b/test/mocks/components/db.ts new file mode 100644 index 0000000..45cc45b --- /dev/null +++ b/test/mocks/components/db.ts @@ -0,0 +1,14 @@ +import { IDatabaseComponent } from '../../../src/adapters/db' + +export const mockDb: jest.Mocked = { + createFriendship: jest.fn(), + updateFriendshipStatus: jest.fn(), + getFriends: jest.fn(), + getMutualFriends: jest.fn(), + getFriendship: jest.fn(), + getLastFriendshipAction: jest.fn(), + recordFriendshipAction: jest.fn(), + getReceivedFriendshipRequests: jest.fn(), + getSentFriendshipRequests: jest.fn(), + executeTx: jest.fn() +} diff --git a/test/mocks/components/index.ts b/test/mocks/components/index.ts new file mode 100644 index 0000000..f8808c1 --- /dev/null +++ b/test/mocks/components/index.ts @@ -0,0 +1,6 @@ +export * from './logs' +export * from './db' +export * from './pubsub' +export * from './pg' +export * from './config' +export * from './uws' diff --git a/test/mocks/components/logs.ts b/test/mocks/components/logs.ts new file mode 100644 index 0000000..0235740 --- /dev/null +++ b/test/mocks/components/logs.ts @@ -0,0 +1,13 @@ +import { IMetricsComponent } from '@well-known-components/interfaces' +import { ILoggerComponent } from '@well-known-components/interfaces/dist/components/logger' +import { createLogComponent } from '@well-known-components/logger' + +export const mockLogs: jest.Mocked = { + getLogger: jest.fn().mockReturnValue({ + log: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + info: jest.fn(), + warn: jest.fn() + }) +} diff --git a/test/mocks/components/pg.ts b/test/mocks/components/pg.ts new file mode 100644 index 0000000..d341699 --- /dev/null +++ b/test/mocks/components/pg.ts @@ -0,0 +1,12 @@ +import { IPgComponent } from '@well-known-components/pg-component' +import { release } from 'os' + +export const mockPg: jest.Mocked = { + streamQuery: jest.fn(), + start: jest.fn(), + query: jest.fn(), + getPool: jest.fn().mockReturnValue({ + connect: jest.fn().mockResolvedValue({ query: jest.fn(), release: jest.fn() }) + }), + stop: jest.fn() +} diff --git a/test/mocks/components/pubsub.ts b/test/mocks/components/pubsub.ts new file mode 100644 index 0000000..95f6485 --- /dev/null +++ b/test/mocks/components/pubsub.ts @@ -0,0 +1,8 @@ +import { IPubSubComponent } from '../../../src/adapters/pubsub' + +export const mockPubSub: jest.Mocked = { + start: jest.fn(), + stop: jest.fn(), + subscribeToFriendshipUpdates: jest.fn(), + publishFriendshipUpdate: jest.fn() +} diff --git a/test/mocks/components/uws.ts b/test/mocks/components/uws.ts new file mode 100644 index 0000000..8b72e71 --- /dev/null +++ b/test/mocks/components/uws.ts @@ -0,0 +1,28 @@ +import { IUWsComponent } from '@well-known-components/uws-http-server' + +export const mockUWs: jest.Mocked = { + start: jest.fn(), + app: { + listen: jest.fn(), + listen_unix: jest.fn(), + get: jest.fn(), + post: jest.fn(), + options: jest.fn(), + del: jest.fn(), + patch: jest.fn(), + put: jest.fn(), + head: jest.fn(), + connect: jest.fn(), + trace: jest.fn(), + any: jest.fn(), + ws: jest.fn(), + publish: jest.fn(), + numSubscribers: jest.fn(), + addServerName: jest.fn(), + domain: jest.fn(), + removeServerName: jest.fn(), + missingServerName: jest.fn(), + filter: jest.fn(), + close: jest.fn() + } +} diff --git a/test/mocks/empty-request.ts b/test/mocks/empty-request.ts new file mode 100644 index 0000000..10c9431 --- /dev/null +++ b/test/mocks/empty-request.ts @@ -0,0 +1,3 @@ +import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen' + +export const emptyRequest = {} as Empty diff --git a/test/mocks/friendship-request.ts b/test/mocks/friendship-request.ts new file mode 100644 index 0000000..8b5cc6a --- /dev/null +++ b/test/mocks/friendship-request.ts @@ -0,0 +1,23 @@ +import { FriendshipRequest } from '../../src/types' + +/** + * Creates a mock friendship request from given parameters. + */ +export const createMockFriendshipRequest = ( + address: string, + timestamp: string, + message?: string +): FriendshipRequest => ({ + address, + timestamp, + metadata: message ? { message } : undefined +}) + +/** + * Creates the expected mapped response for a friendship request. + */ +export const createMockExpectedFriendshipRequest = (address: string, createdAt: string, message: string) => ({ + user: { address }, + createdAt: new Date(createdAt).getTime(), + message +}) diff --git a/test/setupTests.ts b/test/setupTests.ts new file mode 100644 index 0000000..630c356 --- /dev/null +++ b/test/setupTests.ts @@ -0,0 +1,3 @@ +afterEach(() => { + jest.clearAllMocks() +}) diff --git a/test/unit/adapters/db.spec.ts b/test/unit/adapters/db.spec.ts new file mode 100644 index 0000000..d6a9346 --- /dev/null +++ b/test/unit/adapters/db.spec.ts @@ -0,0 +1,216 @@ +import { createDBComponent } from '../../../src/adapters/db' +import { Action } from '../../../src/types' +import SQL from 'sql-template-strings' +import { mockLogs, mockPg } from '../../mocks/components' + +describe('db', () => { + let dbComponent: ReturnType + + beforeEach(() => { + dbComponent = createDBComponent({ pg: mockPg, logs: mockLogs }) + }) + + describe('getFriends', () => { + it('should stream active friendships', async () => { + const mockGenerator = (async function* () { + yield { id: 'friendship-1', address_requester: '0x123', address_requested: '0x456', is_active: true } + })() + mockPg.streamQuery.mockReturnValueOnce(mockGenerator) + + const generator = dbComponent.getFriends('0x123', true) + const result = await generator.next() + + expect(mockPg.streamQuery).toHaveBeenCalledWith( + SQL`SELECT * FROM friendships WHERE (address_requester = ${'0x123'} OR address_requested = ${'0x123'}) AND is_active = true` + ) + expect(result.value).toEqual({ + id: 'friendship-1', + address_requester: '0x123', + address_requested: '0x456', + is_active: true + }) + }) + + it('should stream all friendships (including inactive)', async () => { + const mockGenerator = (async function* () { + yield { id: 'friendship-1', address_requester: '0x123', address_requested: '0x456', is_active: false } + })() + mockPg.streamQuery.mockReturnValueOnce(mockGenerator) + + const generator = dbComponent.getFriends('0x123', false) + const result = await generator.next() + + expect(mockPg.streamQuery).toHaveBeenCalledWith( + SQL`SELECT * FROM friendships WHERE (address_requester = ${'0x123'} OR address_requested = ${'0x123'})` + ) + expect(result.value).toEqual({ + id: 'friendship-1', + address_requester: '0x123', + address_requested: '0x456', + is_active: false + }) + }) + }) + + describe('getMutualFriends', () => { + it('should stream mutual friends', async () => { + const mockGenerator = (async function* () { + yield { address: '0x789' } + })() + mockPg.streamQuery.mockReturnValueOnce(mockGenerator) + + const generator = dbComponent.getMutualFriends('0x123', '0x456') + const result = await generator.next() + + expect(result.value).toEqual({ address: '0x789' }) + expect(mockPg.streamQuery).toHaveBeenCalled() + }) + }) + + describe('createFriendship', () => { + it('should create a new friendship', async () => { + await dbComponent.createFriendship(['0x123', '0x456'], true) + + expect(mockPg.query).toHaveBeenCalledWith( + SQL`INSERT INTO friendships (id, address_requester, address_requested, is_active) VALUES (${expect.any(String)}, ${'0x123'}, ${'0x456'}, ${true})` + ) + }) + }) + + describe('updateFriendshipStatus', () => { + it('should update friendship status', async () => { + mockPg.query.mockResolvedValueOnce({ rowCount: 1, rows: [{}] }) + + const result = await dbComponent.updateFriendshipStatus('friendship-id', false) + + expect(result).toBe(true) + expect(mockPg.query).toHaveBeenCalledWith( + SQL`UPDATE friendships SET is_active = ${false}, updated_at = now() WHERE id = ${'friendship-id'}` + ) + }) + + it('should return false if no rows were updated', async () => { + mockPg.query.mockResolvedValueOnce({ rowCount: 0, rows: [] }) + + const result = await dbComponent.updateFriendshipStatus('friendship-id', false) + + expect(result).toBe(false) + }) + }) + + describe('getFriendship', () => { + it('should retrieve a specific friendship', async () => { + const mockFriendship = { + id: 'friendship-1', + address_requester: '0x123', + address_requested: '0x456', + is_active: true + } + mockPg.query.mockResolvedValueOnce({ rows: [mockFriendship], rowCount: 1 }) + + const result = await dbComponent.getFriendship(['0x123', '0x456']) + + expect(result).toEqual(mockFriendship) + expect(mockPg.query).toHaveBeenCalled() + }) + }) + + describe('getLastFriendshipAction', () => { + it('should return the most recent friendship action', async () => { + const mockAction = { + id: 'action-1', + friendship_id: 'friendship-1', + action: Action.REQUEST, + acting_user: '0x123', + metadata: null, + timestamp: '2025-01-01T00:00:00.000Z' + } + mockPg.query.mockResolvedValueOnce({ rows: [mockAction], rowCount: 1 }) + + const result = await dbComponent.getLastFriendshipAction('friendship-1') + + expect(result).toEqual(mockAction) + }) + }) + + describe('getReceivedFriendshipRequests', () => { + it('should retrieve received friendship requests', async () => { + const mockRequests = [ + { + address: '0x123', + timestamp: '2025-01-01T00:00:00.000Z', + metadata: { message: 'Hello' } + } + ] + mockPg.query.mockResolvedValueOnce({ rows: mockRequests, rowCount: 1 }) + + const result = await dbComponent.getReceivedFriendshipRequests('0x456') + + expect(result).toEqual(mockRequests) + }) + }) + + describe('getSentFriendshipRequests', () => { + it('should retrieve sent friendship requests', async () => { + const mockRequests = [ + { + address: '0x456', + timestamp: '2025-01-01T00:00:00.000Z', + metadata: { message: 'Hi there' } + } + ] + mockPg.query.mockResolvedValueOnce({ rows: mockRequests, rowCount: 1 }) + + const result = await dbComponent.getSentFriendshipRequests('0x123') + + expect(result).toEqual(mockRequests) + }) + }) + + describe('recordFriendshipAction', () => { + it('should record a friendship action', async () => { + const result = await dbComponent.recordFriendshipAction('friendship-id', '0x123', Action.REQUEST, { + message: 'Hi' + }) + + expect(result).toBe(true) + expect(mockPg.query).toHaveBeenCalledWith( + expect.objectContaining({ + text: expect.stringContaining( + 'INSERT INTO friendship_actions (id, friendship_id, action, acting_user, metadata)' + ), + values: expect.arrayContaining(['friendship-id', '0x123', Action.REQUEST, { message: 'Hi' }]) + }) + ) + }) + }) + + describe('executeTx', () => { + it('should execute a transaction successfully', async () => { + const result = await dbComponent.executeTx(async (client) => { + await client.query('SELECT 1') + return 'success' + }) + + const mockClient = await mockPg.getPool().connect() + + expect(mockClient.query).toHaveBeenCalledWith('BEGIN') + expect(mockClient.query).toHaveBeenCalledWith('SELECT 1') + expect(mockClient.query).toHaveBeenCalledWith('COMMIT') + expect(result).toBe('success') + }) + + it('should rollback the transaction on error', async () => { + const mockClient = await mockPg.getPool().connect() + + await expect( + dbComponent.executeTx(async () => { + throw new Error('Rollback error') + }) + ).rejects.toThrow('Rollback error') + + expect(mockClient.query).toHaveBeenCalledWith('BEGIN') + expect(mockClient.query).toHaveBeenCalledWith('ROLLBACK') + }) + }) +}) diff --git a/test/unit/adapters/rpc-server.spec.ts b/test/unit/adapters/rpc-server.spec.ts new file mode 100644 index 0000000..af4d70d --- /dev/null +++ b/test/unit/adapters/rpc-server.spec.ts @@ -0,0 +1,62 @@ +import { createRpcServerComponent, IRPCServerComponent } from ' ../../../src/adapters/rpc-server/rpc-server' +import { RpcServerContext, SubscriptionEventsEmitter } from '../../../src/types' +import { RpcServer, Transport, createRpcServer } from '@dcl/rpc' +import mitt from 'mitt' +import { mockConfig, mockDb, mockLogs, mockPubSub, mockUWs } from '../../mocks/components' + +jest.mock('@dcl/rpc', () => ({ + createRpcServer: jest.fn().mockReturnValue({ + setHandler: jest.fn(), + attachTransport: jest.fn() + }) +})) + +describe('createRpcServerComponent', () => { + let rpcServer: IRPCServerComponent + + let rpcServerMock: jest.Mocked> + let setHandlerMock: jest.Mock, attachTransportMock: jest.Mock + + beforeEach(async () => { + rpcServerMock = createRpcServer({ + logger: mockLogs.getLogger('rpcServer-test') + }) as jest.Mocked> + + setHandlerMock = rpcServerMock.setHandler as jest.Mock + attachTransportMock = rpcServerMock.attachTransport as jest.Mock + + rpcServer = await createRpcServerComponent({ + logs: mockLogs, + db: mockDb, + pubsub: mockPubSub, + config: mockConfig, + server: mockUWs + }) + }) + + it('should register all services correctly', async () => { + expect(setHandlerMock).toHaveBeenCalledWith(expect.any(Function)) + }) + + describe('start', () => { + it('should start the server and subscribe to pubsub updates', async () => { + mockConfig.getNumber.mockResolvedValueOnce(8085) + + await rpcServer.start({} as any) + + expect(mockUWs.app.listen).toHaveBeenCalledWith(8085, expect.any(Function)) + expect(mockPubSub.subscribeToFriendshipUpdates).toHaveBeenCalledWith(expect.any(Function)) + }) + }) + + describe('attachUser', () => { + it('should attach a user and register transport events', () => { + const mockTransport = { on: jest.fn() } as unknown as Transport + const address = '0x123' + + rpcServer.attachUser({ transport: mockTransport, address }) + + expect(mockTransport.on).toHaveBeenCalledWith('close', expect.any(Function)) + }) + }) +}) diff --git a/test/unit/adapters/rpc-server/services/get-friends.spec.ts b/test/unit/adapters/rpc-server/services/get-friends.spec.ts new file mode 100644 index 0000000..d764c77 --- /dev/null +++ b/test/unit/adapters/rpc-server/services/get-friends.spec.ts @@ -0,0 +1,74 @@ +import { mockDb, mockLogs } from '../../../../mocks/components' +import { getFriendsService } from '../../../../../src/adapters/rpc-server/services/get-friends' +import { FRIENDSHIPS_COUNT_PAGE_STREAM, INTERNAL_SERVER_ERROR } from '../../../../../src/adapters/rpc-server/constants' +import { RpcServerContext, Friendship, AppComponents } from '../../../../../src/types' +import { emptyRequest } from '../../../../mocks/empty-request' + +describe('getFriendsService', () => { + let components: jest.Mocked> + let getFriends: ReturnType + + const rpcContext: RpcServerContext = { + address: '0x123', + subscribers: undefined + } + + beforeEach(() => { + components = { db: mockDb, logs: mockLogs } + getFriends = getFriendsService({ components }) + }) + + it('should return the correct list of friends', async () => { + const mockGetFriendsGenerator = async function* () { + yield createMockFriendship('0x456', '0x123') + yield createMockFriendship('0x789', '0x123') + } + mockDb.getFriends.mockReturnValueOnce(mockGetFriendsGenerator()) + + const generator = getFriends(emptyRequest, rpcContext) + + const result1 = await generator.next() + expect(result1.value).toEqual({ users: [{ address: '0x456' }, { address: '0x789' }] }) + + const result2 = await generator.next() + expect(result2.done).toBe(true) + }) + + it('should respect the pagination limit', async () => { + const mockFriendsGenerator = async function* () { + for (let i = 0; i < FRIENDSHIPS_COUNT_PAGE_STREAM + 1; i++) { + yield createMockFriendship(`0x${i + 1}`, '0x123') + } + } + mockDb.getFriends.mockReturnValueOnce(mockFriendsGenerator()) + + const generator = getFriends(emptyRequest, rpcContext) + + const result1 = await generator.next() + expect(result1.value.users).toHaveLength(FRIENDSHIPS_COUNT_PAGE_STREAM) + + const result2 = await generator.next() + expect(result2.value.users).toHaveLength(1) + expect(result2.done).toBe(false) // Generator still has values + }) + + it('should handle errors from the database gracefully', async () => { + mockDb.getFriends.mockImplementationOnce(() => { + throw new Error('Database error') + }) + + const generator = getFriends(emptyRequest, rpcContext) + + await expect(generator.next()).rejects.toThrow(INTERNAL_SERVER_ERROR) + }) + + // Helper to create a mock friendship object + const createMockFriendship = (requester: string, requested: string): Friendship => ({ + address_requester: requester, + address_requested: requested, + id: 'mock-friendship-id', + is_active: false, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString() + }) +}) diff --git a/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts b/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts new file mode 100644 index 0000000..abdc00a --- /dev/null +++ b/test/unit/adapters/rpc-server/services/get-mutual-friends.spec.ts @@ -0,0 +1,68 @@ +import { mockDb, mockLogs } from '../../../../mocks/components' +import { getMutualFriendsService } from '../../../../../src/adapters/rpc-server/services/get-mutual-friends' +import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_COUNT_PAGE_STREAM } from '../../../../../src/adapters/rpc-server/constants' +import { MutualFriendsPayload } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' +import { RpcServerContext, AppComponents } from '../../../../../src/types' + +describe('getMutualFriendsService', () => { + let components: jest.Mocked> + let getMutualFriends: ReturnType + + const rpcContext: RpcServerContext = { + address: '0x123', + subscribers: undefined + } + + const mutualFriendsRequest: MutualFriendsPayload = { + user: { address: '0x456' } + } + + beforeEach(() => { + components = { db: mockDb, logs: mockLogs } + getMutualFriends = getMutualFriendsService({ components }) + }) + + it('should return the correct list of mutual friends', async () => { + const mockMutualFriendsGenerator = async function* () { + yield { address: '0x789' } + yield { address: '0xabc' } + } + mockDb.getMutualFriends.mockReturnValueOnce(mockMutualFriendsGenerator()) + + const generator = getMutualFriends(mutualFriendsRequest, rpcContext) + + const result1 = await generator.next() + expect(result1.value).toEqual({ users: [{ address: '0x789' }, { address: '0xabc' }] }) + + const result2 = await generator.next() + expect(result2.done).toBe(true) + }) + + it('should respect the pagination limit', async () => { + const mockMutualFriendsGenerator = async function* () { + for (let i = 0; i <= FRIENDSHIPS_COUNT_PAGE_STREAM; i++) { + yield { address: `0x${i}` } + } + } + mockDb.getMutualFriends.mockReturnValueOnce(mockMutualFriendsGenerator()) + + const generator = getMutualFriends(mutualFriendsRequest, rpcContext) + + const result1 = await generator.next() + expect(result1.value.users).toHaveLength(FRIENDSHIPS_COUNT_PAGE_STREAM) + + const result2 = await generator.next() + expect(result2.value.users).toHaveLength(1) + expect(result2.done).toBe(false) + }) + + it('should handle errors from the database gracefully', async () => { + mockDb.getMutualFriends.mockImplementationOnce(() => { + throw new Error('Database error') + }) + + const generator = getMutualFriends(mutualFriendsRequest, rpcContext) + + await expect(generator.next()).rejects.toThrow(INTERNAL_SERVER_ERROR) + }) +}) diff --git a/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts b/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts new file mode 100644 index 0000000..d554693 --- /dev/null +++ b/test/unit/adapters/rpc-server/services/get-pending-friendship-requests.spec.ts @@ -0,0 +1,76 @@ +import { mockDb, mockLogs } from '../../../../mocks/components' +import { getPendingFriendshipRequestsService } from '../../../../../src/adapters/rpc-server/services/get-pending-friendship-requests' +import { RpcServerContext, AppComponents } from '../../../../../src/types' +import { FriendshipRequestsResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' +import { emptyRequest } from '../../../../mocks/empty-request' +import { createMockFriendshipRequest, createMockExpectedFriendshipRequest } from '../../../../mocks/friendship-request' + +describe('getPendingFriendshipRequestsService', () => { + let components: jest.Mocked> + let getPendingRequests: ReturnType + + const rpcContext: RpcServerContext = { + address: '0x123', + subscribers: undefined + } + + beforeEach(() => { + components = { db: mockDb, logs: mockLogs } + getPendingRequests = getPendingFriendshipRequestsService({ components }) + }) + + it('should return the correct list of pending friendship requests', async () => { + const mockPendingRequests = [ + createMockFriendshipRequest('0x456', '2025-01-01T00:00:00Z', 'Hi!'), + createMockFriendshipRequest('0x789', '2025-01-02T00:00:00Z') + ] + + mockDb.getReceivedFriendshipRequests.mockResolvedValueOnce(mockPendingRequests) + + const result: FriendshipRequestsResponse = await getPendingRequests(emptyRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'requests', + requests: { + requests: [ + createMockExpectedFriendshipRequest('0x456', '2025-01-01T00:00:00Z', 'Hi!'), + createMockExpectedFriendshipRequest('0x789', '2025-01-02T00:00:00Z', '') + ] + } + } + }) + }) + + it('should handle database errors gracefully', async () => { + mockDb.getReceivedFriendshipRequests.mockImplementationOnce(() => { + throw new Error('Database error') + }) + + const result: FriendshipRequestsResponse = await getPendingRequests(emptyRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'internalServerError', + internalServerError: {} + } + }) + }) + + it('should map metadata.message to an empty string if undefined', async () => { + const mockPendingRequests = [createMockFriendshipRequest('0x456', '2025-01-01T00:00:00Z')] + + mockDb.getReceivedFriendshipRequests.mockResolvedValueOnce(mockPendingRequests) + + const result: FriendshipRequestsResponse = await getPendingRequests(emptyRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'requests', + requests: { + requests: [createMockExpectedFriendshipRequest('0x456', '2025-01-01T00:00:00Z', '')] + } + } + }) + }) +}) diff --git a/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts b/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts new file mode 100644 index 0000000..ec0cb53 --- /dev/null +++ b/test/unit/adapters/rpc-server/services/get-sent-friendship-requests.spec.ts @@ -0,0 +1,76 @@ +import { mockDb, mockLogs } from '../../../../mocks/components' +import { getSentFriendshipRequestsService } from '../../../../../src/adapters/rpc-server/services/get-sent-friendship-requests' +import { RpcServerContext, AppComponents } from '../../../../../src/types' +import { emptyRequest } from '../../../../mocks/empty-request' +import { createMockFriendshipRequest, createMockExpectedFriendshipRequest } from '../../../../mocks/friendship-request' +import { FriendshipRequestsResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' + +describe('getSentFriendshipRequestsService', () => { + let components: jest.Mocked> + let getSentRequests: ReturnType + + const rpcContext: RpcServerContext = { + address: '0x123', + subscribers: undefined + } + + beforeEach(() => { + components = { db: mockDb, logs: mockLogs } + getSentRequests = getSentFriendshipRequestsService({ components }) + }) + + it('should return the correct list of sent friendship requests', async () => { + const mockSentRequests = [ + createMockFriendshipRequest('0x456', '2025-01-01T00:00:00Z', 'Hello!'), + createMockFriendshipRequest('0x789', '2025-01-02T00:00:00Z') + ] + + mockDb.getSentFriendshipRequests.mockResolvedValueOnce(mockSentRequests) + + const result: FriendshipRequestsResponse = await getSentRequests(emptyRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'requests', + requests: { + requests: [ + createMockExpectedFriendshipRequest('0x456', '2025-01-01T00:00:00Z', 'Hello!'), + createMockExpectedFriendshipRequest('0x789', '2025-01-02T00:00:00Z', '') + ] + } + } + }) + }) + + it('should handle database errors gracefully', async () => { + mockDb.getSentFriendshipRequests.mockImplementationOnce(() => { + throw new Error('Database error') + }) + + const result: FriendshipRequestsResponse = await getSentRequests(emptyRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'internalServerError', + internalServerError: {} + } + }) + }) + + it('should map metadata.message to an empty string if undefined', async () => { + const mockSentRequests = [createMockFriendshipRequest('0x456', '2025-01-01T00:00:00Z')] + + mockDb.getSentFriendshipRequests.mockResolvedValueOnce(mockSentRequests) + + const result: FriendshipRequestsResponse = await getSentRequests(emptyRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'requests', + requests: { + requests: [createMockExpectedFriendshipRequest('0x456', '2025-01-01T00:00:00Z', '')] + } + } + }) + }) +}) diff --git a/test/unit/adapters/rpc-server/services/subscribe-to-friendship-updates.spec.ts b/test/unit/adapters/rpc-server/services/subscribe-to-friendship-updates.spec.ts new file mode 100644 index 0000000..7aeb171 --- /dev/null +++ b/test/unit/adapters/rpc-server/services/subscribe-to-friendship-updates.spec.ts @@ -0,0 +1,33 @@ +import { subscribeToFriendshipUpdatesService } from '../../../../../src/adapters/rpc-server/services/subscribe-to-friendship-updates' +import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen' +import { RpcServerContext, AppComponents } from '../../../../../src/types' +import { mockLogs } from '../../../../mocks/components' + +describe('subscribeToFriendshipUpdatesService', () => { + let components: Pick + let subscribeToUpdates: ReturnType + let rpcContext: RpcServerContext + + beforeEach(() => { + components = { logs: mockLogs } + subscribeToUpdates = subscribeToFriendshipUpdatesService({ components }) + + rpcContext = { + address: '0x123', + subscribers: {} + } + }) + + it('should add the subscriber to the context', async () => { + const generator = subscribeToUpdates({} as Empty, rpcContext) + generator.next() + + expect(rpcContext.subscribers['0x123']).toBeDefined() + + // Properly clean up the generator + generator.return(undefined) + }) + + it.todo('should yield parsed updates when an update is emitted') + it.todo('should skip unparsable updates') +}) diff --git a/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts b/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts new file mode 100644 index 0000000..2c55e05 --- /dev/null +++ b/test/unit/adapters/rpc-server/services/upsert-friendship.spec.ts @@ -0,0 +1,152 @@ +import { mockDb, mockLogs, mockPubSub } from '../../../../mocks/components' +import { upsertFriendshipService } from '../../../../../src/adapters/rpc-server/services/upsert-friendship' +import { Action, FriendshipStatus, RpcServerContext, AppComponents } from '../../../../../src/types' +import * as FriendshipsLogic from '../../../../../src/logic/friendships' +import { + UpsertFriendshipPayload, + UpsertFriendshipResponse +} from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen' +import { ParsedUpsertFriendshipRequest } from '../../../../../src/logic/friendships' + +jest.mock('../../../../../src/logic/friendships') + +describe('upsertFriendshipService', () => { + let components: jest.Mocked> + let upsertFriendship: ReturnType + + const rpcContext: RpcServerContext = { address: '0x123', subscribers: undefined } + const userAddress = '0x456' + const message = 'Hello' + + const mockRequest: UpsertFriendshipPayload = { + action: { + $case: 'request', + request: { user: { address: userAddress }, message } + } + } + + const mockParsedRequest: ParsedUpsertFriendshipRequest = { + action: Action.REQUEST, + user: userAddress, + metadata: { message } + } + + const existingFriendship = { + id: 'friendship-id', + status: FriendshipStatus.Requested, + address_requester: rpcContext.address, + address_requested: userAddress, + is_active: true, + created_at: new Date().toISOString(), + updated_at: new Date().toISOString() + } + + const lastFriendshipAction = { + id: 'action-id', + friendship_id: 'friendship-id', + acting_user: rpcContext.address, + action: Action.REQUEST, + timestamp: Date.now().toString() + } + + beforeEach(() => { + components = { db: mockDb, logs: mockLogs, pubsub: mockPubSub } + upsertFriendship = upsertFriendshipService({ components }) + mockDb.executeTx.mockImplementation(async (cb) => await cb({} as any)) + }) + + it('should return an internalServerError for an invalid request', async () => { + jest.spyOn(FriendshipsLogic, 'parseUpsertFriendshipRequest').mockReturnValueOnce(null) + + const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'internalServerError', + internalServerError: {} + } + }) + }) + + it('should return invalidFriendshipAction for an invalid action', async () => { + jest.spyOn(FriendshipsLogic, 'parseUpsertFriendshipRequest').mockReturnValueOnce(mockParsedRequest) + jest.spyOn(FriendshipsLogic, 'validateNewFriendshipAction').mockReturnValueOnce(false) + + const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'invalidFriendshipAction', + invalidFriendshipAction: {} + } + }) + }) + + it('should update an existing friendship', async () => { + jest.spyOn(FriendshipsLogic, 'parseUpsertFriendshipRequest').mockReturnValueOnce(mockParsedRequest) + jest.spyOn(FriendshipsLogic, 'validateNewFriendshipAction').mockReturnValueOnce(true) + jest.spyOn(FriendshipsLogic, 'getNewFriendshipStatus').mockReturnValueOnce(FriendshipStatus.Friends) + + mockDb.getFriendship.mockResolvedValueOnce(existingFriendship) + mockDb.getLastFriendshipAction.mockResolvedValueOnce(lastFriendshipAction) + + const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) + + expect(mockDb.updateFriendshipStatus).toHaveBeenCalledWith(existingFriendship.id, true, expect.anything()) + expect(mockDb.recordFriendshipAction).toHaveBeenCalledWith( + existingFriendship.id, + rpcContext.address, + mockParsedRequest.action, + mockParsedRequest.metadata, + expect.anything() + ) + expect(result).toEqual({ + response: { + $case: 'accepted', + accepted: {} + } + }) + }) + + it('should create a new friendship', async () => { + jest.spyOn(FriendshipsLogic, 'parseUpsertFriendshipRequest').mockReturnValueOnce(mockParsedRequest) + jest.spyOn(FriendshipsLogic, 'validateNewFriendshipAction').mockReturnValueOnce(true) + jest.spyOn(FriendshipsLogic, 'getNewFriendshipStatus').mockReturnValueOnce(FriendshipStatus.Requested) + + mockDb.getFriendship.mockResolvedValueOnce(null) + mockDb.createFriendship.mockResolvedValueOnce('new-friendship-id') + + const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) + + expect(mockDb.createFriendship).toHaveBeenCalledWith([rpcContext.address, userAddress], false, expect.anything()) + expect(mockDb.recordFriendshipAction).toHaveBeenCalledWith( + 'new-friendship-id', + rpcContext.address, + mockParsedRequest.action, + mockParsedRequest.metadata, + expect.anything() + ) + expect(result).toEqual({ + response: { + $case: 'accepted', + accepted: {} + } + }) + }) + + it('should handle errors gracefully', async () => { + jest.spyOn(FriendshipsLogic, 'parseUpsertFriendshipRequest').mockReturnValueOnce(mockParsedRequest) + mockDb.getFriendship.mockImplementationOnce(() => { + throw new Error('Database error') + }) + + const result: UpsertFriendshipResponse = await upsertFriendship(mockRequest, rpcContext) + + expect(result).toEqual({ + response: { + $case: 'internalServerError', + internalServerError: {} + } + }) + }) +}) diff --git a/test/unit/friendships.spec.ts b/test/unit/logic/friendships.spec.ts similarity index 99% rename from test/unit/friendships.spec.ts rename to test/unit/logic/friendships.spec.ts index 6af800e..a6fa026 100644 --- a/test/unit/friendships.spec.ts +++ b/test/unit/logic/friendships.spec.ts @@ -5,8 +5,8 @@ import { parseEmittedUpdateToFriendshipUpdate, parseUpsertFriendshipRequest, validateNewFriendshipAction -} from '../../src/logic/friendships' -import { Action, FriendshipStatus } from '../../src/types' +} from '../../../src/logic/friendships' +import { Action, FriendshipStatus } from '../../../src/types' describe('isFriendshipActionValid()', () => { test('it should be valid if from is null and to is REQUEST ', () => { diff --git a/test/unit/utils/UWebSocketTransport.spec.ts b/test/unit/utils/UWebSocketTransport.spec.ts new file mode 100644 index 0000000..c786b17 --- /dev/null +++ b/test/unit/utils/UWebSocketTransport.spec.ts @@ -0,0 +1,84 @@ +import mitt, { Emitter } from 'mitt' +import { UWebSocketTransport, IUWebSocket, IUWebSocketEventMap } from '../../../src/utils/UWebSocketTransport' +import { Transport } from '@dcl/rpc' + +describe('UWebSocketTransport', () => { + let mockSocket: jest.Mocked> + let mockEmitter: Emitter + let transport: Transport + + beforeEach(() => { + mockSocket = { + send: jest.fn(), + close: jest.fn(), + end: jest.fn(), + getUserData: jest.fn(() => ({ isConnected: true })) + } as jest.Mocked> + + mockEmitter = mitt() + transport = UWebSocketTransport(mockSocket, mockEmitter) + }) + + describe('Transport Initialization', () => { + it('should initialize transport and emit connect event', async () => { + const connectListener = jest.fn() + transport.on('connect', connectListener) + + await new Promise(setImmediate) + + expect(connectListener).toHaveBeenCalled() + }) + }) + + describe('sendMessage', () => { + it('should send a Uint8Array message', () => { + const message = new Uint8Array([1, 2, 3]) + + transport.sendMessage(message) + expect(mockSocket.send).toHaveBeenCalledWith(message, true) + }) + }) + + describe('close', () => { + it('should call socket.close()', () => { + transport.close() + expect(mockSocket.close).toHaveBeenCalled() + }) + }) + + describe('Event Handling', () => { + it('should emit a close event when the WebSocket is closed', () => { + const closeListener = jest.fn() + transport.on('close', closeListener) + + mockEmitter.emit('close', undefined) + expect(closeListener).toHaveBeenCalledWith({}) + }) + + it('should emit a message event when a valid message is received', () => { + const messageListener = jest.fn() + transport.on('message', messageListener) + + const message = new ArrayBuffer(4) + mockEmitter.emit('message', message) + + expect(messageListener).toHaveBeenCalledWith(new Uint8Array(message)) + }) + + it('should throw an error for unsupported message types', () => { + const invalidMessage = 'Invalid message' + expect(() => mockEmitter.emit('message', invalidMessage)).toThrow( + 'WebSocketTransport: Received unknown type of message, expecting Uint8Array' + ) + }) + }) + + describe('isConnected', () => { + it('should reflect the socket connection state', () => { + expect(transport.isConnected).toBe(true) + + mockSocket.getUserData.mockReturnValueOnce({ isConnected: false }) + expect(transport.isConnected).toBe(false) + }) + }) +}) diff --git a/test/unit/utils/address.spec.ts b/test/unit/utils/address.spec.ts new file mode 100644 index 0000000..565846b --- /dev/null +++ b/test/unit/utils/address.spec.ts @@ -0,0 +1,12 @@ +import { normalizeAddress } from '../../../src/utils/address' + +describe('normalizeAddress', () => { + it.each([ + ['0xABCDEF1234567890', '0xabcdef1234567890'], + ['0xabcdef1234567890', '0xabcdef1234567890'], + ['0xAbCdEf1234567890', '0xabcdef1234567890'] + ])('should convert %s to %s', (address, expected) => { + const normalized = normalizeAddress(address) + expect(normalized).toBe(expected) + }) +}) diff --git a/test/unit/utils/emitterToGenerator.spec.ts b/test/unit/utils/emitterToGenerator.spec.ts new file mode 100644 index 0000000..a498bfa --- /dev/null +++ b/test/unit/utils/emitterToGenerator.spec.ts @@ -0,0 +1,125 @@ +import mitt, { Emitter } from 'mitt' +import emitterToAsyncGenerator from '../../../src/utils/emitterToGenerator' + +type TestEvents = { + testEvent: string +} + +describe('emitterToAsyncGenerator', () => { + let emitter: Emitter + + beforeEach(() => { + emitter = mitt() + }) + + it('should yield events emitted on the specified event type', async () => { + const generator = emitterToAsyncGenerator(emitter, 'testEvent') + + // Emit an event + const emittedValue = 'Hello, World!' + emitter.emit('testEvent', emittedValue) + + // Consume the generator + const result = await generator.next() + + expect(result.value).toBe(emittedValue) + expect(result.done).toBe(false) + }) + + it('should queue events until next() is called', async () => { + const generator = emitterToAsyncGenerator(emitter, 'testEvent') + + // Emit multiple events + const emittedValues = ['Event 1', 'Event 2', 'Event 3'] + emittedValues.forEach((value) => emitter.emit('testEvent', value)) + + // Consume the generator + for (const expectedValue of emittedValues) { + const result = await generator.next() + expect(result.value).toBe(expectedValue) + expect(result.done).toBe(false) + } + }) + + it('should block on next() until an event is emitted', async () => { + const generator = emitterToAsyncGenerator(emitter, 'testEvent') + + const emittedValue = 'Delayed Event' + + // Simulate a delayed event + setTimeout(() => emitter.emit('testEvent', emittedValue), 100) + + const result = await generator.next() + expect(result.value).toBe(emittedValue) + expect(result.done).toBe(false) + }) + + it('should handle return() to terminate the generator', async () => { + const generator = emitterToAsyncGenerator(emitter, 'testEvent') + + const result = await generator.return('Completed') + expect(result.value).toBe('Completed') + expect(result.done).toBe(true) + }) + + it('should handle throw() to propagate an error', async () => { + const generator = emitterToAsyncGenerator(emitter, 'testEvent') + + const error = new Error('Test error') + + await expect(generator.throw(error)).rejects.toThrow('Test error') + }) + + it('should handle no events being emitted gracefully', async () => { + const generator = emitterToAsyncGenerator(emitter, 'testEvent') + + const promise = generator.next() + expect(promise).toBeInstanceOf(Promise) // Ensure it waits indefinitely + }) + + it('should process events in order when emitted rapidly', async () => { + const generator = emitterToAsyncGenerator(emitter, 'testEvent') + + // Emit events rapidly + const emittedValues = ['Event A', 'Event B', 'Event C'] + emittedValues.forEach((value) => emitter.emit('testEvent', value)) + + const results = [] + for (let i = 0; i < emittedValues.length; i++) { + const result = await generator.next() + results.push(result.value) + } + + expect(results).toEqual(emittedValues) + }) + + it('should allow multiple consumers with separate generators', async () => { + const generator1 = emitterToAsyncGenerator(emitter, 'testEvent') + const generator2 = emitterToAsyncGenerator(emitter, 'testEvent') + + const emittedValue1 = 'Generator 1 Event' + const emittedValue2 = 'Generator 2 Event' + + emitter.emit('testEvent', emittedValue1) + emitter.emit('testEvent', emittedValue2) + + const [result11, result12, result21, result22] = await Promise.all([ + generator1.next(), + generator1.next(), + generator2.next(), + generator2.next() + ]) + + expect(result11.value).toBe(emittedValue1) + expect(result11.done).toBe(false) + + expect(result12.value).toBe(emittedValue2) + expect(result12.done).toBe(false) + + expect(result21.value).toBe(emittedValue1) + expect(result21.done).toBe(false) + + expect(result22.value).toBe(emittedValue2) + expect(result22.done).toBe(false) + }) +}) diff --git a/test/unit/utils/wsUserData.spec.ts b/test/unit/utils/wsUserData.spec.ts new file mode 100644 index 0000000..fca803e --- /dev/null +++ b/test/unit/utils/wsUserData.spec.ts @@ -0,0 +1,28 @@ +import { WsUserData, WsNotAuthenticatedUserData } from '../../../src/types' +import { isNotAuthenticated } from '../../../src/utils/wsUserData' +import { IUWebSocketEventMap } from '../../../src/utils/UWebSocketTransport' +import { Emitter } from 'mitt' + +describe('wsUserData', () => { + describe('isNotAuthenticated', () => { + it('should return false if the user is authenticated', () => { + const data: WsUserData = { + auth: true, + isConnected: false, + eventEmitter: { emit: jest.fn() } as unknown as Emitter, + address: '0x123' + } + + expect(isNotAuthenticated(data)).toBe(false) + }) + + it('should return true with the correct type if the user is not authenticated', () => { + const data: WsUserData = { + auth: false, + isConnected: false + } + + expect(isNotAuthenticated(data)).toBe(true) + }) + }) +})