changeset 5212:ce70252a3e90

Xapian indexing: fix deadlocks, new MoinMoin.util.SubProcess module (see below) Due to the usage of multiple pipes (stdout, stderr) to the subprocess (usually a external filter program like pdftotext or antiword), deadlocks could occur (especially when the filter program wrote lots of output to stderr, larger than the buffer). This was (hopefully) fixed as follows: Python >= 2.4: use enhanced subclass of subprocess.Popen that implements a timeout - if the process did not end when the timeout is reached, it will get killed automatically. Note: there is windows code to support the timeout functionality, but only the linux code was tested. On windows with Python < 2.6, you need Python win32 extensions to support process termination. Python 2.3: use popen2 to only get stdout (not stderr) - no support for: * timeout * stderr * rc Please use Python >= 2.4 if you have issues with this. We will drop the popen2 based code after requiring Python 2.4 with moin 1.9.
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Wed, 07 Oct 2009 18:47:05 +0200
parents de38d8179cb2
children 5204e8eb9737 5c20ed1c0e24
files MoinMoin/filter/__init__.py MoinMoin/util/SubProcess.py
diffstat 2 files changed, 263 insertions(+), 85 deletions(-) [+]
line wrap: on
line diff
--- a/MoinMoin/filter/__init__.py	Wed Oct 07 18:23:44 2009 +0200
+++ b/MoinMoin/filter/__init__.py	Wed Oct 07 18:47:05 2009 +0200
@@ -9,12 +9,6 @@
 import sys, os
 import time
 
-try:
-    # requires Python >= 2.4
-    import subprocess
-except ImportError:
-    subprocess = None
-
 from MoinMoin import log
 logging = log.getLogger(__name__)
 
@@ -24,6 +18,23 @@
 
 standard_codings = ['utf-8', 'iso-8859-15', 'iso-8859-1', ]
 
