Skip to content

Commit

Permalink
refactor: RPC Services to diff files
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinszuchet committed Jan 13, 2025
1 parent 842c62d commit f4a150e
Show file tree
Hide file tree
Showing 13 changed files with 399 additions and 325 deletions.
2 changes: 2 additions & 0 deletions src/adapters/rpc-server/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const FRIENDSHIPS_COUNT_PAGE_STREAM = 20
export const INTERNAL_SERVER_ERROR = 'SERVER ERROR'
1 change: 1 addition & 0 deletions src/adapters/rpc-server/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './rpc-server'
78 changes: 78 additions & 0 deletions src/adapters/rpc-server/rpc-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Transport, createRpcServer } from '@dcl/rpc'
import { SocialServiceDefinition } from '@dcl/protocol/out-js/decentraland/social_service_v2/social_service.gen'
import { registerService } from '@dcl/rpc/dist/codegen'
import { IBaseComponent } from '@well-known-components/interfaces'
import { AppComponents, RpcServerContext, SubscriptionEventsEmitter } from '../../types'
import { getFriendsService } from './services/get-friends'
import { getMutualFriendsService } from './services/get-mutual-friends'
import { getPendingFriendshipRequestsService } from './services/get-pending-friendship-requests'
import { upsertFriendshipService } from './services/upsert-friendship'
import { subscribeToFriendshipUpdatesService } from './services/subscribe-to-friendship-updates'

export type IRPCServerComponent = IBaseComponent & {
attachUser(user: { transport: Transport; address: string }): void
}

export async function createRpcServerComponent(
components: Pick<AppComponents, 'logs' | 'db' | 'pubsub' | 'config' | 'server'>
): Promise<IRPCServerComponent> {
const { logs, db, pubsub, config, server } = components

const SHARED_CONTEXT: Pick<RpcServerContext, 'subscribers'> = {
subscribers: {}
}

const rpcServer = createRpcServer<RpcServerContext>({
logger: logs.getLogger('rpcServer')
})

const logger = logs.getLogger('rpcServer-handler')

const rpcServerPort = (await config.getNumber('RPC_SERVER_PORT')) || 8085

const getFriends = getFriendsService({ components: { logs, db } })
const getMutualFriends = getMutualFriendsService({ components: { logs, db } })
const getPendingFriendshipRequests = getPendingFriendshipRequestsService({ components: { logs, db } })
const getSentFriendshipRequests = getPendingFriendshipRequestsService({ components: { logs, db } })
const upsertFriendship = upsertFriendshipService({ components: { logs, db, pubsub } })
const subscribeToFriendshipUpdates = subscribeToFriendshipUpdatesService({ components: { logs } })

rpcServer.setHandler(async function handler(port) {
registerService(port, SocialServiceDefinition, async () => ({
getFriends,
getMutualFriends,
getPendingFriendshipRequests,
getSentFriendshipRequests,
upsertFriendship,
subscribeToFriendshipUpdates
}))
})

return {
async start() {
server.app.listen(rpcServerPort, () => {
logger.info(`[RPC] RPC Server listening on port ${rpcServerPort}`)
})

await pubsub.subscribeToFriendshipUpdates((message) => {
try {
const update = JSON.parse(message) as SubscriptionEventsEmitter['update']
const updateEmitter = SHARED_CONTEXT.subscribers[update.to]
if (updateEmitter) {
updateEmitter.emit('update', update)
}
} catch (error) {
logger.error(error as any)
}
})
},
attachUser({ transport, address }) {
transport.on('close', () => {
if (SHARED_CONTEXT.subscribers[address]) {
delete SHARED_CONTEXT.subscribers[address]
}
})
rpcServer.attachTransport(transport, { subscribers: SHARED_CONTEXT.subscribers, address })
}
}
}
50 changes: 50 additions & 0 deletions src/adapters/rpc-server/services/get-friends.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen'
import { Friendship, RpcServerContext, RPCServiceContext } from '../../../types'
import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_COUNT_PAGE_STREAM } from '../constants'
import { UsersResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'

export function getFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
const logger = logs.getLogger('get-friends-service')

return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator<UsersResponse> {
logger.debug('getting friends for ', { address: context.address })
let friendsGenerator: AsyncGenerator<Friendship> | undefined
try {
friendsGenerator = db.getFriends(context.address)
} catch (error) {
logger.error(error as any)
// throw an error bc there is no sense to create a generator to send an error
// as it's done in the previous Social Service
throw new Error(INTERNAL_SERVER_ERROR)
}

const generator = async function* () {
let users = []
for await (const friendship of friendsGenerator) {
const { address_requested, address_requester } = friendship
if (context.address === address_requested) {
users.push({ address: address_requester })
} else {
users.push({ address: address_requested })
}

if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) {
const response = {
users: [...users]
}
users = []
yield response
}
}

if (users.length) {
const response = {
users
}
yield response
}
}

return generator()
}
}
47 changes: 47 additions & 0 deletions src/adapters/rpc-server/services/get-mutual-friends.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { RpcServerContext, RPCServiceContext } from '../../../types'
import { INTERNAL_SERVER_ERROR, FRIENDSHIPS_COUNT_PAGE_STREAM } from '../constants'
import {
MutualFriendsPayload,
UsersResponse
} from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'
import { normalizeAddress } from '../../../utils/address'

