Skip to content

Commit

Permalink
sink-postgres: add TLS configuration
Browse files Browse the repository at this point in the history
**Summary**

Add TLS-related options to the PostgreSQL sink. We expose all options exposed
by the native_tls crate so that we can support as many deployments as possible.

This commit changes the default behaviour from not using TLS to using TLS.
  • Loading branch information
fracek committed Sep 11, 2023
1 parent 7443b5a commit d197c57
Show file tree
Hide file tree
Showing 11 changed files with 384 additions and 35 deletions.
202 changes: 179 additions & 23 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ starknet = { git = "https://github.com/fracek/starknet-rs", rev = "5bb6950" }
thiserror = "1.0.32"
tempfile = "3.3.0"
tempdir = "0.3.7"
testcontainers = "0.14.0"
testcontainers = { version = "0.14.0", features = ["experimental"] }
tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = { version = "0.1.10", features = ["sync"] }
tokio-util = "0.7.4"
Expand Down
33 changes: 33 additions & 0 deletions docs/integrations/postgres.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ apibara plugins install sink-postgres
- `tableName: string`: table where data will be inserted. The table must exist and
it must have a schema compatible with the data returned by the transform
step.
- `noTls: boolean`: disable TLS when connecting to the server.
- `tlsCertificate: string`: path to the PEM-formatted X509 TLS certificate.
- `tlsDisableSystemRoots: boolean`: disable system root certificates.
- `tlsAcceptInvalidCertificates: boolean`: accept invalid TLS certificates.
- `tlsAcceptInvalidHostnames: boolean`: disable hostname validation.
- `tlsUseSni: boolean`: use Server Name Identification (SNI).


### Table schema
Expand All @@ -40,3 +46,30 @@ Batch data is converted to Postgres records using the `json_populate_recordset`
function. Additionally, the PostgreSQL integration **requires a `_cursor`
column** in the table to keep track of each batch's cursor, so that data can be
invalidated in case of chain reorganizations.

### Provider-specific setup

#### Supabase

You have two options:

- disable TLS by adding the `--no-tls=true` flag when running your indexer.
**This is not recommended for production**.
- download the SSL certificate from your Supabase dashboard (Settings =>
Database) and convert it to PEM.

After downloading the `.crt` certificate from your dashboard, you will have a
`.crt` file in your download folder. This file will be named something like
`prod-ca-2021.pem`. Convert it to PEM using, for example, the `openssl` CLI tool.

```bash
openssl x509 -in prod-ca-2021.crt -out prod-ca-2021.pem -outform PEM
```

Use the `--tls-certificate` (or `sinkOptions.tlsCertificate` in your
script) flag to point to the PEM certificate path.

#### Neon

Use the provided connection string.

9 changes: 5 additions & 4 deletions examples/postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ _Mirror onchain data to a PostgreSQL table._
You must set the `POSTGRES_CONNECTION_STRING` environment variable to the one
provided by your PostgreSQL provider.

For developing locally, we provide a `docker-compose.postgres.yml` file that
For developing locally, we provide a `docker-compose.yml` file that
starts Hasura locally. Run it with:

```
docker-compose -f docker-compose.postgres.yml up
docker-compose up
```

