Skip to content

Commit 6772c8f

Browse files
authored
feat: use iodata for metrics export (supabase#586)
This prevents memory bloat from creating all the intermediate binaries that are later dropped. Instead, we try to reuse as many of the binaries as possible and then let the VM do the concatenation if needed.
1 parent 795f357 commit 6772c8f

File tree

8 files changed

+197
-73
lines changed

8 files changed

+197
-73
lines changed

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.1.6
1+
2.1.7

lib/supavisor/monitoring/prom_ex.ex

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ defmodule Supavisor.Monitoring.PromEx do
5858
end
5959
end
6060

61-
@spec get_metrics() :: String.t()
61+
@spec get_metrics() :: iodata()
6262
def get_metrics do
6363
metrics_tags =
6464
case Application.fetch_env(:supavisor, :metrics_tags) do
@@ -71,7 +71,7 @@ defmodule Supavisor.Monitoring.PromEx do
7171
metrics =
7272
PromEx.get_metrics(__MODULE__)
7373
|> String.split("\n")
74-
|> Enum.map_join("\n", &parse_and_add_tags(&1, def_tags))
74+
|> Enum.map(&parse_and_add_tags(&1, def_tags))
7575

7676
Supavisor.Monitoring.PromEx.ETSCronFlusher
7777
|> PromEx.ETSCronFlusher.defer_ets_flush()
@@ -81,7 +81,7 @@ defmodule Supavisor.Monitoring.PromEx do
8181

8282
@spec do_cache_tenants_metrics() :: list
8383
def do_cache_tenants_metrics do
84-
metrics = get_metrics() |> String.split("\n")
84+
metrics = get_metrics() |> IO.iodata_to_binary() |> String.split("\n")
8585

8686
pools =
8787
Registry.select(Supavisor.Registry.TenantClients, [{{:"$1", :_, :_}, [], [:"$1"]}])
@@ -109,43 +109,21 @@ defmodule Supavisor.Monitoring.PromEx do
109109
end
110110
end
111111

112-
@spec parse_and_add_tags(String.t(), String.t()) :: String.t()
112+
@spec parse_and_add_tags(String.t(), String.t()) :: iodata()
113113
defp parse_and_add_tags(line, def_tags) do
  # Appends `def_tags` to the label set of one line of exported metrics text
  # and returns the line as newline-terminated iodata, so the caller can
  # collect the lines without building intermediate binaries.
  #
  # `def_tags` is inserted verbatim between the braces.
  case Regex.run(~r/(?!\#)^(\w+)(?:{(.*?)})?\s*(.+)$/, line) do
    # Lines that do not start with a metric name (for example lines starting
    # with `#`, or empty lines) are passed through untouched.
    nil ->
      [line, "\n"]

    [_, key, tags, value] ->
      # The optional label group is captured as "" when the sample has no
      # labels of its own.
      tags =
        if tags == "" do
          def_tags
        else
          [tags, ",", def_tags]
        end

      # `\s*` in the regex swallows the blank in front of the value, so the
      # separator between the label set and the value is emitted explicitly.
      [key, "{", tags, "} ", value, "\n"]
  end
end
131-
132-
@spec clean_string(String.t()) :: String.t()
133-
def clean_string(metric_string) do
134-
regex = ~r/=\s*"([^"]*?)"/
135-
136-
String.replace(metric_string, regex, fn match ->
137-
[_, value] = Regex.run(regex, match)
138-
139-
cleaned =
140-
value
141-
|> String.replace(~r/\n+/, "")
142-
|> String.trim()
143-
144-
if value != cleaned do
145-
Logger.warning("Tag validation: #{inspect(value)} / #{inspect(cleaned)}")
146-
end
147-
148-
"=\"#{cleaned}\""
149-
end)
150-
end
151129
end

lib/supavisor_web/controllers/metrics_controller.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,6 @@ defmodule SupavisorWeb.MetricsController do
5656
end
5757

5858
# Reducer step: puts one node's metrics, followed by a newline, in front of
# the accumulated iodata. The outer key and the node name are ignored.
def merge_node_metrics({_key, {_node, node_metrics}}, acc) do
  [node_metrics | ["\n" | acc]]
end
6161
end

mix.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ defmodule Supavisor.MixProject do
5252
{:phoenix_live_view, "~> 1.0"},
5353
{:phoenix_live_dashboard, "~> 0.7"},
5454
{:telemetry_poller, "~> 1.0"},
55-
{:peep, "~> 3.1",
56-
github: "hauleth/peep", ref: "94dd35d2c98a858ef20983f2b3da2ab112a07238", override: true},
55+
{:peep, "~> 3.4"},
5756
{:jason, "~> 1.2"},
5857
{:plug_cowboy, "~> 2.5"},
5958
{:joken, "~> 2.6.0"},
@@ -83,7 +82,8 @@ defmodule Supavisor.MixProject do
8382

8483
# Test utilities
8584
{:excoveralls, ">= 0.0.0", only: [:dev, :test]},
86-
{:stream_data, "~> 1.0", only: [:dev, :test]}
85+
{:stream_data, "~> 1.0", only: [:dev, :test]},
86+
{:req, "~> 0.5"}
8787
]
8888
end
8989

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},
4545
"open_api_spex": {:hex, :open_api_spex, "3.21.2", "6a704f3777761feeb5657340250d6d7332c545755116ca98f33d4b875777e1e5", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0 or ~> 6.0", [hex: :poison, repo: "hexpm", optional: true]}, {:ymlr, "~> 2.0 or ~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :ymlr, repo: "hexpm", optional: true]}], "hexpm", "f42ae6ed668b895ebba3e02773cfb4b41050df26f803f2ef634c72a7687dc387"},
4646
"opentelemetry_api": {:hex, :opentelemetry_api, "1.4.0", "63ca1742f92f00059298f478048dfb826f4b20d49534493d6919a0db39b6db04", [:mix, :rebar3], [], "hexpm", "3dfbbfaa2c2ed3121c5c483162836c4f9027def469c41578af5ef32589fcfc58"},
47-
"peep": {:git, "https://github.com/hauleth/peep.git", "94dd35d2c98a858ef20983f2b3da2ab112a07238", [ref: "94dd35d2c98a858ef20983f2b3da2ab112a07238"]},
47+
"peep": {:hex, :peep, "3.4.2", "49d4ca116d994779351959dfee971fb2c7d6506f1821374f3cc4fd39a3d3fadb", [:mix], [{:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:plug, "~> 1.16", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "cae224b09e224bf5584f5375108becf71e2288572a099a122e66735289cd33f4"},
4848
"pg_types": {:hex, :pg_types, "0.4.0", "3ce365c92903c5bb59c0d56382d842c8c610c1b6f165e20c4b652c96fa7e9c14", [:rebar3], [], "hexpm", "b02efa785caececf9702c681c80a9ca12a39f9161a846ce17b01fb20aeeed7eb"},
4949
"pgo": {:hex, :pgo, "0.14.0", "f53711d103d7565db6fc6061fcf4ff1007ab39892439be1bb02d9f686d7e6663", [:rebar3], [{:backoff, "~> 1.1.6", [hex: :backoff, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:pg_types, "~> 0.4.0", [hex: :pg_types, repo: "hexpm", optional: false]}], "hexpm", "71016c22599936e042dc0012ee4589d24c71427d266292f775ebf201d97df9c9"},
5050
"phoenix": {:hex, :phoenix, "1.7.18", "5310c21443514be44ed93c422e15870aef254cf1b3619e4f91538e7529d2b2e4", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "1797fcc82108442a66f2c77a643a62980f342bfeb63d6c9a515ab8294870004e"},
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
defmodule Supavisor.Monitoring.PromExTest do
  use Supavisor.DataCase, async: true
  use ExUnitProperties

  @subject Supavisor.Monitoring.PromEx

  @tenant "prom_tenant"

  # Before every test, connect through the transaction-mode proxy port as
  # `@tenant` and make sure the connection is idle.
  setup do
    repo_conf = Application.get_env(:supavisor, Repo)

    {:ok, proxy} =
      Postgrex.start_link(
        hostname: repo_conf[:hostname],
        port: Application.get_env(:supavisor, :proxy_port_transaction),
        database: repo_conf[:database],
        password: repo_conf[:password],
        username: repo_conf[:username] <> "." <> @tenant,
        socket_dir: nil,
        show_sensitive_data_on_connection_error: true
      )

    assert :idle == DBConnection.status(proxy)

    %{proxy: proxy, user: repo_conf[:username], db_name: repo_conf[:database]}
  end

  describe "get_metrics/1" do
    # Release archives of prom2json, keyed by `{os, cpu_architecture}`; each
    # entry is `{archive_url, path_of_the_executable_inside_the_archive}`.
    @sources %{
      {:darwin, :aarch64} => {
        "https://github.com/prometheus/prom2json/releases/download/v1.4.1/prom2json-1.4.1.darwin-arm64.tar.gz",
        "prom2json-1.4.1.darwin-arm64/prom2json"
      },
      {:linux, :aarch64} => {
        "https://github.com/prometheus/prom2json/releases/download/v1.4.1/prom2json-1.4.1.linux-arm64.tar.gz",
        "prom2json-1.4.1.linux-arm64/prom2json"
      }
    }

    setup do
      {:ok, prom2json: Supavisor.Downloader.ensure("prom2json", @sources)}
    end

    @tag :tmp_dir
    test "returned metrics are parseable", %{tmp_dir: dir, prom2json: exe} do
      file = Path.join(dir, "prom.out")
      File.write!(file, @subject.get_metrics())

      assert {_, 0} = System.cmd(exe, [file])
    end

    @tag :tmp_dir
    property "non-standard DB names do not cause parsing issues", %{tmp_dir: dir, prom2json: exe} do
      check all db_name <- string(:printable, min_length: 1, max_length: 63) do
        assert_join_exported(dir, exe, "tenant", "user", db_name)
      end
    end

    # NOTE(review): the generated `user` is never asserted on — only the
    # constant db_name label is checked. Presumably the user label should be
    # compared as well; confirm the exported label name before changing it.
    @tag :tmp_dir
    property "non-standard user names do not cause parsing issues", %{
      tmp_dir: dir,
      prom2json: exe
    } do
      check all user <- string(:printable, min_length: 1, max_length: 63) do
        assert_join_exported(dir, exe, "tenant", user, "db_name")
      end
    end

    # NOTE(review): same as above — the generated `tenant` is not asserted on,
    # only the constant db_name label is. Confirm the intended label.
    @tag :tmp_dir
    property "non-standard tenant names do not cause parsing issues", %{tmp_dir: dir, prom2json: exe} do
      check all tenant <- string(:printable, min_length: 1) do
        assert_join_exported(dir, exe, tenant, "user", "db_name")
      end
    end
  end

  # Records a successful client join for the given identifiers, writes the
  # exported metrics to `dir`, converts them with the prom2json executable and
  # checks that the `supavisor_client_joins_ok` family contains a sample whose
  # `db_name` label equals `db_name`.
  defp assert_join_exported(dir, exe, tenant, user, db_name) do
    Supavisor.Monitoring.Telem.client_join(
      :ok,
      {{:single, tenant}, user, :session, db_name, nil}
    )

    file = Path.join(dir, "prom.out")
    File.write!(file, @subject.get_metrics())

    assert {out, 0} = System.cmd(exe, [file])
    assert {:ok, families} = Jason.decode(out)

    assert %{"metrics" => samples} =
             Enum.find(families, &(&1["name"] == "supavisor_client_joins_ok"))

    assert Enum.find(samples, &(&1["labels"]["db_name"] == db_name))
  end
end

test/supavisor/prom_ex_test.exs

Lines changed: 0 additions & 38 deletions
This file was deleted.

test/support/downloader.ex

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
defmodule Supavisor.Downloader do
  @moduledoc """
  Test helper that makes sure an external executable is available.

  The executable is looked up in `PATH` first. When it is not found, an
  archive is downloaded for the current OS/architecture and the requested
  file is extracted into the system temporary directory.
  """

  @typedoc """
  Where to get the archive from: either just the URL (the executable is then
  expected under `name` inside the archive) or `{url, path_inside_archive}`.
  """
  @type source :: String.t() | {url :: String.t(), file :: String.t()}

  @typedoc "Sources keyed by `{os_name, cpu_architecture}`, e.g. `{:linux, :aarch64}`."
  @type sources :: %{optional({atom(), atom()}) => source()}

  @doc """
  Returns the path to the `name` executable.

  Uses the executable found in `PATH` when there is one, otherwise falls back
  to `do_download/2`.
  """
  @spec ensure(String.t(), sources()) :: Path.t()
  def ensure(name, sources) do
    System.find_executable(name) || do_download(name, sources)
  end

  @doc """
  Downloads and extracts the executable described by `sources` for the current
  platform and returns the path of the extracted file.

  Nothing is downloaded when the target file already exists in the temporary
  directory. Raises when `sources` has no entry for the current platform, when
  the download does not answer with HTTP 200, or when extraction fails.
  """
  @spec do_download(String.t(), sources()) :: Path.t()
  def do_download(name, sources) do
    dir = System.tmp_dir!()
    {url, file} = fetch_source!(name, sources)
    out = Path.join(dir, file)

    if not File.exists?(out) do
      :ok =
        :erl_tar.extract({:binary, fetch_archive!(url)}, [
          :compressed,
          cwd: to_charlist(dir),
          files: [to_charlist(file)]
        ])
    end

    out
  end

  # Picks the source for the current platform and normalises it to `{url, file}`.
  defp fetch_source!(name, sources) do
    arch = os_arch()

    case Map.fetch(sources, arch) do
      {:ok, {url, file}} -> {url, file}
      {:ok, url} when is_binary(url) -> {url, name}
      :error -> raise "Cannot find source for #{inspect(arch)}"
    end
  end

  # Fetches the archive as raw bytes.
  defp fetch_archive!(url) do
    # Body decoding is switched off so that Req never hands back an already
    # decoded/unpacked body: `:erl_tar.extract/2` needs the archive binary.
    case Req.get!(url, decode_body: false) do
      %{status: 200, body: body} -> body
      %{status: status} -> raise "Cannot download #{url}: unexpected HTTP status #{status}"
    end
  end

  # Returns `{os_name, cpu_architecture}` for the running VM, where the
  # architecture is the first dash-separated part of the system architecture
  # string.
  defp os_arch do
    {_, name} = :os.type()

    arch =
      :erlang.system_info(:system_architecture)
      |> List.to_string()
      |> String.split("-", parts: 2)
      |> hd()
      # The input comes from the VM itself, not from external data, so creating
      # an atom here cannot grow the atom table without bound.
      |> String.to_atom()

    {name, arch}
  end
end

0 commit comments

Comments
 (0)