aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Lib/compileall.py10
-rw-r--r--Lib/concurrent/futures/process.py8
-rw-r--r--Lib/multiprocessing/resource_tracker.py10
-rw-r--r--Lib/test/test_compileall.py11
-rw-r--r--Lib/test/test_concurrent_futures.py19
-rw-r--r--Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst1
6 files changed, 49 insertions, 10 deletions
diff --git a/Lib/compileall.py b/Lib/compileall.py
index fe7f450c55e..672cb439718 100644
--- a/Lib/compileall.py
+++ b/Lib/compileall.py
@@ -84,12 +84,14 @@ def compile_dir(dir, maxlevels=None, ddir=None, force=False,
if workers < 0:
raise ValueError('workers must be greater or equal to 0')
if workers != 1:
+ # Check if this is a system where ProcessPoolExecutor can function.
+ from concurrent.futures.process import _check_system_limits
try:
- # Only import when needed, as low resource platforms may
- # fail to import it
- from concurrent.futures import ProcessPoolExecutor
- except ImportError:
+ _check_system_limits()
+ except NotImplementedError:
workers = 1
+ else:
+ from concurrent.futures import ProcessPoolExecutor
if maxlevels is None:
maxlevels = sys.getrecursionlimit()
files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels)
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 90bc98bf2ec..764719859f7 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -533,6 +533,14 @@ def _check_system_limits():
raise NotImplementedError(_system_limited)
_system_limits_checked = True
try:
+ import multiprocessing.synchronize
+ except ImportError:
+ _system_limited = (
+ "This Python build lacks multiprocessing.synchronize, usually due "
+ "to named semaphores being unavailable on this platform."
+ )
+ raise NotImplementedError(_system_limited)
+ try:
nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
except (AttributeError, ValueError):
# sysconf not available or setting not available
diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py
index c9bfa9b82b6..cc42dbdda05 100644
--- a/Lib/multiprocessing/resource_tracker.py
+++ b/Lib/multiprocessing/resource_tracker.py
@@ -37,8 +37,16 @@ if os.name == 'posix':
import _multiprocessing
import _posixshmem
+ # Use sem_unlink() to clean up named semaphores.
+ #
+ # sem_unlink() may be missing if the Python build process detected the
+ # absence of POSIX named semaphores. In that case, no named semaphores were
+ # ever opened, so no cleanup would be necessary.
+ if hasattr(_multiprocessing, 'sem_unlink'):
+ _CLEANUP_FUNCS.update({
+ 'semaphore': _multiprocessing.sem_unlink,
+ })
_CLEANUP_FUNCS.update({
- 'semaphore': _multiprocessing.sem_unlink,
'shared_memory': _posixshmem.shm_unlink,
})
diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py
index be1149a87fa..fa24b3c5a11 100644
--- a/Lib/test/test_compileall.py
+++ b/Lib/test/test_compileall.py
@@ -16,10 +16,14 @@ import time
import unittest
from unittest import mock, skipUnless
+from concurrent.futures import ProcessPoolExecutor
try:
- from concurrent.futures import ProcessPoolExecutor
+ # compileall relies on ProcessPoolExecutor if ProcessPoolExecutor exists
+ # and it can function.
+ from concurrent.futures.process import _check_system_limits
+ _check_system_limits()
_have_multiprocessing = True
-except ImportError:
+except NotImplementedError:
_have_multiprocessing = False
from test import support
@@ -188,6 +192,7 @@ class CompileallTestsBase:
self.assertRegex(line, r'Listing ([^WindowsPath|PosixPath].*)')
self.assertTrue(os.path.isfile(self.bc_path))
+ @skipUnless(_have_multiprocessing, "requires multiprocessing")
@mock.patch('concurrent.futures.ProcessPoolExecutor')
def test_compile_pool_called(self, pool_mock):
compileall.compile_dir(self.directory, quiet=True, workers=5)
@@ -198,11 +203,13 @@ class CompileallTestsBase:
"workers must be greater or equal to 0"):
compileall.compile_dir(self.directory, workers=-1)
+ @skipUnless(_have_multiprocessing, "requires multiprocessing")
@mock.patch('concurrent.futures.ProcessPoolExecutor')
def test_compile_workers_cpu_count(self, pool_mock):
compileall.compile_dir(self.directory, quiet=True, workers=0)
self.assertEqual(pool_mock.call_args[1]['max_workers'], None)
+ @skipUnless(_have_multiprocessing, "requires multiprocessing")
@mock.patch('concurrent.futures.ProcessPoolExecutor')
@mock.patch('compileall.compile_file')
def test_compile_one_worker(self, compile_file_mock, pool_mock):
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index a182b14fb9b..99651f5f4ed 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -4,8 +4,6 @@ from test.support import threading_helper
# Skip tests if _multiprocessing wasn't built.
import_helper.import_module('_multiprocessing')
-# Skip tests if sem_open implementation is broken.
-support.skip_if_broken_multiprocessing_synchronize()
from test.support import hashlib_helper
from test.support.script_helper import assert_python_ok
@@ -27,7 +25,7 @@ from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
BrokenExecutor)
-from concurrent.futures.process import BrokenProcessPool
+from concurrent.futures.process import BrokenProcessPool, _check_system_limits
from multiprocessing import get_context
import multiprocessing.process
@@ -161,6 +159,10 @@ class ProcessPoolForkMixin(ExecutorMixin):
ctx = "fork"
def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
if sys.platform == "win32":
self.skipTest("require unix system")
return super().get_context()
@@ -170,12 +172,23 @@ class ProcessPoolSpawnMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "spawn"
+ def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
+ return super().get_context()
+
class ProcessPoolForkserverMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "forkserver"
def get_context(self):
+ try:
+ _check_system_limits()
+ except NotImplementedError:
+ self.skipTest("ProcessPoolExecutor unavailable on this system")
if sys.platform == "win32":
self.skipTest("require unix system")
return super().get_context()
diff --git a/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst b/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst
new file mode 100644
index 00000000000..b92dcdd00af
--- /dev/null
+++ b/Misc/NEWS.d/next/Core and Builtins/2020-05-19-22-10-05.bpo-40692.ajEhrR.rst
@@ -0,0 +1 @@
+In the :class:`concurrent.futures.ProcessPoolExecutor`, validate that :func:`multiprocess.synchronize` is available on a given platform and rely on that check in the :mod:`concurrent.futures` test suite so we can run tests that are unrelated to :class:`ProcessPoolExecutor` on those platforms.