diff --git a/Procfile b/Procfile new file mode 100644 index 0000000000..e24905e36b --- /dev/null +++ b/Procfile @@ -0,0 +1,5 @@ +hubble: cd apps/hubble && rm -rf .rocks && yarn build && yarn start --eth-mainnet-rpc-url $ETH_RPC_URL --l2-rpc-url $L2_RPC_URL --hub-operator-fid 1 --disable-snapshot-sync --catchup-sync-with-snapshot false --network 1 --disable-console-status --admin-server-enabled > hub.log +snapchain1: cd ../snapchain-v0 && rm -rf nodes/1/.rocks && cargo run -- --config-path nodes/1/snapchain.toml > snapchain1.log +snapchain2: cd ../snapchain-v0 && rm -rf nodes/2/.rocks && cargo run -- --config-path nodes/2/snapchain.toml > snapchain2.log +snapchain3: cd ../snapchain-v0 && rm -rf nodes/3/.rocks && cargo run -- --config-path nodes/3/snapchain.toml > snapchain3.log +migrate: cd packages/shuttle && sleep 30 && yarn build && ONCHAIN_EVENTS_HUB_HOST=kassad.merkle.zone:2283 HUB_HOST=127.0.0.1:2283 HUB_ADMIN_HOST=127.0.0.1:2284 SNAPCHAIN_HOST=127.0.0.1:3383 yarn migrate:onchain-events > onchain_events.log&& sleep 30 && ONCHAIN_EVENTS_HUB_HOST=kassad.merkle.zone:2283 HUB_HOST=127.0.0.1:2283 HUB_ADMIN_HOST=127.0.0.1:2284 SNAPCHAIN_HOST=127.0.0.1:3383 yarn migrate:messages > messages.log && sleep 120 && ONCHAIN_EVENTS_HUB_HOST=kassad.merkle.zone:2283 HUB_HOST=127.0.0.1:2283 HUB_ADMIN_HOST=127.0.0.1:2284 SNAPCHAIN_HOST=127.0.0.1:3383 yarn migrate:validate > validate.log \ No newline at end of file diff --git a/apps/hubble/src/cli.ts b/apps/hubble/src/cli.ts index 35849e35a4..7a9ce448a1 100644 --- a/apps/hubble/src/cli.ts +++ b/apps/hubble/src/cli.ts @@ -106,6 +106,7 @@ app "The block number to begin syncing events from L2 Farcaster contracts", parseNumber, ) + .option("--l2-stop-block ", "The block number to stop syncing L2 events at", parseNumber) .option( "--l2-chunk-size ", "The number of events to fetch from L2 Farcaster contracts at a time", @@ -178,6 +179,7 @@ app .option("--profile-sync", "Profile a full hub sync and exit. (default: disabled)") .option("--rebuild-sync-trie", "Rebuild the sync trie before starting (default: disabled)") .option("--resync-name-events", "Resync events from the Fname server before starting (default: disabled)") + .option("--stop-fname-transfer-id ", "Fname transfer id to stop at", parseNumber) .option( "--chunk-size ", `The number of blocks to batch when syncing historical events from Farcaster contracts. (default: ${DEFAULT_CHUNK_SIZE})`, @@ -519,6 +521,7 @@ app l2KeyRegistryAddress: cliOptions.l2KeyRegistryAddress ?? hubConfig.l2KeyRegistryAddress, l2StorageRegistryAddress: cliOptions.l2StorageRegistryAddress ?? hubConfig.l2StorageRegistryAddress, l2FirstBlock: cliOptions.l2FirstBlock ?? hubConfig.l2FirstBlock, + l2StopBlock: cliOptions.l2StopBlock, l2ChunkSize: cliOptions.l2ChunkSize ?? hubConfig.l2ChunkSize, l2ChainId: cliOptions.l2ChainId ?? hubConfig.l2ChainId, l2ResyncEvents: cliOptions.l2ResyncEvents ?? hubConfig.l2ResyncEvents ?? false, diff --git a/apps/hubble/src/eth/fnameRegistryEventsProvider.ts b/apps/hubble/src/eth/fnameRegistryEventsProvider.ts index 6ece03d48f..891c85e8a6 100644 --- a/apps/hubble/src/eth/fnameRegistryEventsProvider.ts +++ b/apps/hubble/src/eth/fnameRegistryEventsProvider.ts @@ -65,16 +65,23 @@ export class FNameRegistryEventsProvider { private client: FNameRegistryClientInterface; private hub: HubInterface; private lastTransferId = 0; + private stopTransferId?: number; private resyncEvents: boolean; private pollTimeoutId: ReturnType | undefined; private serverSignerAddress: Uint8Array; private shouldStop = false; - constructor(fnameRegistryClient: FNameRegistryClientInterface, hub: HubInterface, resyncEvents = false) { + constructor( + fnameRegistryClient: FNameRegistryClientInterface, + hub: HubInterface, + resyncEvents = false, + stopTransferId?: number, + ) { this.client = fnameRegistryClient; this.hub = hub; this.resyncEvents = resyncEvents; this.serverSignerAddress = new Uint8Array(); + this.stopTransferId = stopTransferId; } public async start() { @@ -161,6 +168,9 @@ export class FNameRegistryEventsProvider { private async mergeTransfers(transfers: FNameTransfer[]) { for (const transfer of transfers) { + if (this.stopTransferId && transfer.id > this.stopTransferId) { + break; + } const serialized = Result.combine([ utf8StringToBytes(transfer.username), hexStringToBytes(transfer.owner), diff --git a/apps/hubble/src/eth/l2EventsProvider.ts b/apps/hubble/src/eth/l2EventsProvider.ts index b634f3c177..f2077fefbd 100644 --- a/apps/hubble/src/eth/l2EventsProvider.ts +++ b/apps/hubble/src/eth/l2EventsProvider.ts @@ -85,6 +85,7 @@ export class L2EventsProvider; private _firstBlock: number; + private _stopBlock?: number; private _chunkSize: number; private _chainId: number; private _rentExpiry: number; @@ -151,10 +152,12 @@ export class L2EventsProvider { const l2RpcUrls = l2RpcUrl.split(","); const transports = l2RpcUrls.map((url) => @@ -232,6 +236,7 @@ export class L2EventsProvider { + public async start(): HubAsyncResult { // Connect to L2 RPC // Start the contract watchers first, so we cache events while we sync historical events - this._watchStorageContractEvents?.start(); - this._watchIdRegistryV2ContractEvents?.start(); - this._watchKeyRegistryV2ContractEvents?.start(); + // this._watchStorageContractEvents?.start(); + // this._watchIdRegistryV2ContractEvents?.start(); + // this._watchKeyRegistryV2ContractEvents?.start(); const syncHistoryResult = await this.connectAndSyncHistoricalEvents(); if (syncHistoryResult.isErr()) { @@ -256,7 +261,8 @@ export class L2EventsProvider { - const latestBlockResult = await ResultAsync.fromPromise(getBlockNumber(this._publicClient), (err) => err); - if (latestBlockResult.isErr()) { - diagnosticReporter().reportError(latestBlockResult.error as Error); - const msg = "failed to connect to optimism node. Check your eth RPC URL (e.g. --l2-rpc-url)"; - log.error({ err: latestBlockResult.error }, msg); - return err(new HubError("unavailable.network_failure", msg)); - } - const latestBlock = Number(latestBlockResult.value); - - if (!latestBlock) { - const msg = "failed to get the latest block from the RPC provider"; - log.error(msg); - return err(new HubError("unavailable.network_failure", msg)); - } - - log.info({ latestBlock: latestBlock }, "connected to optimism node"); - // Find how how much we need to sync let lastSyncedBlock = this._firstBlock; @@ -635,6 +624,28 @@ export class L2EventsProvider err); + if (latestBlockResult.isErr()) { + diagnosticReporter().reportError(latestBlockResult.error as Error); + const msg = "failed to connect to optimism node. Check your eth RPC URL (e.g. --l2-rpc-url)"; + log.error({ err: latestBlockResult.error }, msg); + return err(new HubError("unavailable.network_failure", msg)); + } + latestBlock = Number(latestBlockResult.value); + + if (!latestBlock) { + const msg = "failed to get the latest block from the RPC provider"; + log.error(msg); + return err(new HubError("unavailable.network_failure", msg)); + } + + log.info({ latestBlock: latestBlock }, "connected to optimism node"); + } + const toBlock = latestBlock; if (lastSyncedBlock < toBlock) { diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index 1cfe52a4c1..eb9335c6db 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -254,6 +254,8 @@ export interface HubOptions { /** Block number to begin syncing events from for L2 */ l2FirstBlock?: number; + l2StopBlock?: number; + /** Number of blocks to batch when syncing historical events for L2 */ l2ChunkSize?: number; @@ -272,6 +274,9 @@ export interface HubOptions { /** Resync fname events */ resyncNameEvents?: boolean; + /** Id of last fname transfer to ingest */ + stopFnameTransferId?: number; + /** Name of the RocksDB instance */ rocksDBName?: string; @@ -432,6 +437,7 @@ export class Hub implements HubInterface { options.l2ChainId ?? OptimismConstants.ChainId, options.l2ResyncEvents ?? false, options.l2RentExpiryOverride, + options.l2StopBlock, ); } else { log.warn("No L2 RPC URL provided, unable to sync L2 contract events"); @@ -443,6 +449,7 @@ export class Hub implements HubInterface { new FNameRegistryClient(options.fnameServerUrl), this, options.resyncNameEvents ?? false, + options.stopFnameTransferId, ); } else { log.warn("No FName Registry URL provided, unable to sync fname events"); @@ -808,34 +815,34 @@ export class Hub implements HubInterface { : undefined; // Start the Gossip node - await this.gossipNode.start(this.bootstrapAddrs(), { - peerId, - ipMultiAddr: this.options.ipMultiAddr, - announceIp: this.options.announceIp, - gossipPort: this.options.gossipPort, - allowedPeerIdStrs: this.allowedPeerIds, - deniedPeerIdStrs: this.deniedPeerIds, - directPeers: this.options.directPeers, - allowlistedImmunePeers: this.options.allowlistedImmunePeers, - applicationScoreCap: this.options.applicationScoreCap, - strictNoSign: this.strictNoSign, - connectToDbPeers: this.options.connectToDbPeers, - statsdParams: this.options.statsdParams, - }); + // await this.gossipNode.start(this.bootstrapAddrs(), { + // peerId, + // ipMultiAddr: this.options.ipMultiAddr, + // announceIp: this.options.announceIp, + // gossipPort: this.options.gossipPort, + // allowedPeerIdStrs: this.allowedPeerIds, + // deniedPeerIdStrs: this.deniedPeerIds, + // directPeers: this.options.directPeers, + // allowlistedImmunePeers: this.options.allowlistedImmunePeers, + // applicationScoreCap: this.options.applicationScoreCap, + // strictNoSign: this.strictNoSign, + // connectToDbPeers: this.options.connectToDbPeers, + // statsdParams: this.options.statsdParams, + // }); await this.registerEventHandlers(); // Start cron tasks this.pruneMessagesJobScheduler.start(this.options.pruneMessagesJobCron); - this.periodSyncJobScheduler.start(); + // this.periodSyncJobScheduler.start(); this.pruneEventsJobScheduler.start(this.options.pruneEventsJobCron); this.checkFarcasterVersionJobScheduler.start(); - this.validateOrRevokeMessagesJobScheduler.start(); + // this.validateOrRevokeMessagesJobScheduler.start(); const randomMinute = Math.floor(Math.random() * 15); - this.gossipContactInfoJobScheduler.start(`${randomMinute}-59/15 * * * *`); // Weird syntax but required by cron, random minute every 15 minutes + // this.gossipContactInfoJobScheduler.start(`${randomMinute}-59/15 * * * *`); // Weird syntax but required by cron, random minute every 15 minutes this.checkIncomingPortsJobScheduler.start(); - this.measureSyncHealthJobScheduler.start(); + // this.measureSyncHealthJobScheduler.start(); // Mainnet only jobs if (this.options.network === FarcasterNetwork.MAINNET) { diff --git a/apps/hubble/src/rpc/adminServer.ts b/apps/hubble/src/rpc/adminServer.ts index 84488342c0..33e7170330 100644 --- a/apps/hubble/src/rpc/adminServer.ts +++ b/apps/hubble/src/rpc/adminServer.ts @@ -152,7 +152,7 @@ export default class AdminServer { submitOnChainEvent: async (call, callback) => { const authResult = await authenticateUser(call.metadata, this.rpcUsers); if (authResult.isErr()) { - logger.warn({ errMsg: authResult.error.message }, "submitOnChainEvent failed"); + log.warn({ errMsg: authResult.error.message }, "submitOnChainEvent failed"); callback( toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)), ); @@ -161,6 +161,7 @@ export default class AdminServer { const onChainEvent = call.request; const result = await this.hub?.submitOnChainEvent(onChainEvent, "rpc"); + log.info({ result, fid: onChainEvent.fid, type: onChainEvent.type }, "submitOnChainEvent complete"); result?.match( () => { callback(null, onChainEvent); @@ -170,6 +171,50 @@ export default class AdminServer { }, ); }, + submitUserNameProof: async (call, callback) => { + const authResult = await authenticateUser(call.metadata, this.rpcUsers); + if (authResult.isErr()) { + log.warn({ errMsg: authResult.error.message }, "submitUserNameProof failed"); + callback( + toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)), + ); + return; + } + + const usernameProof = call.request; + const result = await this.hub?.submitUserNameProof(usernameProof, "rpc"); + log.info({ result, fid: usernameProof.fid }, "submitUserNameProof complete"); + result?.match( + () => { + callback(null, usernameProof); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, + pruneMessages: async (call, callback) => { + const authResult = await authenticateUser(call.metadata, this.rpcUsers); + if (authResult.isErr()) { + log.warn({ errMsg: authResult.error.message }, "pruneMessages failed"); + callback( + toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)), + ); + return; + } + + const fid = call.request.fid; + const result = await this.hub?.engine.pruneMessages(fid); + log.info({ result, fid }, "pruneMessages complete"); + result?.match( + (numMessagesPruned) => { + callback(null, { numMessagesPruned }); + }, + (err: HubError) => { + callback(toServiceError(err)); + }, + ); + }, }; }; } diff --git a/apps/hubble/src/rpc/server.ts b/apps/hubble/src/rpc/server.ts index 310d22cb43..07db0ab454 100644 --- a/apps/hubble/src/rpc/server.ts +++ b/apps/hubble/src/rpc/server.ts @@ -933,18 +933,18 @@ export default class Server { getOnChainEvents: async (call, callback) => this.getOnChainEventsRPC(call, callback), submitMessage: async (call, callback) => { // Identify peer that is calling, if available. This is used for rate limiting. - const peer = Result.fromThrowable( - () => call.getPeer(), - (e) => e, - )().unwrapOr("unavailable"); + // const peer = Result.fromThrowable( + // () => call.getPeer(), + // (e) => e, + // )().unwrapOr("unavailable"); statsd().increment("rpc.open_request_count", { method: "submitMessage" }); - const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter); - if (rateLimitResult.isErr()) { - logger.warn({ peer }, "submitMessage rate limited"); - callback(toServiceError(new HubError("unavailable", "API rate limit exceeded"))); - return; - } + // const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter); + // if (rateLimitResult.isErr()) { + // logger.warn({ peer }, "submitMessage rate limited"); + // callback(toServiceError(new HubError("unavailable", "API rate limit exceeded"))); + // return; + // } // Authentication const authResult = authenticateUser(call.metadata, this.rpcUsers); @@ -972,19 +972,19 @@ export default class Server { }, submitBulkMessages: async (call, callback) => { // Identify peer that is calling, if available. This is used for rate limiting. - const peer = Result.fromThrowable( - () => call.getPeer(), - (e) => e, - )().unwrapOr("unavailable"); + // const peer = Result.fromThrowable( + // () => call.getPeer(), + // (e) => e, + // )().unwrapOr("unavailable"); statsd().increment("rpc.open_request_count", { method: "submitBulkMessages" }); // Check for rate limits - const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter); - if (rateLimitResult.isErr()) { - logger.warn({ peer }, "submitBulkMessages rate limited"); - callback(toServiceError(new HubError("unavailable", "API rate limit exceeded"))); - return; - } + // const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter); + // if (rateLimitResult.isErr()) { + // logger.warn({ peer }, "submitBulkMessages rate limited"); + // callback(toServiceError(new HubError("unavailable", "API rate limit exceeded"))); + // return; + // } // Authentication const authResult = authenticateUser(call.metadata, this.rpcUsers); diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index b8727378ab..0bfaf2c443 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -301,11 +301,11 @@ class Engine extends TypedEmitter { } const limiter = getRateLimiterForTotalMessages(totalUnits * this._totalPruneSize); - const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); - if (isRateLimited) { - log.warn({ fid }, "rate limit exceeded for FID"); - return err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`)); - } + // const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); + // if (isRateLimited) { + // log.warn({ fid }, "rate limit exceeded for FID"); + // return err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`)); + // } return ok({ i, fid, limiter, message }); } @@ -323,15 +323,15 @@ class Engine extends TypedEmitter { if (result.isErr()) { mergeResults.set(i, result); // Try to request on chain event if it's missing - if ( - result.error.errCode === "bad_request.no_storage" || - "bad_request.unknown_signer" || - "bad_request.missing_fid" - ) { - const fid = message.data?.fid ?? 0; - // Don't await because we don't want to block hubs from processing new messages. - this._l2EventsProvider?.retryEventsForFid(fid); - } + // if ( + // result.error.errCode === "bad_request.no_storage" || + // "bad_request.unknown_signer" || + // "bad_request.missing_fid" + // ) { + // const fid = message.data?.fid ?? 0; + // Don't await because we don't want to block hubs from processing new messages. + // this._l2EventsProvider?.retryEventsForFid(fid); + // } } else { validatedMessages.push(result.value); } @@ -540,6 +540,7 @@ class Engine extends TypedEmitter { } async pruneMessages(fid: number): HubAsyncResult { + await this.clearStorageCacheForFid(fid); const logPruneResult = (result: HubResult, store: string): number => { return result.match( (ids) => { @@ -968,6 +969,7 @@ class Engine extends TypedEmitter { return err(validatedFid.error); } + await this.clearStorageCacheForFid(fid); const slot = await this.eventHandler.getCurrentStorageSlotForFid(fid); if (slot.isErr()) { @@ -1004,6 +1006,14 @@ class Engine extends TypedEmitter { }); } + async clearStorageCacheForFid(fid: number): HubAsyncResult { + const limits = getStoreLimits([]); + for (const limit of limits) { + await this.eventHandler.clearCachedMessageCount(fid, limit.storeType); + } + return ok(undefined); + } + async getUserNameProof(name: Uint8Array, retries = 1): HubAsyncResult { const nameString = bytesToUtf8String(name); if (nameString.isErr()) { @@ -1028,17 +1038,17 @@ class Engine extends TypedEmitter { const result = await ResultAsync.fromPromise(this._userDataStore.getUserNameProof(name), (e) => e as HubError); - if (result.isErr() && result.error.errCode === "not_found" && retries > 0 && this._fNameRegistryEventsProvider) { - const rateLimitResult = await ResultAsync.fromPromise( - this._fNameRetryRateLimiter.consume(0), - () => new HubError("unavailable", "Too many requests to fName server"), - ); - if (rateLimitResult.isErr()) { - return err(rateLimitResult.error); - } - await this._fNameRegistryEventsProvider.retryTransferByName(name); - return this.getUserNameProof(name, retries - 1); - } + // if (result.isErr() && result.error.errCode === "not_found" && retries > 0 && this._fNameRegistryEventsProvider) { + // const rateLimitResult = await ResultAsync.fromPromise( + // this._fNameRetryRateLimiter.consume(0), + // () => new HubError("unavailable", "Too many requests to fName server"), + // ); + // if (rateLimitResult.isErr()) { + // return err(rateLimitResult.error); + // } + // await this._fNameRegistryEventsProvider.retryTransferByName(name); + // return this.getUserNameProof(name, retries - 1); + // } return result; } diff --git a/apps/hubble/src/storage/jobs/pruneMessagesJob.ts b/apps/hubble/src/storage/jobs/pruneMessagesJob.ts index e048a7521d..0c4f4468cc 100644 --- a/apps/hubble/src/storage/jobs/pruneMessagesJob.ts +++ b/apps/hubble/src/storage/jobs/pruneMessagesJob.ts @@ -6,7 +6,7 @@ import { logger } from "../../utils/logger.js"; import { statsd } from "../../utils/statsd.js"; import { sleep } from "../../utils/crypto.js"; -export const DEFAULT_PRUNE_MESSAGES_JOB_CRON = "0 */2 * * *"; // Every two hours +export const DEFAULT_PRUNE_MESSAGES_JOB_CRON = "*/5 * * * *"; // Every two hours // How much time to allocate to pruning each fid. // 1000 fids per second = 1 fid per ms. 500k fids will take under 10 minutes diff --git a/apps/hubble/src/storage/stores/storageCache.ts b/apps/hubble/src/storage/stores/storageCache.ts index 8d29a8f3e1..94c4c2adf7 100644 --- a/apps/hubble/src/storage/stores/storageCache.ts +++ b/apps/hubble/src/storage/stores/storageCache.ts @@ -17,7 +17,7 @@ import { } from "@farcaster/hub-nodejs"; import { err, ok } from "neverthrow"; import RocksDB from "../db/rocksdb.js"; -import { FID_BYTES, OnChainEventPostfix, RootPrefix, UserMessagePostfix } from "../db/types.js"; +import { FID_BYTES, OnChainEventPostfix, RootPrefix, UserMessagePostfix, UserPostfix } from "../db/types.js"; import { logger } from "../../utils/logger.js"; import { makeFidKey, makeMessagePrimaryKey, makeTsHash, typeToSetPostfix } from "../db/message.js"; import { bytesCompare, getFarcasterTime, HubAsyncResult } from "@farcaster/core"; @@ -151,6 +151,12 @@ export class StorageCache { } } + async clearMessageCount(fid: number, set: UserMessagePostfix): Promise { + this._counts.delete(makeKey(fid, set)); + this._earliestTsHashes.delete(makeKey(fid, set)); + await this.getMessageCount(fid, set, true); + } + async getCurrentStorageSlotForFid(fid: number): HubAsyncResult { let slot = this._activeStorageSlots.get(fid); diff --git a/apps/hubble/src/storage/stores/storeEventHandler.ts b/apps/hubble/src/storage/stores/storeEventHandler.ts index 5fbaa0915c..386501aeef 100644 --- a/apps/hubble/src/storage/stores/storeEventHandler.ts +++ b/apps/hubble/src/storage/stores/storeEventHandler.ts @@ -227,6 +227,15 @@ class StoreEventHandler extends TypedEmitter { return await this._storageCache.getMessageCount(fid, set, forceFetch); } + async clearCachedMessageCount(fid: number, store: StoreType): HubAsyncResult { + const set = STORE_TO_SET[store]; + if (!set) { + return err(new HubError("bad_request.invalid_param", `invalid store type ${store}`)); + } + await this._storageCache.clearMessageCount(fid, set); + return ok(undefined); + } + async getMaxMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult { const slot = await this.getCurrentStorageSlotForFid(fid); diff --git a/apps/hubble/src/utils/logger.ts b/apps/hubble/src/utils/logger.ts index 6edc0dd84c..99a64afe6a 100644 --- a/apps/hubble/src/utils/logger.ts +++ b/apps/hubble/src/utils/logger.ts @@ -179,7 +179,11 @@ export type Logger = pino.Logger; export const messageTypeToName = (type?: MessageType) => { if (!type) return ""; - return (MessageType[type] as string).replace("MESSAGE_TYPE_", ""); + if (Object.values(MessageType).includes(type)) { + return (MessageType[type] as string).replace("MESSAGE_TYPE_", ""); + } else { + return `RAW_MESSAGE_TYPE: ${type}`; + } }; export const messageToLog = (message: Message) => { diff --git a/packages/core/src/protobufs/generated/request_response.ts b/packages/core/src/protobufs/generated/request_response.ts index 2670ddb630..0b5b7a0815 100644 --- a/packages/core/src/protobufs/generated/request_response.ts +++ b/packages/core/src/protobufs/generated/request_response.ts @@ -416,6 +416,14 @@ export interface StreamFetchResponse { error?: StreamError | undefined; } +export interface PruneMessagesRequest { + fid: number; +} + +export interface PruneMessagesResponse { + numMessagesPruned: number; +} + function createBaseEmpty(): Empty { return {}; } @@ -5082,6 +5090,118 @@ export const StreamFetchResponse = { }, }; +function createBasePruneMessagesRequest(): PruneMessagesRequest { + return { fid: 0 }; +} + +export const PruneMessagesRequest = { + encode(message: PruneMessagesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.fid !== 0) { + writer.uint32(8).uint64(message.fid); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PruneMessagesRequest { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePruneMessagesRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 8) { + break; + } + + message.fid = longToNumber(reader.uint64() as Long); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): PruneMessagesRequest { + return { fid: isSet(object.fid) ? Number(object.fid) : 0 }; + }, + + toJSON(message: PruneMessagesRequest): unknown { + const obj: any = {}; + message.fid !== undefined && (obj.fid = Math.round(message.fid)); + return obj; + }, + + create, I>>(base?: I): PruneMessagesRequest { + return PruneMessagesRequest.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): PruneMessagesRequest { + const message = createBasePruneMessagesRequest(); + message.fid = object.fid ?? 0; + return message; + }, +}; + +function createBasePruneMessagesResponse(): PruneMessagesResponse { + return { numMessagesPruned: 0 }; +} + +export const PruneMessagesResponse = { + encode(message: PruneMessagesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.numMessagesPruned !== 0) { + writer.uint32(8).uint64(message.numMessagesPruned); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PruneMessagesResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePruneMessagesResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 8) { + break; + } + + message.numMessagesPruned = longToNumber(reader.uint64() as Long); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): PruneMessagesResponse { + return { numMessagesPruned: isSet(object.numMessagesPruned) ? Number(object.numMessagesPruned) : 0 }; + }, + + toJSON(message: PruneMessagesResponse): unknown { + const obj: any = {}; + message.numMessagesPruned !== undefined && (obj.numMessagesPruned = Math.round(message.numMessagesPruned)); + return obj; + }, + + create, I>>(base?: I): PruneMessagesResponse { + return PruneMessagesResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): PruneMessagesResponse { + const message = createBasePruneMessagesResponse(); + message.numMessagesPruned = object.numMessagesPruned ?? 0; + return message; + }, +}; + declare var self: any | undefined; declare var window: any | undefined; declare var global: any | undefined; diff --git a/packages/core/src/validations.ts b/packages/core/src/validations.ts index 1a5f9600ee..db1786be38 100644 --- a/packages/core/src/validations.ts +++ b/packages/core/src/validations.ts @@ -19,7 +19,7 @@ export const ALLOWED_CLOCK_SKEW_SECONDS = 10 * 60; export const FNAME_REGEX = /^[a-z0-9][a-z0-9-]{0,15}$/; export const HEX_REGEX = /^(0x)?[0-9A-Fa-f]+$/; -export const TWITTER_REGEX = /^[a-z0-9_]{0,15}$/; +export const TWITTER_REGEX = /^[a-zA-Z0-9_]{0,15}$/; export const GITHUB_REGEX = /^[a-z\d](?:[a-z\d]|-(?!-)){0,38}$/i; export const USERNAME_MAX_LENGTH = 20; diff --git a/packages/hub-nodejs/src/generated/request_response.ts b/packages/hub-nodejs/src/generated/request_response.ts index 2670ddb630..0b5b7a0815 100644 --- a/packages/hub-nodejs/src/generated/request_response.ts +++ b/packages/hub-nodejs/src/generated/request_response.ts @@ -416,6 +416,14 @@ export interface StreamFetchResponse { error?: StreamError | undefined; } +export interface PruneMessagesRequest { + fid: number; +} + +export interface PruneMessagesResponse { + numMessagesPruned: number; +} + function createBaseEmpty(): Empty { return {}; } @@ -5082,6 +5090,118 @@ export const StreamFetchResponse = { }, }; +function createBasePruneMessagesRequest(): PruneMessagesRequest { + return { fid: 0 }; +} + +export const PruneMessagesRequest = { + encode(message: PruneMessagesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.fid !== 0) { + writer.uint32(8).uint64(message.fid); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PruneMessagesRequest { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePruneMessagesRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 8) { + break; + } + + message.fid = longToNumber(reader.uint64() as Long); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): PruneMessagesRequest { + return { fid: isSet(object.fid) ? Number(object.fid) : 0 }; + }, + + toJSON(message: PruneMessagesRequest): unknown { + const obj: any = {}; + message.fid !== undefined && (obj.fid = Math.round(message.fid)); + return obj; + }, + + create, I>>(base?: I): PruneMessagesRequest { + return PruneMessagesRequest.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): PruneMessagesRequest { + const message = createBasePruneMessagesRequest(); + message.fid = object.fid ?? 0; + return message; + }, +}; + +function createBasePruneMessagesResponse(): PruneMessagesResponse { + return { numMessagesPruned: 0 }; +} + +export const PruneMessagesResponse = { + encode(message: PruneMessagesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.numMessagesPruned !== 0) { + writer.uint32(8).uint64(message.numMessagesPruned); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PruneMessagesResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePruneMessagesResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 8) { + break; + } + + message.numMessagesPruned = longToNumber(reader.uint64() as Long); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): PruneMessagesResponse { + return { numMessagesPruned: isSet(object.numMessagesPruned) ? Number(object.numMessagesPruned) : 0 }; + }, + + toJSON(message: PruneMessagesResponse): unknown { + const obj: any = {}; + message.numMessagesPruned !== undefined && (obj.numMessagesPruned = Math.round(message.numMessagesPruned)); + return obj; + }, + + create, I>>(base?: I): PruneMessagesResponse { + return PruneMessagesResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): PruneMessagesResponse { + const message = createBasePruneMessagesResponse(); + message.numMessagesPruned = object.numMessagesPruned ?? 0; + return message; + }, +}; + declare var self: any | undefined; declare var window: any | undefined; declare var global: any | undefined; diff --git a/packages/hub-nodejs/src/generated/rpc.ts b/packages/hub-nodejs/src/generated/rpc.ts index 20666df059..40fce76011 100644 --- a/packages/hub-nodejs/src/generated/rpc.ts +++ b/packages/hub-nodejs/src/generated/rpc.ts @@ -36,6 +36,8 @@ import { MessagesResponse, OnChainEventRequest, OnChainEventResponse, + PruneMessagesRequest, + PruneMessagesResponse, ReactionRequest, ReactionsByFidRequest, ReactionsByTargetRequest, @@ -1411,6 +1413,15 @@ export const AdminServiceService = { responseSerialize: (value: Empty) => Buffer.from(Empty.encode(value).finish()), responseDeserialize: (value: Buffer) => Empty.decode(value), }, + pruneMessages: { + path: "/AdminService/PruneMessages", + requestStream: false, + responseStream: false, + requestSerialize: (value: PruneMessagesRequest) => Buffer.from(PruneMessagesRequest.encode(value).finish()), + requestDeserialize: (value: Buffer) => PruneMessagesRequest.decode(value), + responseSerialize: (value: PruneMessagesResponse) => Buffer.from(PruneMessagesResponse.encode(value).finish()), + responseDeserialize: (value: Buffer) => PruneMessagesResponse.decode(value), + }, submitOnChainEvent: { path: "/AdminService/SubmitOnChainEvent", requestStream: false, @@ -1420,12 +1431,23 @@ export const AdminServiceService = { responseSerialize: (value: OnChainEvent) => Buffer.from(OnChainEvent.encode(value).finish()), responseDeserialize: (value: Buffer) => OnChainEvent.decode(value), }, + submitUserNameProof: { + path: "/AdminService/SubmitUserNameProof", + requestStream: false, + responseStream: false, + requestSerialize: (value: UserNameProof) => Buffer.from(UserNameProof.encode(value).finish()), + requestDeserialize: (value: Buffer) => UserNameProof.decode(value), + responseSerialize: (value: UserNameProof) => Buffer.from(UserNameProof.encode(value).finish()), + responseDeserialize: (value: Buffer) => UserNameProof.decode(value), + }, } as const; export interface AdminServiceServer extends UntypedServiceImplementation { rebuildSyncTrie: handleUnaryCall; deleteAllMessagesFromDb: handleUnaryCall; + pruneMessages: handleUnaryCall; submitOnChainEvent: handleUnaryCall; + submitUserNameProof: handleUnaryCall; } export interface AdminServiceClient extends Client { @@ -1456,6 +1478,21 @@ export interface AdminServiceClient extends Client { options: Partial, callback: (error: ServiceError | null, response: Empty) => void, ): ClientUnaryCall; + pruneMessages( + request: PruneMessagesRequest, + callback: (error: ServiceError | null, response: PruneMessagesResponse) => void, + ): ClientUnaryCall; + pruneMessages( + request: PruneMessagesRequest, + metadata: Metadata, + callback: (error: ServiceError | null, response: PruneMessagesResponse) => void, + ): ClientUnaryCall; + pruneMessages( + request: PruneMessagesRequest, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: PruneMessagesResponse) => void, + ): ClientUnaryCall; submitOnChainEvent( request: OnChainEvent, callback: (error: ServiceError | null, response: OnChainEvent) => void, @@ -1471,6 +1508,21 @@ export interface AdminServiceClient extends Client { options: Partial, callback: (error: ServiceError | null, response: OnChainEvent) => void, ): ClientUnaryCall; + submitUserNameProof( + request: UserNameProof, + callback: (error: ServiceError | null, response: UserNameProof) => void, + ): ClientUnaryCall; + submitUserNameProof( + request: UserNameProof, + metadata: Metadata, + callback: (error: ServiceError | null, response: UserNameProof) => void, + ): ClientUnaryCall; + submitUserNameProof( + request: UserNameProof, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: UserNameProof) => void, + ): ClientUnaryCall; } export const AdminServiceClient = makeGenericClientConstructor(AdminServiceService, "AdminService") as unknown as { diff --git a/packages/hub-web/src/generated/request_response.ts b/packages/hub-web/src/generated/request_response.ts index 2670ddb630..0b5b7a0815 100644 --- a/packages/hub-web/src/generated/request_response.ts +++ b/packages/hub-web/src/generated/request_response.ts @@ -416,6 +416,14 @@ export interface StreamFetchResponse { error?: StreamError | undefined; } +export interface PruneMessagesRequest { + fid: number; +} + +export interface PruneMessagesResponse { + numMessagesPruned: number; +} + function createBaseEmpty(): Empty { return {}; } @@ -5082,6 +5090,118 @@ export const StreamFetchResponse = { }, }; +function createBasePruneMessagesRequest(): PruneMessagesRequest { + return { fid: 0 }; +} + +export const PruneMessagesRequest = { + encode(message: PruneMessagesRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.fid !== 0) { + writer.uint32(8).uint64(message.fid); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PruneMessagesRequest { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePruneMessagesRequest(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 8) { + break; + } + + message.fid = longToNumber(reader.uint64() as Long); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): PruneMessagesRequest { + return { fid: isSet(object.fid) ? Number(object.fid) : 0 }; + }, + + toJSON(message: PruneMessagesRequest): unknown { + const obj: any = {}; + message.fid !== undefined && (obj.fid = Math.round(message.fid)); + return obj; + }, + + create, I>>(base?: I): PruneMessagesRequest { + return PruneMessagesRequest.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): PruneMessagesRequest { + const message = createBasePruneMessagesRequest(); + message.fid = object.fid ?? 0; + return message; + }, +}; + +function createBasePruneMessagesResponse(): PruneMessagesResponse { + return { numMessagesPruned: 0 }; +} + +export const PruneMessagesResponse = { + encode(message: PruneMessagesResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.numMessagesPruned !== 0) { + writer.uint32(8).uint64(message.numMessagesPruned); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PruneMessagesResponse { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBasePruneMessagesResponse(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag != 8) { + break; + } + + message.numMessagesPruned = longToNumber(reader.uint64() as Long); + continue; + } + if ((tag & 7) == 4 || tag == 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): PruneMessagesResponse { + return { numMessagesPruned: isSet(object.numMessagesPruned) ? Number(object.numMessagesPruned) : 0 }; + }, + + toJSON(message: PruneMessagesResponse): unknown { + const obj: any = {}; + message.numMessagesPruned !== undefined && (obj.numMessagesPruned = Math.round(message.numMessagesPruned)); + return obj; + }, + + create, I>>(base?: I): PruneMessagesResponse { + return PruneMessagesResponse.fromPartial(base ?? {}); + }, + + fromPartial, I>>(object: I): PruneMessagesResponse { + const message = createBasePruneMessagesResponse(); + message.numMessagesPruned = object.numMessagesPruned ?? 0; + return message; + }, +}; + declare var self: any | undefined; declare var window: any | undefined; declare var global: any | undefined; diff --git a/packages/hub-web/src/generated/rpc.ts b/packages/hub-web/src/generated/rpc.ts index b3be5450de..7007ee8cc8 100644 --- a/packages/hub-web/src/generated/rpc.ts +++ b/packages/hub-web/src/generated/rpc.ts @@ -24,6 +24,8 @@ import { MessagesResponse, OnChainEventRequest, OnChainEventResponse, + PruneMessagesRequest, + PruneMessagesResponse, ReactionRequest, ReactionsByFidRequest, ReactionsByTargetRequest, @@ -1526,7 +1528,9 @@ export const HubServiceGetSyncSnapshotByPrefixDesc: UnaryMethodDefinitionish = { export interface AdminService { rebuildSyncTrie(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; deleteAllMessagesFromDb(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; + pruneMessages(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; submitOnChainEvent(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; + submitUserNameProof(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise; } export class AdminServiceClientImpl implements AdminService { @@ -1536,7 +1540,9 @@ export class AdminServiceClientImpl implements AdminService { this.rpc = rpc; this.rebuildSyncTrie = this.rebuildSyncTrie.bind(this); this.deleteAllMessagesFromDb = this.deleteAllMessagesFromDb.bind(this); + this.pruneMessages = this.pruneMessages.bind(this); this.submitOnChainEvent = this.submitOnChainEvent.bind(this); + this.submitUserNameProof = this.submitUserNameProof.bind(this); } rebuildSyncTrie(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise { @@ -1547,9 +1553,17 @@ export class AdminServiceClientImpl implements AdminService { return this.rpc.unary(AdminServiceDeleteAllMessagesFromDbDesc, Empty.fromPartial(request), metadata); } + pruneMessages(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise { + return this.rpc.unary(AdminServicePruneMessagesDesc, PruneMessagesRequest.fromPartial(request), metadata); + } + submitOnChainEvent(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise { return this.rpc.unary(AdminServiceSubmitOnChainEventDesc, OnChainEvent.fromPartial(request), metadata); } + + submitUserNameProof(request: DeepPartial, metadata?: grpcWeb.grpc.Metadata): Promise { + return this.rpc.unary(AdminServiceSubmitUserNameProofDesc, UserNameProof.fromPartial(request), metadata); + } } export const AdminServiceDesc = { serviceName: "AdminService" }; @@ -1600,6 +1614,29 @@ export const AdminServiceDeleteAllMessagesFromDbDesc: UnaryMethodDefinitionish = } as any, }; +export const AdminServicePruneMessagesDesc: UnaryMethodDefinitionish = { + methodName: "PruneMessages", + service: AdminServiceDesc, + requestStream: false, + responseStream: false, + requestType: { + serializeBinary() { + return PruneMessagesRequest.encode(this).finish(); + }, + } as any, + responseType: { + deserializeBinary(data: Uint8Array) { + const value = PruneMessagesResponse.decode(data); + return { + ...value, + toObject() { + return value; + }, + }; + }, + } as any, +}; + export const AdminServiceSubmitOnChainEventDesc: UnaryMethodDefinitionish = { methodName: "SubmitOnChainEvent", service: AdminServiceDesc, @@ -1623,6 +1660,29 @@ export const AdminServiceSubmitOnChainEventDesc: UnaryMethodDefinitionish = { } as any, }; +export const AdminServiceSubmitUserNameProofDesc: UnaryMethodDefinitionish = { + methodName: "SubmitUserNameProof", + service: AdminServiceDesc, + requestStream: false, + responseStream: false, + requestType: { + serializeBinary() { + return UserNameProof.encode(this).finish(); + }, + } as any, + responseType: { + deserializeBinary(data: Uint8Array) { + const value = UserNameProof.decode(data); + return { + ...value, + toObject() { + return value; + }, + }; + }, + } as any, +}; + interface UnaryMethodDefinitionishR extends grpcWeb.grpc.UnaryMethodDefinition { requestStream: any; responseStream: any; diff --git a/packages/shuttle/package.json b/packages/shuttle/package.json index 649a27cf61..45024d7af8 100644 --- a/packages/shuttle/package.json +++ b/packages/shuttle/package.json @@ -11,9 +11,7 @@ "types": "./dist/index.d.ts" } }, - "files": [ - "dist" - ], + "files": ["dist"], "license": "MIT", "dependencies": { "@farcaster/hot-shots": "^10.0.0", @@ -33,7 +31,7 @@ "scripts": { "build": "tsup --config tsup.config.ts", "start": "tsx src/example-app/app.ts", - "clean": "rimraf ./dist", + "migrate:backfill": "tsx src/shuttle/migration.ts backfill", "lint": "biome format src/ --write && biome check src/ --apply", "lint:ci": "biome ci src/", "test": "NODE_OPTIONS=--experimental-vm-modules jest --detectOpenHandles --forceExit", diff --git a/packages/shuttle/src/example-app/env.ts b/packages/shuttle/src/example-app/env.ts index 257c8d9d2a..13a5bc1bf8 100644 --- a/packages/shuttle/src/example-app/env.ts +++ b/packages/shuttle/src/example-app/env.ts @@ -6,6 +6,9 @@ export const COLORIZE = export const LOG_LEVEL = process.env["LOG_LEVEL"] || "info"; export const HUB_HOST = process.env["HUB_HOST"] || "localhost:2283"; +export const HUB_ADMIN_HOST = process.env["HUB_ADMIN_HOST"] || "127.0.0.1:2284"; +export const ONCHAIN_EVENTS_HUB_HOST = process.env["ONCHAIN_EVENTS_HUB_HOST"] || "localhost:2283"; +export const SNAPCHAIN_HOST = process.env["SNAPCHAIN_HOST"] || "127.0.0.1:3383"; export const HUB_SSL = process.env["HUB_SSL"] === "true" ? true : false; export const POSTGRES_URL = process.env["POSTGRES_URL"] || "postgres://localhost:5432"; @@ -16,7 +19,8 @@ export const TOTAL_SHARDS = parseInt(process.env["SHARDS"] || "0"); export const SHARD_INDEX = parseInt(process.env["SHARD_NUM"] || "0"); export const BACKFILL_FIDS = process.env["FIDS"] || ""; -export const MAX_FID = process.env["MAX_FID"]; +export const MAX_FID = process.env["MAX_FID"] || "1"; +export const MIN_FID = process.env["MIN_FID"] || "1"; export const STATSD_HOST = process.env["STATSD_HOST"]; export const STATSD_METRICS_PREFIX = process.env["STATSD_METRICS_PREFIX"] || "shuttle."; diff --git a/packages/shuttle/src/shuttle/index.ts b/packages/shuttle/src/shuttle/index.ts index 4cfeecf000..505ef963e1 100644 --- a/packages/shuttle/src/shuttle/index.ts +++ b/packages/shuttle/src/shuttle/index.ts @@ -9,6 +9,7 @@ export * from "./hubEventProcessor"; export * from "./messageProcessor"; export * from "./messageReconciliation"; export * from "./eventStream"; +export * from "./migration"; export type StoreMessageOperation = "merge" | "delete" | "revoke" | "prune"; export type MessageState = "created" | "deleted"; diff --git a/packages/shuttle/src/shuttle/messageReconciliation.ts b/packages/shuttle/src/shuttle/messageReconciliation.ts index 972cde86fa..f22ec78618 100644 --- a/packages/shuttle/src/shuttle/messageReconciliation.ts +++ b/packages/shuttle/src/shuttle/messageReconciliation.ts @@ -134,7 +134,7 @@ export class MessageReconciliation { } // Next, reconcile messages that are in the database but not in the hub - const dbMessages = await this.allActiveDbMessagesOfTypeForFid(fid, type, startTimestamp, stopTimestamp); + const dbMessages = await allDbMessagesOfTypeForFid(this.db, fid, [type], startTimestamp, stopTimestamp); if (dbMessages.isErr()) { this.log.error({ startTimestamp, stopTimestamp }, "Invalid time range provided to reconciliation"); return; @@ -399,71 +399,62 @@ export class MessageReconciliation { }); } } +} - private async allActiveDbMessagesOfTypeForFid( - fid: number, - type: MessageType, - startTimestamp?: number, - stopTimestamp?: number, - ) { - let typeSet: MessageType[] = [type]; - // Add remove types for messages which support them - switch (type) { - case MessageType.CAST_ADD: - typeSet = [...typeSet, MessageType.CAST_REMOVE]; - break; - case MessageType.REACTION_ADD: - typeSet = [...typeSet, MessageType.REACTION_REMOVE]; - break; - case MessageType.LINK_ADD: - typeSet = [...typeSet, MessageType.LINK_REMOVE, MessageType.LINK_COMPACT_STATE]; - break; - case MessageType.VERIFICATION_ADD_ETH_ADDRESS: - typeSet = [...typeSet, MessageType.VERIFICATION_REMOVE]; - break; - } - - let startDate; - if (startTimestamp) { - const startUnixTimestampResult = fromFarcasterTime(startTimestamp); - if (startUnixTimestampResult.isErr()) { - return err(startUnixTimestampResult.error); - } - - startDate = new Date(startUnixTimestampResult.value); +export async function allDbMessagesOfTypeForFid( + db: DB, + fid: number, + types: MessageType[], + startTimestamp?: number, + stopTimestamp?: number, + limit?: number, + offset?: number, + includePruned = false, + includeDeleted = false, + includeRevoked = false, +) { + let startDate; + if (startTimestamp) { + const startUnixTimestampResult = fromFarcasterTime(startTimestamp); + if (startUnixTimestampResult.isErr()) { + return err(startUnixTimestampResult.error); } - let stopDate; - if (stopTimestamp) { - const stopUnixTimestampResult = fromFarcasterTime(stopTimestamp); - if (stopUnixTimestampResult.isErr()) { - return err(stopUnixTimestampResult.error); - } + startDate = new Date(startUnixTimestampResult.value); + } - stopDate = new Date(stopUnixTimestampResult.value); + let stopDate; + if (stopTimestamp) { + const stopUnixTimestampResult = fromFarcasterTime(stopTimestamp); + if (stopUnixTimestampResult.isErr()) { + return err(stopUnixTimestampResult.error); } - const query = this.db - .selectFrom("messages") - .select([ - "messages.prunedAt", - "messages.revokedAt", - "messages.hash", - "messages.type", - "messages.fid", - "messages.raw", - "messages.signer", - ]) - .where("messages.fid", "=", fid) - .where("messages.type", "in", typeSet) - .where("messages.prunedAt", "is", null) - .where("messages.revokedAt", "is", null) - .where("messages.deletedAt", "is", null); - const queryWithStartTime = startDate ? query.where("messages.timestamp", ">=", startDate) : query; - const queryWithStopTime = stopDate - ? queryWithStartTime.where("messages.timestamp", "<=", stopDate) - : queryWithStartTime; - const result = await queryWithStopTime.execute(); - return ok(result); + stopDate = new Date(stopUnixTimestampResult.value); } + + const query = db + .selectFrom("messages") + .select([ + "messages.prunedAt", + "messages.revokedAt", + "messages.hash", + "messages.type", + "messages.fid", + "messages.raw", + "messages.signer", + ]) + .where("messages.fid", "=", fid) + .where("messages.type", "in", types); + const queryWithPruned = includePruned ? query : query.where("messages.prunedAt", "is", null); + const queryWithRevoked = includeRevoked ? queryWithPruned : queryWithPruned.where("messages.revokedAt", "is", null); + const queryWithDeleted = includeDeleted ? queryWithRevoked : queryWithRevoked.where("messages.deletedAt", "is", null); + const queryWithStartTime = startDate ? queryWithDeleted.where("messages.timestamp", ">=", startDate) : query; + const queryWithStopTime = stopDate + ? queryWithStartTime.where("messages.timestamp", "<=", stopDate) + : queryWithStartTime; + const queryWithLimit = limit ? queryWithStopTime.limit(limit) : queryWithStopTime; + const queryWithOffset = offset ? queryWithLimit.offset(offset) : queryWithLimit; + const result = await queryWithOffset.execute(); + return ok(result); } diff --git a/packages/shuttle/src/shuttle/migration.ts b/packages/shuttle/src/shuttle/migration.ts new file mode 100644 index 0000000000..5e75eedfa8 --- /dev/null +++ b/packages/shuttle/src/shuttle/migration.ts @@ -0,0 +1,388 @@ +import { DB, getDbClient, getHubClient, allDbMessagesOfTypeForFid } from "../index"; // If you want to use this as a standalone app, replace this import with "@farcaster/shuttle" +import { + AdminRpcClient, + getAdminRpcClient, + HubError, + HubResult, + HubRpcClient, + Message, + MessageData, + MessageType, + OnChainEventRequest, + OnChainEventType, + StoreType, + UserDataType, +} from "@farcaster/hub-nodejs"; +import { log } from "../example-app/log"; +import { Argument, Command } from "@commander-js/extra-typings"; +import { readFileSync } from "fs"; +import { + BACKFILL_FIDS, + HUB_ADMIN_HOST, + HUB_HOST, + MAX_FID, + MIN_FID, + ONCHAIN_EVENTS_HUB_HOST, + POSTGRES_SCHEMA, + POSTGRES_URL, + SNAPCHAIN_HOST, +} from "../example-app/env"; +import * as process from "node:process"; +import url from "node:url"; +import { err, ok } from "neverthrow"; +import { sleep } from "src/utils"; + +export class Migration { + private snapchainClient: HubRpcClient; + private snapchainAdminClient: AdminRpcClient; + private hubClient: HubRpcClient; + private hubAdminClient: AdminRpcClient; + private onchainEventsHubClient: HubRpcClient; + private backendDb: DB; + + constructor( + backendDb: DB, + snapchainClient: HubRpcClient, + hubClient: HubRpcClient, + snapchainAdminClient: AdminRpcClient, + hubAdminClient: AdminRpcClient, + onchainEventsHubClient: HubRpcClient, + ) { + this.backendDb = backendDb; + this.snapchainClient = snapchainClient; + this.hubClient = hubClient; + this.snapchainAdminClient = snapchainAdminClient; + this.hubAdminClient = hubAdminClient; + this.onchainEventsHubClient = onchainEventsHubClient; + } + + static async create( + dbSchema: string, + onchainEventsHubUrl: string, + snapchainUrl: string, + hubUrl: string, + hubAdminUrl: string, + ) { + const backendDb = getDbClient(POSTGRES_URL, dbSchema); + const snapchainClient = getHubClient(snapchainUrl, { ssl: false }).client; + const snapchainAdminClient = await getAdminRpcClient(snapchainUrl); + const onchainEventsHubClient = getHubClient(onchainEventsHubUrl, { ssl: true }); + const hubClient = getHubClient(hubUrl, { ssl: false }); + const hubAdminClient = await getAdminRpcClient(hubAdminUrl); + return new Migration( + backendDb, + snapchainClient, + hubClient.client, + snapchainAdminClient, + hubAdminClient, + onchainEventsHubClient.client, + ); + } + + async ingestOnchainEvents(fid: number, eventType: OnChainEventType) { + // Just sync all of them, no need to reconcile + // TODO(aditi): Handle multiple pages + const result = await this.onchainEventsHubClient.getOnChainEvents(OnChainEventRequest.create({ fid, eventType })); + + if (result.isErr()) { + return err(result.error); + } + + let numOnchainEvents = 0; + let numSkipped = 0; + for (const onChainEvent of result.value.events) { + const snapchainResult = await this.submitWithRetry(3, () => { + return this.snapchainAdminClient.submitOnChainEvent(onChainEvent); + }); + if (snapchainResult.isErr()) { + log.info( + `Unable to submit onchain event to snapchain ${snapchainResult.error.message} ${snapchainResult.error.stack}`, + ); + if (this.shouldRetry(snapchainResult.error)) { + log.info("Skipping hub submit to avoid mismatch"); + numSkipped += 1; + continue; + } + } + + const hubResult = await this.hubAdminClient.submitOnChainEvent(onChainEvent); + if (hubResult.isErr()) { + log.info(`Unable to submit onchain event to hub ${hubResult.error.message} ${hubResult.error.stack}`); + } + + numOnchainEvents += 1; + } + + log.info(`Processed ${numOnchainEvents} onchain events for fid ${fid}. Skipped ${numSkipped}.`); + + return ok(undefined); + } + + async ingestUsernameProofs(fids: number[]) { + for (const fid of fids) { + const result = await this.onchainEventsHubClient.getUserNameProofsByFid({ fid }); + if (result.isErr()) { + log.info(`Unable to get username proofs for fid ${fid}`); + continue; + } + + let numSkipped = 0; + for (const usernameProof of result.value.proofs) { + const snapchainResult = await this.submitWithRetry(3, () => { + return this.snapchainAdminClient.submitUserNameProof(usernameProof); + }); + if (snapchainResult.isErr()) { + log.info(`Unable to submit username proof for fid ${fid} to snapchain ${snapchainResult.error.message}`); + if (this.shouldRetry(snapchainResult.error)) { + log.info("Skipping hub submit to avoid mismatch"); + numSkipped += 1; + continue; + } + } + + const hubResult = await this.hubAdminClient.submitUserNameProof(usernameProof); + if (hubResult.isErr()) { + log.info(`Unable to submit username proof for fid ${fid} to hub ${hubResult.error.message}`); + } + } + + log.info(`Processed ${result.value.proofs.length} username proofs for fid ${fid}. Skipped ${numSkipped}.`); + } + } + + async ingestAllOnchainEvents(fids: number[]) { + for (const fid of fids) { + const storageResult = await this.ingestOnchainEvents(fid, OnChainEventType.EVENT_TYPE_STORAGE_RENT); + if (storageResult.isErr()) { + log.info( + `Unable to get storage events for fid ${fid} ${storageResult.error.message} ${storageResult.error.stack}`, + ); + } + + const signerResult = await this.ingestOnchainEvents(fid, OnChainEventType.EVENT_TYPE_SIGNER); + if (signerResult.isErr()) { + log.info( + `Unable to get signer events for fid ${fid} ${signerResult.error.message} ${signerResult.error.stack}`, + ); + } + + // TODO(aditi): Signer migrated for fid 0 + + const idRegisterResult = await this.ingestOnchainEvents(fid, OnChainEventType.EVENT_TYPE_ID_REGISTER); + if (idRegisterResult.isErr()) { + log.info( + `Unable to get signer events for fid ${fid} ${idRegisterResult.error.message} ${idRegisterResult.error.stack}`, + ); + } + } + } + + shouldRetry(error: HubError) { + return error.message.includes("channel is full"); + } + + async submitWithRetry(numRetries: number, submitFn: () => Promise>) { + let result = await submitFn(); + let numRetriesRemaining = numRetries; + let waitTime = 1000; + + if (result.isErr() && this.shouldRetry(result.error) && numRetriesRemaining > 0) { + await sleep(waitTime); + numRetriesRemaining -= 1; + waitTime *= 2; + result = await submitFn(); + } + + return result; + } + + async compareMessageCounts(fid: number) { + const hubCounts = await this.hubClient.getCurrentStorageLimitsByFid({ fid }); + + if (hubCounts.isErr()) { + return err(hubCounts.error); + } + + const snapchainCounts = await this.snapchainClient.getCurrentStorageLimitsByFid({ fid }); + + if (snapchainCounts.isErr()) { + return err(snapchainCounts.error); + } + + const hubUsages = new Map(); + for (const limits of hubCounts.value.limits) { + hubUsages.set(limits.storeType, limits.used); + } + + const snapchainUsages = new Map(); + for (const limits of snapchainCounts.value.limits) { + snapchainUsages.set(limits.storeType, limits.used); + } + + for (const [storeType, hubUsage] of hubUsages.entries()) { + const snapchainUsage = snapchainUsages.get(storeType); + if (hubUsage !== snapchainUsage) { + log.info( + `USAGE MISMATCH: fid: ${fid} store_type: ${StoreType[storeType]}, hub usage: ${hubUsage}, snapchain usage ${snapchainUsage}`, + ); + } + } + + return ok(undefined); + } + + async ingestMessagesFromDb(fids: number[], shuffleMessages = false) { + for (const fid of fids) { + let numMessages = 0; + let numErrorsOnHub = 0; + let numErrorsOnSnapchain = 0; + let numSkipped = 0; + const messageTypes = [ + MessageType.CAST_ADD, + MessageType.CAST_REMOVE, + MessageType.REACTION_ADD, + MessageType.REACTION_REMOVE, + MessageType.LINK_ADD, + MessageType.LINK_REMOVE, + MessageType.LINK_COMPACT_STATE, + MessageType.VERIFICATION_ADD_ETH_ADDRESS, + MessageType.VERIFICATION_REMOVE, + MessageType.USER_DATA_ADD, + ]; + const numMessagesByType = new Map(); + let pageNumber = 0; + const pageSize = 100; + while (true) { + const messages = await allDbMessagesOfTypeForFid( + this.backendDb, + fid, + messageTypes, + undefined, + undefined, + pageSize, + pageSize * pageNumber, + // true, include pruned + // true, include deleted + // true, include revoked + ); + if (messages.isErr()) { + throw messages.error; + } + + if (messages.value.length === 0) { + break; + } + + const shuffledMessages = shuffleMessages + ? messages.value.sort(() => { + return Math.random() - 0.5; + }) + : messages.value; + + for (const dbMessage of shuffledMessages) { + const message = Message.decode(dbMessage.raw); + + if (!message.data) { + continue; + } + + const newMessage = Message.create(message); + newMessage.dataBytes = MessageData.encode(message.data).finish(); + newMessage.data = undefined; + + const snapchainResult = await this.submitWithRetry(3, async () => { + return this.snapchainClient.submitMessage(newMessage); + }); + const hubResult = await this.hubClient.submitMessage(newMessage); + + if (snapchainResult.isErr()) { + log.info( + `Unable to submit message to snapchain ${snapchainResult.error.message} ${snapchainResult.error.stack}`, + ); + if (this.shouldRetry(snapchainResult.error)) { + log.info("Skipping hub submit to avoid mismatch"); + numSkipped += 1; + continue; + } + numErrorsOnSnapchain += 1; + } + + if (hubResult.isErr()) { + log.info(`Unable to submit message to hub ${hubResult.error.message} ${hubResult.error.stack}`); + numErrorsOnHub += 1; + } + + numMessages += 1; + const countForType = numMessagesByType.get(message.data.type) ?? 0; + numMessagesByType.set(message.data.type, countForType + 1); + } + + pageNumber += 1; + } + + for (const [type, count] of numMessagesByType.entries()) { + log.info(`Submitted ${count} messages for fid ${fid} for type ${MessageType[type]}`); + } + + const result = await this.hubAdminClient.pruneMessages({ fid }); + let numMessagesPruned = 0; + if (result.isErr()) { + log.info(`Unable to prune hub messages for fid ${fid}`); + } else { + numMessagesPruned = result.value.numMessagesPruned; + } + + // TODO(aditi): This log should have better structure. + log.info( + `Submitted ${numMessages} messages for fid ${fid}. ${numErrorsOnHub} hub errors. ${numErrorsOnSnapchain} snapchain errors. ${numMessagesPruned} pruned on hub. Skipped ${numSkipped}.`, + ); + } + } +} + +function selectFids() { + return BACKFILL_FIDS + ? BACKFILL_FIDS.split(",").map((fid) => parseInt(fid)) + : Array.from({ length: parseInt(MAX_FID) - parseInt(MIN_FID) + 1 }, (_, i) => parseInt(MIN_FID) + i); +} + +if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString())) { + async function backfill() { + const migration = await Migration.create( + POSTGRES_SCHEMA, + ONCHAIN_EVENTS_HUB_HOST, + SNAPCHAIN_HOST, + HUB_HOST, + HUB_ADMIN_HOST, + ); + + const fids = selectFids(); + await migration.ingestAllOnchainEvents(fids); + log.info("Done migrating onchain events"); + await migration.ingestUsernameProofs(fids); + log.info("Done migrating usernme proofs"); + // Sleep 30s + await sleep(30_000); + await migration.ingestMessagesFromDb(fids); + log.info("Done migrating messages"); + // Sleep 2 minutes + await sleep(120_000); + for (const fid of fids) { + const compareResult = await migration.compareMessageCounts(fid); + if (compareResult.isErr()) { + log.info(`Error comparing message counts ${compareResult.error}`); + } + } + + return; + } + + const program = new Command() + .name("migration") + .description("Synchronizes a Farcaster Hub with a Postgres database") + .version(JSON.parse(readFileSync("./package.json").toString()).version); + + program.command("backfill").description("Queue up backfill for the worker").action(backfill); + + program.parse(process.argv); +} diff --git a/protobufs/schemas/request_response.proto b/protobufs/schemas/request_response.proto index ff9a91e83c..c75d3bc70c 100644 --- a/protobufs/schemas/request_response.proto +++ b/protobufs/schemas/request_response.proto @@ -350,4 +350,12 @@ message StreamFetchResponse { MessagesResponse messages = 2; StreamError error = 3; } +} + +message PruneMessagesRequest { + uint64 fid = 1; +} + +message PruneMessagesResponse { + uint64 num_messages_pruned = 1; } \ No newline at end of file diff --git a/protobufs/schemas/rpc.proto b/protobufs/schemas/rpc.proto index 79f6df6d05..0ef9111dcb 100644 --- a/protobufs/schemas/rpc.proto +++ b/protobufs/schemas/rpc.proto @@ -123,6 +123,8 @@ service HubService { service AdminService { rpc RebuildSyncTrie(Empty) returns (Empty); rpc DeleteAllMessagesFromDb(Empty) returns (Empty); + rpc PruneMessages(PruneMessagesRequest) returns (PruneMessagesResponse); rpc SubmitOnChainEvent(OnChainEvent) returns (OnChainEvent); + rpc SubmitUserNameProof(UserNameProof) returns (UserNameProof); }