diff --git a/dozer-ingestion/aerospike/src/connector.rs b/dozer-ingestion/aerospike/src/connector.rs index fa34d3c981..703011dde8 100644 --- a/dozer-ingestion/aerospike/src/connector.rs +++ b/dozer-ingestion/aerospike/src/connector.rs @@ -1,6 +1,6 @@ use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError; use dozer_ingestion_connector::dozer_types::event::Event; -use dozer_ingestion_connector::dozer_types::log::{error, info}; +use dozer_ingestion_connector::dozer_types::log::{debug, error, info, warn}; use dozer_ingestion_connector::dozer_types::models::connection::AerospikeConnection; use dozer_ingestion_connector::dozer_types::models::ingestion_types::{ IngestionMessage, TransactionInfo, @@ -15,9 +15,11 @@ use dozer_ingestion_connector::{ TableIdentifier, TableInfo, }; use std::collections::HashMap; -use std::ffi::CString; +use std::ffi::{CStr, CString}; use std::num::TryFromIntError; +use std::time::Duration; + use dozer_ingestion_connector::dozer_types::serde::Deserialize; use actix_web::dev::Server; @@ -179,6 +181,34 @@ impl AerospikeConnector { .bind(address)? .run()) } + + async fn rewind( + &self, + client: &Client, + dc_name: &str, + namespace: &str, + ) -> Result { + unsafe { + let request = CString::new(format!( + "set-config:context=xdr;dc={dc_name};namespace={namespace};action=add;rewind=all" + ))?; + + // Wait until the replication configuration is set. + // It may take some time, so retrying until rewind returns ok. + let mut response: *mut i8 = std::ptr::null_mut(); + client.info(&request, &mut response).map_err(Box::new)?; + + let string = CStr::from_ptr(response); + + let parts: Vec<&str> = string.to_str()?.trim().split('\t').collect(); + + if let Some(status) = parts.get(1) { + Ok(status.replace('\n', "") == *"ok") + } else { + Ok(false) + } + } + } } fn map_error(error: AerospikeConnectorError) -> HttpResponse { @@ -199,6 +229,7 @@ async fn event_request_handler( let event = json.into_inner(); let state = data.into_inner(); + debug!("Event data: {:?}", event); // TODO: Handle delete if event.msg != "write" { return HttpResponse::Ok().finish(); @@ -206,6 +237,7 @@ async fn event_request_handler( let operation_events = map_events(event, state.tables_index_map.clone()).await; + debug!("Mapped events {:?}", operation_events); match operation_events { Ok(None) => HttpResponse::Ok().finish(), Ok(Some(events)) => { @@ -394,18 +426,34 @@ impl Connector for AerospikeConnector { &mut self, ingestor: &Ingestor, tables: Vec, - _last_checkpoint: Option, + last_checkpoint: Option, ) -> Result<(), BoxedError> { let hosts = CString::new(self.config.hosts.as_str())?; let client = Client::new(&hosts).map_err(Box::new)?; - unsafe { + + if last_checkpoint.is_none() { let dc_name = self.config.replication.datacenter.clone(); let namespace = self.config.namespace.clone(); - let request = CString::new(format!( - "set-config:context=xdr;dc={dc_name};namespace={namespace};action=add;rewind=all" - ))?; - let mut response: *mut i8 = std::ptr::null_mut(); - client.info(&request, &mut response).map_err(Box::new)?; + + // To read data snapshot we need to rewind xdr stream. + // Before rewinding we need to remove xdr configuration and then add it again. + unsafe { + let request = CString::new(format!( + "set-config:context=xdr;dc={dc_name};namespace={namespace};action=remove" + ))?; + let mut response: *mut i8 = std::ptr::null_mut(); + client.info(&request, &mut response).map_err(Box::new)?; + } + + loop { + if self.rewind(&client, &dc_name, &namespace).await? { + info!("Aerospike replication configuration set successfully"); + break; + } else { + warn!("Aerospike replication configuration set failed"); + tokio::time::sleep(Duration::from_secs(3)).await; + } + } } let mapped_schema = self.get_schemas(&tables).await?;