Commit: cleanup

adityapk00 committed Feb 5, 2024
1 parent 8204ca9 commit 2563e78
Showing 2 changed files with 57 additions and 59 deletions.
96 changes: 46 additions & 50 deletions apps/hubble/src/network/sync/multiPeerSyncEngine.test.ts
@@ -13,7 +13,7 @@ import {
   MessageData,
 } from "@farcaster/hub-nodejs";
 import { APP_NICKNAME, APP_VERSION, HubInterface } from "../../hubble.js";
-import SyncEngine from "./syncEngine.js";
+import SyncEngine, { QUICK_SYNC_TS_CUTOFF } from "./syncEngine.js";
 import { SyncId } from "./syncId.js";
 import Server from "../../rpc/server.js";
 import { jestRocksDB } from "../../storage/db/jestUtils.js";
@@ -279,63 +279,59 @@ describe("Multi peer sync engine", () => {
     TEST_TIMEOUT_LONG,
   );

-  test(
-    "sync should respect 2-week cutoff",
-    async () => {
-      // Add signer custody event to engine 1
-      await expect(engine1.mergeOnChainEvent(custodyEvent)).resolves.toBeDefined();
-      await expect(engine1.mergeOnChainEvent(signerEvent)).resolves.toBeDefined();
-      await expect(engine1.mergeOnChainEvent(storageEvent)).resolves.toBeDefined();
-      await expect(engine1.mergeUserNameProof(fname)).resolves.toBeDefined();
+  test("sync should respect 2-week cutoff", async () => {
+    // Add signer custody event to engine 1
+    await expect(engine1.mergeOnChainEvent(custodyEvent)).resolves.toBeDefined();
+    await expect(engine1.mergeOnChainEvent(signerEvent)).resolves.toBeDefined();
+    await expect(engine1.mergeOnChainEvent(storageEvent)).resolves.toBeDefined();
+    await expect(engine1.mergeUserNameProof(fname)).resolves.toBeDefined();

-      // Sync engine 2 with engine 1, this should get all the onchain events and fnames
-      await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1);
-      await sleepWhile(() => syncEngine2.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);
+    // Sync engine 2 with engine 1, this should get all the onchain events and fnames
+    await syncEngine2.performSync("engine1", (await syncEngine1.getSnapshot())._unsafeUnwrap(), clientForServer1);
+    await sleepWhile(() => syncEngine2.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);

-      // Make sure root hash matches
-      expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash());
-      // Add messages to engine 1. The first message is 4 weeks old, the second is new.
-      const nowFsTime = getFarcasterTime()._unsafeUnwrap();
-      const oldFsTime = nowFsTime - 60 * 60 * 24 * 7 * 4;
+    // Make sure root hash matches
+    expect(await syncEngine1.trie.rootHash()).toEqual(await syncEngine2.trie.rootHash());
+    // Add messages to engine 1. The first message is > 2 weeks old, the second is new.
+    const nowFsTime = getFarcasterTime()._unsafeUnwrap();
+    const oldFsTime = nowFsTime - QUICK_SYNC_TS_CUTOFF - 1;

-      const oldCast = await Factories.CastAddMessage.create(
-        { data: { fid, network, timestamp: oldFsTime } },
-        { transient: { signer } },
-      );
-      const newCast = await Factories.CastAddMessage.create(
-        { data: { fid, network, timestamp: nowFsTime } },
-        { transient: { signer } },
-      );
+    const oldCast = await Factories.CastAddMessage.create(
+      { data: { fid, network, timestamp: oldFsTime } },
+      { transient: { signer } },
+    );
+    const newCast = await Factories.CastAddMessage.create(
+      { data: { fid, network, timestamp: nowFsTime } },
+      { transient: { signer } },
+    );

-      // Merge the casts in
-      let result = await engine1.mergeMessage(oldCast);
-      expect(result.isOk()).toBeTruthy();
-      result = await engine1.mergeMessage(newCast);
-      expect(result.isOk()).toBeTruthy();
+    // Merge the casts in
+    let result = await engine1.mergeMessage(oldCast);
+    expect(result.isOk()).toBeTruthy();
+    result = await engine1.mergeMessage(newCast);
+    expect(result.isOk()).toBeTruthy();

-      await sleepWhile(() => syncEngine1.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);
+    await sleepWhile(() => syncEngine1.syncTrieQSize > 0, SLEEPWHILE_TIMEOUT);

-      // Engine 2 should sync with engine1 (including onchain events and fnames)
-      expect(
-        (await syncEngine2.syncStatus("engine2", (await syncEngine1.getSnapshot())._unsafeUnwrap()))._unsafeUnwrap()
-          .shouldSync,
-      ).toBeTruthy();
+    // Engine 2 should sync with engine1 (including onchain events and fnames)
+    expect(
+      (await syncEngine2.syncStatus("engine2", (await syncEngine1.getSnapshot())._unsafeUnwrap()))._unsafeUnwrap()
+        .shouldSync,
+    ).toBeTruthy();

-      // Sync engine 2 with engine 1
-      await syncEngine2.performSync(
-        "engine1",
-        (await syncEngine1.getSnapshot())._unsafeUnwrap(),
-        clientForServer1,
-        false,
-        nowFsTime - 1,
-      );
+    // Sync engine 2 with engine 1
+    await syncEngine2.performSync(
+      "engine1",
+      (await syncEngine1.getSnapshot())._unsafeUnwrap(),
+      clientForServer1,
+      false,
+      nowFsTime - 1,
+    );

-      // Expect the new cast to be in the sync trie, but not the old one
-      expect(await syncEngine2.trie.exists(SyncId.fromMessage(newCast))).toBeTruthy();
-      expect(await syncEngine2.trie.exists(SyncId.fromMessage(oldCast))).toBeFalsy();
-    },
-    30 * 60 * 1000,
-  );
+    // Expect the new cast to be in the sync trie, but not the old one
+    expect(await syncEngine2.trie.exists(SyncId.fromMessage(newCast))).toBeTruthy();
+    expect(await syncEngine2.trie.exists(SyncId.fromMessage(oldCast))).toBeFalsy();
+  });

   test("cast remove should remove from trie", async () => {
     // Add signer custody event to engine 1
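The rewritten test drives a quick sync by handing performSync an absolute Farcaster-timestamp cutoff as its final argument (nowFsTime - 1 above). A minimal sketch of the timestamp arithmetic it relies on, using only names visible in this diff:

import { getFarcasterTime } from "@farcaster/hub-nodejs";
import { QUICK_SYNC_TS_CUTOFF } from "./syncEngine.js";

// Farcaster timestamps are seconds since the Farcaster epoch.
const nowFsTime = getFarcasterTime()._unsafeUnwrap();

// One second older than the two-week quick-sync window, so a cast created at
// this timestamp falls outside any quick sync anchored at the current time.
const oldFsTime = nowFsTime - QUICK_SYNC_TS_CUTOFF - 1;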
20 changes: 11 additions & 9 deletions apps/hubble/src/network/sync/syncEngine.ts
@@ -72,7 +72,7 @@ const SYNC_INTERRUPT_TIMEOUT = 30 * 1000; // 30 seconds

 // A quick sync will only sync messages that are newer than 2 weeks.
 const QUICK_SYNC_PROBABILITY = 0.7; // 70% of the time, we'll do a quick sync
-const QUICK_SYNC_TS_CUTOFF = 2 * 7 * 24 * 60 * 60; // 2 weeks, in seconds
+export const QUICK_SYNC_TS_CUTOFF = 2 * 7 * 24 * 60 * 60; // 2 weeks, in seconds

 const COMPACTION_THRESHOLD = 100_000; // Sync
 const BAD_PEER_BLOCK_TIMEOUT = 5 * 60 * 60 * 1000; // 5 hours, arbitrary, may need to be adjusted as network grows
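Together these constants say: roughly 70% of syncs are quick syncs that only reconcile the last two weeks of messages. A sketch of how a caller might turn them into the absolute cutoff that compareNodeAtPrefix expects; the actual call site is outside this diff, so the helper below is illustrative, not the hub's implementation:

import { getFarcasterTime } from "@farcaster/hub-nodejs";

// Illustrative helper (not from the hub's source): picks an absolute
// Farcaster-timestamp cutoff to pass down through performSync.
function chooseQuickSyncCutoff(): number {
  if (Math.random() < QUICK_SYNC_PROBABILITY) {
    // Quick sync: skip sync-trie nodes whose newest message is older than 2 weeks.
    return getFarcasterTime()._unsafeUnwrap() - QUICK_SYNC_TS_CUTOFF;
  }
  return -1; // full sync: a negative cutoff disables the timestamp filter (the default)
}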
@@ -1085,7 +1085,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
     prefix: Uint8Array,
     rpcClient: HubRpcClient,
     onMissingHashes: (missingHashes: Uint8Array[]) => Promise<void>,
-    tsCutoff = -1,
+    quickSyncTsCutoff = -1,
   ): Promise<number> {
     // Check if we should interrupt the sync
     if (this._currentSyncStatus.interruptSync) {
@@ -1094,11 +1094,13 @@
     }

     // If the prefix is before the cutoff, then we won't recurse into this node.
-    if (tsCutoff >= 0) {
+    if (quickSyncTsCutoff >= 0) {
+      // We won't recurse into nodes if they are older than the cutoff. To do this, we'll compare the prefix
+      // with a max timestamp prefix. If the prefix is older than the cutoff, we'll skip it.
       const tsPrefix = Buffer.from(prefix.slice(0, 10)).toString("ascii");
-      const maxTs = parseInt(tsPrefix.padEnd(TIMESTAMP_LENGTH, "9"), 10);
-      if (maxTs < tsCutoff) {
-        log.debug({ prefix, maxTs, tsCutoff }, "Skipping prefix before cutoff");
+      const maxTs = parseInt(tsPrefix.padEnd(TIMESTAMP_LENGTH, "9"), 10); // pad with 9s to get the max timestamp
+      if (maxTs < quickSyncTsCutoff) {
+        log.debug({ prefix, maxTs, tsCutoff: quickSyncTsCutoff }, "Skipping prefix before cutoff");
         return 0;
       }
     }
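The skip works because every sync-trie key begins with the message's Farcaster timestamp encoded as ASCII digits, so padding a node's prefix with trailing 9s yields the newest timestamp that could possibly appear in its subtree. A self-contained sketch of that bound (the helper name is ours; TIMESTAMP_LENGTH is assumed to be 10, matching the prefix.slice(0, 10) above):

const TIMESTAMP_LENGTH = 10; // sync-trie keys start with a 10-digit Farcaster timestamp

// Hypothetical helper mirroring the check above: the largest timestamp any
// message under this trie prefix could have.
function maxTimestampUnderPrefix(prefix: Uint8Array): number {
  const tsPrefix = Buffer.from(prefix.slice(0, TIMESTAMP_LENGTH)).toString("ascii");
  return parseInt(tsPrefix.padEnd(TIMESTAMP_LENGTH, "9"), 10);
}

// A node at prefix "08" can only contain timestamps <= 899999999, so a
// quick sync with a cutoff of 900000000 can skip its entire subtree.
const node = Buffer.from("08", "ascii");
console.log(maxTimestampUnderPrefix(node) < 900000000); // true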
@@ -1130,7 +1132,7 @@
       ourNode,
       rpcClient,
       onMissingHashes,
-      tsCutoff,
+      quickSyncTsCutoff,
     );
     return 1;
   }
@@ -1141,7 +1143,7 @@
     ourNode: NodeMetadata | undefined,
     rpcClient: HubRpcClient,
     onMissingHashes: (missingHashes: Uint8Array[]) => Promise<void>,
-    tsCutoff: number,
+    quickSyncTsCutoff: number,
   ): Promise<void> {
     if (this._currentSyncStatus.interruptSync) {
       log.info("Interrupting sync");
@@ -1211,7 +1213,7 @@
     for (const [theirChildChar, theirChild] of reversedEntries) {
       // recursively fetch hashes for every node where the hashes don't match
       if (ourNode?.children?.get(theirChildChar)?.hash !== theirChild.hash) {
-        const r = this.compareNodeAtPrefix(theirChild.prefix, rpcClient, onMissingHashes, tsCutoff);
+        const r = this.compareNodeAtPrefix(theirChild.prefix, rpcClient, onMissingHashes, quickSyncTsCutoff);
         numChildrenFetched += 1;

         // If we're fetching more than HASHES_PER_FETCH, we'll wait for the first batch to finish before starting
