comparison MoinMoin/util/SubProcess.py @ 6111:1fdd537e9d83

SubProcess: reimplement exec_cmd. Subclassing Popen and overriding some methods isn't pretty: the code we have was written for Python 2.4 or so, and the Python 2.7 Popen looked quite different. This way, with the timer, should be less problematic.
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Tue, 06 Sep 2016 04:39:28 +0200
parents 4e911b751b5b
children
comparison
equal deleted inserted replaced
6110:4e911b751b5b 6111:1fdd537e9d83
1 """ 1 """
2 Enhanced subprocess.Popen subclass, supporting .communicate() with timeout. 2 Execute a shell command with timeout.
3 3
4 Sample usage: 4 @copyright: 2016 Thomas Waldmann <tw@waldmann-edv.de>
5 out, err = Popen(...).communicate(input, timeout=300) 5 @license: GNU GPL, see COPYING for details.
6 """ 6 """
7 7
8 import os 8 import os
9 import signal
9 import subprocess 10 import subprocess
10 import threading 11 from threading import Timer
11
12 if not subprocess.mswindows:
13 import select
14 import errno
15 12
16 13
17 class Popen(subprocess.Popen): 14 def exec_cmd(cmd, stdin=None, timeout=None):
18 def communicate(self, input=None, timeout=None): 15 """
19 """Interact with process: Send data to stdin. Read data from 16 Execute a shell <cmd>, send <stdin> to it, kill it after <timeout> if it
20 stdout and stderr, until end-of-file is reached. Wait for 17 is still running. Return stdout, stderr, rc.
21 process to terminate. The optional input argument should be a 18 """
22 string to be sent to the child process, or None, if no data 19 def preexec_fn():
23 should be sent to the child. 20 if not subprocess.mswindows:
21 os.setsid() # start a new session
24 22
25 communicate() returns a tuple (stdout, stderr).""" 23 def kill_it(p):
24 if not subprocess.mswindows:
25 # kills all the processes of the session,
26 # includes the shell + process started by shell
27 os.killpg(p.pid, signal.SIGKILL)
28 else:
29 p.kill()
26 30
27 self.timeout = timeout 31 p = subprocess.Popen(cmd, shell=True,
28 32 close_fds=not subprocess.mswindows,
29 # Optimization: If we are only using one pipe, or no pipe at 33 bufsize=1024,
30 # all, using select() or threads is unnecessary. 34 preexec_fn=preexec_fn,
31 if [self.stdin, self.stdout, self.stderr].count(None) >= 2: 35 stdin=subprocess.PIPE,
32 stdout = None 36 stdout=subprocess.PIPE,
33 stderr = None 37 stderr=subprocess.PIPE)
34 if self.stdin: 38 if timeout is None:
35 if input: 39 stdout, stderr = p.communicate(stdin)
36 self._fo_write_no_intr(self.stdin, input) 40 else:
37 self.stdin.close() 41 timer = Timer(timeout, kill_it, [p, ])
38 elif self.stdout: 42 try:
39 stdout = self._fo_read_no_intr(self.stdout) 43 timer.start()
40 self.stdout.close() 44 stdout, stderr = p.communicate(stdin)
41 elif self.stderr: 45 finally:
42 stderr = self._fo_read_no_intr(self.stderr) 46 timer.cancel()
43 self.stderr.close() 47 return stdout, stderr, p.returncode
44 self.wait()
45 return (stdout, stderr)
46
47 return self._communicate(input)
48
49 if subprocess.mswindows:
50 def _communicate(self, input):
51 stdout = None # Return
52 stderr = None # Return
53
54 if self.stdout:
55 stdout = []
56 stdout_thread = threading.Thread(target=self._readerthread,
57 args=(self.stdout, stdout))
58 stdout_thread.setDaemon(True)
59 stdout_thread.start()
60 if self.stderr:
61 stderr = []
62 stderr_thread = threading.Thread(target=self._readerthread,
63 args=(self.stderr, stderr))
64 stderr_thread.setDaemon(True)
65 stderr_thread.start()
66
67 if self.stdin:
68 if input is not None:
69 self.stdin.write(input)
70 self.stdin.close()
71
72 if self.stdout:
73 stdout_thread.join(self.timeout)
74 if self.stderr:
75 stderr_thread.join(self.timeout)
76
77 # if the threads are still alive, that means the thread join timed out
78 timed_out = (self.stdout and stdout_thread.isAlive() or
79 self.stderr and stderr_thread.isAlive())
80 if timed_out:
81 self.kill()
82 else:
83 self.wait()
84
85 # All data exchanged. Translate lists into strings.
86 if stdout is not None:
87 stdout = stdout[0]
88 if stderr is not None:
89 stderr = stderr[0]
90
91 # Translate newlines, if requested. We cannot let the file
92 # object do the translation: It is based on stdio, which is
93 # impossible to combine with select (unless forcing no
94 # buffering).
95 if self.universal_newlines and hasattr(file, 'newlines'):
96 if stdout:
97 stdout = self._translate_newlines(stdout)
98 if stderr:
99 stderr = self._translate_newlines(stderr)
100
101 return (stdout, stderr)
102
103 else: # POSIX
104 def _communicate(self, input):
105 timed_out = False
106 read_set = []
107 write_set = []
108 stdout = None # Return
109 stderr = None # Return
110
111 if self.stdin:
112 # Flush stdio buffer. This might block, if the user has
113 # been writing to .stdin in an uncontrolled fashion.
114 self.stdin.flush()
115 if input:
116 write_set.append(self.stdin)
117 else:
118 self.stdin.close()
119 if self.stdout:
120 read_set.append(self.stdout)
121 stdout = []
122 if self.stderr:
123 read_set.append(self.stderr)
124 stderr = []
125
126 input_offset = 0
127 while read_set or write_set:
128 try:
129 rlist, wlist, xlist = select.select(read_set, write_set, [], self.timeout)
130 except select.error, e:
131 if e.args[0] == errno.EINTR:
132 continue
133 raise
134
135 timed_out = not (rlist or wlist or xlist)
136 if timed_out:
137 break
138
139 if self.stdin in wlist:
140 # When select has indicated that the file is writable,
141 # we can write up to PIPE_BUF bytes without risk
142 # blocking. POSIX defines PIPE_BUF >= 512
143 chunk = input[input_offset:input_offset + 512]
144 bytes_written = os.write(self.stdin.fileno(), chunk)
145 input_offset += bytes_written
146 if input_offset >= len(input):
147 self.stdin.close()
148 write_set.remove(self.stdin)
149
150 if self.stdout in rlist:
151 data = os.read(self.stdout.fileno(), 1024)
152 if data == "":
153 self.stdout.close()
154 read_set.remove(self.stdout)
155 stdout.append(data)
156
157 if self.stderr in rlist:
158 data = os.read(self.stderr.fileno(), 1024)
159 if data == "":
160 self.stderr.close()
161 read_set.remove(self.stderr)
162 stderr.append(data)
163
164 # All data exchanged. Translate lists into strings.
165 if stdout is not None:
166 stdout = ''.join(stdout)
167 if stderr is not None:
168 stderr = ''.join(stderr)
169
170 # Translate newlines, if requested. We cannot let the file
171 # object do the translation: It is based on stdio, which is
172 # impossible to combine with select (unless forcing no
173 # buffering).
174 if self.universal_newlines and hasattr(file, 'newlines'):
175 if stdout:
176 stdout = self._translate_newlines(stdout)
177 if stderr:
178 stderr = self._translate_newlines(stderr)
179
180 if timed_out:
181 self.kill()
182 else:
183 self.wait()
184
185 # make sure all files are closed:
186 for f in [self.stdin, self.stdout, self.stderr]:
187 try:
188 f.close()
189 except:
190 pass
191
192 return (stdout, stderr)
193
194
195 def exec_cmd(cmd, input=None, timeout=None):
196 p = Popen(cmd, shell=True,
197 close_fds=not subprocess.mswindows,
198 bufsize=1024,
199 stdin=subprocess.PIPE,
200 stdout=subprocess.PIPE,
201 stderr=subprocess.PIPE)
202 data, errors = p.communicate(input, timeout=timeout)
203 return data, errors, p.returncode
204 48
205 49
206 if __name__ == '__main__': 50 if __name__ == '__main__':
207 print exec_cmd("python", "import time ; time.sleep(20) ; print 'never!' ;", timeout=10) 51 # expected output:
52 # ('', '', -9) --> no stdout, stderr output, killed by SIGKILL (signal 9)
53 # ('20s gone\n', '', 0) --> some output on stdout, no stderr, rc = 0 (did not get killed)
54 print exec_cmd("python", "import time ; time.sleep(20) ; print 'timeout does not work!' ;", timeout=10)
208 print exec_cmd("python", "import time ; time.sleep(20) ; print '20s gone' ;") 55 print exec_cmd("python", "import time ; time.sleep(20) ; print '20s gone' ;")