|
190 | 190 | -spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
|
191 | 191 | -spec on_node_up(node()) -> 'ok'.
|
192 | 192 | -spec on_node_down(node()) -> 'ok'.
|
| 193 | +-spec queues_to_delete_from_node_down(node()) -> |
| 194 | + rabbit_misc:execute_mnesia_transaction(fun()). |
193 | 195 | -spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue().
|
194 | 196 | -spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue().
|
195 | 197 | -spec store_queue(rabbit_types:amqqueue()) -> 'ok'.
|
@@ -1112,36 +1114,68 @@ maybe_clear_recoverable_node(Node,
|
1112 | 1114 | end.
|
1113 | 1115 |
|
1114 | 1116 | on_node_down(Node) ->
|
1115 |
| - rabbit_misc:execute_mnesia_tx_with_tail( |
1116 |
| - fun () -> QsDels = |
1117 |
| - qlc:e(qlc:q([{QName, delete_queue(QName)} || |
1118 |
| - #amqqueue{name = QName, pid = Pid} = |
1119 |
| - Q <- mnesia:table(rabbit_queue), |
1120 |
| - node(Pid) == Node andalso |
1121 |
| - not rabbit_mnesia:is_process_alive(Pid) andalso |
1122 |
| - (not rabbit_amqqueue:is_mirrored(Q) orelse |
1123 |
| - rabbit_amqqueue:is_dead_exclusive(Q))])), |
1124 |
| - {Qs, Dels} = lists:unzip(QsDels), |
1125 |
| - T = rabbit_binding:process_deletions( |
1126 |
| - lists:foldl(fun rabbit_binding:combine_deletions/2, |
1127 |
| - rabbit_binding:new_deletions(), Dels), |
1128 |
| - ?INTERNAL_USER), |
1129 |
| - fun () -> |
1130 |
| - T(), |
1131 |
| - lists:foreach( |
1132 |
| - fun(QName) -> |
1133 |
| - rabbit_core_metrics:queue_deleted(QName), |
1134 |
| - ok = rabbit_event:notify(queue_deleted, |
1135 |
| - [{name, QName}, |
1136 |
| - {user, ?INTERNAL_USER}]) |
1137 |
| - end, Qs) |
1138 |
| - end |
1139 |
| - end). |
| 1117 | + % Create 1 transaction per N queues that need to be deleted |
| 1118 | + % * 1 transaction for all queues might block everything for a really long time |
| 1119 | + % * ^^^ this is what used to happen before this change |
| 1120 | + % * 1 transaction per queue will result in too many transaction |
| 1121 | + % * ^^^ this is what happens now; it's not perfect, but it's a step in the right direction |
| 1122 | + % * ^^^ OPTIMISE THIS BEFORE MERGING ^^^ |
| 1123 | + % * Maybe 1 transaction for every 10 queues that need to be deleted ? |
| 1124 | + % |
| 1125 | + % For each transaction: |
| 1126 | + % * delete all queues in the transaction |
| 1127 | + % * capture the result for every delete queue |
| 1128 | + [ |
| 1129 | + rabbit_misc:execute_mnesia_tx_with_tail( |
| 1130 | + fun () -> Dels = [delete_queue(Q)], |
| 1131 | + T = rabbit_binding:process_deletions( |
| 1132 | + lists:foldl(fun rabbit_binding:combine_deletions/2, |
| 1133 | + rabbit_binding:new_deletions(), Dels)), |
| 1134 | + fun () -> |
| 1135 | + T(), |
| 1136 | + lists:foreach( |
| 1137 | + fun(QName) -> |
| 1138 | + % When 40k queues are being deleted, |
| 1139 | + % this results in a rabbit_node_monitor function that recurses for 30 minutes, |
| 1140 | + % meaning that no information is available for the node (Management Overview doesn't update): |
| 1141 | + % |
| 1142 | + % {current_stacktrace, |
| 1143 | + % [{rabbit_core_metrics,queue_deleted,1, |
| 1144 | + % [{file,"src/rabbit_core_metrics.erl"},{line,235}]}, |
| 1145 | + % {rabbit_amqqueue,'-on_node_down/1-fun-1-',1, |
| 1146 | + % [{file,"src/rabbit_amqqueue.erl"},{line,1094}]}, |
| 1147 | + % {lists,foreach,2,[{file,"lists.erl"},{line,1338}]}, |
| 1148 | + % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1, |
| 1149 | + % [{file,"src/rabbit_amqqueue.erl"},{line,1084}]}, |
| 1150 | + % {rabbit_amqqueue,'-on_node_down/1-lc$^0/1-0-',1, |
| 1151 | + % [{file,"src/rabbit_amqqueue.erl"},{line,1100}]}, |
| 1152 | + % {rabbit_node_monitor,handle_dead_rabbit,2, |
| 1153 | + % [{file,"src/rabbit_node_monitor.erl"},{line,755}]}, |
| 1154 | + % {rabbit_node_monitor,handle_info,2, |
| 1155 | + % [{file,"src/rabbit_node_monitor.erl"},{line,548}]}]} |
| 1156 | + rabbit_core_metrics:queue_deleted(QName), |
| 1157 | + ok = rabbit_event:notify(queue_deleted, |
| 1158 | + [{name, QName}]) |
| 1159 | + end, [Q]) |
| 1160 | + end |
| 1161 | + end) || Q <- queues_to_delete_from_node_down(Node) |
| 1162 | + ]. |
1140 | 1163 |
|
1141 | 1164 | delete_queue(QueueName) ->
|
1142 | 1165 | ok = mnesia:delete({rabbit_queue, QueueName}),
|
1143 | 1166 | rabbit_binding:remove_transient_for_destination(QueueName).
|
1144 | 1167 |
|
| 1168 | +queues_to_delete_from_node_down(NodeDown) -> |
| 1169 | + rabbit_misc:execute_mnesia_transaction(fun () -> |
| 1170 | + qlc:e(qlc:q([QName || |
| 1171 | + #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue), |
| 1172 | + node(Pid) == NodeDown andalso |
| 1173 | + not rabbit_mnesia:is_process_alive(Pid) andalso |
| 1174 | + (not rabbit_amqqueue:is_mirrored(Q) orelse |
| 1175 | + rabbit_amqqueue:is_dead_exclusive(Q))] |
| 1176 | + )) |
| 1177 | + end). |
| 1178 | + |
1145 | 1179 | pseudo_queue(QueueName, Pid) ->
|
1146 | 1180 | #amqqueue{name = QueueName,
|
1147 | 1181 | durable = false,
|
|
0 commit comments