Skip to content

Commit

Permalink
improve retry mechanism, avoiding rely on AWS SQS retry and instead o… (
Browse files Browse the repository at this point in the history
#1999)

…nly using self retry mechanism
  • Loading branch information
jeanschmidt authored Feb 1, 2023
1 parent f7b9388 commit 9d5460f
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 154 deletions.
4 changes: 2 additions & 2 deletions terraform-aws-github-runner/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ resource "aws_sqs_queue" "queued_builds" {
fifo_queue = true
content_based_deduplication = true
max_message_size = 2048
message_retention_seconds = var.runners_scale_up_sqs_max_retry * var.runners_scale_up_sqs_visibility_timeout + 100
message_retention_seconds = var.runners_scale_up_sqs_message_ret_s
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.queued_builds_dead_letter.arn
maxReceiveCount = var.runners_scale_up_sqs_max_retry
Expand All @@ -57,7 +57,7 @@ resource "aws_sqs_queue" "queued_builds_retry" {
name = "${var.environment}-queued-builds-retry"
visibility_timeout_seconds = var.runners_scale_up_sqs_visibility_timeout
max_message_size = 2048
message_retention_seconds = var.runners_scale_up_sqs_max_retry * var.runners_scale_up_sqs_visibility_timeout + 100
message_retention_seconds = var.runners_scale_up_sqs_message_ret_s
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.queued_builds_retry_dead_letter.arn
maxReceiveCount = var.runners_scale_up_sqs_max_retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ import { Context, SQSEvent, ScheduledEvent } from 'aws-lambda';
import { mocked } from 'ts-jest/utils';
import { scaleDown } from './scale-runners/scale-down';
import { scaleUp, RetryableScalingError } from './scale-runners/scale-up';
import * as MetricsModule from './scale-runners/metrics';

const mockCloudWatch = {
putMetricData: jest.fn().mockImplementation(() => {
return { promise: jest.fn().mockResolvedValue(true) };
}),
};
const mockSQS = {
sendMessage: jest.fn().mockReturnValue({ promise: jest.fn() }),
};
jest.mock('aws-sdk', () => ({
SQS: jest.fn().mockImplementation(() => mockSQS),
CloudWatch: jest.fn().mockImplementation(() => mockCloudWatch),
}));

jest.mock('./scale-runners/scale-down');
jest.mock('./scale-runners/scale-up');

const metrics = new MetricsModule.ScaleUpMetrics();

beforeEach(() => {
jest.resetModules();
jest.clearAllMocks();
Expand All @@ -27,6 +36,7 @@ beforeEach(() => {
describe('scaleUp', () => {
beforeEach(() => {
jest.spyOn(global.Math, 'random').mockReturnValue(1.0);
jest.spyOn(MetricsModule, 'ScaleUpMetrics').mockReturnValue(metrics);
});

afterEach(() => {
Expand All @@ -47,8 +57,8 @@ describe('scaleUp', () => {
callback,
);
expect(mockedScaleUp).toBeCalledTimes(2);
expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 1 });
expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 2 });
expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 1 }, metrics);
expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 2 }, metrics);
expect(callback).toBeCalledTimes(1);
expect(callback).toBeCalledWith(null);
});
Expand All @@ -67,7 +77,7 @@ describe('scaleUp', () => {
callback,
);
expect(mockedScaleUp).toBeCalledTimes(1);
expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 1 });
expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 1 }, metrics);
expect(callback).toBeCalledTimes(1);
expect(callback).toBeCalledWith('Failed handling SQS event');
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,71 +1,117 @@
import { SQS } from 'aws-sdk';
import { ActionRequestMessage, RetryableScalingError, scaleUp as scaleUpR } from './scale-runners/scale-up';
import { Context, SQSEvent, SQSRecord, ScheduledEvent } from 'aws-lambda';

