diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5575afa..115f1f3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,7 +22,7 @@
 * **BREAKING** http server is not started unless --p is provided
 * **BREAKING** migrated to new API /api/v1/events -> /api/events
 * if one of the threads exit, the whole program will exit
-* moved to ilert-rust@2.0.0, will migrated incident_key -> alert_key in code and db
+* moved to ilert-rust@2.0.0, will migrate incident_key -> alert_key in code and db
 * added event mapping keys to map mqtt payloads to event api
 * added event filter keys to filter mqtt payloads
diff --git a/Cargo.toml b/Cargo.toml
index 85315fa..f2a4a68 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,8 @@ chrono = "0.4"
 clap = "2"
 ilert = "4.0.1"
 uuid = { version = "1.10", features = ["v4"] }
+ctrlc = { version = "3.4" }
 rusqlite = { version = "0.32", features = ["bundled"] } # SQLite 3.46.0
 rumqttc = "0.24"
 rdkafka = { version = "0.36", features = ["cmake-build"] }
+futures-util = "0.3.30"
diff --git a/README.md b/README.md
index d7766b9..5ad0684 100644
--- a/README.md
+++ b/README.md
@@ -25,25 +25,35 @@ The ilert agent comes in a single binary with a small footprint and helps you to
 
 ### Docker image
 
-You can grab the latest release from [Docker hub](https://hub.docker.com/r/ilert/ilagent)
+You can grab the latest official image from [Docker Hub](https://hub.docker.com/r/ilert/ilagent)
 
-```shell script
+```sh
 docker run ilert/ilagent
 ```
 
-### Install script
-
-For MacOS and Linux we also provide this one-liner to automatically install the agent:
+### Compile the binary from source
 
-> Note: default prebuild support stopped at version 0.3.0 if you cannot use the docker image or compile yourself and need new builds please open an issue
+> Note: requires Rust to be installed, see https://rustup.rs
 
-```shell script
-curl -sL https://raw.githubusercontent.com/iLert/ilagent/master/install.sh | bash -
+```sh
+git clone git@github.com:iLert/ilagent.git
+cd ilagent
+cargo build --release
+cd ./target/release
+./ilagent --help
 ```
 
 ### Pre-build releases
 
-> Note: default prebuild support stopped at version 0.3.0 if you cannot use the docker image or compile yourself and need new builds please open an issue
+> Note: default prebuilt binaries stopped at version 0.3.0. If you cannot use the Docker image or compile the agent yourself and need newer builds, please open an issue.
+
+#### Install script
+
+For macOS and Linux we also provide this one-liner to automatically install the agent:
+
+```shell script
+curl -sL https://raw.githubusercontent.com/iLert/ilagent/master/install.sh | bash -
+```
 
 We provide pre compiled binaries for every major OS on the [release page of this repository](https://github.com/iLert/ilagent/releases).
@@ -166,6 +176,13 @@ ilagent daemon -v -v \
     --filter_val 'ALARM'
 ```
 
+## Liveness probes
+
+When the `-p 8977` port argument is provided, the agent starts its HTTP server.
+It exposes both a `GET /ready` and a `GET /health` endpoint; these are currently static, but will become dynamic in the future.
+Additionally, we recommend providing the `-b il1hbt123...` heartbeat argument with the integration key of a heartbeat alert source
+to ping that source periodically.
+
 ## Getting help
 
 We are happy to respond to [GitHub issues][issues] as well.
@@ -177,7 +194,7 @@ We are happy to respond to [GitHub issues][issues] as well.
 ### Cross-Compiling
 
 Of course, you can also grab the source code and compile it yourself.
-Requires cross (`cargo install cross`) to be installed.
+Requires cross to be installed (`cargo install cross`; for Apple Silicon support: `cargo install cross --git https://github.com/cross-rs/cross`).
 
 - Mac (or your host): `cargo build --release`
 - Linux: `cross build --release --target x86_64-unknown-linux-gnu`
diff --git a/examples/_daemon b/examples/_daemon
index abf8f1d..36347cb 100755
--- a/examples/_daemon
+++ b/examples/_daemon
@@ -1,2 +1,2 @@
 #!/bin/bash
-cargo run -- daemon -p 8977 -v -v
+cargo run -- daemon -p 8977 -v -v -b 123
diff --git a/examples/_kafka_overwrite b/examples/_kafka_overwrite
new file mode 100755
index 0000000..0061506
--- /dev/null
+++ b/examples/_kafka_overwrite
@@ -0,0 +1,12 @@
+#!/bin/bash
+cargo run -- daemon -v -v \
+    --kafka_brokers localhost:9092 --kafka_group_id ilagent -e 'test-topic' \
+    --event_key 'il1api123...' \
+    --map_key_alert_key 'mCode' \
+    --map_key_summary 'comment' \
+    --map_key_etype 'state' \
+    --map_val_etype_alert 'SET' \
+    --map_val_etype_accept 'ACK' \
+    --map_val_etype_resolve 'CLR' \
+    --filter_key 'type' \
+    --filter_val 'ALARM'
\ No newline at end of file
diff --git a/examples/_mqtt_overwrite b/examples/_mqtt_overwrite
index 17faa2c..1524eab 100755
--- a/examples/_mqtt_overwrite
+++ b/examples/_mqtt_overwrite
@@ -1,12 +1,12 @@
 #!/bin/bash
 cargo run -- daemon -v -v \
     -m 127.0.0.1 -q 1883 -n ilagent -e '#' \
-    --mqtt_event_key 'il1api112115xxx' \
-    --mqtt_map_key_alert_key 'mCode' \
-    --mqtt_map_key_summary 'comment' \
-    --mqtt_map_key_etype 'state' \
-    --mqtt_map_val_etype_alert 'SET' \
-    --mqtt_map_val_etype_accept 'ACK' \
-    --mqtt_map_val_etype_resolve 'CLR' \
-    --mqtt_filter_key 'type' \
-    --mqtt_filter_val 'ALARM'
\ No newline at end of file
+    --event_key 'il1api123...' \
+    --map_key_alert_key 'mCode' \
+    --map_key_summary 'comment' \
+    --map_key_etype 'state' \
+    --map_val_etype_alert 'SET' \
+    --map_val_etype_accept 'ACK' \
+    --map_val_etype_resolve 'CLR' \
+    --filter_key 'type' \
+    --filter_val 'ALARM'
\ No newline at end of file
diff --git a/src/config.rs b/src/config.rs
index c3c1113..54d099f 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -39,7 +39,7 @@ impl ILConfig {
             http_host: "0.0.0.0".to_string(),
             http_port: 8977,
             start_http: false,
-            http_worker_count: 2,
+            http_worker_count: 1,
             db_file: "./ilagent.db3".to_string(),
             heartbeat_key: None,
             mqtt_host: None,
diff --git a/src/consumers/kafka.rs b/src/consumers/kafka.rs
index 8327860..34d248c 100644
--- a/src/consumers/kafka.rs
+++ b/src/consumers/kafka.rs
@@ -34,20 +34,18 @@ impl ConsumerContext for CustomContext {
 
 type LoggingConsumer = StreamConsumer<CustomContext>;
 
-pub async fn run_kafka_job(daemon_context: Arc<DaemonContext>) -> () {
+pub async fn run_kafka_job(daemon_ctx: Arc<DaemonContext>) -> () {
 
     let (version_n, version_s) = get_rdkafka_version();
     info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
 
-    let context = CustomContext;
-
-    let event_topic = if let Some(topic) = daemon_context.config.clone().event_topic {
+    let event_topic = if let Some(topic) = daemon_ctx.config.clone().event_topic {
         topic.clone()
     } else {
         "".to_string()
     };
 
-    let heartbeat_topic = if let Some(topic) = daemon_context.config.clone().heartbeat_topic {
+    let heartbeat_topic = if let Some(topic) = daemon_ctx.config.clone().heartbeat_topic {
         topic.clone()
     } else {
         "".to_string()
@@ -61,9 +59,10 @@ pub async fn run_kafka_job(daemon_context: Arc<DaemonContext>) -> () {
         topics.push(heartbeat_topic.as_str());
     }
 
-    let brokers = daemon_context.config.clone().kafka_brokers.expect("no broker");
-    let group_id = daemon_context.config.clone().kafka_group_id.expect("no group id");
+    let brokers = daemon_ctx.config.clone().kafka_brokers.expect("no broker");
+    let group_id = daemon_ctx.config.clone().kafka_group_id.expect("no group id");
 
+    let context = CustomContext;
     let consumer: LoggingConsumer = ClientConfig::new()
         .set("group.id", group_id)
         .set("bootstrap.servers", brokers)
@@ -114,9 +113,9 @@ pub async fn run_kafka_job(daemon_context: Arc<DaemonContext>) -> () {
             }
             */
             let should_retry: bool = if m.topic().eq(event_topic.as_str()) {
-                handle_event_message(daemon_context.clone(), message_key, payload, m.topic()).await
+                handle_event_message(daemon_ctx.clone(), message_key, payload, m.topic()).await
             } else if m.topic().eq(heartbeat_topic.as_str()) {
-                handle_heartbeat_message(daemon_context.clone(), message_key, payload).await
+                handle_heartbeat_message(daemon_ctx.clone(), message_key, payload).await
             } else {
                 warn!("Received Kafka message from unsubscribed topic: {}", m.topic());
                 // will commit these anyway
diff --git a/src/consumers/mqtt.rs b/src/consumers/mqtt.rs
index e2cae62..8465398 100644
--- a/src/consumers/mqtt.rs
+++ b/src/consumers/mqtt.rs
@@ -2,25 +2,26 @@
 use log::{info, error};
 use std::time::{Duration};
 use rumqttc::{MqttOptions, Client, QoS, Incoming, Event};
 use std::{str, thread};
-
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
 use ilert::ilert::ILert;
 use crate::db::ILDatabase;
 use crate::config::ILConfig;
-use crate::hbt;
+use crate::{hbt, DaemonContext};
 use crate::models::event::EventQueueItemJson;
 
-pub fn run_mqtt_job(config: &ILConfig) -> () {
+pub fn run_mqtt_job(daemon_ctx: Arc<DaemonContext>) -> () {
 
     let mut connected = false;
     let mut recon_attempts = 0;
-    let db = ILDatabase::new(config.db_file.as_str());
+    let db = ILDatabase::new(daemon_ctx.config.db_file.as_str());
 
-    let mqtt_host = config.mqtt_host.clone().expect("Missing mqtt host");
-    let mqtt_port = config.mqtt_port.clone().expect("Missing mqtt port");
+    let mqtt_host = daemon_ctx.config.mqtt_host.clone().expect("Missing mqtt host");
+    let mqtt_port = daemon_ctx.config.mqtt_port.clone().expect("Missing mqtt port");
 
     let mut mqtt_options = MqttOptions::new(
-        config.mqtt_name.clone().expect("Missing mqtt name"),
+        daemon_ctx.config.mqtt_name.clone().expect("Missing mqtt name"),
         mqtt_host.as_str(),
         mqtt_port,
     );
@@ -30,16 +31,16 @@ pub fn run_mqtt_job(config: &ILConfig) -> () {
         .set_pending_throttle(Duration::from_secs(1))
         .set_clean_session(false);
 
-    if let Some(mqtt_username) = config.mqtt_username.clone() {
+    if let Some(mqtt_username) = daemon_ctx.config.mqtt_username.clone() {
         mqtt_options.set_credentials(mqtt_username.as_str(),
-            config.mqtt_password.clone()
+            daemon_ctx.config.mqtt_password.clone()
                 .expect("mqtt_username is set, expecting mqtt_password to be set as well").as_str());
     }
 
     let (client, mut connection) = Client::new(mqtt_options, 10);
 
-    let event_topic = config.event_topic.clone().expect("Missing mqtt event topic");
-    let heartbeat_topic = config.heartbeat_topic.clone().expect("Missing mqtt heartbeat topic");
+    let event_topic = daemon_ctx.config.event_topic.clone().expect("Missing mqtt event topic");
+    let heartbeat_topic = daemon_ctx.config.heartbeat_topic.clone().expect("Missing mqtt heartbeat topic");
 
     client.subscribe(event_topic.as_str(), QoS::AtMostOnce)
         .expect("Failed to subscribe to mqtt event topic");
@@ -54,6 +55,10 @@ pub fn run_mqtt_job(config: &ILConfig) -> () {
     info!("Connecting to Mqtt server..");
     for (_i, invoke) in connection.iter().enumerate() {
 
+        if !daemon_ctx.running.load(Ordering::Relaxed) {
+            break;
+        }
+
         match invoke {
             Err(e) => {
                 error!("mqtt error {:?}", e);
@@ -85,12 +90,12 @@ pub fn run_mqtt_job(config: &ILConfig) -> () {
                     if heartbeat_topic == message.topic {
                         handle_heartbeat_message(payload);
                     } else if event_topic == message.topic {
-                        handle_event_message(&config, &db, payload, message.topic.as_str());
+                        handle_event_message(&daemon_ctx.config, &db, payload, message.topic.as_str());
                     } else {
                         // with filters event processing might subscribe to wildcards
                         if event_topic.contains("#") || event_topic.contains("+") {
-                            handle_event_message(&config, &db, payload, message.topic.as_str());
+                            handle_event_message(&daemon_ctx.config, &db, payload, message.topic.as_str());
                         }
                     }
                 },
@@ -98,6 +103,11 @@ pub fn run_mqtt_job(config: &ILConfig) -> () {
             }
         }
 
+        // faster exits
+        if !daemon_ctx.running.load(Ordering::Relaxed) {
+            break;
+        }
+
         // fallback, in case mqtt connection drops all the time
         if recon_attempts < 300 {
             recon_attempts = recon_attempts + 1;
diff --git a/src/hbt.rs b/src/hbt.rs
index b33df99..0adc458 100644
--- a/src/hbt.rs
+++ b/src/hbt.rs
@@ -1,4 +1,5 @@
 use std::sync::Arc;
+use std::sync::atomic::Ordering;
 use log::{error};
 use std::time::{Duration, Instant};
 
@@ -6,18 +7,18 @@
 use ilert::ilert::ILert;
 use ilert::ilert_builders::{HeartbeatApiResource};
 use crate::DaemonContext;
 
-pub async fn run_hbt_job(daemon_context: Arc<DaemonContext>) -> () {
+pub async fn run_hbt_job(daemon_ctx: Arc<DaemonContext>) -> () {
 
     let mut last_run = Instant::now();
 
-    let api_key = daemon_context.config.clone().heartbeat_key
+    let api_key = daemon_ctx.config.clone().heartbeat_key
         .expect("Failed to access heartbeat api key");
     let api_key = api_key.as_str();
 
     // kick off call
-    ping_heartbeat(&daemon_context.ilert_client, api_key).await;
+    ping_heartbeat(&daemon_ctx.ilert_client, api_key).await;
 
-    loop {
+    while daemon_ctx.running.load(Ordering::Relaxed) {
         tokio::time::sleep(Duration::from_millis(300)).await;
         if last_run.elapsed().as_millis() < 30000 {
@@ -26,7 +27,7 @@ pub async fn run_hbt_job(daemon_context: Arc<DaemonContext>) -> () {
             last_run = Instant::now();
         }
 
-        ping_heartbeat(&daemon_context.ilert_client, api_key).await;
+        ping_heartbeat(&daemon_ctx.ilert_client, api_key).await;
     }
 }
diff --git a/src/server.rs b/src/http_server.rs
similarity index 82%
rename from src/server.rs
rename to src/http_server.rs
index 8d0cc1f..d2ee11d 100644
--- a/src/server.rs
+++ b/src/http_server.rs
@@ -23,6 +23,14 @@ async fn get_index(_req: HttpRequest) -> impl Responder {
         .body("ilagent/0.5.0")
 }
 
+async fn get_ready(_req: HttpRequest) -> impl Responder {
+    HttpResponse::NoContent().finish()
+}
+
+async fn get_health(_req: HttpRequest) -> impl Responder {
+    HttpResponse::NoContent().finish()
+}
+
 async fn get_heartbeat(container: web::Data<Mutex<WebContextContainer>>, _req: HttpRequest, path: web::Path<(String,)>) -> impl Responder {
 
     let container = container.lock().await;
@@ -83,6 +91,14 @@ fn config_app(cfg: &mut web::ServiceConfig) {
         .route(web::get().to(get_index)) // /
     );
 
+    cfg.service(web::resource("/ready")
+        .route(web::get().to(get_ready))
+    );
+
+    cfg.service(web::resource("/health")
+        .route(web::get().to(get_health))
+    );
+
     cfg.service(web::resource("/api/events")
         .route(web::post().to(post_event)) // POST
     );
@@ -93,18 +109,19 @@ fn config_app(cfg: &mut web::ServiceConfig) {
     );
 }
 
-pub fn run_server(daemon_context: Arc<DaemonContext>) -> () {
-    let addr = daemon_context.config.get_http_bind_str().clone();
-    let db = ILDatabase::new(daemon_context.config.db_file.as_str());
+pub async fn run_server(daemon_ctx: Arc<DaemonContext>) -> () {
+    let addr = daemon_ctx.config.get_http_bind_str().clone();
+    let db = ILDatabase::new(daemon_ctx.config.db_file.as_str());
     let ilert_client = ILert::new().expect("failed to create ilert client");
+    info!("Starting HTTP server @ {}", addr);
     let container = web::Data::new(Mutex::new(WebContextContainer{ db, ilert_client }));
     let server = HttpServer::new(move|| App::new()
         .app_data(container.clone())
         .wrap(middleware::Logger::default())
         .app_data(web::JsonConfig::default().limit(16000))
         .configure(config_app))
-        .workers(daemon_context.config.http_worker_count.try_into().expect("Failed to get http worker count"))
+        .workers(daemon_ctx.config.http_worker_count.try_into().expect("Failed to get http worker count"))
         .bind(addr.as_str())
         .expect("Failed to bind to http port");
-    let _ = server.run();
+    let _ = server.run().await;
 }
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index c40e741..28f354f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,14 +1,13 @@
-use log::{debug, info, error};
+use log::{debug, info, error, warn};
 use env_logger::Env;
 use std::sync::Arc;
 use clap::{Arg, App, ArgMatches};
 use std::panic;
 use std::process;
-
+use std::sync::atomic::{AtomicBool, Ordering};
 use ilert::ilert::ILert;
 use ilert::ilert_builders::{EventImage, EventLink};
-use tokio::sync::{Mutex};
-
+use tokio::sync::Mutex;
 use config::ILConfig;
 use db::ILDatabase;
 use crate::models::event_db::EventQueueItem;
@@ -19,13 +18,14 @@ mod models;
 mod hbt;
 mod consumers;
 mod poll;
-mod server;
+mod http_server;
 mod cleanup;
 
 pub struct DaemonContext {
     pub config: ILConfig,
     pub db: Mutex<ILDatabase>,
-    pub ilert_client: ILert
+    pub ilert_client: ILert,
+    pub running: AtomicBool
 }
 
 #[tokio::main]
 async fn main() -> () {
@@ -353,6 +353,11 @@ async fn main() -> () {
         }
         config = parse_consumer_arguments(&matches, config);
+        // kafka enforces the http server, so the program can exit properly
+        if !config.start_http {
+            config.start_http = true;
+            warn!("The current version of ilagent enforces the http server when kafka is used");
+        }
     }
 
     let db_file = matches.value_of("file");
@@ -439,17 +444,14 @@
 Kafka will use the consumer offset to ensure at least once delivery, no db polling needed
 */
 async fn run_daemon(config: &ILConfig) -> () {
+    info!("Starting daemon..");
 
-    // in case a thread (like mqtt or poll) dies of a panic
-    // we want to make sure the whole program exits
     let orig_hook = panic::take_hook();
     panic::set_hook(Box::new(move |panic_info| {
        orig_hook(panic_info);
        process::exit(1);
     }));
 
-    info!("Starting..");
-
     let ilert_client = ILert::new().expect("Failed to create ilert client");
     let db = ILDatabase::new(config.db_file.as_str());
     info!("Migrating DB..");
@@ -458,9 +460,19 @@
     let daemon_ctx = Arc::new(DaemonContext {
         config: config.clone(),
         db: Mutex::new(db),
-        ilert_client
+        ilert_client,
+        running: AtomicBool::new(true)
     });
 
+    // kafka stream will not exit when ctrlc hook is present
+    if config.kafka_brokers.is_none() {
+        let ctrlc_ctx = daemon_ctx.clone();
+        ctrlc::set_handler(move || {
+            info!("Received Ctrl+C. Shutting down threads...");
+            ctrlc_ctx.running.store(false, Ordering::Relaxed);
+        }).expect("Error setting Ctrl-C handler");
+    }
+
     // poll is only needed if mqtt or web server are running
     let is_poll_needed = config.mqtt_host.is_some() || config.start_http;
     let mut poll_job = None;
@@ -487,40 +499,47 @@
         let cloned_ctx = daemon_ctx.clone();
         // rumqttc spawns its own tokio runtime
         mqtt_job = Some(tokio::task::spawn_blocking(move || {
-            consumers::mqtt::run_mqtt_job(&cloned_ctx.config);
+            consumers::mqtt::run_mqtt_job(cloned_ctx);
         }));
     }
 
-    let mut kafka_job = None;
+    // let mut kafka_job = None;
     if config.kafka_brokers.is_some() {
         info!("Running Kafka thread..");
         let cloned_ctx = daemon_ctx.clone();
-        kafka_job = Some(tokio::spawn(async move {
+        tokio::spawn(async move {
             consumers::kafka::run_kafka_job(cloned_ctx).await;
-        }));
+        });
     }
 
     if config.start_http {
-        info!("Starting server..");
-        server::run_server(daemon_ctx.clone());
-        // blocking..
+        http_server::run_server(daemon_ctx.clone()).await;
+        // blocking...
+        debug!("http ended");
+        daemon_ctx.running.store(false, Ordering::Relaxed);
     }
 
     if let Some(handle) = poll_job {
         handle.await.expect("Failed to join poll thread");
+        debug!("poll ended");
     }
 
     if let Some(handle) = hbt_job {
         handle.await.expect("Failed to join heartbeat thread");
+        debug!("hbt ended");
     }
 
     if let Some(handle) = mqtt_job {
+        info!("waiting for mqtt to drop connections...");
         handle.await.expect("Failed to join mqtt thread");
+        debug!("mqtt ended");
     }
 
+    /* as soon as actix or the ctrlc handler is used, the kafka streaming consumer will not shut down
     if let Some(handle) = kafka_job {
         handle.await.expect("Failed to join kafka thread");
-    }
+        info!("kafka ended");
+    } */
 
     ()
 }
diff --git a/src/poll.rs b/src/poll.rs
index b3b9437..bb658da 100644
--- a/src/poll.rs
+++ b/src/poll.rs
@@ -1,4 +1,5 @@
 use std::sync::Arc;
+use std::sync::atomic::Ordering;
 use log::{info, warn, error};
 use std::time::{Duration, Instant};
 
@@ -11,7 +12,7 @@ use crate::models::event_db::EventQueueItem;
 
 pub async fn run_poll_job(daemon_ctx: Arc<DaemonContext>) -> () {
 
     let mut last_run = Instant::now();
-    loop {
+    while daemon_ctx.running.load(Ordering::Relaxed) {
 
         tokio::time::sleep(Duration::from_millis(250)).await;
 
         if last_run.elapsed().as_millis() < 5000 {
@@ -54,7 +55,7 @@ pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem)
 
     let parsed_event = EventQueueItemJson::from_db(event.clone());
 
-    let event_id = event.id.clone().unwrap_or("".to_string());
+    let event_id = event.id.clone().unwrap_or("no_id".to_string());
     let event_type = ILertEventType::from_str(event.event_type.as_str());
     let event_type = match event_type {
         Ok(et) => et,
@@ -106,21 +107,22 @@ pub async fn process_queued_event(ilert_client: &ILert, event: &EventQueueItem)
 
     let status = response.status.as_u16();
 
     if status == 202 {
-        info!("Event {} post successfully delivered", event_id);
+        let correlation_id = response.headers.get("correlation-id");
+        info!("Event id: {}, correlation-id: {:?} successfully delivered", event_id, correlation_id);
         return false; // default happy case, no retry
     }
 
     if status == 429 {
-        warn!("Event {} post failed too many requests", event_id);
+        warn!("Event {} failed: too many requests", event_id);
         return true; // too many requests, retry
     }
 
     if status > 499 {
-        warn!("Event {} post failed server side exception", event_id);
+        warn!("Event {} failed: server side exception", event_id);
         return true; // 500 exceptions, retry
     }
 
-    warn!("Event {} post failed bad request rejection {}", event_id, status);
+    warn!("Event {} failed: bad request rejection {}", event_id, status);
     error!("Response body: {}", response.body_raw.unwrap_or("No body provided".to_string()));
     false // any other status code e.g. 400, no retry
 }
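Reviewer note, not part of the patch: the shutdown changes above all rely on one cooperative pattern, so here is a minimal standalone Rust sketch of it for reference. It mirrors `DaemonContext.running` with a shared `AtomicBool` that the `ctrlc` handler flips and that worker loops check each iteration. The `worker` thread and its sleep interval are illustrative stand-ins for the poll, heartbeat and MQTT loops; the only external dependency is the `ctrlc` crate this patch adds to Cargo.toml.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

fn main() {
    // shared flag, same role as DaemonContext.running in the patch
    let running = Arc::new(AtomicBool::new(true));

    // Ctrl+C flips the flag; nothing is killed, the loops are asked to stop
    let handler_flag = running.clone();
    ctrlc::set_handler(move || {
        handler_flag.store(false, Ordering::Relaxed);
    }).expect("Error setting Ctrl-C handler");

    // stand-in for the poll / heartbeat / mqtt loops, which re-check the flag on every iteration
    let worker_flag = running.clone();
    let worker = thread::spawn(move || {
        while worker_flag.load(Ordering::Relaxed) {
            thread::sleep(Duration::from_millis(250));
        }
    });

    worker.join().expect("worker thread panicked");
}
```

On the HTTP side, the new `GET /ready` and `GET /health` routes both answer with `204 No Content` (`HttpResponse::NoContent()`), so once the agent runs with `-p 8977` an orchestrator's readiness and liveness probes can point at those paths directly.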