Skip to content

Commit

Permalink
better error handling. Handle rate limits
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicholas Hassan committed Jan 21, 2024
1 parent fe8b6fd commit f75e79b
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 48 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "telelog"
version = "0.1.5"
version = "0.1.6"
edition = "2021"

[dependencies]
Expand All @@ -12,6 +12,7 @@ regex = "1.10.2"
reqwest = "0.11.23"
secrets = "1.2.0"
serde = "1.0.195"
serde_json = "1.0.111"
signal-hook = "0.3.17"
systemd = "0.10.0"
tokio = { version = "1.35.1", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Default for AppSettings {
pub struct TelegramSettings {
pub chat_id: String,
pub api_key: Option<String>,
pub flush_seconds: Option<u8>,
pub flush_seconds: Option<u16>,
}

#[derive(Debug, Deserialize)]
Expand Down
181 changes: 136 additions & 45 deletions src/telegram.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,50 @@
use lazy_static::lazy_static;
use signal_hook::{consts::{SIGTERM,SIGINT}, iterator::Signals};
use tokio;
use tokio::time::sleep;
use tokio::sync::Mutex as AsyncMutex;
use reqwest;
use std::time::Duration;
use serde::Deserialize;
use serde_json::Error as JsonError;
use std::sync::OnceLock;

use crate::journal::LogEntry;
use crate::config::AppSettings;

#[derive(Debug)]
struct TelegramContext {
chat_id: String,
api_key: String,
flush_seconds: u16,
}

lazy_static!(
static ref LOG_BUFFER: AsyncMutex<Vec<LogEntry>> = AsyncMutex::new(vec![]);
static ref APP_SETTINGS_COPY: AsyncMutex<AppSettings> = AsyncMutex::new(AppSettings::default());
static ref CLIENT: reqwest::Client = reqwest::Client::new();
static ref LOG_ENTRY_BUFFER: AsyncMutex<Vec<LogEntry>> = AsyncMutex::new(Vec::new());
static ref PROCESSED_MESSAGE_BUFFER: AsyncMutex<Vec<String>> = AsyncMutex::new(Vec::new());
static ref REQUEST_CLIENT: reqwest::Client = reqwest::Client::new();
static ref SEND_LOCK: AsyncMutex<()> = AsyncMutex::new(());
);
static TELEGRAM_CONTEXT: OnceLock<TelegramContext> = OnceLock::new();

#[derive(Deserialize)]
struct ErrorResponse {
// ok: Option<bool>,
// error_code: Option<u16>,
// description: Option<String>,
parameters: Option<RetryParameters>,
}

#[derive(Deserialize)]
struct RetryParameters {
retry_after: Option<u64>,
}

pub async fn init(settings: AppSettings) {
let mut app_settings_copy = APP_SETTINGS_COPY.lock().await;
*app_settings_copy = settings;
TELEGRAM_CONTEXT.set(TelegramContext {
chat_id: settings.telegram.chat_id,
api_key: settings.telegram.api_key.unwrap(),
flush_seconds: settings.telegram.flush_seconds.unwrap_or(5),
}).expect("Initialisation only occurs once");

tokio::task::spawn(async move {
let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
Expand All @@ -34,7 +64,7 @@ pub async fn init(settings: AppSettings) {
}

pub async fn send_log_entry(entry: LogEntry) {
let mut buffer = LOG_BUFFER.lock().await;
let mut buffer = LOG_ENTRY_BUFFER.lock().await;
buffer.push(entry.clone());

// if this is a critical entry, flush the buffer immediately
Expand All @@ -49,18 +79,13 @@ pub async fn send_log_entry(entry: LogEntry) {
// otherwise schedule a flush if this is the first message in the buffer
if buffer.len() == 1 {
drop(buffer); // release the lock
let settings = APP_SETTINGS_COPY.lock().await;
let flush_seconds = match settings.telegram.flush_seconds {
Some(seconds) => seconds,
None => {
println!("[telegram] flush_seconds not set, defaulting to 5 seconds");
5
},
let flush_seconds = match TELEGRAM_CONTEXT.get() {
Some(context) => context.flush_seconds,
None => 5,
};
drop(settings); // release the lock


tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(flush_seconds as u64)).await;
sleep(Duration::from_secs(flush_seconds as u64)).await;
flush_log_buffer().await;
});
}
Expand Down Expand Up @@ -91,49 +116,115 @@ fn colour_translate(priority: u8) -> String {
}
}

async fn flush_log_buffer() {
let mut buffer = LOG_BUFFER.lock().await;
// parse the buffer to form the telegram message
let mut message = String::from("<code>\n");
fn generate_messages(buffer: &Vec<LogEntry>) -> Vec<String> {
let mut message_list: Vec<String> = vec![];
let mut current_message = String::from("<code>\n");

buffer.retain(|entry: &LogEntry| {
for entry in buffer {
let new_entry_string = format!("{}[{}] {}: {}\n", colour_translate(entry.priority), entry.timestamp.format("%b %d %H:%M:%S"), entry.identifier, entry.message);

if message.len() + new_entry_string.len() < 4088 {
message.push_str(&new_entry_string);
return false
if current_message.len() + new_entry_string.len() >= 4088 {
current_message.push_str("</code>");
message_list.push(current_message);
current_message = String::from("<code>\n");
}

return true

current_message.push_str(&new_entry_string);
}
if current_message != "<code>\n" {
current_message.push_str("</code>");
message_list.push(current_message);
}
return message_list
}

async fn send_telegram_message(message: &String, api_key: &String, chat_id: &String) -> Result<reqwest::Response, reqwest::Error> {
let _guard = SEND_LOCK.lock().await;
let response = REQUEST_CLIENT.post(&format!("https://api.telegram.org/bot{}/sendMessage", api_key))
.form(&[("chat_id", chat_id), ("text", message), ("parse_mode", &"HTML".to_string())])
.send()
.await;

tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
drop(_guard);
});

return response;
}

async fn flush_log_buffer() {
let mut buffer = LOG_ENTRY_BUFFER.lock().await;
let message_list = generate_messages(&buffer);
buffer.clear();
drop(buffer); // release the lock

if message == "<code>\n" {
if message_list.len() == 0 {
println!("[telegram] flush was ran, but buffer was empty");
return
}

message.push_str("</code>");
let (api_key, chat_id) = match TELEGRAM_CONTEXT.get() {
Some(context) => (context.api_key.clone(), context.chat_id.clone()),
None => {
println!("[telegram] flush was ran, but context was empty");
return
}
};

// send the message to telegram
let settings = APP_SETTINGS_COPY.lock().await;
let chat_id = (settings.telegram.chat_id).to_owned();
let api_key = (settings.telegram.api_key.as_ref().unwrap()).to_owned();
drop(settings);
let mut old_unsent_messages = PROCESSED_MESSAGE_BUFFER.lock().await;
let mut new_unsent_messages: Vec<String> = Vec::new();

match CLIENT.post(&format!("https://api.telegram.org/bot{}/sendMessage", api_key))
.form(&[("chat_id", chat_id), ("text", message), ("parse_mode", "HTML".to_string())])
.send()
.await {
Ok(response) => {
if response.status().is_success() {
return
// try sending all the messages
for buffer in [&old_unsent_messages, &message_list] {
for message in buffer {
let result = send_telegram_message(message, &api_key, &chat_id).await;

if let Err(e) = result {
println!("[telegram] Failed: {}", e);
new_unsent_messages.push(message.to_string());
continue
}

let response = result.unwrap();

if !response.status().is_success() {
let status = response.status();
let text = response.text().await.unwrap();

// Error handling specifics
match status.as_u16() {
429 => {
match serde_json::from_str(&text) as Result<ErrorResponse, JsonError> {
Ok(error_response) => {
if let Some(parameters) = error_response.parameters {
if let Some(retry_after) = parameters.retry_after {
println!("[telegram] API response 429: pausing messages for {} seconds", retry_after);
tokio::spawn(async move {
let _guard = SEND_LOCK.lock().await;
sleep(Duration::from_secs(retry_after)).await;
drop(_guard);
});
}
}
}
Err(e) => {
println!("[telegram] Failed to parse 429 response: {}", e);
}
}

new_unsent_messages.push(message.to_string());
},
_ => {
println!("[telegram] API response {}: {:?}", status, text);
// new_unsent_messages.push(message.to_string());
}
}
}
println!("[telegram] API response {}: {:?}", response.status(), response.text().await);
},
Err(e) => {
println!("[telegram] Failed: {}", e);
}
}

*old_unsent_messages = new_unsent_messages;

drop(old_unsent_messages);
}

0 comments on commit f75e79b

Please sign in to comment.