Skip to content

Commit

Permalink
Add event_processing queue, tables to track processed events, and fin…
Browse files Browse the repository at this point in the history
…alize sqlite transition. Still having issues with receiving messages.
  • Loading branch information
erskingardner committed Jan 24, 2025
1 parent 4c0061a commit c8c5733
Show file tree
Hide file tree
Showing 27 changed files with 1,193 additions and 651 deletions.
44 changes: 34 additions & 10 deletions src-tauri/db_migrations/0001_initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ CREATE TABLE invites (
member_count INTEGER NOT NULL,
state TEXT NOT NULL,
outer_event_id TEXT, -- the event_id of the 1059 event that contained the invite
processed BOOLEAN NOT NULL DEFAULT FALSE,

FOREIGN KEY (account_pubkey) REFERENCES accounts(pubkey) ON DELETE CASCADE
);

Expand All @@ -94,6 +94,19 @@ CREATE INDEX idx_invites_account ON invites(account_pubkey);
-- Lookup paths for invites: by the outer 1059 gift-wrap event id and by the
-- inner invite event id.
CREATE INDEX idx_invites_outer_event_id ON invites(outer_event_id);
CREATE INDEX idx_invites_event_id ON invites(event_id);

-- Records the processing outcome of each received invite gift-wrap event so the
-- client can skip events it has already handled (dedup for the event_processing
-- queue).
CREATE TABLE processed_invites (
    event_id TEXT PRIMARY KEY, -- This is the outer event id of the 1059 gift wrap event
    invite_event_id TEXT, -- This is the event id of the 444 invite event
    account_pubkey TEXT NOT NULL, -- This is the pubkey of the account that processed the invite
    processed_at INTEGER NOT NULL, -- This is the timestamp of when the invite was processed
    state TEXT NOT NULL, -- This is the state of the invite processing
    failure_reason TEXT, -- This is the reason the invite failed to process

    FOREIGN KEY (account_pubkey) REFERENCES accounts(pubkey) ON DELETE CASCADE
);

-- Reverse lookup from an inner 444 invite event to its processing record.
CREATE INDEX idx_processed_invites_invite_event_id ON processed_invites(invite_event_id);

