Skip to content

Commit 6aaa1f7

Browse files
committed
PYTHON-1269 Kill cursors synchronously in Cursor.close and MongoClient.close.
1 parent 3359f85 commit 6aaa1f7

File tree

4 files changed

+151
-71
lines changed

4 files changed

+151
-71
lines changed

pymongo/cursor.py

+14-11
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ def __init__(self, collection, filter=None, projection=None, skip=0,
115115
.. mongodoc:: cursors
116116
"""
117117
self.__id = None
118+
self.__exhaust = False
119+
self.__exhaust_mgr = None
118120

119121
spec = filter
120122
if spec is None:
@@ -163,8 +165,6 @@ def __init__(self, collection, filter=None, projection=None, skip=0,
163165
self.__collation = validate_collation_or_none(collation)
164166

165167
# Exhaust cursor support
166-
self.__exhaust = False
167-
self.__exhaust_mgr = None
168168
if cursor_type == CursorType.EXHAUST:
169169
if self.__collection.database.client.is_mongos:
170170
raise InvalidOperation('Exhaust cursors are '
@@ -214,8 +214,7 @@ def retrieved(self):
214214
return self.__retrieved
215215

216216
def __del__(self):
217-
if self.__id and not self.__killed:
218-
self.__die()
217+
self.__die()
219218

220219
def rewind(self):
221220
"""Rewind this cursor to its unevaluated state.
@@ -264,7 +263,7 @@ def _clone_base(self):
264263
"""
265264
return Cursor(self.__collection)
266265

267-
def __die(self):
266+
def __die(self, synchronous=False):
268267
"""Closes this cursor.
269268
"""
270269
if self.__id and not self.__killed:
@@ -274,10 +273,14 @@ def __die(self):
274273
# to stop the server from sending more data.
275274
self.__exhaust_mgr.sock.close()
276275
else:
277-
self.__collection.database.client.close_cursor(
278-
self.__id,
279-
_CursorAddress(
280-
self.__address, self.__collection.full_name))
276+
address = _CursorAddress(
277+
self.__address, self.__collection.full_name)
278+
if synchronous:
279+
self.__collection.database.client._close_cursor_now(
280+
self.__id, address)
281+
else:
282+
self.__collection.database.client.close_cursor(
283+
self.__id, address)
281284
if self.__exhaust and self.__exhaust_mgr:
282285
self.__exhaust_mgr.close()
283286
self.__killed = True
@@ -287,7 +290,7 @@ def close(self):
287290
other Python implementations that don't use reference counting
288291
garbage collection.
289292
"""
290-
self.__die()
293+
self.__die(True)
291294

292295
def __query_spec(self):
293296
"""Get the spec to use for a query.
@@ -1126,7 +1129,7 @@ def __enter__(self):
11261129
return self
11271130

11281131
def __exit__(self, exc_type, exc_val, exc_tb):
1129-
self.__die()
1132+
self.close()
11301133

11311134
def __copy__(self):
11321135
"""Support function for `copy.copy()`.

pymongo/mongo_client.py

+75-59
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,9 @@ def close(self):
778778
If this instance is used again it will be automatically re-opened and
779779
the threads restarted.
780780
"""
781+
# Run _process_periodic_tasks to send pending killCursor requests
782+
# before closing the topology.
783+
self._process_periodic_tasks()
781784
self._topology.close()
782785

783786
def set_cursor_manager(self, manager_class):
@@ -1024,6 +1027,21 @@ def close_cursor(self, cursor_id, address=None):
10241027
else:
10251028
self.__kill_cursors_queue.append((address, [cursor_id]))
10261029

1030+
def _close_cursor_now(self, cursor_id, address=None):
1031+
"""Send a kill cursors message with the given id.
1032+
1033+
What closing the cursor actually means depends on this client's
1034+
cursor manager. If there is none, the cursor is closed synchronously
1035+
on the current thread.
1036+
"""
1037+
if not isinstance(cursor_id, integer_types):
1038+
raise TypeError("cursor_id must be an instance of (int, long)")
1039+
1040+
if self.__cursor_manager is not None:
1041+
self.__cursor_manager.close(cursor_id, address)
1042+
else:
1043+
self._kill_cursors([cursor_id], address, self._get_topology())
1044+
10271045
def kill_cursors(self, cursor_ids, address=None):
10281046
"""DEPRECATED - Send a kill cursors message soon with the given ids.
10291047
@@ -1055,6 +1073,62 @@ def kill_cursors(self, cursor_ids, address=None):
10551073
# "Atomic", needs no lock.
10561074
self.__kill_cursors_queue.append((address, cursor_ids))
10571075

1076+
def _kill_cursors(self, cursor_ids, address, topology):
1077+
"""Send a kill cursors message with the given ids."""
1078+
listeners = self._event_listeners
1079+
publish = listeners.enabled_for_commands
1080+
if address:
1081+
# address could be a tuple or _CursorAddress, but
1082+
# select_server_by_address needs (host, port).
1083+
server = topology.select_server_by_address(tuple(address))
1084+
else:
1085+
# Application called close_cursor() with no address.
1086+
server = topology.select_server(writable_server_selector)
1087+
1088+
try:
1089+
namespace = address.namespace
1090+
db, coll = namespace.split('.', 1)
1091+
except AttributeError:
1092+
namespace = None
1093+
db = coll = "OP_KILL_CURSORS"
1094+
1095+
spec = SON([('killCursors', coll), ('cursors', cursor_ids)])
1096+
with server.get_socket(self.__all_credentials) as sock_info:
1097+
if sock_info.max_wire_version >= 4 and namespace is not None:
1098+
sock_info.command(db, spec)
1099+
else:
1100+
if publish:
1101+
start = datetime.datetime.now()
1102+
request_id, msg = message.kill_cursors(cursor_ids)
1103+
if publish:
1104+
duration = datetime.datetime.now() - start
1105+
# Here and below, address could be a tuple or
1106+
# _CursorAddress. We always want to publish a
1107+
# tuple to match the rest of the monitoring
1108+
# API.
1109+
listeners.publish_command_start(
1110+
spec, db, request_id, tuple(address))
1111+
start = datetime.datetime.now()
1112+
1113+
try:
1114+
sock_info.send_message(msg, 0)
1115+
except Exception as exc:
1116+
if publish:
1117+
dur = ((datetime.datetime.now() - start) + duration)
1118+
listeners.publish_command_failure(
1119+
dur, message._convert_exception(exc),
1120+
'killCursors', request_id,
1121+
tuple(address))
1122+
raise
1123+
1124+
if publish:
1125+
duration = ((datetime.datetime.now() - start) + duration)
1126+
# OP_KILL_CURSORS returns no reply, fake one.
1127+
reply = {'cursorsUnknown': cursor_ids, 'ok': 1}
1128+
listeners.publish_command_success(
1129+
duration, reply, 'killCursors', request_id,
1130+
tuple(address))
1131+
10581132
# This method is run periodically by a background thread.
10591133
def _process_periodic_tasks(self):
10601134
"""Process any pending kill cursors requests and
@@ -1072,68 +1146,10 @@ def _process_periodic_tasks(self):
10721146

10731147
# Don't re-open topology if it's closed and there's no pending cursors.
10741148
if address_to_cursor_ids:
1075-
listeners = self._event_listeners
1076-
publish = listeners.enabled_for_commands
10771149
topology = self._get_topology()
10781150
for address, cursor_ids in address_to_cursor_ids.items():
10791151
try:
1080-
if address:
1081-
# address could be a tuple or _CursorAddress, but
1082-
# select_server_by_address needs (host, port).
1083-
server = topology.select_server_by_address(
1084-
tuple(address))
1085-
else:
1086-
# Application called close_cursor() with no address.
1087-
server = topology.select_server(
1088-
writable_server_selector)
1089-
1090-
try:
1091-
namespace = address.namespace
1092-
db, coll = namespace.split('.', 1)
1093-
except AttributeError:
1094-
namespace = None
1095-
db = coll = "OP_KILL_CURSORS"
1096-
1097-
spec = SON([('killCursors', coll),
1098-
('cursors', cursor_ids)])
1099-
with server.get_socket(self.__all_credentials) as sock_info:
1100-
if (sock_info.max_wire_version >= 4 and
1101-
namespace is not None):
1102-
sock_info.command(db, spec)
1103-
else:
1104-
if publish:
1105-
start = datetime.datetime.now()
1106-
request_id, msg = message.kill_cursors(cursor_ids)
1107-
if publish:
1108-
duration = datetime.datetime.now() - start
1109-
# Here and below, address could be a tuple or
1110-
# _CursorAddress. We always want to publish a
1111-
# tuple to match the rest of the monitoring
1112-
# API.
1113-
listeners.publish_command_start(
1114-
spec, db, request_id, tuple(address))
1115-
start = datetime.datetime.now()
1116-
1117-
try:
1118-
sock_info.send_message(msg, 0)
1119-
except Exception as exc:
1120-
if publish:
1121-
dur = ((datetime.datetime.now() - start)
1122-
+ duration)
1123-
listeners.publish_command_failure(
1124-
dur, message._convert_exception(exc),
1125-
'killCursors', request_id, tuple(address))
1126-
raise
1127-
1128-
if publish:
1129-
duration = ((datetime.datetime.now() - start)
1130-
+ duration)
1131-
# OP_KILL_CURSORS returns no reply, fake one.
1132-
reply = {'cursorsUnknown': cursor_ids, 'ok': 1}
1133-
listeners.publish_command_success(
1134-
duration, reply, 'killCursors', request_id,
1135-
tuple(address))
1136-
1152+
self._kill_cursors(cursor_ids, address, topology)
11371153
except Exception:
11381154
helpers._handle_exception()
11391155
try:

test/test_client.py

+32
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import contextlib
1818
import datetime
19+
import gc
1920
import os
2021
import signal
2122
import socket
@@ -509,6 +510,37 @@ def test_close(self):
509510

510511
coll.count()
511512

513+
def test_close_kills_cursors(self):
514+
if sys.platform.startswith('java'):
515+
# We can't figure out how to make this test reliable with Jython.
516+
raise SkipTest("Can't test with Jython")
517+
# Kill any cursors possibly queued up by previous tests.
518+
gc.collect()
519+
self.client._process_periodic_tasks()
520+
521+
# Add some test data.
522+
coll = self.client.pymongo_test.test_close_kills_cursors
523+
docs_inserted = 1000
524+
coll.insert_many([{"i": i} for i in range(docs_inserted)])
525+
526+
# Open a cursor and leave it open on the server.
527+
cursor = coll.find().batch_size(10)
528+
self.assertTrue(bool(next(cursor)))
529+
self.assertLess(cursor.retrieved, docs_inserted)
530+
del cursor
531+
# Required for PyPy, Jython and other Python implementations that
532+
# don't use reference counting garbage collection.
533+
gc.collect()
534+
535+
# Close the client and ensure the topology is closed.
536+
self.assertTrue(self.client._topology._opened)
537+
self.client.close()
538+
self.assertFalse(self.client._topology._opened)
539+
540+
# The killCursors task should not need to re-open the topology.
541+
self.client._process_periodic_tasks()
542+
self.assertFalse(self.client._topology._opened)
543+
512544
def test_bad_uri(self):
513545
with self.assertRaises(InvalidURI):
514546
MongoClient("http://localhost")

test/test_cursor.py

+30-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
"""Test the cursor module."""
1616
import copy
17+
import gc
1718
import itertools
1819
import random
1920
import re
@@ -1126,7 +1127,6 @@ def test_tailable(self):
11261127

11271128
self.assertEqual(3, db.test.count())
11281129

1129-
11301130
def test_distinct(self):
11311131
self.db.drop_collection("test")
11321132

@@ -1245,6 +1245,35 @@ def test_alive(self):
12451245

12461246
self.assertTrue(cursor.alive)
12471247

1248+
def test_close_kills_cursor_synchronously(self):
1249+
# Kill any cursors possibly queued up by previous tests.
1250+
gc.collect()
1251+
self.client._process_periodic_tasks()
1252+
1253+
listener = WhiteListEventListener("killCursors")
1254+
results = listener.results
1255+
client = rs_or_single_client(event_listeners=[listener])
1256+
self.addCleanup(client.close)
1257+
coll = client[self.db.name].test_close_kills_cursors
1258+
1259+
# Add some test data.
1260+
docs_inserted = 1000
1261+
coll.insert_many([{"i": i} for i in range(docs_inserted)])
1262+
1263+
results.clear()
1264+
1265+
# Close the cursor while it's still open on the server.
1266+
cursor = coll.find().batch_size(10)
1267+
self.assertTrue(bool(next(cursor)))
1268+
self.assertLess(cursor.retrieved, docs_inserted)
1269+
cursor.close()
1270+
1271+
# Test that the cursor was closed.
1272+
self.assertEqual(1, len(results["started"]))
1273+
self.assertEqual("killCursors", results["started"][0].command_name)
1274+
self.assertEqual(1, len(results["succeeded"]))
1275+
self.assertEqual("killCursors", results["succeeded"][0].command_name)
1276+
12481277

12491278
if __name__ == "__main__":
12501279
unittest.main()

0 commit comments

Comments
 (0)