gh-133485: Use interpreters.Interpreter in InterpreterPoolExecutor #133957

Merged
19 changes: 2 additions & 17 deletions Doc/library/concurrent.futures.rst
@@ -265,7 +265,7 @@ Each worker's interpreter is isolated from all the other interpreters.
"Isolated" means each interpreter has its own runtime state and
operates completely independently. For example, if you redirect
:data:`sys.stdout` in one interpreter, it will not be automatically
redirected any other interpreter. If you import a module in one
redirected to any other interpreter. If you import a module in one
interpreter, it is not automatically imported in any other. You
would need to import the module separately in interpreter where
you need it. In fact, each module imported in an interpreter is
@@ -287,7 +287,7 @@ efficient alternative is to serialize with :mod:`pickle` and then send
the bytes over a shared :mod:`socket <socket>` or
:func:`pipe <os.pipe>`.
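
For illustration, a minimal sketch of that pattern; both ends of the pipe live in one interpreter here purely to keep the example self-contained:

    import os
    import pickle

    r, w = os.pipe()

    # Serialize on the sending side...
    payload = pickle.dumps({'answer': 42})
    os.write(w, payload)
    os.close(w)

    # ...and rebuild the object on the receiving side.
    data = b''
    while True:
        chunk = os.read(r, 4096)
        if not chunk:
            break
        data += chunk
    os.close(r)

    print(pickle.loads(data))  # {'answer': 42}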

.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)
.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously
using a pool of at most *max_workers* threads. Each thread runs
@@ -304,32 +304,17 @@ the bytes over a shared :mod:`socket <socket>` or
and *initargs* using :mod:`pickle` when sending them to the worker's
interpreter.

.. note::
Functions defined in the ``__main__`` module cannot be pickled
and thus cannot be used.

.. note::
The executor may replace uncaught exceptions from *initializer*
with :class:`~concurrent.futures.interpreter.ExecutionFailed`.
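
A hedged usage sketch of *initializer* and *initargs*; ``workerlib`` and its functions are hypothetical stand-ins for a real importable module:

    # workerlib.py (hypothetical importable module)
    import logging

    def configure(level):
        # Runs once in every worker interpreter, before any tasks.
        logging.basicConfig(level=level)

    def handle(n):
        logging.info('handling %s', n)
        return n * 2

    # main.py
    from concurrent.futures import InterpreterPoolExecutor
    from workerlib import configure, handle

    if __name__ == '__main__':
        with InterpreterPoolExecutor(initializer=configure,
                                     initargs=('INFO',)) as pool:
            print(list(pool.map(handle, range(4))))  # [0, 2, 4, 6]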

The optional *shared* argument is a :class:`dict` of objects that all
interpreters in the pool share. The *shared* items are added to each
interpreter's ``__main__`` module. Not all objects are shareable.
Shareable objects include the builtin singletons, :class:`str`
and :class:`bytes`, and :class:`memoryview`. See :pep:`734`
for more info.

Other caveats from parent :class:`ThreadPoolExecutor` apply here.

:meth:`~Executor.submit` and :meth:`~Executor.map` work like normal,
except the worker serializes the callable and arguments using
:mod:`pickle` when sending them to its interpreter. The worker
likewise serializes the return value when sending it back.

.. note::
Functions defined in the ``__main__`` module cannot be pickled
and thus cannot be used.
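
This note is why the sketches here keep task functions in an importable module: a function defined in ``__main__`` fails to pickle. A minimal sketch of submit() and map(), reusing the hypothetical ``workerlib`` module from the earlier sketch:

    from concurrent.futures import InterpreterPoolExecutor
    from workerlib import handle  # hypothetical importable module

    if __name__ == '__main__':
        with InterpreterPoolExecutor() as pool:
            fut = pool.submit(handle, 21)   # pickled, run in a subinterpreter
            print(fut.result())             # 42 (return value pickled back)
            print(list(pool.map(handle, range(3))))  # [0, 2, 4]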

When a worker's current task raises an uncaught exception, the worker
always tries to preserve the exception as-is. If that is successful
then it also sets the ``__cause__`` to a corresponding
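
The rest of this paragraph is collapsed in the diff view; assuming the behavior it describes, a hedged sketch of what a caller sees when a task raises (``errors`` is a hypothetical importable module):

    # errors.py (hypothetical importable module)
    def boom():
        raise ValueError('nope')

    # main.py
    from concurrent.futures import InterpreterPoolExecutor
    from errors import boom

    if __name__ == '__main__':
        with InterpreterPoolExecutor() as pool:
            fut = pool.submit(boom)
            try:
                fut.result()
            except ValueError as exc:
                # The original exception type is preserved; when a
                # cross-interpreter wrapper exists, it is attached
                # as exc.__cause__.
                print(type(exc).__name__, exc.__cause__)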
181 changes: 45 additions & 136 deletions Lib/concurrent/futures/interpreter.py
@@ -1,56 +1,39 @@
"""Implements InterpreterPoolExecutor."""

import contextlib
import pickle
from concurrent import interpreters
import sys
import textwrap
from . import thread as _thread
import _interpreters
import _interpqueues
import traceback


class ExecutionFailed(_interpreters.InterpreterError):
"""An unhandled exception happened during execution."""

def __init__(self, excinfo):
msg = excinfo.formatted
if not msg:
if excinfo.type and excinfo.msg:
msg = f'{excinfo.type.__name__}: {excinfo.msg}'
else:
msg = excinfo.type.__name__ or excinfo.msg
super().__init__(msg)
self.excinfo = excinfo

def __str__(self):
def do_call(results, func, args, kwargs):
try:
return func(*args, **kwargs)
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
try:
formatted = self.excinfo.errdisplay
except Exception:
return super().__str__()
else:
return textwrap.dedent(f"""
{super().__str__()}

Uncaught in the interpreter:

{formatted}
""".strip())
results.put(exc)
except interpreters.NotShareableError:
# The exception is not shareable.
print('exception is not shareable:', file=sys.stderr)
traceback.print_exception(exc)
results.put(None)
raise # re-raise


class WorkerContext(_thread.WorkerContext):

@classmethod
def prepare(cls, initializer, initargs, shared):
def prepare(cls, initializer, initargs):
def resolve_task(fn, args, kwargs):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
task = (fn, args, kwargs)
data = pickle.dumps(task)
Contributor:
It seems like pickle and contextlib are now unused in this file. Importing pickle might be unnecessary when running only stateless functions?

return data
return task

if initializer is not None:
try:
@@ -62,68 +45,24 @@ def resolve_task(fn, args, kwargs):
else:
initdata = None
def create_context():
return cls(initdata, shared)
return cls(initdata)
return create_context, resolve_task

@classmethod
@contextlib.contextmanager
def _capture_exc(cls, resultsid):
try:
yield
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
_interpqueues.put(resultsid, (None, exc))
raise # re-raise

@classmethod
def _send_script_result(cls, resultsid):
_interpqueues.put(resultsid, (None, None))

@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
with cls._capture_exc(resultsid):
_interpqueues.put(resultsid, (res, None))

@classmethod
def _call_pickled(cls, pickled, resultsid):
with cls._capture_exc(resultsid):
fn, args, kwargs = pickle.loads(pickled)
cls._call(fn, args, kwargs, resultsid)

def __init__(self, initdata, shared=None):
def __init__(self, initdata):
self.initdata = initdata
self.shared = dict(shared) if shared else None
self.interpid = None
self.resultsid = None
self.interp = None
self.results = None

def __del__(self):
if self.interpid is not None:
if self.interp is not None:
self.finalize()

def _exec(self, script):
assert self.interpid is not None
excinfo = _interpreters.exec(self.interpid, script, restrict=True)
if excinfo is not None:
raise ExecutionFailed(excinfo)

def initialize(self):
assert self.interpid is None, self.interpid
self.interpid = _interpreters.create(reqrefs=True)
assert self.interp is None, self.interp
self.interp = interpreters.create()
try:
_interpreters.incref(self.interpid)

maxsize = 0
self.resultsid = _interpqueues.create(maxsize)

self._exec(f'from {__name__} import WorkerContext')

if self.shared:
_interpreters.set___main___attrs(
self.interpid, self.shared, restrict=True)
self.results = interpreters.create_queue(maxsize)

if self.initdata:
self.run(self.initdata)
@@ -132,53 +71,25 @@ def initialize(self):
raise # re-raise

def finalize(self):
interpid = self.interpid
resultsid = self.resultsid
self.resultsid = None
self.interpid = None
if resultsid is not None:
try:
_interpqueues.destroy(resultsid)
except _interpqueues.QueueNotFoundError:
pass
if interpid is not None:
try:
_interpreters.decref(interpid)
except _interpreters.InterpreterNotFoundError:
pass
interp = self.interp
results = self.results
self.results = None
self.interp = None
if results is not None:
del results
if interp is not None:
interp.close()

def run(self, task):
data = task
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'

try:
self._exec(script)
except ExecutionFailed as exc:
exc_wrapper = exc
else:
exc_wrapper = None

# Return the result, or raise the exception.
while True:
try:
obj = _interpqueues.get(self.resultsid)
except _interpqueues.QueueNotFoundError:
return self.interp.call(do_call, self.results, *task)
except interpreters.ExecutionFailed as wrapper:
# Wait for the exception data to show up.
exc = self.results.get()
if exc is None:
# The exception must have been not shareable.
raise # re-raise
except _interpqueues.QueueError:
continue
except ModuleNotFoundError:
# interpreters._queues doesn't exist, which means
# QueueEmpty doesn't. Act as though it does.
continue
else:
break
(res, exc), unboundop = obj
assert unboundop is None, unboundop
if exc is not None:
assert res is None, res
assert exc_wrapper is not None
raise exc from exc_wrapper
return res
raise exc from wrapper


class BrokenInterpreterPool(_thread.BrokenThreadPool):
@@ -192,11 +103,11 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
BROKEN = BrokenInterpreterPool

@classmethod
def prepare_context(cls, initializer, initargs, shared):
return WorkerContext.prepare(initializer, initargs, shared)
def prepare_context(cls, initializer, initargs):
return WorkerContext.prepare(initializer, initargs)

def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=(), shared=None):
initializer=None, initargs=()):
"""Initializes a new InterpreterPoolExecutor instance.

Args:
@@ -206,8 +117,6 @@ def __init__(self, max_workers=None, thread_name_prefix='',
initializer: A callable or script used to initialize
each worker interpreter.
initargs: A tuple of arguments to pass to the initializer.
shared: A mapping of shareabled objects to be inserted into
each worker interpreter.
"""
super().__init__(max_workers, thread_name_prefix,
initializer, initargs, shared=shared)
initializer, initargs)
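
The diff above swaps the private ``_interpreters``/``_interpqueues`` helpers for the public ``concurrent.interpreters`` API (new in Python 3.14). A minimal sketch of the three pieces the new code leans on, ``create()``, ``Interpreter.call()``, and ``create_queue()``:

    from concurrent import interpreters

    interp = interpreters.create()
    queue = interpreters.create_queue()

    # Interpreter.call() runs the callable in the subinterpreter and
    # returns its result; an uncaught error surfaces as
    # interpreters.ExecutionFailed in the calling interpreter.
    print(interp.call(pow, 2, 10))   # 1024

    # Queues move shareable objects (str, bytes, ...) between interpreters.
    queue.put('hello')
    print(queue.get())               # hello

    interp.close()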
4 changes: 4 additions & 0 deletions Lib/test/test_concurrent_futures/test_init.py
@@ -20,6 +20,10 @@
def init(x):
global INITIALIZER_STATUS
INITIALIZER_STATUS = x
# InterpreterPoolInitializerTest.test_initializer fails
# if we don't have a LOAD_GLOBAL. (It could be any global.)
# We will address this separately.
INITIALIZER_STATUS
Comment on lines +23 to +26
Member Author:
@markshannon, any ideas on why this is happening? It smells like a ceval bug, but it certainly could be something I've done wrong.

Contributor @neonene, Jun 1, 2025:

There seem to be related changes in inspect.getclosurevars() since 83ba8c2:

before:
ClosureVars(nonlocals={},
            globals={'INITIALIZER_STATUS': 'uninitialized'},
            builtins={}, unbound=set())
after:
ClosureVars(nonlocals={},
            globals={},
            builtins={}, unbound=set())
  • init() on main (without L26):
  3           RESUME                   0

  5           LOAD_FAST_BORROW         0 (x)
              STORE_GLOBAL             0 (INITIALIZER_STATUS)
              LOAD_CONST               0 (None)
              RETURN_VALUE
  • 3.3.5 (2014):
  5           0 LOAD_FAST                0 (x)
              3 STORE_GLOBAL             0 (INITIALIZER_STATUS)
              6 LOAD_CONST               0 (None)
              9 RETURN_VALUE
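
A hedged repro sketch of the observation above; the second function adds the bare load that the test change relies on:

    import inspect

    INITIALIZER_STATUS = 'uninitialized'

    def init(x):
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = x          # STORE_GLOBAL only

    def init_with_load(x):
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = x
        INITIALIZER_STATUS              # adds a LOAD_GLOBAL

    # Per the comment above: empty on current main, but the loaded
    # name shows up once the function also reads the global.
    print(inspect.getclosurevars(init).globals)
    print(inspect.getclosurevars(init_with_load).globals)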


def get_init_status():
return INITIALIZER_STATUS