Skip to content

Commit

Permalink
feat: 🎸 add Feed.merge() inline method
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Mar 10, 2024
1 parent a745431 commit c050824
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 3 deletions.
25 changes: 22 additions & 3 deletions src/web3/adl/feed-crdt/Feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,25 @@ export class Feed implements types.FeedApi, SyncStore<types.FeedOpInsert[]> {
}
}

public async merge(forkCid: Cid): Promise<void> {
if (this.unsaved.length) await this.save();
if (!this.head) throw new Error('INVALID_STATE');
const frames = await Feed.merge(this.deps.cas, this.head.cid, forkCid, this.opsPerFrame);
for (const frame of frames) this.ingestFrameData(frame, true);
let head = frames[frames.length - 1];
let curr = head;
for (let i = frames.length - 2; i >= 0; i--) {
curr.prev = frames[i];
curr = frames[i];
}
let existingCurr: FeedFrame | null = this.head;
while (existingCurr && existingCurr.seq() > curr.seq())
existingCurr = existingCurr.prev;
if (existingCurr) curr.prev = existingCurr.prev; else this.tail = curr;
this.head = head;
this.onChange.emit();
}

// ------------------------------------------------------------------ FeedApi

public add(data: unknown): hlc.HlcDto {
Expand Down Expand Up @@ -219,7 +238,7 @@ export class Feed implements types.FeedApi, SyncStore<types.FeedOpInsert[]> {
const prevCidDto = tail.data[0];
if (!prevCidDto) return;
const cid = Cid.fromBinaryV1(prevCidDto);
const frame = await FeedFrame.read(cid, this.deps.cas);
const frame = this.tail?.prev ?? await FeedFrame.read(cid, this.deps.cas);
tail.prev = frame;
this.tail = frame;
this.ingestFrameData(frame);
Expand All @@ -229,7 +248,7 @@ export class Feed implements types.FeedApi, SyncStore<types.FeedOpInsert[]> {
return !!this.tail?.data[0];
}

protected ingestFrameData(frame: FeedFrame): void {
protected ingestFrameData(frame: FeedFrame, silent?: boolean): void {
const [, , ops] = frame.data;
for (const op of ops) {
switch (op[0]) {
Expand All @@ -247,7 +266,7 @@ export class Feed implements types.FeedApi, SyncStore<types.FeedOpInsert[]> {
}
}
}
this.onChange.emit();
if (!silent) this.onChange.emit();
}

@mutex
Expand Down
107 changes: 107 additions & 0 deletions src/web3/adl/feed-crdt/__tests__/Feed-merge.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {CidCasMemory} from '../../../store/cas/CidCasMemory';
import {CidCasStructCbor} from '../../../store/cas/CidCasStructCbor';
import {Feed} from '../Feed';
import {FeedFactory, FeedFactoryDependencies} from '../FeedFactory';
import {FeedFrame} from '../FeedFrame';

interface SetupOpts {
factory?: Pick<FeedFactoryDependencies, 'opsPerFrame'>;
Expand Down Expand Up @@ -282,3 +283,109 @@ describe('can merge', () => {
]);
});
});

describe('can merge inline', () => {
const generateCommonParent = async () => {
const deps = setup({factory: {opsPerFrame: 3}});
const common = deps.feeds.make();
common.add('c1');
common.add('c2');
common.add('c3');
await common.save();
common.add('c4');
common.add('c5');
common.add('c6');
await common.save();
common.add('c7');
common.add('c8');
common.add('c9');
await common.save();
return {...deps, common};
};

test('when both sides one step from common parent', async () => {
const {feeds, cas, common} = await generateCommonParent();
const fork = await feeds.load(common.cid()!);
fork.add('f1');
await fork.save();
common.add('c10');
await common.save();
await common.merge(fork.cid()!);
expect(common.getSnapshot().map(([, , data]) => data)).toStrictEqual([
'c1',
'c2',
'c3',
'c4',
'c5',
'c6',
'c7',
'c8',
'c9',
'f1',
'c10',
]);
let frame: null | FeedFrame = (common as any).head;
let seq = frame?.seq() ?? 0;
while (frame) {
expect(frame.seq()).toBe(seq);
seq--;
frame = frame.prev;
}
});

test('can merge forks twice', async () => {
const {feeds, cas, common} = await generateCommonParent();
const fork = await feeds.load(common.cid()!);
fork.add('f1');
await fork.save();
common.add('c10');
await common.save();
await common.merge(fork.cid()!);
const fork2 = await feeds.load(common.cid()!);
common.add('c11');
common.add('c12');
fork2.add('f2');
fork2.add('f3');
fork2.add('f4');
await fork2.save();
await common.merge(fork2.cid()!);
expect(common.getSnapshot().map(([, , data]) => data)).toStrictEqual([
'c1',
'c2',
'c3',
'c4',
'c5',
'c6',
'c7',
'c8',
'c9',
'f1',
'c10',
'c11',
'c12',
'f2',
'f3',
'f4',
]);
const common2 = await feeds.load(common.cid()!);
await common2.loadAll();
expect(common2.getSnapshot().map(([, , data]) => data)).toStrictEqual([
'c1',
'c2',
'c3',
'c4',
'c5',
'c6',
'c7',
'c8',
'c9',
'f1',
'c10',
'c11',
'c12',
'f2',
'f3',
'f4',
]);
});
});

0 comments on commit c050824

Please sign in to comment.