aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/snakeoil/cli/arghparse.py2
-rw-r--r--src/snakeoil/process/spawn.py86
2 files changed, 45 insertions, 43 deletions
diff --git a/src/snakeoil/cli/arghparse.py b/src/snakeoil/cli/arghparse.py
index a1209092..09eaed36 100644
--- a/src/snakeoil/cli/arghparse.py
+++ b/src/snakeoil/cli/arghparse.py
@@ -1309,7 +1309,7 @@ class ArgumentParser(OptionalsParser, CsvActionsParser):
self.error(str(exc))
# run final arg validation
- for check in set(args.__dict__.keys()):
+ for check in tuple(args.__dict__.keys()):
if check.startswith("__final_check__"):
functor = args.pop(check)
functor(self, args)
diff --git a/src/snakeoil/process/spawn.py b/src/snakeoil/process/spawn.py
index ce8fe58a..27095988 100644
--- a/src/snakeoil/process/spawn.py
+++ b/src/snakeoil/process/spawn.py
@@ -16,6 +16,7 @@ import itertools
import os
import signal
import sys
+from typing import Iterable, Optional, Sequence, Union
from ..mappings import ProtectedDict
from ..osutils import access
@@ -27,12 +28,12 @@ SANDBOX_BINARY = find_binary("sandbox", fallback="/usr/bin/sandbox")
try:
import resource
- max_fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
+ max_fd_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
except ImportError:
max_fd_limit = 256
-def bash_version(force=False):
+def bash_version(force: bool = False):
"""Get the system bash version of the form major.minor.patch."""
if not force:
try:
@@ -41,13 +42,13 @@ def bash_version(force=False):
pass
try:
ret, ver = spawn_get_output(
- [
+ (
BASH_BINARY,
"--norc",
"--noprofile",
"-c",
"printf ${BASH_VERSINFO[0]}.${BASH_VERSINFO[1]}.${BASH_VERSINFO[2]}",
- ]
+ )
)
if ret == 0:
try:
@@ -62,7 +63,9 @@ def bash_version(force=False):
return ver
-def spawn_bash(mycommand, debug=False, name=None, **kwds):
+def spawn_bash(
+ mycommand: Union[str, Iterable[str]], debug: bool = False, name: str = None, **kwds
+):
"""spawn the command via bash -c"""
args = [BASH_BINARY, "--norc", "--noprofile"]
@@ -79,7 +82,7 @@ def spawn_bash(mycommand, debug=False, name=None, **kwds):
return spawn(args, name=name, **kwds)
-def spawn_sandbox(mycommand, name=None, **kwds):
+def spawn_sandbox(mycommand: Union[str, Iterable[str]], name: str = None, **kwds):
"""spawn the command under sandboxed"""
if not is_sandbox_capable():
@@ -134,10 +137,9 @@ atexit.register(run_exitfuncs)
spawned_pids = []
-def cleanup_pids(pids=None):
+def cleanup_pids(pids: Iterable[int] = None):
"""reap list of pids if specified, else all children"""
- global spawned_pids
if pids is None:
pids = spawned_pids
elif pids is not spawned_pids:
@@ -162,17 +164,17 @@ def cleanup_pids(pids=None):
def spawn(
- mycommand,
- env=None,
- name=None,
- fd_pipes=None,
- returnpid=False,
- uid=None,
- gid=None,
- groups=None,
- umask=None,
- cwd=None,
- pgid=None,
+ mycommand: Union[str, Sequence[str]],
+ env: Optional[dict[str, str]] = None,
+ name: Optional[str] = None,
+ fd_pipes: Optional[dict[int, int]] = None,
+ returnpid: bool = False,
+ uid: Optional[int] = None,
+ gid: Optional[int] = None,
+ groups: Optional[Sequence[int]] = None,
+ umask: Optional[int] = None,
+ cwd: Optional[str] = None,
+ pgid: Optional[int] = None,
):
"""wrapper around execve
@@ -216,11 +218,11 @@ def spawn(
cwd,
pgid,
)
- except Exception as e:
+ except Exception as exc:
# We need to catch _any_ exception so that it doesn't
- # propogate out of this function and cause exiting
+ # propagate out of this function and cause exiting
# with anything other than os._exit()
- sys.stderr.write("%s:\n %s\n" % (e, " ".join(mycommand)))
+ sys.stderr.write(f"{exc}:\n {' '.join(mycommand)}\n")
os._exit(1)
# Add the pid to our local and the global pid lists.
@@ -266,17 +268,17 @@ def spawn(
def _exec(
- binary,
- mycommand,
- name=None,
- fd_pipes=None,
- env=None,
- gid=None,
- groups=None,
- uid=None,
- umask=None,
- cwd=None,
- pgid=None,
+ binary: str,
+ mycommand: Sequence[str],
+ name: str = None,
+ fd_pipes: Optional[dict[int, int]] = None,
+ env: Optional[dict[str, str]] = None,
+ gid: Optional[int] = None,
+ groups: Optional[Sequence[int]] = None,
+ uid: Optional[int] = None,
+ umask: Optional[int] = None,
+ cwd: Optional[str] = None,
+ pgid: Optional[int] = None,
):
"""internal function to handle exec'ing the child process.
@@ -304,7 +306,7 @@ def _exec(
yield potential
# If we haven't been told what file descriptors to use
- # default to propogating our stdin, stdout and stderr.
+ # default to propagating our stdin, stdout and stderr.
if fd_pipes is None:
fd_pipes = {0: 0, 1: 1, 2: 2}
@@ -370,12 +372,12 @@ def _exec(
def spawn_get_output(
- mycommand,
+ mycommand: Union[str, Iterable[str]],
spawn_type=None,
- raw_exit_code=False,
- collect_fds=(1,),
- fd_pipes=None,
- split_lines=True,
+ raw_exit_code: bool = False,
+ collect_fds: Iterable[int] = (1,),
+ fd_pipes: Optional[dict[int, int]] = None,
+ split_lines: bool = True,
**kwds,
):
@@ -435,7 +437,7 @@ def spawn_get_output(
pass
-def process_exit_code(retval):
+def process_exit_code(retval: int):
"""Process a waitpid returned exit code.
:return: The exit code if it exit'd, the signal if it died from signalling.
@@ -460,7 +462,7 @@ class ExecutionFailure(Exception):
# cached capabilities
-def is_sandbox_capable(force=False):
+def is_sandbox_capable(force: bool = False):
if not force:
try:
return is_sandbox_capable.cached_result
@@ -481,7 +483,7 @@ def is_sandbox_capable(force=False):
return res
-def is_userpriv_capable(force=False):
+def is_userpriv_capable(force: bool = False):
if not force:
try:
return is_userpriv_capable.cached_result