export function getMutualFriendsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
const logger = logs.getLogger('get-mutual-friends-service')

return async function* (request: MutualFriendsPayload, context: RpcServerContext): AsyncGenerator<UsersResponse> {
logger.debug(`getting mutual friends ${context.address}<>${request.user!.address}`)
let mutualFriends: AsyncGenerator<{ address: string }> | undefined
try {
mutualFriends = db.getMutualFriends(context.address, normalizeAddress(request.user!.address))
} catch (error) {
logger.error(error as any)
// throw an error bc there is no sense to create a generator to send an error
// as it's done in the previous Social Service
throw new Error(INTERNAL_SERVER_ERROR)
}

const generator = async function* () {
const users = []
for await (const friendship of mutualFriends) {
const { address } = friendship
users.push({ address })
if (users.length === FRIENDSHIPS_COUNT_PAGE_STREAM) {
const response = {
users
}
yield response
}
}

if (users.length) {
const response = {
users
}
yield response
}
}

return generator()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen'
import { RpcServerContext, RPCServiceContext } from '../../../types'
import { FriendshipRequestsResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'

export function getPendingFriendshipRequestsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
const logger = logs.getLogger('get-pending-friendship-requests-service')

return async function (_request: Empty, context: RpcServerContext): Promise<FriendshipRequestsResponse> {
try {
const pendingRequests = await db.getReceivedFriendshipRequests(context.address)
const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({
user: { address },
createdAt: new Date(timestamp).getTime(),
message: metadata?.message || ''
}))

return {
response: {
$case: 'requests',
requests: {
requests: mappedRequests
}
}
}
} catch (error) {
logger.error(error as any)
return {
response: {
$case: 'internalServerError',
internalServerError: {}
}
}
}
}
}
35 changes: 35 additions & 0 deletions src/adapters/rpc-server/services/get-sent-friendship-requests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen'
import { RpcServerContext, RPCServiceContext } from '../../../types'
import { FriendshipRequestsResponse } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'

export function getSentFriendshipRequestsService({ components: { logs, db } }: RPCServiceContext<'logs' | 'db'>) {
const logger = logs.getLogger('get-sent-friendship-requests-service')

return async function (_request: Empty, context: RpcServerContext): Promise<FriendshipRequestsResponse> {
try {
const pendingRequests = await db.getSentFriendshipRequests(context.address)
const mappedRequests = pendingRequests.map(({ address, timestamp, metadata }) => ({
user: { address },
createdAt: new Date(timestamp).getTime(),
message: metadata?.message || ''
}))

return {
response: {
$case: 'requests',
requests: {
requests: mappedRequests
}
}
}
} catch (error) {
logger.error(error as any)
return {
response: {
$case: 'internalServerError',
internalServerError: {}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Empty } from '@dcl/protocol/out-ts/google/protobuf/empty.gen'
import { RpcServerContext, RPCServiceContext, SubscriptionEventsEmitter } from '../../../types'
import { FriendshipUpdate } from '@dcl/protocol/out-ts/decentraland/social_service_v2/social_service.gen'
import mitt from 'mitt'
import { parseEmittedUpdateToFriendshipUpdate } from '../../../logic/friendships'
import emitterToAsyncGenerator from '../../../utils/emitterToGenerator'

export function subscribeToFriendshipUpdatesService({ components: { logs } }: RPCServiceContext<'logs'>) {
const logger = logs.getLogger('subscribe-to-friendship-updates-service')

return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator<FriendshipUpdate> {
const eventEmitter = mitt<SubscriptionEventsEmitter>()
context.subscribers[context.address] = eventEmitter
const updatesGenerator = emitterToAsyncGenerator(eventEmitter, 'update')

const generator = async function* () {
for await (const update of updatesGenerator) {
logger.debug('> friendship update received, sending: ', { update: update as any })
const updateToResponse = parseEmittedUpdateToFriendshipUpdate(update)
if (updateToResponse) {
yield updateToResponse
} else {
logger.error('> unable to parse update to FriendshipUpdate > ', { update: update as any })
}
}
}

return generator()
}
}
Loading

0 comments on commit f4a150e

Please sign in to comment.