Skip to content

Commit

Permalink
feat(solana): cached solana provider (#905)
Browse files Browse the repository at this point in the history
Signed-off-by: Reinis Martinsons <[email protected]>
Co-authored-by: Paul <[email protected]>
  • Loading branch information
Reinis-FRP and pxrl authored Mar 5, 2025
1 parent 5d96656 commit 548ae9c
Show file tree
Hide file tree
Showing 13 changed files with 493 additions and 1 deletion.
35 changes: 35 additions & 0 deletions src/caching/Memory/MemoryCacheClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { CachingMechanismInterface } from "../../interfaces";

interface CacheEntry {
value: unknown;
expiresAt?: number | null;
}

/**
* A simple in-memory cache client that stores values in a map with TTL support.
*/
export class MemoryCacheClient implements CachingMechanismInterface {
private cache: Map<string, CacheEntry> = new Map();

get<T>(key: string): Promise<T | null> {
return new Promise((resolve) => {
const entry = this.cache.get(key);
if (entry === undefined) return resolve(null);

if (entry.expiresAt && entry.expiresAt < Date.now()) {
this.cache.delete(key);
return resolve(null);
}

resolve(entry.value as T);
});
}

set<T>(key: string, value: T, ttl?: number): Promise<string | undefined> {
return new Promise((resolve) => {
const expiresAt = ttl ? Date.now() + ttl * 1000 : null;
this.cache.set(key, { value, expiresAt });
resolve(key);
});
}
}
1 change: 1 addition & 0 deletions src/caching/Memory/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./MemoryCacheClient";
1 change: 1 addition & 0 deletions src/caching/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./IPFS";
export * from "./Arweave";
export * from "./Memory";
124 changes: 124 additions & 0 deletions src/providers/solana/cachedRpcFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { RpcTransport, GetTransactionApi, RpcFromTransport, SolanaRpcApiFromTransport } from "@solana/web3.js";
import { is, object, optional, string, tuple } from "superstruct";
import { CachingMechanismInterface } from "../../interfaces";
import { SolanaClusterRpcFactory } from "./baseRpcFactories";
import { RateLimitedSolanaRpcFactory } from "./rateLimitedRpcFactory";
import { CacheType } from "../utils";
import { jsonReplacerWithBigInts, jsonReviverWithBigInts } from "../../utils";

export class CachedSolanaRpcFactory extends SolanaClusterRpcFactory {
public readonly getTransactionCachePrefix: string;

// Holds the underlying transport that the cached transport wraps.
protected rateLimitedTransport: RpcTransport;

// RPC client based on the rate limited transport, used internally to check confirmation status.
protected rateLimitedRpcClient: RpcFromTransport<SolanaRpcApiFromTransport<RpcTransport>, RpcTransport>;

constructor(
providerCacheNamespace: string,
readonly redisClient?: CachingMechanismInterface,
...rateLimitedConstructorParams: ConstructorParameters<typeof RateLimitedSolanaRpcFactory>
) {
// SolanaClusterRpcFactory shares the last two constructor parameters with RateLimitedSolanaRpcFactory.
const superParams = rateLimitedConstructorParams.slice(-2) as [
ConstructorParameters<typeof SolanaClusterRpcFactory>[0], // clusterUrl: ClusterUrl
ConstructorParameters<typeof SolanaClusterRpcFactory>[1], // chainId: number
];
super(...superParams);

// Create the rate limited transport and RPC client.
const rateLimitedRpcFactory = new RateLimitedSolanaRpcFactory(...rateLimitedConstructorParams);
this.rateLimitedTransport = rateLimitedRpcFactory.createTransport();
this.rateLimitedRpcClient = rateLimitedRpcFactory.createRpcClient();

// Pre-compute as much of the redis key as possible.
const cachePrefix = `${providerCacheNamespace},${new URL(this.clusterUrl).hostname},${this.chainId}`;
this.getTransactionCachePrefix = `${cachePrefix}:getTransaction,`;
}

public createTransport(): RpcTransport {
return async <TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> => {
const { method, params } = args[0].payload as { method: string; params?: unknown[] };
const cacheType = this.redisClient ? this.cacheType(method) : CacheType.NONE;

if (cacheType === CacheType.NONE) {
return this.rateLimitedTransport<TResponse>(...args);
}

const redisKey = this.buildRedisKey(method, params);

// Attempt to pull the result from the cache.
const redisResult = await this.redisClient?.get<string>(redisKey);

// If cache has the result, parse the json and return it.
if (redisResult) {
return JSON.parse(redisResult, jsonReviverWithBigInts);
}

// Cache does not have the result. Query it directly and cache it if finalized.
return this.requestAndCacheFinalized<TResponse>(...args);
};
}

private async requestAndCacheFinalized<TResponse>(...args: Parameters<RpcTransport>): Promise<TResponse> {
const { method, params } = args[0].payload as { method: string; params?: unknown[] };

// Only handles getTransaction right now.
if (method !== "getTransaction") return this.rateLimitedTransport<TResponse>(...args);

// Do not throw if params are not valid, just skip caching and pass through to the underlying transport.
if (!this.isGetTransactionParams(params)) return this.rateLimitedTransport<TResponse>(...args);

// Check the confirmation status first to avoid caching non-finalized transactions.
const getSignatureStatusesResponse = await this.rateLimitedRpcClient
.getSignatureStatuses([params[0]], {
searchTransactionHistory: true,
})
.send();

const getTransactionResponse = await this.rateLimitedTransport<TResponse>(...args);

// Cache the transaction only if it is finalized.
if (getSignatureStatusesResponse.value[0]?.confirmationStatus === "finalized") {
const redisKey = this.buildRedisKey(method, params);
await this.redisClient?.set(
redisKey,
JSON.stringify(getTransactionResponse, jsonReplacerWithBigInts),
Number.POSITIVE_INFINITY
);
}

return getTransactionResponse;
}

private buildRedisKey(method: string, params?: unknown[]) {
// Only handles getTransaction right now.
switch (method) {
case "getTransaction":
return this.getTransactionCachePrefix + JSON.stringify(params, jsonReplacerWithBigInts);
default:
throw new Error(`CachedSolanaRpcFactory::buildRedisKey: invalid JSON-RPC method ${method}`);
}
}

private cacheType(method: string): CacheType {
// Today, we only cache getTransaction.
if (method === "getTransaction") {
// We only store finalized transactions in the cache, hence TTL is not required.
return CacheType.NO_TTL;
} else {
return CacheType.NONE;
}
}

private isGetTransactionParams(params: unknown): params is Parameters<GetTransactionApi["getTransaction"]> {
return is(
params,
tuple([
string(), // Signature (Base58 string)
optional(object()), // We use only the tx signature to get its commitment, but pass through the options as is.
])
);
}
}
1 change: 1 addition & 0 deletions src/providers/solana/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from "./baseRpcFactories";
export * from "./cachedRpcFactory";
export * from "./defaultRpcFactory";
export * from "./rateLimitedRpcFactory";
export * from "./utils";
2 changes: 1 addition & 1 deletion src/providers/solana/rateLimitedRpcFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class RateLimitedSolanaRpcFactory extends SolanaClusterRpcFactory {
private queue: QueueObject<SolanaRateLimitTask>;

// Holds the underlying transport that the rate limiter wraps.
private readonly defaultTransport: RpcTransport;
protected defaultTransport: RpcTransport;

// Takes the same arguments as the SolanaDefaultRpcFactory, but it has an additional parameters to control
// concurrency and logging at the beginning of the list.
Expand Down
14 changes: 14 additions & 0 deletions src/utils/JSONUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,17 @@ export function jsonReviverWithBigNumbers(_key: string, value: unknown): unknown
}
return value;
}

