Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream_all_messages to node bindings #838

Merged
merged 3 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bindings_node/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# @xmtp/mls-client-bindings-node

## 0.0.4

- Added `stream_all_messages`

## 0.0.3

- Fixed default export value
Expand Down
2 changes: 1 addition & 1 deletion bindings_node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@xmtp/mls-client-bindings-node",
"version": "0.0.3",
"version": "0.0.4",
"repository": {
"type": "git",
"url": "git+https://[email protected]/xmtp/libxmtp.git",
Expand Down
38 changes: 18 additions & 20 deletions bindings_node/src/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun
use napi::JsFunction;
use napi_derive::napi;

// use crate::messages::NapiMessage;
use crate::messages::NapiMessage;
use crate::{
groups::{GroupPermissions, NapiGroup},
mls_client::RustXmtpClient,
Expand Down Expand Up @@ -147,23 +147,21 @@ impl NapiConversations {
))
}

// TODO: this fn needs to be sync for it to work with NAPI
// #[napi(ts_args_type = "callback: (err: null | Error, result: NapiGroup) => void")]
// pub async fn stream_all_messages(&self, callback: JsFunction) -> Result<NapiStreamCloser> {
// let tsfn: ThreadsafeFunction<NapiMessage, ErrorStrategy::CalleeHandled> =
// callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
// let stream_closer = RustXmtpClient::stream_all_messages_with_callback(
// self.inner_client.clone(),
// move |message| {
// tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking);
// },
// )
// .await
// .map_err(|e| Error::from_reason(format!("{}", e)))?;

// Ok(NapiStreamCloser::new(
// stream_closer.close_fn,
// stream_closer.is_closed_atomic,
// ))
// }
#[napi(ts_args_type = "callback: (err: null | Error, result: NapiMessage) => void")]
pub fn stream_all_messages(&self, callback: JsFunction) -> Result<NapiStreamCloser> {
let tsfn: ThreadsafeFunction<NapiMessage, ErrorStrategy::CalleeHandled> =
callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
let stream_closer = RustXmtpClient::stream_all_messages_with_callback_sync(
self.inner_client.clone(),
move |message| {
tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking);
},
)
.map_err(|e| Error::from_reason(format!("{}", e)))?;

Ok(NapiStreamCloser::new(
stream_closer.close_fn,
stream_closer.is_closed_atomic,
))
}
}
77 changes: 77 additions & 0 deletions bindings_node/test/AsyncStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
type Value<T, V> = V extends undefined ? T : V

type ResolveValue<T, V> = {
value: Value<T, V> | undefined
done: boolean
}

type ResolveNext<T, V> = (resolveValue: ResolveValue<T, V>) => void

type TransformValue<T, V> = (value: T) => Value<T, V>

