From b32d9f70860b256d4d918606ebdf31b8087e68fd Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Mon, 21 Oct 2024 12:05:53 -0700 Subject: [PATCH] make handler spans more accurate, re-add connection span (#276) ## Why - a lot of proc handler specific log messages didnt get picked up by the right tracer - move proc stream <> handler proxy messages into the handler span - move the invoked message into the client handler span - re-add connection telemetry - span linking between procs/handlers and their originating sessions - we use https://www.npmjs.com/package/@opentelemetry/context-async-hooks so context is tracked safely in an async context so we should leverage that ## What changed ## Versioning - [ ] Breaking protocol change - [ ] Breaking ts/js API change --- logging/log.ts | 13 ++ package-lock.json | 53 +++--- package.json | 4 +- router/client.ts | 4 +- router/server.ts | 182 +++++++++---------- tracing/index.ts | 26 ++- tracing/tracing.test.ts | 108 ++++++++--- transport/connection.ts | 4 +- transport/sessionStateMachine/common.ts | 3 +- transport/sessionStateMachine/transitions.ts | 9 +- 10 files changed, 230 insertions(+), 176 deletions(-) diff --git a/logging/log.ts b/logging/log.ts index 77ad7c86..de77449a 100644 --- a/logging/log.ts +++ b/logging/log.ts @@ -1,5 +1,6 @@ import { ValueError } from '@sinclair/typebox/value'; import { OpaqueTransportMessage, ProtocolVersion } from '../transport/message'; +import { context, trace } from '@opentelemetry/api'; const LoggingLevels = { debug: -1, @@ -27,6 +28,17 @@ export type Tags = const cleanedLogFn = (log: LogFn) => { return (msg: string, metadata?: MessageMetadata) => { + // try to infer telemetry + if (metadata && !metadata.telemetry) { + const span = trace.getSpan(context.active()); + if (span) { + metadata.telemetry = { + traceId: span.spanContext().traceId, + spanId: span.spanContext().spanId, + }; + } + } + // skip cloning object if metadata has no transportMessage if (!metadata?.transportMessage) { log(msg, metadata); @@ -37,6 +49,7 @@ const cleanedLogFn = (log: LogFn) => { // clone metadata and clean transportMessage const { payload, ...rest } = metadata.transportMessage; metadata.transportMessage = rest; + log(msg, metadata); }; }; diff --git a/package-lock.json b/package-lock.json index 49bcc5ce..6885e9eb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.203.0", + "version": "0.203.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.203.0", + "version": "0.203.1", "license": "MIT", "dependencies": { "@msgpack/msgpack": "^3.0.0-beta2", @@ -14,9 +14,9 @@ "ws": "^8.17.0" }, "devDependencies": { + "@opentelemetry/context-async-hooks": "^1.26.0", "@opentelemetry/core": "^1.7.0", "@opentelemetry/sdk-trace-base": "^1.24.1", - "@opentelemetry/sdk-trace-web": "^1.24.1", "@stylistic/eslint-plugin": "^2.6.4", "@types/ws": "^8.5.5", "@typescript-eslint/eslint-plugin": "^7.8.0", @@ -617,6 +617,18 @@ "node": ">=8.0.0" } }, + "node_modules/@opentelemetry/context-async-hooks": { + "version": "1.26.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/context-async-hooks/-/context-async-hooks-1.26.0.tgz", + "integrity": "sha512-HedpXXYzzbaoutw6DFLWLDket2FwLkLpil4hGCZ1xYEIMTcivdfwEOISgdbLEWyG3HW52gTq2V9mOVJrONgiwg==", + "dev": true, + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, "node_modules/@opentelemetry/core": { "version": "1.24.1", "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.24.1.tgz", @@ -665,23 +677,6 @@ "@opentelemetry/api": ">=1.0.0 <1.9.0" } }, - "node_modules/@opentelemetry/sdk-trace-web": { - "version": "1.24.1", - "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-web/-/sdk-trace-web-1.24.1.tgz", - "integrity": "sha512-0w+aKRai9VREeo3VrtW+hcbrE2Fl/uKL7G+oXgRNf6pI9QLaEGuEzUTX+oxXVPBadzjOd+5dqCHYdX7UeVjzwA==", - "dev": true, - "dependencies": { - "@opentelemetry/core": "1.24.1", - "@opentelemetry/sdk-trace-base": "1.24.1", - "@opentelemetry/semantic-conventions": "1.24.1" - }, - "engines": { - "node": ">=14" - }, - "peerDependencies": { - "@opentelemetry/api": ">=1.0.0 <1.9.0" - } - }, "node_modules/@opentelemetry/semantic-conventions": { "version": "1.24.1", "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.24.1.tgz", @@ -4708,6 +4703,13 @@ "integrity": "sha512-I/s6F7yKUDdtMsoBWXJe8Qz40Tui5vsuKCWJEWVL+5q9sSWRzzx6v2KeNsOBEwd94j0eWkpWCH4yB6rZg9Mf0w==", "peer": true }, + "@opentelemetry/context-async-hooks": { + "version": "1.26.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/context-async-hooks/-/context-async-hooks-1.26.0.tgz", + "integrity": "sha512-HedpXXYzzbaoutw6DFLWLDket2FwLkLpil4hGCZ1xYEIMTcivdfwEOISgdbLEWyG3HW52gTq2V9mOVJrONgiwg==", + "dev": true, + "requires": {} + }, "@opentelemetry/core": { "version": "1.24.1", "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-1.24.1.tgz", @@ -4738,17 +4740,6 @@ "@opentelemetry/semantic-conventions": "1.24.1" } }, - "@opentelemetry/sdk-trace-web": { - "version": "1.24.1", - "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-trace-web/-/sdk-trace-web-1.24.1.tgz", - "integrity": "sha512-0w+aKRai9VREeo3VrtW+hcbrE2Fl/uKL7G+oXgRNf6pI9QLaEGuEzUTX+oxXVPBadzjOd+5dqCHYdX7UeVjzwA==", - "dev": true, - "requires": { - "@opentelemetry/core": "1.24.1", - "@opentelemetry/sdk-trace-base": "1.24.1", - "@opentelemetry/semantic-conventions": "1.24.1" - } - }, "@opentelemetry/semantic-conventions": { "version": "1.24.1", "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.24.1.tgz", diff --git a/package.json b/package.json index c722e58b..9a4d2e91 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.203.0", + "version": "0.203.1", "type": "module", "exports": { ".": { @@ -57,9 +57,9 @@ "@sinclair/typebox": "~0.32.8" }, "devDependencies": { + "@opentelemetry/context-async-hooks": "^1.26.0", "@opentelemetry/core": "^1.7.0", "@opentelemetry/sdk-trace-base": "^1.24.1", - "@opentelemetry/sdk-trace-web": "^1.24.1", "@stylistic/eslint-plugin": "^2.6.4", "@types/ws": "^8.5.5", "@typescript-eslint/eslint-plugin": "^7.8.0", diff --git a/router/client.ts b/router/client.ts index 0f19c443..0c318b68 100644 --- a/router/client.ts +++ b/router/client.ts @@ -288,7 +288,7 @@ function handleProc( const procClosesWithInit = procType === 'rpc' || procType === 'subscription'; const streamId = generateId(); const { span, ctx } = createProcTelemetryInfo( - transport, + session, procType, serviceName, procedureName, @@ -535,7 +535,7 @@ function handleProc( /** * Waits for a message in the response AND the server to close. - * Logs an error if we receive multiple messages. + * Logs an error if we receive multiple messages. * Used in RPC and Upload. */ async function getSingleMessage( diff --git a/router/server.ts b/router/server.ts index 8dc41c47..31667f26 100644 --- a/router/server.ts +++ b/router/server.ts @@ -74,8 +74,6 @@ export interface Server { close: () => Promise; } -type ProcHandlerReturn = Promise<(() => void) | void>; - interface StreamInitProps { // msg derived streamId: StreamId; @@ -199,10 +197,17 @@ class RiverServer } // if its not a cancelled stream, validate and create a new stream - this.createNewProcStream({ - ...newStreamProps, - ...message, - }); + createHandlerSpan( + newStreamProps.initialSession, + newStreamProps.procedure.type, + newStreamProps.serviceName, + newStreamProps.procedureName, + newStreamProps.streamId, + newStreamProps.tracingCtx, + (span) => { + this.createNewProcStream(span, newStreamProps); + }, + ); }; const handleSessionStatus = (evt: EventMap['sessionStatus']) => { @@ -241,7 +246,7 @@ class RiverServer this.transport.addEventListener('transportStatus', handleTransportStatus); } - private createNewProcStream(props: StreamInitProps) { + private createNewProcStream(span: Span, props: StreamInitProps) { const { streamId, initialSession, @@ -251,7 +256,6 @@ class RiverServer sessionMetadata, serviceContext, initPayload, - tracingCtx, procClosesWithInit, passInitAsDataForBackwardsCompat, } = props; @@ -263,6 +267,12 @@ class RiverServer id: sessionId, } = initialSession; + // dont use the session span here, we want to create a new span for the procedure + loggingMetadata.telemetry = { + traceId: span.spanContext().traceId, + spanId: span.spanContext().spanId, + }; + let cleanClose = true; const onMessage = (msg: OpaqueTransportMessage) => { if (msg.from !== from) { @@ -558,108 +568,78 @@ class RiverServer switch (procedure.type) { case 'rpc': - void createHandlerSpan( - procedure.type, - serviceName, - procedureName, - streamId, - tracingCtx, - async (span): ProcHandlerReturn => { - try { - const responsePayload = await procedure.handler({ - ctx: handlerContextWithSpan(span), - reqInit: initPayload, - }); - - if (resWritable.isClosed()) { - // A disconnect happened - return; - } - - resWritable.write(responsePayload); - } catch (err) { - onHandlerError(err, span); - } finally { - span.end(); + void (async () => { + try { + const responsePayload = await procedure.handler({ + ctx: handlerContextWithSpan(span), + reqInit: initPayload, + }); + + if (resWritable.isClosed()) { + // A disconnect happened + return; } - }, - ); + + resWritable.write(responsePayload); + } catch (err) { + onHandlerError(err, span); + } finally { + span.end(); + } + })(); break; case 'stream': - void createHandlerSpan( - procedure.type, - serviceName, - procedureName, - streamId, - tracingCtx, - async (span): ProcHandlerReturn => { - try { - await procedure.handler({ - ctx: handlerContextWithSpan(span), - reqInit: initPayload, - reqReadable, - resWritable, - }); - } catch (err) { - onHandlerError(err, span); - } finally { - span.end(); - } - }, - ); - + void (async () => { + try { + await procedure.handler({ + ctx: handlerContextWithSpan(span), + reqInit: initPayload, + reqReadable, + resWritable, + }); + } catch (err) { + onHandlerError(err, span); + } finally { + span.end(); + } + })(); break; case 'subscription': - void createHandlerSpan( - procedure.type, - serviceName, - procedureName, - streamId, - tracingCtx, - async (span): ProcHandlerReturn => { - try { - await procedure.handler({ - ctx: handlerContextWithSpan(span), - reqInit: initPayload, - resWritable: resWritable, - }); - } catch (err) { - onHandlerError(err, span); - } finally { - span.end(); - } - }, - ); + void (async () => { + try { + await procedure.handler({ + ctx: handlerContextWithSpan(span), + reqInit: initPayload, + resWritable: resWritable, + }); + } catch (err) { + onHandlerError(err, span); + } finally { + span.end(); + } + })(); break; case 'upload': - void createHandlerSpan( - procedure.type, - serviceName, - procedureName, - streamId, - tracingCtx, - async (span): ProcHandlerReturn => { - try { - const responsePayload = await procedure.handler({ - ctx: handlerContextWithSpan(span), - reqInit: initPayload, - reqReadable: reqReadable, - }); - - if (resWritable.isClosed()) { - // A disconnect happened - return; - } - - resWritable.write(responsePayload); - } catch (err) { - onHandlerError(err, span); - } finally { - span.end(); + void (async () => { + try { + const responsePayload = await procedure.handler({ + ctx: handlerContextWithSpan(span), + reqInit: initPayload, + reqReadable: reqReadable, + }); + + if (resWritable.isClosed()) { + // A disconnect happened + return; } - }, - ); + resWritable.write(responsePayload); + } catch (err) { + onHandlerError(err, span); + } finally { + span.end(); + } + })(); break; } diff --git a/tracing/index.ts b/tracing/index.ts index c51afa6e..ca493f85 100644 --- a/tracing/index.ts +++ b/tracing/index.ts @@ -8,7 +8,10 @@ import { } from '@opentelemetry/api'; import { version as RIVER_VERSION } from '../package.json'; import { ValidProcType } from '../router'; -import { ClientTransport, Connection } from '../transport'; +import { Connection } from '../transport'; +import { MessageMetadata } from '../logging'; +import { ClientSession } from '../transport/sessionStateMachine/transitions'; +import { IdentifiedSession } from '../transport/sessionStateMachine/common'; export interface PropagationContext { traceparent: string; @@ -82,7 +85,7 @@ export function createConnectionTelemetryInfo( } export function createProcTelemetryInfo( - transport: ClientTransport, + session: ClientSession, kind: ValidProcType, serviceName: string, procedureName: string, @@ -100,29 +103,35 @@ export function createProcTelemetryInfo( 'river.streamId': streamId, 'span.kind': 'client', }, + links: [{ context: session.telemetry.span.spanContext() }], kind: SpanKind.CLIENT, }, baseCtx, ); const ctx = trace.setSpan(baseCtx, span); - - transport.log?.info(`invoked ${serviceName}.${procedureName}`, { - clientId: transport.clientId, + const metadata: MessageMetadata = { + ...session.loggingMetadata, transportMessage: { procedureName, serviceName, }, - telemetry: { + }; + + if (span.isRecording()) { + metadata.telemetry = { traceId: span.spanContext().traceId, spanId: span.spanContext().spanId, - }, - }); + }; + } + + session.log?.info(`invoked ${serviceName}.${procedureName}`, metadata); return { span, ctx }; } export function createHandlerSpan unknown>( + session: IdentifiedSession, kind: ValidProcType, serviceName: string, procedureName: string, @@ -145,6 +154,7 @@ export function createHandlerSpan unknown>( 'river.streamId': streamId, 'span.kind': 'server', }, + links: [{ context: session.telemetry.span.spanContext() }], kind: SpanKind.SERVER, }, ctx, diff --git a/tracing/tracing.test.ts b/tracing/tracing.test.ts index 7fb959bd..e6614f4c 100644 --- a/tracing/tracing.test.ts +++ b/tracing/tracing.test.ts @@ -1,6 +1,6 @@ import { trace, context, propagation, Span } from '@opentelemetry/api'; import { describe, test, expect, vi, assert, beforeEach } from 'vitest'; -import { dummySession } from '../testUtil'; +import { dummySession, readNextResult } from '../testUtil'; import { BasicTracerProvider, @@ -8,11 +8,10 @@ import { SimpleSpanProcessor, } from '@opentelemetry/sdk-trace-base'; import { W3CTraceContextPropagator } from '@opentelemetry/core'; -import { StackContextManager } from '@opentelemetry/sdk-trace-web'; +import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; import tracer, { createSessionTelemetryInfo, getPropagationContext, - createHandlerSpan, } from './index'; import { testMatrix } from '../testUtil/fixtures/matrix'; import { @@ -22,13 +21,18 @@ import { } from '../testUtil/fixtures/cleanup'; import { TestSetupHelpers } from '../testUtil/fixtures/transports'; import { createPostTestCleanups } from '../testUtil/fixtures/cleanup'; +import { FallibleServiceSchema } from '../testUtil/fixtures/services'; +import { createServer } from '../router/server'; +import { createClient } from '../router/client'; +import { UNCAUGHT_ERROR_CODE } from '../router'; +import { LogFn } from '../logging'; describe('Basic tracing tests', () => { const provider = new BasicTracerProvider(); provider.addSpanProcessor( new SimpleSpanProcessor(new InMemorySpanExporter()), ); - const contextManager = new StackContextManager(); + const contextManager = new AsyncHooksContextManager(); contextManager.enable(); trace.setGlobalTracerProvider(provider); context.setGlobalContextManager(contextManager); @@ -57,29 +61,6 @@ describe('Basic tracing tests', () => { ) as Span, ).toBeTruthy(); }); - - test('createHandlerSpan', () => { - const parentCtx = context.active(); - const span = tracer.startSpan('testing span', {}, parentCtx); - const ctx = trace.setSpan(parentCtx, span); - - const propagationContext = getPropagationContext(ctx); - expect(propagationContext?.traceparent).toBeTruthy(); - - const handlerMock = vi.fn<(span: Span) => void>(); - createHandlerSpan( - 'rpc', - 'myservice', - 'myprocedure', - 'mystream', - propagationContext, - handlerMock, - ); - expect(handlerMock).toHaveBeenCalledTimes(1); - const createdSpan = handlerMock.mock.calls[0][0]; - // @ts-expect-error: hacking to get parentSpanId - expect(createdSpan.parentSpanId).toBe(span.spanContext().spanId); - }); }); describe.each(testMatrix())( @@ -135,5 +116,78 @@ describe.each(testMatrix())( serverTransport, }); }); + + test('implicit telemetry gets picked up from handlers', async () => { + // setup + const clientTransport = getClientTransport('client'); + const clientMockLogger = vi.fn(); + clientTransport.bindLogger(clientMockLogger); + const serverTransport = getServerTransport(); + const serverMockLogger = vi.fn(); + serverTransport.bindLogger(serverMockLogger); + const services = { + fallible: FallibleServiceSchema, + }; + const server = createServer(serverTransport, services); + const client = createClient( + clientTransport, + serverTransport.clientId, + ); + addPostTestCleanup(async () => { + await cleanupTransports([clientTransport, serverTransport]); + }); + + // test + const { reqWritable, resReadable } = client.fallible.echo.stream({}); + + reqWritable.write({ + msg: 'abc', + throwResult: false, + throwError: false, + }); + let result = await readNextResult(resReadable); + expect(result).toStrictEqual({ + ok: true, + payload: { + response: 'abc', + }, + }); + + // this isn't the first message so doesn't have telemetry info on the message itself + reqWritable.write({ + msg: 'def', + throwResult: false, + throwError: true, + }); + + result = await readNextResult(resReadable); + expect(result).toStrictEqual({ + ok: false, + payload: { + code: UNCAUGHT_ERROR_CODE, + message: 'some message', + }, + }); + + // expect that both client and server loggers logged the uncaught error with the correct telemetry info + const clientInvokeCall = clientMockLogger.mock.calls.find( + (call) => call[0] === 'invoked fallible.echo', + ); + const serverInvokeFail = serverMockLogger.mock.calls.find( + (call) => call[0] === 'fallible.echo handler threw an uncaught error', + ); + expect(clientInvokeCall?.[1]).toBeTruthy(); + expect(serverInvokeFail?.[1]).toBeTruthy(); + expect(clientInvokeCall?.[1]?.telemetry?.traceId).toStrictEqual( + serverInvokeFail?.[1]?.telemetry?.traceId, + ); + + reqWritable.close(); + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + server, + }); + }); }, ); diff --git a/transport/connection.ts b/transport/connection.ts index 75fb3161..7e05da0d 100644 --- a/transport/connection.ts +++ b/transport/connection.ts @@ -18,9 +18,9 @@ export abstract class Connection { get loggingMetadata(): MessageMetadata { const metadata: MessageMetadata = { connId: this.id }; - const spanContext = this.telemetry?.span.spanContext(); - if (this.telemetry?.span.isRecording() && spanContext) { + if (this.telemetry?.span.isRecording()) { + const spanContext = this.telemetry.span.spanContext(); metadata.telemetry = { traceId: spanContext.traceId, spanId: spanContext.spanId, diff --git a/transport/sessionStateMachine/common.ts b/transport/sessionStateMachine/common.ts index 2c23bd03..d12fc884 100644 --- a/transport/sessionStateMachine/common.ts +++ b/transport/sessionStateMachine/common.ts @@ -241,8 +241,6 @@ export abstract class IdentifiedSession extends CommonSession { } get loggingMetadata(): MessageMetadata { - const spanContext = this.telemetry.span.spanContext(); - const metadata: MessageMetadata = { clientId: this.from, connectedTo: this.to, @@ -250,6 +248,7 @@ export abstract class IdentifiedSession extends CommonSession { }; if (this.telemetry.span.isRecording()) { + const spanContext = this.telemetry.span.spanContext(); metadata.telemetry = { traceId: spanContext.traceId, spanId: spanContext.spanId, diff --git a/transport/sessionStateMachine/transitions.ts b/transport/sessionStateMachine/transitions.ts index 925f347c..595959f9 100644 --- a/transport/sessionStateMachine/transitions.ts +++ b/transport/sessionStateMachine/transitions.ts @@ -14,7 +14,11 @@ import { IdentifiedSessionWithGracePeriodProps, SessionOptions, } from './common'; -import { PropagationContext, createSessionTelemetryInfo } from '../../tracing'; +import { + PropagationContext, + createConnectionTelemetryInfo, + createSessionTelemetryInfo, +} from '../../tracing'; import { SessionWaitingForHandshake, SessionWaitingForHandshakeListeners, @@ -187,6 +191,7 @@ export const SessionStateGraph = { ...carriedState, }); + conn.telemetry = createConnectionTelemetryInfo(conn, session.telemetry); session.log?.info( `session ${session.id} transition from Connecting to Handshaking`, { @@ -262,6 +267,8 @@ export const SessionStateGraph = { listeners, ...carriedState, }); + + conn.telemetry = createConnectionTelemetryInfo(conn, session.telemetry); session.log?.info( `session ${session.id} transition from WaitingForHandshake to Connected`, {