OZ-163: Tables creation and flattening queries to raw SQL files + Added Odoo pipeline.

* OZ-163: Externalise tables creation and flattening queries to raw SQL files

* Add DSL readme

* Address issues raised in last round of testing

* CONNECT_MYSQL_SERVER_ID=37991

---------

Co-authored-by: Dimitri R <[email protected]>
enyachoke and mks-d authored Sep 18, 2023
1 parent 1caf5c7 commit dc62e8e
Showing 145 changed files with 1,916 additions and 2,900 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -20,6 +20,7 @@ target/
**/.classpath
**/.project
**/.settings/*
.vscode/*

# AWS User-specific
.idea/**/aws.xml
163 changes: 104 additions & 59 deletions README.md
@@ -39,62 +39,107 @@ This repository contains ETL [Flink](https://ci.apache.org/projects/flink/flink-docs-master/)

## Tech

- [Flink](https://ci.apache.org/projects/flink/flink-docs-master/)

- [Flink CDC connectors](https://github.com/ververica/flink-cdc-connectors) - For streaming



## Building and Installation



### Prerequisites

- A running Flink [installation](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/local_installation/)

- A running OpenMRS installation (based on the [openmrs-reference-application 3.x](https://github.com/openmrs/openmrs-distro-referenceapplication/tree/3.x) distribution database)

- A running PostgreSQL installation

- A Liquibase installation



### Build and install

- Update the [job.properties](./job.properties) file with the details of your OpenMRS MySQL database and the analytics PostgreSQL database

- Build with `mvn clean package`

- Retrieve and apply the [Liquibase changelog](https://github.com/ozone-his/ozonepro-docker/tree/master/flink/liquidbase) needed to create the tables in the analytics database (for installation and usage of Liquibase, see the [Liquibase quickstart](https://www.liquibase.org/get-started/quickstart)); an example invocation is sketched after this list

- Run with `flink run -m <flink-job-manager-url> target/flink-jobs-1.0-SNAPSHOT.jar`
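For reference, a Liquibase invocation might look like the sketch below. It assumes the analytics database runs on localhost with the development defaults and that the changelog file is named `db.changelog-master.xml`; adjust the URL, credentials, and changelog name to your setup.

```bash
liquibase update \
  --changeLogFile=db.changelog-master.xml \
  --url=jdbc:postgresql://localhost:5432/analytics \
  --username=analytics \
  --password=password
```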

#### Building Docker image for use with https://github.com/ozone-his/ozone-analytics

`docker build -t mekomsolutions/ozone-flink-jobs .`

### Layout

src/main/java/net/mekomsolutions/data/pipelines/batch

src/main/java/net/mekomsolutions/data/pipelines/export

src/main/java/net/mekomsolutions/data/pipelines/streaming

src/main/java/net/mekomsolutions/data/pipelines/utils

src/main/java/net/mekomsolutions/data/pipelines/shared



### Adding new jobs
#### Sources and Sinks
This project uses Flink's [Table API & SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/overview/). To access data, you need to set up [temporary](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/create/) tables for both data sources and data sinks. In most cases, all the tables you will ever need for writing OpenMRS data processing jobs are already provided under `src/main/java/net/mekomsolutions/data/pipelines/shared/dsl`. If you need to add more, place them in either `src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source` or `src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink`, then register them in the factory class `src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableDSLFactory.java`. See `src/main/java/net/mekomsolutions/data/pipelines/streaming/StreamingETLJob.java` for an example of how to use the factory to set up sources and sinks.
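For illustration, each table DSL ultimately produces a Flink SQL `CREATE TABLE` statement. The sketch below is hypothetical (the column list, topic name, and broker address are illustrative rather than the project's actual definitions) and assumes a Kafka source with the `debezium-json` format:

```sql
-- Hypothetical source table reading OpenMRS CDC events from Kafka.
-- Columns, topic, and broker address are illustrative only.
CREATE TABLE patient (
  patient_id INT,
  creator INT,
  date_created TIMESTAMP(3),
  voided BOOLEAN,
  uuid STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'openmrs.openmrs.patient',
  'properties.bootstrap.servers' = 'localhost:29092',
  'properties.group.id' = 'flink-jobs',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
```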

#### Setting up jobs
Assuming you already have all the source and sink tables set up, adding a new job involves the following (see the sketch after this list):

- Adding your SQL files to `src/main/resources`
- Registering the jobs in `flink-jobs/src/main/resources/jobs.json`
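For example, a flattening query file is a plain `SELECT` over one of the source tables, in the same style as the queries shipped with the DSL directories. The table and column names below are illustrative only:

```sql
-- Hypothetical flattening query; the real queries live alongside the DSL files.
select
  patient_id,
  creator,
  date_created,
  uuid
from
  patient
```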
- [Flink](https://ci.apache.org/projects/flink/flink-docs-master/) - For ETL
- [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html) - For CDC
- [Kafka](https://kafka.apache.org/) - For streaming data

### Development

#### DSL

The project contains reference DSLs for defining the ETL jobs. The DSLs are located in the `development/dsl` directory.
- [Flattening DSL](development/dsl/flattening/README.md) - For flattening data from OpenMRS
- [Parquet Export DSL](development/dsl/parquet/README.md) - For exporting data to parquet files


#### Step 1: Start up backing services
The project assumes you already have an Ozone HIS instance running. If not, please follow the instructions [here](https://github.com/ozone-his/ozone-docker) or [here](https://github.com/ozone-his/ozonepro-docker) to get one up and running.

```cd development```
##### Export environment variables
```bash
export ANALYTICS_DB_HOST=gateway.docker.internal \
export ANALYTICS_DB_PORT=5432 \
export CONNECT_MYSQL_HOSTNAME=gateway.docker.internal \
export CONNECT_MYSQL_PORT=3306 \
export CONNECT_MYSQL_USER=root \
export CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey \
export CONNECT_ODOO_DB_HOSTNAME=gateway.docker.internal \
export CONNECT_ODOO_DB_PORT=5432 \
export CONNECT_ODOO_DB_NAME=odoo \
export CONNECT_ODOO_DB_USER=postgres \
export CONNECT_ODOO_DB_PASSWORD=password
```

```docker-compose up -d```
```cd ../```
***Note***: `gateway.docker.internal` is a special DNS name that resolves to the host machine from within containers. It is only available on Mac and Windows. On Linux, use the Docker host IP instead, which is ```172.17.0.1``` by default.

#### Step 2: Compile
```mvn clean install compile```

#### Step 3: Run the jobs
##### Run Streaming job
```bash
export ANALYTICS_DB_USER=analytics;\
export ANALYTICS_DB_PASSWORD=password;\
export ANALYTICS_DB_HOST=localhost;\
export ANALYTICS_DB_PORT=5432;\
export ANALYTICS_DB_NAME=analytics;\
export ANALYTICS_SOURCE_TABLES_PATH=$(pwd)/development/dsl/flattening/tables/;\
export ANALYTICS_QUERIES_PATH=$(pwd)/development/dsl/flattening/queries/;\
export OPENMRS_DB_NAME=openmrs;\
export OPENMRS_DB_USER=root;\
export OPENMRS_DB_PASSWORD=3cY8Kve4lGey;\
export OPENMRS_DB_HOST=localhost;\
export OPENMRS_DB_PORT=3306;\
export ODOO_DB_NAME=odoo;\
export ODOO_DB_USER=postgres;\
export ODOO_DB_PASSWORD=password;\
export ODOO_DB_HOST=localhost;\
export ODOO_DB_PORT=5432;
```

```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.streaming.StreamingETLJob" -Dexec.classpathScope="compile"```

##### Run Batch job
```bash
export ANALYTICS_DB_USER=analytics;\
export ANALYTICS_DB_PASSWORD=password;\
export ANALYTICS_DB_HOST=localhost;\
export ANALYTICS_DB_PORT=5432;\
export ANALYTICS_DB_NAME=analytics;\
export ANALYTICS_SOURCE_TABLES_PATH=$(pwd)/development/dsl/flattening/tables/;\
export ANALYTICS_QUERIES_PATH=$(pwd)/development/dsl/flattening/queries/;\
export OPENMRS_DB_NAME=openmrs;\
export OPENMRS_DB_USER=root;\
export OPENMRS_DB_PASSWORD=3cY8Kve4lGey;\
export OPENMRS_DB_HOST=localhost;\
export OPENMRS_DB_PORT=3306;\
export ODOO_DB_NAME=odoo;\
export ODOO_DB_USER=postgres;\
export ODOO_DB_PASSWORD=password;\
export ODOO_DB_HOST=localhost;\
export ODOO_DB_PORT=5432;
```
```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.batch.BatchETLJob" -Dexec.classpathScope="compile"```

##### Run Parquet Export job
```mkdir -p development/data/parquet/```
```bash
export ANALYTICS_DB_USER=analytics;\
export ANALYTICS_DB_PASSWORD=password;\
export ANALYTICS_DB_HOST=localhost;\
export ANALYTICS_DB_PORT=5432;\
export ANALYTICS_DB_NAME=analytics;\
export EXPORT_DESTINATION_TABLES_PATH=$(pwd)/development/dsl/parquet/tables/;\
export EXPORT_SOURCE_QUERIES_PATH=$(pwd)/development/dsl/parquet/queries;\
export EXPORT_OUTPUT_PATH=$(pwd)/development/data/parquet/;\
export EXPORT_OUTPUT_TAG=h1;
```
```mvn compile exec:java -Dexec.mainClass="com.ozonehis.data.pipelines.export.BatchParquetExport" -Dexec.classpathScope="compile"```


## Gotchas
When streaming data from Postgres, see [consuming data produced by the Debezium Postgres connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/#consuming-data-produced-by-debezium-postgres-connector).
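In practice this means the Odoo PostgreSQL tables being streamed need `REPLICA IDENTITY FULL`, so that Debezium emits complete before/after row images that Flink's `debezium-json` format can interpret; the `odoo-replica-identity-migration` Liquibase service in the development Docker Compose file exists for this purpose. A manual equivalent for a few of the tables listed in `ODOO_ANALYTICS_TABLES` would look like this:

```sql
-- Without REPLICA IDENTITY FULL, update and delete events lack the full "before"
-- image and cannot be consumed correctly by the Flink debezium-json format.
ALTER TABLE sale_order REPLICA IDENTITY FULL;
ALTER TABLE account_account REPLICA IDENTITY FULL;
ALTER TABLE product_category REPLICA IDENTITY FULL;
```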
43 changes: 43 additions & 0 deletions development/.env
@@ -0,0 +1,43 @@
OPENMRS_DB_USER=openmrs
OPENMRS_DB_PASSWORD=password
OPENMRS_USER=superman
OPENMRS_PASSWORD=Admin123
OPENMRS_DB_HOST=mysql
OPENMRS_DB_PORT=3306
OPENMRS_DB_NAME=openmrs


ANALYTICS_DB_USER=analytics
ANALYTICS_DB_PASSWORD=password
ANALYTICS_DB_HOST=postgresql
ANALYTICS_DB_PORT=5432
ANALYTICS_DB_NAME=analytics
ANALYTICS_SOURCE_TABLES_PATH=
ANALYTICS_QUERIES_PATH=
CHANGELOG_FILE=db.changelog-master.xml
ODOO_ANALYTICS_TABLES='databasechangelog,account_account,product_category,sale_order'

#Kafka
CREATE_TOPICS=appointment_service:1:1,appointment_service_type:1:1,care_setting:1:1,concept:1:1,concept_name:1:1,concept_reference_map:1:1,concept_reference_source:1:1,concept_reference_term:1:1,conditions:1:1,encounter:1:1,encounter_diagnosis:1:1,encounter_type:1:1,location:1:1,form:1:1,obs:1:1,order_type:1:1,orders:1:1,patient:1:1,patient_appointment:1:1,patient_appointment_provider:1:1,patient_identifier:1:1,patient_identifier_type:1:1,patient_program:1:1,program:1:1,person:1:1,person_name:1:1,person_address:1:1,visit_type:1:1,visit:1:1

# Postgresql
POSTGRES_USER=postgres
POSTGRES_PASSWORD=password
POSTGRES_DB_HOST=postgresql
CONNECT_ODOO_DB_HOSTNAME=postgresql
CONNECT_ODOO_DB_PORT=5432
CONNECT_ODOO_DB_NAME=odoo
CONNECT_ODOO_DB_USER=postgres
CONNECT_ODOO_DB_PASSWORD=password

CONNECT_MYSQL_HOSTNAME=mysql
CONNECT_MYSQL_PORT=3306
CONNECT_MYSQL_USER=root
CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey

# MySQL
MYSQL_ROOT_PASSWORD=3cY8Kve4lGey

DEBEZIUM_VERSION=1.9
UID=1000
GID=1001
1 change: 1 addition & 0 deletions development/data/.gitignore
@@ -0,0 +1 @@
parquet
1 change: 1 addition & 0 deletions development/debezium-connect/jars/.gitignore
@@ -0,0 +1 @@
!*.jar
113 changes: 113 additions & 0 deletions development/docker-compose.yaml
@@ -0,0 +1,113 @@
version: '3.8'
services:
kafka:
restart: on-failure
image: debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
- 29092:29092
environment:
- CLUSTER_ID=5Yr1SIgYQz-b-dgRabWx4g
- BROKER_ID=1
- KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
# - CREATE_TOPICS=${CREATE_TOPICS}
- KAFKA_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093,PLAINTEXT_HOST://0.0.0.0:29092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
volumes:
- kafka-data:/kafka/data
healthcheck:
test:
[
"CMD-SHELL",
"/bin/bash",
"-c",
"./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"
]
connect:
restart: on-failure
image: debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8383:8083
volumes:
- ./debezium-connect/jars/TimestampConverter-1.2.4-SNAPSHOT.jar:/kafka/connect/debezium-connector-mysql/TimestampConverter-1.2.4-SNAPSHOT.jar
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_CONFIG_PROVIDERS=file
- CONNECT_CONFIG_PROVIDERS_FILE_CLASS=org.apache.kafka.common.config.provider.FileConfigProvider
- CONNECT_MYSQL_HOSTNAME=${CONNECT_MYSQL_HOSTNAME:-mysql}
- CONNECT_MYSQL_USERNAME=${CONNECT_MYSQL_USERNAME:-root}
- CONNECT_MYSQL_PASSWORD=${CONNECT_MYSQL_PASSWORD:-${CONNECT_MYSQL_PASSWORD}}
- CONNECT_MYSQL_PORT=${CONNECT_MYSQL_PORT:-${CONNECT_MYSQL_PORT}}
- CONNECT_MYSQL_SERVER_ID=37991
- CONNECT_MYSQL_SERVER_NAME=openmrs
- CONNECT_MYSQL_INCLUDE_LIST=openmrs
- CONNECT_TABLE_EXCLUDE_LIST=openmrs.audit_log
- CONNECT_MYSQL_HISTROY_TOPIC=dbhistory.openmrs
- CONNECT_MYSQL_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- CONNECT_ODOO_DB_HOSTNAME=${CONNECT_ODOO_DB_HOSTNAME:-postgresql}
- CONNECT_ODOO_DB_USERNAME=${CONNECT_ODOO_DB_USERNAME:-${POSTGRES_USER}}
- CONNECT_ODOO_DB_PASSWORD=${CONNECT_ODOO_DB_PASSWORD:-${POSTGRES_PASSWORD}}
- CONNECT_ODOO_DB_PORT=${CONNECT_ODOO_DB_PORT:-5432}
- CONNECT_ODOO_DB_SERVER_NAME=odoo
- CONNECT_ODOO_DB_INCLUDE_LIST=odoo

connect-setup:
restart: on-failure
build: ./setup-connect
depends_on:
- connect
environment:
- CONNECT_HOST=connect
- SOURCE_DB_HOST=${CONNECT_MYSQL_HOSTNAME}
- SOURCE_DB_PORT=${CONNECT_MYSQL_PORT}
- ODOO_DB_HOST=${ODOO_DB_HOST:-${CONNECT_ODOO_DB_HOSTNAME}}
- ODOO_DB_PORT=${ODOO_DB_PORT:-${CONNECT_ODOO_DB_PORT}}
- FLINK_JOBMANAGER_HOST=jobmanager
kowl:
image: rsmnarts/kowl
container_name: "kowl"
restart: on-failure
entrypoint: /bin/sh
command: -c "echo \"$$KOWL_CONFIG_FILE\" > /tmp/config.yml; /app/kowl"
ports:
- "8282:8080"
environment:
CONFIG_FILEPATH: /tmp/config.yml
KOWL_CONFIG_FILE: |
kafka:
brokers: ["kafka:9092"]
connect:
enabled: true
clusters:
- name: Ozone
url: http://connect:8083
depends_on:
- "kafka"
- "connect"
odoo-replica-identity-migration:
image: liquibase/liquibase
restart: on-failure
volumes:
- ./liquibase/odoo/:/liquibase/changelog/
command: update -Dtables='databasechangelog,account_account' --username=${CONNECT_ODOO_DB_USER} --password=${CONNECT_ODOO_DB_PASSWORD} --changeLogFile=${CHANGELOG_FILE} --url=jdbc:postgresql://${CONNECT_ODOO_DB_HOSTNAME}:${CONNECT_ODOO_DB_PORT}/${CONNECT_ODOO_DB_NAME}
environment:
- INSTALL_POSTGRESQL='true'
- TABLES=${ODOO_ANALYTICS_TABLES}
analytics-migration:
image: liquibase/liquibase
restart: on-failure
volumes:
- ./liquibase/analytics/:/liquibase/changelog/
command: update --username=${ANALYTICS_DB_USER} --password=${ANALYTICS_DB_PASSWORD} --changeLogFile=${CHANGELOG_FILE} --url=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
environment:
- INSTALL_POSTGRESQL='true'

volumes:
kafka-data: ~
Binary file added development/dsl/.DS_Store
5 changes: 5 additions & 0 deletions development/dsl/flattening/README.md
@@ -0,0 +1,5 @@
This directory contains the DSL for flattening data from OpenMRS and Odoo; the queries are shared by the batch and streaming jobs. Currently the project supports reading from OpenMRS and Odoo. The folder is divided into:
- `tables` - Contains the table definitions for the source tables in OpenMRS and Odoo. (Direct 1-1 mapping to the database tables)
- `queries` - Contains the queries for flattening the data.

***Note***: There are no destination (sink) tables because the data is written directly to the analytics database via the [JDBC Catalog](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/jdbc.html#jdbc-catalog) (see the sketch below).
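As a rough sketch of how this works, a Flink SQL session can register the analytics PostgreSQL database as a JDBC catalog and then write to its tables directly, without separate sink DDL. The host and credentials below follow the development defaults and are illustrative:

```sql
-- Register the analytics PostgreSQL database as a JDBC catalog;
-- its tables then become available as sinks.
CREATE CATALOG analytics WITH (
  'type' = 'jdbc',
  'default-database' = 'analytics',
  'username' = 'analytics',
  'password' = 'password',
  'base-url' = 'jdbc:postgresql://localhost:5432'
);
USE CATALOG analytics;
```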
File renamed without changes.
File renamed without changes.
File renamed without changes.
Expand Up @@ -6,7 +6,6 @@ select
encounter_id,
patient_id,
certainty,
`rank`,
uuid,
creator,
date_created,
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
9 changes: 9 additions & 0 deletions development/dsl/flattening/queries/sale_order.sql
@@ -0,0 +1,9 @@
select
`id`,
`amount_total`,
`invoice_status`,
`name`,
`note`,
`state`
from
sale_order
File renamed without changes.
Empty file.
8 changes: 8 additions & 0 deletions development/dsl/flattening/tables/odoo/sale_order.sql
@@ -0,0 +1,8 @@
CREATE TABLE `sale_order` (
`id` INT,
`amount_total` DECIMAL(10),
`invoice_status` VARCHAR,
`name` VARCHAR,
`note` VARCHAR,
`state` VARCHAR
)
