From c05082401437a4f0141dd714e8c7f9294325dcb8 Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 10 Mar 2024 17:49:17 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20add=20Feed.merge()=20inl?= =?UTF-8?q?ine=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/web3/adl/feed-crdt/Feed.ts | 25 +++- .../feed-crdt/__tests__/Feed-merge.spec.ts | 107 ++++++++++++++++++ 2 files changed, 129 insertions(+), 3 deletions(-) diff --git a/src/web3/adl/feed-crdt/Feed.ts b/src/web3/adl/feed-crdt/Feed.ts index 5ae105c521..7f92e9223f 100644 --- a/src/web3/adl/feed-crdt/Feed.ts +++ b/src/web3/adl/feed-crdt/Feed.ts @@ -180,6 +180,25 @@ export class Feed implements types.FeedApi, SyncStore { } } + public async merge(forkCid: Cid): Promise { + if (this.unsaved.length) await this.save(); + if (!this.head) throw new Error('INVALID_STATE'); + const frames = await Feed.merge(this.deps.cas, this.head.cid, forkCid, this.opsPerFrame); + for (const frame of frames) this.ingestFrameData(frame, true); + let head = frames[frames.length - 1]; + let curr = head; + for (let i = frames.length - 2; i >= 0; i--) { + curr.prev = frames[i]; + curr = frames[i]; + } + let existingCurr: FeedFrame | null = this.head; + while (existingCurr && existingCurr.seq() > curr.seq()) + existingCurr = existingCurr.prev; + if (existingCurr) curr.prev = existingCurr.prev; else this.tail = curr; + this.head = head; + this.onChange.emit(); + } + // ------------------------------------------------------------------ FeedApi public add(data: unknown): hlc.HlcDto { @@ -219,7 +238,7 @@ export class Feed implements types.FeedApi, SyncStore { const prevCidDto = tail.data[0]; if (!prevCidDto) return; const cid = Cid.fromBinaryV1(prevCidDto); - const frame = await FeedFrame.read(cid, this.deps.cas); + const frame = this.tail?.prev ?? await FeedFrame.read(cid, this.deps.cas); tail.prev = frame; this.tail = frame; this.ingestFrameData(frame); @@ -229,7 +248,7 @@ export class Feed implements types.FeedApi, SyncStore { return !!this.tail?.data[0]; } - protected ingestFrameData(frame: FeedFrame): void { + protected ingestFrameData(frame: FeedFrame, silent?: boolean): void { const [, , ops] = frame.data; for (const op of ops) { switch (op[0]) { @@ -247,7 +266,7 @@ export class Feed implements types.FeedApi, SyncStore { } } } - this.onChange.emit(); + if (!silent) this.onChange.emit(); } @mutex diff --git a/src/web3/adl/feed-crdt/__tests__/Feed-merge.spec.ts b/src/web3/adl/feed-crdt/__tests__/Feed-merge.spec.ts index 46dda81faf..dab1eae8de 100644 --- a/src/web3/adl/feed-crdt/__tests__/Feed-merge.spec.ts +++ b/src/web3/adl/feed-crdt/__tests__/Feed-merge.spec.ts @@ -3,6 +3,7 @@ import {CidCasMemory} from '../../../store/cas/CidCasMemory'; import {CidCasStructCbor} from '../../../store/cas/CidCasStructCbor'; import {Feed} from '../Feed'; import {FeedFactory, FeedFactoryDependencies} from '../FeedFactory'; +import {FeedFrame} from '../FeedFrame'; interface SetupOpts { factory?: Pick; @@ -282,3 +283,109 @@ describe('can merge', () => { ]); }); }); + +describe('can merge inline', () => { + const generateCommonParent = async () => { + const deps = setup({factory: {opsPerFrame: 3}}); + const common = deps.feeds.make(); + common.add('c1'); + common.add('c2'); + common.add('c3'); + await common.save(); + common.add('c4'); + common.add('c5'); + common.add('c6'); + await common.save(); + common.add('c7'); + common.add('c8'); + common.add('c9'); + await common.save(); + return {...deps, common}; + }; + + test('when both sides one step from common parent', async () => { + const {feeds, cas, common} = await generateCommonParent(); + const fork = await feeds.load(common.cid()!); + fork.add('f1'); + await fork.save(); + common.add('c10'); + await common.save(); + await common.merge(fork.cid()!); + expect(common.getSnapshot().map(([, , data]) => data)).toStrictEqual([ + 'c1', + 'c2', + 'c3', + 'c4', + 'c5', + 'c6', + 'c7', + 'c8', + 'c9', + 'f1', + 'c10', + ]); + let frame: null | FeedFrame = (common as any).head; + let seq = frame?.seq() ?? 0; + while (frame) { + expect(frame.seq()).toBe(seq); + seq--; + frame = frame.prev; + } + }); + + test('can merge forks twice', async () => { + const {feeds, cas, common} = await generateCommonParent(); + const fork = await feeds.load(common.cid()!); + fork.add('f1'); + await fork.save(); + common.add('c10'); + await common.save(); + await common.merge(fork.cid()!); + const fork2 = await feeds.load(common.cid()!); + common.add('c11'); + common.add('c12'); + fork2.add('f2'); + fork2.add('f3'); + fork2.add('f4'); + await fork2.save(); + await common.merge(fork2.cid()!); + expect(common.getSnapshot().map(([, , data]) => data)).toStrictEqual([ + 'c1', + 'c2', + 'c3', + 'c4', + 'c5', + 'c6', + 'c7', + 'c8', + 'c9', + 'f1', + 'c10', + 'c11', + 'c12', + 'f2', + 'f3', + 'f4', + ]); + const common2 = await feeds.load(common.cid()!); + await common2.loadAll(); + expect(common2.getSnapshot().map(([, , data]) => data)).toStrictEqual([ + 'c1', + 'c2', + 'c3', + 'c4', + 'c5', + 'c6', + 'c7', + 'c8', + 'c9', + 'f1', + 'c10', + 'c11', + 'c12', + 'f2', + 'f3', + 'f4', + ]); + }); +});