Skip to content

Commit

Permalink
refactor: RPC services to different files + tests (#31)
Browse files Browse the repository at this point in the history
* refactor: RPC Services to diff files

* fix: Services returning generators of generators

* test: Add Get Friends and Mutuals tests

* test: Get Pending Frienship Requests tests

* test: Get Sent Frienship Requests tests

* test: Upsert Friendship tests

* test: Upsert Friendship tests tiny refactor

* test: Subscribe to Friendship Updates test and todo tests

* test: Add DB tests

* refactor: Move tests to proper dir

* test: Normalize address

* test: Emitter to Generator tests

* test: Add utils tests

* test: Rpc Server Adapter tests

* fix: Coverage was complaining because of a mismatch in a type without timeout
  • Loading branch information
kevinszuchet authored Jan 15, 2025
1 parent 842c62d commit a66b796
Show file tree
Hide file tree
Showing 42 changed files with 1,554 additions and 354 deletions.
12 changes: 6 additions & 6 deletions jest.config.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module.exports = {
transform: {
"^.+\\.(ts|tsx)$": ["ts-jest", {tsconfig: "test/tsconfig.json"}]
'^.+\\.(ts|tsx)$': ['ts-jest', { tsconfig: 'test/tsconfig.json' }]
},
moduleFileExtensions: ["ts", "js"],
coverageDirectory: "coverage",
collectCoverageFrom: ["src/**/*.ts", "src/**/*.js"],
testMatch: ["**/*.spec.(ts)"],
testEnvironment: "node",
moduleFileExtensions: ['ts', 'js'],
coverageDirectory: 'coverage',
collectCoverageFrom: ['src/**/*.ts', 'src/**/*.js', '!src/migrations/**'],
testMatch: ['**/*.spec.(ts)'],
testEnvironment: 'node'
}
2 changes: 0 additions & 2 deletions src/adapters/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
async updateFriendshipStatus(friendshipId, isActive, txClient) {
logger.debug(`updating ${friendshipId} - ${isActive}`)
const query = SQL`UPDATE friendships SET is_active = ${isActive}, updated_at = now() WHERE id = ${friendshipId}`
console.log(query.text)
console.log(query.values)

if (txClient) {
const results = await txClient.query(query)
Expand Down
2 changes: 2 additions & 0 deletions src/adapters/rpc-server/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const FRIENDSHIPS_COUNT_PAGE_STREAM = 20
export const INTERNAL_SERVER_ERROR = 'SERVER ERROR'
1 change: 1 addition & 0 deletions src/adapters/rpc-server/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './rpc-server'
78 changes: 78 additions & 0 deletions src/adapters/rpc-server/rpc-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Transport, createRpcServer } from '@dcl/rpc'
import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service_v2/social_service.gen'
import { registerService } from '@dcl/rpc/dist/codegen'
import { IBaseComponent } from '@well-known-components/interfaces'
import { AppComponents, RpcServerContext, SubscriptionEventsEmitter } from '../../types'
import { getFriendsService } from './services/get-friends'
import { getMutualFriendsService } from './services/get-mutual-friends'
import { getPendingFriendshipRequestsService } from './services/get-pending-friendship-requests'
import { upsertFriendshipService } from './services/upsert-friendship'
import { subscribeToFriendshipUpdatesService } from './services/subscribe-to-friendship-updates'

export type IRPCServerComponent = IBaseComponent & {
attachUser(user: { transport: Transport; address: string }): void
}

export async function createRpcServerComponent(
components: Pick<AppComponents, 'logs' | 'db' | 'pubsub' | 'config' | 'server'>
): Promise<IRPCServerComponent> {
const { logs, db, pubsub, config, server } = components

const SHARED_CONTEXT: Pick<RpcServerContext, 'subscribers'> = {
subscribers: {}
}

const rpcServer = createRpcServer<RpcServerContext>({
logger: logs.getLogger('rpcServer')
})

const logger = logs.getLogger('rpcServer-handler')

const rpcServerPort = (await config.getNumber('RPC_SERVER_PORT')) || 8085

const getFriends = getFriendsService({ components: { logs, db } })
const getMutualFriends = getMutualFriendsService({ components: { logs, db } })
const getPendingFriendshipRequests = getPendingFriendshipRequestsService({ components: { logs, db } })
const getSentFriendshipRequests = getPendingFriendshipRequestsService({ components: { logs, db } })
const upsertFriendship = upsertFriendshipService({ components: { logs, db, pubsub } })
const subscribeToFriendshipUpdates = subscribeToFriendshipUpdatesService({ components: { logs } })

rpcServer.setHandler(async function handler(port) {
registerService(port, SocialServiceDefinition, async () => ({
getFriends,
getMutualFriends,
getPendingFriendshipRequests,
getSentFriendshipRequests,
upsertFriendship,
subscribeToFriendshipUpdates
}))
})

return {
async start() {
server.app.listen(rpcServerPort, () => {
logger.info(`[RPC] RPC Server listening on port ${rpcServerPort}`)
})

await pubsub.subscribeToFriendshipUpdates((message) => {
try {
const update = JSON.parse(message) as SubscriptionEventsEmitter['update']
const updateEmitter = SHARED_CONTEXT.subscribers[update.to]
if (updateEmitter) {
updateEmitter.emit('update', update)
}
} catch (error) {
logger.error(error as any)
}
})
},
attachUser({ transport, address }) {
transport.on('close', () => {
if (SHARED_CONTEXT.subscribers[address]) {
delete SHARED_CONTEXT.subscribers[address]
}
})
rpcServer.attachTransport(transport, { subscribers: SHARED_CONTEXT.subscribers, address })
}
}
}
46 changes: 46 additions & 0 deletions src/adapters/rpc-server/services/get-friends.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen'
import { Friendship, RpcServerContext, RPCServiceContext } from '../../../types'
import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_COUNT_PAGE_STREAM } from '../constants'
import { UsersResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'

export function getFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
const logger = logs.getLogger('get-friends-service')

return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator<UsersResponse> {
let friendsGenerator: AsyncGenerator<Friendship> | undefined
try {
friendsGenerator = db.getFriends(context.address)
} catch (error) {
logger.error(error as any)
// throw an error bc there is no sense to create a generator to send an error
// as it's done in the previous Social Service
throw new Error(INTERNAL_SERVER_ERROR)
}

let users = []

for await (const friendship of friendsGenerator) {
const { address_requested, address_requester } = friendship
if (context.address === address_requested) {
users.push({ address: address_requester })
} else {
users.push({ address: address_requested })
}

if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) {
const response = {
users: [...users]
}
users = []
yield response
}
}

if (users.length) {
const response = {
users
}
yield response
}
}
}
45 changes: 45 additions & 0 deletions src/adapters/rpc-server/services/get-mutual-friends.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { RpcServerContext, RPCServiceContext } from '../../../types'
import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_COUNT_PAGE_STREAM } from '../constants'
import {
MutualFriendsPayload,
UsersResponse
} from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'
import { normalizeAddress } from '../../../utils/address'