import { Config } from './scale-runners/config';
import { scaleDown as scaleDownR } from './scale-runners/scale-down';
import { scaleUp as scaleUpR, RetryableScalingError, ActionRequestMessage } from './scale-runners/scale-up';
import { SQS } from 'aws-sdk';
import { ScaleUpMetrics, sendMetricsAtTimeout, sendMetricsTimeoutVars } from './scale-runners/metrics';
import { getDelayWithJitterRetryCount } from './scale-runners/utils';
import { scaleDown as scaleDownR } from './scale-runners/scale-down';

async function sendRetryEvents(evtFailed: Array<[SQSRecord, boolean]>, metrics: ScaleUpMetrics) {
console.error(`Detected ${evtFailed.length} errors when processing messages, will retry relevant messages.`);
metrics.exception();

const sqs: SQS = new SQS();

for (const [evt, retryable] of evtFailed) {
const body: ActionRequestMessage = JSON.parse(evt.body);
const retryCount = body?.retryCount ?? 0;

if (
retryCount < Config.Instance.maxRetryScaleUpRecord &&
(Config.Instance.retryScaleUpRecordQueueUrl?.length ?? 0) > 0
) {
if (retryable) {
metrics.scaleUpFailureRetryable(retryCount);
} else {
metrics.scaleUpFailureNonRetryable(retryCount);
}

body.retryCount = retryCount + 1;
body.delaySeconds = Math.min(
900,
getDelayWithJitterRetryCount(
retryCount,
Math.max(Config.Instance.retryScaleUpRecordDelayS, 20),
Config.Instance.retryScaleUpRecordJitterPct,
),
);

const sqsPayload: SQS.SendMessageRequest = {
DelaySeconds: body.delaySeconds,
MessageBody: JSON.stringify(body),
QueueUrl: Config.Instance.retryScaleUpRecordQueueUrl as string,
};

await sqs.sendMessage(sqsPayload).promise();
console.warn(`Sent message: ${evt.body}`);
} else {
console.error(`Permanently abandoning message: ${evt.body}`);
}
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export async function scaleUp(event: SQSEvent, context: Context, callback: any) {
console.dir(event, { depth: 5 });
let success = false;
const metrics = new ScaleUpMetrics();

const sndMetricsTimout: sendMetricsTimeoutVars = {
metrics: metrics,
};
sndMetricsTimout.setTimeout = setTimeout(
sendMetricsAtTimeout(sndMetricsTimout),
(Config.Instance.lambdaTimeout - 10) * 1000,
);

try {
const evtFailed: Array<SQSRecord> = [];
const evtFailed: Array<[SQSRecord, boolean]> = [];

recordsIterProcess: for (let i = 0; i < event.Records.length; i += 1) {
const evt = event.Records[i];

for (const evt of event.Records) {
try {
await scaleUpR(evt.eventSource, JSON.parse(evt.body));
await scaleUpR(evt.eventSource, JSON.parse(evt.body), metrics);
metrics.scaleUpSuccess();
} catch (e) {
if (e instanceof RetryableScalingError) {
console.error(`Retryable error thrown: "${e.message}"`);
evtFailed.push(evt);
evtFailed.push([evt, true]);
} else {
throw e;
console.error(`Non-retryable error during request: "${e.message}"`);
console.error(`All remaning '${event.Records.length - i}' messages will be scheduled to retry`);
for (let ii = i; ii < event.Records.length; ii += 1) {
evtFailed.push([event.Records[ii], false]);
}
break recordsIterProcess;
}
}
}

if (evtFailed.length > 0) {
console.error(`Detected ${evtFailed.length} errors when processing messages, will retry relevant messages.`);

const sqs: SQS = new SQS();

for (const evt of evtFailed) {
const body: ActionRequestMessage = JSON.parse(evt.body);
const retryCount = body?.retryCount ?? 0;

if (
retryCount < Config.Instance.maxRetryScaleUpRecord &&
(Config.Instance.retryScaleUpRecordQueueUrl?.length ?? 0) > 0
) {
body.retryCount = retryCount + 1;
body.delaySeconds = Math.min(
900,
getDelayWithJitterRetryCount(
retryCount,
Math.max(Config.Instance.retryScaleUpRecordDelayS, 20),
Config.Instance.retryScaleUpRecordJitterPct,
),
);

const sqsPayload: SQS.SendMessageRequest = {
DelaySeconds: body.delaySeconds,
MessageBody: JSON.stringify(body),
QueueUrl: Config.Instance.retryScaleUpRecordQueueUrl as string,
};

await sqs.sendMessage(sqsPayload).promise();
console.warn(`Sent message: ${evt.body}`);
} else {
console.error(`Permanently abandoning message: ${evt.body}`);
}
}
await sendRetryEvents(evtFailed, metrics);
}

return callback(null);
success = evtFailed.every((i) => {
return i[1];
});
} catch (e) {
console.error(e);
return callback('Failed handling SQS event');
} finally {
try {
clearTimeout(sndMetricsTimout.setTimeout);
sndMetricsTimout.metrics = undefined;
sndMetricsTimout.setTimeout = undefined;
metrics.sendMetrics();
} catch (e) {
console.error(`Error sending metrics: ${e}`);
}

if (success) {
callback(null);
} else {
callback('Failed handling SQS event');
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ export class Metrics {
}
}

/* istanbul ignore next */
run() {
this.countEntry('run.count');
}

/* istanbul ignore next */
exception() {
this.countEntry('run.exceptions_count');
}

// GitHub API CALLS
/* istanbul ignore next */
createAppAuthGHCallSuccess(ms: number) {
Expand Down Expand Up @@ -690,6 +700,29 @@ export class ScaleUpMetrics extends Metrics {
this.countEntry('run.skip', 1, this.getRepoDim(repo));
}

/* istanbul ignore next */
scaleUpSuccess() {
this.countEntry('run.scaleup.success');
}

/* istanbul ignore next */
scaleUpFailureRetryable(retries: number) {
this.countEntry('run.scaleup.failure.total.count');
this.addEntry('run.scaleup.failure.total.retries', retries);

this.countEntry('run.scaleup.failure.retryable.count');
this.addEntry('run.scaleup.failure.retryable.retries', retries);
}

/* istanbul ignore next */
scaleUpFailureNonRetryable(retries: number) {
this.countEntry('run.scaleup.failure.total.count');
this.addEntry('run.scaleup.failure.total.retries', retries);

this.countEntry('run.scaleup.failure.nonretryable.count');
this.addEntry('run.scaleup.failure.nonretryable.retries', retries);
}

/* istanbul ignore next */
ghRunnersRepoStats(repo: Repo, runnerType: string, total: number, labeled: number, busy: number) {
const dimensions = this.getRepoDim(repo);
Expand Down Expand Up @@ -794,16 +827,6 @@ export class ScaleDownMetrics extends Metrics {
super('scaleDown');
}

/* istanbul ignore next */
run() {
this.countEntry('run.count');
}

/* istanbul ignore next */
exception() {
this.countEntry('run.exceptions_count');
}

/* istanbul ignore next */
runnerLessMinimumTime(ec2Runner: RunnerInfo) {
this.countEntry(`run.ec2runners.notMinTime`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ describe('scaleUp', () => {
installationId: 2,
runnerLabels: [],
};
await expect(scaleUp('other', payload)).rejects.toThrow('Cannot handle non-SQS events!');
await expect(scaleUp('other', payload, metrics)).rejects.toThrow('Cannot handle non-SQS events!');
});

it('provides runnerLabels that aren`t present on runnerTypes', async () => {
Expand Down Expand Up @@ -80,7 +80,7 @@ describe('scaleUp', () => {
);
const mockedListGithubRunners = mocked(listGithubRunnersRepo);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);

expect(mockedGetRunnerTypes).toBeCalledTimes(1);
expect(mockedGetRunnerTypes).toBeCalledWith({ repo: 'repo', owner: 'owner' }, metrics);
Expand Down Expand Up @@ -189,7 +189,7 @@ describe('scaleUp', () => {
]);
const mockedCreateRegistrationTokenForRepo = mocked(createRegistrationTokenRepo);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);

expect(mockedGetRunnerTypes).toBeCalledTimes(1);
expect(mockedGetRunnerTypes).toBeCalledWith(repo, metrics);
Expand Down Expand Up @@ -261,7 +261,7 @@ describe('scaleUp', () => {
const mockedCreateRegistrationTokenForOrg = mocked(createRegistrationTokenOrg).mockResolvedValue(token);
const mockedCreateRunner = mocked(createRunner);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);

expect(mockedListGithubRunnersOrg).toBeCalledWith(repo.owner, metrics);
expect(mockedCreateRunner).toBeCalledTimes(1);
Expand Down Expand Up @@ -344,7 +344,7 @@ describe('scaleUp', () => {
const mockedCreateRegistrationTokenForRepo = mocked(createRegistrationTokenRepo).mockResolvedValue(token);
const mockedCreateRunner = mocked(createRunner);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);

expect(mockedCreateRunner).toBeCalledTimes(1);
expect(mockedCreateRunner).toBeCalledWith(
Expand Down Expand Up @@ -426,7 +426,7 @@ describe('scaleUp', () => {
const mockedCreateRegistrationTokenForRepo = mocked(createRegistrationTokenRepo).mockResolvedValue(token);
const mockedCreateRunner = mocked(createRunner);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);

expect(mockedCreateRunner).toBeCalledTimes(1);
expect(mockedCreateRunner).toBeCalledWith(
Expand Down Expand Up @@ -508,7 +508,7 @@ describe('scaleUp', () => {
const mockedCreateRegistrationTokenForRepo = mocked(createRegistrationTokenRepo).mockResolvedValue(token);
const mockedCreateRunner = mocked(createRunner);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);

expect(mockedCreateRunner).toBeCalledTimes(1);
expect(mockedCreateRunner).toBeCalledWith(
Expand Down Expand Up @@ -573,7 +573,7 @@ describe('scaleUp', () => {
]);
const mockedCreateRegistrationTokenForRepo = mocked(createRegistrationTokenRepo);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);

expect(mockedCreateRegistrationTokenForRepo).not.toBeCalled();
});
Expand Down Expand Up @@ -623,7 +623,7 @@ describe('scaleUp', () => {
mocked(createRegistrationTokenRepo).mockResolvedValue(token);
const mockedCreateRunner = mocked(createRunner);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);

expect(mockedCreateRunner).toBeCalledWith(
{
Expand Down Expand Up @@ -668,7 +668,7 @@ describe('scaleUp', () => {
mockedGetRepoIssuesWithLabel.mockResolvedValueOnce([{ something: 1 }] as unknown as GhIssues);
mockedGetRepoIssuesWithLabel.mockResolvedValueOnce([]);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);
expect(mockedGetRunnerTypes).not.toBeCalled();
expect(mockedGetRepoIssuesWithLabel).toBeCalledTimes(2);
expect(mockedGetRepoIssuesWithLabel).toBeCalledWith(repo, config.mustHaveIssuesLabels[0], metrics);
Expand Down Expand Up @@ -701,7 +701,7 @@ describe('scaleUp', () => {
mockedGetRepoIssuesWithLabel.mockResolvedValueOnce([]);
mockedGetRepoIssuesWithLabel.mockResolvedValueOnce([{ something: 1 }] as unknown as GhIssues);

await scaleUp('aws:sqs', payload);
await scaleUp('aws:sqs', payload, metrics);
expect(mockedGetRunnerTypes).not.toBeCalled();
expect(mockedGetRepoIssuesWithLabel).toBeCalledTimes(2);
expect(mockedGetRepoIssuesWithLabel).toBeCalledWith(repo, config.cantHaveIssuesLabels[0], metrics);
Expand Down
Loading

0 comments on commit 9d5460f

Please sign in to comment.