Commit 725da50
gh-133485: Use interpreters.Interpreter in InterpreterPoolExecutor (gh-133957)
Most importantly, this resolves the issues with functions and types defined in __main__. It also expands the number of supported objects and simplifies the implementation.
1 parent: 15f2bac
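For illustration, here is a sketch (not part of the commit) of the kind of code this change enables; before it, a function defined in __main__ could not be pickled for the worker interpreters:

# Sketch of the newly supported case: square() lives in __main__.
from concurrent.futures import InterpreterPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    with InterpreterPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]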

File tree: 5 files changed, +266 -205 lines

Doc/library/concurrent.futures.rst

Lines changed: 2 additions & 17 deletions
@@ -265,7 +265,7 @@ Each worker's interpreter is isolated from all the other interpreters.
 "Isolated" means each interpreter has its own runtime state and
 operates completely independently. For example, if you redirect
 :data:`sys.stdout` in one interpreter, it will not be automatically
-redirected any other interpreter. If you import a module in one
+redirected to any other interpreter. If you import a module in one
 interpreter, it is not automatically imported in any other. You
 would need to import the module separately in interpreter where
 you need it. In fact, each module imported in an interpreter is
@@ -287,7 +287,7 @@ efficient alternative is to serialize with :mod:`pickle` and then send
 the bytes over a shared :mod:`socket <socket>` or
 :func:`pipe <os.pipe>`.

-.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)
+.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

    A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously
    using a pool of at most *max_workers* threads. Each thread runs
@@ -304,32 +304,17 @@ the bytes over a shared :mod:`socket <socket>` or
    and *initargs* using :mod:`pickle` when sending them to the worker's
    interpreter.

-   .. note::
-      Functions defined in the ``__main__`` module cannot be pickled
-      and thus cannot be used.
-
    .. note::
       The executor may replace uncaught exceptions from *initializer*
      with :class:`~concurrent.futures.interpreter.ExecutionFailed`.

-   The optional *shared* argument is a :class:`dict` of objects that all
-   interpreters in the pool share. The *shared* items are added to each
-   interpreter's ``__main__`` module. Not all objects are shareable.
-   Shareable objects include the builtin singletons, :class:`str`
-   and :class:`bytes`, and :class:`memoryview`. See :pep:`734`
-   for more info.
-
    Other caveats from parent :class:`ThreadPoolExecutor` apply here.

    :meth:`~Executor.submit` and :meth:`~Executor.map` work like normal,
    except the worker serializes the callable and arguments using
    :mod:`pickle` when sending them to its interpreter. The worker
    likewise serializes the return value when sending it back.

-   .. note::
-      Functions defined in the ``__main__`` module cannot be pickled
-      and thus cannot be used.
-
    When a worker's current task raises an uncaught exception, the worker
    always tries to preserve the exception as-is. If that is successful
    then it also sets the ``__cause__`` to a corresponding
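A hedged sketch of the caller-visible behavior described above (not from the commit; the failing task is illustrative): the original exception is re-raised by Future.result(), and, judging from the implementation in this commit, its __cause__ is the interpreter's ExecutionFailed wrapper.

# Sketch: an uncaught task exception survives the interpreter boundary.
from concurrent.futures import InterpreterPoolExecutor

def fail():
    raise ValueError('boom')

if __name__ == '__main__':
    with InterpreterPoolExecutor() as executor:
        try:
            executor.submit(fail).result()
        except ValueError as exc:                  # the original exception
            print(type(exc.__cause__).__name__)    # ExecutionFailed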

Lib/concurrent/futures/interpreter.py

Lines changed: 45 additions & 136 deletions
@@ -1,56 +1,39 @@
 """Implements InterpreterPoolExecutor."""

-import contextlib
-import pickle
+from concurrent import interpreters
+import sys
 import textwrap
 from . import thread as _thread
-import _interpreters
-import _interpqueues
+import traceback


-class ExecutionFailed(_interpreters.InterpreterError):
-    """An unhandled exception happened during execution."""
-
-    def __init__(self, excinfo):
-        msg = excinfo.formatted
-        if not msg:
-            if excinfo.type and excinfo.msg:
-                msg = f'{excinfo.type.__name__}: {excinfo.msg}'
-            else:
-                msg = excinfo.type.__name__ or excinfo.msg
-        super().__init__(msg)
-        self.excinfo = excinfo
-
-    def __str__(self):
+def do_call(results, func, args, kwargs):
+    try:
+        return func(*args, **kwargs)
+    except BaseException as exc:
+        # Send the captured exception out on the results queue,
+        # but still leave it unhandled for the interpreter to handle.
         try:
-            formatted = self.excinfo.errdisplay
-        except Exception:
-            return super().__str__()
-        else:
-            return textwrap.dedent(f"""
-{super().__str__()}
-
-Uncaught in the interpreter:
-
-{formatted}
-""".strip())
+            results.put(exc)
+        except interpreters.NotShareableError:
+            # The exception is not shareable.
+            print('exception is not shareable:', file=sys.stderr)
+            traceback.print_exception(exc)
+            results.put(None)
+        raise  # re-raise


 class WorkerContext(_thread.WorkerContext):

     @classmethod
-    def prepare(cls, initializer, initargs, shared):
+    def prepare(cls, initializer, initargs):
         def resolve_task(fn, args, kwargs):
             if isinstance(fn, str):
                 # XXX Circle back to this later.
                 raise TypeError('scripts not supported')
             else:
-                # Functions defined in the __main__ module can't be pickled,
-                # so they can't be used here. In the future, we could possibly
-                # borrow from multiprocessing to work around this.
                 task = (fn, args, kwargs)
-                data = pickle.dumps(task)
-                return data
+                return task

         if initializer is not None:
             try:
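Hunk context: the executor now builds on the high-level concurrent.interpreters module (PEP 734, Python 3.14+) instead of the private _interpreters and _interpqueues modules. A minimal sketch of that API, with an illustrative worker function:

# Minimal sketch of the concurrent.interpreters API this file now uses;
# add_one() is illustrative.
from concurrent import interpreters

def add_one(x):
    return x + 1

interp = interpreters.create()           # a new isolated subinterpreter
results = interpreters.create_queue(0)   # a cross-interpreter queue (unbounded)
try:
    print(interp.call(add_one, 41))      # runs add_one in interp -> 42
    results.put('hello')                 # objects cross via the queue
    print(results.get())                 # hello
finally:
    interp.close()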
@@ -62,68 +45,24 @@ def resolve_task(fn, args, kwargs):
         else:
             initdata = None
         def create_context():
-            return cls(initdata, shared)
+            return cls(initdata)
         return create_context, resolve_task

-    @classmethod
-    @contextlib.contextmanager
-    def _capture_exc(cls, resultsid):
-        try:
-            yield
-        except BaseException as exc:
-            # Send the captured exception out on the results queue,
-            # but still leave it unhandled for the interpreter to handle.
-            _interpqueues.put(resultsid, (None, exc))
-            raise  # re-raise
-
-    @classmethod
-    def _send_script_result(cls, resultsid):
-        _interpqueues.put(resultsid, (None, None))
-
-    @classmethod
-    def _call(cls, func, args, kwargs, resultsid):
-        with cls._capture_exc(resultsid):
-            res = func(*args or (), **kwargs or {})
-        # Send the result back.
-        with cls._capture_exc(resultsid):
-            _interpqueues.put(resultsid, (res, None))
-
-    @classmethod
-    def _call_pickled(cls, pickled, resultsid):
-        with cls._capture_exc(resultsid):
-            fn, args, kwargs = pickle.loads(pickled)
-        cls._call(fn, args, kwargs, resultsid)
-
-    def __init__(self, initdata, shared=None):
+    def __init__(self, initdata):
         self.initdata = initdata
-        self.shared = dict(shared) if shared else None
-        self.interpid = None
-        self.resultsid = None
+        self.interp = None
+        self.results = None

     def __del__(self):
-        if self.interpid is not None:
+        if self.interp is not None:
             self.finalize()

-    def _exec(self, script):
-        assert self.interpid is not None
-        excinfo = _interpreters.exec(self.interpid, script, restrict=True)
-        if excinfo is not None:
-            raise ExecutionFailed(excinfo)
-
     def initialize(self):
-        assert self.interpid is None, self.interpid
-        self.interpid = _interpreters.create(reqrefs=True)
+        assert self.interp is None, self.interp
+        self.interp = interpreters.create()
         try:
-            _interpreters.incref(self.interpid)
-
             maxsize = 0
-            self.resultsid = _interpqueues.create(maxsize)
-
-            self._exec(f'from {__name__} import WorkerContext')
-
-            if self.shared:
-                _interpreters.set___main___attrs(
-                    self.interpid, self.shared, restrict=True)
+            self.results = interpreters.create_queue(maxsize)

             if self.initdata:
                 self.run(self.initdata)
@@ -132,53 +71,25 @@ def initialize(self):
             raise  # re-raise

     def finalize(self):
-        interpid = self.interpid
-        resultsid = self.resultsid
-        self.resultsid = None
-        self.interpid = None
-        if resultsid is not None:
-            try:
-                _interpqueues.destroy(resultsid)
-            except _interpqueues.QueueNotFoundError:
-                pass
-        if interpid is not None:
-            try:
-                _interpreters.decref(interpid)
-            except _interpreters.InterpreterNotFoundError:
-                pass
+        interp = self.interp
+        results = self.results
+        self.results = None
+        self.interp = None
+        if results is not None:
+            del results
+        if interp is not None:
+            interp.close()

     def run(self, task):
-        data = task
-        script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
-
         try:
-            self._exec(script)
-        except ExecutionFailed as exc:
-            exc_wrapper = exc
-        else:
-            exc_wrapper = None
-
-        # Return the result, or raise the exception.
-        while True:
-            try:
-                obj = _interpqueues.get(self.resultsid)
-            except _interpqueues.QueueNotFoundError:
+            return self.interp.call(do_call, self.results, *task)
+        except interpreters.ExecutionFailed as wrapper:
+            # Wait for the exception data to show up.
+            exc = self.results.get()
+            if exc is None:
+                # The exception must have been not shareable.
                 raise  # re-raise
-            except _interpqueues.QueueError:
-                continue
-            except ModuleNotFoundError:
-                # interpreters._queues doesn't exist, which means
-                # QueueEmpty doesn't. Act as though it does.
-                continue
-            else:
-                break
-        (res, exc), unboundop = obj
-        assert unboundop is None, unboundop
-        if exc is not None:
-            assert res is None, res
-            assert exc_wrapper is not None
-            raise exc from exc_wrapper
-        return res
+            raise exc from wrapper


 class BrokenInterpreterPool(_thread.BrokenThreadPool):
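When the task's exception cannot be shared back (the path above where results.get() returns None), only the wrapper survives. A hypothetical sketch of what a caller would then see; the unpicklable payload is an assumption used to trigger that path:

# Hypothetical: an exception whose state does not pickle cannot cross the
# results queue, so the caller gets interpreters.ExecutionFailed instead.
from concurrent import interpreters
from concurrent.futures import InterpreterPoolExecutor

def fail_unshareably():
    exc = RuntimeError('cannot cross the queue')
    exc.payload = lambda: None    # lambdas do not pickle
    raise exc

if __name__ == '__main__':
    with InterpreterPoolExecutor() as executor:
        try:
            executor.submit(fail_unshareably).result()
        except interpreters.ExecutionFailed as exc:
            print('only the wrapper survived:', type(exc).__name__)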
@@ -192,11 +103,11 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
     BROKEN = BrokenInterpreterPool

     @classmethod
-    def prepare_context(cls, initializer, initargs, shared):
-        return WorkerContext.prepare(initializer, initargs, shared)
+    def prepare_context(cls, initializer, initargs):
+        return WorkerContext.prepare(initializer, initargs)

     def __init__(self, max_workers=None, thread_name_prefix='',
-                 initializer=None, initargs=(), shared=None):
+                 initializer=None, initargs=()):
         """Initializes a new InterpreterPoolExecutor instance.

         Args:
@@ -206,8 +117,6 @@ def __init__(self, max_workers=None, thread_name_prefix='',
             initializer: A callable or script used to initialize
                 each worker interpreter.
             initargs: A tuple of arguments to pass to the initializer.
-            shared: A mapping of shareabled objects to be inserted into
-                each worker interpreter.
         """
         super().__init__(max_workers, thread_name_prefix,
-                         initializer, initargs, shared=shared)
+                         initializer, initargs)

Lib/test/test_concurrent_futures/test_init.py

Lines changed: 4 additions & 0 deletions
@@ -20,6 +20,10 @@
 def init(x):
     global INITIALIZER_STATUS
     INITIALIZER_STATUS = x
+    # InterpreterPoolInitializerTest.test_initializer fails
+    # if we don't have a LOAD_GLOBAL. (It could be any global.)
+    # We will address this separately.
+    INITIALIZER_STATUS

 def get_init_status():
     return INITIALIZER_STATUS
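The pattern this test exercises, as a runnable sketch (the executor wiring is illustrative; init(), get_init_status(), and INITIALIZER_STATUS come from the test module):

# Sketch: the initializer sets a global in each worker interpreter, and
# later tasks read it back; both functions live in __main__.
from concurrent.futures import InterpreterPoolExecutor

INITIALIZER_STATUS = 'uninitialized'

def init(x):
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x
    INITIALIZER_STATUS  # keep a LOAD_GLOBAL; see the comment above

def get_init_status():
    return INITIALIZER_STATUS

if __name__ == '__main__':
    with InterpreterPoolExecutor(initializer=init,
                                 initargs=('initialized',)) as executor:
        print(executor.submit(get_init_status).result())  # initialized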
