Skip to content

replicaset_mode: allow api call in init state #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

- In replicaset mode, the behavior of the public API is reduced to the same behavior
in all queue states, including INIT. Previously, in the INIT state, an ambiguous
error was thrown when trying to access a public method on a replica and the script
was interrupted by an error.

Old behavior (call `create_tube` on replica, queue is in INIT state):
```
2023-09-04 14:01:11.000 [5990] main/103/replica.lua/box.load_cfg I> set 'read_only' configuration option to true
stack traceback:
/home/void/tmp/cluster/repl/queue/init.lua:44: in function '__index'
replica.lua:13: in main chunk
2023-09-04 14:01:11.004 [5990] main/105/checkpoint_daemon I> scheduled next checkpoint for Mon Sep 4 15:11:32 2023
2023-09-04 14:01:11.004 [5990] main utils.c:610 E> LuajitError: /home/void/tmp/cluster/repl/queue/init.lua:45: Please configure box.cfg{} in read/write mode first
```
After this fix:
```
2023-09-11 10:24:31.463 [19773] main/103/replica.lua abstract.lua:93 E> create_tube: queue is in INIT state
```

## [1.3.2] - 2023-08-24

### Fixed
Expand Down
32 changes: 16 additions & 16 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,17 @@ end
local tube = {}

-- This check must be called from all public tube methods.
local function check_state()
local function check_state(method)
if queue_state.get() ~= queue_state.states.RUNNING then
log.error(('Queue is in %s state'):format(queue_state.show()))
log.error(('%s: queue is in %s state'):format(method, queue_state.show()))
return false
end

return true
end

function tube.put(self, data, opts)
if not check_state() then
if not check_state("put") then
return nil
end
opts = opts or {}
Expand All @@ -110,7 +110,7 @@ local conds = {}
local releasing_connections = {}

function tube.take(self, timeout)
if not check_state() then
if not check_state("take") then
return nil
end
timeout = util.time(timeout or util.TIMEOUT_INFINITY)
Expand Down Expand Up @@ -151,7 +151,7 @@ function tube.take(self, timeout)
end

function tube.touch(self, id, delta)
if not check_state() then
if not check_state("touch") then
return
end
if delta == nil then
Expand All @@ -177,7 +177,7 @@ function tube.touch(self, id, delta)
end

function tube.ack(self, id)
if not check_state() then
if not check_state("ack") then
return nil
end
check_task_is_taken(self.tube_id, id)
Expand Down Expand Up @@ -206,15 +206,15 @@ local function tube_release_internal(self, id, opts, session_uuid)
end

function tube.release(self, id, opts)
if not check_state() then
if not check_state("release") then
return nil
end
return tube_release_internal(self, id, opts)
end

-- Release all tasks.
function tube.release_all(self)
if not check_state() then
if not check_state("tube") then
return
end
local prefix = ('queue: [tube "%s"] '):format(self.name)
Expand All @@ -229,7 +229,7 @@ function tube.release_all(self)
end

function tube.peek(self, id)
if not check_state() then
if not check_state("peek") then
return nil
end
local task = self.raw:peek(id)
Expand All @@ -240,7 +240,7 @@ function tube.peek(self, id)
end

function tube.bury(self, id)
if not check_state() then
if not check_state("bury") then
return nil
end
local task = self:peek(id)
Expand All @@ -255,15 +255,15 @@ function tube.bury(self, id)
end

function tube.kick(self, count)
if not check_state() then
if not check_state("kick") then
return nil
end
count = count or 1
return self.raw:kick(count)
end

function tube.delete(self, id)
if not check_state() then
if not check_state("delete") then
return nil
end
self:peek(id)
Expand All @@ -272,7 +272,7 @@ end

-- drop tube
function tube.drop(self)
if not check_state() then
if not check_state("drop") then
return nil
end
local tube_name = self.name
Expand Down Expand Up @@ -309,7 +309,7 @@ end
-- truncate tube
-- (delete everything from tube)
function tube.truncate(self)
if not check_state() then
if not check_state("truncate") then
return
end
self.raw:truncate()
Expand All @@ -322,7 +322,7 @@ function tube.on_task_change(self, cb)
end

function tube.grant(self, user, args)
if not check_state() then
if not check_state("grant") then
return
end
local function tube_grant_space(user, name, tp)
Expand Down Expand Up @@ -583,7 +583,7 @@ end
-------------------------------------------------------------------------------
-- create tube
function method.create_tube(tube_name, tube_type, opts)
if not check_state() then
if not check_state("create_tube") then
return
end
opts = opts or {}
Expand Down
16 changes: 13 additions & 3 deletions queue/init.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local abstract = require('queue.abstract')
local queue_state = require('queue.abstract.queue_state')
local queue = nil

Expand Down Expand Up @@ -40,9 +41,18 @@ queue = setmetatable({
state = queue_state.show,
cfg = deferred_cfg,
_VERSION = require('queue.version'),
}, { __index = function()
print(debug.traceback())
error('Please configure box.cfg{} in read/write mode first')
}, { __index = function(self, key)
-- In replicaset mode, the user can attempt to call public methods on the replica start.
-- For example, a single script is used for master and replica.
-- Each public method has a check on the state of the queue, so this forwarding is safe.
if deferred_opts['in_replicaset'] == true then
if abstract[key] ~= nil then
return abstract[key]
end
else
print(debug.traceback())
error('Please configure box.cfg{} in read/write mode first')
end
end
})

Expand Down
4 changes: 3 additions & 1 deletion t/200-master-replica.t
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ local conn = {}
test:plan(8)

test:test('Check master-replica setup', function(test)
test:plan(8)
test:plan(9)
local engine = os.getenv('ENGINE') or 'memtx'
tnt.cluster.cfg{}

Expand All @@ -41,6 +41,8 @@ test:test('Check master-replica setup', function(test)

-- Setup tube. Set ttr = 0.5 for sessions expire testing.
conn:call('queue.cfg', {{ttr = 0.5, in_replicaset = true}})
test:isnil(conn:call('queue.create_tube', {'test', 'fifo'}),
'check api call in INIT state')
queue.cfg{ttr = 0.5, in_replicaset = true}
local tube = queue.create_tube('test', 'fifo', {engine = engine})
test:ok(tube, 'test tube created')
Expand Down