Skip to content

Commit

Permalink
feat: RD-14290-support-AWS-Lambda-Stream (#533)
Browse files Browse the repository at this point in the history
* feat: support responseStream
  • Loading branch information
eugene-lumigo authored Feb 3, 2025
1 parent ffd9c8f commit f11b320
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 78 deletions.
11 changes: 9 additions & 2 deletions src/tracer/tracer.interface.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import type { Handler } from 'aws-lambda';
import type { Callback, Context, Handler } from 'aws-lambda';

import { ExecutionTags } from '../globals';
import * as LumigoLogger from '../lumigoLogger';

export type ResponseStreamHandler<TEvent = any, TResult = any> = (
event: TEvent,
responseStream: any,
context: Context,
callback?: Callback<TResult>
) => void | Promise<TResult>;

export interface Tracer {
trace: (handler: Handler) => Handler;
trace<T extends Handler | ResponseStreamHandler>(handler: T): T;
addExecutionTag: typeof ExecutionTags.addTag;
info: typeof LumigoLogger.info;
warn: typeof LumigoLogger.warn;
Expand Down
13 changes: 7 additions & 6 deletions src/tracer/tracer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -818,18 +818,19 @@ describe('tracer', () => {
});

test('responseStreamFunctionLogic - tracer disabled and decorator marked as responseStream', async () => {
const handler = jest.fn(async () => {});
const handler = jest.fn(async (event, responseStream, _) => responseStream);
handler[HANDLER_STREAMING] = STREAM_RESPONSE;

const { event, context } = new HandlerInputsBuilder().build();
const { event, context, responseStream } = new HandlerInputsBuilder()
// here we put mocked data to stream, because handler is mocked as well. Mocked `handler` just return response stream back
.withResponseStream({ hello: 'world' })
.build();

const decoratedUserHandler = tracer.trace({})(handler);
await decoratedUserHandler(event, context);
const result = await decoratedUserHandler(event, responseStream, context);

expect(decoratedUserHandler[HANDLER_STREAMING]).toEqual(STREAM_RESPONSE);
expect(spies.warnClient).toHaveBeenCalledWith(
'Tracer is disabled, running on a response stream function'
);
expect(result).toEqual({ hello: 'world' });
});

test('performStepFunctionLogic - Happy flow', async () => {
Expand Down
201 changes: 131 additions & 70 deletions src/tracer/tracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
import { runOneTimeWrapper } from '../utils/functionUtils';
import { TraceOptions } from './trace-options.type';
import { GenericSpan } from '../types/spans/basicSpan';
import { ResponseStreamHandler } from './tracer.interface';

export const HANDLER_CALLBACKED = 'handler_callbacked';
export const HANDLER_STREAMING = Symbol.for('aws.lambda.runtime.handler.streaming');
Expand All @@ -49,86 +50,138 @@ export const MAX_ELEMENTS_IN_EXTRA = 10;
export const LEAK_MESSAGE =
'Execution leak detected. More information is available in: https://docs.lumigo.io/docs/execution-leak-detected';

export const trace =
({ token, debug, edgeHost, switchOff, stepFunction }: TraceOptions) =>
(userHandler: Handler) => {
const isResponseStreamFunction = userHandler[HANDLER_STREAMING] === STREAM_RESPONSE;
const decoratedUserHandler = async <Event = any>(
event: Event,
context?: Context,
callback?: Callback
): Promise<Handler> => {
if (!!switchOff || isSwitchedOff()) {
info(
`The '${SWITCH_OFF_FLAG}' environment variable is set to 'true': this invocation will not be traced by Lumigo`
);
return userHandler(event, context, callback);
}
const isResponseStreamFunction = (userHandler: any) =>
userHandler[HANDLER_STREAMING] === STREAM_RESPONSE;

const runUserHandler = <Event>(
userHandler: any,
event: Event,
context: Context,
callback?: Callback,
responseStream?: any
) =>
isResponseStreamFunction(userHandler)
? userHandler(event, responseStream, context, callback)
: userHandler(event, context, callback);

const processUserHandler = async <Event>(
userHandler: any,
event: Event,
context: Context,
options: TraceOptions,
callback?: Callback,
responseStream?: any
) => {
const { token, debug, edgeHost, switchOff, stepFunction } = options;

if (!!switchOff || isSwitchedOff()) {
info(
`The '${SWITCH_OFF_FLAG}' environment variable is set to 'true': this invocation will not be traced by Lumigo`
);
return runUserHandler(userHandler, event, context, callback, responseStream);
}

if (!isAwsEnvironment()) {
warnClient('Tracer is disabled, running on non-aws environment');
return userHandler(event, context, callback);
}
if (isResponseStreamFunction) {
warnClient('Tracer is disabled, running on a response stream function');
return userHandler(event, context, callback);
}
try {
TracerGlobals.setHandlerInputs({ event, context });
TracerGlobals.setTracerInputs({
token,
debug,
edgeHost,
switchOff,
stepFunction,
lambdaTimeout: context.getRemainingTimeInMillis(),
});
ExecutionTags.autoTagEvent(event);
} catch (err) {
logger.warn('Failed to start tracer', err);
}
if (!isAwsEnvironment()) {
warnClient('Tracer is disabled, running on non-aws environment');
return runUserHandler(userHandler, event, context, callback, responseStream);
}

if (!context || !isAwsContext(context)) {
logger.warnClient(
'missing context parameter - learn more at https://docs.lumigo.io/docs/nodejs'
);
const { err, data, type } = await promisifyUserHandler(userHandler, event, context);
return performPromisifyType(err, data, type, callback);
}
try {
TracerGlobals.setHandlerInputs({ event, context });
TracerGlobals.setTracerInputs({
token,
debug,
edgeHost,
switchOff,
stepFunction,
lambdaTimeout: context.getRemainingTimeInMillis(),
});
ExecutionTags.autoTagEvent(event);
} catch (err) {
logger.warn('Failed to start tracer', err);
}

if (context.__wrappedByLumigo) {
const { err, data, type } = await promisifyUserHandler(userHandler, event, context);
return performPromisifyType(err, data, type, callback);
}
context.__wrappedByLumigo = true;
if (!context || !isAwsContext(context)) {
logger.warnClient(
'missing context parameter - learn more at https://docs.lumigo.io/docs/nodejs'
);
const { err, data, type } = await promisifyUserHandler(
userHandler,
event,
context,
responseStream
);
return performPromisifyType(err, data, type, callback);
}

const functionSpan = getFunctionSpan(event, context);
if (context.__wrappedByLumigo) {
const { err, data, type } = await promisifyUserHandler(
userHandler,
event,
context,
responseStream
);
return performPromisifyType(err, data, type, callback);
}
context.__wrappedByLumigo = true;

await hookUnhandledRejection(functionSpan);
const functionSpan = getFunctionSpan(event, context);

const pStartTrace = startTrace(functionSpan);
const pUserHandler = promisifyUserHandler(userHandler, event, context);
await hookUnhandledRejection(functionSpan);

let [, handlerReturnValue] = await Promise.all([pStartTrace, pUserHandler]);
const pStartTrace = startTrace(functionSpan);
const pUserHandler = promisifyUserHandler(userHandler, event, context, responseStream);

handlerReturnValue = normalizeLambdaError(handlerReturnValue);
let [, handlerReturnValue] = await Promise.all([pStartTrace, pUserHandler]);

if (isStepFunction()) {
handlerReturnValue = performStepFunctionLogic(handlerReturnValue);
}
handlerReturnValue = normalizeLambdaError(handlerReturnValue);

const cleanedHandlerReturnValue = removeLumigoFromStacktrace(handlerReturnValue);
if (isStepFunction()) {
handlerReturnValue = performStepFunctionLogic(handlerReturnValue);
}

await endTrace(functionSpan, cleanedHandlerReturnValue);
const { err, data, type } = cleanedHandlerReturnValue;
const cleanedHandlerReturnValue = removeLumigoFromStacktrace(handlerReturnValue);

return performPromisifyType(err, data, type, callback);
};
await endTrace(functionSpan, cleanedHandlerReturnValue);
const { err, data, type } = cleanedHandlerReturnValue;

if (isResponseStreamFunction) {
decoratedUserHandler[HANDLER_STREAMING] = STREAM_RESPONSE;
}
return decoratedUserHandler;
return performPromisifyType(err, data, type, callback);
};

const decorateUserHandler = <T extends Handler | ResponseStreamHandler>(
userHandler: T,
options: TraceOptions
) => {
const decoratedUserHandler = async <Event = any>(
event: Event,
context?: Context,
callback?: Callback
): Promise<Handler> => {
return await processUserHandler(userHandler, event, context, options, callback, undefined);
};

const decoratedResponseStreamUserHandler = async <Event = any>(
event: Event,
responseStream?: any,
context?: Context,
callback?: Callback
): Promise<ResponseStreamHandler> => {
return await processUserHandler(userHandler, event, context, options, callback, responseStream);
};

if (isResponseStreamFunction(userHandler)) {
logger.debug('Function has response stream in the handler');
decoratedResponseStreamUserHandler[HANDLER_STREAMING] = STREAM_RESPONSE;
return decoratedResponseStreamUserHandler as T;
} else {
return decoratedUserHandler as T;
}
};

export const trace =
(options: TraceOptions) =>
<T extends Handler | ResponseStreamHandler>(userHandler: T): T => {
return decorateUserHandler(userHandler, options);
};

export const startTrace = async (functionSpan: GenericSpan) => {
Expand Down Expand Up @@ -191,11 +244,14 @@ export const isCallbacked = (handlerReturnValue) => {
export function promisifyUserHandler(
userHandler,
event,
context
context,
responseStream?
): Promise<{ err: any; data: any; type: string }> {
return new Promise((resolve) => {
try {
const result = userHandler(event, context, callbackResolver(resolve));
const result = isResponseStreamFunction(userHandler)
? userHandler(event, responseStream, context, callbackResolver(resolve))
: userHandler(event, context, callbackResolver(resolve));
if (isPromise(result)) {
result
.then((data) => resolve({ err: null, data, type: ASYNC_HANDLER_RESOLVED }))
Expand Down Expand Up @@ -286,7 +342,12 @@ const logLeakedSpans = (allSpans) => {
});
};

const performPromisifyType = (err, data, type, callback): Handler => {
const performPromisifyType = <T extends Handler | ResponseStreamHandler>(
err,
data: T,
type,
callback
): T => {
switch (type) {
case HANDLER_CALLBACKED:
callback(err, data);
Expand Down
7 changes: 7 additions & 0 deletions testUtils/handlerInputsBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Context } from 'aws-lambda';
export class HandlerInputsBuilder {
_event: any;
_context: Context;
_responseStream: any;

static DEFAULT_AWS_REQUEST_ID = HttpSpanBuilder.DEFAULT_PARENT_ID;
static DEFAULT_INVOKED_FUNCTION_ARN = HttpSpanBuilder.DEFAULT_ARN;
Expand Down Expand Up @@ -45,8 +46,14 @@ export class HandlerInputsBuilder {
return this;
};

withResponseStream = (stream: any) => {
this._responseStream = stream;
return this;
};

build = () => ({
event: this._event,
context: this._context,
responseStream: this._responseStream,
});
}

0 comments on commit f11b320

Please sign in to comment.