From 70f17134a019ffd417b7790f762e7914ddcb5351 Mon Sep 17 00:00:00 2001 From: Ry Racherbaumer Date: Wed, 12 Jun 2024 11:40:51 -0500 Subject: [PATCH 1/3] Add stream_all_messages --- bindings_node/src/conversations.rs | 38 ++++++++++++++---------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index 58a7710e4..5859f2368 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -7,7 +7,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun use napi::JsFunction; use napi_derive::napi; -// use crate::messages::NapiMessage; +use crate::messages::NapiMessage; use crate::{ groups::{GroupPermissions, NapiGroup}, mls_client::RustXmtpClient, @@ -147,23 +147,21 @@ impl NapiConversations { )) } - // TODO: this fn needs to be sync for it to work with NAPI - // #[napi(ts_args_type = "callback: (err: null | Error, result: NapiGroup) => void")] - // pub async fn stream_all_messages(&self, callback: JsFunction) -> Result { - // let tsfn: ThreadsafeFunction = - // callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; - // let stream_closer = RustXmtpClient::stream_all_messages_with_callback( - // self.inner_client.clone(), - // move |message| { - // tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking); - // }, - // ) - // .await - // .map_err(|e| Error::from_reason(format!("{}", e)))?; - - // Ok(NapiStreamCloser::new( - // stream_closer.close_fn, - // stream_closer.is_closed_atomic, - // )) - // } + #[napi(ts_args_type = "callback: (err: null | Error, result: NapiMessage) => void")] + pub fn stream_all_messages(&self, callback: JsFunction) -> Result { + let tsfn: ThreadsafeFunction = + callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; + let stream_closer = RustXmtpClient::stream_all_messages_with_callback_sync( + self.inner_client.clone(), + move |message| { + tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking); + }, + ) + .map_err(|e| Error::from_reason(format!("{}", e)))?; + + Ok(NapiStreamCloser::new( + stream_closer.close_fn, + stream_closer.is_closed_atomic, + )) + } } From 50cab69d72805564052da53bb8950654703accc5 Mon Sep 17 00:00:00 2001 From: Ry Racherbaumer Date: Wed, 12 Jun 2024 11:41:41 -0500 Subject: [PATCH 2/3] Add tests --- bindings_node/test/AsyncStream.ts | 77 ++++++++++++++ bindings_node/test/Conversations.test.ts | 128 +++++++++++++++++++++++ bindings_node/test/helpers.ts | 15 +++ 3 files changed, 220 insertions(+) create mode 100644 bindings_node/test/AsyncStream.ts create mode 100644 bindings_node/test/Conversations.test.ts diff --git a/bindings_node/test/AsyncStream.ts b/bindings_node/test/AsyncStream.ts new file mode 100644 index 000000000..490cf1355 --- /dev/null +++ b/bindings_node/test/AsyncStream.ts @@ -0,0 +1,77 @@ +type Value = V extends undefined ? T : V + +type ResolveValue = { + value: Value | undefined + done: boolean +} + +type ResolveNext = (resolveValue: ResolveValue) => void + +type TransformValue = (value: T) => Value + +export class AsyncStream { + #done = false + #resolveNext: ResolveNext | null + #queue: Value[] + #transformValue?: TransformValue + + stopCallback: (() => void) | undefined = undefined + + constructor( + transformValue: V extends undefined ? undefined : TransformValue + ) { + this.#queue = [] + this.#resolveNext = null + this.#done = false + this.#transformValue = transformValue + } + + callback = (err: Error | null, value: T) => { + if (err) { + console.error('stream error', err) + this.stop() + return + } + + if (this.#done) { + return + } + + const newValue = this.#transformValue + ? this.#transformValue(value) + : // must assert type because TypeScript can't infer that T is assignable + // to Value when this.#transformValue is undefined + (value as unknown as Value) + + if (this.#resolveNext) { + this.#resolveNext({ value: newValue, done: false }) + this.#resolveNext = null + } else { + this.#queue.push(newValue) + } + } + + stop = () => { + this.#done = true + if (this.#resolveNext) { + this.#resolveNext({ value: undefined, done: true }) + } + this.stopCallback?.() + } + + next = (): Promise> => { + if (this.#queue.length > 0) { + return Promise.resolve({ value: this.#queue.shift(), done: false }) + } else if (this.#done) { + return Promise.resolve({ value: undefined, done: true }) + } else { + return new Promise((resolve) => { + this.#resolveNext = resolve + }) + } + }; + + [Symbol.asyncIterator]() { + return this + } +} diff --git a/bindings_node/test/Conversations.test.ts b/bindings_node/test/Conversations.test.ts new file mode 100644 index 000000000..a421394ad --- /dev/null +++ b/bindings_node/test/Conversations.test.ts @@ -0,0 +1,128 @@ +import { encode } from 'punycode' +import { describe, expect, it } from 'vitest' +import { AsyncStream } from '@test/AsyncStream' +import { + createRegisteredClient, + createUser, + encodeTextMessage, +} from '@test/helpers' +import { NapiGroup, NapiMessage } from '../dist' + +describe('Conversations', () => { + it('should not have initial conversations', async () => { + const user = createUser() + const client = await createRegisteredClient(user) + const conversations = client.conversations().list() + expect((await conversations).length).toBe(0) + }) + + it('should create a new group', async () => { + const user1 = createUser() + const user2 = createUser() + const client1 = await createRegisteredClient(user1) + const client2 = await createRegisteredClient(user2) + const group = await client1 + .conversations() + .createGroup([user2.account.address]) + expect(group).toBeDefined() + expect(group.id()).toBeDefined() + expect(group.createdAtNs()).toBeTypeOf('number') + expect(group.isActive()).toBe(true) + expect(group.groupName()).toBe('') + expect(group.addedByInboxId()).toBe(client1.inboxId()) + expect(group.findMessages().length).toBe(1) + const members = group.listMembers() + expect(members.length).toBe(2) + const memberInboxIds = members.map((member) => member.inboxId) + expect(memberInboxIds).toContain(client1.inboxId()) + expect(memberInboxIds).toContain(client2.inboxId()) + expect(group.groupMetadata().conversationType()).toBe('group') + expect(group.groupMetadata().creatorInboxId()).toBe(client1.inboxId()) + + const group1 = await client1.conversations().list() + expect(group1.length).toBe(1) + expect(group1[0].id).toBe(group.id) + + expect((await client2.conversations().list()).length).toBe(0) + + await client2.conversations().sync() + + const group2 = await client2.conversations().list() + expect(group2.length).toBe(1) + expect(group2[0].id).toBe(group.id) + }) + + it('should stream new groups', async () => { + const user1 = createUser() + const user2 = createUser() + const user3 = createUser() + const client1 = await createRegisteredClient(user1) + const client2 = await createRegisteredClient(user2) + const client3 = await createRegisteredClient(user3) + const asyncStream = new AsyncStream(undefined) + const stream = client3.conversations().stream(asyncStream.callback) + const group1 = await client1 + .conversations() + .createGroup([user3.account.address]) + const group2 = await client2 + .conversations() + .createGroup([user3.account.address]) + let count = 0 + for await (const convo of asyncStream) { + count++ + expect(convo).toBeDefined() + if (count === 1) { + expect(convo!.id).toBe(group1.id) + } + if (count === 2) { + expect(convo!.id).toBe(group2.id) + break + } + } + asyncStream.stop() + stream.end() + }) + + it('should stream all messages', async () => { + const user1 = createUser() + const user2 = createUser() + const user3 = createUser() + const client1 = await createRegisteredClient(user1) + const client2 = await createRegisteredClient(user2) + const client3 = await createRegisteredClient(user3) + await client1.conversations().createGroup([user2.account.address]) + await client1.conversations().createGroup([user3.account.address]) + + const asyncStream = new AsyncStream(undefined) + const stream = client1 + .conversations() + .streamAllMessages(asyncStream.callback) + + const groups2 = client2.conversations() + await groups2.sync() + const groupsList2 = await groups2.list() + + const groups3 = client3.conversations() + await groups3.sync() + const groupsList3 = await groups3.list() + + await groupsList2[0].send(encodeTextMessage('gm!')) + await groupsList3[0].send(encodeTextMessage('gm2!')) + + let count = 0 + + for await (const message of asyncStream) { + count++ + expect(message).toBeDefined() + if (count === 1) { + expect(message!.senderInboxId).toBe(client2.inboxId()) + } + if (count === 2) { + expect(message!.senderInboxId).toBe(client3.inboxId()) + break + } + } + asyncStream.stop() + stream.end() + }) +}) diff --git a/bindings_node/test/helpers.ts b/bindings_node/test/helpers.ts index 3b00b2c19..e9336a844 100644 --- a/bindings_node/test/helpers.ts +++ b/bindings_node/test/helpers.ts @@ -50,3 +50,18 @@ export const createRegisteredClient = async (user: User) => { } return client } + +export const encodeTextMessage = (text: string) => { + return { + type: { + authorityId: 'xmtp.org', + typeId: 'text', + versionMajor: 1, + versionMinor: 0, + }, + parameters: { + encoding: 'UTF-8', + }, + content: new TextEncoder().encode(text), + } +} From 393b4e8859330fbfa0d6ed340f6260b98676c47d Mon Sep 17 00:00:00 2001 From: Ry Racherbaumer Date: Wed, 12 Jun 2024 11:42:57 -0500 Subject: [PATCH 3/3] Add release --- bindings_node/CHANGELOG.md | 4 ++++ bindings_node/package.json | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/bindings_node/CHANGELOG.md b/bindings_node/CHANGELOG.md index 3dd373e7d..07a32c8cf 100644 --- a/bindings_node/CHANGELOG.md +++ b/bindings_node/CHANGELOG.md @@ -1,5 +1,9 @@ # @xmtp/mls-client-bindings-node +## 0.0.4 + +- Added `stream_all_messages` + ## 0.0.3 - Fixed default export value diff --git a/bindings_node/package.json b/bindings_node/package.json index c5b0af282..5e8a9e53a 100644 --- a/bindings_node/package.json +++ b/bindings_node/package.json @@ -1,6 +1,6 @@ { "name": "@xmtp/mls-client-bindings-node", - "version": "0.0.3", + "version": "0.0.4", "repository": { "type": "git", "url": "git+https://git@github.com/xmtp/libxmtp.git",