diff --git a/Cargo.toml b/Cargo.toml index 04e2e31..f7684d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,27 @@ [package] name = "bevy_irc" -version = "0.1.0" +version = "0.2.0" edition = "2021" [features] +default = [] +tls-native = ["irc/tls-native"] +tls-rust = ["irc/tls-rust"] twitch = ["thiserror"] [dependencies] async-compat = "0.2" -bevy_app = "0.12" -bevy_ecs = "0.12" -futures-lite = "1.12" -irc = { version = "0.15", default-features = false, features = ["tls-rust"] } -log = "*" +bevy_app = "0.14" +bevy_ecs = "0.14" +bevy_time = "0.14" +bevy_utils = "0.14" +futures-util = { version = "0.3", default-features = false } +irc = { version = "1.0", default-features = false } thiserror = { version = "1.0", optional = true } + +[dev-dependencies] +bevy_log = "0.14" + +[[example]] +name = "twitch" +required-features = ["twitch"] diff --git a/examples/twitch.rs b/examples/twitch.rs new file mode 100644 index 0000000..48ca950 --- /dev/null +++ b/examples/twitch.rs @@ -0,0 +1,19 @@ +use bevy_app::{App, AppExit, ScheduleRunnerPlugin}; +use bevy_irc::prelude::*; +use bevy_log::LogPlugin; + +fn main() -> AppExit { + let mut app = App::new(); + app.add_plugins(( + IRCPlugin, + LogPlugin::default(), + ScheduleRunnerPlugin::default(), + )); + + app.world_mut().spawn(( + Connection::twitch(), + Auth::new("justinfan1234").password(""), + )); + + app.run() +} diff --git a/src/components.rs b/src/components.rs new file mode 100644 index 0000000..c82dd2c --- /dev/null +++ b/src/components.rs @@ -0,0 +1,215 @@ +use std::{ + ops::{Deref, DerefMut}, + sync::Mutex, +}; + +use bevy_ecs::prelude::*; +use bevy_time::Stopwatch; +use bevy_utils::{BoxedFuture, ConditionalSend}; + +use crate::irc_prelude as irc; + +/// Bevy component containing connection info +#[derive(Component, Clone, Debug)] +pub struct Connection { + host: String, + port: u16, +} + +impl Connection { + /// Create a connection component to the given host and port + /// + /// # Example + /// ``` + /// use bevy_irc::prelude::*; + /// + /// let connection = Connection::new("irc.freenode.net", 6667); + /// ``` + pub fn new(host: impl AsRef, port: u16) -> Self { + Self { + host: host.as_ref().to_owned(), + port, + } + } + + /// Create a connection component to the twitch IRC server + /// + /// # Example + /// ``` + /// use bevy_irc::prelude::*; + /// + /// let connection = Connection::twitch(); + /// ``` + #[cfg(feature = "twitch")] + #[must_use] + pub fn twitch() -> Self { + Self { + host: "irc.chat.twitch.tv".to_owned(), + port: 6697, + } + } +} + +impl From<&Connection> for irc::Config { + #[inline] + fn from(con: &Connection) -> Self { + irc::Config { + server: Some(con.host.clone()), + port: Some(con.port), + ping_time: Some(u32::MAX), + ..Default::default() + } + } +} + +/// Bevy component containing the IRC sender +#[derive(Component, Debug)] +pub struct Sender(pub(crate) irc::Sender); + +impl Deref for Sender { + type Target = irc::Sender; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Bevy component containing the IRC authentication info +/// +/// # Example +/// ``` +/// use bevy_irc::prelude::*; +/// +/// let nick_only = Auth::new("nick"); +/// let with_password = Auth::new("nick").password("password"); +/// ``` +#[derive(Component, Debug)] +pub struct Auth { + /// Nickname send using the `NICK` command + pub nick: String, + /// Password sent using the `PASS` command + pub pass: Option, +} + +impl Auth { + /// Create a new authentication component with the given nickname + pub fn new(nickname: impl AsRef) -> Self { + Self { + nick: nickname.as_ref().to_owned(), + pass: None, + } + } + + /// Set the password for the authentication component + #[inline] + #[must_use] + pub fn password(self, password: impl AsRef) -> Self { + Self { + pass: Some(password.as_ref().to_owned()), + ..self + } + } +} + +/// Bevy component containing the channels the client should be in +#[derive(Component, Debug, Default)] +pub struct Channels(pub Vec); + +/// Bevy component containing the capabilities the client should request +/// +/// # Example +/// ``` +/// use bevy_irc::{prelude::*, irc::client::prelude::*}; +/// +/// let capabilities = Capabilities(vec![ +/// Capability::AwayNotify, +/// Capability::ServerTime, +/// ]); +/// ``` +#[derive(Component, Debug)] +pub struct Capabilities(pub Vec); + +/// Bevy component containing the IRC client stream +#[derive(Component, Debug)] +pub struct Stream(pub(crate) irc::ClientStream); + +#[derive(Component)] +pub(crate) struct Connecting(Mutex>>); + +impl Connecting { + #[inline] + pub fn new( + fut: impl std::future::Future> + + ConditionalSend + + 'static, + ) -> Self { + Self(Mutex::new(Box::pin(fut))) + } +} + +impl Deref for Connecting { + type Target = Mutex>>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Connecting { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Component, Debug)] +pub(crate) struct Registered; + +#[derive(Component, Debug)] +pub(crate) struct Identifying; + +#[derive(Event, Debug, Default)] +pub(crate) struct Pinger { + pub(crate) last_ping: Stopwatch, +} + +/// Bevy Event for incoming IRC messages and commands +#[derive(Event, Debug, Clone)] +pub struct Incoming(pub(crate) T); + +impl Deref for Incoming { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Bevy Event for outgoing IRC messages and commands +#[derive(Event, Debug, Clone)] +pub struct Outgoing(pub(crate) T); + +impl Deref for Outgoing { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Outgoing { + /// Create a new outgoing command event + #[inline] + #[must_use] + pub fn new(command: irc::Command) -> Self { + Self(command) + } +} + +impl Outgoing { + /// Create a new outgoing message event + #[inline] + #[must_use] + pub fn new(message: irc::Message) -> Self { + Self(message) + } +} diff --git a/src/lib.rs b/src/lib.rs index 1bb28b9..c7e0883 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,317 +1,44 @@ #![warn(missing_docs)] -#![allow(clippy::type_complexity)] +#![allow(clippy::type_complexity, clippy::needless_pass_by_value)] //! # TODO: Add documentation -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::sync::OnceLock; - -use async_compat::Compat; -use bevy_ecs::prelude::*; -use futures_lite::{future, StreamExt}; +use bevy_utils::tracing::warn; pub use irc; -use irc::client::prelude::*; -use irc::proto::Capability; -use irc::proto::Command; -use irc::proto::Message; -use log::{error, info, trace}; + +/// Bevy components +pub mod components; +/// Bevy systems +mod systems; +/// Utilities for using the Twitch IRC #[cfg(feature = "twitch")] pub mod twitch; -/// Bevy component containing connection info -#[derive(Component)] -pub struct Connection { - host: String, - port: u16, -} - -impl Connection { - /// Create a connection component to the given host and port - /// - /// # Example - /// ``` - /// use bevy_irc::Connection; - /// - /// let connection = Connection::new("irc.freenode.net", 6667); - /// ``` - pub fn new(host: impl AsRef, port: u16) -> Self { - Self { - host: host.as_ref().to_owned(), - port, - } - } - - /// Create a connection component to the twitch IRC server - /// - /// # Example - /// ``` - /// use bevy_irc::Connection; - /// - /// let connection = Connection::twitch(); - /// ``` - #[cfg(feature = "twitch")] - pub fn twitch() -> Self { - Self { - host: "irc.chat.twitch.tv".to_owned(), - port: 6697, - } - } +mod irc_prelude { + pub use irc::client::prelude::*; + pub use irc::client::ClientStream; + pub use irc::error::Error; + pub use irc::proto::CapSubCommand; } -/// Bevy component containing the connected IRC client -#[derive(Component, Debug)] -pub struct Client(irc::client::Client); - -impl Deref for Client { - type Target = irc::client::Client; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for Client { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Notype for IRC Messages to derive Event -#[derive(Event, Debug)] -pub struct MessageEvent(pub Message); - -impl Deref for MessageEvent { - type Target = Message; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for MessageEvent { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl From for MessageEvent { - fn from(msg: Message) -> Self { - Self(msg) - } -} - -/// Bevy component containing the IRC authentication info -/// -/// # Example -/// ``` -/// use bevy_irc::Authentication; -/// -/// let nick_only = Authentication::new("nick"); -/// let with_password = Authentication::new("nick").password("password"); -/// ``` -#[derive(Component)] -pub struct Authentication { - nick: String, - pass: Option, -} - -impl Authentication { - /// Create a new authentication component with the given nickname - pub fn new(nickname: impl AsRef) -> Self { - Self { - nick: nickname.as_ref().to_owned(), - pass: None, - } - } - - /// Set the password for the authentication component - pub fn password(&mut self, password: impl AsRef) -> &mut Self { - self.pass = Some(password.as_ref().to_owned()); - self - } -} - -/// Bevy component containing the channels the client should be in -#[derive(Component)] -pub struct Channels(pub Vec); - -/// Bevy component containing the capabilities the client should request -/// -/// # Example -/// ``` -/// use bevy_irc::{Capabilities, irc::proto::Capability}; -/// -/// let capabilities = Capabilities(vec![ -/// Capability::AwayNotify, -/// Capability::ServerTime, -/// ]); -/// ``` -#[derive(Component)] -pub struct Capabilities(pub Vec); - -#[derive(Component)] -struct Stream(irc::client::ClientStream); - -type ConnectingFut = Compat< - Pin< - Box< - dyn std::future::Future> - + Sync - + Send, - >, - >, ->; - -#[derive(Component)] -struct Connecting(OnceLock); - -#[derive(Component)] -struct Identified; - -fn connect(mut commands: Commands, chats: Query<(Entity, &Connection), Added>) { - for (chat, con) in &chats { - let config = Config { - server: Some(con.host.clone()), - port: Some(con.port), - ping_time: Some(u32::MAX), - ..Default::default() - }; - let fut = irc::client::Client::from_config(config); - let boxed_fut: Pin + Send + Sync>> = Box::pin(fut); - let fut = Compat::new(boxed_fut); - let cell = OnceLock::new(); - let _ = cell.set(fut); - commands.entity(chat).insert(Connecting(cell)); - } -} - -fn finish_connect(mut commands: Commands, mut chats: Query<(Entity, &mut Connecting)>) { - for (chat, mut connecting) in &mut chats { - let fut = connecting.0.get_mut().unwrap(); - - let Some(res) = future::block_on(future::poll_once(fut)) else { - continue; - }; - commands.entity(chat).remove::(); - - let mut client = match res { - Err(e) => { - error!("Failed to connect: {e:?}"); - continue; - } - Ok(c) => c, - }; - info!("Connected"); - - if let Ok(stream) = client.stream() { - commands.entity(chat).insert(Stream(stream)); - } else { - error!("Failed to get stream"); - } - - commands.entity(chat).insert(Client(client)); - } -} - -fn identify( - mut commands: Commands, - chats: Query<(Entity, &Client, &Authentication), Without>, -) { - for (chat, client, auth) in &chats { - info!("Identifying as {}", auth.nick); - if let Some(pass) = auth.pass.as_ref() { - if let Err(e) = client.send(Command::PASS(pass.clone())) { - error!("Failed to send PASS: {}", e); - continue; - } - } - if let Err(e) = client.send(Command::NICK(auth.nick.clone())) { - error!("Failed to send NICK: {}", e); - continue; - } - commands.entity(chat).insert(Identified); - } -} - -fn join_and_part( - mut chats: Query< - (&Client, &Channels), - (With, Or<(Added, Changed)>), - >, -) { - for (client, channels) in &mut chats { - info!("Joining and parting channels"); - let current = client.list_channels().unwrap_or_default(); - - let to_join = channels.0.iter().filter(|c| !current.contains(c)); - let to_part = current.iter().filter(|c| !channels.0.contains(c)); - - for channel in to_join { - info!("Joining {}", channel); - client - .send(Command::JOIN(channel.to_owned(), None, None)) - .unwrap_or_else(|e| { - error!("Failed to send JOIN {}: {}", channel, e); - }); - } - - for channel in to_part { - info!("Parting {}", channel); - client - .send(Command::PART(channel.to_owned(), None)) - .unwrap_or_else(|e| { - error!("Failed to send PART {}: {}", channel, e); - }); - } - } -} -fn capabilities( - chats: Query< - (&Client, &Capabilities), - ( - With, - Or<(Added, Changed)>, - ), - >, -) { - for (client, caps) in &chats { - info!("Requesting capabilities"); - - client.send_cap_req(&caps.0).unwrap_or_else(|e| { - error!("Failed to request capabilities: {}", e); - }); - } -} - -fn receive(mut writer: EventWriter, mut streams: Query<&mut Stream>) { - for mut stream in &mut streams { - while let Some(resp) = future::block_on(future::poll_once(&mut stream.0.next())).flatten() { - match resp { - Ok(msg) => { - trace!("Received: {:?}", msg.to_string().trim_end()); - writer.send(msg.into()); - } - Err(e) => { - error!("Failed to receive: {}", e); - } - } - } - } +#[allow(missing_docs)] +pub mod prelude { + pub use super::IRCPlugin; + pub use crate::components::*; } /// Bevy plugin to connect and manage IRC connections /// /// # Example /// ``` -/// use bevy_irc::{IRCPlugin, Connection, Authentication, Channels}; +/// use bevy_irc::prelude::*; /// use bevy_app::prelude::*; /// /// let mut app = App::new(); /// -/// let irc = app.world.spawn(( +/// let irc = app.world_mut().spawn(( /// Connection::new("irc.example.com", 6667), -/// Authentication::new("bevy"), +/// Auth::new("bevy"), /// Channels(vec!["#bevy".to_owned()]), /// )); /// @@ -323,13 +50,48 @@ impl bevy_app::Plugin for IRCPlugin { fn build(&self, app: &mut bevy_app::App) { use bevy_app::Update; - app.add_event::(); - app.add_systems(Update, connect); - app.add_systems(Update, finish_connect); - app.add_systems(Update, identify); - app.add_systems(Update, join_and_part); - app.add_systems(Update, capabilities); - app.add_systems(Update, receive); + if !app.is_plugin_added::() { + app.add_plugins(bevy_time::TimePlugin); + } + + app.world_mut() + .observe(systems::send::); + app.world_mut() + .observe(systems::send::); + + app.add_systems( + Update, + ( + systems::connect, + systems::poll_connecting, + systems::identify, + systems::request_capabilities, + systems::join_channels, + systems::poll_stream, + systems::ping, + ), + ); } } +#[cfg(test)] +mod tests { + use super::prelude::*; + #[test] + fn test_connection() { + let mut app = bevy_app::App::new(); + app.add_plugins(bevy_log::LogPlugin::default()); + // app.add_plugins(bevy_app::ScheduleRunnerPlugin::default()); + app.add_plugins(IRCPlugin); + + app.world_mut().spawn(( + Connection::new("irc.example.com", 6667), + Auth::new("bevy"), + Channels(vec!["#bevy".to_owned()]), + )); + + println!("Running app"); + + app.run(); + } +} diff --git a/src/systems.rs b/src/systems.rs new file mode 100644 index 0000000..2e692df --- /dev/null +++ b/src/systems.rs @@ -0,0 +1,193 @@ +#[allow(clippy::wildcard_imports)] +use crate::components::*; +use async_compat::CompatExt; +use bevy_ecs::prelude::*; +use bevy_time::{Real, Time}; +use bevy_utils::{ + futures::check_ready, + tracing::{debug, error, info, trace, warn}, +}; + +use crate::irc_prelude as irc; + +pub fn connect( + mut commands: Commands, + chats: Query< + (Entity, &Connection), + (Without, Or<(Without, Without)>), + >, +) { + for (id, con) in &chats { + let mut entity = commands.entity(id); + let fut = irc::Client::from_config(con.into()); + // let fut = Box::pin(fut); + // let fut = Compat::new(boxed_fut); + let connecting = Connecting::new(fut); + entity.insert((connecting, Pinger::default())); + entity.remove::(); + entity.observe(on_ping); + entity.observe(on_welcome); + } +} + +pub fn poll_connecting(mut commands: Commands, mut chats: Query<(Entity, &mut Connecting)>) { + for (id, mut connecting) in &mut chats { + let mut fut = connecting.get_mut().unwrap().compat(); + + if let Some(res) = check_ready(&mut fut) { + let mut entity = commands.entity(id); + entity.remove::(); + match res { + Ok(mut client) => { + info!(message = "Connected", ?client); + entity.insert(Sender(client.sender())); + entity.insert(Stream(client.stream().unwrap())); + } + Err(e) => { + error!(message = "Failed to connect", error=%e); + continue; + } + } + } + } +} + +pub fn send + std::fmt::Debug + Clone>( + trigger: Trigger>, + sender: Query<&Sender>, + mut commands: Commands, +) { + let msg = &trigger.event().0; + let id = trigger.entity(); + let sender = match sender.get(id) { + Ok(sender) => sender, + Err(e) => { + error!(message = "Failed to get sender", error=%e); + return; + } + }; + trace!(message = "Sending message", ?msg); + if let Err(e) = sender.send(msg.to_owned()) { + error!(message = "Failed to send message", error=%e); + commands.entity(id).remove::(); + } +} + +pub fn on_ping(trigger: Trigger>, mut commands: Commands) { + let cmd = &trigger.event().0; + let id = trigger.entity(); + if let irc::Command::PING(srv, ..) = cmd { + debug!("Received PING"); + let pong = irc::Command::PONG(srv.to_owned(), None); + commands.trigger_targets(Outgoing(pong), id); + } +} + +pub fn on_welcome(trigger: Trigger>, mut commands: Commands) { + let msg = &trigger.event().0; + if let irc::Command::Response(irc::Response::RPL_WELCOME, args) = msg { + info!(message = "Registered", ?args); + if let Some(mut entity) = commands.get_entity(trigger.entity()) { + entity.remove::(); + entity.insert(Registered); + } + } +} + +pub fn ping( + mut pingers: Query<(Entity, &mut Pinger)>, + time: Res>, + mut commands: Commands, +) { + for (id, mut pinger) in &mut pingers { + if pinger.last_ping.tick(time.delta()).elapsed_secs() < 600.0 { + return; + } + let ping = irc::Command::PING(String::new(), None); + commands.trigger_targets(Outgoing(ping), id); + pinger.last_ping.reset(); + } +} + +pub fn identify( + mut commands: Commands, + chats: Query<(Entity, &Auth), (With, Without, Without)>, +) { + for (id, auth) in &chats { + commands.entity(id).insert(Identifying); + info!(message = "Identifying", ?auth); + if let Some(pass) = &auth.pass { + commands.trigger_targets(Outgoing(irc::Command::PASS(pass.clone())), id); + } + commands.trigger_targets(Outgoing(irc::Command::NICK(auth.nick.clone())), id); + } +} + +pub fn join_channels( + mut commands: Commands, + chats: Query< + (Entity, &Channels), + (With, Or<(Added, Changed)>), + >, +) { + for (id, channels) in &chats { + info!(message = "Joining channels", ?channels); + for channel in &channels.0 { + let join = irc::Command::JOIN(channel.to_owned(), None, None); + commands.trigger_targets(Outgoing(join), id); + } + } +} + +pub fn request_capabilities( + mut commands: Commands, + chats: Query< + (Entity, &Capabilities), + ( + With, + Or<(Added, Changed)>, + ), + >, +) { + for (id, caps) in &chats { + info!(message = "Requesting capabilities", ?caps); + let caps = caps + .0 + .iter() + .map(irc::Capability::as_ref) + .collect::>() + .join(" "); + let req = irc::Command::CAP(None, irc::CapSubCommand::REQ, None, Some(caps)); + + commands.trigger_targets(Outgoing(req), id); + } +} + +pub fn poll_stream(mut commands: Commands, mut streams: Query<(Entity, &mut Stream)>) { + use futures_util::StreamExt; + for (id, mut stream) in &mut streams { + loop { + let Some(next) = check_ready(&mut stream.0.next()) else { + break; + }; + match next { + None => { + warn!(message = "Stream ended", ?stream); + commands.entity(id).remove::(); + break; + } + Some(Ok(msg)) => { + trace!(message = "Received message", ?msg); + let command = Incoming(msg.command.clone()); + commands.trigger_targets(command, id); + commands.trigger_targets(Incoming(msg), id); + } + Some(Err(e)) => { + error!(message = "Failed to poll stream", error=%e, ?stream); + commands.entity(id).remove::(); + break; + } + } + } + } +} diff --git a/src/twitch.rs b/src/twitch.rs index cf985e8..11081af 100644 --- a/src/twitch.rs +++ b/src/twitch.rs @@ -1,5 +1,7 @@ +#![allow(missing_docs)] +use std::str::FromStr; + use irc::proto::message::Tag; -use thiserror::Error; pub trait TwitchMessageExt { type Error: std::error::Error; @@ -7,18 +9,20 @@ pub trait TwitchMessageExt { fn message_id(&self) -> Option<&str>; fn user_id(&self) -> Option<&str>; fn display_name(&self) -> Option<&str>; + fn badges(&self) -> Option>; fn set_reply_parent_id(&mut self, id: &str); fn set_reply_parent(&mut self, parent_message: &Self) { if let Some(id) = parent_message.message_id() { self.set_reply_parent_id(id); } } + /// Create a new reply message for the given message fn new_reply(&self, message: &str) -> Result where Self: Sized; } -#[derive(Error, Debug)] +#[derive(thiserror::Error, Debug)] pub enum MessageError { #[error("Invalid message command")] InvalidCommand, @@ -26,13 +30,28 @@ pub enum MessageError { MissingId, } -impl TwitchMessageExt for super::Message { +pub struct Badge { + pub badge: String, + pub version: String, +} + +impl FromStr for Badge { + type Err = (); + + fn from_str(s: &str) -> Result { + let mut split = s.splitn(2, '/'); + let badge = split.next().ok_or(())?.to_owned(); + let version = split.next().ok_or(())?.to_owned(); + Ok(Self { badge, version }) + } +} + +impl TwitchMessageExt for irc::proto::Message { type Error = MessageError; fn is_send_by_mod(&self) -> bool { - let tags = match &self.tags { - Some(tags) => tags, - None => return false, + let Some(tags) = &self.tags else { + return false; }; for Tag(key, val) in tags { @@ -47,6 +66,22 @@ impl TwitchMessageExt for super::Message { false } + fn badges(&self) -> Option> { + let tags = self.tags.as_ref()?; + + let badges = match tags.iter().find(|Tag(key, _)| key == "badges") { + Some(Tag(_, Some(badges))) => badges, + _ => return None, + }; + + let badges = badges + .split(',') + .map(|s| s.parse().ok()) + .collect::>>(); + + badges + } + fn display_name(&self) -> Option<&str> { self.tags .as_ref()? @@ -82,10 +117,9 @@ impl TwitchMessageExt for super::Message { } fn new_reply(&self, message: &str) -> Result { - use irc::proto::Command::PRIVMSG; - let channel = match &self.command { - PRIVMSG(ref channel, _) => channel, - _ => return Err(MessageError::InvalidCommand), + use irc::proto::{Command::PRIVMSG, Message}; + let PRIVMSG(channel, _) = &self.command else { + return Err(MessageError::InvalidCommand); }; let tags = match self.message_id() { @@ -96,10 +130,12 @@ impl TwitchMessageExt for super::Message { None => return Err(MessageError::MissingId), }; - Ok(Self { + let reply = Message { prefix: None, command: PRIVMSG(channel.to_owned(), message.to_owned()), tags, - }) + }; + + Ok(reply) } }