Skip to content

Commit bc35135

Browse files
committed
Merge pull request #470 from basho/features/lrb/multiput-ts-data
READY: Multi-put for Riak TS data
2 parents 278469b + 62dc3d7 commit bc35135

File tree

1 file changed

+27
-16
lines changed

1 file changed

+27
-16
lines changed

riak/client/multi.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
from threading import Thread, Lock, Event
44
from multiprocessing import cpu_count
55
from six import PY2
6+
7+
from riak.riak_object import RiakObject
8+
from riak.ts_object import TsObject
9+
610
if PY2:
711
from Queue import Queue
812
else:
@@ -20,12 +24,16 @@
2024
POOL_SIZE = 6
2125

2226
#: A :class:`namedtuple` for tasks that are fed to workers in the
23-
#: multi pool.
24-
Task = namedtuple(
25-
'Task',
26-
['client', 'outq',
27-
'bucket_type', 'bucket', 'key',
28-
'object', 'options'])
27+
#: multi get pool.
28+
Task = namedtuple('Task',
29+
['client', 'outq', 'bucket_type', 'bucket', 'key',
30+
'object', 'options'])
31+
32+
33+
#: A :class:`namedtuple` for tasks that are fed to workers in the
34+
#: multi put pool.
35+
PutTask = namedtuple('PutTask',
36+
['client', 'outq', 'object', 'options'])
2937

3038

3139
class MultiPool(object):
@@ -55,7 +63,7 @@ def enq(self, task):
5563
stopping.
5664
5765
:param task: the Task object
58-
:type task: Task
66+
:type task: Task or PutTask
5967
"""
6068
if not self._stop.is_set():
6169
self._inq.put(task)
@@ -164,8 +172,13 @@ def _worker_method(self):
164172
while not self._should_quit():
165173
task = self._inq.get()
166174
try:
167-
robj = task.object
168-
rv = task.client.put(robj, **task.options)
175+
obj = task.object
176+
if isinstance(obj, RiakObject):
177+
rv = task.client.put(obj, **task.options)
178+
elif isinstance(obj, TsObject):
179+
rv = task.client.ts_put(obj, **task.options)
180+
else:
181+
raise ValueError('unknown obj type: %s'.format(type(obj)))
169182
task.outq.put(rv)
170183
except KeyboardInterrupt:
171184
raise
@@ -236,8 +249,9 @@ def multiput(client, objs, **options):
236249
237250
:param client: the client to use
238251
:type client: :class:`RiakClient <riak.client.RiakClient>`
239-
:param objs: the Riak Objects to store in parallel
240-
:type keys: list of `RiakObject <riak.riak_object.RiakObject>`
252+
:param objs: the objects to store in parallel
253+
:type objs: list of `RiakObject <riak.riak_object.RiakObject>` or
254+
`TsObject <riak.ts_object.TsObject>`
241255
:param options: request options to
242256
:meth:`RiakClient.put <riak.client.RiakClient.put>`
243257
:type options: dict
@@ -252,11 +266,8 @@ def multiput(client, objs, **options):
252266
pool = RIAK_MULTIPUT_POOL
253267

254268
pool.start()
255-
for robj in objs:
256-
bucket_type = robj.bucket.bucket_type
257-
bucket = robj.bucket.name
258-
key = robj.key
259-
task = Task(client, outq, bucket_type, bucket, key, robj, options)
269+
for obj in objs:
270+
task = PutTask(client, outq, obj, options)
260271
pool.enq(task)
261272

262273
results = []

0 commit comments

Comments
 (0)