Skip to content

fix custom driver registration after reboot #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -570,15 +570,44 @@ function method.start()
end

for _, tube_tuple in _queue:pairs() do
local tube = recreate_tube(tube_tuple)
-- gh-66: release all taken tasks on start
tube_release_all_tasks(tube)
-- Recreate tubes for registered drivers only.
-- Check if a driver exists for this type of tube.
if queue.driver[tube_tuple[4]] ~= nil then
local tube = recreate_tube(tube_tuple)
-- gh-66: release all taken tasks on start
tube_release_all_tasks(tube)
end
end

session.on_disconnect(queue._on_consumer_disconnect)
return queue
end

--- Register the custom driver.
-- Unlike the "register_driver" method from init.lua, this method
-- recreates the existing tubes of the registered driver.
function method.register_driver(driver_name, tube_ctr)
if type(tube_ctr.create_space) ~= 'function' or
type(tube_ctr.new) ~= 'function' then
error('tube control methods must contain functions "create_space"'
.. ' and "new"')
end
if queue.driver[driver_name] then
error(('overriding registered driver "%s"'):format(driver_name))
end
queue.driver[driver_name] = tube_ctr

-- Recreates the existing tubes of the registered driver.
local _queue = box.space._queue
for _, tube_tuple in _queue:pairs() do
if tube_tuple[4] == driver_name then
local tube = recreate_tube(tube_tuple)
-- Release all task for tube on start.
tube_release_all_tasks(tube)
end
end
end

local function build_stats(space)
local stats = {tasks = {}, calls = {
ack = 0, bury = 0, delete = 0,
Expand Down
3 changes: 2 additions & 1 deletion queue/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ function wrapper_impl(...)
rawset(queue, name, val)
end
abstract.driver = queue.driver
-- Now the "register_driver" method from abstract will be used.
queue.register_driver = nil
setmetatable(queue, getmetatable(abstract))
queue.start()
else
Expand All @@ -95,7 +97,6 @@ local function queue_init()
if rawget(box, 'space') ~= nil and box.info.ro == false then
-- The box was configured with read_only = false
queue = require('queue.abstract')
queue.register_driver = register_driver
queue.driver = core_drivers
queue.start()
else
Expand Down
42 changes: 42 additions & 0 deletions t/170-register-driver-after-reload.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env tarantool

local os = require('os')
local queue = require('queue')
local tap = require('tap')
local tnt = require('t.tnt')

local test = tap.test('custom driver registration after reload')
test:plan(1)

tnt.cfg()

--- Accept gh-137, we need to check custom driver registration
-- after restart. Instead of tarantool reboot, we will additionally
-- call queue.start() to simulate the reload of the module. This
-- is not a clean enough, because queue module doesn't provide the
-- hot restart.
--
-- All tricks in this test are done by professionals, don't try
-- to repeat it yourself!!!
local function check_driver_registration_after_reload()
local fifo = require('queue.abstract.driver.fifo')
queue.register_driver('fifo_cust', fifo)

local tube = queue.create_tube('tube_cust', 'fifo_cust')
tube:put('1')
local task_id = tube:take()[1]

-- Simulate the module reload.
queue.driver.fifo_cust = nil
queue.start()

-- Check the task has been released after reload.
queue.register_driver('fifo_cust', fifo)
local task_status = queue.tube.tube_cust:peek(task_id)[2]
test:is(task_status, 'r', 'check driver registration after reload')
end

check_driver_registration_after_reload()

tnt.finish()
os.exit(test:check() and 0 or 1)