Skip to content

Commit fe3ddee

Browse files
committed
driver: fix duplicate id error with mvvc on
Taking the maximum of the index is an implicit transactions, so it is always done with 'read-confirmed' mvcc isolation level. It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. It is hapenning because 'max' for several puts in parallel will be the same since read confirmed isolation level makes visible all transactions that finished the commit. To fix it we wrap it with box.begin/commit and set right isolation level. Closes #207
1 parent 16e3b74 commit fe3ddee

File tree

5 files changed

+135
-4
lines changed

5 files changed

+135
-4
lines changed

queue/abstract/driver/fifo.lua

+18-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,24 @@ end
6868

6969
-- put task in space
7070
function method.put(self, data, opts)
71-
local max = self.space.index.task_id:max()
71+
local max
72+
73+
-- Taking the maximum of the index is an implicit transactions, so it is
74+
-- always done with 'read-confirmed' mvcc isolation level.
75+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
76+
-- It is hapenning because 'max' for several puts in parallel will be the same since
77+
-- read confirmed isolation level makes visible all transactions that finished the commit.
78+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
79+
-- See https://github.com/tarantool/queue/issues/207
80+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
81+
if box.cfg.memtx_use_mvcc_engine then
82+
box.begin({txn_isolation = 'read-committed'})
83+
max = self.space.index.task_id:max()
84+
box.commit()
85+
else
86+
max = self.space.index.task_id:max()
87+
end
88+
7289
local id = max and max[1] + 1 or 0
7390
local task = self.space:insert{id, state.READY, data}
7491
self.on_task_change(task, 'put')

queue/abstract/driver/fifottl.lua

+18-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,24 @@ end
209209

210210
-- put task in space
211211
function method.put(self, data, opts)
212-
local max = self.space.index.task_id:max()
212+
local max
213+
214+
-- Taking the maximum of the index is an implicit transactions, so it is
215+
-- always done with 'read-confirmed' mvcc isolation level.
216+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
217+
-- It is hapenning because 'max' for several puts in parallel will be the same since
218+
-- read confirmed isolation level makes visible all transactions that finished the commit.
219+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
220+
-- See https://github.com/tarantool/queue/issues/207
221+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
222+
if box.cfg.memtx_use_mvcc_engine then
223+
box.begin({txn_isolation = 'read-committed'})
224+
max = self.space.index.task_id:max()
225+
box.commit()
226+
else
227+
max = self.space.index.task_id:max()
228+
end
229+
213230
local id = max and max[i_id] + 1 or 0
214231

215232
local status

queue/abstract/driver/utube.lua

+18-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,24 @@ end
7575

7676
-- put task in space
7777
function method.put(self, data, opts)
78-
local max = self.space.index.task_id:max()
78+
local max
79+
80+
-- Taking the maximum of the index is an implicit transactions, so it is
81+
-- always done with 'read-confirmed' mvcc isolation level.
82+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
83+
-- It is hapenning because 'max' for several puts in parallel will be the same since
84+
-- read confirmed isolation level makes visible all transactions that finished the commit.
85+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
86+
-- See https://github.com/tarantool/queue/issues/207
87+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
88+
if box.cfg.memtx_use_mvcc_engine then
89+
box.begin({txn_isolation = 'read-committed'})
90+
max = self.space.index.task_id:max()
91+
box.commit()
92+
else
93+
max = self.space.index.task_id:max()
94+
end
95+
7996
local id = max and max[1] + 1 or 0
8097
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
8198
self.on_task_change(task, 'put')

queue/abstract/driver/utubettl.lua

+18-1
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,24 @@ end
217217

218218
-- put task in space
219219
function method.put(self, data, opts)
220-
local max = self.space.index.task_id:max()
220+
local max
221+
222+
-- Taking the maximum of the index is an implicit transactions, so it is
223+
-- always done with 'read-confirmed' mvcc isolation level.
224+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
225+
-- It is hapenning because 'max' for several puts in parallel will be the same since
226+
-- read confirmed isolation level makes visible all transactions that finished the commit.
227+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
228+
-- See https://github.com/tarantool/queue/issues/207
229+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
230+
if box.cfg.memtx_use_mvcc_engine then
231+
box.begin({txn_isolation = 'read-committed'})
232+
max = self.space.index.task_id:max()
233+
box.commit()
234+
else
235+
max = self.space.index.task_id:max()
236+
end
237+
221238
local id = max and max[i_id] + 1 or 0
222239

223240
local status

t/220-mvcc.t

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/usr/bin/env tarantool
2+
local qc = require('queue.compat')
3+
local log = require('log')
4+
if not qc.check_version({2, 6, 1}) then
5+
log.info('Tests skipped, tarantool version < 2.6.1')
6+
return
7+
end
8+
local yaml = require('yaml')
9+
local fiber = require('fiber')
10+
11+
local test = require('tap').test()
12+
test:plan(6)
13+
14+
local queue = require('queue')
15+
local state = require('queue.abstract.state')
16+
17+
local tnt = require('t.tnt')
18+
tnt.cfg{memtx_use_mvcc_engine = true}
19+
20+
local engine = 'memtx'
21+
22+
test:ok(rawget(box, 'space'), 'box started')
23+
test:ok(queue, 'queue is loaded')
24+
25+
local tube = queue.create_tube('test', 'fifo', { engine = engine })
26+
test:ok(tube, 'test tube created')
27+
test:is(tube.name, 'test', 'tube.name')
28+
test:is(tube.type, 'fifo', 'tube.type')
29+
30+
test:test('concurent take', function(test)
31+
test:plan(151)
32+
33+
local channel = fiber.channel(1000)
34+
test:ok(channel, 'channel created')
35+
36+
local res = {}
37+
for i = 1, 50 do
38+
fiber.create(function(i)
39+
local taken = tube:take(1)
40+
test:ok(taken, 'Task was taken ' .. i)
41+
table.insert(res, { taken })
42+
channel:put(true)
43+
end, i)
44+
end
45+
46+
fiber.sleep(.1)
47+
test:ok(tube:put(1), 'task 1 was put')
48+
49+
for i = 2, 50 do
50+
fiber.create(function(i)
51+
test:ok(tube:put(i), 'task ' .. i .. ' was put')
52+
end, i)
53+
end
54+
fiber.sleep(.1)
55+
for i = 1, 50 do
56+
test:ok(channel:get(1 / i), 'take was done ' .. i)
57+
end
58+
end)
59+
60+
61+
tnt.finish()
62+
os.exit(test:check() and 0 or 1)
63+
-- vim: set ft=lua:

0 commit comments

Comments
 (0)