aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--roverlay/remote/rsync.py114
-rw-r--r--roverlay/tools/runcmd.py10
-rw-r--r--roverlay/tools/shenv.py41
3 files changed, 84 insertions, 81 deletions
diff --git a/roverlay/remote/rsync.py b/roverlay/remote/rsync.py
index 670e714..24e25f8 100644
--- a/roverlay/remote/rsync.py
+++ b/roverlay/remote/rsync.py
@@ -10,10 +10,15 @@ __all__ = [ 'RsyncRepo', ]
import os
import sys
-import subprocess
from roverlay import config, util
+import roverlay.tools.subproc
+from roverlay.tools.subproc import create_subprocess as _create_subprocess
+from roverlay.tools.subproc import stop_subprocess as _stop_subprocess
+from roverlay.tools.subproc import \
+ gracefully_stop_subprocess as _gracefully_stop_subprocess
+
from roverlay.remote.basicrepo import BasicRepo
RSYNC_ENV = util.keepenv (
@@ -55,6 +60,44 @@ DEFAULT_RSYNC_OPTS = (
'--chmod=ugo=r,u+w,Dugo+x', # 0755 for transferred dirs, 0644 for files
)
+def run_rsync ( cmdv, env=RSYNC_ENV ):
+ """Runs an rsync command and terminates/kills the process on error.
+
+ Returns: the command's returncode
+
+ Raises: Passes all exceptions
+
+ arguments:
+ * cmdv -- rsync command to (including the rsync executable!)
+ * env -- environment dict, defaults to RSYNC_ENV
+ """
+ proc = _create_subprocess ( cmdv, env=env )
+
+ try:
+ proc.communicate()
+
+ except KeyboardInterrupt:
+ sys.stderr.write (
+ "\nKeyboard interrupt - waiting for rsync to exit...\n"
+ )
+ # send SIGTERM and wait,
+ # fall back to _stop_subprocess() if another exception is hit
+ _gracefully_stop_subprocess ( proc, kill_timeout_cs=40 )
+ raise
+
+ except Exception:
+ # send SIGTERM, wait up to 4 seconds before sending SIGKILL
+ _stop_subprocess ( proc, kill_timeout_cs=40 )
+ raise
+ # --
+
+ if proc.returncode == RSYNC_SIGINT:
+ raise KeyboardInterrupt ( "propagated from rsync" )
+
+ return proc.returncode
+# --- end of run_rsync (...) ----
+
+
class RsyncRepo ( BasicRepo ):
def __init__ ( self,
@@ -113,9 +156,8 @@ class RsyncRepo ( BasicRepo ):
argv.extend ( ( self.remote_uri, self.distdir ) )
- # removing emty args from argv
- return tuple ( filter ( None, argv ) )
-
+ # remove empty args from argv
+ return [ arg for arg in argv if arg ]
# --- end of _rsync_argv (...) ---
def _dosync ( self ):
@@ -124,66 +166,38 @@ class RsyncRepo ( BasicRepo ):
"""
assert os.EX_OK not in RETRY_ON_RETCODE
- def waitfor ( p ):
- if p.communicate() != ( None, None ):
- raise AssertionError ( "expected None,None from communicate!" )
- if p.returncode == RSYNC_SIGINT:
- raise KeyboardInterrupt ( "propagated from rsync" )
-
- return p.returncode
- # --- end of waitfor (...) ---
-
- retcode = None
- proc = None
-
+ rsync_cmd = self._rsync_argv()
+ retcode = None
try:
- rsync_cmd = self._rsync_argv()
util.dodir ( self.distdir, mkdir_p=True )
self.logger.debug ( 'running rsync cmd: ' + ' '.join ( rsync_cmd ) )
- retry_count = 0
-
- proc = subprocess.Popen ( rsync_cmd, env=RSYNC_ENV )
- retcode = waitfor ( proc )
- proc = None
+ retcode = run_rsync ( rsync_cmd )
- while retcode in RETRY_ON_RETCODE and retry_count < MAX_RSYNC_RETRY:
- # this handles retcodes like
- # * 24: "Partial transfer due to vanished source files"
+ if retcode in RETRY_ON_RETCODE:
+ for retry_count in range ( MAX_RSYNC_RETRY ):
+ # this handles retcodes like
+ # * 24: "Partial transfer due to vanished source files"
- retry_count += 1
-
- self.logger.warning (
- "rsync returned {ret!r}, retrying ({now}/{_max})".format (
- ret=retcode, now=retry_count, _max=MAX_RSYNC_RETRY
+ self.logger.warning (
+ "rsync returned {ret!r}, retrying ({now}/{_max})".format (
+ ret=retcode, now=retry_count, _max=MAX_RSYNC_RETRY
+ )
)
- )
- proc = subprocess.Popen ( rsync_cmd, env=RSYNC_ENV )
- retcode = waitfor ( proc )
- proc = None
- # -- end while
+ retcode = run_rsync ( rsync_cmd )
+ if retcode not in RETRY_ON_RETCODE: break
+ # -- end if <want retry>
except KeyboardInterrupt:
- # maybe add terminate/kill code here,
- # similar to roverlay.tools.shenv->run_script()
- #
- sys.stderr.write (
- "\nKeyboard interrupt - waiting for rsync to exit...\n"
- )
- if proc is not None:
- proc.communicate()
- retcode = proc.returncode
- else:
- retcode = RSYNC_SIGINT
-
- if RERAISE_INTERRUPT:
- raise
+ retcode = RSYNC_SIGINT
+ if RERAISE_INTERRUPT: raise
except Exception as e:
# catch exceptions, log them and return False
+ retcode = None
self.logger.exception ( e )
-
+ # --
if retcode == os.EX_OK:
return True
diff --git a/roverlay/tools/runcmd.py b/roverlay/tools/runcmd.py
index 8c38cbd..7561669 100644
--- a/roverlay/tools/runcmd.py
+++ b/roverlay/tools/runcmd.py
@@ -9,21 +9,21 @@ import os
import subprocess
import roverlay.strutil
+import roverlay.tools.subproc
+from roverlay.tools.subproc import run_subprocess as _run_subprocess
DEBUG_TO_CONSOLE = False
def run_command_get_output (
cmdv, env, debug_to_console=False, use_filter=True, filter_func=None,
- binary_stdout=False,
+ binary_stdout=False, stdin=None
):
-
# note that debug_to_console breaks calls that want to parse stdout
pipe_target = None if debug_to_console else subprocess.PIPE
- cmd_call = subprocess.Popen (
- cmdv, stdin=None, stdout=pipe_target, stderr=pipe_target, env=env
+ cmd_call, raw_output = _run_subprocess (
+ cmdv, stdin=stdin, stdout=pipe_target, stderr=pipe_target, env=env
)
- raw_output = cmd_call.communicate()
if binary_stdout:
assert len ( raw_output ) == 2
diff --git a/roverlay/tools/shenv.py b/roverlay/tools/shenv.py
index 98cd86d..8f2cea2 100644
--- a/roverlay/tools/shenv.py
+++ b/roverlay/tools/shenv.py
@@ -17,6 +17,8 @@ import roverlay.config
import roverlay.strutil
import roverlay.util
import roverlay.stats.collector
+import roverlay.tools.subproc
+from roverlay.tools.subproc import run_subprocess as _run_subprocess
# _SHELL_ENV, _SHELL_INTPR are created when calling run_script()
@@ -346,37 +348,24 @@ def run_script_exec (
def run_script (
script, phase, argv=(), return_success=False, logger=None,
- log_output=True, initial_dir=None
+ log_output=True, initial_dir=None, allow_stdin=True
):
# global _SHELL_INTPR
# if _SHELL_INTPR is None:
# _SHELL_INTPR = roverlay.config.get ( 'SHELL_ENV.shell', '/bin/sh' )
- my_logger = logger or LOGGER
- my_env = get_env ( phase )
- script_call = None
-
- try:
- script_call = subprocess.Popen (
- # ( _SHELL_INTPR, script, ),
- ( script, ) + argv,
- stdin = None,
- stdout = subprocess.PIPE if log_output else None,
- stderr = subprocess.PIPE if log_output else None,
- cwd = my_env ['S'] if initial_dir is None else initial_dir,
- env = my_env,
- )
-
- output = script_call.communicate()
- except:
- if script_call is not None:
- try:
- script_call.terminate()
- time.sleep ( 1 )
- finally:
- script_call.kill()
- raise
-
+ my_logger = logger or LOGGER
+ my_env = get_env ( phase )
+
+ script_call, output = _run_subprocess (
+ # ( _SHELL_INTPR, script, ),
+ ( script, ) + argv,
+ stdin = None if allow_stdin else False,
+ stdout = subprocess.PIPE if log_output else None,
+ stderr = subprocess.PIPE if log_output else None,
+ cwd = my_env ['S'] if initial_dir is None else initial_dir,
+ env = my_env,
+ )
if log_output:
log_snip_here = (