aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'pym/portage/tests/process/test_PopenProcess.py')
-rw-r--r--pym/portage/tests/process/test_PopenProcess.py85
1 files changed, 85 insertions, 0 deletions
diff --git a/pym/portage/tests/process/test_PopenProcess.py b/pym/portage/tests/process/test_PopenProcess.py
new file mode 100644
index 000000000..88da0b354
--- /dev/null
+++ b/pym/portage/tests/process/test_PopenProcess.py
@@ -0,0 +1,85 @@
+# Copyright 2012-2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import subprocess
+import tempfile
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util._async.PopenProcess import PopenProcess
+from portage.util._eventloop.global_event_loop import global_event_loop
+from _emerge.PipeReader import PipeReader
+
+class PopenPipeTestCase(TestCase):
+ """
+ Test PopenProcess, which can be useful for Jython support, since it
+ uses the subprocess.Popen instead of os.fork().
+ """
+
+ _echo_cmd = "echo -n '%s'"
+
+ def _testPipeReader(self, test_string):
+ """
+ Use a poll loop to read data from a pipe and assert that
+ the data written to the pipe is identical to the data
+ read from the pipe.
+ """
+
+ producer = PopenProcess(proc=subprocess.Popen(
+ ["bash", "-c", self._echo_cmd % test_string],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+ pipe_reader=PipeReader(), scheduler=global_event_loop())
+
+ consumer = producer.pipe_reader
+ consumer.input_files = {"producer" : producer.proc.stdout}
+
+ producer.start()
+ producer.wait()
+
+ self.assertEqual(producer.returncode, os.EX_OK)
+ self.assertEqual(consumer.returncode, os.EX_OK)
+
+ return consumer.getvalue().decode('ascii', 'replace')
+
+ def _testPipeLogger(self, test_string):
+
+ producer = PopenProcess(proc=subprocess.Popen(
+ ["bash", "-c", self._echo_cmd % test_string],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+ scheduler=global_event_loop())
+
+ fd, log_file_path = tempfile.mkstemp()
+ try:
+
+ consumer = PipeLogger(background=True,
+ input_fd=producer.proc.stdout,
+ log_file_path=log_file_path)
+
+ producer.pipe_reader = consumer
+
+ producer.start()
+ producer.wait()
+
+ self.assertEqual(producer.returncode, os.EX_OK)
+ self.assertEqual(consumer.returncode, os.EX_OK)
+
+ with open(log_file_path, 'rb') as f:
+ content = f.read()
+
+ finally:
+ os.close(fd)
+ os.unlink(log_file_path)
+
+ return content.decode('ascii', 'replace')
+
+ def testPopenPipe(self):
+ for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
+ test_string = x * "a"
+ output = self._testPipeReader(test_string)
+ self.assertEqual(test_string, output,
+ "x = %s, len(output) = %s" % (x, len(output)))
+
+ output = self._testPipeLogger(test_string)
+ self.assertEqual(test_string, output,
+ "x = %s, len(output) = %s" % (x, len(output)))