Skip to content

Commit

Permalink
feat: Implement storage extension FIP (#2256)
Browse files Browse the repository at this point in the history
## Why is this change needed?

Implements farcasterxyz/protocol#191

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
The focus of this PR is to implement the Storage Extension FIP. 

### Detailed summary
- Added `StorageUnitDetails` message in `request_response.proto`
- Updated storage-related functions in various Rust files
- Modified default limits in different store types
- Introduced new date calculations in `factories.ts`

> The following files were skipped due to too many changes:
`apps/hubble/src/storage/stores/storeEventHandler.test.ts`,
`packages/core/src/limits.ts`,
`apps/hubble/src/storage/stores/storageCache.test.ts`,
`apps/hubble/src/storage/stores/storeEventHandler.ts`,
`apps/hubble/src/rpc/test/server.test.ts`,
`packages/core/src/limits.test.ts`,
`apps/hubble/src/storage/engine/index.ts`,
`packages/hub-web/src/generated/request_response.ts`,
`packages/hub-nodejs/src/generated/request_response.ts`,
`packages/core/src/protobufs/generated/request_response.ts`,
`apps/hubble/src/storage/stores/storageCache.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
sanjayprabhu authored Aug 16, 2024
1 parent 3fbfb6e commit dd634c7
Show file tree
Hide file tree
Showing 25 changed files with 863 additions and 187 deletions.
8 changes: 8 additions & 0 deletions .changeset/smooth-crews-burn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@farcaster/hub-nodejs": minor
"@farcaster/hub-web": minor
"@farcaster/core": minor
"@farcaster/hubble": patch
---

feat: Implement Storage Extension FIP
10 changes: 7 additions & 3 deletions apps/hubble/src/addon/src/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,19 +955,23 @@ impl Store {
&self,
fid: u32,
cached_count: u64,
units: u64,
max_count: u64,
) -> Result<Vec<HubEvent>, HubError> {
let mut pruned_events = vec![];

let mut count = cached_count;
let prune_size_limit = self.store_def.get_prune_size_limit();
let max_message_count = if self.store_def.get_prune_size_limit() > 0 {
self.store_def.get_prune_size_limit() as u64
} else {
max_count
};

let mut txn = self.db.txn();

let prefix = &make_message_primary_key(fid, self.store_def.postfix(), None);
self.db
.for_each_iterator_by_prefix(prefix, &PageOptions::default(), |_key, value| {
if count <= (prune_size_limit as u64) * units {
if count <= max_message_count {
return Ok(true); // Stop the iteration, nothing left to prune
}

Expand Down
26 changes: 18 additions & 8 deletions apps/hubble/src/rpc/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
StoreType,
getDefaultStoreLimit,
HubError,
StorageUnitType,
} from "@farcaster/hub-nodejs";
import Engine from "../../storage/engine/index.js";
import { MockHub } from "../../test/mocks.js";
Expand All @@ -24,6 +25,7 @@ import { Err, Ok } from "neverthrow";
import { sleep } from "../../utils/crypto.js";
import * as http from "http";
import { AddressInfo, createServer } from "net";
import { LEGACY_STORAGE_UNIT_CUTOFF_TIMESTAMP } from "../../storage/stores/storageCache.js";

const db = jestRocksDB("protobufs.rpc.server.test");
const network = FarcasterNetwork.TESTNET;
Expand Down Expand Up @@ -63,7 +65,10 @@ beforeEach(async () => {
const custodySignerKey = (await custodySigner.getSignerKey())._unsafeUnwrap();
custodyEvent = Factories.IdRegistryOnChainEvent.build({ fid }, { transient: { to: custodySignerKey } });
signerEvent = Factories.SignerOnChainEvent.build({ fid }, { transient: { signer: signerKey } });
storageEvent = Factories.StorageRentOnChainEvent.build({ fid }, { transient: { units: 1 } });
storageEvent = Factories.StorageRentOnChainEvent.build(
{ fid, blockTimestamp: LEGACY_STORAGE_UNIT_CUTOFF_TIMESTAMP - 1 },
{ transient: { units: 1 } },
);
await engine.mergeOnChainEvent(custodyEvent);
await engine.mergeOnChainEvent(signerEvent);
await engine.mergeOnChainEvent(storageEvent);
Expand Down Expand Up @@ -141,7 +146,7 @@ describe("server rpc tests", () => {
const storageLimits = limitsResponse.limits;
expect(storageLimits).toContainEqual(
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.CASTS),
limit: getDefaultStoreLimit(StoreType.CASTS, StorageUnitType.UNIT_TYPE_LEGACY),
storeType: StoreType.CASTS,
name: "CASTS",
used: 2,
Expand All @@ -151,7 +156,7 @@ describe("server rpc tests", () => {
);
expect(storageLimits).toContainEqual(
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.REACTIONS),
limit: getDefaultStoreLimit(StoreType.REACTIONS, StorageUnitType.UNIT_TYPE_LEGACY),
storeType: StoreType.REACTIONS,
name: "REACTIONS",
used: 0,
Expand All @@ -161,7 +166,7 @@ describe("server rpc tests", () => {
);
expect(storageLimits).toContainEqual(
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.LINKS),
limit: getDefaultStoreLimit(StoreType.LINKS, StorageUnitType.UNIT_TYPE_LEGACY),
storeType: StoreType.LINKS,
name: "LINKS",
used: 0,
Expand All @@ -171,7 +176,7 @@ describe("server rpc tests", () => {
);
expect(storageLimits).toContainEqual(
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.USER_DATA),
limit: getDefaultStoreLimit(StoreType.USER_DATA, StorageUnitType.UNIT_TYPE_LEGACY),
storeType: StoreType.USER_DATA,
name: "USER_DATA",
used: 0,
Expand All @@ -181,7 +186,7 @@ describe("server rpc tests", () => {
);
expect(storageLimits).toContainEqual(
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.VERIFICATIONS),
limit: getDefaultStoreLimit(StoreType.VERIFICATIONS, StorageUnitType.UNIT_TYPE_LEGACY),
storeType: StoreType.VERIFICATIONS,
name: "VERIFICATIONS",
used: 1,
Expand All @@ -191,7 +196,7 @@ describe("server rpc tests", () => {
);
expect(storageLimits).toContainEqual(
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.USERNAME_PROOFS),
limit: getDefaultStoreLimit(StoreType.USERNAME_PROOFS, StorageUnitType.UNIT_TYPE_LEGACY),
storeType: StoreType.USERNAME_PROOFS,
name: "USERNAME_PROOFS",
used: 0,
Expand All @@ -203,6 +208,7 @@ describe("server rpc tests", () => {
// add 2 more units
const rentEvent2 = Factories.StorageRentOnChainEvent.build({
fid,
blockTimestamp: LEGACY_STORAGE_UNIT_CUTOFF_TIMESTAMP + 2,
storageRentEventBody: Factories.StorageRentEventBody.build({ units: 2 }),
});
await engine.mergeOnChainEvent(rentEvent2);
Expand All @@ -212,7 +218,11 @@ describe("server rpc tests", () => {
const newLimits = limitsResponse2.limits;
expect(newLimits.length).toEqual(6);
for (const limit of newLimits) {
expect(limit.limit).toEqual(getDefaultStoreLimit(limit.storeType) * 3);
// 1 unit of legacy storage from the first event, 2 units of 2024 storage from the second event
const expectedLimit =
getDefaultStoreLimit(limit.storeType, StorageUnitType.UNIT_TYPE_LEGACY) +
getDefaultStoreLimit(limit.storeType, StorageUnitType.UNIT_TYPE_2024) * 2;
expect(limit.limit).toEqual(expectedLimit);
}
});
});
Expand Down
4 changes: 2 additions & 2 deletions apps/hubble/src/rustfunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,9 @@ export const rsPruneMessages = async (
store: RustDynStore,
fid: number,
cachedCount: number,
units: number,
maxCount: number,
): Promise<Buffer> => {
return await lib.pruneMessages.call(store, fid, cachedCount, units);
return await lib.pruneMessages.call(store, fid, cachedCount, maxCount);
};

export const rsGetAllMessagesByFid = async (
Expand Down
50 changes: 29 additions & 21 deletions apps/hubble/src/storage/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
SignerOnChainEvent,
StorageLimit,
StorageLimitsResponse,
StorageUnitType,
StoreType,
UserDataAddMessage,
UserDataType,
Expand All @@ -51,9 +52,9 @@ import {
import { err, ok, ResultAsync } from "neverthrow";
import fs from "fs";
import { Worker } from "worker_threads";
import { forEachMessageBySigner, makeUserKey, messageDecode, typeToSetPostfix } from "../db/message.js";
import { forEachMessageBySigner, typeToSetPostfix } from "../db/message.js";
import RocksDB from "../db/rocksdb.js";
import { UserMessagePostfixMax, UserPostfix } from "../db/types.js";
import { UserPostfix } from "../db/types.js";
import CastStore from "../stores/castStore.js";
import LinkStore from "../stores/linkStore.js";
import ReactionStore from "../stores/reactionStore.js";
Expand Down Expand Up @@ -172,15 +173,14 @@ class Engine extends TypedEmitter<EngineEvents> {
this._onchainEventsStore = new OnChainEventStore(db, this.eventHandler);
this._usernameProofStore = new UsernameProofStore(db, this.eventHandler);

// Calculate total storage available per unit of store. Note that OnChainEventStore
// is not included in this calculation because it is not pruned.
// Total set size for all stores. We should probably reduce this to just the size of the cast store.
this._totalPruneSize =
this._linkStore.pruneSizeLimit +
this._reactionStore.pruneSizeLimit +
this._castStore.pruneSizeLimit +
this._userDataStore.pruneSizeLimit +
this._verificationStore.pruneSizeLimit +
this._usernameProofStore.pruneSizeLimit;
getDefaultStoreLimit(StoreType.CASTS, StorageUnitType.UNIT_TYPE_LEGACY) +
getDefaultStoreLimit(StoreType.LINKS, StorageUnitType.UNIT_TYPE_LEGACY) +
getDefaultStoreLimit(StoreType.REACTIONS, StorageUnitType.UNIT_TYPE_LEGACY) +
getDefaultStoreLimit(StoreType.USER_DATA, StorageUnitType.UNIT_TYPE_LEGACY) +
getDefaultStoreLimit(StoreType.USERNAME_PROOFS, StorageUnitType.UNIT_TYPE_LEGACY) +
getDefaultStoreLimit(StoreType.VERIFICATIONS, StorageUnitType.UNIT_TYPE_LEGACY);

log.info({ totalPruneSize: this._totalPruneSize }, "total default storage limit size");

Expand Down Expand Up @@ -292,20 +292,22 @@ class Engine extends TypedEmitter<EngineEvents> {

// Extract the FID that this message was signed by
const fid = message.data?.fid ?? 0;
const storageUnits = await this.eventHandler.getCurrentStorageUnitsForFid(fid);
const storageSlot = await this.eventHandler.getCurrentStorageSlotForFid(fid);

if (storageUnits.isErr()) {
mergeResults.set(i, err(storageUnits.error));
if (storageSlot.isErr()) {
mergeResults.set(i, err(storageSlot.error));
return;
}

if (storageUnits.value === 0) {
const totalUnits = storageSlot.value.legacy_units + storageSlot.value.units;

if (totalUnits === 0) {
mergeResults.set(i, err(new HubError("bad_request.prunable", "no storage")));
return;
}

// We rate limit the number of messages that can be merged per FID
const limiter = getRateLimiterForTotalMessages(storageUnits.value * this._totalPruneSize);
const limiter = getRateLimiterForTotalMessages(totalUnits * this._totalPruneSize);
const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter);
if (isRateLimited) {
log.warn({ fid }, "rate limit exceeded for FID");
Expand Down Expand Up @@ -861,13 +863,17 @@ class Engine extends TypedEmitter<EngineEvents> {
return err(validatedFid.error);
}

const units = await this.eventHandler.getCurrentStorageUnitsForFid(fid);
const slot = await this.eventHandler.getCurrentStorageSlotForFid(fid);

if (units.isErr()) {
return err(units.error);
if (slot.isErr()) {
return err(slot.error);
}

const storeLimits = getStoreLimits(units.value);
const unitDetails = [
{ unitType: StorageUnitType.UNIT_TYPE_LEGACY, unitSize: slot.value.legacy_units },
{ unitType: StorageUnitType.UNIT_TYPE_2024, unitSize: slot.value.units },
];
const storeLimits = getStoreLimits(unitDetails);
const limits: StorageLimit[] = [];
for (const limit of storeLimits) {
const usageResult = await this.eventHandler.getUsage(fid, limit.storeType);
Expand All @@ -887,8 +893,9 @@ class Engine extends TypedEmitter<EngineEvents> {
);
}
return ok({
units: units.value,
units: slot.value.units + slot.value.legacy_units,
limits: limits,
unitDetails: unitDetails,
});
}

Expand Down Expand Up @@ -1193,7 +1200,8 @@ class Engine extends TypedEmitter<EngineEvents> {
// LinkCompactStateMessages can't be more than 100 storage units
if (
isLinkCompactStateMessage(message) &&
message.data.linkCompactStateBody.targetFids.length > getDefaultStoreLimit(StoreType.LINKS) * 100
message.data.linkCompactStateBody.targetFids.length >
getDefaultStoreLimit(StoreType.LINKS, StorageUnitType.UNIT_TYPE_LEGACY) * 100
) {
return err(
new HubError("bad_request.validation_failure", "LinkCompactStateMessage is too big. Limit = 100 storage units"),
Expand Down
5 changes: 3 additions & 2 deletions apps/hubble/src/storage/jobs/pruneMessagesJob.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
Message,
MessageType,
PruneMessageHubEvent,
StorageUnitType,
StoreType,
} from "@farcaster/hub-nodejs";
import { jestRocksDB } from "../db/jestUtils.js";
Expand All @@ -32,7 +33,7 @@ const seedMessagesFromTimestamp = async (engine: Engine, fid: number, signer: Ed
);
const linkAdd = await Factories.LinkAddMessage.create({ data: { fid, timestamp } }, { transient: { signer } });
const proofs = await Factories.VerificationAddEthAddressMessage.createList(
getDefaultStoreLimit(StoreType.VERIFICATIONS) + 1,
getDefaultStoreLimit(StoreType.VERIFICATIONS, StorageUnitType.UNIT_TYPE_2024) + 1,
{ data: { fid, timestamp } },
{ transient: { signer } },
);
Expand Down Expand Up @@ -109,7 +110,7 @@ describe("doJobs", () => {

const verifications = await engine.getVerificationsByFid(fid);
expect(verifications._unsafeUnwrap().messages.length).toEqual(
getDefaultStoreLimit(StoreType.VERIFICATIONS) + 1,
getDefaultStoreLimit(StoreType.VERIFICATIONS, StorageUnitType.UNIT_TYPE_2024) + 1,
);
}

Expand Down
4 changes: 2 additions & 2 deletions apps/hubble/src/storage/stores/castStore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CastAddMessage, CastId, CastRemoveMessage, getDefaultStoreLimit, StoreType } from "@farcaster/hub-nodejs";
import { CastAddMessage, CastId, CastRemoveMessage } from "@farcaster/hub-nodejs";
import { ResultAsync } from "neverthrow";
import RocksDB from "../db/rocksdb.js";
import { UserPostfix } from "../db/types.js";
Expand All @@ -19,7 +19,7 @@ import { messageDecode } from "../../storage/db/message.js";

class CastStore extends RustStoreBase<CastAddMessage, CastRemoveMessage> {
constructor(db: RocksDB, eventHandler: StoreEventHandler, options: StorePruneOptions = {}) {
const pruneSizeLimit = options.pruneSizeLimit ?? getDefaultStoreLimit(StoreType.CASTS);
const pruneSizeLimit = options.pruneSizeLimit ?? 0;
const rustCastStore = rsCreateCastStore(db.rustDb, eventHandler.getRustStoreEventHandler(), pruneSizeLimit);

super(db, rustCastStore, UserPostfix.CastMessage, eventHandler, pruneSizeLimit);
Expand Down
10 changes: 2 additions & 8 deletions apps/hubble/src/storage/stores/linkStore.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
import {
getDefaultStoreLimit,
LinkAddMessage,
LinkCompactStateMessage,
LinkRemoveMessage,
StoreType,
} from "@farcaster/hub-nodejs";
import { LinkAddMessage, LinkCompactStateMessage, LinkRemoveMessage } from "@farcaster/hub-nodejs";
import { makeFidKey, messageDecode } from "../../storage/db/message.js";
import { UserPostfix } from "../db/types.js";
import { MessagesPage, PageOptions, StorePruneOptions } from "./types.js";
Expand Down Expand Up @@ -38,7 +32,7 @@ import storeEventHandler from "./storeEventHandler.js";
*/
class LinkStore extends RustStoreBase<LinkAddMessage, LinkRemoveMessage> {
constructor(db: RocksDB, eventHandler: storeEventHandler, options: StorePruneOptions = {}) {
const pruneSizeLimit = options.pruneSizeLimit ?? getDefaultStoreLimit(StoreType.LINKS);
const pruneSizeLimit = options.pruneSizeLimit ?? 0;
const rustReactionStore = rsLinkStore.CreateLinkStore(
db.rustDb,
eventHandler.getRustStoreEventHandler(),
Expand Down
12 changes: 2 additions & 10 deletions apps/hubble/src/storage/stores/reactionStore.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
import {
CastId,
Message,
ReactionAddMessage,
ReactionRemoveMessage,
ReactionType,
StoreType,
getDefaultStoreLimit,
} from "@farcaster/hub-nodejs";
import { CastId, ReactionAddMessage, ReactionRemoveMessage, ReactionType } from "@farcaster/hub-nodejs";
import {
rsCreateReactionStore,
rsGetReactionAdd,
Expand All @@ -26,7 +18,7 @@ import { messageDecode } from "../../storage/db/message.js";

class ReactionStore extends RustStoreBase<ReactionAddMessage, ReactionRemoveMessage> {
constructor(db: RocksDB, eventHandler: StoreEventHandler, options: StorePruneOptions = {}) {
const pruneSizeLimit = options.pruneSizeLimit ?? getDefaultStoreLimit(StoreType.REACTIONS);
const pruneSizeLimit = options.pruneSizeLimit ?? 0;
const rustReactionStore = rsCreateReactionStore(db.rustDb, eventHandler.getRustStoreEventHandler(), pruneSizeLimit);

super(db, rustReactionStore, UserPostfix.ReactionMessage, eventHandler, pruneSizeLimit);
Expand Down
14 changes: 9 additions & 5 deletions apps/hubble/src/storage/stores/rustStoreBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,24 +168,28 @@ export abstract class RustStoreBase<TAdd extends Message, TRemove extends Messag

async pruneMessages(fid: number): HubAsyncResult<number[]> {
const cachedCount = await this._eventHandler.getCacheMessageCount(fid, this._postfix, false);
const units = await this._eventHandler.getCurrentStorageUnitsForFid(fid);
let maxCount = await this._eventHandler.getMaxMessageCount(fid, this._postfix);

// Require storage cache to be synced to prune
if (cachedCount.isErr()) {
return err(cachedCount.error);
}

if (units.isErr()) {
return err(units.error);
if (maxCount.isErr()) {
return err(maxCount.error);
}

if (this._pruneSizeLimit > 0 && this._pruneSizeLimit < maxCount.value) {
maxCount = ok(this._pruneSizeLimit);
}

// Return immediately if there are no messages to prune
if (cachedCount.value <= this._pruneSizeLimit * units.value) {
if (cachedCount.value <= maxCount.value) {
return ok([]);
}

const result = await ResultAsync.fromPromise(
rsPruneMessages(this._rustStore, fid, cachedCount.value, units.value),
rsPruneMessages(this._rustStore, fid, cachedCount.value, maxCount.value),
rustErrorToHubError,
);
if (result.isErr()) {
Expand Down
Loading

0 comments on commit dd634c7

Please sign in to comment.