aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'bin/ebuild-ipc.py')
-rwxr-xr-xbin/ebuild-ipc.py177
1 files changed, 53 insertions, 124 deletions
diff --git a/bin/ebuild-ipc.py b/bin/ebuild-ipc.py
index 3caf2d185..00337ee22 100755
--- a/bin/ebuild-ipc.py
+++ b/bin/ebuild-ipc.py
@@ -1,20 +1,17 @@
-#!/usr/bin/python
-# Copyright 2010-2012 Gentoo Foundation
+#!/usr/bin/python -b
+# Copyright 2010-2014 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
#
# This is a helper which ebuild processes can use
# to communicate with portage's main python process.
-import errno
import logging
import os
import pickle
import platform
-import select
import signal
import sys
import time
-import traceback
def debug_signal(signum, frame):
import pdb
@@ -38,14 +35,28 @@ if os.environ.get("SANDBOX_ON") == "1":
":".join(filter(None, sandbox_write))
import portage
+portage._internal_caller = True
portage._disable_legacy_globals()
+from portage.util._async.ForkProcess import ForkProcess
+from portage.util._eventloop.global_event_loop import global_event_loop
+from _emerge.PipeReader import PipeReader
+
+class FifoWriter(ForkProcess):
+
+ __slots__ = ('buf', 'fifo',)
+
+ def _run(self):
+ # Atomically write the whole buffer into the fifo.
+ with open(self.fifo, 'wb', 0) as f:
+ f.write(self.buf)
+ return os.EX_OK
+
class EbuildIpc(object):
# Timeout for each individual communication attempt (we retry
# as long as the daemon process appears to be alive).
- _COMMUNICATE_RETRY_TIMEOUT_SECONDS = 15
- _BUFSIZE = 4096
+ _COMMUNICATE_RETRY_TIMEOUT_MS = 15000
def __init__(self):
self.fifo_dir = os.environ['PORTAGE_BUILDDIR']
@@ -89,7 +100,7 @@ class EbuildIpc(object):
'ebuild-ipc: daemon process not detected\n'),
level=logging.ERROR, noiselevel=-1)
- def _wait(self, pid, pr, msg):
+ def _run_writer(self, fifo_writer, msg):
"""
Wait on pid and return an appropriate exit code. This
may return unsuccessfully due to timeout if the daemon
@@ -98,88 +109,48 @@ class EbuildIpc(object):
start_time = time.time()
- while True:
- try:
- events = select.select([pr], [], [],
- self._COMMUNICATE_RETRY_TIMEOUT_SECONDS)
- except select.error as e:
- portage.util.writemsg_level(
- "ebuild-ipc: %s: %s\n" % \
- (portage.localization._('during select'), e),
- level=logging.ERROR, noiselevel=-1)
- continue
+ fifo_writer.start()
+ eof = fifo_writer.poll() is not None
- if events[0]:
- break
+ while not eof:
+ fifo_writer._wait_loop(timeout=self._COMMUNICATE_RETRY_TIMEOUT_MS)
- if self._daemon_is_alive():
+ eof = fifo_writer.poll() is not None
+ if eof:
+ break
+ elif self._daemon_is_alive():
self._timeout_retry_msg(start_time, msg)
else:
+ fifo_writer.cancel()
self._no_daemon_msg()
- try:
- os.kill(pid, signal.SIGKILL)
- os.waitpid(pid, 0)
- except OSError as e:
- portage.util.writemsg_level(
- "ebuild-ipc: %s\n" % (e,),
- level=logging.ERROR, noiselevel=-1)
+ fifo_writer.wait()
return 2
- try:
- wait_retval = os.waitpid(pid, 0)
- except OSError as e:
- portage.util.writemsg_level(
- "ebuild-ipc: %s: %s\n" % (msg, e),
- level=logging.ERROR, noiselevel=-1)
- return 2
+ return fifo_writer.wait()
- if not os.WIFEXITED(wait_retval[1]):
- portage.util.writemsg_level(
- "ebuild-ipc: %s: %s\n" % (msg,
- portage.localization._('subprocess failure: %s') % \
- wait_retval[1]),
- level=logging.ERROR, noiselevel=-1)
- return 2
+ def _receive_reply(self, input_fd):
- return os.WEXITSTATUS(wait_retval[1])
+ start_time = time.time()
- def _receive_reply(self, input_fd):
+ pipe_reader = PipeReader(input_files={"input_fd":input_fd},
+ scheduler=global_event_loop())
+ pipe_reader.start()
- # Timeouts are handled by the parent process, so just
- # block until input is available. For maximum portability,
- # use a single atomic read.
- buf = None
- while True:
- try:
- events = select.select([input_fd], [], [])
- except select.error as e:
- portage.util.writemsg_level(
- "ebuild-ipc: %s: %s\n" % \
- (portage.localization._('during select for read'), e),
- level=logging.ERROR, noiselevel=-1)
- continue
-
- if events[0]:
- # For maximum portability, use os.read() here since
- # array.fromfile() and file.read() are both known to
- # erroneously return an empty string from this
- # non-blocking fifo stream on FreeBSD (bug #337465).
- try:
- buf = os.read(input_fd, self._BUFSIZE)
- except OSError as e:
- if e.errno != errno.EAGAIN:
- portage.util.writemsg_level(
- "ebuild-ipc: %s: %s\n" % \
- (portage.localization._('read error'), e),
- level=logging.ERROR, noiselevel=-1)
- break
- # Assume that another event will be generated
- # if there's any relevant data.
- continue
-
- # Only one (atomic) read should be necessary.
- if buf:
- break
+ eof = pipe_reader.poll() is not None
+
+ while not eof:
+ pipe_reader._wait_loop(timeout=self._COMMUNICATE_RETRY_TIMEOUT_MS)
+ eof = pipe_reader.poll() is not None
+ if not eof:
+ if self._daemon_is_alive():
+ self._timeout_retry_msg(start_time,
+ portage.localization._('during read'))
+ else:
+ pipe_reader.cancel()
+ self._no_daemon_msg()
+ return 2
+
+ buf = pipe_reader.getvalue()
retval = 2
@@ -232,32 +203,9 @@ class EbuildIpc(object):
# un-interrupted, while the parent handles all timeout
# considerations. This helps to avoid possible race conditions
# from interference between timeouts and blocking IO operations.
- pr, pw = os.pipe()
- pid = os.fork()
-
- if pid == 0:
- retval = 2
- try:
- os.close(pr)
-
- # File streams are in unbuffered mode since we do atomic
- # read and write of whole pickles.
- output_file = open(self.ipc_in_fifo, 'wb', 0)
- output_file.write(pickle.dumps(args))
- output_file.close()
- retval = os.EX_OK
- except SystemExit:
- raise
- except:
- traceback.print_exc()
- finally:
- os._exit(retval)
-
- os.close(pw)
-
msg = portage.localization._('during write')
- retval = self._wait(pid, pr, msg)
- os.close(pr)
+ retval = self._run_writer(FifoWriter(buf=pickle.dumps(args),
+ fifo=self.ipc_in_fifo, scheduler=global_event_loop()), msg)
if retval != os.EX_OK:
portage.util.writemsg_level(
@@ -270,26 +218,7 @@ class EbuildIpc(object):
self._no_daemon_msg()
return 2
- pr, pw = os.pipe()
- pid = os.fork()
-
- if pid == 0:
- retval = 2
- try:
- os.close(pr)
- retval = self._receive_reply(input_fd)
- except SystemExit:
- raise
- except:
- traceback.print_exc()
- finally:
- os._exit(retval)
-
- os.close(pw)
- retval = self._wait(pid, pr, portage.localization._('during read'))
- os.close(pr)
- os.close(input_fd)
- return retval
+ return self._receive_reply(input_fd)
def ebuild_ipc_main(args):
ebuild_ipc = EbuildIpc()