Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/supavisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@
[_ | _] = replicas ->
opts =
Enum.map(replicas, fn replica ->
case replica do

Check warning on line 296 in lib/supavisor.ex

View workflow job for this annotation

GitHub Actions / Code style

Function body is nested too deep (max depth is 2, was 3).
%Tenants.ClusterTenants{tenant: tenant, type: type} ->
Map.put(tenant, :replica_type, type)

Expand Down Expand Up @@ -331,6 +331,7 @@
db_host: db_host,
db_port: db_port,
db_database: db_database,
auth_query: auth_query,
default_parameter_status: ps,
ip_version: ip_ver,
default_pool_size: def_pool_size,
Expand All @@ -343,6 +344,7 @@
db_user: db_user,
db_password: db_pass,
pool_size: pool_size,
db_user_alias: alias,
# mode_type: mode_type,
max_clients: max_clients
}
Expand All @@ -361,6 +363,8 @@
sni_hostname: if(sni_hostname != nil, do: to_charlist(sni_hostname)),
port: db_port,
user: db_user,
alias: alias,
auth_query: auth_query,
database: if(db_name != nil, do: db_name, else: db_database),
password: fn -> db_pass end,
application_name: "Supavisor",
Expand Down
5 changes: 4 additions & 1 deletion lib/supavisor/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ defmodule Supavisor.Application do
{Registry, keys: :unique, name: Supavisor.Registry.ManagerTables},
{Registry, keys: :unique, name: Supavisor.Registry.PoolPids},
{Registry, keys: :duplicate, name: Supavisor.Registry.TenantSups},
{Registry, keys: :duplicate, name: Supavisor.Registry.TenantClients},
{Registry,
keys: :duplicate,
name: Supavisor.Registry.TenantClients,
partitions: System.schedulers_online()},
{Cluster.Supervisor, [topologies, [name: Supavisor.ClusterSupervisor]]},
Supavisor.Repo,
# Start the Telemetry supervisor
Expand Down
1 change: 0 additions & 1 deletion lib/supavisor/client_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@

key = {:secrets, tenant_or_alias, user}

case auth_secrets(info, user, key, :timer.hours(24)) do

Check warning on line 283 in lib/supavisor/client_handler.ex

View workflow job for this annotation

GitHub Actions / Code style

Function body is nested too deep (max depth is 2, was 3).
{:ok, auth_secrets} ->
Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

Expand Down Expand Up @@ -339,7 +339,7 @@
Cachex.get(Supavisor.Cache, key) == {:ok, nil} do
case auth_secrets(info, data.user, key, 15_000) do
{:ok, {method2, secrets2}} = value ->
if method != method2 or Map.delete(secrets.(), :client_key) != secrets2.() do

Check warning on line 342 in lib/supavisor/client_handler.ex

View workflow job for this annotation

GitHub Actions / Code style

Function body is nested too deep (max depth is 2, was 4).
Logger.warning("ClientHandler: Update secrets and terminate pool")

