diff --git a/Cargo.lock b/Cargo.lock index b4d6ace..108e903 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1310,7 +1310,7 @@ dependencies = [ [[package]] name = "telelog" -version = "0.1.5" +version = "0.1.6" dependencies = [ "chrono", "clap", @@ -1320,6 +1320,7 @@ dependencies = [ "reqwest", "secrets", "serde", + "serde_json", "signal-hook", "systemd", "tokio", diff --git a/Cargo.toml b/Cargo.toml index f8b1bfc..4cee48e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telelog" -version = "0.1.5" +version = "0.1.6" edition = "2021" [dependencies] @@ -12,6 +12,7 @@ regex = "1.10.2" reqwest = "0.11.23" secrets = "1.2.0" serde = "1.0.195" +serde_json = "1.0.111" signal-hook = "0.3.17" systemd = "0.10.0" tokio = { version = "1.35.1", features = ["full"] } diff --git a/src/config.rs b/src/config.rs index 6d518fb..413d6ac 100644 --- a/src/config.rs +++ b/src/config.rs @@ -31,7 +31,7 @@ impl Default for AppSettings { pub struct TelegramSettings { pub chat_id: String, pub api_key: Option, - pub flush_seconds: Option, + pub flush_seconds: Option, } #[derive(Debug, Deserialize)] diff --git a/src/telegram.rs b/src/telegram.rs index 1b62a05..3030b9b 100644 --- a/src/telegram.rs +++ b/src/telegram.rs @@ -1,20 +1,50 @@ use lazy_static::lazy_static; use signal_hook::{consts::{SIGTERM,SIGINT}, iterator::Signals}; -use tokio; +use tokio::time::sleep; use tokio::sync::Mutex as AsyncMutex; use reqwest; +use std::time::Duration; +use serde::Deserialize; +use serde_json::Error as JsonError; +use std::sync::OnceLock; use crate::journal::LogEntry; use crate::config::AppSettings; + +#[derive(Debug)] +struct TelegramContext { + chat_id: String, + api_key: String, + flush_seconds: u16, +} + lazy_static!( - static ref LOG_BUFFER: AsyncMutex> = AsyncMutex::new(vec![]); - static ref APP_SETTINGS_COPY: AsyncMutex = AsyncMutex::new(AppSettings::default()); - static ref CLIENT: reqwest::Client = reqwest::Client::new(); + static ref LOG_ENTRY_BUFFER: AsyncMutex> = AsyncMutex::new(Vec::new()); + static ref PROCESSED_MESSAGE_BUFFER: AsyncMutex> = AsyncMutex::new(Vec::new()); + static ref REQUEST_CLIENT: reqwest::Client = reqwest::Client::new(); + static ref SEND_LOCK: AsyncMutex<()> = AsyncMutex::new(()); ); +static TELEGRAM_CONTEXT: OnceLock = OnceLock::new(); + +#[derive(Deserialize)] +struct ErrorResponse { + // ok: Option, + // error_code: Option, + // description: Option, + parameters: Option, +} + +#[derive(Deserialize)] +struct RetryParameters { + retry_after: Option, +} pub async fn init(settings: AppSettings) { - let mut app_settings_copy = APP_SETTINGS_COPY.lock().await; - *app_settings_copy = settings; + TELEGRAM_CONTEXT.set(TelegramContext { + chat_id: settings.telegram.chat_id, + api_key: settings.telegram.api_key.unwrap(), + flush_seconds: settings.telegram.flush_seconds.unwrap_or(5), + }).expect("Initialisation only occurs once"); tokio::task::spawn(async move { let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap(); @@ -34,7 +64,7 @@ pub async fn init(settings: AppSettings) { } pub async fn send_log_entry(entry: LogEntry) { - let mut buffer = LOG_BUFFER.lock().await; + let mut buffer = LOG_ENTRY_BUFFER.lock().await; buffer.push(entry.clone()); // if this is a critical entry, flush the buffer immediately @@ -49,18 +79,13 @@ pub async fn send_log_entry(entry: LogEntry) { // otherwise schedule a flush if this is the first message in the buffer if buffer.len() == 1 { drop(buffer); // release the lock - let settings = APP_SETTINGS_COPY.lock().await; - let flush_seconds = match settings.telegram.flush_seconds { - Some(seconds) => seconds, - None => { - println!("[telegram] flush_seconds not set, defaulting to 5 seconds"); - 5 - }, + let flush_seconds = match TELEGRAM_CONTEXT.get() { + Some(context) => context.flush_seconds, + None => 5, }; - drop(settings); // release the lock - + tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(flush_seconds as u64)).await; + sleep(Duration::from_secs(flush_seconds as u64)).await; flush_log_buffer().await; }); } @@ -91,49 +116,115 @@ fn colour_translate(priority: u8) -> String { } } -async fn flush_log_buffer() { - let mut buffer = LOG_BUFFER.lock().await; - // parse the buffer to form the telegram message - let mut message = String::from("\n"); +fn generate_messages(buffer: &Vec) -> Vec { + let mut message_list: Vec = vec![]; + let mut current_message = String::from("\n"); - buffer.retain(|entry: &LogEntry| { + for entry in buffer { let new_entry_string = format!("{}[{}] {}: {}\n", colour_translate(entry.priority), entry.timestamp.format("%b %d %H:%M:%S"), entry.identifier, entry.message); - if message.len() + new_entry_string.len() < 4088 { - message.push_str(&new_entry_string); - return false + if current_message.len() + new_entry_string.len() >= 4088 { + current_message.push_str(""); + message_list.push(current_message); + current_message = String::from("\n"); } - - return true + + current_message.push_str(&new_entry_string); + } + if current_message != "\n" { + current_message.push_str(""); + message_list.push(current_message); + } + return message_list +} + +async fn send_telegram_message(message: &String, api_key: &String, chat_id: &String) -> Result { + let _guard = SEND_LOCK.lock().await; + let response = REQUEST_CLIENT.post(&format!("https://api.telegram.org/bot{}/sendMessage", api_key)) + .form(&[("chat_id", chat_id), ("text", message), ("parse_mode", &"HTML".to_string())]) + .send() + .await; + + tokio::spawn(async move { + sleep(Duration::from_secs(1)).await; + drop(_guard); }); + + return response; +} + +async fn flush_log_buffer() { + let mut buffer = LOG_ENTRY_BUFFER.lock().await; + let message_list = generate_messages(&buffer); + buffer.clear(); drop(buffer); // release the lock - if message == "\n" { + if message_list.len() == 0 { println!("[telegram] flush was ran, but buffer was empty"); return } - message.push_str(""); + let (api_key, chat_id) = match TELEGRAM_CONTEXT.get() { + Some(context) => (context.api_key.clone(), context.chat_id.clone()), + None => { + println!("[telegram] flush was ran, but context was empty"); + return + } + }; - // send the message to telegram - let settings = APP_SETTINGS_COPY.lock().await; - let chat_id = (settings.telegram.chat_id).to_owned(); - let api_key = (settings.telegram.api_key.as_ref().unwrap()).to_owned(); - drop(settings); + let mut old_unsent_messages = PROCESSED_MESSAGE_BUFFER.lock().await; + let mut new_unsent_messages: Vec = Vec::new(); - match CLIENT.post(&format!("https://api.telegram.org/bot{}/sendMessage", api_key)) - .form(&[("chat_id", chat_id), ("text", message), ("parse_mode", "HTML".to_string())]) - .send() - .await { - Ok(response) => { - if response.status().is_success() { - return + // try sending all the messages + for buffer in [&old_unsent_messages, &message_list] { + for message in buffer { + let result = send_telegram_message(message, &api_key, &chat_id).await; + + if let Err(e) = result { + println!("[telegram] Failed: {}", e); + new_unsent_messages.push(message.to_string()); + continue + } + + let response = result.unwrap(); + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap(); + + // Error handling specifics + match status.as_u16() { + 429 => { + match serde_json::from_str(&text) as Result { + Ok(error_response) => { + if let Some(parameters) = error_response.parameters { + if let Some(retry_after) = parameters.retry_after { + println!("[telegram] API response 429: pausing messages for {} seconds", retry_after); + tokio::spawn(async move { + let _guard = SEND_LOCK.lock().await; + sleep(Duration::from_secs(retry_after)).await; + drop(_guard); + }); + } + } + } + Err(e) => { + println!("[telegram] Failed to parse 429 response: {}", e); + } + } + + new_unsent_messages.push(message.to_string()); + }, + _ => { + println!("[telegram] API response {}: {:?}", status, text); + // new_unsent_messages.push(message.to_string()); + } + } } - println!("[telegram] API response {}: {:?}", response.status(), response.text().await); - }, - Err(e) => { - println!("[telegram] Failed: {}", e); } } + *old_unsent_messages = new_unsent_messages; + + drop(old_unsent_messages); }