Skip to content

Commit 25979e6

Browse files
authored
Merge pull request #488 from basho/fixes/lrb/stop-pools-gh-449
Stop all pools when shutting down.
2 parents 3d78d35 + 3aec1c3 commit 25979e6

File tree

4 files changed

+38
-7
lines changed

4 files changed

+38
-7
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ Contributors
164164
* Daniel Reverri
165165
* [Dan Root](https://github.com/daroot)
166166
* [David Basden](https://github.com/dbasden)
167+
* [David Delassus](https://github.com/linkdd)
167168
* David Koblas
168169
* Dmitry Rozhkov
169170
* Eric Florenzano

RELNOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Riak Python Client Release Notes
22

3+
## [2.5.5 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.5)
4+
5+
* [Stop all pools when client shuts down](https://github.com/basho/riak-python-client/pull/488)
6+
37
## [2.5.4 Release](https://github.com/basho/riak-python-client/issues?q=milestone%3Ariak-python-client-2.5.4)
48

59
* [When converting `datetime` objects to send to Riak TS, `tzinfo` will be used if present](https://github.com/basho/riak-python-client/pull/486)

riak/client/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,12 @@ def __init__(self, protocol='pbc', transport_options={},
130130
self._bucket_types = WeakValueDictionary()
131131
self._tables = WeakValueDictionary()
132132

133+
def __del__(self):
134+
if self._multiget_pool:
135+
self._multiget_pool.stop()
136+
if self._multiput_pool:
137+
self._multiput_pool.stop()
138+
133139
def _get_protocol(self):
134140
return self._protocol
135141

riak/client/multi.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
from riak.riak_object import RiakObject
88
from riak.ts_object import TsObject
99

10+
import atexit
11+
1012
if PY2:
11-
from Queue import Queue
13+
from Queue import Queue, Empty
1214
else:
13-
from queue import Queue
15+
from queue import Queue, Empty
1416

1517
__all__ = ['multiget', 'multiput', 'MultiGetPool', 'MultiPutPool']
1618

@@ -102,9 +104,10 @@ def stop(self):
102104
"""
103105
Signals the worker threads to exit and waits on them.
104106
"""
105-
self._stop.set()
106-
for worker in self._workers:
107-
worker.join()
107+
if not self.stopped():
108+
self._stop.set()
109+
for worker in self._workers:
110+
worker.join()
108111

109112
def stopped(self):
110113
"""
@@ -144,7 +147,11 @@ def _worker_method(self):
144147
output queue.
145148
"""
146149
while not self._should_quit():
147-
task = self._inq.get()
150+
try:
151+
task = self._inq.get(block=True, timeout=0.25)
152+
except Empty:
153+
continue
154+
148155
try:
149156
btype = task.client.bucket_type(task.bucket_type)
150157
obj = btype.bucket(task.bucket).get(task.key, **task.options)
@@ -170,7 +177,11 @@ def _worker_method(self):
170177
the output queue.
171178
"""
172179
while not self._should_quit():
173-
task = self._inq.get()
180+
try:
181+
task = self._inq.get(block=True, timeout=0.25)
182+
except Empty:
183+
continue
184+
174185
try:
175186
obj = task.object
176187
if isinstance(obj, RiakObject):
@@ -193,6 +204,15 @@ def _worker_method(self):
193204
RIAK_MULTIPUT_POOL = MultiPutPool()
194205

195206

207+
def stop_pools():
208+
"""Stop worker pools at exit."""
209+
RIAK_MULTIGET_POOL.stop()
210+
RIAK_MULTIPUT_POOL.stop()
211+
212+
213+
atexit.register(stop_pools)
214+
215+
196216
def multiget(client, keys, **options):
197217
"""Executes a parallel-fetch across multiple threads. Returns a list
198218
containing :class:`~riak.riak_object.RiakObject` or

0 commit comments

Comments
 (0)