OZ-371: Analytics' catalog and database renamed (#26)
wluyima authored Jun 10, 2024
1 parent 0f10260 commit e3123c5
Showing 8 changed files with 119 additions and 11 deletions.
2 changes: 1 addition & 1 deletion development/.env
@@ -19,7 +19,7 @@ CHANGELOG_FILE=db.changelog-master.xml
 ODOO_ANALYTICS_TABLES='databasechangelog,account_account,product_category,sale_order,sale_order_line,res_partner,product_product,product_template,ir_model_data'
 
 #Kafka
-CREATE_TOPICS=openmrs.openmrs.appointment_service,openmrs.openmrs.appointment_service_type,openmrs.openmrs.care_setting,openmrs.openmrs.concept,openmrs.openmrs.concept_name,openmrs.openmrs.concept_reference_map,openmrs.openmrs.concept_reference_source,openmrs.openmrs.concept_reference_term,openmrs.openmrs.conditions,openmrs.openmrs.encounter,openmrs.openmrs.encounter_diagnosis,openmrs.openmrs.encounter_type,openmrs.openmrs.location,openmrs.openmrs.form,openmrs.openmrs.obs,openmrs.openmrs.order_type,openmrs.openmrs.orders,openmrs.openmrs.patient,openmrs.openmrs.patient_appointment,openmrs.openmrs.patient_appointment_provider,openmrs.openmrs.patient_identifier,openmrs.openmrs.patient_identifier_type,openmrs.openmrs.patient_program,openmrs.openmrs.program,openmrs.openmrs.person,openmrs.openmrs.person_attribute,openmrs.openmrs.person_attribute_type,openmrs.openmrs.person_name,openmrs.openmrs.person_address,openmrs.openmrs.visit_type,openmrs.openmrs.visit,openmrs.openmrs.visit_attribute,openmrs.openmrs.visit_attribute_type,odoo.public.sale_order,odoo.public.sale_order_line,odoo.public.res_partner,odoo.public.product_product,odoo.public.product_template,odoo.public.ir_model_data
+CREATE_TOPICS=emr.openmrs.appointment_service,emr.openmrs.appointment_service_type,emr.openmrs.care_setting,emr.openmrs.concept,emr.openmrs.concept_name,emr.openmrs.concept_reference_map,emr.openmrs.concept_reference_source,emr.openmrs.concept_reference_term,emr.openmrs.conditions,emr.openmrs.encounter,emr.openmrs.encounter_diagnosis,emr.openmrs.encounter_type,emr.openmrs.location,emr.openmrs.form,emr.openmrs.obs,emr.openmrs.order_type,emr.openmrs.orders,emr.openmrs.patient,emr.openmrs.patient_appointment,emr.openmrs.patient_appointment_provider,emr.openmrs.patient_identifier,emr.openmrs.patient_identifier_type,emr.openmrs.patient_program,emr.openmrs.program,emr.openmrs.person,emr.openmrs.person_attribute,emr.openmrs.person_attribute_type,emr.openmrs.person_name,emr.openmrs.person_address,emr.openmrs.visit_type,emr.openmrs.visit,emr.openmrs.visit_attribute,emr.openmrs.visit_attribute_type,odoo.public.sale_order,odoo.public.sale_order_line,odoo.public.res_partner,odoo.public.product_product,odoo.public.product_template,odoo.public.ir_model_data
 
 # Postgresql
 POSTGRES_USER=postgres
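Note on the rename: Debezium derives CDC topic names as <topic.prefix>.<database>.<table>, so switching the MySQL connector's server name from openmrs to emr (see the docker-compose.yaml change below) renames every OpenMRS topic while leaving the odoo.public.* topics untouched. A minimal sketch of the convention; the helper class and table name are illustrative, not part of this repository:

public final class TopicNaming {

    private TopicNaming() {}

    // Mirrors Debezium's <topic.prefix>.<database>.<table> naming rule.
    static String topicFor(String topicPrefix, String database, String table) {
        return String.join(".", topicPrefix, database, table);
    }

    public static void main(String[] args) {
        System.out.println(topicFor("openmrs", "openmrs", "patient")); // before: openmrs.openmrs.patient
        System.out.println(topicFor("emr", "openmrs", "patient")); // after: emr.openmrs.patient
    }
}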
10 changes: 5 additions & 5 deletions development/data/config.yaml
@@ -1,20 +1,20 @@
 # Configuration Jdbc catalogs which Map to actual databases allowing direct read and write without temporary tables
 jdbcCatalogs:
   # Name of the catalog
-  - name: analytics
+  - name: ozone
     # Name of the default database in the catalog
     defaultDatabase: '${ANALYTICS_DB_NAME:-analytics}'
     # Database username
     username: '${ANALYTICS_DB_USER:-analytics}'
-    # Databse password
+    # Database password
     password: '${ANALYTICS_DB_PASSWORD:-analytics}'
     # Jdbc Database Url
     baseUrl: 'jdbc:postgresql://${ANALYTICS_DB_HOST:-localhost}:${ANALYTICS_DB_PORT:-5432}'
     driver: postgresql
 # Configuration for Kafka data streams used for streaming analytics
 kafkaStreams:
   # Topic prefix generated by Kafka Connect
-  - topicPrefix: openmrs.openmrs
+  - topicPrefix: emr.openmrs
     # Path to the table definitions for temporary source tables
     tableDefinitionsPath: '${ANALYTICS_SOURCE_TABLES_PATH:-/analytics/source-tables}/openmrs'
     # Kafka bootstrap servers
@@ -38,7 +38,7 @@ jdbcSources:
 # Configuration for Jdbc data sinks
 jdbcSinks:
   # Name of the jdbc catalog to write into. The catalog must be defined in the jdbcCatalogs section
-  - jdbcCatalog: analytics
+  - jdbcCatalog: ozone
     # Name of the databse. The database must be defined in the jdbcCatalog above
     databaseName: analytics
     # The path to the queries to use for flattening data
@@ -54,4 +54,4 @@ fileSinks:
     # The tag to use for the exported output. This is used to create a subdirectory in the exportOutputPath
     exportOutPutTag: '${EXPORT_OUTPUT_TAG:-location1}'
     # The format for export files. Currently only parquet and csv are supported
-    format: parquet
\ No newline at end of file
+    format: parquet
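Note on the catalog rename: each jdbcCatalogs entry corresponds to a Flink JDBC catalog, so flattened tables are now addressed as ozone.analytics.<table> rather than analytics.analytics.<table>. A minimal sketch of the equivalent programmatic registration, assuming Flink's JdbcCatalog API (the constructor signature varies across Flink versions) and the default credentials above:

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CatalogRegistrationSketch {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Mirrors the jdbcCatalogs entry above, using the defaults from config.yaml.
        JdbcCatalog catalog = new JdbcCatalog(
                "ozone", // catalog name, renamed from "analytics"
                "analytics", // default database, unchanged
                "analytics", // username
                "analytics", // password
                "jdbc:postgresql://localhost:5432"); // base URL
        tableEnv.registerCatalog("ozone", catalog);
        tableEnv.useCatalog("ozone");
        tableEnv.executeSql("SHOW DATABASES").print();
    }
}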
2 changes: 1 addition & 1 deletion development/docker-compose.yaml
@@ -82,7 +82,7 @@ services:
       - CONNECT_MYSQL_PASSWORD=${CONNECT_MYSQL_PASSWORD:-${CONNECT_MYSQL_PASSWORD}}
       - CONNECT_MYSQL_PORT=${CONNECT_MYSQL_PORT:-${CONNECT_MYSQL_PORT}}
       - CONNECT_MYSQL_SERVER_ID=37991
-      - CONNECT_MYSQL_SERVER_NAME=openmrs
+      - CONNECT_MYSQL_SERVER_NAME=emr
       - CONNECT_MYSQL_INCLUDE_LIST=openmrs
       - CONNECT_TABLE_EXCLUDE_LIST=openmrs.audit_log
       - CONNECT_MYSQL_HISTROY_TOPIC=dbhistory.openmrs
21 changes: 21 additions & 0 deletions pom.xml
@@ -38,6 +38,9 @@
         <analytics.queries.version>1.1.0-SNAPSHOT</analytics.queries.version>
         <ozone.version>1.0.0-SNAPSHOT</ozone.version>
         <spring.version>5.3.23</spring.version>
+        <mockitoJunitVersion>5.7.0</mockitoJunitVersion>
+        <powerMockVersion>2.0.9</powerMockVersion>
+        <byteBuddyVersion>1.14.10</byteBuddyVersion>
 
         <!-- Plugins -->
         <surefire.version>3.2.5</surefire.version>
@@ -236,6 +239,24 @@
             <version>${spring.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>${mockitoJunitVersion}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-core</artifactId>
+            <version>${powerMockVersion}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>net.bytebuddy</groupId>
+            <artifactId>byte-buddy</artifactId>
+            <version>${byteBuddyVersion}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <repositories>
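Note on the new test dependencies: they work together in the test added below. mockito-junit-jupiter provides MockitoExtension and MockedStatic, byte-buddy backs Mockito's inline mock maker, and powermock-core pulls in (via powermock-reflect) the WhiteboxImpl used to inject private fields. A minimal sketch of the static-mocking pattern; the config path is illustrative:

import static org.mockito.Mockito.mockStatic;

import com.ozonehis.data.pipelines.config.AppConfiguration;
import com.ozonehis.data.pipelines.utils.CommonUtils;
import org.mockito.MockedStatic;

public class StaticMockingSketch {

    void example(AppConfiguration config) {
        // Static stubbing is scoped to the try block and undone on close().
        try (MockedStatic<CommonUtils> mocked = mockStatic(CommonUtils.class)) {
            mocked.when(() -> CommonUtils.getConfig("/some/path")).thenReturn(config);
            // Code under test that calls CommonUtils.getConfig("/some/path")
            // now receives the mocked configuration instead of reading a file.
        }
    }
}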
2 changes: 2 additions & 0 deletions src/main/java/com/ozonehis/data/pipelines/Constants.java
@@ -7,4 +7,6 @@ public class Constants {
     public static final String DEFAULT_ANALYTICS_CONFIG_FILE_PATH = "/etc/analytics/config.yaml";
 
     public static final String PROP_FLINK_REST_PORT = "FLINK_REST_PORT";
+
+    public static final String CFG_PLACEHOLDER_CATALOG = "{ANALYTICS_CATALOG}";
 }
13 changes: 10 additions & 3 deletions src/main/java/com/ozonehis/data/pipelines/export/ExportJob.java
@@ -1,6 +1,8 @@
 package com.ozonehis.data.pipelines.export;
 
 import com.ozonehis.data.pipelines.BaseJob;
+import com.ozonehis.data.pipelines.Constants;
+import com.ozonehis.data.pipelines.config.AppConfiguration;
 import com.ozonehis.data.pipelines.config.FileSinkConfig;
 import com.ozonehis.data.pipelines.utils.CommonUtils;
 import com.ozonehis.data.pipelines.utils.ConnectorUtils;
@@ -51,11 +53,16 @@ public void beforeExecute() {

     @Override
     protected void doExecute() {
-        for (FileSinkConfig fileSinkConfig :
-                CommonUtils.getConfig(configFilePath).getFileSinks()) {
+        AppConfiguration cfg = CommonUtils.getConfig(configFilePath);
+        if (cfg.getJdbcCatalogs().size() > 1) {
+            throw new RuntimeException("Found multiple configured JDBC catalogs");
+        }
+
+        for (FileSinkConfig fileSinkConfig : cfg.getFileSinks()) {
             List<QueryFile> queries = CommonUtils.getSQL(fileSinkConfig.getQueryPath());
+            final String catalog = cfg.getJdbcCatalogs().get(0).getName();
             for (QueryFile query : queries) {
-                tableEnv.executeSql(query.content);
+                tableEnv.executeSql(query.content.replace(Constants.CFG_PLACEHOLDER_CATALOG, catalog));
             }
         }
     }
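Note on the guard above: the job now requires exactly one configured JDBC catalog so that the {ANALYTICS_CATALOG} substitution is unambiguous. A worked sketch of what doExecute() does per query file; the query text is illustrative, adapted from the new test below:

public class PlaceholderSketch {

    public static void main(String[] args) {
        // Illustrative query text; real queries live under the configured queryPath.
        String content = "INSERT INTO patients SELECT t.* FROM {ANALYTICS_CATALOG}.analytics.patients t";
        String catalog = "ozone"; // cfg.getJdbcCatalogs().get(0).getName()
        System.out.println(content.replace("{ANALYTICS_CATALOG}", catalog));
        // Prints: INSERT INTO patients SELECT t.* FROM ozone.analytics.patients t
    }
}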
2 changes: 1 addition & 1 deletion src/main/java/com/ozonehis/data/pipelines/utils/QueryFile.java
@@ -8,7 +8,7 @@ public class QueryFile {

     public String parent;
 
-    QueryFile(String parent, String fileName, String content) {
+    public QueryFile(String parent, String fileName, String content) {
         this.content = content;
         this.fileName = fileName;
         this.parent = parent;
78 changes: 78 additions & 0 deletions src/test/java/com/ozonehis/data/pipelines/export/ExportJobTest.java
@@ -0,0 +1,78 @@
+package com.ozonehis.data.pipelines.export;
+
+import static com.ozonehis.data.pipelines.Constants.CFG_PLACEHOLDER_CATALOG;
+import static org.mockito.Mockito.when;
+
+import com.ozonehis.data.pipelines.config.AppConfiguration;
+import com.ozonehis.data.pipelines.config.FileSinkConfig;
+import com.ozonehis.data.pipelines.config.JdbcCatalogConfig;
+import com.ozonehis.data.pipelines.utils.CommonUtils;
+import com.ozonehis.data.pipelines.utils.QueryFile;
+import java.util.List;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.powermock.reflect.internal.WhiteboxImpl;
+
+@ExtendWith(MockitoExtension.class)
+public class ExportJobTest {
+
+    private static final String TEST_CFG_PATH = "/some/test/path";
+
+    private static MockedStatic<CommonUtils> mockCommonUtils;
+
+    private ExportJob job;
+
+    @Mock
+    private AppConfiguration mockConfig;
+
+    @Mock
+    private StreamTableEnvironment mockTableEnv;
+
+    @BeforeEach
+    public void setupClass() {
+        mockCommonUtils = Mockito.mockStatic(CommonUtils.class);
+        when(CommonUtils.getConfig(TEST_CFG_PATH)).thenReturn(mockConfig);
+        job = new ExportJob();
+        WhiteboxImpl.setInternalState(job, "configFilePath", TEST_CFG_PATH);
+        WhiteboxImpl.setInternalState(job, "tableEnv", mockTableEnv);
+    }
+
+    @AfterEach
+    public void tearDownClass() {
+        mockCommonUtils.close();
+    }
+
+    @Test
+    public void doExecute_shouldRejectAConfigWithMultipleJdbcCatalogs() {
+        when(mockConfig.getJdbcCatalogs()).thenReturn(List.of(new JdbcCatalogConfig(), new JdbcCatalogConfig()));
+        RuntimeException e = Assert.assertThrows(RuntimeException.class, () -> job.doExecute());
+        Assert.assertEquals("Found multiple configured JDBC catalogs", e.getMessage());
+    }
+
+    @Test
+    public void doExecute_shouldReplacePlaceholderWithCatalogNameInExportQueries() {
+        final String catalog = "test";
+        final String queryPath = "/test/query/path";
+        final String query =
+                "INSERT into patients SELECT t.* from " + CFG_PLACEHOLDER_CATALOG + ".analytics.patients t";
+        JdbcCatalogConfig catalogCfg = new JdbcCatalogConfig();
+        catalogCfg.setName(catalog);
+        when(CommonUtils.getSQL(queryPath)).thenReturn(List.of(new QueryFile(null, null, query)));
+        FileSinkConfig fileSinkCfg = new FileSinkConfig();
+        fileSinkCfg.setQueryPath(queryPath);
+        when(mockConfig.getJdbcCatalogs()).thenReturn(List.of(catalogCfg));
+        when(mockConfig.getFileSinks()).thenReturn(List.of(fileSinkCfg));
+
+        job.doExecute();
+
+        Mockito.verify(mockTableEnv).executeSql(query.replace(CFG_PLACEHOLDER_CATALOG, catalog));
+    }
+}
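Note: with Surefire already configured in the pom, the new test can be run on its own with a standard Maven invocation:

mvn test -Dtest=ExportJobTest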
