Skip to content

Commit

Permalink
feat(ui): Implement room subscriptions in RoomList + notification c…
Browse files Browse the repository at this point in the history
…ounts

feat(ui): Implement room subscriptions in `RoomList` + notification counts
  • Loading branch information
Hywan authored Jun 15, 2023
2 parents 6b45749 + c07bb0e commit 7756b6d
Show file tree
Hide file tree
Showing 5 changed files with 347 additions and 68 deletions.
39 changes: 25 additions & 14 deletions bindings/matrix-sdk-ffi/src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use tracing::{error, info};
use super::RUNTIME;
use crate::{
error::{ClientError, RoomError},
AudioInfo, FileInfo, ImageInfo, RoomMember, ThumbnailInfo, TimelineDiff, TimelineItem,
TimelineListener, VideoInfo,
AudioInfo, FileInfo, ImageInfo, RoomMember, TaskHandle, ThumbnailInfo, TimelineDiff,
TimelineItem, TimelineListener, VideoInfo,
};

#[derive(uniffi::Enum)]
Expand Down Expand Up @@ -223,7 +223,7 @@ impl Room {
pub fn add_timeline_listener(
&self,
listener: Box<dyn TimelineListener>,
) -> Vec<Arc<TimelineItem>> {
) -> RoomTimelineListenerResult {
let timeline = self
.timeline
.write()
Expand All @@ -240,19 +240,24 @@ impl Room {
let (timeline_items, timeline_stream) = timeline.subscribe().await;

let listener: Arc<dyn TimelineListener> = listener.into();
RUNTIME.spawn(timeline_stream.for_each(move |diff| {
let listener = listener.clone();
let fut = RUNTIME
.spawn_blocking(move || listener.on_update(Arc::new(TimelineDiff::new(diff))));

async move {
if let Err(e) = fut.await {
error!("Timeline listener error: {e}");
let timeline_stream =
TaskHandle::new(RUNTIME.spawn(timeline_stream.for_each(move |diff| {
let listener = listener.clone();
let fut = RUNTIME.spawn_blocking(move || {
listener.on_update(Arc::new(TimelineDiff::new(diff)))
});

async move {
if let Err(e) = fut.await {
error!("Timeline listener error: {e}");
}
}
}
}));
})));

timeline_items.into_iter().map(TimelineItem::from_arc).collect()
RoomTimelineListenerResult {
items: timeline_items.into_iter().map(TimelineItem::from_arc).collect(),
items_stream: Arc::new(timeline_stream),
}
})
}

Expand Down Expand Up @@ -841,6 +846,12 @@ impl Room {
}
}

#[derive(uniffi::Record)]
pub struct RoomTimelineListenerResult {
pub items: Vec<Arc<TimelineItem>>,
pub items_stream: Arc<TaskHandle>,
}

pub enum PaginationOptions {
SingleRequest { event_limit: u16, wait_for_token: bool },
UntilNumItems { event_limit: u16, items: u16, wait_for_token: bool },
Expand Down
48 changes: 26 additions & 22 deletions bindings/matrix-sdk-ffi/src/room_list.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::{fmt::Debug, sync::Arc};
use std::{
fmt::Debug,
sync::{Arc, RwLock},
};

use eyeball_im::VectorDiff;
use futures_util::{pin_mut, StreamExt};
use ruma::RoomId;

use crate::{
Client, EventTimelineItem, Room, RoomListEntry, TaskHandle, TimelineDiff, TimelineItem,
TimelineListener, RUNTIME,
Client, EventTimelineItem, Room, RoomListEntry, RoomSubscription, TaskHandle,
UnreadNotificationsCount, RUNTIME,
};

#[uniffi::export]
Expand Down Expand Up @@ -194,39 +197,40 @@ pub struct RoomListItem {

#[uniffi::export]
impl RoomListItem {
fn id(&self) -> String {
self.inner.id().to_string()
}

fn name(&self) -> Option<String> {
RUNTIME.block_on(async { self.inner.name().await })
}

fn full_room(&self) -> Arc<Room> {
Arc::new(Room::new(self.inner.inner_room().clone()))
Arc::new(Room::with_timeline(
self.inner.inner_room().clone(),
Arc::new(RwLock::new(Some(RUNTIME.block_on(async { self.inner.timeline().await })))),
))
}

async fn timeline(&self, listener: Box<dyn TimelineListener>) -> RoomListItemTimelineResult {
let timeline = self.inner.timeline().await;
let (items, items_stream) = timeline.subscribe().await;

RoomListItemTimelineResult {
items: items.into_iter().map(TimelineItem::from_arc).collect(),
items_stream: Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
pin_mut!(items_stream);
fn subscribe(&self, settings: Option<RoomSubscription>) {
self.inner.subscribe(settings.map(Into::into));
}

while let Some(diff) = items_stream.next().await {
listener.on_update(Arc::new(TimelineDiff::new(diff)))
}
}))),
}
fn unsubscribe(&self) {
self.inner.unsubscribe();
}

fn latest_event(&self) -> Option<Arc<EventTimelineItem>> {
RUNTIME.block_on(async {
self.inner.latest_event().await.map(EventTimelineItem).map(Arc::new)
})
}
}

#[derive(uniffi::Record)]
pub struct RoomListItemTimelineResult {
pub items: Vec<Arc<TimelineItem>>,
pub items_stream: Arc<TaskHandle>,
fn has_unread_notifications(&self) -> bool {
self.inner.has_unread_notifications()
}

fn unread_notifications(&self) -> Arc<UnreadNotificationsCount> {
Arc::new(self.inner.unread_notifications().into())
}
}
7 changes: 4 additions & 3 deletions crates/matrix-sdk-ui/src/room_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
mod room;
mod state;