/**
* A replacer for `JSON.stringify` that converts BigInt to a decimal string with 'n' suffix.
*/
export function jsonReplacerWithBigInts(_key: string, value: unknown): unknown {
return typeof value === "bigint" ? value.toString() + "n" : value;
}

/**
* A reviver for `JSON.parse` that converts strings ending in 'n' back to BigInt.
*/
export function jsonReviverWithBigInts(_key: string, value: unknown): unknown {
return typeof value === "string" && /^-?\d+n$/.test(value) ? BigInt(value.slice(0, -1)) : value;
}
138 changes: 138 additions & 0 deletions test/SolanaCachedProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { signature, Commitment, Rpc, SolanaRpcApiFromTransport, RpcTransport } from "@solana/web3.js";
import bs58 from "bs58";
import { createHash } from "crypto";
import winston from "winston";
import { MockRateLimitedSolanaRpcFactory, MockSolanaRpcFactory, MockCachedSolanaRpcFactory } from "./mocks";
import { createSpyLogger, expect, spyLogIncludes } from "./utils";
import { MemoryCacheClient } from "../src/caching";
import { jsonReviverWithBigInts } from "../src/utils";

const chainId = 1234567890;
const url = "https://test.example.com/";
const maxConcurrency = 1;
const pctRpcCallsLogged = 100; // Will use logs to check underlying transport calls.
const providerCacheNamespace = "test";
const testSignature = signature(bs58.encode(createHash("sha512").update("testSignature").digest()));
const getSignatureStatusesParams = [[testSignature], { searchTransactionHistory: true }];
const getTransactionConfig = {
commitment: "confirmed" as Commitment,
encoding: "base58" as const,
};
const getTransactionResult = {
slot: 0n,
transaction: bs58.encode(Buffer.from("testTransaction")),
blockTime: null,
meta: null,
};