export class AsyncStream<T, V = undefined> {
#done = false
#resolveNext: ResolveNext<T, V> | null
#queue: Value<T, V>[]
#transformValue?: TransformValue<T, V>

stopCallback: (() => void) | undefined = undefined

constructor(
transformValue: V extends undefined ? undefined : TransformValue<T, V>
) {
this.#queue = []
this.#resolveNext = null
this.#done = false
this.#transformValue = transformValue
}

callback = (err: Error | null, value: T) => {
if (err) {
console.error('stream error', err)
this.stop()
return
}

if (this.#done) {
return
}

const newValue = this.#transformValue
? this.#transformValue(value)
: // must assert type because TypeScript can't infer that T is assignable
// to Value<T, V> when this.#transformValue is undefined
(value as unknown as Value<T, V>)

if (this.#resolveNext) {
this.#resolveNext({ value: newValue, done: false })
this.#resolveNext = null
} else {
this.#queue.push(newValue)
}
}

stop = () => {
this.#done = true
if (this.#resolveNext) {
this.#resolveNext({ value: undefined, done: true })
}
this.stopCallback?.()
}

next = (): Promise<ResolveValue<T, V>> => {
if (this.#queue.length > 0) {
return Promise.resolve({ value: this.#queue.shift(), done: false })
} else if (this.#done) {
return Promise.resolve({ value: undefined, done: true })
} else {
return new Promise((resolve) => {
this.#resolveNext = resolve
})
}
};

[Symbol.asyncIterator]() {
return this
}
}
128 changes: 128 additions & 0 deletions bindings_node/test/Conversations.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { encode } from 'punycode'
import { describe, expect, it } from 'vitest'
import { AsyncStream } from '@test/AsyncStream'
import {
createRegisteredClient,
createUser,
encodeTextMessage,
} from '@test/helpers'
import { NapiGroup, NapiMessage } from '../dist'

describe('Conversations', () => {
it('should not have initial conversations', async () => {
const user = createUser()
const client = await createRegisteredClient(user)
const conversations = client.conversations().list()
expect((await conversations).length).toBe(0)
})

it('should create a new group', async () => {
const user1 = createUser()
const user2 = createUser()
const client1 = await createRegisteredClient(user1)
const client2 = await createRegisteredClient(user2)
const group = await client1
.conversations()
.createGroup([user2.account.address])
expect(group).toBeDefined()
expect(group.id()).toBeDefined()
expect(group.createdAtNs()).toBeTypeOf('number')
expect(group.isActive()).toBe(true)
expect(group.groupName()).toBe('')
expect(group.addedByInboxId()).toBe(client1.inboxId())
expect(group.findMessages().length).toBe(1)
const members = group.listMembers()
expect(members.length).toBe(2)
const memberInboxIds = members.map((member) => member.inboxId)
expect(memberInboxIds).toContain(client1.inboxId())
expect(memberInboxIds).toContain(client2.inboxId())
expect(group.groupMetadata().conversationType()).toBe('group')
expect(group.groupMetadata().creatorInboxId()).toBe(client1.inboxId())

const group1 = await client1.conversations().list()
expect(group1.length).toBe(1)
expect(group1[0].id).toBe(group.id)

expect((await client2.conversations().list()).length).toBe(0)

await client2.conversations().sync()

const group2 = await client2.conversations().list()
expect(group2.length).toBe(1)
expect(group2[0].id).toBe(group.id)
})

it('should stream new groups', async () => {
const user1 = createUser()
const user2 = createUser()
const user3 = createUser()
const client1 = await createRegisteredClient(user1)
const client2 = await createRegisteredClient(user2)
const client3 = await createRegisteredClient(user3)
const asyncStream = new AsyncStream<NapiGroup>(undefined)
const stream = client3.conversations().stream(asyncStream.callback)
const group1 = await client1
.conversations()
.createGroup([user3.account.address])
const group2 = await client2
.conversations()
.createGroup([user3.account.address])
let count = 0
for await (const convo of asyncStream) {
count++
expect(convo).toBeDefined()
if (count === 1) {
expect(convo!.id).toBe(group1.id)
}
if (count === 2) {
expect(convo!.id).toBe(group2.id)
break
}
}
asyncStream.stop()
stream.end()
})

it('should stream all messages', async () => {
const user1 = createUser()
const user2 = createUser()
const user3 = createUser()
const client1 = await createRegisteredClient(user1)
const client2 = await createRegisteredClient(user2)
const client3 = await createRegisteredClient(user3)
await client1.conversations().createGroup([user2.account.address])
await client1.conversations().createGroup([user3.account.address])

const asyncStream = new AsyncStream<NapiMessage>(undefined)
const stream = client1
.conversations()
.streamAllMessages(asyncStream.callback)

const groups2 = client2.conversations()
await groups2.sync()
const groupsList2 = await groups2.list()

const groups3 = client3.conversations()
await groups3.sync()
const groupsList3 = await groups3.list()

await groupsList2[0].send(encodeTextMessage('gm!'))
await groupsList3[0].send(encodeTextMessage('gm2!'))

let count = 0

for await (const message of asyncStream) {
count++
expect(message).toBeDefined()
if (count === 1) {
expect(message!.senderInboxId).toBe(client2.inboxId())
}
if (count === 2) {
expect(message!.senderInboxId).toBe(client3.inboxId())
break
}
}
asyncStream.stop()
stream.end()
})
})
15 changes: 15 additions & 0 deletions bindings_node/test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,18 @@ export const createRegisteredClient = async (user: User) => {
}
return client
}

export const encodeTextMessage = (text: string) => {
return {
type: {
authorityId: 'xmtp.org',
typeId: 'text',
versionMajor: 1,
versionMinor: 0,
},
parameters: {
encoding: 'UTF-8',
},
content: new TextEncoder().encode(text),
}
}
Loading