use std::future::ready;
use std::{future::ready, sync::Arc};

use async_stream::stream;
use eyeball::{shared::Observable, Subscriber};
Expand All @@ -86,7 +86,7 @@ use thiserror::Error;
/// The [`RoomList`] type. See the module's documentation to learn more.
#[derive(Debug)]
pub struct RoomList {
sliding_sync: SlidingSync,
sliding_sync: Arc<SlidingSync>,
state: Observable<State>,
}

Expand Down Expand Up @@ -119,6 +119,7 @@ impl RoomList {
)
.build()
.await
.map(Arc::new)
.map_err(Error::SlidingSync)?;

Ok(Self { sliding_sync, state: Observable::new(State::Init) })
Expand Down Expand Up @@ -243,7 +244,7 @@ impl RoomList {
/// Get a [`Room`] if it exists.
pub async fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
match self.sliding_sync.get_room(room_id).await {
Some(room) => Room::new(room).await,
Some(room) => Room::new(self.sliding_sync.clone(), room).await,
None => Err(Error::RoomNotFound(room_id.to_owned())),
}
}
Expand Down
86 changes: 66 additions & 20 deletions crates/matrix-sdk-ui/src/room_list/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
use std::sync::Arc;

use async_once_cell::OnceCell as AsyncOnceCell;
use matrix_sdk::SlidingSyncRoom;
use matrix_sdk::{SlidingSync, SlidingSyncRoom};
use ruma::{
api::client::sync::sync_events::{v4::RoomSubscription, UnreadNotificationsCount},
RoomId,
};

use super::Error;
use crate::{timeline::EventTimelineItem, Timeline};
Expand All @@ -18,30 +22,37 @@ pub struct Room {

#[derive(Debug)]
struct RoomInner {
/// The Sliding Sync where everything comes from.
sliding_sync: Arc<SlidingSync>,

/// The Sliding Sync room.
sliding_sync_room: SlidingSyncRoom,

/// The underlying client room.
room: matrix_sdk::room::Room,

/// The timeline of the room.
timeline: AsyncOnceCell<Timeline>,
timeline: AsyncOnceCell<Arc<Timeline>>,

/// The “sneaky” timeline of the room, i.e. this timeline doesn't track the
/// read marker nor the receipts.
sneaky_timeline: AsyncOnceCell<Timeline>,
sneaky_timeline: AsyncOnceCell<Arc<Timeline>>,
}

impl Room {
/// Create a new `Room`.
pub(super) async fn new(sliding_sync_room: SlidingSyncRoom) -> Result<Self, Error> {
pub(super) async fn new(
sliding_sync: Arc<SlidingSync>,
sliding_sync_room: SlidingSyncRoom,
) -> Result<Self, Error> {
let room = sliding_sync_room
.client()
.get_room(sliding_sync_room.room_id())
.ok_or_else(|| Error::RoomNotFound(sliding_sync_room.room_id().to_owned()))?;

Ok(Self {
inner: Arc::new(RoomInner {
sliding_sync,
sliding_sync_room,
room,
timeline: AsyncOnceCell::new(),
Expand All @@ -50,6 +61,11 @@ impl Room {
})
}

/// Get the room ID.
pub fn id(&self) -> &RoomId {
self.inner.room.room_id()
}

/// Get the best possible name for the room.
///
/// If the sliding sync room has received a name from the server, then use
Expand All @@ -66,21 +82,39 @@ impl Room {
&self.inner.room
}

/// Subscribe to this room.
///
/// It means that all events from this room will be received everytime, no
/// matter how the `RoomList` is configured.
pub fn subscribe(&self, settings: Option<RoomSubscription>) {
self.inner.sliding_sync.subscribe_to_room(self.inner.room.room_id().to_owned(), settings)
}

/// Unsubscribe to this room.
///
/// It's the opposite method of [Self::subscribe`].
pub fn unsubscribe(&self) {
self.inner.sliding_sync.unsubscribe_from_room(self.inner.room.room_id().to_owned())
}

/// Get the timeline of the room.
pub async fn timeline(&self) -> &Timeline {
pub async fn timeline(&self) -> Arc<Timeline> {
self.inner
.timeline
.get_or_init(async {
Timeline::builder(&self.inner.room)
.events(
self.inner.sliding_sync_room.prev_batch(),
self.inner.sliding_sync_room.timeline_queue(),
)
.track_read_marker_and_receipts()
.build()
.await
Arc::new(
Timeline::builder(&self.inner.room)
.events(
self.inner.sliding_sync_room.prev_batch(),
self.inner.sliding_sync_room.timeline_queue(),
)
.track_read_marker_and_receipts()
.build()
.await,
)
})
.await
.clone()
}

/// Get the latest event of the timeline.
Expand All @@ -91,16 +125,28 @@ impl Room {
self.inner
.sneaky_timeline
.get_or_init(async {
Timeline::builder(&self.inner.room)
.events(
self.inner.sliding_sync_room.prev_batch(),
self.inner.sliding_sync_room.timeline_queue(),
)
.build()
.await
Arc::new(
Timeline::builder(&self.inner.room)
.events(
self.inner.sliding_sync_room.prev_batch(),
self.inner.sliding_sync_room.timeline_queue(),
)
.build()
.await,
)
})
.await
.latest_event()
.await
}

/// Is there any unread notifications?
pub fn has_unread_notifications(&self) -> bool {
self.inner.sliding_sync_room.has_unread_notifications()
}

/// Get unread notifications.
pub fn unread_notifications(&self) -> UnreadNotificationsCount {
self.inner.sliding_sync_room.unread_notifications()
}
}
Loading

0 comments on commit 7756b6d

Please sign in to comment.