Cachex.update(
Expand Down Expand Up @@ -883,7 +883,6 @@
host: to_charlist(info.tenant.db_host),
sni_hostname:
if(info.tenant.sni_hostname != nil, do: to_charlist(info.tenant.sni_hostname)),
ip_version: Helpers.ip_version(info.tenant.ip_version, info.tenant.db_host),
port: info.tenant.db_port,
user: user,
password: info.user.db_password,
Expand Down
1 change: 1 addition & 0 deletions lib/supavisor/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
if params["require_user"] do
{:cont, {:ok, version}}
else
case get_user_secret(conn, params["auth_query"], user["db_user"]) do

Check warning on line 63 in lib/supavisor/helpers.ex

View workflow job for this annotation

GitHub Actions / Code style

Function body is nested too deep (max depth is 2, was 4).
{:ok, _} ->
{:halt, {:ok, version}}

Expand Down Expand Up @@ -201,6 +201,7 @@
"""
@spec detect_ip_version(String.t()) :: :inet | :inet6
def detect_ip_version(host) when is_binary(host) do
Logger.info("Detecting IP version for #{host}")
host = String.to_charlist(host)

case :inet.gethostbyname(host) do
Expand Down
116 changes: 116 additions & 0 deletions lib/supavisor/secret_checker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
defmodule Supavisor.SecretChecker do
@moduledoc false

use GenServer
require Logger

alias Supavisor.Helpers

@interval :timer.seconds(15)

def start_link(args) do
name = {:via, Registry, {Supavisor.Registry.Tenants, {:secret_checker, args.id}}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need that registry at all? It doesn't seem to be used for anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, it can be very useful during debugging

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% sure if it is good approach. Maybe we could use Process.set_label/1 in the future instead?


GenServer.start_link(__MODULE__, args, name: name)
end

def init(args) do
Logger.debug("SecretChecker: Starting secret checker")
tenant = Supavisor.tenant(args.id)

%{auth: auth, user: user} = Enum.find(args.replicas, fn e -> e.replica_type == :write end)

state = %{
tenant: tenant,
auth: auth,
user: user,
key: {:secrets, tenant, user},
ttl: args[:ttl] || :timer.hours(24),
conn: nil,
check_ref: check()
}

Logger.metadata(project: tenant, user: user)
{:ok, state, {:continue, :init_conn}}
end

def handle_continue(:init_conn, %{auth: auth} = state) do
ssl_opts =
if auth.upstream_ssl and auth.upstream_verify == "peer" do
[
{:verify, :verify_peer},
{:cacerts, [Helpers.upstream_cert(auth.upstream_tls_ca)]},
{:server_name_indication, auth.host},
{:customize_hostname_check, [{:match_fun, fn _, _ -> true end}]}
]
end

{:ok, conn} =
Postgrex.start_link(
hostname: auth.host,
port: auth.port,
database: auth.database,
password: auth.password.(),
username: auth.user,
parameters: [application_name: "Supavisor auth_query"],
ssl: auth.upstream_ssl,
socket_options: [
auth.ip_version
],
queue_target: 1_000,
queue_interval: 5_000,
ssl_opts: ssl_opts || []
)

# kill the postgrex connection if the current process exits unexpectedly
Process.link(conn)
{:noreply, %{state | conn: conn}}
end

def handle_info(:check, state) do
Logger.debug("Checking secrets")
check_secrets(state)
Logger.debug("Secrets checked")
{:noreply, %{state | check_ref: check()}}
end

def handle_info(msg, state) do
Logger.error("Unexpected message: #{inspect(msg)}")
{:noreply, state}
end

def terminate(_, state) do
:gen_statem.stop(state.conn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird that Postgrex do not offer Postgrex.stop/1 function. Maybe it would be worth adding such function upstream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I've been wanting to add this PR to Postgrex for a long time

:ok
end

def check(interval \\ @interval),
do: Process.send_after(self(), :check, interval)

def check_secrets(%{auth: auth, user: user, conn: conn} = state) do
case Helpers.get_user_secret(conn, auth.auth_query, user) do
{:ok, secret} ->
method = if secret.digest == :md5, do: :auth_query_md5, else: :auth_query
secrets = Map.put(secret, :alias, auth.alias)

update_cache =
case Cachex.get(Supavisor.Cache, state.key) do
{:ok, {:cached, {_, {old_method, old_secrets}}}} ->
method != old_method or secrets != old_secrets.()

other ->
Logger.error("Failed to get cache: #{inspect(other)}")
true
end

if update_cache do
Logger.info("Secrets changed or not present, updating cache")
value = {:ok, {method, fn -> secrets end}}
Cachex.put(Supavisor.Cache, state.key, {:cached, value}, expire: :timer.hours(24))
end

other ->
Logger.error("Failed to get secret: #{inspect(other)}")
end
end
end
3 changes: 2 additions & 1 deletion lib/supavisor/tenant_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Supavisor.TenantSupervisor do

require Logger
alias Supavisor.Manager
alias Supavisor.SecretChecker

def start_link(%{replicas: [%{mode: mode} = single]} = args)
when mode in [:transaction, :session] do
Expand Down Expand Up @@ -33,7 +34,7 @@ defmodule Supavisor.TenantSupervisor do
}
end)

children = [{Manager, args} | pools]
children = [{Manager, args}, {SecretChecker, args} | pools]

{{type, tenant}, user, mode, db_name, search_path} = args.id
map_id = %{user: user, mode: mode, type: type, db_name: db_name, search_path: search_path}
Expand Down
Loading