export function getMutualFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
const logger = logs.getLogger('get-mutual-friends-service')

return async function* (request: MutualFriendsPayload, context: RpcServerContext): AsyncGenerator<UsersResponse> {
logger.debug(`getting mutual friends ${context.address}<>${request.user!.address}`)
let mutualFriends: AsyncGenerator<{ address: string }> | undefined
try {
mutualFriends = db.getMutualFriends(context.address, normalizeAddress(request.user!.address))
} catch (error) {
logger.error(error as any)
// throw an error bc there is no sense to create a generator to send an error
// as it's done in the previous Social Service
throw new Error(INTERNAL_SERVER_ERROR)
}

let users = []

for await (const friendship of mutualFriends) {
const { address } = friendship
users.push({ address })
if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) {
const response = {
users
}
yield response
users = []
}
}

if (users.length) {
const response = {
users
}
yield response
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen'
import { RpcServerContext, RPCServiceContext } from '../../../types'
import { FriendshipRequestsResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'

export function getPendingFriendshipRequestsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
const logger = logs.getLogger('get-pending-friendship-requests-service')

return async function (_request: Empty, context: RpcServerContext): Promise<FriendshipRequestsResponse> {
try {
const pendingRequests = await db.getReceivedFriendshipRequests(context.address)
const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({
user: { address },
createdAt: new Date(timestamp).getTime(),
message: metadata?.message || ''
}))

return {
response: {
$case: 'requests',
requests: {
requests: mappedRequests
}
}
}
} catch (error) {
logger.error(error as any)
return {
response: {
$case: 'internalServerError',
internalServerError: {}
}
}
}
}
}
35 changes: 35 additions & 0 deletions src/adapters/rpc-server/services/get-sent-friendship-requests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen'
import { RpcServerContext, RPCServiceContext } from '../../../types'
import { FriendshipRequestsResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'

export function getSentFriendshipRequestsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
const logger = logs.getLogger('get-sent-friendship-requests-service')

return async function (_request: Empty, context: RpcServerContext): Promise<FriendshipRequestsResponse> {
try {
const pendingRequests = await db.getSentFriendshipRequests(context.address)
const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({
user: { address },
createdAt: new Date(timestamp).getTime(),
message: metadata?.message || ''
}))

return {
response: {
$case: 'requests',
requests: {
requests: mappedRequests
}
}
}
} catch (error) {
logger.error(error as any)
return {
response: {
$case: 'internalServerError',
internalServerError: {}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen'
import { RpcServerContext, RPCServiceContext, SubscriptionEventsEmitter } from '../../../types'
import { FriendshipUpdate } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'
import mitt from 'mitt'
import { parseEmittedUpdateToFriendshipUpdate } from '../../../logic/friendships'
import emitterToAsyncGenerator from '../../../utils/emitterToGenerator'

export function subscribeToFriendshipUpdatesService({ components: { logs } }: RPCServiceContext<'logs'>) {
const logger = logs.getLogger('subscribe-to-friendship-updates-service')

return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator<FriendshipUpdate> {
const eventEmitter = context.subscribers[context.address] || mitt<SubscriptionEventsEmitter>()

if (!context.subscribers[context.address]) {
context.subscribers[context.address] = eventEmitter
}

const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'update')

for await (const update of updatesGenerator) {
logger.debug('> friendship update received, sending: ', { update: update as any })
const updateToResponse = parseEmittedUpdateToFriendshipUpdate(update)
if (updateToResponse) {
yield updateToResponse
} else {
logger.error('> unable to parse update to FriendshipUpdate > ', { update: update as any })
}
}
}
}
Loading

0 comments on commit a66b796

Please sign in to comment.