Skip to content

Commit

Permalink
Refactor/pg sink services (#725)
Browse files Browse the repository at this point in the history
* dont make HELIUS auth a required env

* uniform error message
  • Loading branch information
bryzettler authored Oct 23, 2024
1 parent 61ae28f commit ee9ca2a
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 103 deletions.
32 changes: 20 additions & 12 deletions packages/account-postgres-sink-service/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,35 @@ import os from "os";
import dotenv from "dotenv";

dotenv.config();

process.env.ANCHOR_WALLET =
process.env.ANCHOR_WALLET || os.homedir() + "/.config/solana/id.json";

const getEnvBoolean = (key: string): boolean => process.env[key] === "true";
export const SOLANA_URL = process.env.SOLANA_URL || "http://127.0.0.1:8899";

export const YELLOWSTONE_URL =
process.env.YELLOWSTONE_URL || "http://127.0.0.1:8899";
export const YELLOWSTONE_TOKEN = process.env.YELLOWSTONE_TOKEN!;

export const REFRESH_PASSWORD = process.env.REFRESH_PASSWORD;

export const PG_POOL_SIZE = Number(process.env.PG_POOL_SIZE) || 20;
export const PROGRAM_ACCOUNT_CONFIGS =
process.env.PROGRAM_ACCOUNT_CONFIGS ||
`${__dirname}/../program_account_configs_example.json`;

export const USE_YELLOWSTONE = getEnvBoolean("USE_YELLOWSTONE");
export const YELLOWSTONE_TOKEN = process.env.YELLOWSTONE_TOKEN;
export const YELLOWSTONE_URL =
process.env.YELLOWSTONE_URL || "http://127.0.0.1:8899";

export const USE_HELIUS_WEBHOOK = getEnvBoolean("USE_HELIUS_WEBHOOK");
export const HELIUS_AUTH_SECRET = process.env.HELIUS_AUTH_SECRET;
export const FETCH_DELAY_SECONDS =
Number(process.env.FETCH_DELAY_SECONDS) || 10;
export const USE_SUBSTREAMS = process.env.USE_SUBSTREAMS === "true";
export const USE_YELLOWSTONE = process.env.USE_YELLOWSTONE === "true";

export const USE_SUBSTREAMS = getEnvBoolean("USE_SUBSTREAMS");
export const SUBSTREAM = process.env.SUBSTREAM;
export const USE_KAFKA = process.env.USE_KAFKA === "true";
export const PG_POOL_SIZE = Number(process.env.PG_POOL_SIZE) || 20;

export const USE_KAFKA = getEnvBoolean("USE_KAFKA");
export const KAFKA_USER = process.env.KAFKA_USER;
export const KAFKA_GROUP_ID = process.env.KAFKA_CROUP_ID;
export const KAFKA_BROKERS = process.env.KAFKA_BROKERS?.split(",");
export const KAFKA_TOPIC = process.env.KAFKA_TOPIC;
export const KAFKA_PASSWORD = process.env.KAFKA_PASSWORD?.replace(
/(\r\n|\n|\r)/gm,
""
);
204 changes: 113 additions & 91 deletions packages/account-postgres-sink-service/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ import { EachMessagePayload, Kafka, KafkaConfig } from "kafkajs";
import { Op } from "sequelize";
import {
HELIUS_AUTH_SECRET,
KAFKA_BROKERS,
KAFKA_GROUP_ID,
KAFKA_PASSWORD,
KAFKA_TOPIC,
KAFKA_USER,
PG_POOL_SIZE,
PROGRAM_ACCOUNT_CONFIGS,
REFRESH_PASSWORD,
SUBSTREAM,
USE_HELIUS_WEBHOOK,
USE_KAFKA,
USE_SUBSTREAMS,
USE_YELLOWSTONE,
PG_POOL_SIZE,
} from "./env";
import { getPluginsByAccountTypeByProgram } from "./plugins";
import { metrics } from "./plugins/metrics";
Expand All @@ -45,12 +51,8 @@ import { integrityCheckProgramAccounts } from "./utils/integrityCheckProgramAcco
import { provider } from "./utils/solana";
import { upsertProgramAccounts } from "./utils/upsertProgramAccounts";

if (!HELIUS_AUTH_SECRET) {
throw new Error("Helius auth secret not available");
}

if (PG_POOL_SIZE < 5) {
throw new Error("PG Pool size must be minimum of 5");
throw new Error("PG_POOL_SIZE must be minimum of 5");
}

(async () => {
Expand Down Expand Up @@ -101,16 +103,18 @@ if (PG_POOL_SIZE < 5) {
for (const config of configs) {
if ((programId && programId == config.programId) || !programId) {
console.log(
programId
? `Refreshing accounts for program: ${programId}`
: `Refreshing accounts`
`Refreshing accounts for program: ${config.programId}`
);

try {
await upsertProgramAccounts({
programId: new PublicKey(config.programId),
accounts: config.accounts,
});
console.log(`Accounts refreshed for program: ${programId}`);

console.log(
`Accounts refreshed for program: ${config.programId}`
);
} catch (err) {
throw err;
}
Expand Down Expand Up @@ -233,90 +237,97 @@ if (PG_POOL_SIZE < 5) {
}
}
}
server.post<{ Body: any[] }>("/transaction-webhook", async (req, res) => {
if (req.headers.authorization != HELIUS_AUTH_SECRET) {
res.code(StatusCodes.FORBIDDEN).send({
message: "Invalid authorization",
});
return;
}
if (refreshing) {
res.code(StatusCodes.SERVICE_UNAVAILABLE).send({
message: "Refresh is happening, cannot create transactions",
});
return;
}

try {
const transactions = req.body as TransactionResponse[];
const writableAccountKeys = transactions.flatMap((tx) =>
getWritableAccountKeys(
tx.transaction.message.accountKeys,
tx.transaction.message.header
)
);

await insertTransactionAccounts(
await getMultipleAccounts({
connection: provider.connection,
keys: writableAccountKeys,
})
);
res.code(StatusCodes.OK).send(ReasonPhrases.OK);
} catch (err) {
res.code(StatusCodes.INTERNAL_SERVER_ERROR).send(err);
console.error(err);
if (USE_HELIUS_WEBHOOK) {
if (!HELIUS_AUTH_SECRET) {
throw new Error("HELIUS_AUTH_SECRET undefined");
}
});

server.post("/account-webhook", async (req, res) => {
if (req.headers.authorization != HELIUS_AUTH_SECRET) {
res.code(StatusCodes.FORBIDDEN).send({
message: "Invalid authorization",
});
return;
}
if (refreshing) {
res.code(StatusCodes.SERVICE_UNAVAILABLE).send({
message: "Refresh is happening, cannot create transactions",
});
return;
}
server.post<{ Body: any[] }>("/transaction-webhook", async (req, res) => {
if (req.headers.authorization != HELIUS_AUTH_SECRET) {
res.code(StatusCodes.FORBIDDEN).send({
message: "Invalid authorization",
});
return;
}
if (refreshing) {
res.code(StatusCodes.SERVICE_UNAVAILABLE).send({
message: "Refresh is happening, cannot create transactions",
});
return;
}

try {
const accounts = req.body as any[];
try {
const transactions = req.body as TransactionResponse[];
const writableAccountKeys = transactions.flatMap((tx) =>
getWritableAccountKeys(
tx.transaction.message.accountKeys,
tx.transaction.message.header
)
);

if (configs) {
for (const account of accounts) {
const parsed = account["account"]["parsed"];
const config = configs.find((x) => x.programId == parsed["owner"]);

if (!config) {
// exit early if account doesn't need to be saved
res.code(StatusCodes.OK).send(ReasonPhrases.OK);
return;
}
await insertTransactionAccounts(
await getMultipleAccounts({
connection: provider.connection,
keys: writableAccountKeys,
})
);
res.code(StatusCodes.OK).send(ReasonPhrases.OK);
} catch (err) {
res.code(StatusCodes.INTERNAL_SERVER_ERROR).send(err);
console.error(err);
}
});

try {
await handleAccountWebhook({
fastify: server,
programId: new PublicKey(config.programId),
accounts: config.accounts,
account: parsed,
pluginsByAccountType:
pluginsByAccountTypeByProgram[parsed["owner"]] || {},
});
} catch (err) {
throw err;
server.post("/account-webhook", async (req, res) => {
if (req.headers.authorization != HELIUS_AUTH_SECRET) {
res.code(StatusCodes.FORBIDDEN).send({
message: "Invalid authorization",
});
return;
}
if (refreshing) {
res.code(StatusCodes.SERVICE_UNAVAILABLE).send({
message: "Refresh is happening, cannot create transactions",
});
return;
}

try {
const accounts = req.body as any[];

if (configs) {
for (const account of accounts) {
const parsed = account["account"]["parsed"];
const config = configs.find((x) => x.programId == parsed["owner"]);

if (!config) {
// exit early if account doesn't need to be saved
res.code(StatusCodes.OK).send(ReasonPhrases.OK);
return;
}

try {
await handleAccountWebhook({
fastify: server,
programId: new PublicKey(config.programId),
accounts: config.accounts,
account: parsed,
pluginsByAccountType:
pluginsByAccountTypeByProgram[parsed["owner"]] || {},
});
} catch (err) {
throw err;
}
}
}
res.code(StatusCodes.OK).send(ReasonPhrases.OK);
} catch (err) {
res.code(StatusCodes.INTERNAL_SERVER_ERROR).send(err);
console.error(err);
}
res.code(StatusCodes.OK).send(ReasonPhrases.OK);
} catch (err) {
res.code(StatusCodes.INTERNAL_SERVER_ERROR).send(err);
console.error(err);
}
});
});
}

try {
// models are defined on boot, and updated in refresh-accounts
Expand All @@ -341,25 +352,32 @@ if (PG_POOL_SIZE < 5) {
}

if (USE_KAFKA) {
if (!KAFKA_USER) throw new Error("KAFKA_USER undefined");
if (!KAFKA_TOPIC) throw new Error("KAFKA_TOPIC undefined");
if (!KAFKA_BROKERS) throw new Error("KAFKA_BROKERS undefined");
if (!KAFKA_PASSWORD) throw new Error("KAFKA_PASSWORD undefined");
if (!KAFKA_GROUP_ID) throw new Error("KAFKA_GROUP_ID undefined");

const kafkaConfig: KafkaConfig = {
ssl: true,
clientId: "helium-reader",
brokers: KAFKA_BROKERS,
sasl: {
mechanism: "scram-sha-512",
username: process.env.KAFKA_USER!,
// Remove newlines from password
password: process.env.KAFKA_PASSWORD!.replace(/(\r\n|\n|\r)/gm, ""),
username: KAFKA_USER,
password: KAFKA_PASSWORD,
},
brokers: process.env.KAFKA_BROKERS!.split(","),
};

const kafka = new Kafka(kafkaConfig);
const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID! });
const consumer = kafka.consumer({ groupId: KAFKA_GROUP_ID });

await consumer.connect();
await consumer.subscribe({
topic: process.env.KAFKA_TOPIC!,
topic: KAFKA_TOPIC,
fromBeginning: false,
});

await consumer.run({
eachMessage: async ({ message }: EachMessagePayload) => {
if (message.value) {
Expand All @@ -369,7 +387,9 @@ if (PG_POOL_SIZE < 5) {
pubkey,
isDelete,
} = JSON.parse(message.value.toString());

const config = configs.find((x) => x.programId == programId);

if (config) {
await handleAccountWebhook({
fastify: server,
Expand All @@ -390,6 +410,8 @@ if (PG_POOL_SIZE < 5) {
}

if (USE_SUBSTREAMS) {
if (!SUBSTREAM) throw new Error("SUBSTREAM undefined");

await Cursor.sync();
const lastCursor = await Cursor.findOne({
order: [["createdAt", "DESC"]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ export const setupYellowstone = async (
server: FastifyInstance,
configs: IConfig[]
) => {
if (!YELLOWSTONE_TOKEN) {
throw new Error("YELLOWSTONE_TOKEN undefined");
}

let isReconnecting = false;
const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram(
configs
Expand Down

0 comments on commit ee9ca2a

Please sign in to comment.