Skip to content

Commit

Permalink
fix: Fix publication slot creation (#1202)
Browse files Browse the repository at this point in the history
Postgres connector were initiated without tables config, so during publication slot creation it was always creating slot for all tables. This was causing issue in replication, because we were receiving data from tables, which were not used in sources.

Signed-off-by: Karolis Gudiškis <[email protected]>
  • Loading branch information
karolisg authored Mar 10, 2023
1 parent a569032 commit d46eced
Show file tree
Hide file tree
Showing 25 changed files with 66 additions and 68 deletions.
4 changes: 2 additions & 2 deletions dozer-admin/src/services/connection_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl ConnectionService {
connection: Connection,
) -> Result<Vec<dozer_orchestrator::TableInfo>, ErrorResponse> {
let res = thread::spawn(|| {
let connector = get_connector(connection).map_err(|err| err.to_string())?;
let connector = get_connector(connection, None).map_err(|err| err.to_string())?;
connector.get_tables().map_err(|err| err.to_string())
})
.join()
Expand Down Expand Up @@ -185,7 +185,7 @@ impl ConnectionService {
) -> Result<ValidateConnectionResponse, ErrorResponse> {
let c = input.connection.unwrap();
let validate_result = thread::spawn(|| {
let connector = get_connector(c).map_err(|err| err.to_string())?;
let connector = get_connector(c, None).map_err(|err| err.to_string())?;
connector.validate(None).map_err(|err| err.to_string())
});
validate_result
Expand Down
3 changes: 2 additions & 1 deletion dozer-ingestion/benches/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ pub fn get_progress() -> ProgressBar {
pub async fn get_connection_iterator(config: TestConfig) -> IngestionIterator {
let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
std::thread::spawn(move || {
let grpc_connector = dozer_ingestion::connectors::get_connector(config.connection).unwrap();
let grpc_connector =
dozer_ingestion::connectors::get_connector(config.connection, None).unwrap();

let mut tables = grpc_connector.get_tables().unwrap();
if let Some(tables_filter) = config.tables_filter {
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/delta_lake/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Connector for DeltaLakeConnector {
}

fn validate_schemas(&self, tables: &[TableInfo]) -> ConnectorResult<ValidationResults> {
let schemas = self.get_schemas(Some(tables.to_vec()))?;
let schemas = self.get_schemas(Some(&tables.to_vec()))?;
let mut validation_result = ValidationResults::new();
let existing_schemas_names: Vec<String> = schemas.iter().map(|s| s.name.clone()).collect();
for table in tables {
Expand All @@ -59,7 +59,7 @@ impl Connector for DeltaLakeConnector {

fn get_schemas(
&self,
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
) -> ConnectorResult<Vec<SourceSchema>> {
let schema_helper = SchemaHelper::new(self.config.clone());
schema_helper.get_schemas(self.id, table_names)
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/src/connectors/delta_lake/schema_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl SchemaHelper {
pub fn get_schemas(
&self,
id: u64,
tables: Option<Vec<TableInfo>>,
tables: Option<&Vec<TableInfo>>,
) -> ConnectorResult<Vec<SourceSchema>> {
if tables.is_none() {
return Ok(vec![]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn get_schema_from_deltalake() {
name: table_name.to_string(),
columns: None,
};
let field = connector.get_schemas(Some(vec![table_info])).unwrap()[0]
let field = connector.get_schemas(Some(&vec![table_info])).unwrap()[0]
.schema
.fields[0]
.clone();
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/src/connectors/ethereum/log/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl EthLogConnector {
impl Connector for EthLogConnector {
fn get_schemas(
&self,
tables: Option<Vec<TableInfo>>,
tables: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
let mut schemas = vec![SourceSchema::new(
ETH_LOGS_TABLE.to_string(),
Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/src/connectors/ethereum/trace/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl EthTraceConnector {
impl Connector for EthTraceConnector {
fn get_schemas(
&self,
_table_names: Option<Vec<TableInfo>>,
_table_names: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
Ok(vec![SourceSchema::new(
ETH_TRACE_TABLE.to_string(),
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/grpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ where
{
fn get_schemas(
&self,
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
let schemas_str = Self::parse_config(&self.config)?;
let adapter = GrpcIngestor::<T>::new(schemas_str)?;
Expand All @@ -156,7 +156,7 @@ where
}

fn validate(&self, table_names: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
let schemas = self.get_schemas(table_names);
let schemas = self.get_schemas(table_names.as_ref());
schemas.map(|_| ())
}

Expand Down
21 changes: 12 additions & 9 deletions dozer-ingestion/src/connectors/grpc/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@ async fn ingest_grpc(
let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());

std::thread::spawn(move || {
let grpc_connector = crate::connectors::get_connector(Connection {
config: Some(ConnectionConfig::Grpc(GrpcConfig {
schemas: Some(GrpcConfigSchemas::Inline(schemas.to_string())),
adapter,
port,
..Default::default()
})),
name: "grpc".to_string(),
})
let grpc_connector = crate::connectors::get_connector(
Connection {
config: Some(ConnectionConfig::Grpc(GrpcConfig {
schemas: Some(GrpcConfigSchemas::Inline(schemas.to_string())),
adapter,
port,
..Default::default()
})),
name: "grpc".to_string(),
},
None,
)
.unwrap();

let tables = grpc_connector.get_tables().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/kafka/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ impl KafkaConnector {
impl Connector for KafkaConnector {
fn get_schemas(
&self,
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
self.config.schema_registry_url.clone().map_or(
NoSchemaRegistry::get_schema(table_names.clone(), self.config.clone()),
NoSchemaRegistry::get_schema(table_names, self.config.clone()),
|_| SchemaRegistry::get_schema(table_names, self.config.clone()),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct NoSchemaRegistry {}

impl NoSchemaRegistry {
pub fn get_schema(
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
config: KafkaConfig,
) -> Result<Vec<SourceSchema>, ConnectorError> {
table_names.map_or(Ok(vec![]), |tables| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl SchemaRegistry {
}

pub fn get_schema(
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
config: KafkaConfig,
) -> Result<Vec<SourceSchema>, ConnectorError> {
let sr_settings = SrSettings::new(config.schema_registry_url.unwrap());
Expand Down
9 changes: 6 additions & 3 deletions dozer-ingestion/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub trait Connector: Send + Sync + Debug {

fn get_schemas(
&self,
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError>;

fn can_start_from(&self, last_checkpoint: (u64, u64)) -> Result<bool, ConnectorError>;
Expand Down Expand Up @@ -95,7 +95,10 @@ impl ColumnInfo {
}
}

pub fn get_connector(connection: Connection) -> Result<Box<dyn Connector>, ConnectorError> {
pub fn get_connector(
connection: Connection,
tables: Option<Vec<TableInfo>>,
) -> Result<Box<dyn Connector>, ConnectorError> {
let config = connection
.config
.ok_or_else(|| ConnectorError::MissingConfiguration(connection.name.clone()))?;
Expand All @@ -104,7 +107,7 @@ pub fn get_connector(connection: Connection) -> Result<Box<dyn Connector>, Conne
let config = map_connection_config(&config)?;
let postgres_config = PostgresConfig {
name: connection.name,
tables: None,
tables,
config,
};

Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/src/connectors/object_store/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl<T: DozerObjectStore> Connector for ObjectStoreConnector<T> {

fn get_schemas(
&self,
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
) -> ConnectorResult<Vec<SourceSchema>> {
let mapper = SchemaMapper::new(self.config.clone());
mapper.get_schema(table_names)
Expand Down
6 changes: 3 additions & 3 deletions dozer-ingestion/src/connectors/object_store/schema_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ impl<T: Clone + Send + Sync> SchemaMapper<T> {
pub trait Mapper<T> {
fn get_schema(
&self,
tables: Option<Vec<TableInfo>>,
tables: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError>;
}

impl<T: DozerObjectStore> Mapper<T> for SchemaMapper<T> {
fn get_schema(
&self,
tables: Option<Vec<TableInfo>>,
tables: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
let rt = Runtime::new().map_err(|_| ObjectStoreConnectorError::RuntimeCreationError)?;

let tables_list = tables.unwrap_or_else(|| {
let tables_list = tables.cloned().unwrap_or_else(|| {
self.config
.tables()
.iter()
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/postgres/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ impl PostgresConnector {
impl Connector for PostgresConnector {
fn get_schemas(
&self,
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
self.schema_helper
.get_schemas(table_names.as_deref())
.get_schemas(table_names.map(|t| &t[..]))
.map_err(PostgresConnectorError)
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-ingestion/src/connectors/postgres/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn get_iterator(config: Connection, table_name: String) -> IngestionIterator
columns: None,
}];

let connector = get_connector(config).unwrap();
let connector = get_connector(config, Some(tables.clone())).unwrap();
connector.start(None, &ingestor, tables).unwrap();
});

Expand Down
19 changes: 5 additions & 14 deletions dozer-ingestion/src/connectors/postgres/xlog_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::connectors::postgres::helper;
use crate::connectors::ColumnInfo;
use crate::errors::{PostgresConnectorError, PostgresSchemaError};
use dozer_types::node::OpIdentifier;
use dozer_types::types::{Field, FieldDefinition, Operation, Record, Schema, SourceDefinition};
use dozer_types::types::{Field, FieldDefinition, Operation, Record, SourceDefinition};
use helper::postgres_type_to_dozer_type;
use postgres_protocol::message::backend::LogicalReplicationMessage::{
Begin, Commit, Delete, Insert, Relation, Update,
Expand Down Expand Up @@ -123,7 +123,7 @@ impl XlogMapper {
new: Record::new(
Some(dozer_types::types::SchemaIdentifier {
id: table.rel_id,
version: table.rel_id as u16,
version: 0,
}),
values,
None,
Expand All @@ -143,15 +143,15 @@ impl XlogMapper {
old: Record::new(
Some(dozer_types::types::SchemaIdentifier {
id: table.rel_id,
version: table.rel_id as u16,
version: 0,
}),
old_values,
None,
),
new: Record::new(
Some(dozer_types::types::SchemaIdentifier {
id: table.rel_id,
version: table.rel_id as u16,
version: 0,
}),
values,
None,
Expand All @@ -171,7 +171,7 @@ impl XlogMapper {
old: Record::new(
Some(dozer_types::types::SchemaIdentifier {
id: table.rel_id,
version: table.rel_id as u16,
version: 0,
}),
values,
None,
Expand Down Expand Up @@ -248,15 +248,6 @@ impl XlogMapper {
});
}

let _schema = Schema {
identifier: Some(dozer_types::types::SchemaIdentifier {
id: table.rel_id,
version: table.rel_id as u16,
}),
fields,
primary_index: vec![0],
};

self.relations_map.insert(rel_id, table);

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl Client {

pub fn fetch_tables(
&self,
tables: Option<Vec<TableInfo>>,
tables: Option<&Vec<TableInfo>>,
tables_indexes: HashMap<String, usize>,
keys: HashMap<String, Vec<String>>,
conn: &Connection<AutocommitOn>,
Expand Down
4 changes: 2 additions & 2 deletions dozer-ingestion/src/connectors/snowflake/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ impl Connector for SnowflakeConnector {
#[cfg(feature = "snowflake")]
fn get_schemas(
&self,
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
SchemaHelper::get_schema(&self.config, table_names)
}

#[cfg(not(feature = "snowflake"))]
fn get_schemas(
&self,
_table_names: Option<Vec<TableInfo>>,
_table_names: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
todo!()
}
Expand Down
6 changes: 3 additions & 3 deletions dozer-ingestion/src/connectors/snowflake/schema_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub struct SchemaHelper {}
impl SchemaHelper {
pub fn get_schema(
config: &SnowflakeConfig,
table_names: Option<Vec<TableInfo>>,
table_names: Option<&Vec<TableInfo>>,
) -> Result<Vec<SourceSchema>, ConnectorError> {
let client = Client::new(config);
let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
Expand All @@ -26,7 +26,7 @@ impl SchemaHelper {
.fetch_keys(&conn)
.map_err(ConnectorError::SnowflakeError)?;

let tables_indexes = table_names.clone().map_or(HashMap::new(), |tables| {
let tables_indexes = table_names.map_or(HashMap::new(), |tables| {
let mut result = HashMap::new();
for (idx, table) in tables.iter().enumerate() {
result.insert(table.name.clone(), idx);
Expand Down Expand Up @@ -68,7 +68,7 @@ impl SchemaHelper {
config: &SnowflakeConfig,
tables: &[TableInfo],
) -> Result<ValidationResults, ConnectorError> {
let schemas = Self::get_schema(config, Some(tables.to_vec()))?;
let schemas = Self::get_schema(config, Some(&tables.to_vec()))?;
let mut validation_result = ValidationResults::new();

let existing_schemas_names: Vec<String> = schemas.iter().map(|s| s.name.clone()).collect();
Expand Down
8 changes: 4 additions & 4 deletions dozer-ingestion/src/connectors/snowflake/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn test_disabled_connector_and_read_from_stream() {
thread::spawn(move || {
let tables: Vec<TableInfo> = vec![table];

let connector = get_connector(connection_config).unwrap();
let connector = get_connector(connection_config, None).unwrap();
let _ = connector.start(None, &ingestor, tables);
});

Expand Down Expand Up @@ -103,7 +103,7 @@ fn test_disabled_connector_and_read_from_stream() {
fn test_disabled_connector_get_schemas_test() {
run_connector_test("snowflake", |config| {
let connection = config.connections.get(0).unwrap();
let connector = get_connector(connection.clone()).unwrap();
let connector = get_connector(connection.clone(), None).unwrap();
let client = get_client(connection);

let env = create_environment_v3().map_err(|e| e.unwrap()).unwrap();
Expand Down Expand Up @@ -138,7 +138,7 @@ fn test_disabled_connector_get_schemas_test() {

let schemas = connector
.as_ref()
.get_schemas(Some(vec![TableInfo {
.get_schemas(Some(&vec![TableInfo {
name: table_name.clone(),
columns: None,
}]))
Expand Down Expand Up @@ -176,7 +176,7 @@ fn test_disabled_connector_get_schemas_test() {
fn test_disabled_connector_missing_table_validator() {
run_connector_test("snowflake", |config| {
let connection = config.connections.get(0).unwrap();
let connector = get_connector(connection.clone()).unwrap();
let connector = get_connector(connection.clone(), None).unwrap();

let not_existing_table = "not_existing_table".to_string();
let result = connector
Expand Down
Loading

0 comments on commit d46eced

Please sign in to comment.