Skip to content

Commit 5b358d2

Browse files
committed
issue #49, #55, #58: ensure mnesia cache persistence over node failures
The deduplication cache was created on the node where the resource (exchange or queue) was initialized. If the node crashed and was replaced mnesia could not recover or recreate the table used as a cache. After several attempts, it seems the best way to prevent this from happening is to replicate the table over more nodes to ensure mnesia can recover it in case of a node disappearing. To ensure safe enough persistence without replicating too aggressively, the cache is created on 2/3 of the total cluster nodes. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 5b5eb01 commit 5b358d2

File tree

1 file changed

+13
-18
lines changed

1 file changed

+13
-18
lines changed

lib/cache.ex

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -140,35 +140,23 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
140140

141141
## Utility functions
142142

143-
# Run Mnesia creation functions handling output
144-
defmacro mnesia_create(function) do
145-
quote do
146-
case unquote(function) do
147-
{:atomic, :ok} -> :ok
148-
{:aborted, {:already_exists, _}} -> :ok
149-
{:aborted, {:already_exists, _, _}} -> :ok
150-
error -> error
151-
end
152-
end
153-
end
154-
155143
# Mnesia cache table creation.
156144
defp cache_create(cache, options) do
157145
persistence = case Keyword.get(options, :persistence) do
158146
:disk -> :disc_copies
159147
:memory -> :ram_copies
160148
end
161149
options = [{:attributes, [:entry, :expiration]},
162-
{persistence, [node()]},
150+
{persistence, cache_replicas()},
163151
{:index, [:expiration]},
164152
{:user_properties, [{:limit, Keyword.get(options, :size)},
165153
{:default_ttl, Keyword.get(options, :ttl)}]}]
166154

167-
with :ok <- mnesia_create(Mnesia.create_table(cache, options)),
168-
:ok <- mnesia_create(Mnesia.add_table_copy(cache, node(), persistence))
169-
do
170-
Mnesia.wait_for_tables([cache], @table_wait_time)
171-
else
155+
case Mnesia.create_table(cache, options) do
156+
{:atomic, :ok} ->
157+
Mnesia.wait_for_tables([cache], @table_wait_time)
158+
{:aborted, {:already_exists, _}} ->
159+
Mnesia.wait_for_tables([cache], @table_wait_time)
172160
error -> error
173161
end
174162
end
@@ -219,4 +207,11 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
219207

220208
entry
221209
end
210+
211+
# List the nodes on which to create the cache replicas.
212+
# Cache is replicated on two-third of the cluster nodes.
213+
defp cache_replicas() do
214+
nodes = [Node.self() | Node.list()]
215+
nodes |> Enum.split(round((length(nodes) * 2) / 3)) |> elem(0)
216+
end
222217
end

0 commit comments

Comments
 (0)