-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PoC Couch Stats Resource Tracker for tracking process local resource usage #4812
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
be84279
e5e4820
ecda8f3
73863a7
12e0267
84be4af
19224af
cafa512
996c385
1bbf7a9
e70ba4d
715b524
be8ab04
4c05a0f
eb20b5d
f4a712c
c758cb7
4f00910
887a7f2
1a1a584
db1f874
50495bf
9578773
f77583a
ac17510
c500f0f
d7e9bd9
f68005a
0371589
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,12 +33,17 @@ | |
|
||
-export([ | ||
create_context/0, create_context/1, create_context/3, | ||
create_coordinator_context/1, create_coordinator_context/2, | ||
set_context_dbname/1, | ||
set_context_username/1, | ||
track/1, | ||
should_track/1 | ||
]). | ||
|
||
-export([ | ||
active/0 | ||
active/0, | ||
active_coordinators/0, | ||
active_workers/0 | ||
]). | ||
|
||
-export([ | ||
|
@@ -93,6 +98,7 @@ | |
%% TODO: overlap between this and couch btree fold invocations | ||
%% TODO: need some way to distinguish fols on views vs find vs all_docs | ||
-define(FRPC_CHANGES_ROW, changes_processed). | ||
%%-define(FRPC_CHANGES_ROW, ?ROWS_READ). | ||
|
||
%% Module pdict markers | ||
-define(DELTA_TA, csrt_delta_ta). | ||
|
@@ -109,13 +115,19 @@ | |
%% TODO: switch to: | ||
%% -record(?RCTX, { | ||
-record(rctx, { | ||
%% Metadata | ||
updated_at = os:timestamp(), | ||
exited_at, | ||
pid_ref, | ||
mfa, | ||
nonce, | ||
from, | ||
type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc | ||
state = alive, | ||
dbname, | ||
username, | ||
|
||
%% Stats counters | ||
db_open = 0, | ||
docs_read = 0, | ||
rows_read = 0, | ||
|
@@ -132,8 +144,7 @@ | |
%% TODO: switch record definitions to be macro based, eg: | ||
%% ?COUCH_BT_GET_KP_NODE = 0, | ||
get_kv_node = 0, | ||
get_kp_node = 0, | ||
state = alive | ||
get_kp_node = 0 | ||
}). | ||
|
||
db_opened() -> inc(db_opened). | ||
|
@@ -208,7 +219,7 @@ inc(?MANGO_EVAL_MATCH, N) -> | |
inc(?DB_OPEN_DOC, N) -> | ||
update_counter(#rctx.?DB_OPEN_DOC, N); | ||
inc(?FRPC_CHANGES_ROW, N) -> | ||
update_counter(#rctx.?FRPC_CHANGES_ROW, N); | ||
update_counter(#rctx.?ROWS_READ, N); %% TODO: rework double use of rows_read | ||
inc(?COUCH_BT_GET_KP_NODE, N) -> | ||
update_counter(#rctx.?COUCH_BT_GET_KP_NODE, N); | ||
inc(?COUCH_BT_GET_KV_NODE, N) -> | ||
|
@@ -238,8 +249,8 @@ maybe_inc([couchdb, query_server, js_filter], Val) -> | |
inc(?COUCH_JS_FILTER, Val); | ||
maybe_inc([couchdb, query_server, js_filtered_docs], Val) -> | ||
inc(?COUCH_JS_FILTERED_DOCS, Val); | ||
maybe_inc(Metric, Val) -> | ||
io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]), | ||
maybe_inc(_Metric, _Val) -> | ||
%%io:format("SKIPPING MAYBE_INC METRIC[~p]: ~p~n", [Val, Metric]), | ||
0. | ||
|
||
|
||
|
@@ -248,6 +259,8 @@ should_track([fabric_rpc, all_docs, spawned]) -> | |
true; | ||
should_track([fabric_rpc, changes, spawned]) -> | ||
true; | ||
should_track([fabric_rpc, changes, processed]) -> | ||
true; | ||
should_track([fabric_rpc, map_view, spawned]) -> | ||
true; | ||
should_track([fabric_rpc, reduce_view, spawned]) -> | ||
|
@@ -283,7 +296,26 @@ update_counter({_Pid,_Ref}=Key, Field, Count) -> | |
ets:update_counter(?MODULE, Key, {Field, Count}, #rctx{pid_ref=Key}). | ||
|
||
|
||
active() -> | ||
active() -> active_int(all). | ||
active_coordinators() -> active_int(coordinators). | ||
active_workers() -> active_int(workers). | ||
|
||
|
||
active_int(coordinators) -> | ||
select_by_type(coordinators); | ||
active_int(workers) -> | ||
select_by_type(workers); | ||
active_int(all) -> | ||
lists:map(fun to_json/1, ets:tab2list(?MODULE)). | ||
|
||
|
||
select_by_type(coordinators) -> | ||
ets:select(couch_stats_resource_tracker, | ||
[{#rctx{type = {coordinator,'_','_'}, _ = '_'}, [], ['$_']}]); | ||
select_by_type(workers) -> | ||
ets:select(couch_stats_resource_tracker, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (node1@127.0.0.1)17> ets:fun2ms(fun(#rctx{type = {worker, _, _}} = R) -> R end).
[{#rctx{started_at = '_',updated_at = '_',exited_at = '_',
pid_ref = '_',mon_ref = '_',mfa = '_',nonce = '_',
from = '_',
type = {worker,'_','_'},
state = '_',dbname = '_',username = '_',db_open = '_',
docs_read = '_',rows_read = '_',btree_folds = '_',
changes_processed = '_',changes_returned = '_',
ioq_calls = '_',io_bytes_read = '_',io_bytes_written = '_',
js_evals = '_',js_filter = '_',js_filter_error = '_',
js_filtered_docs = '_',mango_eval_match = '_',...},
[],
['$_']}] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would only work with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did an experiment % erlc +time +to_core add_delta.erl
-module(ets_select).
-include_lib("stdlib/include/ms_transform.hrl").
-export([test/0]).
%% TODO: switch to:
%% -record(?RCTX, {
-record(rctx, {
%% Metadata
started_at,
updated_at,
exited_at, %% TODO: do we need a final exit time and additional update times afterwards?
pid_ref,
mon_ref,
mfa,
nonce,
from,
type = unknown, %% unknown/background/system/rpc/coordinator/fabric_rpc/etc_rpc/etc
state = alive,
dbname,
username,
%% Stats counters
db_open = 0,
docs_read = 0,
rows_read = 0,
btree_folds = 0,
changes_processed = 0,
changes_returned = 0,
ioq_calls = 0,
io_bytes_read = 0,
io_bytes_written = 0,
js_evals = 0,
js_filter = 0,
js_filter_error = 0,
js_filtered_docs = 0,
mango_eval_match = 0,
%% TODO: switch record definitions to be macro based, eg:
%% ?COUCH_BT_GET_KP_NODE = 0,
get_kv_node = 0,
get_kp_node = 0
}).
select() ->
ets:select(?MODULE, ets:fun2ms(fun(#rctx{type = {worker, _, _}} = R) -> R end)).
test() ->
(catch ets:new(?MODULE, [named_table, {keypos, #rctx.started_at}])),
true = ets:insert(?MODULE, [#rctx{started_at = 1, type = {worker, 1, 2}}]),
true = ets:insert(?MODULE, [#rctx{started_at = 2, type = {other, 1, 2}}]),
io:format("selected = ~p~n", [select()]),
io:format("all = ~p~n", [ets:tab2list(?MODULE)]). The output.
The core erlang generated by compiler
As you can see the magic syntax get expanded
|
||
[{#rctx{type = {worker,'_','_'}, _ = '_'}, [], ['$_']}]); | ||
select_by_type(all) -> | ||
lists:map(fun to_json/1, ets:tab2list(?MODULE)). | ||
|
||
|
||
|
@@ -294,11 +326,17 @@ to_json(#rctx{}=Rctx) -> | |
mfa = MFA0, | ||
nonce = Nonce0, | ||
from = From0, | ||
dbname = DbName, | ||
username = UserName, | ||
db_open = DbOpens, | ||
docs_read = DocsRead, | ||
rows_read = RowsRead, | ||
state = State0, | ||
type = Type, | ||
btree_folds = BtFolds, | ||
get_kp_node = KpNodes, | ||
get_kv_node = KvNodes, | ||
ioq_calls = IoqCalls, | ||
changes_processed = ChangesProcessed | ||
} = Rctx, | ||
%%io:format("TO_JSON_MFA: ~p~n", [MFA0]), | ||
|
@@ -338,27 +376,43 @@ to_json(#rctx{}=Rctx) -> | |
nonce => term_to_json(Nonce), | ||
%%from => From, | ||
from => term_to_json(From), | ||
dbname => DbName, | ||
username => UserName, | ||
db_open => DbOpens, | ||
docs_read => DocsRead, | ||
rows_read => RowsRead, | ||
state => State, | ||
type => term_to_json(Type), | ||
type => term_to_json({type, Type}), | ||
btree_folds => BtFolds, | ||
kp_nodes => KpNodes, | ||
kv_nodes => KvNodes, | ||
ioq_calls => IoqCalls, | ||
changes_processed => ChangesProcessed | ||
}. | ||
|
||
term_to_json({Pid, Ref}) when is_pid(Pid), is_reference(Ref) -> | ||
[?l2b(pid_to_list(Pid)), ?l2b(ref_to_list(Ref))]; | ||
term_to_json({type, {coordinator, _, _} = Type}) -> | ||
%%io:format("SETTING JSON TYPE: ~p~n", [Type]), | ||
?l2b(io_lib:format("~p", [Type])); | ||
term_to_json({A, B, C}) -> | ||
[A, B, C]; | ||
term_to_json(undefined) -> | ||
null; | ||
term_to_json(null) -> | ||
null; | ||
term_to_json(T) -> | ||
T. | ||
|
||
term_to_flat_json({type, {coordinator, _, _} = Type}) -> | ||
%%io:format("SETTING FLAT JSON TYPE: ~p~n", [Type]), | ||
?l2b(io_lib:format("~p", [Type])); | ||
term_to_flat_json(Tuple) when is_tuple(Tuple) -> | ||
?l2b(io_lib:format("~w", [Tuple])); | ||
term_to_flat_json(undefined) -> | ||
null; | ||
term_to_flat_json(null) -> | ||
null; | ||
term_to_flat_json(T) -> | ||
T. | ||
|
||
|
@@ -369,11 +423,17 @@ to_flat_json(#rctx{}=Rctx) -> | |
mfa = MFA0, | ||
nonce = Nonce0, | ||
from = From0, | ||
dbname = DbName, | ||
username = UserName, | ||
db_open = DbOpens, | ||
docs_read = DocsRead, | ||
rows_read = RowsRead, | ||
state = State0, | ||
type = Type, | ||
btree_folds = ChangesProcessed | ||
get_kp_node = KpNodes, | ||
get_kv_node = KvNodes, | ||
btree_folds = ChangesProcessed, | ||
ioq_calls = IoqCalls | ||
} = Rctx, | ||
io:format("TO_JSON_MFA: ~p~n", [MFA0]), | ||
MFA = case MFA0 of | ||
|
@@ -402,6 +462,7 @@ to_flat_json(#rctx{}=Rctx) -> | |
Nonce0 -> | ||
list_to_binary(Nonce0) | ||
end, | ||
io:format("NONCE IS: ~p||~p~n", [Nonce0, Nonce]), | ||
#{ | ||
%%updated_at => ?l2b(io_lib:format("~w", [TP])), | ||
updated_at => term_to_flat_json(TP), | ||
|
@@ -410,11 +471,17 @@ to_flat_json(#rctx{}=Rctx) -> | |
mfa => MFA, | ||
nonce => Nonce, | ||
from => From, | ||
dbname => DbName, | ||
username => UserName, | ||
db_open => DbOpens, | ||
docs_read => DocsRead, | ||
rows_read => RowsRead, | ||
state => State, | ||
type => term_to_flat_json(Type), | ||
btree_folds => ChangesProcessed | ||
type => term_to_flat_json({type, Type}), | ||
kp_nodes => KpNodes, | ||
kv_nodes => KvNodes, | ||
btree_folds => ChangesProcessed, | ||
ioq_calls => IoqCalls | ||
}. | ||
|
||
get_pid_ref() -> | ||
|
@@ -440,22 +507,71 @@ create_context(Pid) -> | |
|
||
%% add type to disnguish coordinator vs rpc_worker | ||
create_context(From, {M,F,_A} = MFA, Nonce) -> | ||
io:format("CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [From, MFA, Nonce]), | ||
Ref = make_ref(), | ||
io:format("[~p] CREAT_CONTEXT MFA[~p]: {~p}: ~p~n", [self(), From, MFA, Nonce]), | ||
PidRef = get_pid_ref(), %% this will instantiate a new PidRef | ||
%%Rctx = make_record(self(), Ref), | ||
%% TODO: extract user_ctx and db/shard from | ||
Rctx = #rctx{ | ||
pid_ref = {self(), Ref}, | ||
pid_ref = PidRef, | ||
from = From, | ||
mfa = MFA, | ||
type = {worker, M, F}, | ||
nonce = Nonce | ||
}, | ||
track(Rctx), | ||
erlang:put(?DELTA_TZ, Rctx), | ||
ets:insert(?MODULE, Rctx), | ||
true = ets:insert(?MODULE, Rctx), | ||
Rctx. | ||
|
||
create_coordinator_context(#httpd{path_parts=Parts} = Req) -> | ||
create_coordinator_context(Req, io_lib:format("~p", [Parts])). | ||
|
||
create_coordinator_context(#httpd{} = Req, Path) -> | ||
io:format("CREATING COORDINATOR CONTEXT ON {~p}~n", [Path]), | ||
#httpd{ | ||
method = Verb, | ||
%%path_parts = Parts, | ||
nonce = Nonce | ||
} = Req, | ||
PidRef = get_pid_ref(), %% this will instantiate a new PidRef | ||
%%Rctx = make_record(self(), Ref), | ||
%% TODO: extract user_ctx and db/shard from Req | ||
Rctx = #rctx{ | ||
pid_ref = PidRef, | ||
%%type = {cooridantor, Verb, Parts}, | ||
type = {coordinator, Verb, [$/ | Path]}, | ||
nonce = Nonce | ||
}, | ||
track(Rctx), | ||
erlang:put(?DELTA_TZ, Rctx), | ||
true = ets:insert(?MODULE, Rctx), | ||
Rctx. | ||
|
||
set_context_dbname(DbName) -> | ||
case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.dbname, DbName}]) of | ||
false -> | ||
Stk = try throw(42) catch _:_:Stk0 -> Stk0 end, | ||
io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [DbName, get_resource(), Stk, process_info(self(), current_stacktrace)]), | ||
timer:sleep(1000), | ||
erlang:halt(kaboomz); | ||
true -> | ||
true | ||
end. | ||
|
||
set_context_username(null) -> | ||
ok; | ||
set_context_username(UserName) -> | ||
io:format("CSRT SETTING USERNAME CONTEXT: ~p~n", [UserName]), | ||
case ets:update_element(?MODULE, get_pid_ref(), [{#rctx.username, UserName}]) of | ||
false -> | ||
Stk = try throw(42) catch _:_:Stk0 -> Stk0 end, | ||
io:format("UPDATING DBNAME[~p] FAILURE WITH CONTEXT: ~p AND STACK:~n~pFOO:: ~p~n~n", [UserName, get_resource(), Stk, process_info(self(), current_stacktrace)]), | ||
timer:sleep(1000), | ||
erlang:halt(kaboomz); | ||
true -> | ||
true | ||
end. | ||
|
||
track(#rctx{}=Rctx) -> | ||
%% TODO: should this block or not? If no, what cleans up zombies? | ||
%% gen_server:call(?MODULE, {track, PR}). | ||
|
@@ -522,6 +638,10 @@ make_delta(#rctx{}=TA, #rctx{}=TB) -> | |
docs_read => TB#rctx.docs_read - TA#rctx.docs_read, | ||
rows_read => TB#rctx.rows_read - TA#rctx.rows_read, | ||
btree_folds => TB#rctx.btree_folds - TA#rctx.btree_folds, | ||
get_kp_node => TB#rctx.get_kp_node - TA#rctx.get_kp_node, | ||
get_kv_node => TB#rctx.get_kv_node - TA#rctx.get_kv_node, | ||
db_open => TB#rctx.db_open - TA#rctx.db_open, | ||
ioq_calls => TB#rctx.ioq_calls - TA#rctx.ioq_calls, | ||
dt => timer:now_diff(TB#rctx.updated_at, TA#rctx.updated_at) | ||
}, | ||
%% TODO: reevaluate this decision | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is fine IMO