From d78c7f644138689f1ceb40431144cd47e0b1c537 Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Mon, 16 Oct 2023 08:26:57 -0700 Subject: [PATCH 01/11] chore: move docs to mkdocs (#186) * chore: move docs to mkdocs * fix: add deployment section --- VERSION | 2 +- docs/connecting.md | 37 +++++++++++++++++++ docs/deployment/fly.md | 41 +++++++++++++++++++++ docs/development/installation.md | 13 +++++++ docs/development/setup.md | 60 +++++++++++++++++++++++++++++++ docs/images/favicon.ico | Bin 0 -> 15086 bytes docs/index.md | 21 +++++++++++ docs/metrics.md | 31 ++++++++++++++++ mkdocs.yaml | 43 ++++++++++++++++++++++ 9 files changed, 247 insertions(+), 1 deletion(-) create mode 100644 docs/connecting.md create mode 100644 docs/deployment/fly.md create mode 100644 docs/development/installation.md create mode 100644 docs/development/setup.md create mode 100644 docs/images/favicon.ico create mode 100644 docs/index.md create mode 100644 docs/metrics.md create mode 100644 mkdocs.yaml diff --git a/VERSION b/VERSION index 6d44d227..5d11b147 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.14 +0.9.15 diff --git a/docs/connecting.md b/docs/connecting.md new file mode 100644 index 00000000..dacb21ca --- /dev/null +++ b/docs/connecting.md @@ -0,0 +1,37 @@ +To connect to a tenant database Supavisor needs to look up the tenant with an `external_id`. + +You can connect to Supavisor just like you connect to Postgres except we need to include the `external_id` in the connection string. + +Supavisor parses the `external_id` from a connection in one three ways: + +- The username +- Server name identification +- `options` parameters + +> 📘 Examples +> +> In the following examples our `external_id` is `dev_tenant`. + +## Username + +Include the `external_id` in the username. The `external_id` is found after the `.` in the username: + +``` +psql postgresql://postgres.dev_tenant:postgres@localhost:6543/postgres +``` + +## Server name indication + +The subdomain of the SNI from the TLS handshake: + +``` +dev_tenant.supabase.co +``` + +## Options parameters + +Include the `external_id` as the `reference` in the `options` parameters: + +``` +psql postgresql://postgres:postgres@localhost:6543/postgres&options=reference%3Ddev_tenant +``` diff --git a/docs/deployment/fly.md b/docs/deployment/fly.md new file mode 100644 index 00000000..668c4b04 --- /dev/null +++ b/docs/deployment/fly.md @@ -0,0 +1,41 @@ +The `toml.yaml` file should be in the `deploy/fly` directory of Supavisor. + +Type the following command in your terminal: + +```bash +fly launch +``` + +Choose a name for your app when prompted, then answer "yes" to the following question: + +```bash +Would you like to copy its configuration to the new app? (y/N) +``` + +Next, select an organization and choose a region. You don't need to deploy the app yet. + +Since the pooler uses an additional port (7654) for the PostgreSQL protocol, you need to reserve an IP address: + +```bash +fly ips allocate-v4 +``` + +Set your app's secrets by running the following command: + +```bash +fly secrets set DATABASE_URL="ecto://postgres:postgres@localhost:6432/postgres" \ +VAULT_ENC_KEY="some_vault_secret" \ +API_JWT_SECRET="some_api_secret" \ +METRICS_JWT_SECRET="some_metrics_secret" \ +SECRET_KEY_BASE="some_kb_secret" +``` + +Replace the example values with your actual secrets. + +Finally, deploy your app using the following command: + +```bash +fly deploy +``` + +This will deploy your app on Fly.io diff --git a/docs/development/installation.md b/docs/development/installation.md new file mode 100644 index 00000000..0c773a1b --- /dev/null +++ b/docs/development/installation.md @@ -0,0 +1,13 @@ +Before starting, set up the database where Supavisor will store tenants' data. The following command will pull a Docker image with PostgreSQL 14 and run it on port 6432: + +``` +docker-compose -f ./docker-compose.db.yml up +``` + +> `Supavisor` stores tables in the `supavisor` schema. The schema should be automatically created by the `dev/postgres/00-setup.sql` file. If you encounter issues with migrations, ensure that this schema exists. + +Next, get dependencies and apply migrations: + +``` +mix deps.get && mix ecto.migrate --prefix _supavisor --log-migrator-sql +``` diff --git a/docs/development/setup.md b/docs/development/setup.md new file mode 100644 index 00000000..b696f229 --- /dev/null +++ b/docs/development/setup.md @@ -0,0 +1,60 @@ +Launch the Supavisor application: + +``` +make dev +``` + +You need to add tenants to the database. For example, the following request will add the `dev_tenant` with credentials to the database set up earlier. + +## Add/update tenant + +```bash +curl -X PUT \ + 'http://localhost:4000/api/tenants/dev_tenant' \ + --header 'Accept: application/json' \ + --header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJvbGUiOiJhbm9uIiwiaWF0IjoxNjQ1MTkyODI0LCJleHAiOjE5NjA3Njg4MjR9.M9jrxyvPLkUxWgOYSf5dNdJ8v_eRrq810ShFRT8N-6M' \ + --header 'Content-Type: application/json' \ + --data-raw '{ + "tenant": { + "db_host": "localhost", + "db_port": 6432, + "db_database": "postgres", + "ip_version": "auto", // "auto" | v4 | v6 + "require_user": true, // true | false + "upstream_ssl": true, // true | false, + "enforce_ssl": false, // true | false, + "upstream_verify": "peer", // "none" | "peer" + "upstream_tls_ca": "-----BEGIN CERTIFICATE-----\nblalblalblablalblalblaba\n-----END CERTIFICATE-----\n", // "" + "users": [ + { + "db_user": "postgres", + "db_password": "postgres", + "pool_size": 20, + "mode_type": "transaction", + "pool_checkout_timeout": 100 + } + ] + } +}' +``` + +Now, it's possible to connect through the proxy. By default, Supavisor uses port `6543` for transaction mode and `5432` for session mode: + +``` +psql postgresql://postgres.dev_tenant:postgres@localhost:6543/postgres +``` + +> :warning: The tenant's ID is incorporated into the username and separated by the `.` symbol. For instance, for the username `some_username` belonging to the tenant `some_tenant`, the modified username will be `some_username.some_tenant`. This approach enables the system to support multi-tenancy on a single IP address. + +## Delete tenant + +To delete a tenant, send the following request: + +```bash +curl -X DELETE \ + 'http://localhost:4000/api/tenants/dev_tenant' \ + --header 'Accept: application/json' \ + --header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJvbGUiOiJhbm9uIiwiaWF0IjoxNjQ1MTkyODI0LCJleHAiOjE5NjA3Njg4MjR9.M9jrxyvPLkUxWgOYSf5dNdJ8v_eRrq810ShFRT8N-6M' +``` + +API documentation can be found at [http://localhost:4000/swaggerui](http://localhost:4000/swaggerui) diff --git a/docs/images/favicon.ico b/docs/images/favicon.ico new file mode 100644 index 0000000000000000000000000000000000000000..d3fca2acb10d86d7ff179fe66fa372dabeb21c91 GIT binary patch literal 15086 zcmds8U2s#!6}}xh<)IIy)5%MbZON9bKiiTlW6A#}P;en3B!L8&CSbq@2aLfsX-o<> z7#H&sut{1vF-}4f8|WWN-8Ly{iWyo+fYR1bCV9&YZ-wdPq3r}1I6dD=D_>pTs~<}i zX?Ny$ulC;E^X=JlcF*ovE6P`tY08Wl3g>2J{Z|#GR8bVOIrRGkm%qkkUT^rfO;P4O zq$m#lhF@`wP&sFc!ry)C>md@q`4162#B`YH^LzL1(cQauV-mi>w@m#^qVDb6x9^@h zb&8sso2j9pff^ed1vRV<<&od~4&UHg)ZzNz`%nC~+wC-S=FA9X=a|S{VkT3TDN-ii z<6G20UEo2Rc$brt6Qb-K%6D66!%s@6@=-fwXNAkd@9{0_$o>LzRDJ%=V#%S#ejjZ< z<|miWLfPD=Y#-mEj+7PpC+p)o@XIn~)1r+o+V-1bTHIh7!%=b<~5+^ z2?v$VwMEVUYJPgpXdAKxuWsS{USFFCLi$C}jS6Qj!-4fbyx`bYQ z&rc193(4t?_DA%vq@(}vH|z!MM-6zWW5h@8KljjP_D2@AYwhSi2AN|!*4zJzI zJ6+WJ(?Z&K%th-498~|ZjmjU3ErVqF!*}wOS*f_vNuYr&8U7gO zmAie^f3-&VZSdXH1V0e|pO)}V{T?6XT6td3%pu9}2Y%@J_FtEU{7xNSTHfL}^M|im zx6(CX9FpM=?9jWpE1cAOzEbpE%MX3+8)IU>I70jzGF!3TGhqypE!#Fu%U|@+ldVN!ub|a|$?!)Tu=ke1SyAKAH&#Py_7_jE&4{h0s(&ioL7OGB zZPfXGc?1LGz4drNtcP+Bq}4a=@5%IEwgC+DS*P}WR3-L682eqnuSjYQPK7^kU?26& zPOsn(``h<%6;&;=^FC;ztxMeMJXQYC1+fo%vox|7T=ZN)%KVe@KQIp1$L;)}T-aHC zZ!xL+80#7R*YiIT_P@e92{|lW>!z1@AKX41Agg`iT&IUa$Nv&OAbdjjhKcx}qFv^X z`SlszQ#bvnB*b6G-qaY-Hhf%_{|VnW;(x<8PSpRF81ii9&^}mYt7@L|8NAUpe0)^` zZ|niSK$+Zs03+mtv5H3~Qx?lC_IGFtvITF={sViT2f61=07o6(-H}?3E z{r9M%OzTqkh#737D!e-)ACb5s?{6VDu zFjHUvmWP?X#uULTA5a!13+G3eZZUn?(oF{RW2RgtffxAYGv{8W3rt`31dPC{#*RMe zV#j&oty{O~`t|E{<;sQyJjvX{_-aPVp zz2tJaGD6@5o}HbYbnV(T?fI6(2|WjP*u_6&A#$9vW6cW$0%G08T9cifolyc$tc}57 zP+$QjO@fWiWrA%-C17_toe7_t$hCMXa`XdOfJt-C2+XfCeWC_`_3Bm8XVjZ49h`m9O7jyYB&^bb0hu&-u-ZO$?$$3%SMd;vafA{gU+15B_*%@{`|o&Z>n}Z2f6zrv&HaLS zvd8;On}Wf9$!Q)!v)m*mYd@NHa??BZe7bXeqU7{Z5}G3wx1Qz z=HnsU&%N&Cc!Hf)zha}x#}kOjsn+*&;>UUl9Wf%zp*EM9R`^LVZskMo7Tan{gLx`c)A8^x_lZCYDZ8vRrV?36EcuLc2 z4qDM|PbjvPF8znRi+OIx+47!0R>mdVr8#h!*I|yu=n>x4_lDykESFezoDTUzFP5|w ziF-04M#DZ%%sft9MfgY@PkE%VfajMC;Aizv}8;hVg>5$Ui(E z!;W#kY|lCU*qvJL&<_#ELoCmbYJ^VCS-{(7mH7p$zCM)9(AA9TT-IBoIQCwSX;v&uJdSA8pQ*wReu#r3 zwym~X^)v2e8rh{0UTyhQ;)g9Dj=t$=bQ~Rhz?`$P$46Q`$^M>7{J;YJh7Z^I-gq2d z;>Y}hbD?B0tG}OW{IU=8pLNmR3!ykZ@M8`_pWJ@qwN;JN=s6Sk!5jAgzSqU~0Hklh za}aF1tS+{HV}x6lX9_=T0s8~!Ok`c)HEzKgcS<>x`OA3|QR1#u67=YI47HjKL& z@qA<{YrP!!C_fdYAXddJIHgX1yvuT`Br+R)a zFfo4I(OdOHpTLj1Z{jWnmnVu@{T(pjY+rMJj%h11c#*fekti6-q!^ehOQJK5KMd;zZ0B*b&D@ zEciUrgoOAt<|WLnnT%f}W)9v439)t=zm{?M|H=CUnmZ8a6UN0jNM<0A3>3pgq35_` zB4jS_pU6C14xO6K1l#_U33Kd&N?4mPM*<7(^h{@xa7F$({(eJIK0`iTQAUwZQ +PostgreSQL version +License + +

+ +--- + +**Documentation**: https://supabase.github.io/supavisor + +**Source Code**: https://github.com/supabase/supavisor + +--- + +## Overview + +Supavisor is an open source scalable connection pooler for Postgres. + +Review the [readme](https://github.com/supabase/supavisor/blob/main/README.md) for a detailed overview. diff --git a/docs/metrics.md b/docs/metrics.md new file mode 100644 index 00000000..1e3c2caf --- /dev/null +++ b/docs/metrics.md @@ -0,0 +1,31 @@ +The metrics feature provides a range of metrics in the Prometheus format. The main modules involved in this implementation are: + +- `Supavisor.Monitoring.PromEx` +- `Supavisor.PromEx.Plugins.OsMon` +- `Supavisor.PromEx.Plugins.Tenant` +- `Supavisor.Monitoring.Telem` + +## Metrics exposed + +The exposed metrics include the following: + +- Application +- BEAM +- Phoenix +- Ecto +- System monitoring metrics: + - CPU utilization + - RAM usage + - Load average (LA) +- Pool checkout queue time +- Number of connected clients +- Query duration and query counts +- Network usage for client sockets and database sockets + +## Usage + +To use the metrics feature, send an HTTP request to the `/metrics` endpoint. The endpoint is secured using Bearer authentication, which requires a JSON Web Token (JWT) generated using the `METRICS_JWT_SECRET` environment variable. Make sure to set this environment variable with a secure secret key. + +When a node receives a request for metrics, it polls all nodes in the cluster, accumulates their metrics, and appends service tags such as region and host. To generate a valid JWT, use a library or tool that supports JWT creation with the HS256 algorithm and the `METRICS_JWT_SECRET` as the secret key. + +Remember to keep the `METRICS_JWT_SECRET` secure and only share it with authorized personnel who require access to the metrics endpoint. diff --git a/mkdocs.yaml b/mkdocs.yaml new file mode 100644 index 00000000..a2d96117 --- /dev/null +++ b/mkdocs.yaml @@ -0,0 +1,43 @@ +site_name: supavisor +site_url: https://supabase.github.io/supavisor +site_description: A cloud native multi-tenant connection pooler for Postgres + +repo_name: supabase/supavisor +repo_url: https://github.com/supabase/supavisor + +nav: + - Welcome: 'index.md' + - Development: + - Installation: 'development/installation.md' + - Setup: 'development/setup.md' + - Deployment: + - Deploy with Fly.io: 'deployment/fly.md' + - Connecting: 'connecting.md' + - Metrics: 'metrics.md' + +theme: + name: 'material' + favicon: 'images/favicon.ico' + logo: 'images/favicon.ico' + homepage: https://supabase.github.io/supavisor + features: + - navigation.expand + palette: + primary: black + accent: light green + +markdown_extensions: + - pymdownx.highlight: + linenums: true + guess_lang: false + use_pygments: true + pygments_style: default + - pymdownx.superfences + - pymdownx.tabbed: + alternate_style: true + - pymdownx.snippets + - pymdownx.tasklist + - admonition + - pymdownx.emoji: + emoji_index: !!python/name:materialx.emoji.twemoji + emoji_generator: !!python/name:materialx.emoji.to_svg \ No newline at end of file From 3cba0b0fa56df8513ab739564619e0f583c8c53d Mon Sep 17 00:00:00 2001 From: Stas Date: Tue, 17 Oct 2023 19:50:46 +0200 Subject: [PATCH 02/11] feat: add native mode --- lib/supavisor.ex | 2 +- lib/supavisor/application.ex | 9 +- lib/supavisor/client_handler.ex | 136 +++++------------------- lib/supavisor/handler_helpers.ex | 98 +++++++++++++++++ lib/supavisor/native_handler.ex | 175 +++++++++++++++++++++++++++++++ lib/supavisor/protocol/server.ex | 60 +++++++++++ lib/supavisor/tenants.ex | 23 ++++ 7 files changed, 387 insertions(+), 116 deletions(-) create mode 100644 lib/supavisor/handler_helpers.ex create mode 100644 lib/supavisor/native_handler.ex diff --git a/lib/supavisor.ex b/lib/supavisor.ex index 7014c8b0..2b5b822a 100644 --- a/lib/supavisor.ex +++ b/lib/supavisor.ex @@ -9,7 +9,7 @@ defmodule Supavisor do @type tcp_sock :: {:gen_tcp, :gen_tcp.socket()} @type workers :: %{manager: pid, pool: pid} @type secrets :: {:password | :auth_query, fun()} - @type mode :: :transaction | :session + @type mode :: :transaction | :session | :native @type id :: {String.t(), String.t(), mode} @type subscribe_opts :: %{workers: workers, ps: list, idle_timeout: integer} diff --git a/lib/supavisor/application.ex b/lib/supavisor/application.ex index 8057712d..b6cebd69 100644 --- a/lib/supavisor/application.ex +++ b/lib/supavisor/application.ex @@ -29,11 +29,12 @@ defmodule Supavisor.Application do proxy_ports = [ {:pg_proxy_transaction, Application.get_env(:supavisor, :proxy_port_transaction), - :transaction}, - {:pg_proxy_session, Application.get_env(:supavisor, :proxy_port_session), :session} + :transaction, Supavisor.ClientHandler}, + {:pg_proxy_native, Application.get_env(:supavisor, :proxy_port_session), :native, + Supavisor.NativeHandler} ] - for {key, port, mode} <- proxy_ports do + for {key, port, mode, module} <- proxy_ports do :ranch.start_listener( key, :ranch_tcp, @@ -42,7 +43,7 @@ defmodule Supavisor.Application do num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"), socket_opts: [port: port, keepalive: true] }, - Supavisor.ClientHandler, + module, %{mode: mode} ) |> then(&"Proxy started #{mode} on port #{port}, result: #{inspect(&1)}") diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index a63e9a5d..c7593b54 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -14,6 +14,7 @@ defmodule Supavisor.ClientHandler do alias Supavisor, as: S alias Supavisor.DbHandler, as: Db alias Supavisor.Helpers, as: H + alias Supavisor.HandlerHelpers, as: HH alias Supavisor.{Tenants, Monitoring.Telem, Protocol.Server} @impl true @@ -65,7 +66,7 @@ defmodule Supavisor.ClientHandler do @impl true def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :exchange, data) do Logger.debug("Client is trying to request HTTP") - sock_send(data.sock, "HTTP/1.1 204 OK\r\n\r\n") + HH.sock_send(data.sock, "HTTP/1.1 204 OK\r\n\r\n") {:stop, :normal, data} end @@ -77,8 +78,8 @@ defmodule Supavisor.ClientHandler do # SSL negotiation, S/N/Error if !!downstream_cert and !!downstream_key do - :ok = setopts(sock, active: false) - :ok = sock_send(sock, "S") + :ok = HH.setopts(sock, active: false) + :ok = HH.sock_send(sock, "S") opts = [ certfile: downstream_cert, @@ -88,7 +89,7 @@ defmodule Supavisor.ClientHandler do case :ssl.handshake(elem(sock, 1), opts) do {:ok, ssl_sock} -> socket = {:ssl, ssl_sock} - :ok = setopts(socket, active: true) + :ok = HH.setopts(socket, active: true) {:keep_state, %{data | sock: socket, ssl: true}} error -> @@ -97,16 +98,16 @@ defmodule Supavisor.ClientHandler do end else Logger.error("User requested SSL connection but no downstream cert/key found") - :ok = sock_send(data.sock, "N") + :ok = HH.sock_send(data.sock, "N") :keep_state_and_data end end def handle_event(:info, {_, _, bin}, :exchange, data) do - case decode_startup_packet(bin) do + case Server.decode_startup_packet(bin) do {:ok, hello} -> Logger.debug("Client startup message: #{inspect(hello)}") - {user, external_id} = parse_user_info(hello.payload) + {user, external_id} = HH.parse_user_info(hello.payload) Logger.metadata(project: external_id, user: user, mode: data.mode) {:keep_state, data, {:next_event, :internal, {:hello, {user, external_id}}}} @@ -117,7 +118,7 @@ defmodule Supavisor.ClientHandler do end def handle_event(:internal, {:hello, {user, external_id}}, :exchange, %{sock: sock} = data) do - sni_hostname = try_get_sni(sock) + sni_hostname = HH.try_get_sni(sock) case Tenants.get_user_cache(user, external_id, sni_hostname) do {:ok, info} -> @@ -126,7 +127,7 @@ defmodule Supavisor.ClientHandler do if info.tenant.enforce_ssl and !data.ssl do Logger.error("Tenant is not allowed to connect without SSL, user #{user}") - :ok = send_error(sock, "XX000", "SSL connection is required") + :ok = HH.send_error(sock, "XX000", "SSL connection is required") {:stop, :normal, data} else new_data = update_user_data(data, info, user, id) @@ -139,7 +140,7 @@ defmodule Supavisor.ClientHandler do {:error, reason} -> Logger.error("Authentication auth_secrets error: #{inspect(reason)}") - :ok = send_error(sock, "XX000", "Authentication error") + :ok = HH.send_error(sock, "XX000", "Authentication error") {:stop, :normal, data} end end @@ -147,7 +148,7 @@ defmodule Supavisor.ClientHandler do {:error, reason} -> Logger.error("User not found: #{inspect(reason)} #{inspect({user, external_id})}") - :ok = send_error(sock, "XX000", "Tenant or user not found") + :ok = HH.send_error(sock, "XX000", "Tenant or user not found") {:stop, :normal, data} end end @@ -166,7 +167,7 @@ defmodule Supavisor.ClientHandler do Server.exchange_message(:final, "e=#{reason}") end - sock_send(sock, msg) + HH.sock_send(sock, msg) {:stop, :normal, data} @@ -181,7 +182,7 @@ defmodule Supavisor.ClientHandler do end Logger.debug("Exchange success") - :ok = sock_send(sock, Server.authentication_ok()) + :ok = HH.sock_send(sock, Server.authentication_ok()) {:keep_state, %{data | auth_secrets: {method, secrets}}, {:next_event, :internal, :subscribe}} @@ -210,7 +211,7 @@ defmodule Supavisor.ClientHandler do {:error, :max_clients_reached} -> msg = "Max client connections reached" Logger.error(msg) - :ok = send_error(data.sock, "XX000", msg) + :ok = HH.send_error(data.sock, "XX000", msg) {:stop, :normal, data} error -> @@ -220,7 +221,7 @@ defmodule Supavisor.ClientHandler do end def handle_event(:internal, {:greetings, ps}, _, %{sock: sock} = data) do - :ok = sock_send(sock, Server.greetings(ps)) + :ok = HH.sock_send(sock, Server.greetings(ps)) if data.idle_timeout > 0 do {:next_state, :idle, data, idle_check(data.idle_timeout)} @@ -254,7 +255,7 @@ defmodule Supavisor.ClientHandler do # handle Sync message def handle_event(:info, {proto, _, <>}, :idle, data) when proto in [:tcp, :ssl] do Logger.debug("Receive sync") - :ok = sock_send(data.sock, Server.ready_for_query()) + :ok = HH.sock_send(data.sock, Server.ready_for_query()) if data.idle_timeout > 0 do {:keep_state_and_data, idle_check(data.idle_timeout)} @@ -283,7 +284,7 @@ defmodule Supavisor.ClientHandler do if size > 1_000_000 do msg = "Db buffer size is too big: #{size}" Logger.error(msg) - sock_send(data.sock, Server.error_message("XX000", msg)) + HH.sock_send(data.sock, Server.error_message("XX000", msg)) {:stop, :normal, data} else Logger.debug("DB call buffering") @@ -293,7 +294,7 @@ defmodule Supavisor.ClientHandler do {:error, reason} -> msg = "DB call error: #{inspect(reason)}" Logger.error(msg) - sock_send(data.sock, Server.error_message("XX000", msg)) + HH.sock_send(data.sock, Server.error_message("XX000", msg)) {:stop, :normal, data} end end @@ -339,7 +340,7 @@ defmodule Supavisor.ClientHandler do def handle_event({:call, from}, {:client_call, bin, ready?}, _, data) do Logger.debug("--> --> bin #{inspect(byte_size(bin))} bytes") - reply = {:reply, from, sock_send(data.sock, bin)} + reply = {:reply, from, HH.sock_send(data.sock, bin)} if ready? do Logger.debug("Client is ready") @@ -392,7 +393,7 @@ defmodule Supavisor.ClientHandler do end Logger.error(msg) - sock_send(data.sock, Server.error_message("XX000", msg)) + HH.sock_send(data.sock, Server.error_message("XX000", msg)) :ok end @@ -400,70 +401,10 @@ defmodule Supavisor.ClientHandler do ## Internal functions - @spec parse_user_info(map) :: {String.t() | nil, String.t()} - def parse_user_info(%{"user" => user, "options" => %{"reference" => ref}}) do - {user, ref} - end - - def parse_user_info(%{"user" => user}) do - case :binary.matches(user, ".") do - [] -> - {user, nil} - - matches -> - {pos, 1} = List.last(matches) - <> = user - {name, external_id} - end - end - - def decode_startup_packet(<>) do - with {:ok, payload} <- decode_startup_packet_payload(rest) do - pkt = %{ - len: len, - payload: payload, - tag: :startup - } - - {:ok, pkt} - end - end - - def decode_startup_packet(_) do - {:error, :bad_startup_payload} - end - - # The startup packet payload is a list of key/value pairs, separated by null bytes - defp decode_startup_packet_payload(payload) do - fields = String.split(payload, <<0>>, trim: true) - - # If the number of fields is odd, then the payload is malformed - if rem(length(fields), 2) == 1 do - {:error, :bad_startup_payload} - else - map = - fields - |> Enum.chunk_every(2) - |> Enum.map(fn - ["options" = k, v] -> {k, URI.decode_query(v)} - [k, v] -> {k, v} - end) - |> Map.new() - - # We only do light validation on the fields in the payload. The only field we use at the - # moment is `user`. If that's missing, this is a bad payload. - if Map.has_key?(map, "user") do - {:ok, map} - else - {:error, :bad_startup_payload} - end - end - end - @spec handle_exchange(S.sock(), {atom(), fun()}) :: {:ok, binary() | nil} | {:error, String.t()} def handle_exchange({_, socket} = sock, {:auth_query_md5 = method, secrets}) do salt = :crypto.strong_rand_bytes(4) - :ok = sock_send(sock, Server.md5_request(salt)) + :ok = HH.sock_send(sock, Server.md5_request(salt)) with {:ok, %{ @@ -479,7 +420,7 @@ defmodule Supavisor.ClientHandler do end def handle_exchange({_, socket} = sock, {method, secrets}) do - :ok = sock_send(sock, Server.scram_request()) + :ok = HH.sock_send(sock, Server.scram_request()) with {:ok, %{ @@ -494,7 +435,7 @@ defmodule Supavisor.ClientHandler do }, _} <- receive_next(socket, "Timeout while waiting for the second password message"), {:ok, key} <- authenticate_exchange(method, secrets, signatures, p) do message = "v=#{Base.encode64(signatures.server)}" - :ok = sock_send(sock, Server.exchange_message(:final, message)) + :ok = HH.sock_send(sock, Server.exchange_message(:final, message)) {:ok, key} else {:error, message} -> {:error, message} @@ -512,7 +453,7 @@ defmodule Supavisor.ClientHandler do defp reply_first_exchange(sock, method, secrets, channel, nonce, user) do {message, signatures} = exchange_first(method, secrets, nonce, user, channel) - :ok = sock_send(sock, Server.exchange_message(:first, message)) + :ok = HH.sock_send(sock, Server.exchange_message(:first, message)) {:ok, signatures} end @@ -585,23 +526,6 @@ defmodule Supavisor.ClientHandler do } end - @spec sock_send(S.sock(), iodata()) :: :ok | {:error, term()} - defp sock_send({mod, sock}, data) do - mod.send(sock, data) - end - - @spec send_error(S.sock(), String.t(), String.t()) :: :ok | {:error, term()} - defp send_error(sock, code, message) do - data = Server.error_message(code, message) - sock_send(sock, data) - end - - @spec setopts(S.sock(), term()) :: :ok | {:error, term()} - defp setopts({mod, sock}, opts) do - mod = if mod == :gen_tcp, do: :inet, else: mod - mod.setopts(sock, opts) - end - @spec auth_secrets(map, String.t()) :: {:ok, S.secrets()} | {:error, term()} ## password secrets def auth_secrets(%{user: user, tenant: %{require_user: true}}, _) do @@ -704,16 +628,6 @@ defmodule Supavisor.ClientHandler do {message, sings} end - @spec try_get_sni(S.sock()) :: String.t() | nil - def try_get_sni({:ssl, sock}) do - case :ssl.connection_information(sock, [:sni_hostname]) do - {:ok, [sni_hostname: sni]} -> List.to_string(sni) - _ -> nil - end - end - - def try_get_sni(_), do: nil - @spec idle_check(non_neg_integer) :: {:timeout, non_neg_integer, :idle_terminate} defp idle_check(timeout) do {:timeout, timeout, :idle_terminate} diff --git a/lib/supavisor/handler_helpers.ex b/lib/supavisor/handler_helpers.ex new file mode 100644 index 00000000..16530eef --- /dev/null +++ b/lib/supavisor/handler_helpers.ex @@ -0,0 +1,98 @@ +defmodule Supavisor.HandlerHelpers do + @moduledoc false + + alias Supavisor, as: S + alias Supavisor.Protocol.Server + + @spec sock_send(S.sock(), iodata()) :: :ok | {:error, term()} + def sock_send({mod, sock}, data) do + mod.send(sock, data) + end + + @spec setopts(S.sock(), term()) :: :ok | {:error, term()} + def setopts({mod, sock}, opts) do + mod = if mod == :gen_tcp, do: :inet, else: mod + mod.setopts(sock, opts) + end + + @spec activate(S.sock()) :: :ok | {:error, term} + def activate({:gen_tcp, sock}) do + :inet.setopts(sock, active: true) + end + + def activate({:ssl, sock}) do + :ssl.setopts(sock, active: true) + end + + @spec try_ssl_handshake(S.tcp_sock(), boolean) :: + {:ok, S.sock()} | {:error, term()} + def try_ssl_handshake(sock, true) do + case sock_send(sock, Server.ssl_request()) do + :ok -> ssl_recv(sock) + error -> error + end + end + + def try_ssl_handshake(sock, false), do: {:ok, sock} + + @spec ssl_recv(S.tcp_sock()) :: {:ok, S.ssl_sock()} | {:error, term} + def ssl_recv({:gen_tcp, sock} = s) do + case :gen_tcp.recv(sock, 1, 15_000) do + {:ok, <>} -> + ssl_connect(s) + + {:ok, <>} -> + {:ok, s} + + {:error, _} = error -> + error + end + end + + @spec ssl_connect(S.tcp_sock(), pos_integer) :: + {:ok, S.ssl_sock()} | {:error, term} + def ssl_connect({:gen_tcp, sock}, timeout \\ 5000) do + opts = [verify: :verify_none] + + case :ssl.connect(sock, opts, timeout) do + {:ok, ssl_sock} -> + {:ok, {:ssl, ssl_sock}} + + {:error, reason} -> + {:error, reason} + end + end + + @spec send_error(S.sock(), String.t(), String.t()) :: :ok | {:error, term()} + def send_error(sock, code, message) do + data = Server.error_message(code, message) + sock_send(sock, data) + end + + @spec try_get_sni(S.sock()) :: String.t() | nil + def try_get_sni({:ssl, sock}) do + case :ssl.connection_information(sock, [:sni_hostname]) do + {:ok, [sni_hostname: sni]} -> List.to_string(sni) + _ -> nil + end + end + + def try_get_sni(_), do: nil + + @spec parse_user_info(map) :: {String.t() | nil, String.t()} + def parse_user_info(%{"user" => user, "options" => %{"reference" => ref}}) do + {user, ref} + end + + def parse_user_info(%{"user" => user}) do + case :binary.matches(user, ".") do + [] -> + {user, nil} + + matches -> + {pos, 1} = List.last(matches) + <> = user + {name, external_id} + end + end +end diff --git a/lib/supavisor/native_handler.ex b/lib/supavisor/native_handler.ex new file mode 100644 index 00000000..8856e682 --- /dev/null +++ b/lib/supavisor/native_handler.ex @@ -0,0 +1,175 @@ +defmodule Supavisor.NativeHandler do + @moduledoc false + use GenServer + @behaviour :ranch_protocol + + require Logger + alias Supavisor.Helpers, as: H + alias Supavisor.HandlerHelpers, as: HH + alias Supavisor.{Protocol.Server, Tenants} + + @impl true + def start_link(ref, _sock, transport, opts) do + pid = :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts]) + {:ok, pid} + end + + @impl true + def init(_), do: :ignore + + def init(ref, trans, opts) do + Logger.debug("NativeHandler is: #{inspect(self())} opts: #{inspect(opts)}", + pretty: true + ) + + {:ok, sock} = :ranch.handshake(ref) + :ok = trans.setopts(sock, active: true) + + state = %{ + db_sock: nil, + client_sock: {:gen_tcp, sock}, + trans: trans, + acc: nil, + status: :startup, + ssl: false + } + + :gen_server.enter_loop(__MODULE__, [hibernate_after: 5_000], state) + end + + # ssl request from client + @impl true + def handle_info( + {:tcp, sock, <<_::64>>} = _msg, + %{status: :startup, client_sock: {_, sock} = client_sock} = state + ) do + Logger.debug("Client is trying to connect with SSL") + + downstream_cert = H.downstream_cert() + downstream_key = H.downstream_key() + + # SSL negotiation, S/N/Error + if !!downstream_cert and !!downstream_key do + :ok = HH.setopts(client_sock, active: false) + :ok = HH.sock_send(client_sock, "S") + + opts = [ + certfile: downstream_cert, + keyfile: downstream_key + ] + + case :ssl.handshake(elem(client_sock, 1), opts) do + {:ok, ssl_sock} -> + socket = {:ssl, ssl_sock} + :ok = HH.setopts(socket, active: true) + {:noreply, %{state | client_sock: socket, ssl: true, status: :proxy}} + + error -> + Logger.error("SSL handshake error: #{inspect(error)}") + {:stop, :normal, state} + end + else + Logger.error("User requested SSL connection but no downstream cert/key found") + + :ok = HH.sock_send(client_sock, "N") + {:noreply, state} + end + end + + # send packets to client from db + def handle_info({_, sock, bin}, %{db_sock: {_, sock}} = state) do + :ok = HH.sock_send(state.client_sock, bin) + {:noreply, state} + end + + # initial db connection and send startup packet + def handle_info( + {_, sock, bin}, + %{client_sock: {_, sock}, db_sock: nil} = state + ) do + {:ok, hello} = Server.decode_startup_packet(bin) + Logger.debug("Startup packet: #{inspect(hello, pretty: true)}") + {user, external_id} = HH.parse_user_info(hello.payload) + sni_hostname = HH.try_get_sni(sock) + + Logger.metadata(project: external_id, user: user, mode: "native") + + case Tenants.get_tenant_cache(external_id, sni_hostname) do + %{db_host: host, db_port: port, external_id: ext_id} -> + id = Supavisor.id(ext_id, user, :native, :native) + Registry.register(Supavisor.Registry.TenantClients, id, []) + + payload = + if !!hello.payload["user"] do + %{hello.payload | "user" => user} + else + hello.payload + end + |> Server.encode_startup_packet() + + case connect_local(host, port, payload, state.ssl) do + {:ok, db_sock} -> + {:noreply, %{state | db_sock: db_sock}} + + {:error, reason} -> + Logger.error("Error connecting to tenant db: #{inspect(reason)}") + {:stop, :normal, state} + end + + _ -> + Logger.error("Tenant not found: #{inspect({external_id, sni_hostname})}") + + :ok = HH.send_error(state.client_sock, "XX000", "Tenant not found") + {:stop, :normal, state} + end + end + + # send packets to db from client + def handle_info( + {_, sock, bin}, + %{client_sock: {_, sock}, db_sock: db_sock} = state + ) do + :ok = HH.sock_send(db_sock, bin) + {:noreply, state} + end + + def handle_info({:tcp_closed, _} = msg, state) do + Logger.debug("Terminating #{inspect(msg, pretty: true)}") + {:stop, :normal, state} + end + + def handle_info({:ssl_closed, _} = msg, state) do + Logger.debug("Terminating #{inspect(msg, pretty: true)}") + {:stop, :normal, state} + end + + def handle_info(msg, state) do + Logger.error("Undefined message #{inspect(msg, pretty: true)}") + {:noreply, state} + end + + ### Internal functions + + @spec connect_local(String.t(), non_neg_integer, binary, boolean) :: + {:ok, S.sock()} | {:error, term()} + defp connect_local(host, port, payload, ssl?) do + sock_opts = [ + :binary, + {:packet, :raw}, + {:active, false}, + H.detect_ip_version(host) + ] + + host = String.to_charlist(host) + + with {:ok, sock} <- :gen_tcp.connect(host, port, sock_opts), + {:ok, sock} <- HH.try_ssl_handshake({:gen_tcp, sock}, ssl?), + :ok <- HH.sock_send(sock, payload) do + :ok = HH.activate(sock) + {:ok, sock} + else + {:error, _} = error -> + error + end + end +end diff --git a/lib/supavisor/protocol/server.ex b/lib/supavisor/protocol/server.ex index e9ee6914..bcb25a18 100644 --- a/lib/supavisor/protocol/server.ex +++ b/lib/supavisor/protocol/server.ex @@ -384,4 +384,64 @@ defmodule Supavisor.Protocol.Server do def ssl_request() do @ssl_request end + + # The startup packet payload is a list of key/value pairs, separated by null bytes + def decode_startup_packet_payload(payload) do + fields = String.split(payload, <<0>>, trim: true) + + # If the number of fields is odd, then the payload is malformed + if rem(length(fields), 2) == 1 do + {:error, :bad_startup_payload} + else + map = + fields + |> Enum.chunk_every(2) + |> Enum.map(fn + ["options" = k, v] -> {k, URI.decode_query(v)} + [k, v] -> {k, v} + end) + |> Map.new() + + # We only do light validation on the fields in the payload. The only field we use at the + # moment is `user`. If that's missing, this is a bad payload. + if Map.has_key?(map, "user") do + {:ok, map} + else + {:error, :bad_startup_payload} + end + end + end + + def decode_startup_packet(<>) do + with {:ok, payload} <- decode_startup_packet_payload(rest) do + pkt = %{ + len: len, + payload: payload, + tag: :startup + } + + {:ok, pkt} + end + end + + def decode_startup_packet(_) do + {:error, :bad_startup_payload} + end + + def encode_startup_packet(payload) do + bin = + Enum.reduce(payload, "", fn + # remove options + {"options", _}, acc -> + acc + + {"application_name" = k, v}, acc -> + <> <> acc + + {k, v}, acc -> + <> <> acc + end) + + <> + end end diff --git a/lib/supavisor/tenants.ex b/lib/supavisor/tenants.ex index 6a7e0cb9..0ddba30a 100644 --- a/lib/supavisor/tenants.ex +++ b/lib/supavisor/tenants.ex @@ -43,6 +43,29 @@ defmodule Supavisor.Tenants do Tenant |> Repo.get_by(external_id: external_id) |> Repo.preload(:users) end + @spec get_tenant_cache(String.t() | nil, String.t() | nil) :: Tenant.t() | nil + def get_tenant_cache(external_id, sni_hostname) do + cache_key = {:tenant_cache, external_id, sni_hostname} + + case Cachex.fetch(Supavisor.Cache, cache_key, fn _key -> + {:commit, {:cached, get_tenant(external_id, sni_hostname)}, ttl: 5_000} + end) do + {_, {:cached, value}} -> value + {_, {:cached, value}, _} -> value + end + end + + @spec get_tenant(String.t() | nil, String.t() | nil) :: Tenant.t() | nil + def get_tenant(nil, sni) when sni != nil do + Tenant |> Repo.get_by(sni: sni) + end + + def get_tenant(external_id, _) when external_id != nil do + Tenant |> Repo.get_by(external_id: external_id) + end + + def get_tenant(_, _), do: nil + @spec get_user_cache(String.t(), String.t() | nil, String.t() | nil) :: {:ok, map()} | {:error, any()} def get_user_cache(user, external_id, sni_hostname) do From 67b10b692992bf874c98ffc5e978abd3813385ab Mon Sep 17 00:00:00 2001 From: Stas Date: Wed, 18 Oct 2023 17:05:03 +0200 Subject: [PATCH 03/11] fix tests --- VERSION | 2 +- lib/supavisor/application.ex | 9 ++++++++- lib/supavisor/client_handler.ex | 2 +- lib/supavisor/handler_helpers.ex | 18 +++++------------- lib/supavisor/monitoring/prom_ex.ex | 15 ++++++++------- lib/supavisor_web/open_api_schemas.ex | 6 +++--- test/supavisor/client_handler_test.exs | 10 +++++----- test/test_helper.exs | 2 ++ 8 files changed, 33 insertions(+), 31 deletions(-) diff --git a/VERSION b/VERSION index 5d11b147..f806549d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.15 +0.9.16 diff --git a/lib/supavisor/application.ex b/lib/supavisor/application.ex index b6cebd69..4b25aa53 100644 --- a/lib/supavisor/application.ex +++ b/lib/supavisor/application.ex @@ -74,12 +74,19 @@ defmodule Supavisor.Application do child_spec: DynamicSupervisor, strategy: :one_for_one, name: Supavisor.DynamicSupervisor }, Supavisor.Vault, - {Cachex, name: Supavisor.Cache}, Supavisor.TenantsMetrics, # Start the Endpoint (http/https) SupavisorWeb.Endpoint ] + # start Cachex only if the node uses names, this is necessary for test setup + children = + if node() != :nonode@nohost do + [{Cachex, name: Supavisor.Cache} | children] + else + children + end + # See https://hexdocs.pm/elixir/Supervisor.html # for other strategies and supported options opts = [strategy: :one_for_one, name: Supavisor.Supervisor] diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index c7593b54..6d330124 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -279,7 +279,7 @@ defmodule Supavisor.ClientHandler do :keep_state_and_data {:buffering, size} -> - Logger.warn("DB call buffering #{size}}") + Logger.debug("DB call buffering #{size}") if size > 1_000_000 do msg = "Db buffer size is too big: #{size}" diff --git a/lib/supavisor/handler_helpers.ex b/lib/supavisor/handler_helpers.ex index 16530eef..226c2510 100644 --- a/lib/supavisor/handler_helpers.ex +++ b/lib/supavisor/handler_helpers.ex @@ -38,14 +38,9 @@ defmodule Supavisor.HandlerHelpers do @spec ssl_recv(S.tcp_sock()) :: {:ok, S.ssl_sock()} | {:error, term} def ssl_recv({:gen_tcp, sock} = s) do case :gen_tcp.recv(sock, 1, 15_000) do - {:ok, <>} -> - ssl_connect(s) - - {:ok, <>} -> - {:ok, s} - - {:error, _} = error -> - error + {:ok, <>} -> ssl_connect(s) + {:ok, <>} -> {:ok, s} + {:error, _} = error -> error end end @@ -55,11 +50,8 @@ defmodule Supavisor.HandlerHelpers do opts = [verify: :verify_none] case :ssl.connect(sock, opts, timeout) do - {:ok, ssl_sock} -> - {:ok, {:ssl, ssl_sock}} - - {:error, reason} -> - {:error, reason} + {:ok, ssl_sock} -> {:ok, {:ssl, ssl_sock}} + {:error, reason} -> {:error, reason} end end diff --git a/lib/supavisor/monitoring/prom_ex.ex b/lib/supavisor/monitoring/prom_ex.ex index d8b977e3..d52d7af4 100644 --- a/lib/supavisor/monitoring/prom_ex.ex +++ b/lib/supavisor/monitoring/prom_ex.ex @@ -91,15 +91,16 @@ defmodule Supavisor.Monitoring.PromEx do Registry.select(Supavisor.Registry.TenantClients, [{{:"$1", :_, :_}, [], [:"$1"]}]) |> Enum.uniq() - Enum.reduce(pools, metrics, fn {tenant, _, _}, acc -> - {matched, rest} = Enum.split_with(acc, &String.contains?(&1, "tenant=\"#{tenant}\"")) + _ = + Enum.reduce(pools, metrics, fn {tenant, _, _}, acc -> + {matched, rest} = Enum.split_with(acc, &String.contains?(&1, "tenant=\"#{tenant}\"")) - if matched != [] do - Cachex.put(Supavisor.Cache, {:metrics, tenant}, Enum.join(matched, "\n")) - end + if matched != [] do + Cachex.put(Supavisor.Cache, {:metrics, tenant}, Enum.join(matched, "\n")) + end - rest - end) + rest + end) pools end diff --git a/lib/supavisor_web/open_api_schemas.ex b/lib/supavisor_web/open_api_schemas.ex index b7c73d1e..065dac4a 100644 --- a/lib/supavisor_web/open_api_schemas.ex +++ b/lib/supavisor_web/open_api_schemas.ex @@ -43,7 +43,7 @@ defmodule SupavisorWeb.OpenApiSchemas do db_password: "postgres", pool_size: 10, is_manager: false, - max_clients: 25000, + max_clients: 25_000, mode_type: "transaction", inserted_at: "2023-03-27T12:00:00Z", updated_at: "2023-03-27T12:00:00Z" @@ -101,7 +101,7 @@ defmodule SupavisorWeb.OpenApiSchemas do db_user: "postgres", db_password: "postgres", pool_size: 10, - max_clients: 25000, + max_clients: 25_000, pool_checkout_timeout: 1000, is_manager: false, mode_type: "transaction", @@ -172,7 +172,7 @@ defmodule SupavisorWeb.OpenApiSchemas do db_password: "postgres", pool_size: 10, mode_type: "transaction", - max_clients: 25000, + max_clients: 25_000, pool_checkout_timeout: 1000 } ] diff --git a/test/supavisor/client_handler_test.exs b/test/supavisor/client_handler_test.exs index a076b79b..206f3ed9 100644 --- a/test/supavisor/client_handler_test.exs +++ b/test/supavisor/client_handler_test.exs @@ -1,12 +1,12 @@ defmodule Supavisor.ClientHandlerTest do use ExUnit.Case, async: true - alias Supavisor.ClientHandler + alias Supavisor.HandlerHelpers, as: HH describe "parse_user_info/1" do test "extracts the external_id from the username" do payload = %{"user" => "test.user.external_id"} - {name, external_id} = ClientHandler.parse_user_info(payload) + {name, external_id} = HH.parse_user_info(payload) assert name == "test.user" assert external_id == "external_id" end @@ -14,7 +14,7 @@ defmodule Supavisor.ClientHandlerTest do test "username consists only of username" do username = "username" payload = %{"user" => username} - {user, nil} = ClientHandler.parse_user_info(payload) + {user, nil} = HH.parse_user_info(payload) assert username == user end @@ -22,14 +22,14 @@ defmodule Supavisor.ClientHandlerTest do user = "test.user" external_id = "external_id" payload = %{"options" => %{"reference" => external_id}, "user" => user} - {user1, external_id1} = ClientHandler.parse_user_info(payload) + {user1, external_id1} = HH.parse_user_info(payload) assert user1 == user assert external_id1 == external_id end test "unicode in username" do payload = %{"user" => "тестовe.імʼя.external_id"} - {name, external_id} = ClientHandler.parse_user_info(payload) + {name, external_id} = HH.parse_user_info(payload) assert name == "тестовe.імʼя" assert external_id == "external_id" end diff --git a/test/test_helper.exs b/test/test_helper.exs index 5ee4e48c..a3a81c8b 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -7,5 +7,7 @@ Supavisor.Support.Cluster.apply_config(node2) {:ok, _} = :erpc.call(node2, :application, :ensure_all_started, [:supavisor]) +Cachex.start_link(name: Supavisor.Cache) + ExUnit.start() Ecto.Adapters.SQL.Sandbox.mode(Supavisor.Repo, :auto) From 75b7a7df48d48f537844adf0ea5178e1614f418e Mon Sep 17 00:00:00 2001 From: Stas Date: Wed, 18 Oct 2023 17:15:08 +0200 Subject: [PATCH 04/11] make dialyzer happy --- lib/supavisor/native_handler.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/supavisor/native_handler.ex b/lib/supavisor/native_handler.ex index 8856e682..cd822eb9 100644 --- a/lib/supavisor/native_handler.ex +++ b/lib/supavisor/native_handler.ex @@ -4,6 +4,7 @@ defmodule Supavisor.NativeHandler do @behaviour :ranch_protocol require Logger + alias Supavisor, as: S alias Supavisor.Helpers, as: H alias Supavisor.HandlerHelpers, as: HH alias Supavisor.{Protocol.Server, Tenants} From 785e5c263dbf0026aec1eafb9421812c1ff0eb8b Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Thu, 19 Oct 2023 12:57:08 -0700 Subject: [PATCH 05/11] docs: auto publish docs on main (#189) --- .github/workflows/docs.yml | 15 +++++++++++++++ VERSION | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/docs.yml diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 00000000..db961517 --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,15 @@ +name: Publish docs +on: + push: + branches: + - main +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + with: + python-version: 3.x + - run: pip install mkdocs + - run: mkdocs gh-deploy --force --clean --verbose diff --git a/VERSION b/VERSION index 5d11b147..f806549d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.15 +0.9.16 From f0560c3f894823fcecac24fac28c9e99039ae123 Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Thu, 19 Oct 2023 13:02:11 -0700 Subject: [PATCH 06/11] fix: install mkdocs-material (#192) * fix: install mkdocs-material * fix: version --- .github/workflows/docs.yml | 1 + VERSION | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index db961517..12cae61d 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -12,4 +12,5 @@ jobs: with: python-version: 3.x - run: pip install mkdocs + - run: pip install mkdocs-material - run: mkdocs gh-deploy --force --clean --verbose diff --git a/VERSION b/VERSION index f806549d..4148992a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.16 +0.9.17 From e3d07da095919b579ad366828094f08d9df3b4bd Mon Sep 17 00:00:00 2001 From: Chase Granberry Date: Thu, 19 Oct 2023 13:04:50 -0700 Subject: [PATCH 07/11] docs: more docs (#190) * docs: more docs * fix: version --- VERSION | 2 +- .../{connecting.md => connecting/overview.md} | 0 docs/connecting/pool_modes.md | 21 +++++++++++++ docs/development/docs.md | 19 ++++++++++++ docs/{ => monitoring}/metrics.md | 31 +++++++++++++------ docs/orms/prisma.md | 17 ++++++++++ mkdocs.yaml | 10 ++++-- 7 files changed, 88 insertions(+), 12 deletions(-) rename docs/{connecting.md => connecting/overview.md} (100%) create mode 100644 docs/connecting/pool_modes.md create mode 100644 docs/development/docs.md rename docs/{ => monitoring}/metrics.md (80%) create mode 100644 docs/orms/prisma.md diff --git a/VERSION b/VERSION index 4148992a..e8895e38 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.17 +0.9.18 \ No newline at end of file diff --git a/docs/connecting.md b/docs/connecting/overview.md similarity index 100% rename from docs/connecting.md rename to docs/connecting/overview.md diff --git a/docs/connecting/pool_modes.md b/docs/connecting/pool_modes.md new file mode 100644 index 00000000..a7a9255c --- /dev/null +++ b/docs/connecting/pool_modes.md @@ -0,0 +1,21 @@ +Configure the `mode_type` on the `user` to set how Supavisor connection pools will behave. + +The `mode_type` can be one of: + +- `transaction` +- `session` +- `native` + +## Transaction Mode + +`transaction` mode assigns a connection to a client for the duration of a single transaction. + +## Session Mode + +`session` mode assigns a connection to a client for the duration of the client connection. + +## Native Mode + +`native` mode proxies a client to the database as if it was directly connected. + +This mode is typically needed to run migrations. diff --git a/docs/development/docs.md b/docs/development/docs.md new file mode 100644 index 00000000..a3c665b4 --- /dev/null +++ b/docs/development/docs.md @@ -0,0 +1,19 @@ +Documentation is managed with [mkdocs](https://www.mkdocs.org/). + +## Adding documentation + +Edit the documentation in: + +`/docs` + +Edit the documentation nav in: + +`/mkdocs.yaml` + +Build and serve the documentation locally with: + +`mkdocs serve` + +Production documentation is built on merge into `main` with the Github Action: + +`/.github/workflows/docs.yml` diff --git a/docs/metrics.md b/docs/monitoring/metrics.md similarity index 80% rename from docs/metrics.md rename to docs/monitoring/metrics.md index 1e3c2caf..a845257d 100644 --- a/docs/metrics.md +++ b/docs/monitoring/metrics.md @@ -5,7 +5,23 @@ The metrics feature provides a range of metrics in the Prometheus format. The ma - `Supavisor.PromEx.Plugins.Tenant` - `Supavisor.Monitoring.Telem` -## Metrics exposed +## Endpoint + +To use the metrics feature, send an HTTP request to the `/metrics` endpoint. The endpoint is secured using Bearer authentication, which requires a JSON Web Token (JWT) generated using the `METRICS_JWT_SECRET` environment variable. Make sure to set this environment variable with a secure secret key. + +When a node receives a request for metrics, it polls all nodes in the cluster, accumulates their metrics, and appends service tags such as region and host. To generate a valid JWT, use a library or tool that supports JWT creation with the HS256 algorithm and the `METRICS_JWT_SECRET` as the secret key. + +Remember to keep the `METRICS_JWT_SECRET` secure and only share it with authorized personnel who require access to the metrics endpoint. + +### Filtered per tenant + +Metrics endpoints filtered for specific tenants are available at their own endpoints: + +``` +/metrics/:external_id +``` + +## System, VM & application metrics The exposed metrics include the following: @@ -17,15 +33,12 @@ The exposed metrics include the following: - CPU utilization - RAM usage - Load average (LA) + +## Tenant metrics + +Supavisor also tags many metrics with the `tenant` `external_id` so you can drill down to metrics per tenant: + - Pool checkout queue time - Number of connected clients - Query duration and query counts - Network usage for client sockets and database sockets - -## Usage - -To use the metrics feature, send an HTTP request to the `/metrics` endpoint. The endpoint is secured using Bearer authentication, which requires a JSON Web Token (JWT) generated using the `METRICS_JWT_SECRET` environment variable. Make sure to set this environment variable with a secure secret key. - -When a node receives a request for metrics, it polls all nodes in the cluster, accumulates their metrics, and appends service tags such as region and host. To generate a valid JWT, use a library or tool that supports JWT creation with the HS256 algorithm and the `METRICS_JWT_SECRET` as the secret key. - -Remember to keep the `METRICS_JWT_SECRET` secure and only share it with authorized personnel who require access to the metrics endpoint. diff --git a/docs/orms/prisma.md b/docs/orms/prisma.md new file mode 100644 index 00000000..a959bc14 --- /dev/null +++ b/docs/orms/prisma.md @@ -0,0 +1,17 @@ +Connecting to a Postgres database with Prisma is easy. + +## PgBouncer Compatability + +Supavisor pool modes behave the same way as PgBouncer. You should be able to connect to Supavisor with the exact same connection string as you use for PgBouncer. + +## Named Prepared Statements + +Prisma will use named prepared statements to query Postgres by default. + +To turn off named prepared statements use `pgbouncer=true` in your connection string with Prisma. + +The `pgbouncer=true` connection string parameter is compatable with Supavisor. + +## Prisma Connection Management + +Make sure to review the [Prisma connection management guide](https://www.prisma.io/docs/guides/performance-and-optimization/connection-management). diff --git a/mkdocs.yaml b/mkdocs.yaml index a2d96117..1f044d33 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -10,10 +10,16 @@ nav: - Development: - Installation: 'development/installation.md' - Setup: 'development/setup.md' + - Docs: 'development/docs.md' - Deployment: - Deploy with Fly.io: 'deployment/fly.md' - - Connecting: 'connecting.md' - - Metrics: 'metrics.md' + - Connecting: + - Overview: 'connecting/overview.md' + - Pool Modes: 'connecting/pool_modes.md' + - Monitoring: + - Metrics: 'monitoring/metrics.md' + - ORMs: + - Prisma: 'orms/prisma.md' theme: name: 'material' From 1b2d21e0b17d127ada13d2837d41cc066012942d Mon Sep 17 00:00:00 2001 From: Joel Lee Date: Fri, 20 Oct 2023 05:47:27 +0800 Subject: [PATCH 08/11] feat: add gh workflow (#180) * feat: add gh workflow * feat: bump version * fix: update to mirror Postgres format * fix: bump version * chore: mirror image * fix: bump VERSION * chore: update publish workflow * chore: remove local changes * fix: bump version --------- Co-authored-by: joel@joellee.org Co-authored-by: Qiao Han Co-authored-by: Chase Granberry --- .github/workflows/mirror.yml | 42 ++++++++++++++++ .github/workflows/publish_docker.yml | 75 ++++++++++++++++++++++++++++ VERSION | 2 +- 3 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/mirror.yml create mode 100644 .github/workflows/publish_docker.yml diff --git a/.github/workflows/mirror.yml b/.github/workflows/mirror.yml new file mode 100644 index 00000000..8178e8c4 --- /dev/null +++ b/.github/workflows/mirror.yml @@ -0,0 +1,42 @@ +name: Mirror Image + +on: + workflow_call: + inputs: + version: + required: true + type: string + workflow_dispatch: + inputs: + version: + description: "Image tag" + required: true + type: string + +jobs: + mirror: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + id-token: write + steps: + - name: configure aws credentials + uses: aws-actions/configure-aws-credentials@v2.2.0 + with: + role-to-assume: ${{ secrets.PROD_AWS_ROLE }} + aws-region: us-east-1 + - uses: docker/login-action@v2 + with: + registry: public.ecr.aws + - uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - uses: akhilerm/tag-push-action@v2.1.0 + with: + src: docker.io/supabase/supavisor:${{ inputs.version }} + dst: | + public.ecr.aws/supabase/supavisor:${{ inputs.version }} + ghcr.io/supabase/supavisor:${{ inputs.version }} diff --git a/.github/workflows/publish_docker.yml b/.github/workflows/publish_docker.yml new file mode 100644 index 00000000..780051fa --- /dev/null +++ b/.github/workflows/publish_docker.yml @@ -0,0 +1,75 @@ +name: Release on Dockerhub + +on: + push: + branches: + - main + paths: + - ".github/workflows/publish_docker.yml" + - "VERSION" + +jobs: + settings: + runs-on: ubuntu-latest + outputs: + docker_version: ${{ steps.settings.outputs.result }} + image_tag: supabase/supavisor:${{ steps.settings.outputs.result }} + steps: + - uses: actions/checkout@v3 + - id: settings + # Remove spaces to get the raw version string + run: echo "result=$(sed -r 's/\s+//g' VERSION)" >> $GITHUB_OUTPUT + + build_image: + needs: settings + strategy: + matrix: + include: + - runner: [self-hosted, X64] + arch: amd64 + - runner: arm-runner + arch: arm64 + runs-on: ${{ matrix.runner }} + timeout-minutes: 180 + outputs: + image_digest: ${{ steps.build.outputs.digest }} + steps: + - run: docker context create builders + - uses: docker/setup-buildx-action@v3 + with: + endpoint: builders + - uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + - id: build + uses: docker/build-push-action@v5 + with: + push: true + tags: ${{ needs.settings.outputs.image_tag }}_${{ matrix.arch }} + platforms: linux/${{ matrix.arch }} + cache-from: type=gha,scope=${{ github.ref_name }}-${{ matrix.arch }} + cache-to: type=gha,mode=max,scope=${{ github.ref_name }}-${{ matrix.arch }} + + merge_manifest: + needs: [settings, build_image] + runs-on: ubuntu-latest + steps: + - uses: docker/setup-buildx-action@v3 + - uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + - name: Merge multi-arch manifests + run: | + docker buildx imagetools create -t ${{ needs.settings.outputs.image_tag }} \ + ${{ needs.settings.outputs.image_tag }}_amd64 \ + ${{ needs.settings.outputs.image_tag }}_arm64 + + publish: + needs: [settings, merge_manifest] + # Call workflow explicitly because events from actions cannot trigger more actions + uses: ./.github/workflows/mirror.yml + with: + version: ${{ needs.settings.outputs.docker_version }} + secrets: inherit diff --git a/VERSION b/VERSION index e8895e38..3ae021c1 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.18 \ No newline at end of file +0.9.19 \ No newline at end of file From 5a0571ece811af1fdd5c89c4b78ab293e6a2dd2a Mon Sep 17 00:00:00 2001 From: Stas Date: Fri, 20 Oct 2023 09:45:17 +0200 Subject: [PATCH 09/11] fix: handle http request in the native mode (#193) * fix: handle http request in the native mode --- VERSION | 2 +- lib/supavisor/native_handler.ex | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index bd758c92..b2160230 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.20 +0.9.21 diff --git a/lib/supavisor/native_handler.ex b/lib/supavisor/native_handler.ex index cd822eb9..66b76f68 100644 --- a/lib/supavisor/native_handler.ex +++ b/lib/supavisor/native_handler.ex @@ -38,8 +38,15 @@ defmodule Supavisor.NativeHandler do :gen_server.enter_loop(__MODULE__, [hibernate_after: 5_000], state) end - # ssl request from client @impl true + # http healthcheck + def handle_info({_, sock, <<"GET", _::binary>>}, state) do + Logger.debug("Client is trying to request HTTP") + HH.sock_send({:gen_tcp, sock}, "HTTP/1.1 204 OK\r\n\r\n") + {:stop, :normal, state} + end + + # ssl request from client def handle_info( {:tcp, sock, <<_::64>>} = _msg, %{status: :startup, client_sock: {_, sock} = client_sock} = state From c6e7335eed1cf8fcbad6b40e31505de6f8051131 Mon Sep 17 00:00:00 2001 From: Stas Date: Tue, 24 Oct 2023 10:14:15 +0200 Subject: [PATCH 10/11] feat: handle cancelling requests (#194) --- VERSION | 2 +- lib/supavisor.ex | 15 ++++--- lib/supavisor/application.ex | 1 + lib/supavisor/client_handler.ex | 47 +++++++++++++++++-- lib/supavisor/db_handler.ex | 7 +++ lib/supavisor/handler_helpers.ex | 32 +++++++++++++ lib/supavisor/native_handler.ex | 70 +++++++++++++++++++++++------ lib/supavisor/protocol/server.ex | 25 ++++++----- test/supavisor/protocol_test.exs | 36 ++++++--------- test/supavisor/syn_handler_test.exs | 4 +- test/support/fixtures/helpers.ex | 4 +- 11 files changed, 180 insertions(+), 63 deletions(-) diff --git a/VERSION b/VERSION index b2160230..ea3f0d7a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.21 +0.9.22 diff --git a/lib/supavisor.ex b/lib/supavisor.ex index 2b5b822a..624bff7e 100644 --- a/lib/supavisor.ex +++ b/lib/supavisor.ex @@ -15,11 +15,11 @@ defmodule Supavisor do @registry Supavisor.Registry.Tenants - @spec start(id, secrets) :: {:ok, pid} | {:error, any} - def start(id, secrets) do + @spec start(id, secrets, String.t() | nil) :: {:ok, pid} | {:error, any} + def start(id, secrets, db_name) do case get_global_sup(id) do nil -> - start_local_pool(id, secrets) + start_local_pool(id, secrets, db_name) pid -> {:ok, pid} @@ -156,8 +156,8 @@ defmodule Supavisor do ## Internal functions - @spec start_local_pool(id, secrets) :: {:ok, pid} | {:error, any} - defp start_local_pool({tenant, user, mode} = id, {method, secrets}) do + @spec start_local_pool(id, secrets, String.t() | nil) :: {:ok, pid} | {:error, any} + defp start_local_pool({tenant, user, mode} = id, {method, secrets}, db_name) do Logger.debug("Starting pool for #{inspect(id)}") case Tenants.get_pool_config(tenant, secrets.().alias) do @@ -194,7 +194,7 @@ defmodule Supavisor do host: String.to_charlist(db_host), port: db_port, user: db_user, - database: db_database, + database: if(db_name != nil, do: db_name, else: db_database), password: fn -> db_pass end, application_name: "supavisor", ip_version: H.ip_version(ip_ver, db_host), @@ -235,7 +235,8 @@ defmodule Supavisor do end end - @spec set_parameter_status(id, [{binary, binary}]) :: :ok | {:error, :not_found} + @spec set_parameter_status(id, [{binary, binary}]) :: + :ok | {:error, :not_found} def set_parameter_status(id, ps) do case get_local_manager(id) do nil -> {:error, :not_found} diff --git a/lib/supavisor/application.ex b/lib/supavisor/application.ex index 4b25aa53..91b33b08 100644 --- a/lib/supavisor/application.ex +++ b/lib/supavisor/application.ex @@ -61,6 +61,7 @@ defmodule Supavisor.Application do PromEx, {Registry, keys: :unique, name: Supavisor.Registry.Tenants}, {Registry, keys: :unique, name: Supavisor.Registry.ManagerTables}, + {Registry, keys: :unique, name: Supavisor.Registry.PoolPids}, {Registry, keys: :duplicate, name: Supavisor.Registry.TenantSups}, {Registry, keys: :duplicate, name: Supavisor.Registry.TenantClients}, {Cluster.Supervisor, [topologies, [name: Supavisor.ClusterSupervisor]]}, diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index 6d330124..311df525 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -57,7 +57,8 @@ defmodule Supavisor.ClientHandler do proxy_type: nil, mode: opts.mode, stats: %{}, - idle_timeout: 0 + idle_timeout: 0, + db_name: nil } :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, data) @@ -70,6 +71,31 @@ defmodule Supavisor.ClientHandler do {:stop, :normal, data} end + # cancel request + def handle_event(:info, {_, _, <<16::32, 1234::16, 5678::16, pid::32, key::32>>}, _, data) do + Logger.debug("Got cancel query for #{inspect({pid, key})}") + :ok = HH.send_cancel_query(pid, key) + {:stop, :normal, data} + end + + # send cancel request to db + def handle_event(:info, :cancel_query, :busy, data) do + key = {data.tenant, data.db_pid} + Logger.debug("Cancel query for #{inspect(key)}") + + db_pid = data.db_pid + + case db_pid_meta(key) do + [{^db_pid, meta}] -> + :ok = HH.cancel_query(meta.host, meta.port, meta.ip_ver, meta.pid, meta.key) + + error -> + Logger.error("Received cancel but no proc was found #{inspect(key)} #{inspect(error)}") + end + + :keep_state_and_data + end + def handle_event(:info, {:tcp, _, <<_::64>>}, :exchange, %{sock: sock} = data) do Logger.debug("Client is trying to connect with SSL") @@ -109,6 +135,7 @@ defmodule Supavisor.ClientHandler do Logger.debug("Client startup message: #{inspect(hello)}") {user, external_id} = HH.parse_user_info(hello.payload) Logger.metadata(project: external_id, user: user, mode: data.mode) + data = %{data | db_name: hello.payload["database"]} {:keep_state, data, {:next_event, :internal, {:hello, {user, external_id}}}} {:error, error} -> @@ -192,7 +219,7 @@ defmodule Supavisor.ClientHandler do def handle_event(:internal, :subscribe, _, data) do Logger.debug("Subscribe to tenant #{inspect(data.id)}") - with {:ok, sup} <- Supavisor.start(data.id, data.auth_secrets), + with {:ok, sup} <- Supavisor.start(data.id, data.auth_secrets, data.db_name), {:ok, opts} <- Supavisor.subscribe(sup, data.id) do Process.monitor(opts.workers.manager) data = Map.merge(data, opts.workers) @@ -221,7 +248,10 @@ defmodule Supavisor.ClientHandler do end def handle_event(:internal, {:greetings, ps}, _, %{sock: sock} = data) do - :ok = HH.sock_send(sock, Server.greetings(ps)) + {header, <> = payload} = Server.backend_key_data() + msg = [ps, [header, payload], Server.ready_for_query()] + :ok = HH.listen_cancel_query(pid, key) + :ok = HH.sock_send(sock, msg) if data.idle_timeout > 0 do {:next_state, :idle, data, idle_check(data.idle_timeout)} @@ -632,4 +662,15 @@ defmodule Supavisor.ClientHandler do defp idle_check(timeout) do {:timeout, timeout, :idle_terminate} end + + defp db_pid_meta({_, pid} = key) do + rkey = Supavisor.Registry.PoolPids + fnode = node(pid) + + if fnode == node() do + Registry.lookup(rkey, key) + else + :erpc.call(fnode, Registry, :lookup, [rkey, key], 15_000) + end + end end diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index 1484284c..efee8abe 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -110,6 +110,13 @@ defmodule Supavisor.DbHandler do %{tag: :ready_for_query, payload: db_state}, {ps, _} -> {ps, db_state} + %{tag: :backend_key_data, payload: payload}, acc -> + key = {data.tenant, self()} + conn = %{host: data.auth.host, port: data.auth.port, ip_ver: data.auth.ip_version} + Registry.register(Supavisor.Registry.PoolPids, key, Map.merge(payload, conn)) + Logger.debug("Backend #{inspect(key)} data: #{inspect(payload)}") + acc + %{payload: {:authentication_sasl_password, methods_b}}, {ps, _} -> nonce = case Server.decode_string(methods_b) do diff --git a/lib/supavisor/handler_helpers.ex b/lib/supavisor/handler_helpers.ex index 226c2510..cef52004 100644 --- a/lib/supavisor/handler_helpers.ex +++ b/lib/supavisor/handler_helpers.ex @@ -1,6 +1,7 @@ defmodule Supavisor.HandlerHelpers do @moduledoc false + alias Phoenix.PubSub alias Supavisor, as: S alias Supavisor.Protocol.Server @@ -9,6 +10,13 @@ defmodule Supavisor.HandlerHelpers do mod.send(sock, data) end + @spec sock_close(nil | S.sock()) :: :ok | {:error, term()} + def sock_close(nil), do: :ok + + def sock_close({mod, sock}) do + mod.close(sock) + end + @spec setopts(S.sock(), term()) :: :ok | {:error, term()} def setopts({mod, sock}, opts) do mod = if mod == :gen_tcp, do: :inet, else: mod @@ -87,4 +95,28 @@ defmodule Supavisor.HandlerHelpers do {name, external_id} end end + + @spec send_cancel_query(non_neg_integer, non_neg_integer) :: :ok | {:errr, term} + def send_cancel_query(pid, key) do + PubSub.broadcast( + Supavisor.PubSub, + "cancel_req:#{pid}_#{key}", + :cancel_query + ) + end + + @spec listen_cancel_query(non_neg_integer, non_neg_integer) :: :ok | {:errr, term} + def listen_cancel_query(pid, key) do + PubSub.subscribe(Supavisor.PubSub, "cancel_req:#{pid}_#{key}") + end + + @spec cancel_query(keyword, non_neg_integer, atom, non_neg_integer, non_neg_integer) :: :ok + def cancel_query(host, port, ip_version, pid, key) do + msg = Server.cancel_message(pid, key) + opts = [:binary, {:packet, :raw}, {:active, true}, ip_version] + {:ok, sock} = :gen_tcp.connect(host, port, opts) + sock = {:gen_tcp, sock} + :ok = sock_send(sock, msg) + :ok = sock_close(sock) + end end diff --git a/lib/supavisor/native_handler.ex b/lib/supavisor/native_handler.ex index 66b76f68..b42f1c73 100644 --- a/lib/supavisor/native_handler.ex +++ b/lib/supavisor/native_handler.ex @@ -32,7 +32,9 @@ defmodule Supavisor.NativeHandler do trans: trans, acc: nil, status: :startup, - ssl: false + ssl: false, + db_auth: nil, + backend_key: nil } :gen_server.enter_loop(__MODULE__, [hibernate_after: 5_000], state) @@ -46,6 +48,16 @@ defmodule Supavisor.NativeHandler do {:stop, :normal, state} end + def handle_info( + {:tcp, sock, <<16::32, 1234::16, 5678::16, pid::32, key::32>>}, + %{status: :startup, client_sock: {_, sock} = client_sock} = state + ) do + Logger.debug("Got cancel query for #{inspect({pid, key})}") + :ok = HH.send_cancel_query(pid, key) + :ok = HH.sock_close(client_sock) + {:stop, :normal, state} + end + # ssl request from client def handle_info( {:tcp, sock, <<_::64>>} = _msg, @@ -85,6 +97,28 @@ defmodule Supavisor.NativeHandler do end # send packets to client from db + def handle_info( + {_, sock, bin}, + %{db_sock: {_, sock}, backend_key: nil} = state + ) do + state = + bin + |> Server.decode() + |> Enum.filter(fn e -> Map.get(e, :tag) == :backend_key_data end) + |> case do + [%{payload: %{key: key, pid: pid} = k}] -> + Logger.debug("Backend key: #{inspect(k)}") + :ok = HH.listen_cancel_query(pid, key) + %{state | backend_key: k} + + _ -> + state + end + + :ok = HH.sock_send(state.client_sock, bin) + {:noreply, state} + end + def handle_info({_, sock, bin}, %{db_sock: {_, sock}} = state) do :ok = HH.sock_send(state.client_sock, bin) {:noreply, state} @@ -115,9 +149,13 @@ defmodule Supavisor.NativeHandler do end |> Server.encode_startup_packet() - case connect_local(host, port, payload, state.ssl) do + ip_ver = H.detect_ip_version(host) + host = String.to_charlist(host) + + case connect_local(host, port, payload, ip_ver, state.ssl) do {:ok, db_sock} -> - {:noreply, %{state | db_sock: db_sock}} + auth = %{host: host, port: port, ip_ver: ip_ver} + {:noreply, %{state | db_sock: db_sock, db_auth: auth}} {:error, reason} -> Logger.error("Error connecting to tenant db: #{inspect(reason)}") @@ -141,14 +179,15 @@ defmodule Supavisor.NativeHandler do {:noreply, state} end - def handle_info({:tcp_closed, _} = msg, state) do - Logger.debug("Terminating #{inspect(msg, pretty: true)}") + def handle_info({closed, _} = msg, state) when closed in [:tcp_closed, :ssl_closed] do + Logger.debug("Closed socket #{inspect(msg, pretty: true)}") {:stop, :normal, state} end - def handle_info({:ssl_closed, _} = msg, state) do - Logger.debug("Terminating #{inspect(msg, pretty: true)}") - {:stop, :normal, state} + def handle_info(:cancel_query, %{backend_key: key, db_auth: auth} = state) do + Logger.debug("Cancel query for #{inspect(key)}") + :ok = HH.cancel_query(auth.host, auth.port, auth.ip_ver, key.pid, key.key) + {:noreply, state} end def handle_info(msg, state) do @@ -156,20 +195,25 @@ defmodule Supavisor.NativeHandler do {:noreply, state} end + @impl true + def terminate(_reason, state) do + Logger.debug("Terminate #{inspect(self())}") + :ok = HH.sock_close(state.db_sock) + :ok = HH.sock_close(state.client_sock) + end + ### Internal functions - @spec connect_local(String.t(), non_neg_integer, binary, boolean) :: + @spec connect_local(keyword, non_neg_integer, binary, atom, boolean) :: {:ok, S.sock()} | {:error, term()} - defp connect_local(host, port, payload, ssl?) do + defp connect_local(host, port, payload, ip_ver, ssl?) do sock_opts = [ :binary, {:packet, :raw}, {:active, false}, - H.detect_ip_version(host) + ip_ver ] - host = String.to_charlist(host) - with {:ok, sock} <- :gen_tcp.connect(host, port, sock_opts), {:ok, sock} <- HH.try_ssl_handshake({:gen_tcp, sock}, ssl?), :ok <- HH.sock_send(sock, payload) do diff --git a/lib/supavisor/protocol/server.ex b/lib/supavisor/protocol/server.ex index bcb25a18..96c04e38 100644 --- a/lib/supavisor/protocol/server.ex +++ b/lib/supavisor/protocol/server.ex @@ -13,6 +13,7 @@ defmodule Supavisor.Protocol.Server do @ready_for_query <> @ssl_request <<8::32, 1234::16, 5679::16>> @scram_request <> + @msg_cancel_header <<16::32, 1234::16, 5678::16>> defmodule Pkt do @moduledoc "Representing a packet structure with tag, length, and payload fields." @@ -133,8 +134,8 @@ defmodule Supavisor.Protocol.Server do end end - def decode_payload(:backend_key_data, <>) do - %{procid: proc_id, secret: secret} + def decode_payload(:backend_key_data, <>) do + %{pid: pid, key: key} end def decode_payload(:ready_for_query, payload) do @@ -360,18 +361,13 @@ defmodule Supavisor.Protocol.Server do [<>, payload] end - @spec backend_key_data() :: iodata() + @spec backend_key_data() :: {iodata(), binary} def backend_key_data() do - procid = System.unique_integer([:positive, :monotonic]) - secret = Enum.random(0..9_999_999_999) - payload = <> + pid = System.unique_integer([:positive, :monotonic]) + key = :crypto.strong_rand_bytes(4) + payload = <> len = IO.iodata_length(payload) + 4 - [<>, payload] - end - - @spec greetings(iodata()) :: iodata() - def greetings(ps) do - [ps, backend_key_data(), @ready_for_query] + {<>, payload} end @spec ready_for_query() :: binary() @@ -444,4 +440,9 @@ defmodule Supavisor.Protocol.Server do <> end + + @spec cancel_message(non_neg_integer, non_neg_integer) :: iodata + def cancel_message(pid, key) do + [@msg_cancel_header, <>] + end end diff --git a/test/supavisor/protocol_test.exs b/test/supavisor/protocol_test.exs index 128cd8b9..1fc7c00d 100644 --- a/test/supavisor/protocol_test.exs +++ b/test/supavisor/protocol_test.exs @@ -46,40 +46,21 @@ defmodule Supavisor.ProtocolTest do end test "backend_key_data/0" do - result = S.backend_key_data() - payload = Enum.at(result, 1) + {header, payload} = S.backend_key_data() len = byte_size(payload) + 4 assert [ %S.Pkt{ tag: :backend_key_data, len: 13, - payload: %{procid: _, secret: _} + payload: %{pid: _, key: _} } - ] = S.decode(result |> IO.iodata_to_binary()) + ] = S.decode([header, payload] |> IO.iodata_to_binary()) - assert hd(result) == <> + assert header == <> assert byte_size(payload) == 8 end - test "greetings/1" do - ps = S.encode_parameter_status(@initial_data) - - dec = - S.greetings(ps) - |> IO.iodata_to_binary() - |> S.decode() - - ready_for_query_pos = Enum.at(dec, -1) - backend_key_data_pos = Enum.at(dec, -2) - assert %S.Pkt{tag: :ready_for_query} = ready_for_query_pos - assert %S.Pkt{tag: :backend_key_data} = backend_key_data_pos - tags = Enum.map(dec, & &1.tag) - assert Enum.count(tags, &(&1 == :parameter_status)) == 13 - assert Enum.count(tags, &(&1 == :backend_key_data)) == 1 - assert Enum.count(tags, &(&1 == :ready_for_query)) == 1 - end - test "decode_payload for error_response" do assert S.decode(@auth_bin_error) == [ %Supavisor.Protocol.Server.Pkt{ @@ -97,4 +78,13 @@ defmodule Supavisor.ProtocolTest do } ] end + + test "cancel_message/2" do + pid = 123 + key = 123_456 + expected = <<0, 0, 0, 16, 4, 210, 22, 46, 0, 0, 0, 123, 0, 1, 226, 64>> + + assert S.cancel_message(pid, key) + |> IO.iodata_to_binary() == expected + end end diff --git a/test/supavisor/syn_handler_test.exs b/test/supavisor/syn_handler_test.exs index fd4e4a54..b6b26d47 100644 --- a/test/supavisor/syn_handler_test.exs +++ b/test/supavisor/syn_handler_test.exs @@ -11,7 +11,7 @@ defmodule Supavisor.SynHandlerTest do secret = %{alias: "postgres"} auth_secret = {:password, fn -> secret end} - {:ok, pid2} = :erpc.call(node2, Supavisor.FixturesHelpers, :start_pool, [@id, secret]) + {:ok, pid2} = :erpc.call(node2, Supavisor.FixturesHelpers, :start_pool, [@id, secret, nil]) Process.sleep(500) assert pid2 == Supavisor.get_global_sup(@id) assert node(pid2) == node2 @@ -19,7 +19,7 @@ defmodule Supavisor.SynHandlerTest do Process.sleep(500) assert nil == Supavisor.get_global_sup(@id) - {:ok, pid1} = Supavisor.start(@id, auth_secret) + {:ok, pid1} = Supavisor.start(@id, auth_secret, nil) assert pid1 == Supavisor.get_global_sup(@id) assert node(pid1) == node() diff --git a/test/support/fixtures/helpers.ex b/test/support/fixtures/helpers.ex index 0e593c96..4467361e 100644 --- a/test/support/fixtures/helpers.ex +++ b/test/support/fixtures/helpers.ex @@ -1,8 +1,8 @@ defmodule Supavisor.FixturesHelpers do @moduledoc false - def start_pool(id, secret) do + def start_pool(id, secret, db_name) do secret = {:password, fn -> secret end} - Supavisor.start(id, secret) + Supavisor.start(id, secret, db_name) end end From da8de63a5583a16bb6cb3928c1d90224313c98ee Mon Sep 17 00:00:00 2001 From: Stas Date: Wed, 25 Oct 2023 17:07:09 +0200 Subject: [PATCH 11/11] fix: prevent client <-> db locking (#195) --- VERSION | 2 +- lib/supavisor/client_handler.ex | 18 +++++++++--------- lib/supavisor/db_handler.ex | 23 ++++++++++++----------- test/supavisor/db_handler_test.exs | 6 +++--- 4 files changed, 25 insertions(+), 24 deletions(-) diff --git a/VERSION b/VERSION index ea3f0d7a..68a59457 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.22 +0.9.23 diff --git a/lib/supavisor/client_handler.ex b/lib/supavisor/client_handler.ex index 311df525..50fa5209 100644 --- a/lib/supavisor/client_handler.ex +++ b/lib/supavisor/client_handler.ex @@ -26,8 +26,8 @@ defmodule Supavisor.ClientHandler do @impl true def callback_mode, do: [:handle_event_function] - def client_call(pid, bin, ready?) do - :gen_statem.call(pid, {:client_call, bin, ready?}, 5000) + def client_cast(pid, bin, ready?) do + :gen_statem.cast(pid, {:client_cast, bin, ready?}) end @impl true @@ -303,7 +303,7 @@ defmodule Supavisor.ClientHandler do end def handle_event(_, {proto, _, bin}, :busy, data) when proto in [:tcp, :ssl] do - case Db.call(data.db_pid, bin) do + case Db.call(data.db_pid, self(), bin) do :ok -> Logger.debug("DB call success") :keep_state_and_data @@ -366,11 +366,11 @@ defmodule Supavisor.ClientHandler do end end - # emulate handle_call - def handle_event({:call, from}, {:client_call, bin, ready?}, _, data) do + # emulate handle_cast + def handle_event(:cast, {:client_cast, bin, ready?}, _, data) do Logger.debug("--> --> bin #{inspect(byte_size(bin))} bytes") - reply = {:reply, from, HH.sock_send(data.sock, bin)} + :ok = HH.sock_send(data.sock, bin) if ready? do Logger.debug("Client is ready") @@ -382,15 +382,15 @@ defmodule Supavisor.ClientHandler do actions = if data.idle_timeout > 0 do - [reply, idle_check(data.idle_timeout)] + idle_check(data.idle_timeout) else - reply + [] end {:next_state, :idle, %{data | db_pid: db_pid, stats: stats}, actions} else Logger.debug("Client is not ready") - {:keep_state_and_data, reply} + :keep_state_and_data end end diff --git a/lib/supavisor/db_handler.ex b/lib/supavisor/db_handler.ex index efee8abe..1a932a4e 100644 --- a/lib/supavisor/db_handler.ex +++ b/lib/supavisor/db_handler.ex @@ -19,9 +19,9 @@ defmodule Supavisor.DbHandler do :gen_statem.start_link(__MODULE__, config, hibernate_after: 5_000) end - @spec call(pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()} - def call(pid, msg) do - :gen_statem.call(pid, {:db_call, msg}) + @spec call(pid(), pid(), binary()) :: :ok | {:error, any()} | {:buffering, non_neg_integer()} + def call(pid, caller, msg) do + :gen_statem.call(pid, {:db_call, caller, msg}, 15_000) end @impl true @@ -240,32 +240,33 @@ defmodule Supavisor.DbHandler do {:keep_state, %{data | buffer: []}} end - def handle_event(:info, {_proto, _, bin}, _, data) do + def handle_event(:info, {_proto, _, bin}, _, %{caller: caller} = data) when is_pid(caller) do # check if the response ends with "ready for query" ready = String.ends_with?(bin, Server.ready_for_query()) - :ok = Client.client_call(data.caller, bin, ready) + Logger.debug("Db ready #{inspect(ready)}") + :ok = Client.client_cast(caller, bin, ready) if ready do {_, stats} = Telem.network_usage(:db, data.sock, data.id, data.stats) - {:keep_state, %{data | stats: stats}} + {:keep_state, %{data | stats: stats, caller: nil}} else :keep_state_and_data end end - def handle_event({:call, {pid, _} = from}, {:db_call, bin}, :idle, %{sock: sock} = data) do + def handle_event({:call, from}, {:db_call, caller, bin}, :idle, %{sock: sock} = data) do reply = {:reply, from, sock_send(sock, bin)} - {:keep_state, %{data | caller: pid}, reply} + {:keep_state, %{data | caller: caller}, reply} end - def handle_event({:call, {pid, _} = from}, {:db_call, bin}, state, %{buffer: buff} = data) do + def handle_event({:call, from}, {:db_call, caller, bin}, state, %{buffer: buff} = data) do Logger.debug( - "state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(pid)}" + "state #{state} <-- <-- bin #{inspect(byte_size(bin))} bytes, caller: #{inspect(caller)}" ) new_buff = [bin | buff] reply = {:reply, from, {:buffering, IO.iodata_length(new_buff)}} - {:keep_state, %{data | caller: pid, buffer: new_buff}, reply} + {:keep_state, %{data | caller: caller, buffer: new_buff}, reply} end def handle_event(:info, {:tcp_closed, sock}, state, %{sock: sock} = data) do diff --git a/test/supavisor/db_handler_test.exs b/test/supavisor/db_handler_test.exs index a75a573f..4b2ed693 100644 --- a/test/supavisor/db_handler_test.exs +++ b/test/supavisor/db_handler_test.exs @@ -97,7 +97,7 @@ defmodule Supavisor.DbHandlerTest do data = %{sock: {:gen_tcp, sock}, caller: nil, buffer: []} from = {self(), :test_ref} event = {:call, from} - payload = {:db_call, "test_data"} + payload = {:db_call, self(), "test_data"} {:keep_state, new_data, reply} = Db.handle_event(event, payload, :idle, data) @@ -107,10 +107,10 @@ defmodule Supavisor.DbHandlerTest do end test "handle_event/4 with non-idle state" do - data = %{sock: nil, caller: nil, buffer: []} + data = %{sock: nil, caller: self(), buffer: []} from = {self(), :test_ref} event = {:call, from} - payload = {:db_call, "test_data"} + payload = {:db_call, self(), "test_data"} state = :non_idle {:keep_state, new_data, reply} = Db.handle_event(event, payload, state, data)