Then export the following environment variable:
Expand All @@ -29,8 +29,9 @@ export POSTGRES_CONNECTION_STRING='postgres://postgres:postgres@localhost:5432/p
Follow
[the steps on the official Hasura
documentation](https://hasura.io/docs/latest/getting-started/docker-simple/#step-2-connect-a-database)
to connect to the database and create the following table (TL;DR: use
`PG_DATABASE_URL` to connect Hasura to PostgreSQL).
to connect to the database and create the following table (TL;DR: visit
http://localhost:8080 and use `PG_DATABASE_URL` to connect Hasura to
PostgreSQL).

**Notice**: the `_cursor` column is REQUIRED by Apibara to automatically
invalidate data following chain reorganizations.
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions examples/postgres/starknet_to_postgres.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const config = {
filter,
sinkType: "postgres",
sinkOptions: {
// noTls: true,
tableName: "transfers",
},
};
Expand Down
1 change: 1 addition & 0 deletions nix/build.nix
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ rec {
pkg-config
protobuf
rustToolchain
openssl
] ++ pkgs.lib.optional stdenv.isDarwin (with pkgs.darwin.apple_sdk.frameworks; [
CoreFoundation
CoreServices
Expand Down
2 changes: 2 additions & 0 deletions sink-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ apibara-sink-common = { path = "../sink-common" }
async-trait.workspace = true
color-eyre.workspace = true
clap.workspace = true
native-tls = "0.2.11"
postgres-native-tls = "0.5.0"
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
Expand Down
64 changes: 62 additions & 2 deletions sink-postgres/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
use std::str::FromStr;
use std::{path::PathBuf, str::FromStr};

use apibara_sink_common::SinkOptions;
use clap::Args;
use tokio_postgres::Config;

use crate::sink::SinkPostgresError;

#[derive(Debug)]
pub enum TlsConfiguration {
NoTls,
Tls {
certificate: Option<PathBuf>,
accept_invalid_certificates: Option<bool>,
disable_system_roots: Option<bool>,
accept_invalid_hostnames: Option<bool>,
use_sni: Option<bool>,
},
}

#[derive(Debug)]
pub struct SinkPostgresConfiguration {
pub pg: Config,
pub table_name: String,
pub tls: TlsConfiguration,
}

#[derive(Debug, Args, Default, SinkOptions)]
Expand All @@ -24,13 +37,43 @@ pub struct SinkPostgresOptions {
/// transformation step.
#[arg(long, env = "POSTGRES_TABLE_NAME")]
pub table_name: Option<String>,
/// Disable TLS when connecting to the PostgreSQL server.
#[arg(long, env = "POSTGRES_NO_TLS")]
pub no_tls: Option<bool>,
/// Path to the PEM-formatted X509 TLS certificate file.
#[arg(long, env = "POSTGRES_TLS_CERTIFICATE")]
pub tls_certificate: Option<String>,
/// Disable system root certificates.
#[arg(long, env = "POSTGRES_TLS_DISABLE_SYSTEM_ROOTS")]
pub tls_disable_system_roots: Option<bool>,
/// Disable certificate validation.
#[arg(long, env = "POSTGRES_TLS_ACCEPT_INVALID_CERTIFICATES")]
pub tls_accept_invalid_certificates: Option<bool>,
/// Disable hostname validation.
#[arg(long, env = "POSTGRES_TLS_ACCEPT_INVALID_HOSTNAMES")]
pub tls_accept_invalid_hostnames: Option<bool>,
/// Use Server Name Indication (SNI).
#[arg(long, env = "POSTGRES_TLS_USE_SNI")]
pub tls_use_sni: Option<bool>,
}

impl SinkOptions for SinkPostgresOptions {
fn merge(self, other: SinkPostgresOptions) -> Self {
Self {
connection_string: self.connection_string.or(other.connection_string),
table_name: self.table_name.or(other.table_name),
no_tls: self.no_tls.or(other.no_tls),
tls_certificate: self.tls_certificate.or(other.tls_certificate),
tls_disable_system_roots: self
.tls_disable_system_roots
.or(other.tls_disable_system_roots),
tls_accept_invalid_certificates: self
.tls_accept_invalid_certificates
.or(other.tls_accept_invalid_certificates),
tls_accept_invalid_hostnames: self
.tls_accept_invalid_hostnames
.or(other.tls_accept_invalid_hostnames),
tls_use_sni: self.tls_use_sni.or(other.tls_use_sni),
}
}
}
Expand All @@ -44,6 +87,23 @@ impl SinkPostgresOptions {
let table_name = self
.table_name
.ok_or_else(|| SinkPostgresError::MissingTableName)?;
Ok(SinkPostgresConfiguration { pg, table_name })

let tls = if self.no_tls.unwrap_or(false) {
TlsConfiguration::NoTls
} else {
TlsConfiguration::Tls {
certificate: self.tls_certificate.map(PathBuf::from),
accept_invalid_certificates: self.tls_accept_invalid_certificates,
disable_system_roots: self.tls_disable_system_roots,
accept_invalid_hostnames: self.tls_accept_invalid_hostnames,
use_sni: self.tls_use_sni,
}
};

Ok(SinkPostgresConfiguration {
pg,
table_name,
tls,
})
}
}
58 changes: 55 additions & 3 deletions sink-postgres/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use apibara_core::node::v1alpha2::{Cursor, DataFinality};
use apibara_sink_common::{CursorAction, DisplayCursor, Sink, ValueExt};
use async_trait::async_trait;
use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use serde_json::Value;
use tokio_postgres::types::Json;
use tokio_postgres::{Client, NoTls, Statement};
use tracing::{info, warn};

use crate::configuration::TlsConfiguration;
use crate::SinkPostgresOptions;

#[derive(Debug, thiserror::Error)]
Expand All @@ -16,6 +19,10 @@ pub enum SinkPostgresError {
MissingTableName,
#[error("Postgres error: {0}")]
Postgres(#[from] tokio_postgres::Error),
#[error("TLS error: {0}")]
Tls(#[from] native_tls::Error),
#[error("IO error: {0}")]
IO(#[from] std::io::Error),
}

pub struct PostgresSink {
Expand All @@ -35,9 +42,54 @@ impl Sink for PostgresSink {
let config = options.to_postgres_configuration()?;
let table_name = config.table_name;

// TODO: add flag to use tls
let (client, connection) = config.pg.connect(NoTls).await?;
tokio::spawn(connection);
// Notice that all `connector` and `connection` types are different, so it's easier/cleaner
// to just connect and spawn a connection inside each branch.
let client = match config.tls {
TlsConfiguration::NoTls => {
info!("Using insecure connection");
let (client, connection) = config.pg.connect(NoTls).await?;
tokio::spawn(connection);
client
}
TlsConfiguration::Tls {
certificate,
accept_invalid_hostnames,
accept_invalid_certificates,
disable_system_roots,
use_sni,
} => {
info!("Configure TLS connection");
let mut builder = TlsConnector::builder();

if let Some(certificate) = certificate {
let certificate = tokio::fs::read(certificate).await?;
let certificate = Certificate::from_pem(&certificate)?;
builder.add_root_certificate(certificate);
}

if let Some(accept_invalid_certificates) = accept_invalid_certificates {
builder.danger_accept_invalid_certs(accept_invalid_certificates);
}

if let Some(disable_system_roots) = disable_system_roots {
builder.disable_built_in_roots(disable_system_roots);
}

if let Some(accept_invalid_hostnames) = accept_invalid_hostnames {
builder.danger_accept_invalid_hostnames(accept_invalid_hostnames);
}

if let Some(use_sni) = use_sni {
builder.use_sni(use_sni);
}

let connector = builder.build()?;
let connector = MakeTlsConnector::new(connector);
let (client, connection) = config.pg.connect(connector).await?;
tokio::spawn(connection);
client
}
};

info!("client connected successfully");

Expand Down
47 changes: 45 additions & 2 deletions sink-postgres/tests/test_sink.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,56 @@
use std::collections::HashMap;

use apibara_core::node::v1alpha2::{Cursor, DataFinality};
use apibara_sink_common::{CursorAction, Sink};
use apibara_sink_postgres::{PostgresSink, SinkPostgresError, SinkPostgresOptions};
use serde_json::{json, Value};
use testcontainers::{
clients,
core::{ExecCommand, WaitFor},
images::postgres::Postgres,
Container,
Container, Image,
};
use tokio_postgres::Client;

const NAME: &str = "postgres";
const TAG: &str = "11-alpine";

#[derive(Debug)]
pub struct Postgres {
env_vars: HashMap<String, String>,
}

impl Default for Postgres {
fn default() -> Self {
let mut env_vars = HashMap::new();
env_vars.insert("POSTGRES_DB".to_owned(), "postgres".to_owned());
env_vars.insert("POSTGRES_HOST_AUTH_METHOD".into(), "trust".into());

Self { env_vars }
}
}

impl Image for Postgres {
type Args = ();

fn name(&self) -> String {
NAME.to_owned()
}

fn tag(&self) -> String {
TAG.to_owned()
}

fn ready_conditions(&self) -> Vec<WaitFor> {
vec![WaitFor::message_on_stderr(
"database system is ready to accept connections",
)]
}

fn env_vars(&self) -> Box<dyn Iterator<Item = (&String, &String)> + '_> {
Box::new(self.env_vars.iter())
}
}

fn new_cursor(order_key: u64) -> Cursor {
Cursor {
order_key,
Expand Down Expand Up @@ -105,6 +146,8 @@ async fn new_sink(port: u16) -> PostgresSink {
let options = SinkPostgresOptions {
connection_string: Some(format!("postgresql://postgres@localhost:{}", port)),
table_name: Some("test".into()),
no_tls: Some(true),
..Default::default()
};
PostgresSink::from_options(options).await.unwrap()
}
Expand Down

0 comments on commit d197c57

Please sign in to comment.