[Tech Debt] Use knex pooling correctly and provide DB connection to PgBoss

- Initialize the knex pool instance at the top level of the publisher and consumer, and pass the knex instance to the classes that make DB calls.
- Update knex initialization to use the full knexfile config instead of just the connection field.
- Add a PgBossKnexAdapter class to convert PgBoss SQL statements into knex raw commands.
- Pass the PgBossKnexAdapter to queue client setup, removing the separate database for queue messages (see the wiring sketch after this list).
- Remove knex pool destroy statements from the message queue consumer and from the postgres publisher.
- Keep knex pool destroy statements in the message queue publisher to close the pool after each interval script.
- Update CI to remove the message queue database parameter from deploy jobs.
- Remove the map statement from the CSV formatter so the report dataset is not copied during formatting, reducing consumer memory usage.
- Offset timed publisher script runs so they never run in parallel, reducing publisher memory usage.
- Disable unused hourly publisher runs.
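
A condensed sketch of the publisher wiring described above, assuming the module names in this commit (AppConfig, PgBossKnexAdapter); the payload and function name are illustrative, not copied from index.js:

const knex = require("knex");
const PgBoss = require("pg-boss");
const AppConfig = require("./src/app_config");
const PgBossKnexAdapter = require("./src/pg_boss_knex_adapter");

async function publishSketch() {
  const appConfig = new AppConfig();

  // One pooled knex instance per process, built from the full knexfile
  // config (client, connection, pool) rather than only the connection field.
  const knexInstance = knex(appConfig.knexConfig);

  // PgBoss reuses that same pool through the adapter instead of connecting
  // to a separate message queue database.
  const queueClient = new PgBoss({ db: new PgBossKnexAdapter(knexInstance) });
  await queueClient.start();

  try {
    // Hypothetical payload; the real publisher builds one message per
    // agency/report combination.
    await queueClient.send(appConfig.messageQueueName, { reportName: "device" });
  } finally {
    await queueClient.stop();
    // The publisher is a short-lived interval script, so it destroys the
    // pool on exit; the long-running consumer leaves its pool open.
    await knexInstance.destroy();
  }
}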
levinmr committed Jan 3, 2025
1 parent 480499c commit a415911
Showing 19 changed files with 184 additions and 113 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/ci.yml
@@ -81,7 +81,6 @@ jobs:
CF_ORGANIZATION_NAME: ${{ vars.CF_ORGANIZATION_NAME }}
CF_SPACE_NAME: ${{ vars.CF_SPACE_NAME_DEV }}
DB_SERVICE_NAME: ${{ vars.DB_SERVICE_NAME_DEV }}
MESSAGE_QUEUE_DATABASE_NAME: ${{ vars.MESSAGE_QUEUE_DATABASE_NAME }}
MESSAGE_QUEUE_NAME: ${{ vars.MESSAGE_QUEUE_NAME }}
NEW_RELIC_APP_NAME: ${{ vars.NEW_RELIC_APP_NAME_DEV }}
PROXY_FQDN: ${{ vars.PROXY_FQDN_DEV }}
@@ -111,7 +110,6 @@ jobs:
CF_ORGANIZATION_NAME: ${{ vars.CF_ORGANIZATION_NAME }}
CF_SPACE_NAME: ${{ vars.CF_SPACE_NAME_STG }}
DB_SERVICE_NAME: ${{ vars.DB_SERVICE_NAME_STG }}
MESSAGE_QUEUE_DATABASE_NAME: ${{ vars.MESSAGE_QUEUE_DATABASE_NAME }}
MESSAGE_QUEUE_NAME: ${{ vars.MESSAGE_QUEUE_NAME }}
NEW_RELIC_APP_NAME: ${{ vars.NEW_RELIC_APP_NAME_STG }}
PROXY_FQDN: ${{ vars.PROXY_FQDN_STG }}
@@ -141,7 +139,6 @@ jobs:
CF_ORGANIZATION_NAME: ${{ vars.CF_ORGANIZATION_NAME }}
CF_SPACE_NAME: ${{ vars.CF_SPACE_NAME_PRD }}
DB_SERVICE_NAME: ${{ vars.DB_SERVICE_NAME_PRD }}
MESSAGE_QUEUE_DATABASE_NAME: ${{ vars.MESSAGE_QUEUE_DATABASE_NAME }}
MESSAGE_QUEUE_NAME: ${{ vars.MESSAGE_QUEUE_NAME }}
NEW_RELIC_APP_NAME: ${{ vars.NEW_RELIC_APP_NAME_PRD }}
PROXY_FQDN: ${{ vars.PROXY_FQDN_PRD }}
4 changes: 0 additions & 4 deletions .github/workflows/deploy.yml
@@ -27,9 +27,6 @@ on:
DB_SERVICE_NAME:
required: true
type: string
MESSAGE_QUEUE_DATABASE_NAME:
required: true
type: string
MESSAGE_QUEUE_NAME:
required: true
type: string
@@ -72,7 +69,6 @@ env:
CF_SPACE_NAME: ${{ inputs.CF_SPACE_NAME }}
DB_SERVICE_NAME: ${{ inputs.DB_SERVICE_NAME }}
GA4_CREDS: ${{ secrets.GA4_CREDS }}
MESSAGE_QUEUE_DATABASE_NAME: ${{ inputs.MESSAGE_QUEUE_DATABASE_NAME }}
MESSAGE_QUEUE_NAME: ${{ inputs.MESSAGE_QUEUE_NAME }}
NEW_RELIC_APP_NAME: ${{ inputs.NEW_RELIC_APP_NAME }}
NEW_RELIC_LICENSE_KEY: ${{ secrets.NEW_RELIC_LICENSE_KEY }}
1 change: 0 additions & 1 deletion .github/workflows/manual_deploy_to_dev.yml
@@ -15,7 +15,6 @@ jobs:
CF_ORGANIZATION_NAME: ${{ vars.CF_ORGANIZATION_NAME }}
CF_SPACE_NAME: ${{ vars.CF_SPACE_NAME_DEV }}
DB_SERVICE_NAME: ${{ vars.DB_SERVICE_NAME_DEV }}
MESSAGE_QUEUE_DATABASE_NAME: ${{ vars.MESSAGE_QUEUE_DATABASE_NAME }}
MESSAGE_QUEUE_NAME: ${{ vars.MESSAGE_QUEUE_NAME }}
NEW_RELIC_APP_NAME: ${{ vars.NEW_RELIC_APP_NAME_DEV }}
PROXY_FQDN: ${{ vars.PROXY_FQDN_DEV }}
2 changes: 1 addition & 1 deletion deploy/api.sh
@@ -3,4 +3,4 @@
export ANALYTICS_REPORTS_PATH=reports/api.json
export ANALYTICS_SCRIPT_NAME=api.sh

$ANALYTICS_ROOT_PATH/bin/analytics-publisher --debug --write-to-database --output /tmp --agenciesFile=$ANALYTICS_ROOT_PATH/deploy/agencies.json
$ANALYTICS_ROOT_PATH/bin/analytics-publisher --debug --write-to-database --agenciesFile=$ANALYTICS_ROOT_PATH/deploy/agencies.json
48 changes: 29 additions & 19 deletions deploy/cron.js
@@ -61,18 +61,18 @@ const daily_run = () => {
runScriptWithLogName(`${scriptRootPath}/daily.sh`, "daily.sh");
};

const hourly_run = () => {
/*const hourly_run = () => {
runScriptWithLogName(`${scriptRootPath}/hourly.sh`, "hourly.sh");
};
};*/

const realtime_run = () => {
runScriptWithLogName(`${scriptRootPath}/realtime.sh`, "realtime.sh");
};

/**
Daily reports run every morning at 10 AM UTC.
This calculates the offset between now and then for the next scheduled run.
*/
* Daily and API reports run every morning at 10 AM UTC.
* This calculates the offset between now and then for the next scheduled run.
*/
const calculateNextDailyRunTimeOffset = () => {
const currentTime = new Date();
const nextRunTime = new Date(
@@ -85,26 +85,36 @@ const calculateNextDailyRunTimeOffset = () => {
};

/**
* All scripts run immediately upon application start (with a 10 second delay
* All scripts run immediately upon application start (with a 60 second delay
* between each so that they don't all run at once), then run again at intervals
* going forward.
*/
setTimeout(realtime_run, 1000 * 10);
setTimeout(hourly_run, 1000 * 20);
setTimeout(daily_run, 1000 * 30);
setTimeout(api_run, 1000 * 40);
// setTimeout(hourly_run, 1000 * 70); No hourly reports exist at this time.
setTimeout(daily_run, 1000 * 70);
setTimeout(api_run, 1000 * 130);

// daily
// Daily and API recurring script run setup.
// Runs at 10 AM UTC, then every 24 hours afterwards
setTimeout(() => {
daily_run();
setInterval(daily_run, 1000 * 60 * 60 * 24);
// API
api_run();
setInterval(api_run, 1000 * 60 * 60 * 24);
// Offset the daily script run by 30 seconds so that it never runs in parallel
// with the realtime script in order to save memory/CPU.
setTimeout(() => {
daily_run();
setInterval(daily_run, 1000 * 60 * 60 * 24);
}, 1000 * 30);

// setTimeout(hourly_run, 1000 * 60);

// Offset the API script run by 90 seconds so that it never runs in parallel
// with the daily or realtime scripts in order to save memory/CPU.
setTimeout(() => {
api_run();
setInterval(api_run, 1000 * 60 * 60 * 24);
}, 1000 * 90);
}, calculateNextDailyRunTimeOffset());
// hourly
setInterval(hourly_run, 1000 * 60 * 60);
// realtime. Runs every 15 minutes.
// Google updates realtime reports every 30 minutes, so there is some overlap.
// hourly (no hourly reports exist at this time).
// setInterval(hourly_run, 1000 * 60 * 60);

// Realtime recurring script run setup. Runs every 15 minutes.
setInterval(realtime_run, 1000 * 60 * 15);
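
The body of calculateNextDailyRunTimeOffset is collapsed in this diff; a minimal sketch of the offset calculation it describes (an assumption about the implementation, not the exact code):

const calculateNextDailyRunTimeOffset = () => {
  const currentTime = new Date();
  // Next 10:00 UTC, rolling over to tomorrow if 10:00 UTC already passed today.
  const nextRunTime = new Date(
    Date.UTC(
      currentTime.getUTCFullYear(),
      currentTime.getUTCMonth(),
      currentTime.getUTCDate() + (currentTime.getUTCHours() >= 10 ? 1 : 0),
      10,
    ),
  );
  // Milliseconds until the next scheduled run, used as the outer setTimeout delay.
  return nextRunTime - currentTime;
};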
39 changes: 26 additions & 13 deletions index.js
@@ -1,10 +1,12 @@
const { AsyncLocalStorage } = require("node:async_hooks");
const knex = require("knex");
const PgBoss = require("pg-boss");
const util = require("util");
const AppConfig = require("./src/app_config");
const ReportProcessingContext = require("./src/report_processing_context");
const Logger = require("./src/logger");
const Processor = require("./src/processor");
const PgBossKnexAdapter = require("./src/pg_boss_knex_adapter");

/**
 * Gets an array of JSON report objects from the application config, then runs
@@ -80,7 +82,7 @@ async function _processReport(appConfig, context, reportConfig, processor) {
await processor.processChain(context);
logger.info("Processing complete");
} catch (e) {
logger.error("Encountered an error");
logger.error("Encountered an error during report processing");
logger.error(util.inspect(e));
}
});
@@ -121,8 +123,8 @@ async function runQueuePublish(options = {}) {
agencyName: appConfig.agencyLogName,
scriptName: appConfig.scriptName,
});
const queueClient = await _initQueueClient(appConfig, appLogger);
const queue = "analytics-reporter-job-queue";
const knexInstance = await knex(appConfig.knexConfig);
const queueClient = await _initQueueClient(knexInstance, appLogger);

for (const agency of agencies) {
for (const reportConfig of reportConfigs) {
@@ -134,7 +136,7 @@ async function runQueuePublish(options = {}) {
});
try {
let jobId = await queueClient.send(
queue,
appConfig.messageQueueName,
_createQueueMessage(
options,
agency,
@@ -151,13 +153,17 @@ async function runQueuePublish(options = {}) {
);
if (jobId) {
reportLogger.info(
`Created job in queue: ${queue} with job ID: ${jobId}`,
`Created job in queue: ${appConfig.messageQueueName} with job ID: ${jobId}`,
);
} else {
reportLogger.info(`Found a duplicate job in queue: ${queue}`);
reportLogger.info(
`Found a duplicate job in queue: ${appConfig.messageQueueName}`,
);
}
} catch (e) {
reportLogger.error(`Error sending to queue: ${queue}`);
reportLogger.error(
`Error sending to queue: ${appConfig.messageQueueName}`,
);
reportLogger.error(util.inspect(e));
}
}
@@ -169,6 +175,9 @@ async function runQueuePublish(options = {}) {
} catch (e) {
appLogger.error("Error stopping queue client");
appLogger.error(util.inspect(e));
} finally {
appLogger.debug(`Destroying database connection pool`);
knexInstance.destroy();
}
}

@@ -189,10 +198,10 @@ function _initAgencies(agencies_file) {
return Array.isArray(agencies) ? agencies : legacyAgencies;
}

async function _initQueueClient(appConfig, logger) {
async function _initQueueClient(knexInstance, logger) {
let queueClient;
try {
queueClient = new PgBoss(appConfig.messageQueueDatabaseConnection);
queueClient = new PgBoss({ db: new PgBossKnexAdapter(knexInstance) });
await queueClient.start();
logger.debug("Starting queue client");
} catch (e) {
@@ -230,15 +239,19 @@ function _messagePriority(reportConfig) {
async function runQueueConsume() {
const appConfig = new AppConfig();
const appLogger = Logger.initialize();
const queueClient = await _initQueueClient(appConfig, appLogger);
const queue = "analytics-reporter-job-queue";
const knexInstance = await knex(appConfig.knexConfig);
const queueClient = await _initQueueClient(knexInstance, appLogger);

try {
const context = new ReportProcessingContext(new AsyncLocalStorage());
const processor = Processor.buildAnalyticsProcessor(appConfig, appLogger);
const processor = Processor.buildAnalyticsProcessor(
appConfig,
appLogger,
knexInstance,
);

await queueClient.work(
queue,
appConfig.messageQueueName,
{ newJobCheckIntervalSeconds: 1 },
async (message) => {
appLogger.info("Queue message received");
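
The consumer follows the same wiring; a rough sketch assuming pg-boss's work() callback receives one job whose data is the published report message (processReportJob is a hypothetical stand-in for the processor chain built above):

async function consumeSketch() {
  const appConfig = new AppConfig();
  const knexInstance = knex(appConfig.knexConfig);
  const queueClient = new PgBoss({ db: new PgBossKnexAdapter(knexInstance) });
  await queueClient.start();

  await queueClient.work(
    appConfig.messageQueueName,
    { newJobCheckIntervalSeconds: 1 },
    async (message) => {
      // The processor shares the same knex pool, so the consumer never
      // destroys the pool while the worker is polling.
      await processReportJob(message.data, knexInstance);
    },
  );
}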
12 changes: 12 additions & 0 deletions knexfile.js
@@ -8,6 +8,10 @@ module.exports = {
password: process.env.POSTGRES_PASSWORD || "123abc",
port: 5432,
},
pool: {
min: 2,
max: 10,
},
},
test: {
client: "postgresql",
@@ -18,6 +22,10 @@ module.exports = {
password: process.env.POSTGRES_PASSWORD || "123abc",
port: 5432,
},
pool: {
min: 2,
max: 10,
},
migrations: {
tableName: "knex_migrations",
},
@@ -31,5 +39,9 @@ module.exports = {
password: process.env.POSTGRES_PASSWORD,
ssl: true,
},
pool: {
min: 2,
max: 10,
},
},
};
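
With the pool block added to every environment, the whole config object can be handed to knex directly; a small usage sketch (knex manages the pool internally, keeping between the configured min and max connections open):

const knex = require("knex");
const knexfile = require("./knexfile");

// Build one pooled instance from the full environment config (client,
// connection, pool) instead of passing only the connection settings.
const knexInstance = knex(knexfile[process.env.NODE_ENV || "development"]);

// Each query borrows a connection from the pool and returns it on completion.
knexInstance
  .raw("SELECT 1")
  .then(() => console.log("pool is healthy"))
  .catch((e) => console.error(e));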
1 change: 0 additions & 1 deletion manifest.consumer.yml
@@ -21,7 +21,6 @@ applications:
ANALYTICS_REPORT_EMAIL: ${ANALYTICS_REPORT_EMAIL}
AWS_CACHE_TIME: '0'
GOOGLE_APPLICATION_CREDENTIALS: /home/vcap/app/${ANALYTICS_KEY_FILE_NAME}
MESSAGE_QUEUE_DATABASE_NAME: ${MESSAGE_QUEUE_DATABASE_NAME}
MESSAGE_QUEUE_NAME: ${MESSAGE_QUEUE_NAME}
NEW_RELIC_APP_NAME: ${NEW_RELIC_APP_NAME}
NEW_RELIC_LICENSE_KEY: ${NEW_RELIC_LICENSE_KEY}
1 change: 0 additions & 1 deletion manifest.publisher.yml
@@ -21,7 +21,6 @@ applications:
# The default path for reports (used for gov-wide reports)
AWS_BUCKET_PATH: data/live
AWS_CACHE_TIME: '0'
MESSAGE_QUEUE_DATABASE_NAME: ${MESSAGE_QUEUE_DATABASE_NAME}
MESSAGE_QUEUE_NAME: ${MESSAGE_QUEUE_NAME}
NEW_RELIC_APP_NAME: ${NEW_RELIC_APP_NAME}
NEW_RELIC_LICENSE_KEY: ${NEW_RELIC_LICENSE_KEY}
3 changes: 2 additions & 1 deletion src/actions/format_processed_analytics_data.js
@@ -17,7 +17,7 @@ class FormatProcessedAnalyticsData extends Action {
*/
async executeStrategy(context) {
context.logger.debug("Formatting analytics data");
const formattedAnalyticsData = {};
let formattedAnalyticsData = {};
for (const format of context.appConfig.formats) {
formattedAnalyticsData[format] = await ResultFormatter.formatResult(
context.processedAnalyticsData,
@@ -29,6 +29,7 @@
}
context.processedAnalyticsData = undefined;
context.formattedAnalyticsData = formattedAnalyticsData;
formattedAnalyticsData = undefined;
}
}

25 changes: 13 additions & 12 deletions src/app_config.js
@@ -35,14 +35,21 @@ class AppConfig {
return this.#options.csv ? "csv" : "json";
}

/**
* Array order here is important because the CSV formatter maps headers in
* place on the analytics report object and we don't want that mapping done on
* the JSON version.
*
* @returns {string[]} the formats to use for report formatting.
*/
get formats() {
const formats = [];
if (this.#options.csv) {
formats.push("csv");
}
if (this.#options.json) {
formats.push("json");
}
if (this.#options.csv) {
formats.push("csv");
}
return formats;
}

@@ -194,18 +201,12 @@ class AppConfig {
};
}

get messageQueueDatabaseConnection() {
const connection =
knexfile[process.env.NODE_ENV || "development"].connection;
return `postgres://${connection.user}:${connection.password}@${connection.host}/${process.env.MESSAGE_QUEUE_DATABASE_NAME}${process.env.NODE_ENV == "production" ? "?ssl=true" : ""}`;
}

get messageQueueName() {
return process.env.MESSAGE_QUEUE_NAME;
return process.env.MESSAGE_QUEUE_NAME || "analytics_reporter_job_queue";
}

get postgres() {
return knexfile[process.env.NODE_ENV || "development"].connection;
get knexConfig() {
return knexfile[process.env.NODE_ENV || "development"];
}

get static() {
39 changes: 39 additions & 0 deletions src/pg_boss_knex_adapter.js
@@ -0,0 +1,39 @@
/**
* Handles providing a database client for the Pg-Boss library using knex.
*/
class PgBossKnexAdapter {
#knex;

/**
* @param {import('knex')} knexInstance an initialized instance of the knex
* library which provides a database connection.
*/
constructor(knexInstance) {
this.#knex = knexInstance;
}

/**
* Execute PgBoss SQL using the knex library interface
*
* @param {string} sql the SQL string to execute.
* @param {string[]} parameters the parameters to insert into the SQL string.
* @returns {Promise} which resolves with the result of the SQL query.
*/
executeSql(sql, parameters = []) {
    // This is needed to replace pg-boss' positional $1, $2 arguments
    // with knex's :param_1, :param_2 named binding style.
const replacedSql = sql.replace(
/\$(\d+)\b/g,
(_, number) => `:param_${number}`,
);

const parametersObject = {};
parameters.forEach(
(value, index) => (parametersObject[`param_${index + 1}`] = value),
);

return this.#knex.raw(replacedSql, parametersObject);
}
}

module.exports = PgBossKnexAdapter;
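
To illustrate the placeholder rewrite, a hedged example of executeSql with a typical positional-parameter statement (the SQL and values are invented for demonstration; knexInstance is assumed to be the pooled instance created at the top level):

const adapter = new PgBossKnexAdapter(knexInstance);

// pg-boss emits positional parameters:
//   SELECT * FROM pgboss.job WHERE name = $1 AND state = $2
// The adapter rewrites them to knex named bindings before calling knex.raw:
//   SELECT * FROM pgboss.job WHERE name = :param_1 AND state = :param_2
//   with bindings { param_1: "analytics_reporter_job_queue", param_2: "created" }
adapter
  .executeSql("SELECT * FROM pgboss.job WHERE name = $1 AND state = $2", [
    "analytics_reporter_job_queue",
    "created",
  ])
  .then((result) => console.log(result.rows.length));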