Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
aditiharini committed Dec 12, 2024
1 parent be8a569 commit 1c82516
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 26 deletions.
42 changes: 21 additions & 21 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ export class Hub implements HubInterface {
await this.engine.start();

// Start the sync engine
await this.syncEngine.start(this.options.rebuildSyncTrie ?? false);
// await this.syncEngine.start(this.options.rebuildSyncTrie ?? false);

// Start the RPC server
await this.rpcServer.start(this.options.rpcServerHost, this.options.rpcPort ?? 0);
Expand Down Expand Up @@ -799,42 +799,42 @@ export class Hub implements HubInterface {
await this.adminServer.start(this.options.adminServerHost ?? "127.0.0.1");
}

await this.l2RegistryProvider.start();
await this.fNameRegistryEventsProvider.start();
// await this.l2RegistryProvider.start();
// await this.fNameRegistryEventsProvider.start();

const peerId = this.options.peerId
? exportToProtobuf(this.options.peerId as RSAPeerId | Ed25519PeerId | Secp256k1PeerId)
: undefined;

// Start the Gossip node
await this.gossipNode.start(this.bootstrapAddrs(), {
peerId,
ipMultiAddr: this.options.ipMultiAddr,
announceIp: this.options.announceIp,
gossipPort: this.options.gossipPort,
allowedPeerIdStrs: this.allowedPeerIds,
deniedPeerIdStrs: this.deniedPeerIds,
directPeers: this.options.directPeers,
allowlistedImmunePeers: this.options.allowlistedImmunePeers,
applicationScoreCap: this.options.applicationScoreCap,
strictNoSign: this.strictNoSign,
connectToDbPeers: this.options.connectToDbPeers,
statsdParams: this.options.statsdParams,
});
// await this.gossipNode.start(this.bootstrapAddrs(), {
// peerId,
// ipMultiAddr: this.options.ipMultiAddr,
// announceIp: this.options.announceIp,
// gossipPort: this.options.gossipPort,
// allowedPeerIdStrs: this.allowedPeerIds,
// deniedPeerIdStrs: this.deniedPeerIds,
// directPeers: this.options.directPeers,
// allowlistedImmunePeers: this.options.allowlistedImmunePeers,
// applicationScoreCap: this.options.applicationScoreCap,
// strictNoSign: this.strictNoSign,
// connectToDbPeers: this.options.connectToDbPeers,
// statsdParams: this.options.statsdParams,
// });

await this.registerEventHandlers();

// Start cron tasks
this.pruneMessagesJobScheduler.start(this.options.pruneMessagesJobCron);
this.periodSyncJobScheduler.start();
// this.periodSyncJobScheduler.start();
this.pruneEventsJobScheduler.start(this.options.pruneEventsJobCron);
this.checkFarcasterVersionJobScheduler.start();
this.validateOrRevokeMessagesJobScheduler.start();
// this.validateOrRevokeMessagesJobScheduler.start();

const randomMinute = Math.floor(Math.random() * 15);
this.gossipContactInfoJobScheduler.start(`${randomMinute}-59/15 * * * *`); // Weird syntax but required by cron, random minute every 15 minutes
// this.gossipContactInfoJobScheduler.start(`${randomMinute}-59/15 * * * *`); // Weird syntax but required by cron, random minute every 15 minutes
this.checkIncomingPortsJobScheduler.start();
this.measureSyncHealthJobScheduler.start();
// this.measureSyncHealthJobScheduler.start();

// Mainnet only jobs
if (this.options.network === FarcasterNetwork.MAINNET) {
Expand Down
3 changes: 2 additions & 1 deletion apps/hubble/src/rpc/adminServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export default class AdminServer {
submitOnChainEvent: async (call, callback) => {
const authResult = await authenticateUser(call.metadata, this.rpcUsers);
if (authResult.isErr()) {
logger.warn({ errMsg: authResult.error.message }, "submitOnChainEvent failed");
log.warn({ errMsg: authResult.error.message }, "submitOnChainEvent failed");
callback(
toServiceError(new HubError("unauthenticated", `gRPC authentication failed: ${authResult.error.message}`)),
);
Expand All @@ -161,6 +161,7 @@ export default class AdminServer {

const onChainEvent = call.request;
const result = await this.hub?.submitOnChainEvent(onChainEvent, "rpc");
log.info({ result, fid: onChainEvent.fid, type: onChainEvent.type }, "submitOnChainEvent complete");
result?.match(
() => {
callback(null, onChainEvent);
Expand Down
18 changes: 14 additions & 4 deletions packages/shuttle/src/shuttle/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class Migration {
return err(result.error);
}

let numOnchainEvents = 0;
for (const onChainEvent of result.value.events) {
const snapchainResult = await this.snapchainAdminClient.submitOnChainEvent(onChainEvent);
if (snapchainResult.isErr()) {
Expand All @@ -97,9 +98,11 @@ export class Migration {
log.info(`Unable to submit onchain event to hub ${hubResult.error.message} ${hubResult.error.stack}`);
}

log.info(`Process onchain event of type ${onChainEvent.type} for fid ${fid}`);
numOnchainEvents += 1;
}

log.info(`Processed ${numOnchainEvents} onchain events`);

return ok(undefined);
}

Expand Down Expand Up @@ -131,8 +134,10 @@ export class Migration {
}

async ingestMessagesFromDb(fids: number[]) {
let numMessages = 0;
for (const fid of fids) {
let numMessages = 0;
let numErrorsOnHub = 0;
let numErrorsOnSnapchain = 0;
const messageTypes = [
MessageType.CAST_ADD,
MessageType.CAST_REMOVE,
Expand Down Expand Up @@ -178,24 +183,29 @@ export class Migration {
newMessage.data = undefined;

const snapchainResult = await this.snapchainClient.submitMessage(newMessage);
const hubResult = await this.hubClient.submitMessage(newMessage);

if (snapchainResult.isErr()) {
log.info(
`Unable to submit message to snapchain ${snapchainResult.error.message} ${snapchainResult.error.stack}`,
);
numErrorsOnSnapchain += 1;
}

const hubResult = await this.hubClient.submitMessage(newMessage);
if (hubResult.isErr()) {
log.info(`Unable to submit message to hub ${hubResult.error.message} ${hubResult.error.stack}`);
numErrorsOnHub += 1;
}
numMessages += 1;
}

pageNumber += 1;
}
}
log.info(
`Submitted ${numMessages} messages for fid ${fid}. ${numErrorsOnHub} hub errors. ${numErrorsOnSnapchain} snapchain errors.`,
);
}
log.info(`Submitted ${numMessages} messages`);
}
}

Expand Down

0 comments on commit 1c82516

Please sign in to comment.