From 7b3e6f142c9fc6220f05d12a07d305dbde85cd57 Mon Sep 17 00:00:00 2001 From: Emmanuel Nyachoke Date: Wed, 6 Jul 2022 09:50:15 +0300 Subject: [PATCH] First commit extracted from https://github.com/ozone-his/ozone-analytics --- .dockerignore | 2 + .gitignore | 116 +++++ Dockerfile | 24 ++ Dockerfile_batch | 14 + README.md | 88 ++++ batch-export.sh | 7 + job.properties | 16 + pom.xml | 406 ++++++++++++++++++ run.sh | 21 + .../data/pipelines/batch/BatchJob.java | 4 + .../pipelines/export/BatchParquetExport.java | 69 +++ .../pipelines/migrations/RunMigrations.java | 59 +++ .../pipelines/shared/dsl/TableDSLFactory.java | 116 +++++ .../pipelines/shared/dsl/TableSQLDSL.java | 9 + .../dsl/sink/filesystem/ConceptsFs.java | 44 ++ .../dsl/sink/filesystem/ObservationsFs.java | 77 ++++ .../dsl/sink/filesystem/PatientsFs.java | 51 +++ .../shared/dsl/sink/filesystem/VisitsFs.java | 57 +++ .../shared/dsl/sink/jdbc/Concepts.java | 46 ++ .../shared/dsl/sink/jdbc/Observations.java | 79 ++++ .../shared/dsl/sink/jdbc/Patients.java | 53 +++ .../shared/dsl/sink/jdbc/Visits.java | 59 +++ .../pipelines/shared/dsl/source/Concept.java | 56 +++ .../shared/dsl/source/ConceptName.java | 53 +++ .../dsl/source/ConceptReferenceMap.java | 45 ++ .../dsl/source/ConceptReferenceSource.java | 50 +++ .../dsl/source/ConceptReferenceTerm.java | 50 +++ .../shared/dsl/source/Encounter.java | 51 +++ .../shared/dsl/source/EncounterType.java | 49 +++ .../pipelines/shared/dsl/source/Location.java | 71 +++ .../data/pipelines/shared/dsl/source/Obs.java | 66 +++ .../pipelines/shared/dsl/source/Patient.java | 46 ++ .../shared/dsl/source/PatientIdentifier.java | 51 +++ .../dsl/source/PatientIdentifierType.java | 55 +++ .../pipelines/shared/dsl/source/Person.java | 55 +++ .../shared/dsl/source/PersonAddress.java | 72 ++++ .../shared/dsl/source/PersonName.java | 56 +++ .../pipelines/shared/dsl/source/Visit.java | 52 +++ .../shared/dsl/source/VisitType.java | 48 +++ .../data/pipelines/shared/jobs/Job.java | 22 + .../pipelines/streaming/StreamingETLJob.java | 171 ++++++++ .../data/pipelines/utils/CommonUtils.java | 16 + .../data/pipelines/utils/ConnectorUtils.java | 13 + src/main/resources/concepts.sql | 14 + src/main/resources/database/dbchangelog.xml | 128 ++++++ src/main/resources/jobs.json | 18 + src/main/resources/log4j.properties | 10 + src/main/resources/log4j2.properties | 25 ++ src/main/resources/observations.sql | 53 +++ src/main/resources/patients.sql | 25 ++ src/main/resources/visits.sql | 35 ++ 51 files changed, 2873 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Dockerfile_batch create mode 100644 README.md create mode 100644 batch-export.sh create mode 100644 job.properties create mode 100644 pom.xml create mode 100644 run.sh create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/batch/BatchJob.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/export/BatchParquetExport.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/migrations/RunMigrations.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableDSLFactory.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableSQLDSL.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/ConceptsFs.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/ObservationsFs.java create mode 100644 
src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/PatientsFs.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/VisitsFs.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Concepts.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Observations.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Patients.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Visits.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Concept.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptName.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceMap.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceSource.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceTerm.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Encounter.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/EncounterType.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Location.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Obs.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Patient.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PatientIdentifier.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PatientIdentifierType.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Person.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PersonAddress.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PersonName.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Visit.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/VisitType.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/shared/jobs/Job.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/streaming/StreamingETLJob.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/utils/CommonUtils.java create mode 100644 src/main/java/net/mekomsolutions/data/pipelines/utils/ConnectorUtils.java create mode 100644 src/main/resources/concepts.sql create mode 100644 src/main/resources/database/dbchangelog.xml create mode 100644 src/main/resources/jobs.json create mode 100644 src/main/resources/log4j.properties create mode 100644 src/main/resources/log4j2.properties create mode 100644 src/main/resources/observations.sql create mode 100644 src/main/resources/patients.sql create mode 100644 src/main/resources/visits.sql diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e673575 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +.idea/ +target/ \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8590616 --- /dev/null +++ b/.gitignore @@ -0,0 +1,116 @@ + +# Created by https://www.toptal.com/developers/gitignore/api/intellij +# Edit at 
https://www.toptal.com/developers/gitignore?templates=intellij + +### Intellij ### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 +.idea/ +target/ +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml +dependency-reduced-pom.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# SonarLint plugin +.idea/sonarlint/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + +### Intellij Patch ### +# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 + +# *.iml +# modules.xml +# .idea/misc.xml +# *.ipr + +# Sonarlint plugin +# https://plugins.jetbrains.com/plugin/7973-sonarlint +.idea/**/sonarlint/ + +# SonarQube Plugin +# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin +.idea/**/sonarIssues.xml + +# Markdown Navigator plugin +# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced +.idea/**/markdown-navigator.xml +.idea/**/markdown-navigator-enh.xml +.idea/**/markdown-navigator/ + +# Cache file creation bug +# See https://youtrack.jetbrains.com/issue/JBR-2257 +.idea/$CACHE_FILE$ + +# CodeStream plugin +# https://plugins.jetbrains.com/plugin/12206-codestream +.idea/codestream.xml + +# End of https://www.toptal.com/developers/gitignore/api/intellij \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..54b61bb --- /dev/null +++ b/Dockerfile @@ -0,0 +1,24 @@ + +FROM maven:3.5-jdk-8-alpine as builder +# add pom.xml and source code +ADD ./pom.xml pom.xml +#cache dependencies +RUN mvn dependency:go-offline +ADD ./src src/ +RUN mvn clean package + +FROM flink:1.14.4-scala_2.12-java8 +RUN wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.4/flink-connector-jdbc_2.12-1.14.4.jar -O /opt/flink/lib/flink-connector-jdbc_2.12-1.14.4.jar +RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-parquet_2.11/1.13.5/flink-parquet_2.11-1.13.5.jar -O /opt/flink/lib/flink-parquet_2.11-1.13.5.jar +RUN wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -O 
/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop/1.12.2/parquet-hadoop-1.12.2.jar -O /opt/flink/lib/parquet-hadoop-1.12.2.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-common/1.12.2/parquet-common-1.12.2.jar -O /opt/flink/lib/parquet-common-1.12.2.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar -O /opt/flink/lib/httpclient-4.5.13.jar
+RUN wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.15/httpcore-4.4.15.jar -O /opt/flink/lib/httpcore-4.4.15.jar
+RUN wget https://repo1.maven.org/maven2/com/ecwid/consul/consul-api/1.4.5/consul-api-1.4.5.jar -O /opt/flink/lib/consul-api-1.4.5.jar
+RUN wget https://repo1.maven.org/maven2/com/google/code/gson/gson/2.9.0/gson-2.9.0.jar -O /opt/flink/lib/gson-2.9.0.jar
+COPY --from=builder target/ozone-etl-flink-1.0-SNAPSHOT-etl-streaming.jar /opt/flink/usrlib/streaming-etl-job.jar
+COPY --from=builder target/ozone-etl-flink-1.0-SNAPSHOT-etl-migrations.jar /opt/flink/usrlib/ozone-etl-migrations.jar
+COPY run.sh /run.sh
+RUN chmod +x /run.sh
+ENTRYPOINT ["/docker-entrypoint.sh"]
\ No newline at end of file
diff --git a/Dockerfile_batch b/Dockerfile_batch
new file mode 100644
index 0000000..7761bcd
--- /dev/null
+++ b/Dockerfile_batch
@@ -0,0 +1,14 @@
+FROM maven:3.5-jdk-8-alpine as builder
+# add pom.xml and source code
+ADD ./pom.xml pom.xml
+ADD ./src src/
+RUN mvn clean install -Pbatch
+
+FROM openjdk:8-jre-alpine
+ENV OUTPUT_DIR=/parquet
+COPY --from=builder target/ozone-etl-flink-1.0-SNAPSHOT-etl-export.jar etl-export.jar
+COPY --from=builder target/ozone-etl-flink-1.0-SNAPSHOT-etl-migrations.jar /opt/flink/usrlib/ozone-etl-migrations.jar
+ADD ./batch-export.sh ./batch-export.sh
+COPY run.sh /run.sh
+RUN chmod +x /run.sh
+CMD [ "/bin/sh","./batch-export.sh" ]
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e941d3b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,88 @@
+
+# Ozone ETL pipelines
+
+## Flink
+
+This repository contains an ETL [Flink](https://ci.apache.org/projects/flink/flink-docs-master/) [job](https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/job_scheduling/#:~:text=A%20Flink%20job%20is%20first,it%20cancels%20all%20running%20tasks) for flattening [Ozone HIS](https://github.com/ozone-his) data.
+
+## Features
+
+- Provides both batch and streaming modes
+- Currently flattens OpenMRS data into reporting-friendly tables for:
+  - patients
+  - observations
+  - visits
+  - concepts
+
+## Tech
+
+- [Flink](https://ci.apache.org/projects/flink/flink-docs-master/)
+- [Flink CDC connectors](https://github.com/ververica/flink-cdc-connectors) - for streaming
+
+## Building and Installation
+
+### Prerequisites
+
+- A running Flink [installation](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/local_installation/)
+- A running OpenMRS installation
+- A running PostgreSQL installation
+- A Liquibase installation
+
+### Build and install
+
+- Update the [job.properties](./job.properties) file with the details of your OpenMRS MySQL database and the analytics PostgreSQL database
+- Build with `mvn clean package`
+- Retrieve and apply the [Liquibase file](https://github.com/ozone-his/ozonepro-docker/tree/master/flink/liquidbase) needed to create the tables on the analytics database (for more on installing and using Liquibase, see the [Liquibase quickstart](https://www.liquibase.org/get-started/quickstart))
+- Run with `flink run target/ozone-etl-flink-1.0-SNAPSHOT.jar`
+
+#### Building Docker image
+
+`docker build -t mekomsolutions/ozone-flink-jobs .`
+
+### Layout
+
+src/main/java/net/mekomsolutions/data/pipelines/batch
+src/main/java/net/mekomsolutions/data/pipelines/export
+src/main/java/net/mekomsolutions/data/pipelines/streaming
+src/main/java/net/mekomsolutions/data/pipelines/utils
+src/main/java/net/mekomsolutions/data/pipelines/shared
+
+### Adding new jobs
+#### Sources and Sinks
+This project uses Flink's [Table API & SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/overview/). To access data, you need to set up [temporary](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/create/) tables for both data sources and data sinks. In most cases, all the tables you will ever need for writing OpenMRS data processing jobs are already provided under `src/main/java/net/mekomsolutions/data/pipelines/shared/dsl`. In case you need more, you can add them to either `src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source` or `src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink`, then register them in the factory class `src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableDSLFactory.java`. See `src/main/java/net/mekomsolutions/data/pipelines/streaming/StreamingETLJob.java` for a full example of how to use the factory to set up sources and sinks; a minimal sketch follows below.
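+
+The sketch below illustrates the pattern in batch mode, mirroring `BatchParquetExport`. The class name and all connector option values (URLs, credentials) are illustrative placeholders, not a tested configuration:
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+import net.mekomsolutions.data.pipelines.shared.dsl.TableDSLFactory;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class TableSetupSketch {
+    public static void main(String[] args) {
+        TableEnvironment tEnv = TableEnvironment.create(
+                EnvironmentSettings.newInstance().inBatchMode().build());
+
+        // Source: the OpenMRS `concept` table, read over JDBC (placeholder connection details)
+        Map<String, String> sourceOptions = new HashMap<>();
+        sourceOptions.put("connector", "jdbc");
+        sourceOptions.put("url", "jdbc:mysql://localhost:3306/openmrs");
+        sourceOptions.put("username", "openmrs");
+        sourceOptions.put("password", "openmrs");
+        tEnv.executeSql(new TableDSLFactory(sourceOptions).getTable("concept").getDSL());
+
+        // Sink: the flattened `concepts` table, written to the analytics database
+        Map<String, String> sinkOptions = new HashMap<>();
+        sinkOptions.put("connector", "jdbc");
+        sinkOptions.put("url", "jdbc:postgresql://localhost:5432/analytics");
+        sinkOptions.put("username", "postgres");
+        sinkOptions.put("password", "password");
+        tEnv.executeSql(new TableDSLFactory(sinkOptions).getTable("concepts").getDSL());
+
+        // A job would then populate the sink from the source tables with its SQL, e.g.
+        // tEnv.executeSql("INSERT INTO concepts SELECT ... FROM concept ...");
+    }
+}
+```
+
+Each table DSL class fills in its own connector-specific defaults (for example the JDBC `table-name`, or the Kafka `topic` when the `connector` option is `kafka`), so only the connection options need to be passed to the factory.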
+
+#### Setting up jobs
+Assuming you already have all the source and sink tables set up, adding new jobs involves:
+
+ - Adding your SQL files to `src/main/resources`
+ - Registering the jobs in `src/main/resources/jobs.json`
\ No newline at end of file
diff --git a/batch-export.sh b/batch-export.sh
new file mode 100644
index 0000000..c655dcd
--- /dev/null
+++ b/batch-export.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+: ${JDBC_URL:?"Need to set JDBC_URL"}
+: ${JDBC_USERNAME:?"Need to set JDBC_USERNAME"}
+: ${JDBC_PASSWORD:?"Need to set JDBC_PASSWORD"}
+: ${OUTPUT_DIR:?"Need to set OUTPUT_DIR"}
+: ${LOCATION_TAG:?"Need to set LOCATION_TAG"}
+java -jar etl-export.jar --jdbc-url $JDBC_URL --jdbc-username $JDBC_USERNAME --jdbc-password $JDBC_PASSWORD --output-dir $OUTPUT_DIR --location-tag $LOCATION_TAG
\ No newline at end of file
diff --git a/job.properties b/job.properties
new file mode 100644
index 0000000..8460baa
--- /dev/null
+++ b/job.properties
@@ -0,0 +1,16 @@
+job.streaming=true
+openmrs.database.username=root
+openmrs.database.password=3cY8Kve4lGey
+openmrs.database.batch.url=jdbc:mysql://localhost:3306/openmrs
+openmrs.database.streaming.hostname=localhost
+openmrs.database.streaming.port=3306
+openmrs.database.streaming.database-name=openmrs
+properties.bootstrap.servers=localhost:9092
+
+
+analytics.database.username=postgres
+analytics.database.password=password
+analytics.database.batch.url=jdbc:postgresql://localhost:5432/analytics
+
+analytics.filesystem=true
+
diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8093efb --- /dev/null +++ b/pom.xml @@ -0,0 +1,406 @@ + + 4.0.0 + + net.mekomsolutions.data.pipelines + ozone-etl-flink + 1.0-SNAPSHOT + jar + + Ozone ETL Pipelines + + + UTF-8 + 1.14.4 + 1.8 + 2.12 + ${target.java.version} + ${target.java.version} + 2.12.1 + 2.13.2.2 + + + + + + streaming + + true + + + provided + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + streaming-job-build + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + net.mekomsolutions.data.pipelines.streaming.StreamingETLJob + + + ${project.artifactId}-${project.version}-etl-streaming + + + + + + + + + batch + + false + + + compile + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + export-job-build + package + + shade + + + + + + + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + net.mekomsolutions.data.pipelines.export.BatchParquetExport + + + ${project.artifactId}-${project.version}-etl-export + + + + + + + + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + + + + + + org.apache.flink + flink-java + ${flink.version} + ${scope} + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${flink.version} + ${scope} + + + org.apache.flink + flink-clients_${scala.binary.version} + ${flink.version} + ${scope} + + + org.apache.flink + flink-table-api-java-bridge_2.12 + ${flink.version} + ${scope} + + + org.apache.flink + flink-table-planner_2.12 + ${flink.version} + ${scope} + + + org.apache.flink + flink-streaming-scala_2.12 + ${flink.version} + ${scope} + + + org.apache.flink + flink-statebackend-rocksdb_2.12 + ${flink.version} + ${scope} + + + + + + org.apache.flink + flink-json + ${flink.version} + ${scope} + + + org.apache.flink
flink-connector-jdbc_2.12 + ${flink.version} + ${scope} + + + org.apache.flink + flink-connector-kafka_2.12 + ${flink.version} + + + org.apache.flink + flink-parquet_2.12 + ${flink.version} + + + org.postgresql + postgresql + 42.3.4 + + + mysql + mysql-connector-java + 8.0.28 + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + + org.apache.hadoop + hadoop-client + 3.3.2 + + + + com.google.guava + guava + 31.1-jre + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + commons-cli + commons-cli + 1.5.0 + + + org.liquibase + liquibase-core + 4.12.0 + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + export-job-migrations + package + + shade + + + + + + + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + net.mekomsolutions.data.pipelines.migrations.RunMigrations + + + ${project.artifactId}-${project.version}-etl-migrations + + + + + + + + + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + org.apache.maven.plugins + maven-shade-plugin + [3.1.1,) + + shade + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + [3.1,) + + testCompile + compile + + + + + + + + + + + + + + diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..8240b71 --- /dev/null +++ b/run.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# +# Process all environment variables that start with 'FLINK_JOB_' +# +touch $FLINK_HOME/job.properties +env | while read -r VAR; +do + env_var=`echo "$VAR" | sed -r "s/([^=]*)=.*/\1/g"` + if [[ $env_var =~ ^FLINK_JOB_ ]]; then + prop_name=`echo "$VAR" | sed -r "s/^FLINK_JOB_([^=]*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .` + prop_value=`echo "$VAR" | sed -r "s/^FLINK_JOB_[^=]*=(.*)/\1/g"` + if egrep -q "(^|^#)$prop_name=" $FLINK_HOME/job.properties; then + #note that no config names or values may contain an '@' char + sed -r -i "s@(^|^#)($prop_name)=(.*)@\2=${prop_value}@g" $FLINK_HOME/job.properties + else + # echo "Adding property $prop_name=${prop_value}" + echo "$prop_name=${prop_value}" >> $FLINK_HOME/job.properties + fi + fi +done +java -jar /opt/flink/usrlib/ozone-etl-migrations.jar \ No newline at end of file diff --git a/src/main/java/net/mekomsolutions/data/pipelines/batch/BatchJob.java b/src/main/java/net/mekomsolutions/data/pipelines/batch/BatchJob.java new file mode 100644 index 0000000..88e806b --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/batch/BatchJob.java @@ -0,0 +1,4 @@ +package net.mekomsolutions.data.pipelines.batch; +//TODO Split Batch job to it's own class +public class BatchJob { +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/export/BatchParquetExport.java b/src/main/java/net/mekomsolutions/data/pipelines/export/BatchParquetExport.java new file mode 100644 index 0000000..7f722dd --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/export/BatchParquetExport.java @@ -0,0 +1,69 @@ +package net.mekomsolutions.data.pipelines.export; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableDSLFactory; +import net.mekomsolutions.data.pipelines.utils.CommonUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; + +import 
java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class BatchParquetExport {
+    public static void main(String[] args) throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .inBatchMode()
+                .build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+        Map<String, String> postgresConnectorOptions = Stream.of(new String[][]{
+                {"connector", "jdbc"},
+                {"url", parameterTool.get("jdbc-url", "")},
+                {"username", parameterTool.get("jdbc-username", "")},
+                {"password", parameterTool.get("jdbc-password", "")},
+                {"sink.buffer-flush.max-rows", "1000"},
+                {"sink.buffer-flush.interval", "1s"}
+        }).collect(Collectors.toMap(data -> data[0], data -> data[1]));
+
+        final Map<String, String> fileSystemConnectorOptions = Stream.of(new String[][]{
+                {"connector", "filesystem"},
+                {"format", "parquet"},
+                {"path", parameterTool.get("output-dir", "/tmp/")},
+        }).collect(Collectors.toMap(data -> data[0], data -> data[1]));
+
+        String[] sourceTables = {"visits", "patients", "concepts", "observations"};
+
+        // Register the flattened tables in the analytics database as JDBC sources
+        CommonUtils.setupTables(tEnv, sourceTables, postgresConnectorOptions);
+
+        // Normalize the output directory so it always ends with a single slash
+        String outputPath = parameterTool.get("output-dir", "/tmp/")
+                .replaceAll("([^/])$", "$1/");
+        String locationTag = parameterTool.get("location-tag", "");
+        String date = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now());
+
+        // One filesystem (Parquet) sink per table, partitioned by location tag and export time
+        fileSystemConnectorOptions.put("path", outputPath + "observations/" + locationTag + "/" + date);
+        TableDSLFactory tableDSLFactorySink = new TableDSLFactory(fileSystemConnectorOptions);
+        tEnv.executeSql(tableDSLFactorySink.getTable("observations_fs").getDSL());
+
+        fileSystemConnectorOptions.put("path", outputPath + "patients/" + locationTag + "/" + date);
+        TableDSLFactory tableDSLFactorySinkPatients = new TableDSLFactory(fileSystemConnectorOptions);
+        tEnv.executeSql(tableDSLFactorySinkPatients.getTable("patients_fs").getDSL());
+
+        fileSystemConnectorOptions.put("path", outputPath + "concepts/" + locationTag + "/" + date);
+        TableDSLFactory tableDSLFactorySinkConcepts = new TableDSLFactory(fileSystemConnectorOptions);
+        tEnv.executeSql(tableDSLFactorySinkConcepts.getTable("concepts_fs").getDSL());
+
+        fileSystemConnectorOptions.put("path", outputPath + "visits/" + locationTag + "/" + date);
+        TableDSLFactory tableDSLFactorySinkVisits = new TableDSLFactory(fileSystemConnectorOptions);
+        tEnv.executeSql(tableDSLFactorySinkVisits.getTable("visits_fs").getDSL());
+
+        tEnv.executeSql("INSERT into observations_fs SELECT o.* from observations o");
+        tEnv.executeSql("INSERT into concepts_fs SELECT c.* from concepts c");
+        tEnv.executeSql("INSERT into patients_fs SELECT p.* from patients p");
+        tEnv.executeSql("INSERT into visits_fs SELECT v.* from visits v");
+    }
+}
diff --git a/src/main/java/net/mekomsolutions/data/pipelines/migrations/RunMigrations.java b/src/main/java/net/mekomsolutions/data/pipelines/migrations/RunMigrations.java
new file mode 100644
index 0000000..bc5fc34
--- /dev/null
+++ b/src/main/java/net/mekomsolutions/data/pipelines/migrations/RunMigrations.java
@@ -0,0 +1,59 @@
+package net.mekomsolutions.data.pipelines.migrations;
+
+import liquibase.Contexts;
+import liquibase.LabelExpression;
+import liquibase.Liquibase;
+import liquibase.Scope;
+import liquibase.database.Database;
+import liquibase.database.DatabaseFactory;
+import liquibase.database.jvm.JdbcConnection;
+import liquibase.resource.ClassLoaderResourceAccessor;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class RunMigrations {
+    static String flinkHome = System.getenv("FLINK_HOME");
+    public static void main(String[] args) throws Exception {
+        runLiquibase();
+    }
+    public static void runLiquibase() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        Properties prop = readPropertiesFile(flinkHome + "/job.properties");
+        Scope.child(config, () -> {
+            try {
+                Connection connection = DriverManager.getConnection(prop.getProperty("postgres.url", ""),
+                        prop.getProperty("postgres.user", ""), prop.getProperty("postgres.password", ""));
+                Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnection(connection));
+                Liquibase liquibase = new Liquibase("database/dbchangelog.xml", new ClassLoaderResourceAccessor(), database);
+                liquibase.update(new Contexts(), new LabelExpression());
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+    }
+    public static Properties readPropertiesFile(String fileName) throws IOException {
+        Properties prop = new Properties();
+        // Try-with-resources closes the stream and propagates read errors,
+        // instead of swallowing them and returning a null Properties object
+        try (FileInputStream fis = new FileInputStream(fileName)) {
+            prop.load(fis);
+        }
+        return prop;
+    }
+}
diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableDSLFactory.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableDSLFactory.java
new file mode 100644
index 0000000..13bc962
--- /dev/null
+++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableDSLFactory.java
@@ -0,0 +1,116 @@
+package net.mekomsolutions.data.pipelines.shared.dsl;
+
+import net.mekomsolutions.data.pipelines.shared.dsl.sink.filesystem.ConceptsFs;
+import net.mekomsolutions.data.pipelines.shared.dsl.sink.filesystem.ObservationsFs;
+import net.mekomsolutions.data.pipelines.shared.dsl.sink.filesystem.PatientsFs;
+import net.mekomsolutions.data.pipelines.shared.dsl.sink.filesystem.VisitsFs;
+import net.mekomsolutions.data.pipelines.shared.dsl.sink.jdbc.Concepts;
+import net.mekomsolutions.data.pipelines.shared.dsl.sink.jdbc.Observations;
+import net.mekomsolutions.data.pipelines.shared.dsl.sink.jdbc.Patients;
+import net.mekomsolutions.data.pipelines.shared.dsl.sink.jdbc.Visits;
+import net.mekomsolutions.data.pipelines.shared.dsl.source.*;
+import net.mekomsolutions.data.pipelines.streaming.StreamingETLJob;
+import org.apache.logging.slf4j.Log4jLoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Map;
+
+/**
+ * This class provides a factory method for getting the Source/Sink dynamic table DSL for use in {@link StreamingETLJob ETLJob.class}
+ */
+public class TableDSLFactory {
+    private static final Logger LOG = new Log4jLoggerFactory().getLogger(TableDSLFactory.class.getName());
+    private Map<String, String> connectorOptions;
+    public TableDSLFactory(Map<String, String> connectorOptions) {
+        this.connectorOptions = connectorOptions;
+    }
+
+    /**
+     * @param tableName the name of the source or sink table
+     * @return Table DSL for the requested table
+     */
+    public TableSQLDSL getTable(String tableName){
+        TableSQLDSL tableSQLDSL = null;
+        switch (tableName) {
+            case "concept":
+                tableSQLDSL = new Concept(this.connectorOptions);
+                break;
+            case "concept_name":
tableSQLDSL = new ConceptName(this.connectorOptions);
+                break;
+            case "concept_reference_map":
+                tableSQLDSL = new ConceptReferenceMap(this.connectorOptions);
+                break;
+            case "concept_reference_source":
+                tableSQLDSL = new ConceptReferenceSource(this.connectorOptions);
+                break;
+            case "concept_reference_term":
+                tableSQLDSL = new ConceptReferenceTerm(this.connectorOptions);
+                break;
+            case "encounter":
+                tableSQLDSL = new Encounter(this.connectorOptions);
+                break;
+            case "encounter_type":
+                tableSQLDSL = new EncounterType(this.connectorOptions);
+                break;
+            case "location":
+                tableSQLDSL = new Location(this.connectorOptions);
+                break;
+            case "person":
+                tableSQLDSL = new Person(this.connectorOptions);
+                break;
+            case "person_name":
+                tableSQLDSL = new PersonName(this.connectorOptions);
+                break;
+            case "person_address":
+                tableSQLDSL = new PersonAddress(this.connectorOptions);
+                break;
+            case "patient":
+                tableSQLDSL = new Patient(this.connectorOptions);
+                break;
+            case "patient_identifier":
+                tableSQLDSL = new PatientIdentifier(this.connectorOptions);
+                break;
+            case "patient_identifier_type":
+                tableSQLDSL = new PatientIdentifierType(this.connectorOptions);
+                break;
+            case "visit":
+                tableSQLDSL = new Visit(this.connectorOptions);
+                break;
+            case "visit_type":
+                tableSQLDSL = new VisitType(this.connectorOptions);
+                break;
+            case "obs":
+                tableSQLDSL = new Obs(this.connectorOptions);
+                break;
+            case "visits":
+                tableSQLDSL = new Visits(this.connectorOptions);
+                break;
+            case "observations":
+                tableSQLDSL = new Observations(this.connectorOptions);
+                break;
+            case "concepts":
+                tableSQLDSL = new Concepts(this.connectorOptions);
+                break;
+            case "patients":
+                tableSQLDSL = new Patients(this.connectorOptions);
+                break;
+            case "observations_fs":
+                tableSQLDSL = new ObservationsFs(this.connectorOptions);
+                break;
+            case "concepts_fs":
+                tableSQLDSL = new ConceptsFs(this.connectorOptions);
+                break;
+            case "patients_fs":
+                tableSQLDSL = new PatientsFs(this.connectorOptions);
+                break;
+            case "visits_fs":
+                tableSQLDSL = new VisitsFs(this.connectorOptions);
+                break;
+            default:
+                LOG.warn("Table DSL not found");
+        }
+        return tableSQLDSL;
+    }
+
+}
diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableSQLDSL.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableSQLDSL.java
new file mode 100644
index 0000000..9af6e7e
--- /dev/null
+++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/TableSQLDSL.java
@@ -0,0 +1,9 @@
+package net.mekomsolutions.data.pipelines.shared.dsl;
+
+/**
+ * A representation of a Flink Dynamic Source/Sink table.
+ * Implementers of this interface have to provide an SQL DSL representation of the desired source or sink table
+ */
+public interface TableSQLDSL {
+    String getDSL();
+}
diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/ConceptsFs.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/ConceptsFs.java
new file mode 100644
index 0000000..0626acd
--- /dev/null
+++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/ConceptsFs.java
@@ -0,0 +1,44 @@
+package net.mekomsolutions.data.pipelines.shared.dsl.sink.filesystem;
+
+import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL;
+import net.mekomsolutions.data.pipelines.utils.ConnectorUtils;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * This creates a Flink Sink table
+ *
For flattening OpenMRS Concepts + */ +public class ConceptsFs implements TableSQLDSL { + private Map connectorOptions; + public ConceptsFs(Map connectorOptions) { + this.connectorOptions = connectorOptions; + } + + + + /** + * @return concepts table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE concepts_fs (\n" + + " concept_id BIGINT PRIMARY KEY,\n" + + " concept_mapping_source VARCHAR,\n" + + " concept_mapping_code VARCHAR,\n" + + " concept_mapping_name VARCHAR,\n" + + " name VARCHAR,\n" + + " locale VARCHAR,\n" + + " locale_preferred BOOLEAN,\n" + + " retired BOOLEAN,\n" + + " uuid VARCHAR \n" + + ")\n" + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(this.connectorOptions) + + ")"; + } +} \ No newline at end of file diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/ObservationsFs.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/ObservationsFs.java new file mode 100644 index 0000000..66d7729 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/ObservationsFs.java @@ -0,0 +1,77 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.sink.filesystem; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +/** + * This creates a Flink Sink table + * For flattening OpenMRS Obs + */ +public class ObservationsFs implements TableSQLDSL { + private Map connectorOptions; + public ObservationsFs(Map connectorOptions) { + this.connectorOptions = connectorOptions; + } + + /** + * @return observations table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE observations_fs (\n" + + " obs_id BIGINT PRIMARY KEY,\n" + + " person_id BIGINT,\n" + + " concept_name VARCHAR,\n" + + " concept_id BIGINT,\n" + + " obs_group_id BIGINT,\n" + + " accession_number VARCHAR,\n" + + " form_namespace_and_path VARCHAR,\n" + + " value_coded BIGINT,\n" + + " value_coded_name VARCHAR,\n" + + " value_coded_name_id BIGINT,\n" + + " value_drug BIGINT,\n" + + " value_datetime TIMESTAMP,\n" + + " value_numeric DOUBLE,\n" + + " value_modifier VARCHAR,\n" + + " value_text VARCHAR,\n" + + " value_complex VARCHAR,\n" + + " comments VARCHAR,\n" + + " creator BIGINT,\n" + + " date_created TIMESTAMP,\n" + + " obs_voided BOOLEAN,\n" + + " obs_void_reason VARCHAR,\n" + + " previous_version BIGINT,\n" + + " encounter_id BIGINT,\n" + + " voided_2 BOOLEAN,\n" + + " visit_id BIGINT,\n" + + " visit_date_started TIMESTAMP,\n" + + " visit_date_stopped TIMESTAMP,\n" + + " location_id BIGINT,\n" + + " encounter_type_name VARCHAR,\n" + + " encounter_type_description VARCHAR,\n" + + " encounter_type_retired BOOLEAN,\n" + + " encounter_type_uuid VARCHAR,\n" + + " visit_type_name VARCHAR,\n" + + " visit_type_retired BOOLEAN,\n" + + " visit_type_uuid VARCHAR,\n" + + " location_name VARCHAR,\n" + + " location_address1 VARCHAR,\n" + + " location_address2 VARCHAR,\n" + + " location_city_village VARCHAR,\n" + + " location_state_province VARCHAR,\n" + + " location_postal_code VARCHAR,\n" + + " location_country VARCHAR,\n" + + " location_retired BOOLEAN,\n" + + " location_uuid VARCHAR \n" + + ")" + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + + ")"; + } +} diff --git 
a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/PatientsFs.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/PatientsFs.java new file mode 100644 index 0000000..b749fd0 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/PatientsFs.java @@ -0,0 +1,51 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.sink.filesystem; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +/** + * This creates a Flink Sink table + * For flattening OpenMRS Patients + */ +public class PatientsFs implements TableSQLDSL { + private Map connectorOptions; + public PatientsFs(Map connectorOptions) { + this.connectorOptions = connectorOptions; + } + + /** + * @return visits table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `patients_fs` (\n" + + "patient_id BIGINT PRIMARY KEY,\n" + + "given_name VARCHAR,\n" + + "middle_name VARCHAR,\n" + + "family_name VARCHAR,\n" + + "identifier VARCHAR,\n" + + "gender VARCHAR,\n" + + "birthdate DATE,\n" + + "birthdate_estimated BOOLEAN,\n" + + "city VARCHAR,\n" + + "dead BOOLEAN,\n" + + "death_date TIMESTAMP,\n" + + "cause_of_death BIGINT,\n" + + "creator BIGINT,\n" + + "date_created TIMESTAMP,\n" + + "person_voided BOOLEAN,\n" + + "person_void_reason VARCHAR\n" + + ")\n" + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/VisitsFs.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/VisitsFs.java new file mode 100644 index 0000000..b269a2e --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/filesystem/VisitsFs.java @@ -0,0 +1,57 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.sink.filesystem; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +/** + * This creates a Flink Sink table + * For flattening OpenMRS Visits + */ +public class VisitsFs implements TableSQLDSL { + private Map connectorOptions; + public VisitsFs(Map connectorOptions) { + this.connectorOptions = connectorOptions; + } + + /** + * @return visits table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `visits_fs` (\n" + + " visit_id BIGINT PRIMARY KEY,\n" + + " patient_id BIGINT,\n" + + " visit_type_uuid VARCHAR,\n" + + " visit_type VARCHAR,\n" + + " date_started TIMESTAMP,\n" + + " date_stopped TIMESTAMP,\n" + + " indication_concept_id BIGINT,\n" + + " location_id BIGINT,\n" + + " visit_voided BOOLEAN,\n" + + " visit_uuid VARCHAR,\n" + + " person_id BIGINT,\n" + +// " number_occurences BIGINT,\n" + + " gender VARCHAR,\n" + + " birthdate DATE,\n" + + " birthdate_estimated BOOLEAN, \n" + + " age_at_visit_group_profile_1 VARCHAR,\n" + + " age_at_visit BIGINT,\n" + + " dead BOOLEAN,\n" + + " death_date TIMESTAMP,\n" + + " cause_of_death BIGINT,\n" + + " person_voided BOOLEAN,\n" + + " person_uuid VARCHAR\n" + + ")\n" + + "WITH (\n" + + 
ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Concepts.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Concepts.java new file mode 100644 index 0000000..b26aae0 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Concepts.java @@ -0,0 +1,46 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.sink.jdbc; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This creates a Flink Sink table + * For flattening OpenMRS Concepts + */ +public class Concepts implements TableSQLDSL { + private Map connectorOptions; + public Concepts(Map connectorOptions) { + if(!Objects.equals(connectorOptions.get("connector"), "filesystem")){ + connectorOptions.put("table-name","concepts"); + }else{ + connectorOptions.put("path","/tmp/analytics/concepts"); + } + this.connectorOptions = connectorOptions; + } + + + + /** + * @return concepts table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE concepts (\n" + + " concept_id BIGINT PRIMARY KEY,\n" + + " concept_mapping_source VARCHAR,\n" + + " concept_mapping_code VARCHAR,\n" + + " concept_mapping_name VARCHAR,\n" + + " name VARCHAR,\n" + + " locale VARCHAR,\n" + + " locale_preferred BOOLEAN,\n" + + " retired BOOLEAN,\n" + + " uuid VARCHAR\n" + + ")\n" + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(this.connectorOptions) + + ")"; + } +} \ No newline at end of file diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Observations.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Observations.java new file mode 100644 index 0000000..31eba04 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Observations.java @@ -0,0 +1,79 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.sink.jdbc; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This creates a Flink Sink table + * For flattening OpenMRS Obs + */ +public class Observations implements TableSQLDSL { + private Map connectorOptions; + public Observations(Map connectorOptions) { + if(!Objects.equals(connectorOptions.get("connector"), "filesystem")){ + connectorOptions.put("table-name","observations"); + }else{ + connectorOptions.put("path","/tmp/analytics/observations"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return observations table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE observations (\n" + + " obs_id BIGINT PRIMARY KEY,\n" + + " person_id BIGINT,\n" + + " concept_name VARCHAR,\n" + + " concept_id BIGINT,\n" + + " obs_group_id BIGINT,\n" + + " accession_number VARCHAR,\n" + + " form_namespace_and_path VARCHAR,\n" + + " value_coded BIGINT,\n" + + " value_coded_name VARCHAR,\n" + + " value_coded_name_id BIGINT,\n" + + " value_drug BIGINT,\n" + + " value_datetime TIMESTAMP,\n" + + " value_numeric DOUBLE,\n" + + " value_modifier VARCHAR,\n" + + " value_text VARCHAR,\n" + + " value_complex VARCHAR,\n" + + " comments VARCHAR,\n" + + " creator BIGINT,\n" + + " date_created TIMESTAMP,\n" + + " obs_voided BOOLEAN,\n" + + " obs_void_reason VARCHAR,\n" + + " 
previous_version BIGINT,\n" + + " encounter_id BIGINT,\n" + + " voided_2 BOOLEAN,\n" + + " visit_id BIGINT,\n" + + " visit_date_started TIMESTAMP,\n" + + " visit_date_stopped TIMESTAMP,\n" + + " location_id BIGINT,\n" + + " encounter_type_name VARCHAR,\n" + + " encounter_type_description VARCHAR,\n" + + " encounter_type_retired BOOLEAN,\n" + + " encounter_type_uuid VARCHAR,\n" + + " visit_type_name VARCHAR,\n" + + " visit_type_retired BOOLEAN,\n" + + " visit_type_uuid VARCHAR,\n" + + " location_name VARCHAR,\n" + + " location_address1 VARCHAR,\n" + + " location_address2 VARCHAR,\n" + + " location_city_village VARCHAR,\n" + + " location_state_province VARCHAR,\n" + + " location_postal_code VARCHAR,\n" + + " location_country VARCHAR,\n" + + " location_retired BOOLEAN,\n" + + " location_uuid VARCHAR\n" + + ")" + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(connectorOptions) + + ")"; + } +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Patients.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Patients.java new file mode 100644 index 0000000..88a2ebf --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Patients.java @@ -0,0 +1,53 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.sink.jdbc; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This creates a Flink Sink table + * For flattening OpenMRS Patients + */ +public class Patients implements TableSQLDSL { + private Map connectorOptions; + public Patients(Map connectorOptions) { + if(!Objects.equals(connectorOptions.get("connector"), "filesystem")){ + connectorOptions.put("table-name","patients"); + }else{ + connectorOptions.put("path","/tmp/analytics/patients"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return visits table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `patients` (\n" + + "patient_id BIGINT PRIMARY KEY,\n" + + "given_name VARCHAR,\n" + + "middle_name VARCHAR,\n" + + "family_name VARCHAR,\n" + + "identifier VARCHAR,\n" + + "gender VARCHAR,\n" + + "birthdate DATE,\n" + + "birthdate_estimated BOOLEAN,\n" + + "city VARCHAR,\n" + + "dead BOOLEAN,\n" + + "death_date TIMESTAMP,\n" + + "cause_of_death BIGINT,\n" + + "creator int,\n" + + "date_created TIMESTAMP,\n" + + "person_voided BOOLEAN,\n" + + "person_void_reason VARCHAR\n" + + ")\n" + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Visits.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Visits.java new file mode 100644 index 0000000..505329c --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/sink/jdbc/Visits.java @@ -0,0 +1,59 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.sink.jdbc; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This creates a Flink Sink table + * For flattening OpenMRS Visits + */ +public class Visits implements TableSQLDSL { + private Map connectorOptions; + public Visits(Map connectorOptions) { + if(!Objects.equals(connectorOptions.get("connector"), "filesystem")){ + 
connectorOptions.put("table-name","visits"); + }else{ + connectorOptions.put("path","/tmp/analytics/visits"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return visits table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `visits` (\n" + + " visit_id BIGINT PRIMARY KEY,\n" + + " patient_id BIGINT,\n" + + " visit_type_uuid VARCHAR,\n" + + " visit_type VARCHAR,\n" + + " date_started TIMESTAMP,\n" + + " date_stopped TIMESTAMP,\n" + + " indication_concept_id BIGINT,\n" + + " location_id BIGINT,\n" + + " visit_voided BOOLEAN,\n" + + " visit_uuid VARCHAR,\n" + + " person_id BIGINT,\n" + +// " number_occurences BIGINT,\n" + + " gender VARCHAR,\n" + + " birthdate DATE,\n" + + " birthdate_estimated BOOLEAN,\n" + + " age_at_visit_group_profile_1 VARCHAR,\n" + + " age_at_visit BIGINT,\n" + + " dead BOOLEAN,\n" + + " death_date TIMESTAMP,\n" + + " cause_of_death BIGINT,\n" + + " person_voided BOOLEAN,\n" + + " person_uuid VARCHAR\n" + + ")\n" + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Concept.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Concept.java new file mode 100644 index 0000000..016b071 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Concept.java @@ -0,0 +1,56 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS Concept class + * @return + */ +public class Concept implements TableSQLDSL { + private Map connectorOptions; + + public Concept(Map connectorOptions) { + + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.concept"); + }else{ + connectorOptions.put("table-name","concept"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return concept source table DSL + */ + @Override + public String getDSL() { + + return "CREATE TABLE `concept` (\n" + + " `concept_id` int primary key,\n" + + " `retired` BOOLEAN,\n" + + " `short_name` VARCHAR,\n" + + " `description` VARCHAR,\n" + + " `form_text` VARCHAR,\n" + + " `datatype_id` int,\n" + + " `class_id` int,\n" + + " `is_set` BOOLEAN,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `version` VARCHAR,\n" + + " `changed_by` int,\n" + + " `date_changed` TIMESTAMP,\n" + + " `retired_by` int,\n" + + " `date_retired` TIMESTAMP,\n" + + " `retire_reason` VARCHAR,\n" + + " `uuid` char\n" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptName.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptName.java new file mode 100644 index 0000000..1b7c5ae --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptName.java @@ -0,0 +1,53 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * his class represents a Flink Source table + * Mapping to OpenMRS 
ConceptName class + */ +public class ConceptName implements TableSQLDSL { + private Map connectorOptions; + + public ConceptName(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.concept_name"); + }else{ + connectorOptions.put("table-name","concept_name"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return concept_name source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `concept_name` (\n" + + " `concept_name_id` int primary key,\n" + + " `concept_id` int,\n" + + " `name` VARCHAR,\n" + + " `locale` VARCHAR,\n" + + " `locale_preferred` BOOLEAN,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `concept_name_type` VARCHAR,\n" + + " `voided` BOOLEAN,\n" + + " `voided_by` int,\n" + + " `date_voided` TIMESTAMP,\n" + + " `void_reason` VARCHAR,\n" + + " `uuid` char,\n" + + " `date_changed` TIMESTAMP,\n" + + " `changed_by` int\n" + + ")\n" + + " " + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceMap.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceMap.java new file mode 100644 index 0000000..d7535bd --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceMap.java @@ -0,0 +1,45 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS ConceptReferenceMap class + */ +public class ConceptReferenceMap implements TableSQLDSL { + private Map connectorOptions; + public ConceptReferenceMap(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.concept_reference_map"); + }else{ + connectorOptions.put("table-name","concept_reference_map"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return concept_reference_map source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `concept_reference_map` (\n" + + " `concept_map_id` int primary key,\n" + + " `concept_reference_term_id` int,\n" + + " `concept_map_type_id` int,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `concept_id` int,\n" + + " `changed_by` int,\n" + + " `date_changed` TIMESTAMP ,\n" + + " `uuid` char\n" + + ")" + + " " + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceSource.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceSource.java new file mode 100644 index 0000000..9005045 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceSource.java @@ -0,0 +1,50 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS ConceptReferenceSource class + */ +public 
class ConceptReferenceSource implements TableSQLDSL { + private Map connectorOptions; + public ConceptReferenceSource(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.concept_reference_source"); + }else{ + connectorOptions.put("table-name","concept_reference_source"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return concept_reference_source source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `concept_reference_source` (\n" + + " `concept_source_id` int primary key,\n" + + " `name` VARCHAR,\n" + + " `description` VARCHAR,\n" + + " `hl7_code` VARCHAR,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `retired` BOOLEAN,\n" + + " `retired_by` int,\n" + + " `date_retired` TIMESTAMP,\n" + + " `retire_reason` VARCHAR,\n" + + " `uuid` char,\n" + + " `unique_id` VARCHAR,\n" + + " `date_changed` TIMESTAMP,\n" + + " `changed_by` int\n" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceTerm.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceTerm.java new file mode 100644 index 0000000..0d67f33 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/ConceptReferenceTerm.java @@ -0,0 +1,50 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS ConceptReferenceTerm class + */ +public class ConceptReferenceTerm implements TableSQLDSL { + private Map connectorOptions; + public ConceptReferenceTerm(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.concept_reference_term"); + }else{ + connectorOptions.put("table-name","concept_reference_term"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return concept_reference_term source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `concept_reference_term` (\n" + + " `concept_reference_term_id` int primary key,\n" + + " `concept_source_id` int,\n" + + " `name` VARCHAR,\n" + + " `code` VARCHAR,\n" + + " `version` VARCHAR,\n" + + " `description` VARCHAR,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `date_changed` TIMESTAMP,\n" + + " `changed_by` int,\n" + + " `retired` BOOLEAN,\n" + + " `retired_by` int,\n" + + " `date_retired` TIMESTAMP,\n" + + " `retire_reason` VARCHAR,\n" + + " `uuid` char\n" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Encounter.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Encounter.java new file mode 100644 index 0000000..1c37e10 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Encounter.java @@ -0,0 +1,51 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import 
java.util.Objects; + +/** + * This class represents a Flink source table + * mapping to the OpenMRS Encounter class + */ +public class Encounter implements TableSQLDSL { + private Map<String, String> connectorOptions; + + public Encounter(Map<String, String> connectorOptions) { + if (Objects.equals(connectorOptions.get("connector"), "kafka")) { + connectorOptions.put("topic", "openmrs.openmrs.encounter"); + } else { + connectorOptions.put("table-name", "encounter"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return encounter source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `encounter` (\n" + + " `encounter_id` int primary key,\n" + + " `encounter_type` int,\n" + + " `patient_id` int,\n" + + " `location_id` int,\n" + + " `form_id` int,\n" + + " `encounter_datetime` TIMESTAMP,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `voided` BOOLEAN,\n" + + " `voided_by` int,\n" + + " `date_voided` TIMESTAMP,\n" + + " `void_reason` VARCHAR,\n" + + " `changed_by` int,\n" + + " `date_changed` TIMESTAMP,\n" + + " `visit_id` int,\n" + + " `uuid` char)" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/EncounterType.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/EncounterType.java new file mode 100644 index 0000000..cf19557 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/EncounterType.java @@ -0,0 +1,49 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink source table + * mapping to the OpenMRS EncounterType class + */ +public class EncounterType implements TableSQLDSL { + private Map<String, String> connectorOptions; + + public EncounterType(Map<String, String> connectorOptions) { + if (Objects.equals(connectorOptions.get("connector"), "kafka")) { + connectorOptions.put("topic", "openmrs.openmrs.encounter_type"); + } else { + connectorOptions.put("table-name", "encounter_type"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return encounter_type source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `encounter_type` (\n" + + " `encounter_type_id` int primary key,\n" + + " `name` VARCHAR,\n" + + " `description` VARCHAR,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `retired` BOOLEAN,\n" + + " `retired_by` int,\n" + + " `date_retired` TIMESTAMP,\n" + + " `retire_reason` VARCHAR,\n" + + " `uuid` char,\n" + + " `edit_privilege` VARCHAR,\n" + + " `view_privilege` VARCHAR,\n" + + " `changed_by` int,\n" + + " `date_changed` TIMESTAMP)" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Location.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Location.java new file mode 100644 index 0000000..991c95b --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Location.java @@ -0,0 +1,71 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This 
class represents a Flink source table + * mapping to the OpenMRS Location class + */ +public class Location implements TableSQLDSL { + private Map<String, String> connectorOptions; + + public Location(Map<String, String> connectorOptions) { + if (Objects.equals(connectorOptions.get("connector"), "kafka")) { + connectorOptions.put("topic", "openmrs.openmrs.location"); + } else { + connectorOptions.put("table-name", "location"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return location source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `location` (\n" + + " `location_id` int primary key,\n" + + " `name` VARCHAR,\n" + + " `description` VARCHAR,\n" + + " `address1` VARCHAR,\n" + + " `address2` VARCHAR,\n" + + " `city_village` VARCHAR,\n" + + " `state_province` VARCHAR,\n" + + " `postal_code` VARCHAR,\n" + + " `country` VARCHAR,\n" + + " `latitude` VARCHAR,\n" + + " `longitude` VARCHAR,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `county_district` VARCHAR,\n" + + " `address3` VARCHAR,\n" + + " `address4` VARCHAR,\n" + + " `address5` VARCHAR,\n" + + " `address6` VARCHAR,\n" + + " `retired` BOOLEAN,\n" + + " `retired_by` int,\n" + + " `date_retired` TIMESTAMP,\n" + + " `retire_reason` VARCHAR,\n" + + " `parent_location` int,\n" + + " `uuid` char,\n" + + " `changed_by` int,\n" + + " `date_changed` TIMESTAMP,\n" + + " `address7` VARCHAR,\n" + + " `address8` VARCHAR,\n" + + " `address9` VARCHAR,\n" + + " `address10` VARCHAR,\n" + + " `address11` VARCHAR,\n" + + " `address12` VARCHAR,\n" + + " `address13` VARCHAR,\n" + + " `address14` VARCHAR,\n" + + " `address15` VARCHAR\n" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Obs.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Obs.java new file mode 100644 index 0000000..2ca64c7 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Obs.java @@ -0,0 +1,66 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink source table + * mapping to the OpenMRS Obs class + */ +public class Obs implements TableSQLDSL { + private Map<String, String> connectorOptions; + + public Obs(Map<String, String> connectorOptions) { + if (Objects.equals(connectorOptions.get("connector"), "kafka")) { + connectorOptions.put("topic", "openmrs.openmrs.obs"); + } else { + connectorOptions.put("table-name", "obs"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return obs source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE obs (\n" + + " obs_id int primary key,\n" + + " person_id int,\n" + + " concept_id int,\n" + + " encounter_id int,\n" + + " order_id int,\n" + + " obs_datetime TIMESTAMP,\n" + + " location_id int,\n" + + " obs_group_id int,\n" + + " accession_number VARCHAR,\n" + + " value_group_id int,\n" + + " value_coded int,\n" + + " value_coded_name_id int,\n" + + " value_drug int,\n" + + " value_datetime TIMESTAMP,\n" + + " value_numeric double,\n" + + " value_modifier VARCHAR,\n" + + " value_text VARCHAR,\n" + + " value_complex VARCHAR,\n" + + " comments VARCHAR,\n" + + " creator int,\n" + + " date_created TIMESTAMP,\n" + + " voided BOOLEAN,\n" + + " voided_by int,\n" + + " date_voided 
TIMESTAMP,\n" + + " void_reason VARCHAR,\n" + + " uuid VARCHAR,\n" + + " previous_version int,\n" + + " form_namespace_and_path VARCHAR,\n" + + " status VARCHAR,\n" + + " interpretation VARCHAR" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Patient.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Patient.java new file mode 100644 index 0000000..2d06199 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Patient.java @@ -0,0 +1,46 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS Patient class + */ +public class Patient implements TableSQLDSL { + private Map connectorOptions; + public Patient(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.patient"); + }else{ + connectorOptions.put("table-name","patient"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return patient source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `patient` (\n" + + "`patient_id` int primary key,\n" + + "`creator` int,\n" + + "`date_created` TIMESTAMP,\n" + + "`changed_by` int,\n" + + "`date_changed` TIMESTAMP,\n" + + "`voided` BOOLEAN,\n" + + "`voided_by` int,\n" + + "`date_voided` TIMESTAMP,\n" + + "`void_reason` VARCHAR,\n" + + "`allergy_status` VARCHAR\n" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PatientIdentifier.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PatientIdentifier.java new file mode 100644 index 0000000..1b15892 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PatientIdentifier.java @@ -0,0 +1,51 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS PatientIdentifier class + */ +public class PatientIdentifier implements TableSQLDSL { + private Map connectorOptions; + public PatientIdentifier(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.patient_identifier"); + }else{ + connectorOptions.put("table-name","patient_identifier"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return patient_identifier source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `patient_identifier` (\n" + + "`patient_identifier_id` int primary key,\n" + + "`patient_id` int,\n" + + "`identifier` VARCHAR,\n" + + "`identifier_type` int,\n" + + "`preferred` BOOLEAN,\n" + + "`location_id` int,\n" + + "`creator` int,\n" + + "`date_created` TIMESTAMP,\n" + + "`voided` BOOLEAN,\n" + + "`voided_by` int ,\n" + + "`date_voided` TIMESTAMP ,\n" + + "`void_reason` VARCHAR ,\n" + + "`uuid` char,\n" + + "`date_changed` 
TIMESTAMP,\n" + + "`changed_by` int\n" + + ")"+ + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PatientIdentifierType.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PatientIdentifierType.java new file mode 100644 index 0000000..18a4e54 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PatientIdentifierType.java @@ -0,0 +1,55 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS PatientIdentifierType class + */ +public class PatientIdentifierType implements TableSQLDSL { + private Map connectorOptions; + public PatientIdentifierType(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.patient_identifier_type"); + }else{ + connectorOptions.put("table-name","patient_identifier_type"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return patient_identifier_type source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `patient_identifier_type` (\n" + + "`patient_identifier_type_id` int primary key,\n" + + "`name` VARCHAR,\n" + + "`description` VARCHAR,\n" + + "`format` VARCHAR,\n" + + "`check_digit` BOOLEAN,\n" + + "`creator` int,\n" + + "`date_created` TIMESTAMP,\n" + + "`required` BOOLEAN,\n" + + "`format_description` VARCHAR,\n" + + "`validator` VARCHAR,\n" + + "`retired` BOOLEAN,\n" + + "`retired_by` int,\n" + + "`date_retired` TIMESTAMP,\n" + + "`retire_reason` VARCHAR,\n" + + "`uuid` char,\n" + + "`location_behavior` VARCHAR,\n" + + "`uniqueness_behavior` VARCHAR,\n" + + "`date_changed` TIMESTAMP,\n" + + "`changed_by` int\n" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Person.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Person.java new file mode 100644 index 0000000..24a3127 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Person.java @@ -0,0 +1,55 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS Person class + */ +public class Person implements TableSQLDSL { + private Map connectorOptions; + public Person(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.person"); + }else{ + connectorOptions.put("table-name","person"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return person source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `person` (\n" + + " `person_id` int primary key,\n" + + " `gender` VARCHAR,\n" + + " `birthdate` DATE ,\n" + + " `birthdate_estimated` BOOLEAN,\n" + + " `dead` BOOLEAN,\n" + + " `death_date` TIMESTAMP,\n" + + " `cause_of_death` BIGINT,\n" + + " `creator` 
int,\n" + + " `date_created` TIMESTAMP,\n" + + " `changed_by` int,\n" + + " `date_changed` TIMESTAMP,\n" + + " `voided` BOOLEAN,\n" + + " `voided_by` int,\n" + + " `date_voided` TIMESTAMP,\n" + + " `void_reason` VARCHAR,\n" + + " `uuid` char,\n" + + " `deathdate_estimated` BOOLEAN,\n" + + " `birthtime` time,\n" + + " `cause_of_death_non_coded` VARCHAR\n" + + ")"+ + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PersonAddress.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PersonAddress.java new file mode 100644 index 0000000..1ee26bd --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PersonAddress.java @@ -0,0 +1,72 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS PersonAddress class + */ +public class PersonAddress implements TableSQLDSL { + private Map connectorOptions; + public PersonAddress(Map connectorOptions) { + if(Objects.equals(connectorOptions.get("connector"), "kafka")){ + connectorOptions.put("topic","openmrs.openmrs.person_address"); + }else{ + connectorOptions.put("table-name","person_address"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return person_address source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `person_address` (\n" + + "`person_address_id` int primary key,\n" + + "`person_id` int,\n" + + "`preferred` BOOLEAN,\n" + + "`address1` VARCHAR,\n" + + "`address2` VARCHAR,\n" + + "`city_village` VARCHAR,\n" + + "`state_province` VARCHAR,\n" + + "`postal_code` VARCHAR,\n" + + "`country` VARCHAR,\n" + + "`latitude` VARCHAR,\n" + + "`longitude` VARCHAR,\n" + + "`creator` int,\n" + + "`date_created` TIMESTAMP,\n" + + "`voided` BOOLEAN,\n" + + "`voided_by` int,\n" + + "`date_voided` TIMESTAMP,\n" + + "`void_reason` VARCHAR,\n" + + "`county_district` VARCHAR,\n" + + "`address3` VARCHAR,\n" + + "`address6` VARCHAR,\n" + + "`address5` VARCHAR,\n" + + "`address4` VARCHAR,\n" + + "`uuid` char,\n" + + "`date_changed` TIMESTAMP,\n" + + "`changed_by` int,\n" + + "`start_date` TIMESTAMP,\n" + + "`end_date` TIMESTAMP,\n" + + "`address7` VARCHAR,\n" + + "`address8` VARCHAR,\n" + + "`address9` VARCHAR,\n" + + "`address10` VARCHAR,\n" + + "`address11` VARCHAR,\n" + + "`address12` VARCHAR,\n" + + "`address13` VARCHAR,\n" + + "`address14` VARCHAR,\n" + + "`address15` VARCHAR\n" + + ")"+ + " WITH (\n" + + ConnectorUtils.propertyJoiner(",","=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PersonName.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PersonName.java new file mode 100644 index 0000000..eed43c2 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/PersonName.java @@ -0,0 +1,56 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink Source table + * Mapping to OpenMRS PersonName class + */ +public 
class PersonName implements TableSQLDSL { + private Map<String, String> connectorOptions; + + public PersonName(Map<String, String> connectorOptions) { + if (Objects.equals(connectorOptions.get("connector"), "kafka")) { + connectorOptions.put("topic", "openmrs.openmrs.person_name"); + } else { + connectorOptions.put("table-name", "person_name"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return person_name source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `person_name` (\n" + + "`person_name_id` int primary key,\n" + + "`preferred` BOOLEAN,\n" + + "`person_id` int,\n" + + "`prefix` VARCHAR,\n" + + "`given_name` VARCHAR,\n" + + "`middle_name` VARCHAR,\n" + + "`family_name_prefix` VARCHAR,\n" + + "`family_name` VARCHAR,\n" + + "`family_name2` VARCHAR,\n" + + "`family_name_suffix` VARCHAR,\n" + + "`degree` VARCHAR,\n" + + "`creator` int,\n" + + "`date_created` TIMESTAMP,\n" + + "`voided` BOOLEAN,\n" + + "`voided_by` int,\n" + + "`date_voided` TIMESTAMP,\n" + + "`void_reason` VARCHAR,\n" + + "`changed_by` int,\n" + + "`date_changed` TIMESTAMP,\n" + + "`uuid` char\n" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Visit.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Visit.java new file mode 100644 index 0000000..57dc84d --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/Visit.java @@ -0,0 +1,52 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This class represents a Flink source table + * mapping to the OpenMRS Visit class + */ +public class Visit implements TableSQLDSL { + private Map<String, String> connectorOptions; + + public Visit(Map<String, String> connectorOptions) { + if (Objects.equals(connectorOptions.get("connector"), "kafka")) { + connectorOptions.put("topic", "openmrs.openmrs.visit"); + } else { + connectorOptions.put("table-name", "visit"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return visit source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `visit` (\n" + + " `visit_id` int primary key,\n" + + " `patient_id` int,\n" + + " `visit_type_id` int,\n" + + " `date_started` TIMESTAMP,\n" + + " `date_stopped` TIMESTAMP,\n" + + " `indication_concept_id` int,\n" + + " `location_id` int,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `changed_by` int,\n" + + " `date_changed` TIMESTAMP,\n" + + " `voided` BOOLEAN,\n" + + " `voided_by` int,\n" + + " `date_voided` TIMESTAMP,\n" + + " `void_reason` VARCHAR,\n" + + " `uuid` VARCHAR\n" + + ") " + + "WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(this.connectorOptions) + + ")"; + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/VisitType.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/VisitType.java new file mode 100644 index 0000000..dfd1f77 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/dsl/source/VisitType.java @@ -0,0 +1,48 @@ +package net.mekomsolutions.data.pipelines.shared.dsl.source; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableSQLDSL; +import net.mekomsolutions.data.pipelines.utils.ConnectorUtils; + +import java.util.Map; +import java.util.Objects; + +/** + * This 
class represents a Flink source table + * mapping to the OpenMRS VisitType class + */ +public class VisitType implements TableSQLDSL { + private Map<String, String> connectorOptions; + + public VisitType(Map<String, String> connectorOptions) { + if (Objects.equals(connectorOptions.get("connector"), "kafka")) { + connectorOptions.put("topic", "openmrs.openmrs.visit_type"); + } else { + connectorOptions.put("table-name", "visit_type"); + } + this.connectorOptions = connectorOptions; + } + + /** + * @return visit_type source table DSL + */ + @Override + public String getDSL() { + return "CREATE TABLE `visit_type` (\n" + + " `visit_type_id` int primary key,\n" + + " `name` VARCHAR,\n" + + " `description` VARCHAR,\n" + + " `creator` int,\n" + + " `date_created` TIMESTAMP,\n" + + " `changed_by` int,\n" + + " `date_changed` TIMESTAMP,\n" + + " `retired` BOOLEAN,\n" + + " `retired_by` int,\n" + + " `date_retired` TIMESTAMP,\n" + + " `retire_reason` VARCHAR,\n" + + " `uuid` VARCHAR\n" + + ")" + + " WITH (\n" + + ConnectorUtils.propertyJoiner(",", "=").apply(this.connectorOptions) + + ")"; + } +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/shared/jobs/Job.java b/src/main/java/net/mekomsolutions/data/pipelines/shared/jobs/Job.java new file mode 100644 index 0000000..680ed3f --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/shared/jobs/Job.java @@ -0,0 +1,22 @@ +package net.mekomsolutions.data.pipelines.shared.jobs; + +public class Job { + private String name; + private String sourceFilePath; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getSourceFilePath() { + return sourceFilePath; + } + + public void setSourceFilePath(String sourceFilePath) { + this.sourceFilePath = sourceFilePath; + } +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/streaming/StreamingETLJob.java b/src/main/java/net/mekomsolutions/data/pipelines/streaming/StreamingETLJob.java new file mode 100644 index 0000000..0795d66 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/streaming/StreamingETLJob.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package net.mekomsolutions.data.pipelines.streaming; + +import com.fasterxml.jackson.databind.ObjectMapper; +import net.mekomsolutions.data.pipelines.shared.dsl.TableDSLFactory; +import net.mekomsolutions.data.pipelines.shared.jobs.Job; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.StatementSet; +import org.apache.flink.table.api.TableEnvironment; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +/** + * Flink job for the Mekom ETL pipelines. It offers both batch and streaming execution; you can select the mode in the + * job.properties file by setting job.streaming=true/false. + *

<p>For a tutorial on how to write a Flink streaming application, check the + * tutorials and examples on the Flink Website. + * + *

<p>To package your application into a JAR file for execution, run + * 'mvn clean package' on the command line. + * + *

<p>If you change the name of the main class (with the public static void main(String[] args)) + * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + */ +public class StreamingETLJob { + + public static void main(String[] args) throws Exception { + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + String propertiesFilePath = parameterTool.get("properties-file", "/opt/flink/usrlib/job.properties"); + ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath); + boolean streaming = Boolean.parseBoolean(parameter.get("job.streaming", "true")); + // Batch mode reads the OpenMRS tables over JDBC; streaming mode swaps these options for Kafka below. + Map<String, String> openmrsConnectorOptions = Stream.of(new String[][]{ + {"connector", "jdbc"}, + {"url", parameter.get("openmrs.database.batch.url", "")}, + {"username", parameter.get("openmrs.database.username", "")}, + {"password", parameter.get("openmrs.database.password", "")}, + }).collect(Collectors.toMap(data -> data[0], data -> data[1])); + // Sink options are read from the command-line arguments rather than the properties file. + Map<String, String> postgresConnectorOptions = Stream.of(new String[][]{ + {"connector", "jdbc"}, + {"url", parameterTool.get("sink-url", "")}, + {"username", parameterTool.get("sink-username", "")}, + {"password", parameterTool.get("sink-password", "")}, + {"sink.buffer-flush.max-rows", "1000"}, + {"sink.buffer-flush.interval", "1s"} + }).collect(Collectors.toMap(data -> data[0], data -> data[1])); + + Map<String, String> fileSystemConnectorOptions = Stream.of(new String[][]{ + {"connector", "filesystem"}, + {"format", "parquet"}, + {"sink.partition-commit.delay", "1 h"}, + {"sink.partition-commit.policy.kind", "success-file"} + }).collect(Collectors.toMap(data -> data[0], data -> data[1])); + + EnvironmentSettings settings = EnvironmentSettings + .newInstance() + .inBatchMode() + .build(); + + if (streaming) { + settings = EnvironmentSettings + .newInstance() + .inStreamingMode() + .build(); + openmrsConnectorOptions = Stream.of(new String[][]{ + {"connector", "kafka"}, + {"properties.bootstrap.servers", parameter.get("properties.bootstrap.servers", "")}, + {"scan.startup.mode", "earliest-offset"}, + {"value.format", "debezium-json"}, + {"value.debezium-json.ignore-parse-errors", "true"}, + }).collect(Collectors.toMap(data -> data[0], data -> data[1])); + } + + TableEnvironment tEnv = TableEnvironment.create(settings); + tEnv.getConfig().getConfiguration().setString("restart-strategy", "exponential-delay"); + // In streaming mode, checkpoint EXACTLY_ONCE into an incremental RocksDB state backend; + // the checkpoint and savepoint directories are required by RocksDB. + if (streaming) { + tEnv.getConfig().getConfiguration().setString("execution.checkpointing.mode", "EXACTLY_ONCE"); + tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10min"); + tEnv.getConfig().getConfiguration().setString("execution.checkpointing.timeout", "10min"); + tEnv.getConfig().getConfiguration().setString("execution.checkpointing.unaligned", "true"); + tEnv.getConfig().getConfiguration().setString("execution.checkpointing.tolerable-failed-checkpoints", "400"); + tEnv.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", "true"); + tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb"); + tEnv.getConfig().getConfiguration().setString("state.backend.incremental", "true"); + tEnv.getConfig().getConfiguration().setString("state.checkpoints.dir", "file:///tmp/checkpoints/"); + tEnv.getConfig().getConfiguration().setString("state.savepoints.dir", "file:///tmp/savepoints/"); + } + + // Network buffers and a default parallelism for the SQL operators. + tEnv.getConfig().getConfiguration().setString("taskmanager.network.numberOfBuffers", "20"); + tEnv.getConfig() // access high-level configuration + .getConfiguration() // set low-level key-value options + .setString("table.exec.resource.default-parallelism", "6"); + + String[] sourceTables = {"person", "person_name", "person_address", "patient", "patient_identifier", "patient_identifier_type", "visit", "visit_type", "concept", "concept_name", "concept_reference_map", + "concept_reference_term", "concept_reference_source", "obs", "encounter", "encounter_type", "location"}; + String[] sinkTables = {"visits", "patients", "concepts", "observations"}; + setupSourceTables(tEnv, sourceTables, openmrsConnectorOptions); + setupSinkTables(tEnv, sinkTables, postgresConnectorOptions); + final ObjectMapper objectMapper = new ObjectMapper(); + Job[] jobs = objectMapper.readValue(getResourceFileAsString("jobs.json"), Job[].class); + StatementSet stmtSet = tEnv.createStatementSet(); + for (Job job : jobs) { + String queryDSL = "INSERT INTO " + job.getName() + " \n" + getResourceFileAsString(job.getSourceFilePath()); + stmtSet.addInsertSql(queryDSL); + } + + stmtSet.execute(); + } + + private static void setupSourceTables(TableEnvironment tableEnv, String[] tables, Map<String, String> connectorOptions) { + TableDSLFactory tableDSLFactory = new TableDSLFactory(connectorOptions); + for (String tableName : tables) { + tableEnv.executeSql(tableDSLFactory.getTable(tableName).getDSL()); + } + } + + private static void setupSinkTables(TableEnvironment tableEnv, String[] tables, Map<String, String> connectorOptions) { + TableDSLFactory tableDSLFactory = new TableDSLFactory(connectorOptions); + for (String tableName : tables) { + tableEnv.executeSql(tableDSLFactory.getTable(tableName).getDSL()); + } + } + + private static String getResourceFileAsString(String fileName) throws IOException { + try (InputStream is = StreamingETLJob.class.getResourceAsStream("/" + fileName)) { + if (is == null) return null; + try (InputStreamReader isr = new InputStreamReader(is); + BufferedReader reader = new BufferedReader(isr)) { + return reader.lines().collect(Collectors.joining(System.lineSeparator())); + } + } + } + +}
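
The jobs.json wiring above deserves a concrete illustration: each entry names a sink table and the resource file holding its SELECT, and the loop prepends an INSERT clause before adding it to the shared StatementSet. For the "visits" entry the assembled statement is effectively the following (a sketch, with the SELECT list abbreviated):

    // {"name": "visits", "sourceFilePath": "visits.sql"} becomes:
    String queryDSL = "INSERT INTO visits \n"
            + "SELECT visit.visit_id AS visit_id, ... FROM visit LEFT JOIN visit_type ...";
    stmtSet.addInsertSql(queryDSL);
    // stmtSet.execute() then submits all four INSERTs together as a single Flink job
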
"file:///tmp/savepoints/"); + } + + + + // set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager" + // you can also set the full qualified Java class name of the StateBackendFactory to this option + // e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory + tEnv.getConfig().getConfiguration().setString("taskmanager.network.numberOfBuffers", "20"); + tEnv.getConfig() // access high-level configuration + .getConfiguration() // set low-level key-value options + .setString("table.exec.resource.default-parallelism", "6"); + // set the checkpoint directory, which is required by the RocksDB statebackend + + + String[] sourceTables = {"person", "person_name", "person_address", "patient", "patient_identifier", "patient_identifier_type", "visit", "visit_type", "concept", "concept_name", "concept_reference_map", + "concept_reference_term", "concept_reference_source", "obs", "encounter", "encounter_type", "location"}; + String[] sinkTables = {"visits", "patients", "concepts", "observations"}; + setupSourceTables(tEnv, sourceTables, openmrsConnectorOptions); + setupSinkTables(tEnv, sinkTables, postgresConnectorOptions); + final ObjectMapper objectMapper = new ObjectMapper(); + Job[] jobs = objectMapper.readValue(getResourceFileAsString("jobs.json"), Job[].class); + StatementSet stmtSet = tEnv.createStatementSet(); + for (Job job : jobs) { + String queryDSL = "INSERT INTO " + job.getName() + " \n" + getResourceFileAsString(job.getSourceFilePath()); + stmtSet.addInsertSql(queryDSL); + } + + stmtSet.execute(); + } + + private static void setupSourceTables(TableEnvironment tableEnv, String[] tables, Map connectorOptions) { + TableDSLFactory tableDSLFactory = new TableDSLFactory(connectorOptions); + for (String tableName : tables) { + tableEnv.executeSql(tableDSLFactory.getTable(tableName).getDSL()); + } + } + + private static void setupSinkTables(TableEnvironment tableEnv, String[] tables, Map connectorOptions) { + TableDSLFactory tableDSLFactory = new TableDSLFactory(connectorOptions); + for (String tableName : tables) { + tableEnv.executeSql(tableDSLFactory.getTable(tableName).getDSL()); + } + } + + private static String getResourceFileAsString(String fileName) throws IOException { + try (InputStream is = StreamingETLJob.class.getResourceAsStream("/" + fileName)) { + if (is == null) return null; + try (InputStreamReader isr = new InputStreamReader(is); + BufferedReader reader = new BufferedReader(isr)) { + return reader.lines().collect(Collectors.joining(System.lineSeparator())); + } + } + } + +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/utils/CommonUtils.java b/src/main/java/net/mekomsolutions/data/pipelines/utils/CommonUtils.java new file mode 100644 index 0000000..9c31c44 --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/utils/CommonUtils.java @@ -0,0 +1,16 @@ +package net.mekomsolutions.data.pipelines.utils; + +import net.mekomsolutions.data.pipelines.shared.dsl.TableDSLFactory; +import org.apache.flink.table.api.TableEnvironment; + +import java.util.Map; + +public class CommonUtils { + public static void setupTables(TableEnvironment tableEnv, String[] tables, Map connectorOptions) { + + TableDSLFactory tableDSLFactory = new TableDSLFactory(connectorOptions); + for (String tableName : tables) { + tableEnv.executeSql(tableDSLFactory.getTable(tableName).getDSL()); + } + } +} diff --git a/src/main/java/net/mekomsolutions/data/pipelines/utils/ConnectorUtils.java 
diff --git a/src/main/java/net/mekomsolutions/data/pipelines/utils/ConnectorUtils.java b/src/main/java/net/mekomsolutions/data/pipelines/utils/ConnectorUtils.java new file mode 100644 index 0000000..0f39dff --- /dev/null +++ b/src/main/java/net/mekomsolutions/data/pipelines/utils/ConnectorUtils.java @@ -0,0 +1,13 @@ +package net.mekomsolutions.data.pipelines.utils; + +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class ConnectorUtils { + public static Function<Map<String, String>, String> propertyJoiner(String entrySeparator, String valueSeparator) { + return m -> m.entrySet().stream() + .map(e -> "'" + e.getKey() + "' " + valueSeparator + " '" + e.getValue() + "'") + .collect(Collectors.joining(entrySeparator)); + } +}
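
propertyJoiner renders a connector-options map as the body of a Flink WITH clause, single-quoting each key and value. A quick sketch of the output (map contents illustrative):

    Map<String, String> opts = new HashMap<>(); // java.util.HashMap
    opts.put("connector", "jdbc");
    opts.put("table-name", "patients");
    String withBody = ConnectorUtils.propertyJoiner(",", "=").apply(opts);
    // -> 'connector' = 'jdbc','table-name' = 'patients' (entry order follows the map's iteration order)
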
diff --git a/src/main/resources/concepts.sql b/src/main/resources/concepts.sql new file mode 100644 index 0000000..ab8d07b --- /dev/null +++ b/src/main/resources/concepts.sql @@ -0,0 +1,14 @@ +SELECT `concept`.`concept_id` AS `concept_id`, + `concept_reference_source`.`name` AS `Concept Mapping Source`, + `concept_reference_term`.`code` AS `Concept Mapping Code`, + `concept_reference_term`.`name` AS `Concept Mapping Name`, + `concept_name`.`name` AS `name`, + `concept_name`.`locale` AS `locale`, + `concept_name`.`locale_preferred` AS `locale_preferred`, + `concept`.`retired` AS `retired`, + `concept`.`uuid` AS `uuid` +FROM `concept` +LEFT JOIN `concept_reference_map` `concept_reference_map` ON `concept`.`concept_id` = `concept_reference_map`.`concept_id` +LEFT JOIN `concept_reference_term` `concept_reference_term` ON `concept_reference_map`.`concept_reference_term_id` = `concept_reference_term`.`concept_reference_term_id` +LEFT JOIN `concept_reference_source` `concept_reference_source` ON `concept_reference_term`.`concept_source_id` = `concept_reference_source`.`concept_source_id` +LEFT JOIN `concept_name` `concept_name` ON `concept`.`concept_id` = `concept_name`.`concept_id` \ No newline at end of file diff --git a/src/main/resources/database/dbchangelog.xml b/src/main/resources/database/dbchangelog.xml new file mode 100644 index 0000000..582f053 --- /dev/null +++ b/src/main/resources/database/dbchangelog.xml @@ -0,0 +1,128 @@ +[dbchangelog.xml: 128 lines of Liquibase XML whose markup was stripped during extraction and is not recoverable here] \ No newline at end of file diff --git a/src/main/resources/jobs.json b/src/main/resources/jobs.json new file mode 100644 index 0000000..13327be --- /dev/null +++ b/src/main/resources/jobs.json @@ -0,0 +1,18 @@ +[ + { + "name": "visits", + "sourceFilePath": "visits.sql" + }, + { + "name": "concepts", + "sourceFilePath": "concepts.sql" + }, + { + "name": "patients", + "sourceFilePath": "patients.sql" + }, + { + "name": "observations", + "sourceFilePath": "observations.sql" + } +] diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..e07168e --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,10 @@ +log4j.appender.Stdout=org.apache.log4j.ConsoleAppender +log4j.appender.Stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.Stdout.layout.conversionPattern=%-5p - %-26.26c{1} - %m%n + +log4j.rootLogger=INFO,Stdout + +log4j.logger.org.apache.wicket=INFO +log4j.logger.org.apache.wicket.protocol.http.HttpSessionStore=INFO +log4j.logger.org.apache.wicket.version=INFO +log4j.logger.org.apache.wicket.RequestCycle=INFO \ No newline at end of file diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties new file mode 100644 index 0000000..32c696e --- /dev/null +++ b/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/src/main/resources/observations.sql b/src/main/resources/observations.sql new file mode 100644 index 0000000..b8eb1ad --- /dev/null +++ b/src/main/resources/observations.sql @@ -0,0 +1,53 @@ +SELECT obs.obs_id AS obs_id, + obs.person_id AS person_id, + concept_concept_name.name AS concept_name, + obs.concept_id AS concept_id, + obs.obs_group_id AS obs_group_id, + obs.accession_number AS accession_number, + obs.form_namespace_and_path AS form_namespace_and_path, + obs.value_coded AS value_coded, + value_concept_name.name AS value_coded_name, + obs.value_coded_name_id AS value_coded_name_id, + obs.value_drug AS value_drug, + obs.value_datetime AS value_datetime, + obs.value_numeric AS value_numeric, + obs.value_modifier AS value_modifier, + obs.value_text AS value_text, + obs.value_complex AS value_complex, + obs.comments AS comments, + obs.creator AS creator, + obs.date_created AS date_created, + obs.voided AS obs_voided, + obs.void_reason AS obs_void_reason, + obs.previous_version AS previous_version, + encounter.encounter_id AS encounter_id, + encounter.voided AS encounter_voided, + visit.visit_id AS visit_id, + visit.date_started AS visit_date_started, + visit.date_stopped AS visit_date_stopped, + visit.location_id AS location_id, + encounter_type.name AS encounter_type_name, + encounter_type.description AS encounter_type_description, + encounter_type.retired AS encounter_type_retired, + encounter_type.uuid AS encounter_type_uuid, + visit_type.name AS visit_type_name, + visit_type.retired AS visit_type_retired, + visit_type.uuid AS visit_type_uuid, + location.name AS location_name, + location.address1 AS location_address1, + location.address2 AS location_address2, + location.city_village AS location_city_village, + location.state_province AS location_state_province, + location.postal_code AS location_postal_code, + location.country AS location_country, + location.retired AS location_retired, + location.uuid AS location_uuid +FROM obs +LEFT JOIN concept_name value_concept_name ON obs.value_coded = value_concept_name.concept_id +AND obs.value_coded IS NOT NULL +LEFT JOIN encounter 
encounter ON obs.encounter_id = encounter.encounter_id +LEFT JOIN visit visit ON encounter.visit_id = visit.visit_id +LEFT JOIN encounter_type encounter_type ON encounter.encounter_type = encounter_type.encounter_type_id +LEFT JOIN visit_type visit_type ON visit.visit_type_id = visit_type.visit_type_id +LEFT JOIN location location ON obs.location_id = location.location_id +LEFT JOIN concept_name concept_concept_name ON obs.concept_id = concept_concept_name.concept_id \ No newline at end of file diff --git a/src/main/resources/patients.sql b/src/main/resources/patients.sql new file mode 100644 index 0000000..3265c25 --- /dev/null +++ b/src/main/resources/patients.sql @@ -0,0 +1,25 @@ +SELECT patient.patient_id AS id, + name.given_name AS given_name, + name.middle_name AS middle_name, + name.family_name AS family_name, + identifier.identifier AS identifier, + person.gender AS gender, + person.birthdate AS birthdate, + person.birthdate_estimated AS birthdate_estimated, + address.city_village AS city, + person.dead AS dead, + person.death_date AS death_date, + person.cause_of_death AS cause_of_death, + person.creator AS creator, + person.date_created AS date_created, + person.voided AS person_voided, + person.void_reason AS person_void_reason +FROM patient + LEFT JOIN person person + ON patient.patient_id = person.person_id + LEFT JOIN person_name name + ON person.person_id = name.person_id + LEFT JOIN person_address address + ON person.person_id = address.person_id + LEFT JOIN patient_identifier identifier + ON patient.patient_id = identifier.patient_id diff --git a/src/main/resources/visits.sql b/src/main/resources/visits.sql new file mode 100644 index 0000000..7be69e2 --- /dev/null +++ b/src/main/resources/visits.sql @@ -0,0 +1,35 @@ +SELECT visit.visit_id AS visit_id, + visit.patient_id AS patient_id, + visit_type.uuid AS visit_type_uuid, + visit_type.name AS visit_type, + visit.date_started AS date_started, + visit.date_stopped AS date_stopped, + visit.indication_concept_id AS indication_concept_id, + visit.location_id AS location_id, + visit.voided AS visit_voided, + visit.uuid AS visit_uuid, + person.person_id AS person_id, + person.gender AS gender, + person.birthdate AS birthdate, + person.birthdate_estimated AS birthdate_estimated, + CASE + WHEN TIMESTAMPDIFF(DAY, person.birthdate, visit.date_started) / 365 < 1 THEN '0 - 1' + WHEN TIMESTAMPDIFF(DAY, person.birthdate, visit.date_started) / 365 BETWEEN 1 AND 4 THEN '1 - 4' + WHEN TIMESTAMPDIFF(DAY, person.birthdate, visit.date_started) / 365 BETWEEN 5 AND 9 THEN '5 - 9' + WHEN TIMESTAMPDIFF(DAY, person.birthdate, visit.date_started) / 365 BETWEEN 10 AND 14 THEN '10 - 14' + WHEN TIMESTAMPDIFF(DAY, person.birthdate, visit.date_started) / 365 BETWEEN 15 AND 19 THEN '15 - 19' + WHEN TIMESTAMPDIFF(DAY, person.birthdate, visit.date_started) / 365 BETWEEN 20 AND 24 THEN '20 - 24' + WHEN TIMESTAMPDIFF(DAY, person.birthdate, visit.date_started) / 365 > 24 THEN '25+' + ELSE 'Invalid birthdate' + END AS `age_at_visit_group_profile_1`, + TIMESTAMPDIFF(DAY, person.birthdate, visit.date_started) / 365 AS age_at_visit, + person.dead AS dead, + person.death_date AS death_date, + person.cause_of_death AS cause_of_death, + person.voided AS person_voided, + person.uuid AS person_uuid +FROM visit +LEFT JOIN visit_type visit_type +ON visit.visit_type_id = visit_type.visit_type_id +LEFT JOIN person person +ON visit.patient_id = person.person_id \ No newline at end of file
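
The age bucketing in visits.sql relies on TIMESTAMPDIFF(DAY, ...) returning an integer, so the division by 365 truncates and the CASE branches are tried top to bottom. A small Java check of the arithmetic (the dates are made up for the example):

    import java.time.LocalDate;
    import java.time.temporal.ChronoUnit;

    long days = ChronoUnit.DAYS.between(LocalDate.of(2018, 1, 1), LocalDate.of(2022, 6, 1)); // 1612
    long years = days / 365; // integer division -> 4, so this visit falls in the '1 - 4' bucket
    // born 2017-06-01, visiting 2022-06-01: 1826 / 365 = 5 -> '5 - 9'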