From 3b1a7073f1c058a9ff3bfbfe98d48f4010b5af2a Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Thu, 9 Jan 2025 00:10:09 +0700 Subject: [PATCH] UBERF-8899: Reconnect performance issues + Fix slow PG driver workspace start Signed-off-by: Andrey Sobolev --- packages/core/src/client.ts | 1 + plugins/client-resources/src/connection.ts | 12 +++-- plugins/workbench-resources/src/connect.ts | 2 +- server/core/src/dbAdapterManager.ts | 62 ++++++++++++---------- server/core/src/pipeline.ts | 2 + server/core/src/types.ts | 2 +- server/middleware/src/dbAdapterHelper.ts | 11 ++-- server/middleware/src/domainTx.ts | 2 + server/middleware/src/model.ts | 4 +- server/postgres/src/utils.ts | 14 ++--- server/rpc/src/rpc.ts | 2 + server/server/src/sessionManager.ts | 11 ++-- 12 files changed, 73 insertions(+), 52 deletions(-) diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index f20273e518e..0f9e9e48496 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -329,6 +329,7 @@ async function tryLoadModel ( if (conn.getLastHash !== undefined && (await conn.getLastHash(ctx)) === current.hash) { // We have same model hash. + current.full = false // Since we load, no need to send full return current } const lastTxTime = getLastTxTime(current.transactions) diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 0535c8b126c..d55a8ec0dbe 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -44,6 +44,7 @@ import core, { TxApplyIf, TxHandler, TxResult, + clone, generateId, toFindResult, type MeasureContext @@ -108,6 +109,8 @@ class Connection implements ClientConnection { private helloRecieved: boolean = false + private account: Account | undefined + onConnect?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise rpcHandler = new RPCHandler() @@ -303,7 +306,7 @@ class Connection implements ClientConnection { this.websocket?.close() return } - + this.account = helloResp.account this.helloRecieved = true if (this.upgrading) { // We need to call upgrade since connection is upgraded @@ -322,8 +325,8 @@ class Connection implements ClientConnection { } void this.onConnect?.( - (resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected, - (resp as HelloResponse).lastTx, + helloResp.reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected, + helloResp.lastTx, this.sessionId ) this.schedulePing(socketId) @@ -635,6 +638,9 @@ class Connection implements ClientConnection { } getAccount (): Promise { + if (this.account !== undefined) { + return clone(this.account) + } return this.sendRequest({ method: 'getAccount', params: [] }) } diff --git a/plugins/workbench-resources/src/connect.ts b/plugins/workbench-resources/src/connect.ts index 57874d332e4..6103f985cae 100644 --- a/plugins/workbench-resources/src/connect.ts +++ b/plugins/workbench-resources/src/connect.ts @@ -227,7 +227,7 @@ export async function connect (title: string): Promise { return } try { - if (event === ClientConnectEvent.Connected) { + if (event === ClientConnectEvent.Connected || event === ClientConnectEvent.Reconnected) { setMetadata(presentation.metadata.SessionId, data) } if ((_clientSet && event === ClientConnectEvent.Connected) || event === ClientConnectEvent.Refresh) { diff --git a/server/core/src/dbAdapterManager.ts b/server/core/src/dbAdapterManager.ts index 5a35a3a76d0..6119fe0e3ee 100644 --- a/server/core/src/dbAdapterManager.ts +++ b/server/core/src/dbAdapterManager.ts @@ -64,43 +64,47 @@ export class DbAdapterManagerImpl implements DBAdapterManager { return this.defaultAdapter } - async registerHelper (helper: DomainHelper): Promise { + async registerHelper (ctx: MeasureContext, helper: DomainHelper): Promise { this.domainHelper = helper - await this.initDomains() + await this.initDomains(ctx) } - async initDomains (): Promise { + async initDomains (ctx: MeasureContext): Promise { const adapterDomains = new Map>() for (const d of this.context.hierarchy.domains()) { // We need to init domain info - const info = this.getDomainInfo(d) - await this.updateInfo(d, adapterDomains, info) + await ctx.with('update-info', { domain: d }, async (ctx) => { + const info = this.getDomainInfo(d) + await this.updateInfo(d, adapterDomains, info) + }) } - for (const adapter of this.adapters.values()) { - adapter.on?.((domain, event, count, helper) => { - const info = this.getDomainInfo(domain) - const oldDocuments = info.documents - switch (event) { - case 'add': - info.documents += count - break - case 'update': - break - case 'delete': - info.documents -= count - break - case 'read': - break - } + for (const [name, adapter] of this.adapters.entries()) { + await ctx.with('domain-helper', { name }, async (ctx) => { + adapter.on?.((domain, event, count, helper) => { + const info = this.getDomainInfo(domain) + const oldDocuments = info.documents + switch (event) { + case 'add': + info.documents += count + break + case 'update': + break + case 'delete': + info.documents -= count + break + case 'read': + break + } - if (oldDocuments < 50 && info.documents > 50) { - // We have more 50 documents, we need to check for indexes - void this.domainHelper?.checkDomain(this.metrics, domain, info.documents, helper) - } - if (oldDocuments > 50 && info.documents < 50) { - // We have more 50 documents, we need to check for indexes - void this.domainHelper?.checkDomain(this.metrics, domain, info.documents, helper) - } + if (oldDocuments < 50 && info.documents > 50) { + // We have more 50 documents, we need to check for indexes + void this.domainHelper?.checkDomain(this.metrics, domain, info.documents, helper) + } + if (oldDocuments > 50 && info.documents < 50) { + // We have more 50 documents, we need to check for indexes + void this.domainHelper?.checkDomain(this.metrics, domain, info.documents, helper) + } + }) }) } } diff --git a/server/core/src/pipeline.ts b/server/core/src/pipeline.ts index 1b600a21842..35ff9ddec59 100644 --- a/server/core/src/pipeline.ts +++ b/server/core/src/pipeline.ts @@ -16,6 +16,7 @@ import { Analytics } from '@hcengineering/analytics' import { toFindResult, + withContext, type Class, type Doc, type DocumentQuery, @@ -66,6 +67,7 @@ class PipelineImpl implements Pipeline { return pipeline } + @withContext('build-chain') private async buildChain ( ctx: MeasureContext, constructors: MiddlewareCreator[], diff --git a/server/core/src/types.ts b/server/core/src/types.ts index dca17b3c016..0b8a11fde8f 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -151,7 +151,7 @@ export interface DBAdapterManager { close: () => Promise - registerHelper: (helper: DomainHelper) => Promise + registerHelper: (ctx: MeasureContext, helper: DomainHelper) => Promise initAdapters: (ctx: MeasureContext) => Promise diff --git a/server/middleware/src/dbAdapterHelper.ts b/server/middleware/src/dbAdapterHelper.ts index 1142766b7f3..e25194a7d0f 100644 --- a/server/middleware/src/dbAdapterHelper.ts +++ b/server/middleware/src/dbAdapterHelper.ts @@ -13,7 +13,7 @@ // limitations under the License. // -import { type MeasureContext } from '@hcengineering/core' +import { withContext, type MeasureContext } from '@hcengineering/core' import type { Middleware, PipelineContext } from '@hcengineering/server-core' import { BaseMiddleware, DomainIndexHelperImpl } from '@hcengineering/server-core' @@ -21,14 +21,19 @@ import { BaseMiddleware, DomainIndexHelperImpl } from '@hcengineering/server-cor * @public */ export class DBAdapterInitMiddleware extends BaseMiddleware implements Middleware { + @withContext('db-adapter-init') static async create ( ctx: MeasureContext, context: PipelineContext, next?: Middleware ): Promise { - await context.adapterManager?.initAdapters?.(ctx) + await ctx.with('init-adapters', {}, async (ctx) => { + await context.adapterManager?.initAdapters?.(ctx) + }) const domainHelper = new DomainIndexHelperImpl(ctx, context.hierarchy, context.modelDb, context.workspace) - await context.adapterManager?.registerHelper?.(domainHelper) + await ctx.with('register-helper', {}, async (ctx) => { + await context.adapterManager?.registerHelper?.(ctx, domainHelper) + }) return undefined } } diff --git a/server/middleware/src/domainTx.ts b/server/middleware/src/domainTx.ts index e155e4e0852..51978183964 100644 --- a/server/middleware/src/domainTx.ts +++ b/server/middleware/src/domainTx.ts @@ -17,6 +17,7 @@ import core, { Domain, groupByArray, TxProcessor, + withContext, type Doc, type MeasureContext, type SessionData, @@ -41,6 +42,7 @@ import { BaseMiddleware } from '@hcengineering/server-core' export class DomainTxMiddleware extends BaseMiddleware implements Middleware { adapterManager!: DBAdapterManager + @withContext('domainTx-middleware') static async create (ctx: MeasureContext, context: PipelineContext, next?: Middleware): Promise { const middleware = new DomainTxMiddleware(context, next) if (context.adapterManager == null) { diff --git a/server/middleware/src/model.ts b/server/middleware/src/model.ts index 499f48e01cd..5e6ee6736a7 100644 --- a/server/middleware/src/model.ts +++ b/server/middleware/src/model.ts @@ -20,7 +20,8 @@ import core, { type Timestamp, type Tx, type TxCUD, - DOMAIN_TX + DOMAIN_TX, + withContext } from '@hcengineering/core' import { PlatformError, unknownError } from '@hcengineering/platform' import type { @@ -51,6 +52,7 @@ export class ModelMiddleware extends BaseMiddleware implements Middleware { super(context, next) } + @withContext('modelAdapter-middleware') static async doCreate ( ctx: MeasureContext, context: PipelineContext, diff --git a/server/postgres/src/utils.ts b/server/postgres/src/utils.ts index f05927f68c6..6d190e9d930 100644 --- a/server/postgres/src/utils.ts +++ b/server/postgres/src/utils.ts @@ -450,12 +450,8 @@ export class DBCollectionHelper implements DomainHelperOperations { async create (domain: Domain): Promise {} async exists (domain: Domain): Promise { - const exists = await this.client` - SELECT table_name - FROM information_schema.tables - WHERE table_name = '${this.client(translateDomain(domain))}' - ` - return exists.length > 0 + // Always exists. We don't need to check for index existence + return true } async listDomains (): Promise> { @@ -469,10 +465,8 @@ export class DBCollectionHelper implements DomainHelperOperations { } async estimatedCount (domain: Domain): Promise { - const res = await this - .client`SELECT COUNT(_id) FROM ${this.client(translateDomain(domain))} WHERE "workspaceId" = ${this.workspaceId.name}` - - return res.count + // We should always return 0, since no controlled index stuff is required for postgres driver + return 0 } } diff --git a/server/rpc/src/rpc.ts b/server/rpc/src/rpc.ts index ac9022c3899..f4afa6929c0 100644 --- a/server/rpc/src/rpc.ts +++ b/server/rpc/src/rpc.ts @@ -13,6 +13,7 @@ // limitations under the License. // +import type { Account } from '@hcengineering/core' import platform, { PlatformError, Severity, Status } from '@hcengineering/platform' import { Packr } from 'msgpackr' @@ -48,6 +49,7 @@ export interface HelloResponse extends Response { serverVersion: string lastTx?: string lastHash?: string // Last model hash + account: Account } function replacer (key: string, value: any): any { diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index d551af9721e..d02ca203554 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -418,6 +418,7 @@ class TSessionManager implements SessionManager { }) workspace = this.createWorkspace( ctx.parent ?? ctx, + ctx, pipelineFactory, token, workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId, @@ -435,7 +436,7 @@ class TSessionManager implements SessionManager { workspace: workspaceInfo.workspaceId, wsUrl: workspaceInfo.workspaceUrl }) - pipeline = await ctx.with('💤 wait', { workspaceName }, () => (workspace as Workspace).pipeline) + pipeline = await ctx.with('💤 wait-pipeline', {}, () => (workspace as Workspace).pipeline) } else { ctx.warn('reconnect workspace in upgrade switch', { email: token.email, @@ -466,9 +467,10 @@ class TSessionManager implements SessionManager { }) return { upgrade: true } } + try { if (workspace.pipeline instanceof Promise) { - pipeline = await workspace.pipeline + pipeline = await ctx.with('💤 wait-pipeline', {}, () => (workspace as Workspace).pipeline) workspace.pipeline = pipeline } else { pipeline = workspace.pipeline @@ -645,6 +647,7 @@ class TSessionManager implements SessionManager { private createWorkspace ( ctx: MeasureContext, + pipelineCtx: MeasureContext, pipelineFactory: PipelineFactory, token: Token, workspaceUrl: string, @@ -655,7 +658,6 @@ class TSessionManager implements SessionManager { const wsId = toWorkspaceString(token.workspace) const upgrade = token.extra?.model === 'upgrade' const context = ctx.newChild('🧲 session', {}) - const pipelineCtx = context.newChild('🧲 pipeline-factory', {}) const workspace: Workspace = { context, id: generateId(), @@ -1106,7 +1108,8 @@ class TSessionManager implements SessionManager { reconnect, serverVersion: this.serverVersion, lastTx: pipeline.context.lastTx, - lastHash: pipeline.context.lastHash + lastHash: pipeline.context.lastHash, + account: service.getRawAccount(pipeline) } ws.send(requestCtx, helloResponse, false, false) }