Skip to content

Commit

Permalink
Fix rewind
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Feb 29, 2024
1 parent 70ee1b7 commit ce7477a
Showing 1 changed file with 57 additions and 9 deletions.
66 changes: 57 additions & 9 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError;
use dozer_ingestion_connector::dozer_types::event::Event;
use dozer_ingestion_connector::dozer_types::log::{error, info};
use dozer_ingestion_connector::dozer_types::log::{debug, error, info, warn};
use dozer_ingestion_connector::dozer_types::models::connection::AerospikeConnection;
use dozer_ingestion_connector::dozer_types::models::ingestion_types::{
IngestionMessage, TransactionInfo,
Expand All @@ -15,9 +15,11 @@ use dozer_ingestion_connector::{
TableIdentifier, TableInfo,
};
use std::collections::HashMap;
use std::ffi::CString;
use std::ffi::{CStr, CString};
use std::num::TryFromIntError;

use std::time::Duration;

use dozer_ingestion_connector::dozer_types::serde::Deserialize;

use actix_web::dev::Server;
Expand Down Expand Up @@ -179,6 +181,34 @@ impl AerospikeConnector {
.bind(address)?
.run())
}

async fn rewind(
&self,
client: &Client,
dc_name: &str,
namespace: &str,
) -> Result<bool, BoxedError> {
unsafe {
let request = CString::new(format!(
"set-config:context=xdr;dc={dc_name};namespace={namespace};action=add;rewind=all"
))?;

// Wait until the replication configuration is set.
// It may take some time, so retrying until rewind returns ok.
let mut response: *mut i8 = std::ptr::null_mut();
client.info(&request, &mut response).map_err(Box::new)?;

let string = CStr::from_ptr(response);

let parts: Vec<&str> = string.to_str()?.trim().split('\t').collect();

if let Some(status) = parts.get(1) {
Ok(status.replace('\n', "") == *"ok")
} else {
Ok(false)
}
}
}
}

fn map_error(error: AerospikeConnectorError) -> HttpResponse {
Expand All @@ -199,13 +229,15 @@ async fn event_request_handler(
let event = json.into_inner();
let state = data.into_inner();

debug!("Event data: {:?}", event);
// TODO: Handle delete
if event.msg != "write" {
return HttpResponse::Ok().finish();
}

let operation_events = map_events(event, state.tables_index_map.clone()).await;

debug!("Mapped events {:?}", operation_events);
match operation_events {
Ok(None) => HttpResponse::Ok().finish(),
Ok(Some(events)) => {
Expand Down Expand Up @@ -394,18 +426,34 @@ impl Connector for AerospikeConnector {
&mut self,
ingestor: &Ingestor,
tables: Vec<TableInfo>,
_last_checkpoint: Option<OpIdentifier>,
last_checkpoint: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
let hosts = CString::new(self.config.hosts.as_str())?;
let client = Client::new(&hosts).map_err(Box::new)?;
unsafe {

if last_checkpoint.is_none() {
let dc_name = self.config.replication.datacenter.clone();
let namespace = self.config.namespace.clone();
let request = CString::new(format!(
"set-config:context=xdr;dc={dc_name};namespace={namespace};action=add;rewind=all"
))?;
let mut response: *mut i8 = std::ptr::null_mut();
client.info(&request, &mut response).map_err(Box::new)?;

// To read data snapshot we need to rewind xdr stream.
// Before rewinding we need to remove xdr configuration and then add it again.
unsafe {
let request = CString::new(format!(
"set-config:context=xdr;dc={dc_name};namespace={namespace};action=remove"
))?;
let mut response: *mut i8 = std::ptr::null_mut();
client.info(&request, &mut response).map_err(Box::new)?;
}

loop {
if self.rewind(&client, &dc_name, &namespace).await? {
info!("Aerospike replication configuration set successfully");
break;
} else {
warn!("Aerospike replication configuration set failed");
tokio::time::sleep(Duration::from_secs(3)).await;
}
}
}

let mapped_schema = self.get_schemas(&tables).await?;
Expand Down

0 comments on commit ce7477a

Please sign in to comment.