+try:
+    # requires Python >= 2.4
+    from MoinMoin.util.SubProcess import exec_cmd
+except ImportError:
+    # Note: remove this after requiring 2.4 for moin
+    def exec_cmd(cmd, input=None, timeout=None):
+        child_stdin, child_stdout = os.popen2(cmd)
+        # Note: we do NOT use popen3 because it would cause
+        #       deadlocks with such simple code as below:
+        if input:
+            child_stdin.write(input)
+        data = child_stdout.read()
+        child_stdout.close()
+        errors = '' # we don't have it (see note above about popen3)
+        rc = 0 # we don't have it, TODO: check if we can get it somehow
+        return data, errors, rc
+
 
 def quote_filename(filename):
     """ quote a filename (could contain blanks or other special chars) in a
@@ -39,91 +50,14 @@
     return filename
 
 
-def exec_cmd(cmd, timeout=300):
-    logging.debug("Trying to start command: %s" % cmd)
-    if subprocess:
-        tstart = time.time()
-        p = subprocess.Popen(cmd, shell=True,
-                             close_fds=not subprocess.mswindows,
-                             bufsize=1024,
-                             stdin=subprocess.PIPE,
-                             stdout=subprocess.PIPE,
-                             stderr=subprocess.PIPE)
-        child_stdin, child_stdout, child_stderr = p.stdin, p.stdout, p.stderr
-
-        tmax = tstart + timeout
-        tnow = time.time()
-        data = []
-        errors = []
-        rc = None
-        while rc is None and tnow < tmax:
-            time.sleep(0.001) # wait at least 1ms between polls
-            data.append(child_stdout.read())
-            errors.append(child_stderr.read())
-            rc = p.poll()
-            tnow = time.time()
-
-        child_stdout.close()
-        child_stderr.close()
-
-        data = ''.join(data)
-        errors = ''.join(errors)
-
-        t = tnow - tstart
-        if rc is None:
-            logging.warning("Command '%s' timed out (%fs)" % (cmd, t))
-            rc = terminate(p)
-
-        if rc < 0:
-            logging.warning("Command '%s' was terminated by signal %d (%fs)" % (cmd, -rc, t))
-        elif rc > 0:
-            logging.warning("Command '%s' terminated with rc=%d (%fs)" % (cmd, rc, t))
-        else:
-            logging.debug("Command '%s' terminated normally rc=%d (%fs)" % (cmd, rc, t))
-
-    else:
-        child_stdin, child_stdout, child_stderr = os.popen3(cmd)
-        data = child_stdout.read()
-        errors = child_stderr.read()
-        child_stdout.close()
-        child_stderr.close()
-
-    logging.debug("Command '%s', stderr: %s, stdout: %d bytes" % (cmd, errors, len(data)))
-    return data, errors
-
-
-def terminate(proc):
-    """ subprocess.Popen.terminate is not implemented on some Windows python versions.
-        This workaround works on both POSIX and Windows.
-        Originally from: Guillaume Rava, http://code.activestate.com/recipes/576667/
-        terminate also seems to be only implemented in Python >= 2.6.
-    """
-    try:
-        proc.terminate()
-    except AttributeError:
-        if not subprocess.mswindows:
-            import signal
-            os.kill(proc.pid, signal.SIGTERM)
-        else:
-            try:
-                import win32api
-            except ImportError:
-                logging.warning("could not import win32api, please install win32 extensions")
-            else:
-                PROCESS_TERMINATE = 1
-                handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
-                win32api.TerminateProcess(handle, -1)
-                win32api.CloseHandle(handle)
-    return proc.wait() # wait until it is really dead
-
-
 def execfilter(cmd, filename, codings=standard_codings):
     """ use cmd to get plaintext content of filename
         to decode to unicode, we use the first coding of codings list that
         does not throw an exception or force ascii
     """
     filter_cmd = cmd % quote_filename(filename)
-    data, errors = exec_cmd(filter_cmd)
+    data, errors, rc = exec_cmd(filter_cmd, timeout=300)
+    logging.debug("Command '%s', rc: %d, stdout: %d bytes, stderr: %s" % (filter_cmd, rc, len(data), errors))
     for c in codings:
         try:
             return data.decode(c)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/MoinMoin/util/SubProcess.py	Wed Oct 07 18:47:05 2009 +0200
@@ -0,0 +1,244 @@
+"""
+Enhanced subprocess.Popen subclass, supporting:
+    * .communicate() with timeout
+    * kill/terminate/send_signal (like in Py 2.6) for Py 2.4 / 2.5
+
+Sample usage:
+    out, err = Popen(...).communicate(input, timeout=300)
+"""
+
+import os
+import time
+import subprocess
+import threading
+import signal
+
+if subprocess.mswindows:
+    try:
+        # Python >= 2.6 should have this:
+        from _subprocess import TerminateProcess
+    except ImportError:
+        # otherwise you need win32 extensions:
+        from win32process import TerminateProcess
+else:
+    import select
+
+class Popen(subprocess.Popen):
+    # send_signal, terminate, kill copied from Python 2.6
+    # (we want to support Python >= 2.4)
+    if subprocess.mswindows:
+        def send_signal(self, sig):
+            """Send a signal to the process
+            """
+            if sig == signal.SIGTERM:
+                self.terminate()
+            else:
+                raise ValueError("Only SIGTERM is supported on Windows")
+
+        def terminate(self):
+            """Terminates the process
+            """
+            TerminateProcess(self._handle, 1)
+
+        kill = terminate
+
+    else: # POSIX
+        def send_signal(self, sig):
+            """Send a signal to the process
+            """
+            os.kill(self.pid, sig)
+
+        def terminate(self):
+            """Terminate the process with SIGTERM
+            """
+            self.send_signal(signal.SIGTERM)
+
+        def kill(self):
+            """Kill the process with SIGKILL
+            """
+            self.send_signal(signal.SIGKILL)
+
+    def communicate(self, input=None, timeout=None):
+        """Interact with process: Send data to stdin.  Read data from
+        stdout and stderr, until end-of-file is reached.  Wait for
+        process to terminate.  The optional input argument should be a
+        string to be sent to the child process, or None, if no data
+        should be sent to the child.
+
+        communicate() returns a tuple (stdout, stderr)."""
+
+        self.timeout = timeout
+
+        # Optimization: If we are only using one pipe, or no pipe at
+        # all, using select() or threads is unnecessary.
+        if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
+            stdout = None
+            stderr = None
+            if self.stdin:
+                if input:
+                    self._fo_write_no_intr(self.stdin, input)
+                self.stdin.close()
+            elif self.stdout:
+                stdout = self._fo_read_no_intr(self.stdout)
+                self.stdout.close()
+            elif self.stderr:
+                stderr = self._fo_read_no_intr(self.stderr)
+                self.stderr.close()
+            self.wait()
+            return (stdout, stderr)
+
+        return self._communicate(input)
+
+    if subprocess.mswindows:
+        def _communicate(self, input):
+            stdout = None # Return
+            stderr = None # Return
+
+            if self.stdout:
+                stdout = []
+                stdout_thread = threading.Thread(target=self._readerthread,
+                                                 args=(self.stdout, stdout))
+                stdout_thread.setDaemon(True)
+                stdout_thread.start()
+            if self.stderr:
+                stderr = []
+                stderr_thread = threading.Thread(target=self._readerthread,
+                                                 args=(self.stderr, stderr))
+                stderr_thread.setDaemon(True)
+                stderr_thread.start()
+
+            if self.stdin:
+                if input is not None:
+                    self.stdin.write(input)
+                self.stdin.close()
+
+            if self.stdout:
+                stdout_thread.join(self.timeout)
+            if self.stderr:
+                stderr_thread.join(self.timeout)
+
+            # if the threads are still alive, that means the thread join timed out
+            timed_out = (self.stdout and self.stdout_thread.isAlive() or
+                         self.stderr and self.stderr_thread.isAlive())
+            if timed_out:
+                self.kill()
+            else:
+                self.wait()
+
+            # All data exchanged.  Translate lists into strings.
+            if stdout is not None:
+                stdout = stdout[0]
+            if stderr is not None:
+                stderr = stderr[0]
+
+            # Translate newlines, if requested.  We cannot let the file
+            # object do the translation: It is based on stdio, which is
+            # impossible to combine with select (unless forcing no
+            # buffering).
+            if self.universal_newlines and hasattr(file, 'newlines'):
+                if stdout:
+                    stdout = self._translate_newlines(stdout)
+                if stderr:
+                    stderr = self._translate_newlines(stderr)
+
+            return (stdout, stderr)
+
+    else: # POSIX
+        def _communicate(self, input):
+            timed_out = False
+            read_set = []
+            write_set = []
+            stdout = None # Return
+            stderr = None # Return
+
+            if self.stdin:
+                # Flush stdio buffer.  This might block, if the user has
+                # been writing to .stdin in an uncontrolled fashion.
+                self.stdin.flush()
+                if input:
+                    write_set.append(self.stdin)
+                else:
+                    self.stdin.close()
+            if self.stdout:
+                read_set.append(self.stdout)
+                stdout = []
+            if self.stderr:
+                read_set.append(self.stderr)
+                stderr = []
+
+            input_offset = 0
+            while read_set or write_set:
+                try:
+                    rlist, wlist, xlist = select.select(read_set, write_set, [], self.timeout)
+                except select.error, e:
+                    if e.args[0] == errno.EINTR:
+                        continue
+                    raise
+
+                timed_out = not (rlist or wlist or xlist)
+                if timed_out:
+                    break
+
+                if self.stdin in wlist:
+                    # When select has indicated that the file is writable,
+                    # we can write up to PIPE_BUF bytes without risk
+                    # blocking.  POSIX defines PIPE_BUF >= 512
+                    chunk = input[input_offset:input_offset + 512]
+                    bytes_written = os.write(self.stdin.fileno(), chunk)
+                    input_offset += bytes_written
+                    if input_offset >= len(input):
+                        self.stdin.close()
+                        write_set.remove(self.stdin)
+
+                if self.stdout in rlist:
+                    data = os.read(self.stdout.fileno(), 1024)
+                    if data == "":
+                        self.stdout.close()
+                        read_set.remove(self.stdout)
+                    stdout.append(data)
+
+                if self.stderr in rlist:
+                    data = os.read(self.stderr.fileno(), 1024)
+                    if data == "":
+                        self.stderr.close()
+                        read_set.remove(self.stderr)
+                    stderr.append(data)
+
+            # All data exchanged.  Translate lists into strings.
+            if stdout is not None:
+                stdout = ''.join(stdout)
+            if stderr is not None:
+                stderr = ''.join(stderr)
+
+            # Translate newlines, if requested.  We cannot let the file
+            # object do the translation: It is based on stdio, which is
+            # impossible to combine with select (unless forcing no
+            # buffering).
+            if self.universal_newlines and hasattr(file, 'newlines'):
+                if stdout:
+                    stdout = self._translate_newlines(stdout)
+                if stderr:
+                    stderr = self._translate_newlines(stderr)
+
+            if timed_out:
+                self.kill()
+            else:
+                self.wait()
+            return (stdout, stderr)
+
+
+def exec_cmd(cmd, input=None, timeout=None):
+    p = Popen(cmd, shell=True,
+              close_fds=not subprocess.mswindows,
+              bufsize=1024,
+              stdin=subprocess.PIPE,
+              stdout=subprocess.PIPE,
+              stderr=subprocess.PIPE)
+    data, errors = p.communicate(input, timeout=timeout)
+    return data, errors, p.returncode
+
+
+if __name__ == '__main__':
+    print exec_cmd("python", "import time ; time.sleep(20) ; print 'never!' ;", timeout=10)
+    print exec_cmd("python", "import time ; time.sleep(20) ; print '20s gone' ;")
+