-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmod.ts
70 lines (60 loc) · 1.71 KB
/
mod.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
export type PaleonStorageRecord = {
datetime: Date;
};
export interface PaleonStorageReadOptions {
since?: Date;
until?: Date;
limit?: number;
reverse?: boolean;
}
/**
* Interface for a storage of time series data.
*/
export type PaleonStorage<
T extends PaleonStorageRecord = PaleonStorageRecord,
> = {
readonly subject: Deno.KvKey | Deno.KvKeyPart;
read(options?: PaleonStorageReadOptions): ReadableStream<T>;
write(value: T): Promise<Deno.KvCommitResult>;
erase(): Promise<void>;
close(): void;
};
export const PaleonStorage = {
async open<T extends PaleonStorageRecord>(
subject: Deno.KvKey | Deno.KvKeyPart,
): Promise<PaleonStorage<T>> {
const prefix = [subject].flat();
const kv = await Deno.openKv();
return {
subject,
write(record: T) {
const time = record.datetime.getTime();
const key = [...prefix, time, crypto.randomUUID()];
return kv.set(key, record);
},
read(options?: PaleonStorageReadOptions) {
const start = [...prefix, options?.since?.getTime() ?? 0];
const end = [...prefix, options?.until?.getTime() ?? Infinity];
const limit = options?.limit;
const reverse = options?.reverse ?? true;
const iter = kv.list<T>({ start, end }, { limit, reverse });
return new ReadableStream<T>({
async start(controller) {
for await (const { value } of iter) {
controller.enqueue(value);
}
controller.close();
},
});
},
async erase() {
for await (const { key } of kv.list<T>({ prefix })) {
await kv.delete(key);
}
},
close() {
kv.close();
},
};
},
};