Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: AS XDR rewind #2392

Merged
merged 12 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-ingestion/aerospike/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ edition = "2021"
dozer-ingestion-connector = { path = "../connector" }
actix-web = "4.5.1"
base64 = "0.21.7"
dozer-sink-aerospike = { path = "../../dozer-sink-aerospike" }
115 changes: 104 additions & 11 deletions dozer-ingestion/aerospike/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use dozer_ingestion_connector::dozer_types::errors::internal::BoxedError;
use dozer_ingestion_connector::dozer_types::event::Event;
use dozer_ingestion_connector::dozer_types::log::{error, info};
use dozer_ingestion_connector::dozer_types::log::{error, info, trace, warn};
use dozer_ingestion_connector::dozer_types::models::connection::AerospikeConnection;
use dozer_ingestion_connector::dozer_types::models::ingestion_types::{
IngestionMessage, TransactionInfo,
Expand All @@ -16,8 +16,11 @@ use dozer_ingestion_connector::{
TableIdentifier, TableInfo,
};
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::num::TryFromIntError;

use std::time::Duration;

use dozer_ingestion_connector::dozer_types::serde::Deserialize;

use actix_web::dev::Server;
Expand All @@ -38,22 +41,29 @@ use base64::prelude::*;
use dozer_ingestion_connector::dozer_types::chrono::{
DateTime, FixedOffset, NaiveDate, NaiveDateTime, Utc,
};

use dozer_ingestion_connector::dozer_types::thiserror::{self, Error};
use dozer_ingestion_connector::schema_parser::SchemaParser;

use dozer_sink_aerospike::Client;

#[derive(Debug, Error)]
pub enum AerospikeConnectorError {
#[error("Cannot start server: {0}")]
CannotStartServer(#[from] std::io::Error),

#[error("Set name is none. Key: {0:?}, {1:?}, {2:?}")]
SetNameIsNone(Option<String>, Option<String>, Option<String>),
SetNameIsNone(
Option<serde_json::Value>,
Option<serde_json::Value>,
Option<serde_json::Value>,
),

#[error("PK is none: {0:?}, {1:?}, {2:?}")]
PkIsNone(Option<String>, String, Option<String>),
PkIsNone(Option<serde_json::Value>, String, Option<serde_json::Value>),

#[error("Invalid key value: {0:?}. Key is supposed to have 4 elements.")]
InvalidKeyValue(Vec<Option<String>>),
InvalidKeyValue(Vec<Option<serde_json::Value>>),

#[error("Unsupported type. Bin type {bin_type:?}, field type: {field_type:?}")]
UnsupportedTypeForFieldType {
Expand Down Expand Up @@ -102,13 +112,16 @@ pub enum AerospikeConnectorError {

#[error("Failed parsing timestamp: {0}")]
TimestampParsingError(#[from] dozer_ingestion_connector::dozer_types::chrono::ParseError),

#[error("Key is neither string or int")]
KeyNotSupported(Value),
}

#[derive(Deserialize, Debug)]
#[serde(crate = "dozer_types::serde")]
pub struct AerospikeEvent {
msg: String,
key: Vec<Option<String>>,
key: Vec<Option<serde_json::Value>>,
// gen: u32,
// exp: u32,
lut: u64,
Expand Down Expand Up @@ -160,6 +173,34 @@ impl AerospikeConnector {
.bind(address)?
.run())
}

async fn rewind(
&self,
client: &Client,
dc_name: &str,
namespace: &str,
) -> Result<bool, BoxedError> {
unsafe {
let request = CString::new(format!(
"set-config:context=xdr;dc={dc_name};namespace={namespace};action=add;rewind=all"
))?;

// Wait until the replication configuration is set.
// It may take some time, so retrying until rewind returns ok.
let mut response: *mut i8 = std::ptr::null_mut();
client.info(&request, &mut response).map_err(Box::new)?;

let string = CStr::from_ptr(response);

let parts: Vec<&str> = string.to_str()?.trim().split('\t').collect();

if let Some(status) = parts.get(1) {
Ok(status.replace('\n', "") == *"ok")
} else {
Ok(false)
}
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -276,13 +317,15 @@ async fn event_request_handler(
let event = json.into_inner();
let state = data.into_inner();

trace!("Event data: {:?}", event);
// TODO: Handle delete
if event.msg != "write" {
return HttpResponse::Ok().finish();
}

let operation_events = map_events(event, &state.tables_index_map).await;

trace!("Mapped events {:?}", operation_events);
match operation_events {
Ok(None) => HttpResponse::Ok().finish(),
Ok(Some(message)) => {
Expand Down Expand Up @@ -389,6 +432,8 @@ impl Connector for AerospikeConnector {
name: name.clone(),
typ: if name == "inserted_at" {
FieldType::Timestamp
} else if name == "PK" {
FieldType::UInt
} else {
FieldType::String
},
Expand Down Expand Up @@ -472,8 +517,36 @@ impl Connector for AerospikeConnector {
&mut self,
ingestor: &Ingestor,
tables: Vec<TableInfo>,
_last_checkpoint: Option<OpIdentifier>,
last_checkpoint: Option<OpIdentifier>,
) -> Result<(), BoxedError> {
let hosts = CString::new(self.config.hosts.as_str())?;
let client = Client::new(&hosts).map_err(Box::new)?;

if last_checkpoint.is_none() {
let dc_name = self.config.replication.datacenter.clone();
let namespace = self.config.namespace.clone();

// To read data snapshot we need to rewind xdr stream.
// Before rewinding we need to remove xdr configuration and then add it again.
unsafe {
let request = CString::new(format!(
"set-config:context=xdr;dc={dc_name};namespace={namespace};action=remove"
))?;
let mut response: *mut i8 = std::ptr::null_mut();
client.info(&request, &mut response).map_err(Box::new)?;
}

loop {
if self.rewind(&client, &dc_name, &namespace).await? {
info!("Aerospike replication configuration set successfully");
break;
} else {
warn!("Aerospike replication configuration set failed");
tokio::time::sleep(Duration::from_secs(3)).await;
}
}
}

let mapped_schema = self.get_schemas(&tables).await?;
ingestor
.handle_message(IngestionMessage::TransactionInfo(
Expand Down Expand Up @@ -535,7 +608,7 @@ async fn map_events(
event: AerospikeEvent,
tables_map: &HashMap<String, TableIndexMap>,
) -> Result<Option<IngestionMessage>, AerospikeConnectorError> {
let key: [Option<String>; 4] = match event.key.try_into() {
let key: [Option<serde_json::Value>; 4] = match event.key.try_into() {
Ok(key) => key,
Err(key) => return Err(AerospikeConnectorError::InvalidKeyValue(key)),
};
Expand All @@ -546,20 +619,40 @@ async fn map_events(
));
};

let table_name = match set_name {
serde_json::Value::String(s) => s.clone(),
_ => {
return Err(AerospikeConnectorError::SetNameIsNone(
key0, key2, pk_in_key,
))
}
};

let Some(TableIndexMap {
columns_map,
table_index,
}) = tables_map.get(set_name.as_str())
}) = tables_map.get(&table_name)
else {
return Ok(None);
};

let mut fields = vec![Field::Null; columns_map.len()];
if let Some((pk, _)) = columns_map.get("PK") {
if let Some(pk_in_key) = pk_in_key {
fields[*pk] = Field::String(pk_in_key);
match pk_in_key {
serde_json::Value::String(s) => {
fields[*pk] = Field::String(s.clone());
}
serde_json::Value::Number(n) => {
fields[*pk] = Field::UInt(
n.as_u64()
.ok_or(AerospikeConnectorError::ParsingUIntFailed)?,
);
}
v => return Err(AerospikeConnectorError::KeyNotSupported(v)),
}
} else {
return Err(AerospikeConnectorError::PkIsNone(key0, set_name, key2));
return Err(AerospikeConnectorError::PkIsNone(key0, table_name, key2));
}
}

Expand Down Expand Up @@ -589,7 +682,7 @@ async fn map_events(
op: Insert {
new: dozer_types::types::Record::new(fields),
},
id: None,
id: Some(OpIdentifier::new(event.lut, 0)),
}))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <aerospike/aerospike.h>
#include <aerospike/aerospike_info.h>
#include <aerospike/aerospike_key.h>
#include <aerospike/as_record.h>
#include <aerospike/as_error.h>
Expand Down
32 changes: 26 additions & 6 deletions dozer-sink-aerospike/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use std::time::{Duration, Instant};
use std::{collections::HashMap, fmt::Debug};

use aerospike_client_sys::{
aerospike, aerospike_batch_write, aerospike_connect, aerospike_destroy, aerospike_key_put,
aerospike_key_remove, aerospike_key_select, aerospike_new, as_arraylist_append,
as_arraylist_destroy, as_arraylist_new, as_batch_record, as_batch_records,
aerospike, aerospike_batch_write, aerospike_connect, aerospike_destroy, aerospike_info_any,
aerospike_key_put, aerospike_key_remove, aerospike_key_select, aerospike_new,
as_arraylist_append, as_arraylist_destroy, as_arraylist_new, as_batch_record, as_batch_records,
as_batch_records_destroy, as_batch_write_record, as_bin_value, as_boolean_new, as_bytes_new,
as_bytes_new_wrap, as_bytes_set, as_bytes_type, as_bytes_type_e_AS_BYTES_STRING, as_config,
as_config_add_hosts, as_config_init, as_double_new, as_error, as_integer_new, as_key,
Expand All @@ -36,6 +36,7 @@ use aerospike_client_sys::{
as_val_val_reserve, as_vector, as_vector_increase_capacity, as_vector_init, AS_BATCH_WRITE,
AS_BIN_NAME_MAX_LEN,
};

use dozer_core::node::{PortHandle, Sink, SinkFactory};
use dozer_types::errors::internal::BoxedError;
use dozer_types::geo::{Coord, Point};
Expand Down Expand Up @@ -74,7 +75,7 @@ enum AerospikeSinkError {
}

#[derive(Debug, Error)]
struct AerospikeError {
pub struct AerospikeError {
code: i32,
message: String,
}
Expand Down Expand Up @@ -102,7 +103,7 @@ impl Display for AerospikeError {

// Client should never be `Clone`, because of the custom Drop impl
#[derive(Debug)]
struct Client {
pub struct Client {
inner: NonNull<aerospike>,
}

Expand All @@ -128,7 +129,7 @@ unsafe fn as_try(mut f: impl FnMut(*mut as_error) -> as_status) -> Result<(), Ae
}

impl Client {
fn new(hosts: &CStr) -> Result<Self, AerospikeError> {
pub fn new(hosts: &CStr) -> Result<Self, AerospikeError> {
let mut config = unsafe {
let mut config = MaybeUninit::uninit();
as_config_init(config.as_mut_ptr());
Expand Down Expand Up @@ -232,6 +233,25 @@ impl Client {
)
})
}

/// # Safety
///
/// This function sends a raw info request to the aerospike server
pub unsafe fn info(
&self,
request: &CStr,
response: &mut *mut i8,
) -> Result<(), AerospikeError> {
as_try(|err| {
aerospike_info_any(
self.inner.as_ptr(),
err,
null(),
request.as_ptr(),
response as *mut *mut i8,
)
})
}
}

impl Drop for Client {
Expand Down
7 changes: 7 additions & 0 deletions dozer-types/src/models/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ pub struct ReplicationSettings {
pub server_address: String,
#[serde(default = "default_server_port")]
pub server_port: u32,
#[serde(default = "default_datacenter")]
pub datacenter: String,
}

fn default_server_address() -> String {
Expand All @@ -236,11 +238,16 @@ fn default_server_port() -> u32 {
5929
}

fn default_datacenter() -> String {
"esp".to_string()
}

impl Default for ReplicationSettings {
fn default() -> Self {
ReplicationSettings {
server_address: default_server_address(),
server_port: default_server_port(),
datacenter: default_datacenter(),
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@
"replication": {
"default": {
"server_address": "0.0.0.0",
"server_port": 5929
"server_port": 5929,
"datacenter": "esp"
},
"allOf": [
{
Expand Down Expand Up @@ -1596,6 +1597,10 @@
"ReplicationSettings": {
"type": "object",
"properties": {
"datacenter": {
"default": "esp",
"type": "string"
},
"server_address": {
"default": "0.0.0.0",
"type": "string"
Expand Down
Loading