diff options
author | Zac Medico <zmedico@gentoo.org> | 2024-03-02 22:30:50 -0800 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2024-03-02 22:30:50 -0800 |
commit | c3ebdbb42e72335ca65335c855a82b99537c7606 (patch) | |
tree | d21e59a3bfa5ba38fb294b04438302a268063a94 /lib | |
parent | binarytree._populate_remote: Fix UnboundLocalError for binpkg-request-signature (diff) | |
download | portage-c3ebdbb42e72335ca65335c855a82b99537c7606.tar.gz portage-c3ebdbb42e72335ca65335c855a82b99537c7606.tar.bz2 portage-c3ebdbb42e72335ca65335c855a82b99537c7606.zip |
elog/mod_custom: Spawn processes in background
Since elog_process is typically called while the event loop is
running, hold references to spawned processes and wait for them
asynchronously, ultimately waiting for them if necessary when
the AsyncioEventLoop _close_main method calls _async_finalize
via portage.process.run_coroutine_exitfuncs().
ConfigProtectTestCase is useful for exercising this code, and
this little make.globals patch can be used to test failure
during finalize with this error message:
!!! PORTAGE_ELOG_COMMAND failed with exitcode 1
--- a/cnf/make.globals
+++ b/cnf/make.globals
@@ -144 +144,2 @@ PORTAGE_ELOG_CLASSES="log warn error"
-PORTAGE_ELOG_SYSTEM="save_summary:log,warn,error,qa echo"
+PORTAGE_ELOG_SYSTEM="save_summary:log,warn,error,qa echo custom"
+PORTAGE_ELOG_COMMAND="/bin/false"
Bug: https://bugs.gentoo.org/925907
Signed-off-by: Zac Medico <zmedico@gentoo.org>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/portage/elog/mod_custom.py | 73 | ||||
-rw-r--r-- | lib/portage/process.py | 29 | ||||
-rw-r--r-- | lib/portage/util/_eventloop/asyncio_event_loop.py | 8 |
3 files changed, 105 insertions, 5 deletions
diff --git a/lib/portage/elog/mod_custom.py b/lib/portage/elog/mod_custom.py index e0ae77e10..a3e199bcb 100644 --- a/lib/portage/elog/mod_custom.py +++ b/lib/portage/elog/mod_custom.py @@ -1,10 +1,33 @@ # elog/mod_custom.py - elog dispatch module -# Copyright 2006-2020 Gentoo Authors +# Copyright 2006-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +import types + +import portage import portage.elog.mod_save import portage.exception import portage.process +from portage.util.futures import asyncio + +# Since elog_process is typically called while the event loop is +# running, hold references to spawned processes and wait for them +# asynchronously, ultimately waiting for them if necessary when +# the AsyncioEventLoop _close_main method calls _async_finalize +# via portage.process.run_coroutine_exitfuncs(). +_proc_refs = None + + +def _get_procs() -> list[tuple[portage.process.MultiprocessingProcess, asyncio.Future]]: + """ + Return list of (proc, asyncio.ensure_future(proc.wait())) which is not + inherited from the parent after fork. + """ + global _proc_refs + if _proc_refs is None or _proc_refs.pid != portage.getpid(): + _proc_refs = types.SimpleNamespace(pid=portage.getpid(), procs=[]) + portage.process.atexit_register(_async_finalize) + return _proc_refs.procs def process(mysettings, key, logentries, fulltext): @@ -18,8 +41,50 @@ def process(mysettings, key, logentries, fulltext): mylogcmd = mysettings["PORTAGE_ELOG_COMMAND"] mylogcmd = mylogcmd.replace("${LOGFILE}", elogfilename) mylogcmd = mylogcmd.replace("${PACKAGE}", key) - retval = portage.process.spawn_bash(mylogcmd) - if retval != 0: + loop = asyncio.get_event_loop() + proc = portage.process.spawn_bash(mylogcmd, returnproc=True) + procs = _get_procs() + procs.append((proc, asyncio.ensure_future(proc.wait(), loop=loop))) + for index, (proc, waiter) in reversed(list(enumerate(procs))): + if not waiter.done(): + continue + del procs[index] + if waiter.result() != 0: + raise portage.exception.PortageException( + f"!!! PORTAGE_ELOG_COMMAND failed with exitcode {waiter.result()}" + ) + + +async def _async_finalize(): + """ + Async finalize is preferred, since we can wait for process exit status. + """ + procs = _get_procs() + while procs: + proc, waiter = procs.pop() + if (await waiter) != 0: + raise portage.exception.PortageException( + f"!!! PORTAGE_ELOG_COMMAND failed with exitcode {waiter.result()}" + ) + + +def finalize(): + """ + NOTE: This raises PortageException if there are any processes + still running, so it's better to use _async_finalize instead + (invoked via portage.process.run_coroutine_exitfuncs() in + the AsyncioEventLoop _close_main method). + """ + procs = _get_procs() + while procs: + proc, waiter = procs.pop() + if not waiter.done(): + waiter.cancel() + proc.terminate() + raise portage.exception.PortageException( + f"!!! PORTAGE_ELOG_COMMAND was killed after it was found running in the background (pid {proc.pid})" + ) + elif waiter.result() != 0: raise portage.exception.PortageException( - "!!! PORTAGE_ELOG_COMMAND failed with exitcode %d" % retval + f"!!! PORTAGE_ELOG_COMMAND failed with exitcode {waiter.result()}" ) diff --git a/lib/portage/process.py b/lib/portage/process.py index b652e3294..1bc0c507c 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -6,6 +6,7 @@ import atexit import errno import fcntl +import io import logging import multiprocessing import platform @@ -226,6 +227,34 @@ def run_exitfuncs(): raise exc_info[0](exc_info[1]).with_traceback(exc_info[2]) +async def run_coroutine_exitfuncs(): + """ + This is the same as run_exitfuncs but it uses asyncio.iscoroutinefunction + to check which functions to run. It is called by the AsyncioEventLoop + _close_main method just before the loop is closed. + """ + tasks = [] + for index, (func, targs, kargs) in reversed(list(enumerate(_exithandlers))): + if asyncio.iscoroutinefunction(func): + del _exithandlers[index] + tasks.append(asyncio.ensure_future(func(*targs, **kargs))) + tracebacks = [] + exc_info = None + for task in tasks: + try: + await task + except Exception: + file = io.StringIO() + traceback.print_exc(file=file) + tracebacks.append(file.getvalue()) + exc_info = sys.exc_info() + if len(tracebacks) > 1: + for tb in tracebacks[:-1]: + print(tb, file=sys.stderr, flush=True) + if exc_info is not None: + raise exc_info[1].with_traceback(exc_info[2]) + + atexit.register(run_exitfuncs) # It used to be necessary for API consumers to remove pids from spawned_pids, diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py index ee9e4c60e..a598b1b51 100644 --- a/lib/portage/util/_eventloop/asyncio_event_loop.py +++ b/lib/portage/util/_eventloop/asyncio_event_loop.py @@ -79,8 +79,14 @@ class AsyncioEventLoop(_AbstractEventLoop): # we can properly wait for it and avoid messages like this: # [ERROR] Task was destroyed but it is pending! if socks5.proxy.is_running(): - await socks5.proxy.stop() + # TODO: Convert socks5.proxy.stop() to a regular coroutine + # function so that it doesn't need to be wrapped like this. + async def stop_socks5_proxy(): + await socks5.proxy.stop() + portage.process.atexit_register(stop_socks5_proxy) + + await portage.process.run_coroutine_exitfuncs() portage.process.run_exitfuncs() @staticmethod |