Skip to content

Commit

Permalink
UBERF-8899: Reconnect performance issues
Browse files Browse the repository at this point in the history
+ Fix slow PG driver workspace start

Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo committed Jan 8, 2025
1 parent bfe2960 commit 3b1a707
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 52 deletions.
1 change: 1 addition & 0 deletions packages/core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ async function tryLoadModel (

if (conn.getLastHash !== undefined && (await conn.getLastHash(ctx)) === current.hash) {
// We have same model hash.
current.full = false // Since we load, no need to send full
return current
}
const lastTxTime = getLastTxTime(current.transactions)
Expand Down
12 changes: 9 additions & 3 deletions plugins/client-resources/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import core, {
TxApplyIf,
TxHandler,
TxResult,
clone,
generateId,
toFindResult,
type MeasureContext
Expand Down Expand Up @@ -108,6 +109,8 @@ class Connection implements ClientConnection {

private helloRecieved: boolean = false

private account: Account | undefined

onConnect?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>

rpcHandler = new RPCHandler()
Expand Down Expand Up @@ -303,7 +306,7 @@ class Connection implements ClientConnection {
this.websocket?.close()
return
}

this.account = helloResp.account
this.helloRecieved = true
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
Expand All @@ -322,8 +325,8 @@ class Connection implements ClientConnection {
}

void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected,
(resp as HelloResponse).lastTx,
helloResp.reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected,
helloResp.lastTx,
this.sessionId
)
this.schedulePing(socketId)
Expand Down Expand Up @@ -635,6 +638,9 @@ class Connection implements ClientConnection {
}

getAccount (): Promise<Account> {
if (this.account !== undefined) {
return clone(this.account)
}
return this.sendRequest({ method: 'getAccount', params: [] })
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/workbench-resources/src/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ export async function connect (title: string): Promise<Client | undefined> {
return
}
try {
if (event === ClientConnectEvent.Connected) {
if (event === ClientConnectEvent.Connected || event === ClientConnectEvent.Reconnected) {
setMetadata(presentation.metadata.SessionId, data)
}
if ((_clientSet && event === ClientConnectEvent.Connected) || event === ClientConnectEvent.Refresh) {
Expand Down
62 changes: 33 additions & 29 deletions server/core/src/dbAdapterManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,43 +64,47 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
return this.defaultAdapter
}

async registerHelper (helper: DomainHelper): Promise<void> {
async registerHelper (ctx: MeasureContext, helper: DomainHelper): Promise<void> {
this.domainHelper = helper
await this.initDomains()
await this.initDomains(ctx)
}

async initDomains (): Promise<void> {
async initDomains (ctx: MeasureContext): Promise<void> {
const adapterDomains = new Map<DbAdapter, Set<Domain>>()
for (const d of this.context.hierarchy.domains()) {
// We need to init domain info
const info = this.getDomainInfo(d)
await this.updateInfo(d, adapterDomains, info)
await ctx.with('update-info', { domain: d }, async (ctx) => {
const info = this.getDomainInfo(d)
await this.updateInfo(d, adapterDomains, info)
})
}
for (const adapter of this.adapters.values()) {
adapter.on?.((domain, event, count, helper) => {
const info = this.getDomainInfo(domain)
const oldDocuments = info.documents
switch (event) {
case 'add':
info.documents += count
break
case 'update':
break
case 'delete':
info.documents -= count
break
case 'read':
break
}
for (const [name, adapter] of this.adapters.entries()) {
await ctx.with('domain-helper', { name }, async (ctx) => {
adapter.on?.((domain, event, count, helper) => {
const info = this.getDomainInfo(domain)
const oldDocuments = info.documents
switch (event) {
case 'add':
info.documents += count
break
case 'update':
break
case 'delete':
info.documents -= count
break
case 'read':
break
}

if (oldDocuments < 50 && info.documents > 50) {
// We have more 50 documents, we need to check for indexes
void this.domainHelper?.checkDomain(this.metrics, domain, info.documents, helper)
}
if (oldDocuments > 50 && info.documents < 50) {
// We have more 50 documents, we need to check for indexes
void this.domainHelper?.checkDomain(this.metrics, domain, info.documents, helper)
}
if (oldDocuments < 50 && info.documents > 50) {
// We have more 50 documents, we need to check for indexes
void this.domainHelper?.checkDomain(this.metrics, domain, info.documents, helper)
}
if (oldDocuments > 50 && info.documents < 50) {
// We have more 50 documents, we need to check for indexes
void this.domainHelper?.checkDomain(this.metrics, domain, info.documents, helper)
}
})
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/core/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import { Analytics } from '@hcengineering/analytics'
import {
toFindResult,
withContext,
type Class,
type Doc,
type DocumentQuery,
Expand Down Expand Up @@ -66,6 +67,7 @@ class PipelineImpl implements Pipeline {
return pipeline
}

@withContext('build-chain')
private async buildChain (
ctx: MeasureContext,
constructors: MiddlewareCreator[],
Expand Down
2 changes: 1 addition & 1 deletion server/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export interface DBAdapterManager {

close: () => Promise<void>

registerHelper: (helper: DomainHelper) => Promise<void>
registerHelper: (ctx: MeasureContext, helper: DomainHelper) => Promise<void>

initAdapters: (ctx: MeasureContext) => Promise<void>

Expand Down
11 changes: 8 additions & 3 deletions server/middleware/src/dbAdapterHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,27 @@
// limitations under the License.
//

import { type MeasureContext } from '@hcengineering/core'
import { withContext, type MeasureContext } from '@hcengineering/core'
import type { Middleware, PipelineContext } from '@hcengineering/server-core'
import { BaseMiddleware, DomainIndexHelperImpl } from '@hcengineering/server-core'

/**
* @public
*/
export class DBAdapterInitMiddleware extends BaseMiddleware implements Middleware {
@withContext('db-adapter-init')
static async create (
ctx: MeasureContext,
context: PipelineContext,
next?: Middleware
): Promise<Middleware | undefined> {
await context.adapterManager?.initAdapters?.(ctx)
await ctx.with('init-adapters', {}, async (ctx) => {
await context.adapterManager?.initAdapters?.(ctx)
})
const domainHelper = new DomainIndexHelperImpl(ctx, context.hierarchy, context.modelDb, context.workspace)
await context.adapterManager?.registerHelper?.(domainHelper)
await ctx.with('register-helper', {}, async (ctx) => {
await context.adapterManager?.registerHelper?.(ctx, domainHelper)
})
return undefined
}
}
2 changes: 2 additions & 0 deletions server/middleware/src/domainTx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import core, {
Domain,
groupByArray,
TxProcessor,
withContext,
type Doc,
type MeasureContext,
type SessionData,
Expand All @@ -41,6 +42,7 @@ import { BaseMiddleware } from '@hcengineering/server-core'
export class DomainTxMiddleware extends BaseMiddleware implements Middleware {
adapterManager!: DBAdapterManager

@withContext('domainTx-middleware')
static async create (ctx: MeasureContext, context: PipelineContext, next?: Middleware): Promise<Middleware> {
const middleware = new DomainTxMiddleware(context, next)
if (context.adapterManager == null) {
Expand Down
4 changes: 3 additions & 1 deletion server/middleware/src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import core, {
type Timestamp,
type Tx,
type TxCUD,
DOMAIN_TX
DOMAIN_TX,
withContext
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import type {
Expand Down Expand Up @@ -51,6 +52,7 @@ export class ModelMiddleware extends BaseMiddleware implements Middleware {
super(context, next)
}

@withContext('modelAdapter-middleware')
static async doCreate (
ctx: MeasureContext,
context: PipelineContext,
Expand Down
14 changes: 4 additions & 10 deletions server/postgres/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,8 @@ export class DBCollectionHelper implements DomainHelperOperations {
async create (domain: Domain): Promise<void> {}

async exists (domain: Domain): Promise<boolean> {
const exists = await this.client`
SELECT table_name
FROM information_schema.tables
WHERE table_name = '${this.client(translateDomain(domain))}'
`
return exists.length > 0
// Always exists. We don't need to check for index existence
return true
}

async listDomains (): Promise<Set<Domain>> {
Expand All @@ -469,10 +465,8 @@ export class DBCollectionHelper implements DomainHelperOperations {
}

async estimatedCount (domain: Domain): Promise<number> {
const res = await this
.client`SELECT COUNT(_id) FROM ${this.client(translateDomain(domain))} WHERE "workspaceId" = ${this.workspaceId.name}`

return res.count
// We should always return 0, since no controlled index stuff is required for postgres driver
return 0
}
}

Expand Down
2 changes: 2 additions & 0 deletions server/rpc/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
//

import type { Account } from '@hcengineering/core'
import platform, { PlatformError, Severity, Status } from '@hcengineering/platform'
import { Packr } from 'msgpackr'

Expand Down Expand Up @@ -48,6 +49,7 @@ export interface HelloResponse extends Response<any> {
serverVersion: string
lastTx?: string
lastHash?: string // Last model hash
account: Account
}

function replacer (key: string, value: any): any {
Expand Down
11 changes: 7 additions & 4 deletions server/server/src/sessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ class TSessionManager implements SessionManager {
})
workspace = this.createWorkspace(
ctx.parent ?? ctx,
ctx,
pipelineFactory,
token,
workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId,
Expand All @@ -435,7 +436,7 @@ class TSessionManager implements SessionManager {
workspace: workspaceInfo.workspaceId,
wsUrl: workspaceInfo.workspaceUrl
})
pipeline = await ctx.with('💤 wait', { workspaceName }, () => (workspace as Workspace).pipeline)
pipeline = await ctx.with('💤 wait-pipeline', {}, () => (workspace as Workspace).pipeline)
} else {
ctx.warn('reconnect workspace in upgrade switch', {
email: token.email,
Expand Down Expand Up @@ -466,9 +467,10 @@ class TSessionManager implements SessionManager {
})
return { upgrade: true }
}

try {
if (workspace.pipeline instanceof Promise) {
pipeline = await workspace.pipeline
pipeline = await ctx.with('💤 wait-pipeline', {}, () => (workspace as Workspace).pipeline)
workspace.pipeline = pipeline
} else {
pipeline = workspace.pipeline
Expand Down Expand Up @@ -645,6 +647,7 @@ class TSessionManager implements SessionManager {

private createWorkspace (
ctx: MeasureContext,
pipelineCtx: MeasureContext,
pipelineFactory: PipelineFactory,
token: Token,
workspaceUrl: string,
Expand All @@ -655,7 +658,6 @@ class TSessionManager implements SessionManager {
const wsId = toWorkspaceString(token.workspace)
const upgrade = token.extra?.model === 'upgrade'
const context = ctx.newChild('🧲 session', {})
const pipelineCtx = context.newChild('🧲 pipeline-factory', {})
const workspace: Workspace = {
context,
id: generateId(),
Expand Down Expand Up @@ -1106,7 +1108,8 @@ class TSessionManager implements SessionManager {
reconnect,
serverVersion: this.serverVersion,
lastTx: pipeline.context.lastTx,
lastHash: pipeline.context.lastHash
lastHash: pipeline.context.lastHash,
account: service.getRawAccount(pipeline)
}
ws.send(requestCtx, helloResponse, false, false)
}
Expand Down

0 comments on commit 3b1a707

Please sign in to comment.