-- Messages table with full-text search
CREATE TABLE messages (
event_id TEXT PRIMARY KEY,
Expand All @@ -105,35 +118,46 @@ CREATE TABLE messages (
tags TEXT, -- JSON array of nostr tags
event TEXT NOT NULL, -- JSON string for UnsignedEvent
outer_event_id TEXT NOT NULL, -- the event_id of the 445 event
processed BOOLEAN DEFAULT FALSE,
FOREIGN KEY (mls_group_id) REFERENCES groups(mls_group_id) ON DELETE CASCADE,
FOREIGN KEY (mls_group_id, account_pubkey) REFERENCES groups(mls_group_id, account_pubkey) ON DELETE CASCADE,
FOREIGN KEY (account_pubkey) REFERENCES accounts(pubkey) ON DELETE CASCADE
);

-- Common query paths: messages for a group / account / author, ordered by time.
CREATE INDEX idx_messages_group_time ON messages(mls_group_id, created_at);
CREATE INDEX idx_messages_account_time ON messages(account_pubkey, created_at);
CREATE INDEX idx_messages_author_time ON messages(author_pubkey, created_at);
-- NOTE(review): this index references a `processed` column, but the visible
-- messages table definition in this migration no longer declares one — confirm
-- the column still exists or drop this index.
CREATE INDEX idx_messages_processing ON messages(processed, mls_group_id);
-- Lookups by the outer 445 event id and by the inner event id.
CREATE INDEX idx_messages_outer_event_id ON messages(outer_event_id);
CREATE INDEX idx_messages_event_id ON messages(event_id);

-- Records the processing outcome of each received 445 group-message event so the
-- client can skip events it has already handled (dedup for the event_processing
-- queue). Mirrors the processed_invites table.
CREATE TABLE processed_messages (
    event_id TEXT PRIMARY KEY, -- This is the outer event id of the 445 event
    message_event_id TEXT, -- This is the inner UnsignedEvent's id. This is the id of the events stored in the messages table.
    account_pubkey TEXT NOT NULL, -- This is the pubkey of the account that processed the message
    processed_at INTEGER NOT NULL, -- This is the timestamp of when the message was processed
    state TEXT NOT NULL, -- This is the state of the message processing
    failure_reason TEXT, -- This is the reason the message failed to process

    FOREIGN KEY (account_pubkey) REFERENCES accounts(pubkey) ON DELETE CASCADE
);

-- Reverse lookup from an inner message event id to its processing record.
CREATE INDEX idx_processed_messages_message_event_id ON processed_messages(message_event_id);

-- Full-text search for messages.
-- messages_fts is an FTS5 "external content" table: the indexed text lives in
-- the messages table and FTS stores only the index. External-content rows are
-- correlated with content rows by rowid (messages keeps its implicit rowid; a
-- TEXT primary key does not replace it), so every trigger below must carry
-- new.rowid / old.rowid. Without it, inserts are keyed by an unrelated
-- auto-assigned rowid and the special 'delete' command cannot locate the entry
-- to remove, silently corrupting the index.
CREATE VIRTUAL TABLE messages_fts USING fts5(
    content,
    event_id UNINDEXED, -- carried alongside so search hits can be joined back by event_id
    content='messages'
);

-- FTS triggers keep the index in sync with the messages table.
CREATE TRIGGER messages_ai AFTER INSERT ON messages BEGIN
    INSERT INTO messages_fts(rowid, content, event_id) VALUES (new.rowid, new.content, new.event_id);
END;

CREATE TRIGGER messages_ad AFTER DELETE ON messages BEGIN
    INSERT INTO messages_fts(messages_fts, rowid, content, event_id) VALUES ('delete', old.rowid, old.content, old.event_id);
END;

CREATE TRIGGER messages_au AFTER UPDATE ON messages BEGIN
    INSERT INTO messages_fts(messages_fts, rowid, content, event_id) VALUES ('delete', old.rowid, old.content, old.event_id);
    INSERT INTO messages_fts(rowid, content, event_id) VALUES (new.rowid, new.content, new.event_id);
END;
9 changes: 3 additions & 6 deletions src-tauri/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ pub enum AccountError {
#[error("Tauri error: {0}")]
TauriError(#[from] tauri::Error),

#[error("Failed to acquire lock")]
LockError,

#[error("No active account found")]
NoActiveAccount,

Expand Down Expand Up @@ -378,7 +375,7 @@ impl Account {

// Then update Nostr MLS instance
{
let mut nostr_mls = wn.nostr_mls.lock().map_err(|_| AccountError::LockError)?;
let mut nostr_mls = wn.nostr_mls.lock().await;
*nostr_mls = NostrMls::new(wn.data_dir.clone(), Some(self.pubkey.to_hex()));
}

Expand Down Expand Up @@ -428,6 +425,7 @@ impl Account {
}

/// Returns the invites the account has received
#[allow(dead_code)]
pub async fn invites(&self, wn: tauri::State<'_, Whitenoise>) -> Result<Vec<Invite>> {
let mut txn = wn.database.pool.begin().await?;

Expand All @@ -454,7 +452,6 @@ impl Account {
member_count: row.member_count,
state: row.state.into(),
outer_event_id: row.outer_event_id,
processed: row.processed,
})
})
.collect::<Result<Vec<_>>>()
Expand Down Expand Up @@ -631,7 +628,7 @@ impl Account {

// Then update Nostr MLS instance
{
let mut nostr_mls = wn.nostr_mls.lock().map_err(|_| AccountError::LockError)?;
let mut nostr_mls = wn.nostr_mls.lock().await;
*nostr_mls = NostrMls::new(wn.data_dir.clone(), Some(hex_pubkey));
}

Expand Down
201 changes: 4 additions & 197 deletions src-tauri/src/commands/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use crate::groups::{Group, GroupType};
use crate::key_packages::fetch_key_packages_for_members;
use crate::secrets_store;
use crate::whitenoise::Whitenoise;
use nostr_openmls::groups::GroupError;
use nostr_sdk::prelude::*;
use std::collections::HashMap;
use std::ops::Add;
use tauri::Emitter;

Expand Down Expand Up @@ -181,11 +179,11 @@ pub async fn create_group(
);

// TODO: Add ability to specify relays for the group
let group_relays = wn.nostr.relays().map_err(|e| e.to_string())?;
let group_relays = wn.nostr.relays().await.unwrap();

let create_group_result;
{
let nostr_mls = wn.nostr_mls.lock().expect("Failed to lock nostr_mls");
let nostr_mls = wn.nostr_mls.lock().await;

create_group_result = nostr_mls
.create_group(
Expand Down Expand Up @@ -421,7 +419,7 @@ pub async fn send_mls_message(
let export_secret_hex;
let epoch;
{
let nostr_mls = wn.nostr_mls.lock().unwrap();
let nostr_mls = wn.nostr_mls.lock().await;
serialized_message = nostr_mls
.create_message_for_group(group.mls_group_id.clone(), json_event_string)
.map_err(|e| e.to_string())?;
Expand Down Expand Up @@ -490,197 +488,6 @@ pub async fn send_mls_message(
Ok(inner_event)
}

// TODO: Make this use last synced so we don't fetch things we don't need repeatedly.
// TODO: Maybe split this into a method to handle groups individually?
/// Fetches MLS group message (445) events for every known group, decrypts and
/// processes the ones not yet marked processed in the local DB, stores the
/// resulting application messages, and emits a `mls_message_processed` event to
/// the frontend for each one.
///
/// Errors are returned as `String` (Tauri command convention); per-event
/// processing failures are logged and skipped rather than aborting the batch.
#[tauri::command]
pub async fn fetch_mls_messages(
    wn: tauri::State<'_, Whitenoise>,
    app_handle: tauri::AppHandle,
) -> Result<(), String> {
    // All nostr group ids we belong to; used to filter the relay query.
    let group_ids: Vec<String> = Group::get_all_groups(wn.clone())
        .await
        .map_err(|e| e.to_string())?
        .iter()
        .map(|group| group.nostr_group_id.clone())
        .collect();

    let message_events = wn
        .nostr
        .query_mls_group_messages(group_ids)
        .await
        .map_err(|e| e.to_string())?;

    // Filter the events to only include ones we haven't processed yet
    let processed_message_ids = sqlx::query_scalar::<_, String>(
        "SELECT outer_event_id FROM messages WHERE processed = true",
    )
    .fetch_all(&wn.database.pool)
    .await
    .map_err(|e| e.to_string())?;

    let unprocessed_messages = message_events
        .into_iter()
        .filter(|event| !processed_message_ids.contains(&event.id.to_string()));

    // Bucket the remaining events by their "h" tag (the nostr group id);
    // events without an "h" tag are silently dropped.
    let grouped_messages = unprocessed_messages
        .into_iter()
        .filter_map(|event| {
            event
                .tags
                .iter()
                .find(|tag| tag.kind() == TagKind::h())
                .and_then(|tag| {
                    tag.content()
                        .map(|group_id| (group_id.to_string(), event.clone()))
                })
        })
        .fold(
            HashMap::new(),
            |mut acc: HashMap<String, Vec<Event>>, (group_id, event)| {
                acc.entry(group_id).or_default().push(event);
                acc
            },
        );

    for (group_id, events) in grouped_messages {
        let group = Group::get_by_nostr_group_id(group_id.as_str(), wn.clone())
            .await
            .map_err(|e| e.to_string())?;

        // Sort the events by created_at (and then lexicographically by ID)
        // so they are processed in a deterministic order.
        let mut sorted_events = events.into_iter().collect::<Vec<_>>();
        sorted_events.sort_by(|a, b| a.created_at.cmp(&b.created_at).then(a.id.cmp(&b.id)));

        for event in sorted_events {
            tracing::debug!(
                target: "whitenoise::commands::groups::fetch_mls_messages",
                "Processing event: {:?}",
                event.id
            );

            // Keys for NIP-44 decrypting the 445 payload come from the group's
            // MLS export secret; try the local secrets store first, otherwise
            // derive via nostr_openmls and cache the result.
            let nostr_keys = match secrets_store::get_export_secret_keys_for_group(
                group.mls_group_id.clone(),
                group.epoch,
                wn.data_dir.as_path(),
            ) {
                Ok(keys) => keys,
                Err(_) => {
                    tracing::debug!(
                        target: "whitenoise::commands::groups::fetch_mls_messages",
                        "No export secret keys found, fetching from nostr_openmls",
                    );
                    // We need to get the export secret for the group from nostr_openmls
                    // NOTE(review): this is a std::sync::Mutex lock().unwrap() inside an
                    // async fn — no .await occurs while it is held, but confirm this stays
                    // true if the block grows (or switch to an async mutex).
                    let nostr_mls = wn.nostr_mls.lock().unwrap();
                    let (export_secret_hex, epoch) = nostr_mls
                        .export_secret_as_hex_secret_key_and_epoch(group.mls_group_id.clone())
                        .map_err(|e| e.to_string())?;

                    // Store the export secret key in the secrets store
                    secrets_store::store_mls_export_secret(
                        group.mls_group_id.clone(),
                        epoch,
                        export_secret_hex.clone(),
                        wn.data_dir.as_path(),
                    )
                    .map_err(|e| e.to_string())?;

                    Keys::parse(&export_secret_hex).map_err(|e| e.to_string())?
                }
            };

            // Decrypt events using export secret key
            let decrypted_content = nip44::decrypt_to_bytes(
                nostr_keys.secret_key(),
                &nostr_keys.public_key(),
                &event.content,
            )
            .map_err(|e| format!("Error decrypting message: {}", e))?;

            let message_vec;
            {
                let nostr_mls = wn.nostr_mls.lock().unwrap();

                match nostr_mls.process_message_for_group(
                    group.mls_group_id.clone(),
                    decrypted_content.clone(),
                ) {
                    Ok(message) => message_vec = message,
                    Err(e) => {
                        match e {
                            GroupError::ProcessMessageError(e) => {
                                // "Cannot decrypt own messages" is expected for
                                // our own outgoing events, so it is not logged.
                                if !e.to_string().contains("Cannot decrypt own messages") {
                                    tracing::error!(
                                        target: "whitenoise::commands::groups::fetch_mls_messages",
                                        "Error processing message for group: {}",
                                        e
                                    );
                                }
                            }
                            _ => {
                                tracing::error!(
                                    target: "whitenoise::commands::groups::fetch_mls_messages",
                                    "UNRECOGNIZED ERROR processing message for group: {}",
                                    e
                                );
                            }
                        }
                        // Skip this event; failures here do not abort the batch.
                        continue;
                    }
                }
            }

            // This processes an application message into JSON.
            match serde_json::from_slice::<serde_json::Value>(&message_vec) {
                Ok(json_value) => {
                    tracing::debug!(
                        target: "whitenoise::commands::groups::fetch_mls_messages",
                        "Deserialized JSON message: {}",
                        json_value
                    );
                    let json_str = json_value.to_string();
                    let json_event = UnsignedEvent::from_json(&json_str).unwrap();

                    // Drop messages whose inner author is not a group member.
                    if !group
                        .members(wn.clone())
                        .unwrap()
                        .contains(&json_event.pubkey)
                    {
                        tracing::error!(
                            target: "whitenoise::commands::groups::fetch_mls_messages",
                            "Message from non-member: {:?}",
                            json_event.pubkey
                        );
                        continue;
                    }

                    group
                        .add_message(event.id.to_string(), json_event.clone(), wn.clone())
                        .await
                        .map_err(|e| e.to_string())?;

                    // Notify the frontend that a new message is available.
                    app_handle
                        .emit("mls_message_processed", (group.clone(), json_event.clone()))
                        .expect("Couldn't emit event");
                }
                Err(e) => {
                    tracing::error!(
                        target: "whitenoise::commands::groups::fetch_mls_messages",
                        "Failed to deserialize message into JSON: {}",
                        e
                    );
                }
            }
            // TODO: Handle Proposal
            // TODO: Handle Commit
            // TODO: Handle External Join
        }

        // emit events to let the front end know
    }

    Ok(())
}

/// Gets the list of members in an MLS group
///
/// # Arguments
Expand All @@ -706,7 +513,7 @@ pub async fn get_group_members(
let group = Group::find_by_mls_group_id(&mls_group_id, wn.clone())
.await
.map_err(|e| format!("Error fetching group: {}", e))?;
let members = group.members(wn.clone()).map_err(|e| e.to_string())?;
let members = group.members(wn.clone()).await.map_err(|e| e.to_string())?;
Ok(members)
}

Expand Down
Loading

0 comments on commit c8c5733

Please sign in to comment.