describe("cached solana provider", () => {
let spy: sinon.SinonSpy;
let mockRpcFactory: MockSolanaRpcFactory;
let memoryCache: MemoryCacheClient;
let cachedRpcClient: Rpc<SolanaRpcApiFromTransport<RpcTransport>>;

beforeEach(() => {
const spyLoggerResult = createSpyLogger();
spy = spyLoggerResult.spy;

mockRpcFactory = new MockSolanaRpcFactory(url, chainId);
const rateLimitedParams: [number, number, winston.Logger, string, number] = [
maxConcurrency,
pctRpcCallsLogged,
spyLoggerResult.spyLogger,
url,
chainId,
];
const rateLimitedRpcFactory = new MockRateLimitedSolanaRpcFactory(mockRpcFactory, ...rateLimitedParams);
memoryCache = new MemoryCacheClient();
cachedRpcClient = new MockCachedSolanaRpcFactory(
rateLimitedRpcFactory,
providerCacheNamespace,
memoryCache,
...rateLimitedParams
).createRpcClient();
});

it("caches finalized transaction", async () => {
// Prepare required mock results for finalized transaction.
mockRpcFactory.setResult("getSignatureStatuses", getSignatureStatusesParams, {
value: [{ confirmationStatus: "finalized" }],
});
mockRpcFactory.setResult("getTransaction", [testSignature, getTransactionConfig], getTransactionResult);

let result = await cachedRpcClient.getTransaction(testSignature, getTransactionConfig).send();
expect(result).to.deep.equal(getTransactionResult);

// Check the cache.
const cacheKey = `${providerCacheNamespace},${
new URL(url).hostname
},${chainId}:getTransaction,["${testSignature}",${JSON.stringify(getTransactionConfig)}]`;
const cacheValue = JSON.parse((await memoryCache.get(cacheKey)) || "{}", jsonReviverWithBigInts);
expect(cacheValue).to.have.property("result");
expect(cacheValue.result).to.deep.equal(getTransactionResult);

// Expect 2 log entries from the underlying transport: one for getSignatureStatuses and one for getTransaction.
expect(spy.callCount).to.equal(2);
expect(spyLogIncludes(spy, 0, "getSignatureStatuses")).to.be.true;
expect(spyLogIncludes(spy, 1, "getTransaction")).to.be.true;

// Second request should fetch from cache.
result = await cachedRpcClient.getTransaction(testSignature, getTransactionConfig).send();
expect(result).to.deep.equal(getTransactionResult);

// No new log entries should be emitted from the underlying transport, expect the same 2 as after the first request.
expect(spy.callCount).to.equal(2);
});

it("does not cache non-finalized transaction", async () => {
// Prepare required mock results for non-finalized transaction.
mockRpcFactory.setResult("getSignatureStatuses", getSignatureStatusesParams, {
value: [{ confirmationStatus: "processed" }],
});
mockRpcFactory.setResult("getTransaction", [testSignature, getTransactionConfig], getTransactionResult);

let result = await cachedRpcClient.getTransaction(testSignature, getTransactionConfig).send();
expect(result).to.deep.equal(getTransactionResult);

// Check the cache is empty.
const cacheKey = `${providerCacheNamespace},${
new URL(url).hostname
},${chainId}:getTransaction,["${testSignature}",${JSON.stringify(getTransactionConfig)}]`;
const cacheValue = JSON.parse((await memoryCache.get(cacheKey)) || "{}", jsonReviverWithBigInts);
expect(cacheValue).to.be.empty;

// Expect 2 log entries from the underlying transport: one for getSignatureStatuses and one for getTransaction.
expect(spy.callCount).to.equal(2);
expect(spyLogIncludes(spy, 0, "getSignatureStatuses")).to.be.true;
expect(spyLogIncludes(spy, 1, "getTransaction")).to.be.true;

result = await cachedRpcClient.getTransaction(testSignature, getTransactionConfig).send();
expect(result).to.deep.equal(getTransactionResult);

// Second request should have triggered the underlying transport again, doubling the log entries.
expect(spy.callCount).to.equal(4);
expect(spyLogIncludes(spy, 2, "getSignatureStatuses")).to.be.true;
expect(spyLogIncludes(spy, 3, "getTransaction")).to.be.true;
});

it("does not cache other methods", async () => {
let slotResult = 1;
mockRpcFactory.setResult("getSlot", [], slotResult);

let rpcResult = await cachedRpcClient.getSlot().send();
expect(rpcResult).to.equal(BigInt(slotResult));

// Expect 1 log entry from the underlying transport.
expect(spy.callCount).to.equal(1);
expect(spyLogIncludes(spy, 0, "getSlot")).to.be.true;

slotResult = 2;
mockRpcFactory.setResult("getSlot", [], slotResult);
rpcResult = await cachedRpcClient.getSlot().send();
expect(rpcResult).to.equal(BigInt(slotResult));

// Second request should have triggered the underlying transport again, doubling the log entries.
expect(spy.callCount).to.equal(2);
expect(spyLogIncludes(spy, 1, "getSlot")).to.be.true;
});
});
Loading

0 comments on commit 548ae9c

Please sign in to comment.