diff --git a/src/json-crdt-patch/clock/clock.ts b/src/json-crdt-patch/clock/clock.ts index 8fa440c6b3..d3e4b0591c 100644 --- a/src/json-crdt-patch/clock/clock.ts +++ b/src/json-crdt-patch/clock/clock.ts @@ -196,10 +196,10 @@ export class ClockVector extends LogicalClock implements IClockVector { } /** - * Returns a human-readable string representation of the vector clock. + * Returns a human-readable string representation of the clock vector. * * @param tab String to use for indentation. - * @returns Human-readable string representation of the vector clock. + * @returns Human-readable string representation of the clock vector. */ public toString(tab: string = ''): string { const last = this.peers.size; @@ -236,4 +236,13 @@ export class ServerClockVector extends LogicalClock implements IClockVector { public fork(): ServerClockVector { return new ServerClockVector(SESSION.SERVER, this.time); } + + /** + * Returns a human-readable string representation of the clock vector. + * + * @returns Human-readable string representation of the clock vector. 
+ */ + public toString(): string { + return `clock ${this.sid}.${this.time}`; + } } diff --git a/src/json-crdt/file/File.ts b/src/json-crdt/file/File.ts index 8a66dfad41..56e7157b9f 100644 --- a/src/json-crdt/file/File.ts +++ b/src/json-crdt/file/File.ts @@ -1,29 +1,101 @@ -import {Model} from "../model"; -import {PatchLog} from "./PatchLog"; -import {FileModelEncoding} from "./constants"; +import {Model} from '../model'; +import {PatchLog} from './PatchLog'; +import {FileModelEncoding} from './constants'; import {Encoder as SidecarEncoder} from '../codec/sidecar/binary/Encoder'; +import {Decoder as SidecarDecoder} from '../codec/sidecar/binary/Decoder'; import {Encoder as StructuralEncoderCompact} from '../codec/structural/compact/Encoder'; import {Encoder as StructuralEncoderVerbose} from '../codec/structural/verbose/Encoder'; import {encode as encodeCompact} from '../../json-crdt-patch/codec/compact/encode'; import {encode as encodeVerbose} from '../../json-crdt-patch/codec/verbose/encode'; -import type * as types from "./types"; +import {Writer} from '../../util/buffers/Writer'; +import {CborEncoder} from '../../json-pack/cbor/CborEncoder'; +import {JsonEncoder} from '../../json-pack/json/JsonEncoder'; +import {printTree} from '../../util/print/printTree'; +import {decodeModel, decodeNdjsonComponents, decodePatch, decodeSeqCborComponents} from './util'; +import {Patch} from '../../json-crdt-patch'; +import type * as types from './types'; +import type {Printable} from '../../util/print/types'; -export class File { - public static fromModel(model: Model): File { +export class File implements Printable { + public static unserialize(components: types.FileReadSequence): File { + const [view, metadata, model, history, ...frontier] = components; + const modelFormat = metadata[1]; + let decodedModel: Model | null = null; + if (model) { + const isSidecar = modelFormat === FileModelEncoding.SidecarBinary; + if (isSidecar) { + const decoder = new SidecarDecoder(); + if 
(!(model instanceof Uint8Array)) throw new Error('NOT_BLOB'); + decodedModel = decoder.decode(view, model); + } else { + decodedModel = decodeModel(model); + } + } + let log: PatchLog | null = null; + if (history) { + const [start, patches] = history; + if (start) { + const startModel = decodeModel(start); + log = new PatchLog(startModel); + for (const patch of patches) log.push(decodePatch(patch)); + } + } + if (!log) throw new Error('NO_HISTORY'); + if (!decodedModel) decodedModel = log.replayToEnd(); + if (frontier.length) { + for (const patch of frontier) { + const patchDecoded = decodePatch(patch); + decodedModel.applyPatch(patchDecoded); + log.push(patchDecoded); + } + } + const file = new File(decodedModel, log); + return file; + } + + public static fromNdjson(blob: Uint8Array): File { + const components = decodeNdjsonComponents(blob); + return File.unserialize(components as types.FileReadSequence); + } + + public static fromSeqCbor(blob: Uint8Array): File { + const components = decodeSeqCborComponents(blob); + return File.unserialize(components as types.FileReadSequence); + } + + public static fromModel(model: Model): File { return new File(model, PatchLog.fromModel(model)); } - constructor( - public readonly model: Model, - public readonly history: PatchLog, - ) {} + constructor(public readonly model: Model, public readonly log: PatchLog) {} + + public apply(patch: Patch): void { + const id = patch.getId(); + if (!id) return; + this.model.applyPatch(patch); + this.log.push(patch); + } + + public sync(): () => void { + const {model, log} = this; + const api = model.api; + const autoflushUnsubscribe = api.autoFlush(); + const onPatchUnsubscribe = api.onPatch.listen((patch) => { + log.push(patch); + }); + const onFlushUnsubscribe = api.onFlush.listen((patch) => { + log.push(patch); + }); + return () => { + autoflushUnsubscribe(); + onPatchUnsubscribe(); + onFlushUnsubscribe(); + }; + } public serialize(params: types.FileSerializeParams = {}): 
types.FileWriteSequence { - const view = this.model.view(); - const metadata: types.FileMetadata = [ - {}, - FileModelEncoding.SidecarBinary, - ]; + if (params.noView && params.model === 'sidecar') throw new Error('SIDECAR_MODEL_WITHOUT_VIEW'); + const metadata: types.FileMetadata = [{}, FileModelEncoding.Auto]; let model: Uint8Array | unknown | null = null; const modelFormat = params.model ?? 'sidecar'; switch (modelFormat) { @@ -35,58 +107,80 @@ export class File { break; } case 'binary': { - metadata[1] = FileModelEncoding.StructuralBinary; model = this.model.toBinary(); break; } case 'compact': { - metadata[1] = FileModelEncoding.StructuralCompact; model = new StructuralEncoderCompact().encode(this.model); break; } case 'verbose': { - metadata[1] = FileModelEncoding.StructuralVerbose; model = new StructuralEncoderVerbose().encode(this.model); break; } + case 'none': { + model = null; + break; + } default: throw new Error(`Invalid model format: ${modelFormat}`); } - const history: types.FileWriteSequenceHistory = [ - null, - [], - ]; + const history: types.FileWriteSequenceHistory = [null, []]; const patchFormat = params.history ?? 
'binary'; switch (patchFormat) { case 'binary': { - history[0] = this.history.start.toBinary(); - this.history.patches.forEach(({v}) => { + history[0] = this.log.start.toBinary(); + this.log.patches.forEach(({v}) => { history[1].push(v.toBinary()); }); break; } case 'compact': { - history[0] = new StructuralEncoderCompact().encode(this.history.start); - this.history.patches.forEach(({v}) => { + history[0] = new StructuralEncoderCompact().encode(this.log.start); + this.log.patches.forEach(({v}) => { history[1].push(encodeCompact(v)); }); break; } case 'verbose': { - history[0] = new StructuralEncoderVerbose().encode(this.history.start); - this.history.patches.forEach(({v}) => { + history[0] = new StructuralEncoderVerbose().encode(this.log.start); + this.log.patches.forEach(({v}) => { history[1].push(encodeVerbose(v)); }); break; } + case 'none': { + break; + } default: throw new Error(`Invalid history format: ${patchFormat}`); } - return [ - view, - metadata, - model, - history, - ]; + return [params.noView ? 
null : this.model.view(), metadata, model, history]; + } + + public toBinary(params: types.FileEncodingParams): Uint8Array { + const sequence = this.serialize(params); + const writer = new Writer(16 * 1024); + switch (params.format) { + case 'ndjson': { + const json = new JsonEncoder(writer); + for (const component of sequence) { + json.writeAny(component); + json.writer.u8('\n'.charCodeAt(0)); + } + return json.writer.flush(); + } + case 'seq.cbor': { + const cbor = new CborEncoder(writer); + for (const component of sequence) cbor.writeAny(component); + return cbor.writer.flush(); + } + } + } + + // ---------------------------------------------------------------- Printable + + public toString(tab?: string) { + return `file` + printTree(tab, [(tab) => this.model.toString(tab), () => '', (tab) => this.log.toString(tab)]); } } diff --git a/src/json-crdt/file/PatchLog.ts b/src/json-crdt/file/PatchLog.ts index 7aa07cf7b8..a2040eb573 100644 --- a/src/json-crdt/file/PatchLog.ts +++ b/src/json-crdt/file/PatchLog.ts @@ -1,9 +1,12 @@ -import {ITimestampStruct, Patch, ServerClockVector, compare} from "../../json-crdt-patch"; -import {AvlMap} from "../../util/trees/avl/AvlMap"; -import {Model} from "../model"; +import {ITimestampStruct, Patch, compare} from '../../json-crdt-patch'; +import {printTree} from '../../util/print/printTree'; +import {AvlMap} from '../../util/trees/avl/AvlMap'; +import {Model} from '../model'; +import type {Printable} from '../../util/print/types'; +import {first, next} from '../../util/trees/util'; -export class PatchLog { - public static fromModel (model: Model): PatchLog { +export class PatchLog implements Printable { + public static fromModel(model: Model): PatchLog { const start = new Model(model.clock.clone()); const log = new PatchLog(start); if (model.api.builder.patch.ops.length) { @@ -15,11 +18,44 @@ export class PatchLog { public readonly patches = new AvlMap(compare); - constructor (public readonly start: Model) {} + constructor(public 
readonly start: Model) {} public push(patch: Patch): void { const id = patch.getId(); if (!id) return; this.patches.set(id, patch); } + + public replayToEnd(): Model { + const clone = this.start.clone(); + for (let node = first(this.patches.root); node; node = next(node)) clone.applyPatch(node.v); + return clone; + } + + public replayTo(ts: ITimestampStruct): Model { + const clone = this.start.clone(); + for (let node = first(this.patches.root); node && compare(ts, node.k) >= 0; node = next(node)) + clone.applyPatch(node.v); + return clone; + } + + // ---------------------------------------------------------------- Printable + + public toString(tab?: string) { + const log: Patch[] = []; + this.patches.forEach(({v}) => log.push(v)); + return ( + `log` + + printTree(tab, [ + (tab) => this.start.toString(tab), + () => '', + (tab) => + 'history' + + printTree( + tab, + log.map((patch, i) => (tab) => `${i}: ${patch.toString(tab)}`), + ), + ]) + ); + } } diff --git a/src/json-crdt/file/__tests__/File.spec.ts b/src/json-crdt/file/__tests__/File.spec.ts new file mode 100644 index 0000000000..66222b0eda --- /dev/null +++ b/src/json-crdt/file/__tests__/File.spec.ts @@ -0,0 +1,157 @@ +import {s} from '../../../json-crdt-patch'; +import {Model} from '../../model'; +import {File} from '../File'; +import {JsonDecoder} from '../../../json-pack/json/JsonDecoder'; +import {CborDecoder} from '../../../json-pack/cbor/CborDecoder'; +import {FileEncodingParams} from '../types'; + +const setup = (view: unknown) => { + const model = Model.withServerClock(); + model.api.root(view); + const file = File.fromModel(model); + return {model, file}; +}; + +test('can create File from new model', () => { + const model = Model.withServerClock().setSchema( + s.obj({ + foo: s.str('bar'), + }), + ); + const file = File.fromModel(model); + expect(file.log.start.view()).toBe(undefined); + expect(file.model.view()).toEqual({ + foo: 'bar', + }); + 
expect(file.log.start.clock.sid).toBe(file.model.clock.sid); +}); + +test.todo('patches are flushed and stored in memory'); +test.todo('can replay history'); + +describe('.toBinary()', () => { + describe('can read first value as view', () => { + test('.ndjson', () => { + const {file} = setup({foo: 'bar'}); + const blob = file.toBinary({format: 'ndjson', model: 'compact', history: 'compact'}); + const decoder = new JsonDecoder(); + const view = decoder.read(blob); + expect(view).toEqual({foo: 'bar'}); + }); + + test('.seq.cbor', () => { + const {file} = setup({foo: 'bar'}); + const blob = file.toBinary({format: 'seq.cbor'}); + const decoder = new CborDecoder(); + const view = decoder.read(blob); + expect(view).toEqual({foo: 'bar'}); + }); + }); + + describe('can decode from blob', () => { + test('.ndjson', () => { + const {file} = setup({foo: 'bar'}); + const blob = file.toBinary({format: 'ndjson', model: 'compact', history: 'compact'}); + const file2 = File.fromNdjson(blob); + expect(file2.model.view()).toEqual({foo: 'bar'}); + expect(file2.model !== file.model).toBe(true); + expect(file.log.start.view()).toEqual(undefined); + expect(file.log.replayToEnd().view()).toEqual({foo: 'bar'}); + }); + + test('.seq.cbor', () => { + const {file} = setup({foo: 'bar'}); + const blob = file.toBinary({format: 'seq.cbor', model: 'binary', history: 'binary'}); + const file2 = File.fromSeqCbor(blob); + expect(file2.model.view()).toEqual({foo: 'bar'}); + expect(file2.model !== file.model).toBe(true); + expect(file.log.start.view()).toEqual(undefined); + expect(file.log.replayToEnd().view()).toEqual({foo: 'bar'}); + }); + }); + + const assertEncoding = (file: File, params: FileEncodingParams) => { + const blob = file.toBinary(params); + // if (params.format === 'ndjson') console.log(Buffer.from(blob).toString('utf8')) + const file2 = params.format === 'seq.cbor' ? 
File.fromSeqCbor(blob) : File.fromNdjson(blob); + expect(file2.model.view()).toEqual(file.model.view()); + expect(file2.model !== file.model).toBe(true); + expect(file2.log.start.view()).toEqual(undefined); + expect(file2.log.replayToEnd().view()).toEqual(file.model.view()); + expect(file2.log.patches.size()).toBe(file.log.patches.size()); + }; + + describe('can encode/decode all format combinations', () => { + const formats: FileEncodingParams['format'][] = ['ndjson', 'seq.cbor']; + const modelFormats: FileEncodingParams['model'][] = ['sidecar', 'binary', 'compact', 'verbose']; + const historyFormats: FileEncodingParams['history'][] = ['binary', 'compact', 'verbose']; + const noViews = [true, false]; + for (const format of formats) { + for (const model of modelFormats) { + for (const history of historyFormats) { + for (const noView of noViews) { + if (noView && model === 'sidecar') continue; + const params = {format, model, history, noView}; + test(JSON.stringify(params), () => { + const {file} = setup({foo: 'bar'}); + assertEncoding(file, params); + }); + } + } + } + } + }); +}); + +describe('.unserialize()', () => { + test('applies frontier', () => { + const {file, model} = setup({foo: 'bar'}); + const clone = model.clone(); + clone.api.obj([]).set({ + xyz: 123, + }); + const serialized = file.serialize({ + history: 'binary', + }); + serialized.push(clone.api.flush().toBinary()); + expect(file.model.view()).toEqual({foo: 'bar'}); + const file2 = File.unserialize(serialized); + expect(file2.model.view()).toEqual({foo: 'bar', xyz: 123}); + }); +}); + +describe('.sync()', () => { + test('keeps track of local changes', async () => { + const {file, model} = setup({foo: 'bar'}); + file.sync(); + model.api.obj([]).set({x: 1}); + await Promise.resolve(); + expect(file.model.view()).toEqual({foo: 'bar', x: 1}); + expect(file.log.replayToEnd().view()).toEqual({foo: 'bar', x: 1}); + }); + + test('processes local transactions', async () => { + const {file, model} = 
setup({foo: 'bar'}); + file.sync(); + const logLength = file.log.patches.size(); + model.api.transaction(() => { + model.api.obj([]).set({x: 1}); + model.api.obj([]).set({y: 2}); + }); + expect(file.log.patches.size()).toBe(logLength + 1); + }); + + test('keeps track of remote changes', async () => { + const {file, model} = setup({foo: 'bar'}); + const clone = model.clone(); + file.sync(); + clone.api.obj([]).set({x: 1}); + expect(clone.view()).toEqual({foo: 'bar', x: 1}); + expect(file.model.view()).toEqual({foo: 'bar'}); + const patch = clone.api.flush(); + file.model.applyPatch(patch); + await Promise.resolve(); + expect(file.model.view()).toEqual({foo: 'bar', x: 1}); + expect(file.log.replayToEnd().view()).toEqual({foo: 'bar', x: 1}); + }); +}); diff --git a/src/json-crdt/file/__tests__/PatchLog.spec.ts b/src/json-crdt/file/__tests__/PatchLog.spec.ts new file mode 100644 index 0000000000..c15e43c5c6 --- /dev/null +++ b/src/json-crdt/file/__tests__/PatchLog.spec.ts @@ -0,0 +1,29 @@ +import {Model} from '../../model'; +import {File} from '../File'; + +const setup = (view: unknown) => { + const model = Model.withServerClock(); + model.api.root(view); + const file = File.fromModel(model); + return {model, file}; +}; + +test('can replay to specific patch', () => { + const {file} = setup({foo: 'bar'}); + const model = file.model.clone(); + model.api.obj([]).set({x: 1}); + const patch1 = model.api.flush(); + model.api.obj([]).set({y: 2}); + const patch2 = model.api.flush(); + file.apply(patch1); + file.apply(patch2); + const model2 = file.log.replayToEnd(); + const model3 = file.log.replayTo(patch1.getId()!); + const model4 = file.log.replayTo(patch2.getId()!); + expect(model.view()).toEqual({foo: 'bar', x: 1, y: 2}); + expect(file.model.view()).toEqual({foo: 'bar', x: 1, y: 2}); + expect(file.log.start.view()).toEqual(undefined); + expect(model2.view()).toEqual({foo: 'bar', x: 1, y: 2}); + expect(model3.view()).toEqual({foo: 'bar', x: 1}); + 
expect(model4.view()).toEqual({foo: 'bar', x: 1, y: 2}); +}); diff --git a/src/json-crdt/file/constants.ts b/src/json-crdt/file/constants.ts index 3ccde424af..912083d9dd 100644 --- a/src/json-crdt/file/constants.ts +++ b/src/json-crdt/file/constants.ts @@ -1,7 +1,4 @@ export const enum FileModelEncoding { - None = 0, + Auto = 0, SidecarBinary = 1, - StructuralBinary = 5, - StructuralCompact = 6, - StructuralVerbose = 7, } diff --git a/src/json-crdt/file/types.ts b/src/json-crdt/file/types.ts index 32b4450530..5d2f6cc8e3 100644 --- a/src/json-crdt/file/types.ts +++ b/src/json-crdt/file/types.ts @@ -1,9 +1,6 @@ -import type {FileModelEncoding} from "./constants"; +import type {FileModelEncoding} from './constants'; -export type FileMetadata = [ - map: {}, - modelFormat: FileModelEncoding, -]; +export type FileMetadata = [map: {}, modelFormat: FileModelEncoding]; export type FileWriteSequence = [ view: unknown | null, @@ -12,18 +9,16 @@ export type FileWriteSequence = [ history: FileWriteSequenceHistory, ]; -export type FileWriteSequenceHistory = [ - model: Uint8Array | unknown | null, - patches: Array, -]; +export type FileWriteSequenceHistory = [model: Uint8Array | unknown | null, patches: Array]; -export type FileReadSequence = [ - ...FileWriteSequence, - ...frontier: Array, -]; +export type FileReadSequence = [...FileWriteSequence, ...frontier: Array]; export interface FileSerializeParams { noView?: boolean; - model?: 'sidecar' | 'binary' | 'compact' | 'verbose'; - history?: 'binary' | 'compact' | 'verbose'; + model?: 'sidecar' | 'binary' | 'compact' | 'verbose' | 'none'; + history?: 'binary' | 'compact' | 'verbose' | 'none'; +} + +export interface FileEncodingParams extends FileSerializeParams { + format: 'ndjson' | 'seq.cbor'; } diff --git a/src/json-crdt/file/util.ts b/src/json-crdt/file/util.ts new file mode 100644 index 0000000000..c268f98a97 --- /dev/null +++ b/src/json-crdt/file/util.ts @@ -0,0 +1,50 @@ +import {JsonDecoder} from 
'../../json-pack/json/JsonDecoder'; +import {CborDecoder} from '../../json-pack/cbor/CborDecoder'; +import {Model} from '../model'; +import {Decoder as StructuralDecoderCompact} from '../codec/structural/compact/Decoder'; +import {Decoder as StructuralDecoderVerbose} from '../codec/structural/verbose/Decoder'; +import {decode as decodeCompact} from '../../json-crdt-patch/codec/compact/decode'; +import {decode as decodeVerbose} from '../../json-crdt-patch/codec/verbose/decode'; +import {Patch} from '../../json-crdt-patch'; +import type {JsonCrdtCompactDocument} from '../codec/structural/compact'; +import type {JsonCrdtVerboseDocument} from '../codec/structural/verbose'; +import type {CompactCodecPatch} from '../../json-crdt-patch/codec/compact'; +import type {JsonCodecPatch} from '../../json-crdt-patch/codec/verbose'; + +export const decodeNdjsonComponents = (blob: Uint8Array): unknown[] => { + const decoder = new JsonDecoder(); + const reader = decoder.reader; + reader.reset(blob); + const components: unknown[] = []; + while (reader.x < blob.length) { + components.push(decoder.readAny()); + const nl = reader.u8(); + if (nl !== '\n'.charCodeAt(0)) throw new Error('NDJSON_UNEXPECTED_NEWLINE'); + } + return components; +}; + +export const decodeSeqCborComponents = (blob: Uint8Array): unknown[] => { + const decoder = new CborDecoder(); + const reader = decoder.reader; + reader.reset(blob); + const components: unknown[] = []; + while (reader.x < blob.length) components.push(decoder.val()); + return components; +}; + +export const decodeModel = (serialized: unknown): Model => { + if (!serialized) throw new Error('NO_MODEL'); + if (serialized instanceof Uint8Array) return Model.fromBinary(serialized); + if (Array.isArray(serialized)) return new StructuralDecoderCompact().decode(serialized); + if (typeof serialized === 'object') return new StructuralDecoderVerbose().decode(serialized); + throw new Error('UNKNOWN_MODEL'); +}; + +export const decodePatch = (serialized: 
unknown): Patch => { + if (!serialized) throw new Error('NO_MODEL'); + if (serialized instanceof Uint8Array) return Patch.fromBinary(serialized); + if (Array.isArray(serialized)) return decodeCompact(serialized); + if (typeof serialized === 'object') return decodeVerbose(serialized); + throw new Error('UNKNOWN_MODEL'); +}; diff --git a/src/json-crdt/model/Model.ts b/src/json-crdt/model/Model.ts index c5876f1a05..ee0c321c70 100644 --- a/src/json-crdt/model/Model.ts +++ b/src/json-crdt/model/Model.ts @@ -21,7 +21,7 @@ export const UNDEFINED = new ConNode(ORIGIN, undefined); /** * In instance of Model class represents the underlying data structure, * i.e. model, of the JSON CRDT document. */ -export class Model implements Printable { +export class Model> implements Printable { /** * Create a CRDT model which uses logical clock. Logical clock assigns a * logical timestamp to every node and operation. Logical timestamp consists diff --git a/src/json-crdt/model/api/ModelApi.ts b/src/json-crdt/model/api/ModelApi.ts index 97b52b1060..c84170098f 100644 --- a/src/json-crdt/model/api/ModelApi.ts +++ b/src/json-crdt/model/api/ModelApi.ts @@ -39,6 +39,15 @@ export class ModelApi implements SyncStore(); /** Emitted after local changes through `model.api` are applied. */ public readonly onLocalChange = new FanOut(); + /** + * Emitted after local changes through `model.api` are applied. Same as + * `.onLocalChange`, but this event is buffered within a microtask. + */ + public readonly onLocalChanges = new MicrotaskBufferFanOut(this.onLocalChange); + /** Emitted before a transaction is started. */ + public readonly onBeforeTransaction = new FanOut(); + /** Emitted after transaction completes. */ + public readonly onTransaction = new FanOut(); /** Emitted when the model changes. Combines `onReset`, `onPatch` and `onLocalChange`. */ public readonly onChange = new MergeFanOut([this.onReset, this.onPatch, this.onLocalChange]); /** Emitted when the model changes. 
Same as `.onChange`, but this event is emitted once per microtask. */ @@ -245,10 +254,17 @@ export class ModelApi implements SyncStore void) { + this.onBeforeTransaction.emit(); + callback(); + this.onTransaction.emit(); + } + /** * Flushes the builder and returns a patch. * * @returns A JSON CRDT patch. + * @todo Make this return undefined if there are no operations in the builder. */ public flush(): Patch { const patch = this.builder.flush(); @@ -257,6 +273,28 @@ export class ModelApi implements SyncStore void = undefined; + + /** + * Begins to automatically flush buffered operations into patches, grouping + * operations by microtasks or by transactions. To capture the patch, listen + * to the `.onFlush` event. + * + * @returns Callback to stop auto flushing. + */ + public autoFlush(): () => void { + const drain = () => this.builder.patch.ops.length && this.flush(); + const onLocalChangesUnsubscribe = this.onLocalChanges.listen(drain); + const onBeforeTransactionUnsubscribe = this.onBeforeTransaction.listen(drain); + const onTransactionUnsubscribe = this.onTransaction.listen(drain); + return (this.stopAutoFlush = () => { + this.stopAutoFlush = undefined; + onLocalChangesUnsubscribe(); + onBeforeTransactionUnsubscribe(); + onTransactionUnsubscribe(); + }); + } + // ---------------------------------------------------------------- SyncStore public readonly subscribe = (callback: () => void) => this.onChanges.listen(() => callback()); diff --git a/src/util/trees/avl/AvlMap.ts b/src/util/trees/avl/AvlMap.ts index d0b3751f12..8babe77ee7 100644 --- a/src/util/trees/avl/AvlMap.ts +++ b/src/util/trees/avl/AvlMap.ts @@ -72,6 +72,15 @@ export class AvlMap implements Printable { return !!this.find(k); } + public size(): number { + const root = this.root; + if (!root) return 0; + let curr = first(root); + let size = 1; + while ((curr = next(curr as HeadlessNode) as AvlNode | undefined)) size++; + return size; + } + public getOrNextLower(k: K): AvlNode | undefined { return 
(findOrNextLower(this.root, k, this.comparator) as AvlNode) || undefined; }