Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow uppercase x usernames #2490

Closed
5 changes: 5 additions & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
hubble: cd apps/hubble && rm -rf .rocks && yarn build && yarn start --eth-mainnet-rpc-url $ETH_RPC_URL --l2-rpc-url $L2_RPC_URL --hub-operator-fid 1 --disable-snapshot-sync --catchup-sync-with-snapshot false --network 1 --disable-console-status --admin-server-enabled > hub.log
snapchain1: cd ../snapchain-v0 && rm -rf nodes/1/.rocks && cargo run -- --config-path nodes/1/snapchain.toml > snapchain1.log
snapchain2: cd ../snapchain-v0 && rm -rf nodes/2/.rocks && cargo run -- --config-path nodes/2/snapchain.toml > snapchain2.log
snapchain3: cd ../snapchain-v0 && rm -rf nodes/3/.rocks && cargo run -- --config-path nodes/3/snapchain.toml > snapchain3.log
migrate: cd packages/shuttle && sleep 30 && yarn build && ONCHAIN_EVENTS_HUB_HOST=kassad.merkle.zone:2283 HUB_HOST=127.0.0.1:2283 HUB_ADMIN_HOST=127.0.0.1:2284 SNAPCHAIN_HOST=127.0.0.1:3383 yarn migrate:onchain-events > onchain_events.log&& sleep 30 && ONCHAIN_EVENTS_HUB_HOST=kassad.merkle.zone:2283 HUB_HOST=127.0.0.1:2283 HUB_ADMIN_HOST=127.0.0.1:2284 SNAPCHAIN_HOST=127.0.0.1:3383 yarn migrate:messages > messages.log && sleep 120 && ONCHAIN_EVENTS_HUB_HOST=kassad.merkle.zone:2283 HUB_HOST=127.0.0.1:2283 HUB_ADMIN_HOST=127.0.0.1:2284 SNAPCHAIN_HOST=127.0.0.1:3383 yarn migrate:validate > validate.log
3 changes: 3 additions & 0 deletions apps/hubble/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ app
"The block number to begin syncing events from L2 Farcaster contracts",
parseNumber,
)
.option("--l2-stop-block <number>", "The block number to stop syncing L2 events at", parseNumber)
.option(
"--l2-chunk-size <number>",
"The number of events to fetch from L2 Farcaster contracts at a time",
Expand Down Expand Up @@ -178,6 +179,7 @@ app
.option("--profile-sync", "Profile a full hub sync and exit. (default: disabled)")
.option("--rebuild-sync-trie", "Rebuild the sync trie before starting (default: disabled)")
.option("--resync-name-events", "Resync events from the Fname server before starting (default: disabled)")
.option("--stop-fname-transfer-id <number>", "Fname transfer id to stop at", parseNumber)
.option(
"--chunk-size <number>",
`The number of blocks to batch when syncing historical events from Farcaster contracts. (default: ${DEFAULT_CHUNK_SIZE})`,
Expand Down Expand Up @@ -519,6 +521,7 @@ app
l2KeyRegistryAddress: cliOptions.l2KeyRegistryAddress ?? hubConfig.l2KeyRegistryAddress,
l2StorageRegistryAddress: cliOptions.l2StorageRegistryAddress ?? hubConfig.l2StorageRegistryAddress,
l2FirstBlock: cliOptions.l2FirstBlock ?? hubConfig.l2FirstBlock,
l2StopBlock: cliOptions.l2StopBlock,
l2ChunkSize: cliOptions.l2ChunkSize ?? hubConfig.l2ChunkSize,
l2ChainId: cliOptions.l2ChainId ?? hubConfig.l2ChainId,
l2ResyncEvents: cliOptions.l2ResyncEvents ?? hubConfig.l2ResyncEvents ?? false,
Expand Down
12 changes: 11 additions & 1 deletion apps/hubble/src/eth/fnameRegistryEventsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,23 @@ export class FNameRegistryEventsProvider {
private client: FNameRegistryClientInterface;
private hub: HubInterface;
private lastTransferId = 0;
private stopTransferId?: number;
private resyncEvents: boolean;
private pollTimeoutId: ReturnType<typeof setTimeout> | undefined;
private serverSignerAddress: Uint8Array;
private shouldStop = false;

constructor(fnameRegistryClient: FNameRegistryClientInterface, hub: HubInterface, resyncEvents = false) {
constructor(
fnameRegistryClient: FNameRegistryClientInterface,
hub: HubInterface,
resyncEvents = false,
stopTransferId?: number,
) {
this.client = fnameRegistryClient;
this.hub = hub;
this.resyncEvents = resyncEvents;
this.serverSignerAddress = new Uint8Array();
this.stopTransferId = stopTransferId;
}

public async start() {
Expand Down Expand Up @@ -161,6 +168,9 @@ export class FNameRegistryEventsProvider {

private async mergeTransfers(transfers: FNameTransfer[]) {
for (const transfer of transfers) {
if (this.stopTransferId && transfer.id > this.stopTransferId) {
break;
}
const serialized = Result.combine([
utf8StringToBytes(transfer.username),
hexStringToBytes(transfer.owner),
Expand Down
55 changes: 33 additions & 22 deletions apps/hubble/src/eth/l2EventsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
private _publicClient: PublicClient<transport, chain>;

private _firstBlock: number;
private _stopBlock?: number;
private _chunkSize: number;
private _chainId: number;
private _rentExpiry: number;
Expand Down Expand Up @@ -151,10 +152,12 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
chainId: number,
resyncEvents: boolean,
expiryOverride?: number,
stopBlock?: number,
) {
this._hub = hub;
this._publicClient = publicClient;
this._firstBlock = firstBlock;
this._stopBlock = stopBlock;
this._chunkSize = chunkSize;
this._chainId = chainId;
this._resyncEvents = resyncEvents;
Expand Down Expand Up @@ -198,6 +201,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
chainId: number,
resyncEvents: boolean,
expiryOverride?: number,
stopBlock?: number,
): L2EventsProvider<chain> {
const l2RpcUrls = l2RpcUrl.split(",");
const transports = l2RpcUrls.map((url) =>
Expand Down Expand Up @@ -232,6 +236,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
chainId,
resyncEvents,
expiryOverride,
stopBlock,
);

return provider;
Expand All @@ -241,13 +246,13 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
return this._lastBlockNumber;
}

public async start(): HubAsyncResult<void> {
public async start(): HubAsyncResult<undefined> {
// Connect to L2 RPC

// Start the contract watchers first, so we cache events while we sync historical events
this._watchStorageContractEvents?.start();
this._watchIdRegistryV2ContractEvents?.start();
this._watchKeyRegistryV2ContractEvents?.start();
// this._watchStorageContractEvents?.start();
// this._watchIdRegistryV2ContractEvents?.start();
// this._watchKeyRegistryV2ContractEvents?.start();

const syncHistoryResult = await this.connectAndSyncHistoricalEvents();
if (syncHistoryResult.isErr()) {
Expand All @@ -256,7 +261,8 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
} else if (!this._isHistoricalSyncDone) {
throw new HubError("unavailable", "Historical sync failed to complete");
} else {
return ok(this._watchBlockNumber?.start());
return ok(undefined);
// return ok(this._watchBlockNumber?.start());
}
}

Expand Down Expand Up @@ -603,23 +609,6 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra

/** Connect to OP RPC and sync events. Returns the highest block that was synced */
private async connectAndSyncHistoricalEvents(): HubAsyncResult<number> {
const latestBlockResult = await ResultAsync.fromPromise(getBlockNumber(this._publicClient), (err) => err);
if (latestBlockResult.isErr()) {
diagnosticReporter().reportError(latestBlockResult.error as Error);
const msg = "failed to connect to optimism node. Check your eth RPC URL (e.g. --l2-rpc-url)";
log.error({ err: latestBlockResult.error }, msg);
return err(new HubError("unavailable.network_failure", msg));
}
const latestBlock = Number(latestBlockResult.value);

if (!latestBlock) {
const msg = "failed to get the latest block from the RPC provider";
log.error(msg);
return err(new HubError("unavailable.network_failure", msg));
}

log.info({ latestBlock: latestBlock }, "connected to optimism node");

// Find how how much we need to sync
let lastSyncedBlock = this._firstBlock;

Expand All @@ -635,6 +624,28 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
}

log.info({ lastSyncedBlock }, "last synced block");
let latestBlock = lastSyncedBlock;
if (this._stopBlock) {
latestBlock = this._stopBlock;
} else {
const latestBlockResult = await ResultAsync.fromPromise(getBlockNumber(this._publicClient), (err) => err);
if (latestBlockResult.isErr()) {
diagnosticReporter().reportError(latestBlockResult.error as Error);
const msg = "failed to connect to optimism node. Check your eth RPC URL (e.g. --l2-rpc-url)";
log.error({ err: latestBlockResult.error }, msg);
return err(new HubError("unavailable.network_failure", msg));
}
latestBlock = Number(latestBlockResult.value);

if (!latestBlock) {
const msg = "failed to get the latest block from the RPC provider";
log.error(msg);
return err(new HubError("unavailable.network_failure", msg));
}

log.info({ latestBlock: latestBlock }, "connected to optimism node");
}

const toBlock = latestBlock;

if (lastSyncedBlock < toBlock) {
Expand Down
43 changes: 25 additions & 18 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ export interface HubOptions {
/** Block number to begin syncing events from for L2 */
l2FirstBlock?: number;

l2StopBlock?: number;

/** Number of blocks to batch when syncing historical events for L2 */
l2ChunkSize?: number;

Expand All @@ -272,6 +274,9 @@ export interface HubOptions {
/** Resync fname events */
resyncNameEvents?: boolean;

/** Id of last fname transfer to ingest */
stopFnameTransferId?: number;

/** Name of the RocksDB instance */
rocksDBName?: string;

Expand Down Expand Up @@ -432,6 +437,7 @@ export class Hub implements HubInterface {
options.l2ChainId ?? OptimismConstants.ChainId,
options.l2ResyncEvents ?? false,
options.l2RentExpiryOverride,
options.l2StopBlock,
);
} else {
log.warn("No L2 RPC URL provided, unable to sync L2 contract events");
Expand All @@ -443,6 +449,7 @@ export class Hub implements HubInterface {
new FNameRegistryClient(options.fnameServerUrl),
this,
options.resyncNameEvents ?? false,
options.stopFnameTransferId,
);
} else {
log.warn("No FName Registry URL provided, unable to sync fname events");
Expand Down Expand Up @@ -808,34 +815,34 @@ export class Hub implements HubInterface {
: undefined;

// Start the Gossip node
await this.gossipNode.start(this.bootstrapAddrs(), {
peerId,
ipMultiAddr: this.options.ipMultiAddr,
announceIp: this.options.announceIp,
gossipPort: this.options.gossipPort,
allowedPeerIdStrs: this.allowedPeerIds,
deniedPeerIdStrs: this.deniedPeerIds,
directPeers: this.options.directPeers,
allowlistedImmunePeers: this.options.allowlistedImmunePeers,
applicationScoreCap: this.options.applicationScoreCap,
strictNoSign: this.strictNoSign,
connectToDbPeers: this.options.connectToDbPeers,
statsdParams: this.options.statsdParams,
});
// await this.gossipNode.start(this.bootstrapAddrs(), {
// peerId,
// ipMultiAddr: this.options.ipMultiAddr,
// announceIp: this.options.announceIp,
// gossipPort: this.options.gossipPort,
// allowedPeerIdStrs: this.allowedPeerIds,
// deniedPeerIdStrs: this.deniedPeerIds,
// directPeers: this.options.directPeers,
// allowlistedImmunePeers: this.options.allowlistedImmunePeers,
// applicationScoreCap: this.options.applicationScoreCap,
// strictNoSign: this.strictNoSign,
// connectToDbPeers: this.options.connectToDbPeers,
// statsdParams: this.options.statsdParams,
// });

await this.registerEventHandlers();

// Start cron tasks
this.pruneMessagesJobScheduler.start(this.options.pruneMessagesJobCron);
this.periodSyncJobScheduler.start();
// this.periodSyncJobScheduler.start();
this.pruneEventsJobScheduler.start(this.options.pruneEventsJobCron);
this.checkFarcasterVersionJobScheduler.start();
this.validateOrRevokeMessagesJobScheduler.start();
// this.validateOrRevokeMessagesJobScheduler.start();

const randomMinute = Math.floor(Math.random() * 15);
this.gossipContactInfoJobScheduler.start(`${randomMinute}-59/15 * * * *`); // Weird syntax but required by cron, random minute every 15 minutes
// this.gossipContactInfoJobScheduler.start(`${randomMinute}-59/15 * * * *`); // Weird syntax but required by cron, random minute every 15 minutes
this.checkIncomingPortsJobScheduler.start();
this.measureSyncHealthJobScheduler.start();
// this.measureSyncHealthJobScheduler.start();

// Mainnet only jobs
if (this.options.network === FarcasterNetwork.MAINNET) {
Expand Down
47 changes: 46 additions & 1 deletion apps/hubble/src/rpc/adminServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export default class AdminServer {
submitOnChainEvent: async (call, callback) => {
const authResult = await authenticateUser(call.metadata, this.rpcUsers);
if (authResult.isErr()) {
logger.warn({ errMsg: authResult.error.message }, "submitOnChainEvent failed");
log.warn({ errMsg: authResult.error.message }, "submitOnChainEvent failed");
callback(
toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)),
);
Expand All @@ -161,6 +161,7 @@ export default class AdminServer {

const onChainEvent = call.request;
const result = await this.hub?.submitOnChainEvent(onChainEvent, "rpc");
log.info({ result, fid: onChainEvent.fid, type: onChainEvent.type }, "submitOnChainEvent complete");
result?.match(
() => {
callback(null, onChainEvent);
Expand All @@ -170,6 +171,50 @@ export default class AdminServer {
},
);
},
submitUserNameProof: async (call, callback) => {
const authResult = await authenticateUser(call.metadata, this.rpcUsers);
if (authResult.isErr()) {
log.warn({ errMsg: authResult.error.message }, "submitUserNameProof failed");
callback(
toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)),
);
return;
}

const usernameProof = call.request;
const result = await this.hub?.submitUserNameProof(usernameProof, "rpc");
log.info({ result, fid: usernameProof.fid }, "submitUserNameProof complete");
result?.match(
() => {
callback(null, usernameProof);
},
(err: HubError) => {
callback(toServiceError(err));
},
);
},
pruneMessages: async (call, callback) => {
const authResult = await authenticateUser(call.metadata, this.rpcUsers);
if (authResult.isErr()) {
log.warn({ errMsg: authResult.error.message }, "pruneMessages failed");
callback(
toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)),
);
return;
}

const fid = call.request.fid;
const result = await this.hub?.engine.pruneMessages(fid);
log.info({ result, fid }, "pruneMessages complete");
result?.match(
(numMessagesPruned) => {
callback(null, { numMessagesPruned });
},
(err: HubError) => {
callback(toServiceError(err));
},
);
},
};
};
}
40 changes: 20 additions & 20 deletions apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,18 +933,18 @@ export default class Server {
getOnChainEvents: async (call, callback) => this.getOnChainEventsRPC(call, callback),
submitMessage: async (call, callback) => {
// Identify peer that is calling, if available. This is used for rate limiting.
const peer = Result.fromThrowable(
() => call.getPeer(),
(e) => e,
)().unwrapOr("unavailable");
// const peer = Result.fromThrowable(
// () => call.getPeer(),
// (e) => e,
// )().unwrapOr("unavailable");
statsd().increment("rpc.open_request_count", { method: "submitMessage" });

const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter);
if (rateLimitResult.isErr()) {
logger.warn({ peer }, "submitMessage rate limited");
callback(toServiceError(new HubError("unavailable", "API rate limit exceeded")));
return;
}
// const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter);
// if (rateLimitResult.isErr()) {
// logger.warn({ peer }, "submitMessage rate limited");
// callback(toServiceError(new HubError("unavailable", "API rate limit exceeded")));
// return;
// }

// Authentication
const authResult = authenticateUser(call.metadata, this.rpcUsers);
Expand Down Expand Up @@ -972,19 +972,19 @@ export default class Server {
},
submitBulkMessages: async (call, callback) => {
// Identify peer that is calling, if available. This is used for rate limiting.
const peer = Result.fromThrowable(
() => call.getPeer(),
(e) => e,
)().unwrapOr("unavailable");
// const peer = Result.fromThrowable(
// () => call.getPeer(),
// (e) => e,
// )().unwrapOr("unavailable");
statsd().increment("rpc.open_request_count", { method: "submitBulkMessages" });

// Check for rate limits
const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter);
if (rateLimitResult.isErr()) {
logger.warn({ peer }, "submitBulkMessages rate limited");
callback(toServiceError(new HubError("unavailable", "API rate limit exceeded")));
return;
}
// const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter);
// if (rateLimitResult.isErr()) {
// logger.warn({ peer }, "submitBulkMessages rate limited");
// callback(toServiceError(new HubError("unavailable", "API rate limit exceeded")));
// return;
// }

// Authentication
const authResult = authenticateUser(call.metadata, this.rpcUsers);
Expand Down
Loading
Loading