from _multiprocessing import SemLock
from threading import Thread
import thread
import time
import sys
import pytest


@pytest.mark.skipif(sys.platform=='win32', reason='segfaults on win32')
def test_notify_all():
    """Hammer one SemLock with many threads and check that each gets a turn.

    A low-level variation on test_notify_all() in lib-python's
    test_multiprocessing.py.

    The main thread holds the lock while it starts up to N_THREADS workers,
    so every worker initially blocks in ``acquire``.  Once the lock is
    released, each worker is expected to take the lock, record its index in
    ``results`` and release the lock again.  The test passes when the number
    of recorded indices equals the number of threads that could actually be
    started.
    """
    N_THREADS = 1000
    # NOTE(review): the positional arguments are presumably
    # (kind, value, maxvalue) -- confirm against the _multiprocessing module.
    lock = SemLock(0, 1, 1)
    results = []

    def worker(index):
        # Give up after 5 seconds so that a lost wake-up shows up as a failed
        # assertion below instead of hanging the whole test run.
        acquired = lock.acquire(timeout=5.)
        if not acquired:
            print("lock acquire timed out!")
            return
        results.append(index)
        lock.release()

    workers = []
    for index in range(N_THREADS):
        workers.append(Thread(target=worker, args=(index,)))

    # Only threads that really started are joined and counted.
    launched = []
    with lock:
        for candidate in workers:
            try:
                candidate.start()
            except thread.error:
                # too many threads for this system
                continue
            launched.append(candidate)
        # Still holding the lock here: let the started workers reach their
        # acquire() call before the lock is released.
        time.sleep(0.1)
        print("started %d threads" % len(launched))

    for running in launched:
        running.join()
    assert len(results) == len(launched)