diff --git a/.azure_pipeline.yml b/.azure_pipeline.yml index 1e2648cc..1b7b884a 100644 --- a/.azure_pipeline.yml +++ b/.azure_pipeline.yml @@ -109,3 +109,51 @@ jobs: curl -s https://codecov.io/bash | bash displayName: 'Upload to codecov' condition: and(succeeded(), ne(variables['joblib.tests'], 'true')) + + +- job: 'test_frozen_loky' + strategy: + matrix: + + windows-py310: + imageName: windows-latest + python.version: "3.10" + macos-py310: + imageName: "macos-latest" + python.version: "3.10" + linux-py310: + imageName: "ubuntu-latest" + python.version: "3.10" + pool: + vmImage: $(imageName) + variables: + JUNITXML: 'test-data.xml' + PYINSTALLER_TESTS: "true" + steps: + - task: UsePythonVersion@0 + inputs: + versionSpec: '$(python.version)' + displayName: 'Use Python $(python.version)' + + # azure-pipelines unpredictably switches between Git\bin\bash and + # Git\usr\bin\bash when running a bash script inside Windows environments. + # The latter may use wrong bash commands, resulting in errors when codecov + # tries to upload the coverage results. + - bash: echo "##vso[task.prependpath]C:/Program Files/Git/bin" + displayName: 'Override Git bash shell for Windows' + condition: eq(variables['Agent.OS'], 'Windows_NT') + + - script: | + bash continuous_integration/runtests.sh + displayName: 'Test loky with PyInstaller' + + - task: PublishTestResults@2 + inputs: + testResultsFiles: '$(JUNITXML)' + displayName: 'Publish Test Results' + condition: succeededOrFailed() + + - bash: | + curl -s https://codecov.io/bash | bash + displayName: 'Upload to codecov' + condition: and(succeeded(), ne(variables['joblib.tests'], 'true')) diff --git a/continuous_integration/runtests.sh b/continuous_integration/runtests.sh index 3e478a9a..617392d1 100755 --- a/continuous_integration/runtests.sh +++ b/continuous_integration/runtests.sh @@ -23,6 +23,24 @@ if [ "$JOBLIB_TESTS" = "true" ]; then cp "$BUILD_SOURCESDIRECTORY"/continuous_integration/copy_loky.sh $JOBLIB/externals (cd $JOBLIB/externals && bash copy_loky.sh "$BUILD_SOURCESDIRECTORY") pytest -vl --ignore $JOBLIB/externals --pyargs joblib +elif [ "$PYINSTALLER_TESTS" = "true" ]; then + python -m venv venv/ + if [ -d "./venv/Scripts" ]; then + source ./venv/Scripts/activate + else + source ./venv/bin/activate + fi + which python + pip install pytest pytest-timeout psutil coverage pyinstaller + pip install . + python -c "import loky; print('loky.cpu_count():', loky.cpu_count())" + python -c "import os; print('os.cpu_count():', os.cpu_count())" + export COVERAGE_PROCESS_START=`pwd`/.coveragerc + python continuous_integration/install_coverage_subprocess_pth.py + pytest -vl --maxfail=5 --timeout=60 -k pyinstaller --junitxml="${JUNITXML}" + coverage combine --quiet --append + coverage xml -i # language agnostic report for the codecov upload script + coverage report # display the report as text on stdout else # Make sure that we have the python docker image cached locally to avoid # a timeout in a test that needs it. diff --git a/loky/__init__.py b/loky/__init__.py index 2d5c0810..6aece7a8 100644 --- a/loky/__init__.py +++ b/loky/__init__.py @@ -16,6 +16,7 @@ from ._base import Future from .backend.context import cpu_count +from .backend.spawn import freeze_support from .backend.reduction import set_loky_pickler from .reusable_executor import get_reusable_executor from .cloudpickle_wrapper import wrap_non_picklable_objects @@ -23,21 +24,26 @@ __all__ = [ - "get_reusable_executor", - "cpu_count", - "wait", - "as_completed", - "Future", + # Constants + "ALL_COMPLETED", + "FIRST_COMPLETED", + "FIRST_EXCEPTION", + # Classes "Executor", + "Future", "ProcessPoolExecutor", + # Functions + "as_completed", + "cpu_count", + "freeze_support", + "get_reusable_executor", + "set_loky_pickler", + "wait", + "wrap_non_picklable_objects", + # Errors "BrokenProcessPool", "CancelledError", "TimeoutError", - "FIRST_COMPLETED", - "FIRST_EXCEPTION", - "ALL_COMPLETED", - "wrap_non_picklable_objects", - "set_loky_pickler", ] diff --git a/loky/backend/fork_exec.py b/loky/backend/fork_exec.py index 2353c42f..bfb38a8e 100644 --- a/loky/backend/fork_exec.py +++ b/loky/backend/fork_exec.py @@ -35,9 +35,12 @@ def fork_exec(cmd, keep_fds, env=None): env = env or {} child_env = {**os.environ, **env} + # make sure fds are inheritable + [os.set_inheritable(fd, True) for fd in keep_fds] + pid = os.fork() if pid == 0: # pragma: no cover close_fds(keep_fds) - os.execve(sys.executable, cmd, child_env) + os.execve(cmd[0], cmd, child_env) else: return pid diff --git a/loky/backend/popen_loky_posix.py b/loky/backend/popen_loky_posix.py index 74395be0..1f312d20 100644 --- a/loky/backend/popen_loky_posix.py +++ b/loky/backend/popen_loky_posix.py @@ -6,9 +6,8 @@ import os import sys import signal -import pickle from io import BytesIO -from multiprocessing import util, process +from multiprocessing import util from multiprocessing.connection import wait from multiprocessing.context import set_spawning_popen @@ -31,6 +30,19 @@ def detach(self): return self.fd +# +# Backward compat for pypy and python<=3.7 +# XXX: to remove once 3.7 is not supported anymore. +# +if not hasattr(util, "close_fds"): + + def _close_fds(*fds): + for fd in fds: + os.close(fd) + + util.close_fds = _close_fds + + # # Start child process using subprocess.Popen # @@ -49,7 +61,7 @@ def __init__(self, process_obj): def duplicate_for_child(self, fd): self._fds.append(fd) - return reduction._mk_inheritable(fd) + return fd def poll(self, flag=os.WNOHANG): if self.returncode is None: @@ -90,7 +102,6 @@ def terminate(self): raise def _launch(self, process_obj): - tracker_fd = resource_tracker._resource_tracker.getfd() fp = BytesIO() @@ -109,15 +120,12 @@ def _launch(self, process_obj): try: parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() - # for fd in self._fds: - # _mk_inheritable(fd) - - cmd_python = [sys.executable] - cmd_python += ["-m", self.__module__] - cmd_python += ["--process-name", str(process_obj.name)] - cmd_python += ["--pipe", str(reduction._mk_inheritable(child_r))] - reduction._mk_inheritable(child_w) - reduction._mk_inheritable(tracker_fd) + + cmd_python = spawn.get_command_line( + pipe_handle=child_r, + parent_pid=os.getpid(), + process_name=process_obj.name, + ) self._fds += [child_r, child_w, tracker_fd] if sys.version_info >= (3, 8) and os.name == "posix": mp_tracker_fd = prep_data["mp_tracker_args"]["fd"] @@ -129,17 +137,26 @@ def _launch(self, process_obj): util.debug( f"launched python with pid {pid} and cmd:\n{cmd_python}" ) - self.sentinel = parent_r + # Write the preparation data in the queue in a backward compatible + # way. + # XXX: can this be simplify now that we only support python3.7+ method = "getbuffer" if not hasattr(fp, method): method = "getvalue" - with os.fdopen(parent_w, "wb") as f: + with os.fdopen(parent_w, "wb", closefd=False) as f: f.write(getattr(fp, method)()) + + # Store the process's information self.pid = pid + self.sentinel = parent_r finally: - if parent_r is not None: - util.Finalize(self, os.close, (parent_r,)) + fds_to_close = [] + for fd in (parent_r, parent_w): + if fd is not None: + fds_to_close.append(fd) + self.finalizer = util.Finalize(self, util.close_fds, fds_to_close) + for fd in (child_r, child_w): if fd is not None: os.close(fd) @@ -147,47 +164,3 @@ def _launch(self, process_obj): @staticmethod def thread_is_spawning(): return True - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser("Command line parser") - parser.add_argument( - "--pipe", type=int, required=True, help="File handle for the pipe" - ) - parser.add_argument( - "--process-name", - type=str, - default=None, - help="Identifier for debugging purpose", - ) - - args = parser.parse_args() - - info = {} - exitcode = 1 - try: - with os.fdopen(args.pipe, "rb") as from_parent: - process.current_process()._inheriting = True - try: - prep_data = pickle.load(from_parent) - spawn.prepare(prep_data) - process_obj = pickle.load(from_parent) - finally: - del process.current_process()._inheriting - - exitcode = process_obj._bootstrap() - except Exception: - print("\n\n" + "-" * 80) - print(f"{args.process_name} failed with traceback: ") - print("-" * 80) - import traceback - - print(traceback.format_exc()) - print("\n" + "-" * 80) - finally: - if from_parent is not None: - from_parent.close() - - sys.exit(exitcode) diff --git a/loky/backend/popen_loky_win32.py b/loky/backend/popen_loky_win32.py index 2edf8025..26027232 100644 --- a/loky/backend/popen_loky_win32.py +++ b/loky/backend/popen_loky_win32.py @@ -2,9 +2,10 @@ import sys import msvcrt import _winapi -from pickle import load -from multiprocessing import process, util + +from multiprocessing import util from multiprocessing.context import set_spawning_popen +from multiprocessing.popen_spawn_win32 import _close_handles from multiprocessing.popen_spawn_win32 import Popen as _Popen from . import reduction, spawn @@ -12,6 +13,11 @@ __all__ = ["Popen"] +POPEN_FLAG = 0 +if spawn.OPEN_CONSOLE_FOR_SUBPROCESSES: + POPEN_FLAG = _winapi.CREATE_NEW_CONSOLE + + # # # @@ -65,9 +71,11 @@ def __init__(self, process_obj): # terminated before it could steal the handle from the parent process. rhandle, whandle = _winapi.CreatePipe(None, 0) wfd = msvcrt.open_osfhandle(whandle, 0) - cmd = get_command_line(parent_pid=os.getpid(), pipe_handle=rhandle) - python_exe = spawn.get_executable() + cmd = spawn.get_command_line( + pipe_handle=rhandle, parent_pid=os.getpid() + ) + python_exe = cmd[0] # copy the environment variables to set in the child process child_env = {**os.environ, **process_obj.env} @@ -79,7 +87,6 @@ def __init__(self, process_obj): child_env["__PYVENV_LAUNCHER__"] = sys.executable cmd = " ".join(f'"{x}"' for x in cmd) - with open(wfd, "wb") as to_child: # start process try: @@ -115,59 +122,3 @@ def __init__(self, process_obj): reduction.dump(process_obj, to_child) finally: set_spawning_popen(None) - - -def get_command_line(pipe_handle, parent_pid, **kwds): - """Returns prefix of command line used for spawning a child process.""" - if getattr(sys, "frozen", False): - return [sys.executable, "--multiprocessing-fork", pipe_handle] - else: - prog = ( - "from loky.backend.popen_loky_win32 import main; " - f"main(pipe_handle={pipe_handle}, parent_pid={parent_pid})" - ) - opts = util._args_from_interpreter_flags() - return [ - spawn.get_executable(), - *opts, - "-c", - prog, - "--multiprocessing-fork", - ] - - -def is_forking(argv): - """Return whether commandline indicates we are forking.""" - if len(argv) >= 2 and argv[1] == "--multiprocessing-fork": - return True - else: - return False - - -def main(pipe_handle, parent_pid=None): - """Run code specified by data received over pipe.""" - assert is_forking(sys.argv), "Not forking" - - if parent_pid is not None: - source_process = _winapi.OpenProcess( - _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE, False, parent_pid - ) - else: - source_process = None - new_handle = reduction.duplicate( - pipe_handle, source_process=source_process - ) - fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY) - parent_sentinel = source_process - - with os.fdopen(fd, "rb", closefd=True) as from_parent: - process.current_process()._inheriting = True - try: - preparation_data = load(from_parent) - spawn.prepare(preparation_data, parent_sentinel) - self = load(from_parent) - finally: - del process.current_process()._inheriting - - exitcode = self._bootstrap(parent_sentinel) - sys.exit(exitcode) diff --git a/loky/backend/process.py b/loky/backend/process.py index 35625509..ef94c253 100644 --- a/loky/backend/process.py +++ b/loky/backend/process.py @@ -44,6 +44,13 @@ def _Popen(process_obj): from .popen_loky_posix import Popen return Popen(process_obj) + def _bootstrap(self, parent_sentinel=None): + try: + super()._bootstrap(parent_sentinel=parent_sentinel) + except TypeError: + # Compat for pypy that doesn't accept the parent_sentinel argument + super()._bootstrap() + class LokyInitMainProcess(LokyProcess): _start_method = "loky_init_main" diff --git a/loky/backend/resource_tracker.py b/loky/backend/resource_tracker.py index 25204a7a..f5644a22 100644 --- a/loky/backend/resource_tracker.py +++ b/loky/backend/resource_tracker.py @@ -49,15 +49,16 @@ import signal import warnings import threading -from _multiprocessing import sem_unlink from multiprocessing import util +from _multiprocessing import sem_unlink from . import spawn if sys.platform == "win32": import _winapi import msvcrt - from multiprocessing.reduction import duplicate + from .spawn import duplicate + from .spawn import duplicate_in_child_process __all__ = ["ensure_running", "register", "unregister"] @@ -116,25 +117,27 @@ def ensure_running(self): "leak." ) - fds_to_pass = [] - try: - fds_to_pass.append(sys.stderr.fileno()) - except Exception: - pass - - r, w = os.pipe() if sys.platform == "win32": - _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True) - os.close(r) - r = _r + r, whandle = _winapi.CreatePipe(None, 0) + w = msvcrt.open_osfhandle(whandle, 0) + fds_to_pass = [r] + else: + r, w = os.pipe() + fds_to_pass = [r] + try: + fds_to_pass.append(sys.stderr.fileno()) + except Exception: + pass - cmd = f"from {main.__module__} import main; main({r}, {VERBOSE})" try: - fds_to_pass.append(r) # process will out live us, so no need to wait on pid - exe = spawn.get_executable() - args = [exe, *util._args_from_interpreter_flags(), "-c", cmd] - util.debug(f"launching resource tracker: {args}") + cmd = spawn.get_command_line( + main_prog=main, + pipe_handle=r, + parent_pid=os.getpid(), + verbose=int(VERBOSE), + ) + util.debug(f"launching resource tracker: {cmd}") # bpo-33613: Register a signal mask that will block the # signals. This signal mask will be inherited by the child # that is going to be spawned and will protect the child from a @@ -146,7 +149,7 @@ def ensure_running(self): signal.pthread_sigmask( signal.SIG_BLOCK, _IGNORED_SIGNALS ) - pid = spawnv_passfds(exe, args, fds_to_pass) + pid = spawnv_passfds(cmd, fds_to_pass) finally: if _HAVE_SIGMASK: signal.pthread_sigmask( @@ -159,9 +162,7 @@ def ensure_running(self): self._fd = w self._pid = pid finally: - if sys.platform == "win32": - _winapi.CloseHandle(r) - else: + if sys.platform != "win32": os.close(r) def _check_alive(self): @@ -206,8 +207,20 @@ def _send(self, cmd, name, rtype): getfd = _resource_tracker.getfd -def main(fd, verbose=0): +def main(pipe_handle, parent_pid, verbose=0): """Run resource tracker.""" + # Make sure the arguments have the right type as they are + # passed as strings through the command line. + pipe_handle, parent_pid = int(pipe_handle), int(parent_pid) + verbose = int(verbose) + if sys.platform == "win32": + handle, parent_sentinel = duplicate_in_child_process( + pipe_handle, parent_pid + ) + fd = msvcrt.open_osfhandle(handle, os.O_RDONLY) + else: + fd = pipe_handle + # protect the process from ^C and "killall python" etc if verbose: util.log_to_stderr(level=util.DEBUG) @@ -230,9 +243,7 @@ def main(fd, verbose=0): registry = {rtype: {} for rtype in _CLEANUP_FUNCS.keys()} try: # keep track of registered/unregistered resources - if sys.platform == "win32": - fd = msvcrt.open_osfhandle(fd, os.O_RDONLY) - with open(fd, "rb") as f: + with open(fd, "rb", closefd=True) as f: while True: line = f.readline() if line == b"": # EOF @@ -274,8 +285,8 @@ def main(fd, verbose=0): del registry[rtype][name] if verbose: util.debug( - f"[ResourceTracker] unregister {name} {rtype}: " - f"registry({len(registry)})" + f"[ResourceTracker] unregister {name} {rtype}:" + f" registry({len(registry)})" ) elif cmd == "MAYBE_UNLINK": registry[rtype][name] -= 1 @@ -353,26 +364,28 @@ def _unlink_resources(rtype_registry, rtype): # -def spawnv_passfds(path, args, passfds): +def spawnv_passfds(cmd, passfds): + """Spawn the resource tracker in a platform specific way. + + For posix platforms, make the passfds inheritable and use fork_exec. + + For windows platforms, passfds is only used to clean up the handles in + case of failure, the inheritance of the handles will be taken care in the + child process through _winapi.OpenProcess. + """ passfds = sorted(passfds) if sys.platform != "win32": - errpipe_read, errpipe_write = os.pipe() - try: - from .reduction import _mk_inheritable - from .fork_exec import fork_exec - - _pass = [_mk_inheritable(fd) for fd in passfds] - return fork_exec(args, _pass) - finally: - os.close(errpipe_read) - os.close(errpipe_write) + from .fork_exec import fork_exec + + return fork_exec(cmd, passfds) else: - cmd = " ".join(f'"{x}"' for x in args) + exe = cmd[0] + cmd = " ".join(f'"{x}"' for x in cmd) try: _, ht, pid, _ = _winapi.CreateProcess( - path, cmd, None, None, True, 0, None, None, None + exe, cmd, None, None, False, 0, None, None, None ) _winapi.CloseHandle(ht) + return pid except BaseException: - pass - return pid + _winapi.CloseHandle(passfds[0]) diff --git a/loky/backend/spawn.py b/loky/backend/spawn.py index d011c398..5a493416 100644 --- a/loky/backend/spawn.py +++ b/loky/backend/spawn.py @@ -11,7 +11,17 @@ import runpy import textwrap import types +import pickle +import importlib + from multiprocessing import process, util +from multiprocessing import freeze_support as _freeze_support_mp + + +# If set to True, the child process will open a console that can be used to +# get access to debugger. This is useful for debugging the child process +# step-by-step. +OPEN_CONSOLE_FOR_SUBPROCESSES = False if sys.platform != "win32": @@ -24,6 +34,25 @@ WINEXE = sys.platform == "win32" and getattr(sys, "frozen", False) WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") + def duplicate_in_child_process(handle, parent_pid=None): + """Duplicate a handle in child process given its parent pid. + + Returns a file descriptor for the handle and the parent process. + """ + import _winapi + + if parent_pid is not None: + source_process = _winapi.OpenProcess( + _winapi.SYNCHRONIZE | _winapi.PROCESS_DUP_HANDLE, + False, + parent_pid, + ) + else: + source_process = None + new_handle = duplicate(handle, source_process=source_process) + return new_handle, source_process + + if WINSERVICE: _python_exe = os.path.join(sys.exec_prefix, "python.exe") else: @@ -248,3 +277,94 @@ def _fixup_main_from_path(main_path): main_content = runpy.run_path(main_path, run_name="__mp_main__") main_module.__dict__.update(main_content) sys.modules["__main__"] = sys.modules["__mp_main__"] = main_module + + +def main(pipe_handle, parent_pid, process_name=None): + # arguments are passed as strings, convert them back to int. + pipe_handle, parent_pid = int(pipe_handle), int(parent_pid) + if sys.platform == "win32": + handle, parent_sentinel = duplicate_in_child_process( + pipe_handle, parent_pid + ) + fd = msvcrt.open_osfhandle(handle, os.O_RDONLY) + else: + fd = pipe_handle + parent_sentinel = os.dup(pipe_handle) + + exitcode = 1 + try: + exitcode = _main(fd, parent_sentinel=parent_sentinel) + except Exception: + print("\n\n" + "-" * 80) + print(f"{process_name} failed with traceback: ") + print("-" * 80) + import traceback + + print(traceback.format_exc()) + print("\n" + "-" * 80) + finally: + sys.exit(exitcode) + + +def _main(fd, parent_sentinel): + with os.fdopen(fd, "rb", closefd=True) as from_parent: + process.current_process()._inheriting = True + try: + preparation_data = pickle.load(from_parent) + prepare(preparation_data) + self = pickle.load(from_parent) + finally: + del process.current_process()._inheriting + return self._bootstrap(parent_sentinel) + + +def get_command_line(main_prog=main, **kwargs): + """ + Returns a command line used for spawning a child process. + This command provides supports for frozen executables and + only works with main_prog named main. + """ + + assert main_prog.__name__ == "main" + + if getattr(sys, "frozen", False): + # For frozen executables, add flag '--multiprocessin-fork' to notify, + # the `freeze_support` function and pass the arguments as 'key=value' + # so they can be used to call main. + list_kwargs = [f"{k}={v}" for k, v in kwargs.items()] + argv = [ + sys.executable, + "--loky-fork", + main_prog.__module__, + *list_kwargs, + ] + else: + # For non-frozen executables, directly call `main_prog` with + # the arguments passed as strings. + list_kwargs = [f'{k}="{v}"' for k, v in kwargs.items()] + prog = ( + f"from {main_prog.__module__} import main; " + f'main({", ".join(list_kwargs)})' + ) + opts = util._args_from_interpreter_flags() + argv = [get_executable(), *opts, "-c", prog] + return argv + + +def freeze_support(): + """Run code for the child workers when necessary. + This helper allows the frozen executable to call the code for the child + workers when not in the main process. + It should be called right after the beginning of the programme, to + avoid recursive process spawning. + """ + if len(sys.argv) >= 2 and sys.argv[1] == "--loky-fork": + module_main = sys.argv[2] + main = importlib.import_module(module_main).main + kwargs = {} + for p in sys.argv[3:]: + k, v = p.split("=") + kwargs[k] = v + exitcode = main(**kwargs) + sys.exit(exitcode) + _freeze_support_mp() diff --git a/loky/backend/synchronize.py b/loky/backend/synchronize.py index 18db3e34..f7cfe0fa 100644 --- a/loky/backend/synchronize.py +++ b/loky/backend/synchronize.py @@ -19,7 +19,7 @@ import _multiprocessing from time import time as _time from multiprocessing import process, util -from multiprocessing.context import assert_spawning +from multiprocessing import context from . import resource_tracker @@ -59,7 +59,6 @@ class SemLock: - _rand = tempfile._RandomNameSequence() def __init__(self, kind, value, maxvalue, name=None): @@ -122,9 +121,13 @@ def __exit__(self, *args): return self._semlock.release() def __getstate__(self): - assert_spawning(self) + context.assert_spawning(self) sl = self._semlock h = sl.handle + if sys.platform == "win32": + h = context.get_spawning_popen().duplicate_for_child(sl.handle) + else: + h = sl.handle return (h, sl.kind, sl.maxvalue, sl.name) def __setstate__(self, state): @@ -249,7 +252,7 @@ def __init__(self, lock=None): self._make_methods() def __getstate__(self): - assert_spawning(self) + context.assert_spawning(self) return ( self._lock, self._sleeping_count, diff --git a/loky/process_executor.py b/loky/process_executor.py index 4cb9741c..17d5bc4d 100644 --- a/loky/process_executor.py +++ b/loky/process_executor.py @@ -146,7 +146,6 @@ class _ExecutorFlags: """ def __init__(self, shutdown_lock): - self.shutdown = False self.broken = None self.kill_workers = False @@ -259,7 +258,6 @@ def _rebuild_exc(exc, tb): class _WorkItem: - __slots__ = ["future", "fn", "args", "kwargs"] def __init__(self, future, fn, args, kwargs): @@ -1020,7 +1018,6 @@ class ShutdownExecutorError(RuntimeError): class ProcessPoolExecutor(Executor): - _at_exit = None def __init__( diff --git a/tests/_test_process_executor.py b/tests/_test_process_executor.py index f58af9f8..15eb54c3 100644 --- a/tests/_test_process_executor.py +++ b/tests/_test_process_executor.py @@ -192,7 +192,6 @@ def test_processes_terminate(self): p.join() def test_processes_terminate_on_executor_gc(self): - results = self.executor.map(sleep_and_return, [0.1] * 10, range(10)) assert len(self.executor._processes) == self.worker_count processes = self.executor._processes diff --git a/tests/test_loky_backend.py b/tests/test_loky_backend.py index 7f5b626a..8b8f3000 100644 --- a/tests/test_loky_backend.py +++ b/tests/test_loky_backend.py @@ -71,7 +71,6 @@ def teardown_class(cls): kill_process_tree(child_process) def test_current(self): - current = self.current_process() authkey = current.authkey @@ -83,7 +82,6 @@ def test_current(self): assert current.exitcode is None def test_daemon_argument(self): - # By default uses the current process's daemon flag. proc0 = self.Process(target=self._test_process) assert proc0.daemon == self.current_process().daemon @@ -296,7 +294,6 @@ def _test_terminate(cls, event): time.sleep(100) def test_terminate(self): - manager = self.Manager() event = manager.Event() @@ -481,7 +478,6 @@ def _check_fds(self, pid, w): n_pipe = 0 named_sem = [] for fd, t, name in zip(lines[::3], lines[1::3], lines[2::3]): - # Check if fd is a standard IO file. For python 3.x stdin # should be closed. is_std = fd in ["f1", "f2"] @@ -509,10 +505,11 @@ def _check_fds(self, pid, w): # - one pipe for communication with main process # - loky's resource_tracker pipe # - the Connection pipe + # - the pipe used for the parent_sentinel # - additionally, on posix + Python 3.8: multiprocessing's # resource_tracker pipe - if sys.version_info >= (3, 8) and os.name == "posix": - n_expected_pipes = 4 + if sys.version_info >= (3, 8): + n_expected_pipes = 5 if os.name == "posix" else 4 else: n_expected_pipes = 3 msg = ( @@ -552,7 +549,6 @@ def test_sync_object_handling(self): ) named_sem = [] try: - p.start() assert started.wait(5), "The process took too long to start" r.close() @@ -647,10 +643,8 @@ def test_interactively_define_process_fail_main(self): stdout, stderr = check_subprocess_call( [sys.executable, filename], timeout=10 ) - if sys.platform == "win32": - assert "RuntimeError:" in stderr - else: - assert "RuntimeError:" in stdout + all_outputs = f"stdout:\n{stdout}\nstderr:\n{stderr}" + assert "RuntimeError:" in all_outputs, all_outputs finally: os.unlink(filename) diff --git a/tests/test_loky_module.py b/tests/test_loky_module.py index b9734eac..1c840913 100644 --- a/tests/test_loky_module.py +++ b/tests/test_loky_module.py @@ -3,8 +3,9 @@ import sys import shutil import tempfile +import textwrap import warnings -from subprocess import check_output +from subprocess import check_call, check_output import pytest @@ -215,3 +216,50 @@ def test_only_physical_cores_with_user_limitation(): if cpu_count_user < cpu_count_mp: assert cpu_count() == cpu_count_user assert cpu_count(only_physical_cores=True) == cpu_count_user + + +def test_freeze_support_with_pyinstaller(tmpdir): + pyinstaller = shutil.which("pyinstaller") + + if pyinstaller is None: + raise pytest.skip("pyinstaller is not installed") + + frozen_source_code = textwrap.dedent( + """ + import loky + + if __name__ == "__main__": + loky.freeze_support() + e = loky.get_reusable_executor(max_workers=2) + print(sum(e.map(int, range(10)))) + """ + ) + python_source_path = tmpdir / "frozen_loky.py" + python_source_path.write_text(frozen_source_code, encoding="utf-8") + + # Run the Python script directly: + non_frozen_result = check_output( + [sys.executable, python_source_path], + text=True, + ) + + # Call pyinstaller to generate the frozen_loky executable. + check_call( + [ + pyinstaller, + "--onefile", + "--distpath", + tmpdir, + "--specpath", + tmpdir, + python_source_path, + ] + ) + if sys.platform == "win32": + frozen_loky = tmpdir / "frozen_loky.exe" + else: + frozen_loky = tmpdir / "frozen_loky" + assert frozen_loky.exists() + + frozen_result = check_output([frozen_loky], text=True) + assert frozen_result == non_frozen_result diff --git a/tests/test_synchronize.py b/tests/test_synchronize.py index 1ceaa01c..cefd8225 100644 --- a/tests/test_synchronize.py +++ b/tests/test_synchronize.py @@ -113,7 +113,6 @@ def test_bounded_semaphore(self): assert_sem_value_equal(sem, 2) def test_timeout(self): - sem = loky_context.Semaphore(0) acquire = TimingWrapper(sem.acquire) diff --git a/tests/test_worker_timeout.py b/tests/test_worker_timeout.py index 740bd171..68872cbe 100644 --- a/tests/test_worker_timeout.py +++ b/tests/test_worker_timeout.py @@ -57,7 +57,6 @@ def close(self): @staticmethod def _feed(readlock, reader, writer, delay): - PICKLE_NONE = dumps(None) while True: