Skip to content

[3.8] bpo-36402: Fix threading._shutdown() race condition (GH-13948) #14050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 76 additions & 3 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,41 @@ def __del__(self):
self.assertEqual(data.splitlines(),
["GC: True True True"] * 2)

def test_finalization_shutdown(self):
# bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
# until Python thread states of all non-daemon threads get deleted.
#
# Test similar to SubinterpThreadingTests.test_threads_join_2(), but
# test the finalization of the main interpreter.
code = """if 1:
import os
import threading
import time
import random

def random_sleep():
seconds = random.random() * 0.010
time.sleep(seconds)

class Sleeper:
def __del__(self):
random_sleep()

tls = threading.local()

def f():
# Sleep a bit so that the thread is still running when
# Py_Finalize() is called.
random_sleep()
tls.x = Sleeper()
random_sleep()

threading.Thread(target=f).start()
random_sleep()
"""
rc, out, err = assert_python_ok("-c", code)
self.assertEqual(err, b"")

def test_tstate_lock(self):
# Test an implementation detail of Thread objects.
started = _thread.allocate_lock()
Expand Down Expand Up @@ -703,6 +738,30 @@ def callback():
finally:
sys.settrace(old_trace)

@cpython_only
def test_shutdown_locks(self):
for daemon in (False, True):
with self.subTest(daemon=daemon):
event = threading.Event()
thread = threading.Thread(target=event.wait, daemon=daemon)

# Thread.start() must add lock to _shutdown_locks,
# but only for non-daemon thread
thread.start()
tstate_lock = thread._tstate_lock
if not daemon:
self.assertIn(tstate_lock, threading._shutdown_locks)
else:
self.assertNotIn(tstate_lock, threading._shutdown_locks)

# unblock the thread and join it
event.set()
thread.join()

# Thread._stop() must remove tstate_lock from _shutdown_locks.
# Daemon threads must never add it to _shutdown_locks.
self.assertNotIn(tstate_lock, threading._shutdown_locks)


class ThreadJoinOnShutdown(BaseTestCase):

Expand Down Expand Up @@ -878,15 +937,22 @@ def test_threads_join(self):
self.addCleanup(os.close, w)
code = r"""if 1:
import os
import random
import threading
import time

def random_sleep():
seconds = random.random() * 0.010
time.sleep(seconds)

def f():
# Sleep a bit so that the thread is still running when
# Py_EndInterpreter is called.
time.sleep(0.05)
random_sleep()
os.write(%d, b"x")

threading.Thread(target=f).start()
random_sleep()
""" % (w,)
ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
Expand All @@ -903,22 +969,29 @@ def test_threads_join_2(self):
self.addCleanup(os.close, w)
code = r"""if 1:
import os
import random
import threading
import time

def random_sleep():
seconds = random.random() * 0.010
time.sleep(seconds)

class Sleeper:
def __del__(self):
time.sleep(0.05)
random_sleep()

tls = threading.local()

def f():
# Sleep a bit so that the thread is still running when
# Py_EndInterpreter is called.
time.sleep(0.05)
random_sleep()
tls.x = Sleeper()
os.write(%d, b"x")

threading.Thread(target=f).start()
random_sleep()
""" % (w,)
ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
Expand Down
49 changes: 40 additions & 9 deletions Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,11 @@ def _newname(template="Thread-%d"):
_active = {} # maps thread id to Thread object
_limbo = {}
_dangling = WeakSet()
# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
# to wait until all Python thread states get deleted:
# see Thread._set_tstate_lock().
_shutdown_locks_lock = _allocate_lock()
_shutdown_locks = set()

# Main class for threads

Expand Down Expand Up @@ -903,6 +908,10 @@ def _set_tstate_lock(self):
self._tstate_lock = _set_sentinel()
self._tstate_lock.acquire()

if not self.daemon:
with _shutdown_locks_lock:
_shutdown_locks.add(self._tstate_lock)

def _bootstrap_inner(self):
try:
self._set_ident()
Expand Down Expand Up @@ -954,6 +963,9 @@ def _stop(self):
assert not lock.locked()
self._is_stopped = True
self._tstate_lock = None
if not self.daemon:
with _shutdown_locks_lock:
_shutdown_locks.discard(lock)

def _delete(self):
"Remove current thread from the dict of currently running threads."
Expand Down Expand Up @@ -1342,6 +1354,9 @@ def enumerate():
_main_thread = _MainThread()

def _shutdown():
"""
Wait until the Python thread state of all non-daemon threads get deleted.
"""
# Obscure: other threads may be waiting to join _main_thread. That's
# dubious, but some code does it. We can't wait for C code to release
# the main thread's tstate_lock - that won't happen until the interpreter
Expand All @@ -1350,23 +1365,33 @@ def _shutdown():
if _main_thread._is_stopped:
# _shutdown() was already called
return

# Main thread
tlock = _main_thread._tstate_lock
# The main thread isn't finished yet, so its thread state lock can't have
# been released.
assert tlock is not None
assert tlock.locked()
tlock.release()
_main_thread._stop()
t = _pickSomeNonDaemonThread()
while t:
t.join()
t = _pickSomeNonDaemonThread()

def _pickSomeNonDaemonThread():
for t in enumerate():
if not t.daemon and t.is_alive():
return t
return None
# Join all non-deamon threads
while True:
with _shutdown_locks_lock:
locks = list(_shutdown_locks)
_shutdown_locks.clear()

if not locks:
break

for lock in locks:
# mimick Thread.join()
lock.acquire()
lock.release()

# new threads can be spawned while we were waiting for the other
# threads to complete


def main_thread():
"""Return the main thread object.
Expand All @@ -1392,12 +1417,18 @@ def _after_fork():
# Reset _active_limbo_lock, in case we forked while the lock was held
# by another (non-forked) thread. http://bugs.python.org/issue874900
global _active_limbo_lock, _main_thread
global _shutdown_locks_lock, _shutdown_locks
_active_limbo_lock = _allocate_lock()

# fork() only copied the current thread; clear references to others.
new_active = {}
current = current_thread()
_main_thread = current

# reset _shutdown() locks: threads re-register their _tstate_lock below
_shutdown_locks_lock = _allocate_lock()
_shutdown_locks = set()

with _active_limbo_lock:
# Dangling thread instances must still have their locks reset,
# because someone may join() them.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fix a race condition at Python shutdown when waiting for threads. Wait until
the Python thread state of all non-daemon threads get deleted (join all
non-daemon threads), rather than just wait until non-daemon Python threads
complete.