changeset 5549:d18c06596cd5

update flup middleware to a current upstream repo checkout
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Sat, 20 Feb 2010 03:01:45 +0100
parents a42e6b2cd528
children 49f8dd576950 0dea6dbebafb
files MoinMoin/support/flup/NOTES.moin MoinMoin/support/flup/server/ajp.py MoinMoin/support/flup/server/ajp_base.py MoinMoin/support/flup/server/ajp_fork.py MoinMoin/support/flup/server/fcgi.py MoinMoin/support/flup/server/fcgi_base.py MoinMoin/support/flup/server/fcgi_fork.py MoinMoin/support/flup/server/paste_factory.py MoinMoin/support/flup/server/preforkserver.py MoinMoin/support/flup/server/scgi.py MoinMoin/support/flup/server/scgi_base.py MoinMoin/support/flup/server/scgi_fork.py MoinMoin/support/flup/server/threadpool.py
diffstat 13 files changed, 146 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/MoinMoin/support/flup/NOTES.moin	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/NOTES.moin	Sat Feb 20 03:01:45 2010 +0100
@@ -4,7 +4,7 @@
 
 The shipped version is available via mercurial checkout:
 
-hg clone -r 6ea1ffac1bcb http://hg.saddi.com/flup-server
+hg clone -r 0266cedb313b http://hg.saddi.com/flup-server
 # this is flup 1.0.2+, minimum requirement for moin is 1.0.2.
 
 Thanks to Allan Saddi <allan@saddi.com> for writing flup and to
--- a/MoinMoin/support/flup/server/ajp.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/ajp.py	Sat Feb 20 03:01:45 2010 +0100
@@ -142,8 +142,8 @@
         for key in ('jobClass', 'jobArgs'):
             if kw.has_key(key):
                 del kw[key]
-        ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,),
-                                **kw)
+        ThreadedServer.__init__(self, jobClass=Connection,
+                                jobArgs=(self, None), **kw)
 
     def run(self):
         """
--- a/MoinMoin/support/flup/server/ajp_base.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/ajp_base.py	Sat Feb 20 03:01:45 2010 +0100
@@ -36,6 +36,7 @@
 import errno
 import datetime
 import time
+import traceback
 
 # Unfortunately, for now, threads are required.
 import thread
@@ -591,21 +592,31 @@
             data = data[toWrite:]
             bytesLeft -= toWrite
 
+class TimeoutException(Exception):
+    pass
+
 class Connection(object):
     """
     A single Connection with the server. Requests are not multiplexed over the
     same connection, so at any given time, the Connection is either
     waiting for a request, or processing a single request.
     """
-    def __init__(self, sock, addr, server):
+    def __init__(self, sock, addr, server, timeout):
         self.server = server
         self._sock = sock
         self._addr = addr
+        self._timeout = timeout
 
         self._request = None
 
         self.logger = logging.getLogger(LoggerName)
 
+    def timeout_handler(self, signum, frame):
+        self.logger.error('Timeout Exceeded')
+        self.logger.error("\n".join(traceback.format_stack(frame)))
+
+        raise TimeoutException
+
     def run(self):
         self.logger.debug('Connection starting up (%s:%d)',
                           self._addr[0], self._addr[1])
@@ -702,11 +713,21 @@
         if req.input.bytesAvailForAdd():
             self.processInput()
 
+        # If there is a timeout
+        if self._timeout:
+            old_alarm = signal.signal(signal.SIGALRM, self.timeout_handler)
+            signal.alarm(self._timeout)
+            
         # Run Request.
         req.run()
 
         self._request = None
 
+        # Restore old handler if timeout was given
+        if self._timeout:
+            signal.alarm(0)
+            signal.signal(signal.SIGALRM, old_alarm)
+
     def _shutdown(self, pkt):
         """Not sure what to do with this yet."""
         self.logger.info('Received shutdown request from server')
--- a/MoinMoin/support/flup/server/ajp_fork.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/ajp_fork.py	Sat Feb 20 03:01:45 2010 +0100
@@ -108,7 +108,7 @@
     """
     def __init__(self, application, scriptName='', environ=None,
                  bindAddress=('localhost', 8009), allowedServers=None,
-                 loggingLevel=logging.INFO, debug=False, **kw):
+                 loggingLevel=logging.INFO, debug=False, timeout=None, **kw):
         """
         scriptName is the initial portion of the URL path that "belongs"
         to your application. It is used to determine PATH_INFO (which doesn't
@@ -141,7 +141,8 @@
         for key in ('multithreaded', 'multiprocess', 'jobClass', 'jobArgs'):
             if kw.has_key(key):
                 del kw[key]
-        PreforkServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw)
+        PreforkServer.__init__(self, jobClass=Connection,
+                               jobArgs=(self, timeout), **kw)
 
     def run(self):
         """
--- a/MoinMoin/support/flup/server/fcgi.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/fcgi.py	Sat Feb 20 03:01:45 2010 +0100
@@ -93,7 +93,7 @@
             if kw.has_key(key):
                 del kw[key]
         ThreadedServer.__init__(self, jobClass=self._connectionClass,
-                                jobArgs=(self,), **kw)
+                                jobArgs=(self, None), **kw)
 
     def _isClientAllowed(self, addr):
         return self._web_server_addrs is None or \
--- a/MoinMoin/support/flup/server/fcgi_base.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/fcgi_base.py	Sat Feb 20 03:01:45 2010 +0100
@@ -533,6 +533,9 @@
         if self.paddingLength:
             self._sendall(sock, '\x00'*self.paddingLength)
             
+class TimeoutException(Exception):
+    pass
+
 class Request(object):
     """
     Represents a single FastCGI request.
@@ -542,8 +545,9 @@
     be called by your handler. However, server, params, stdin, stdout,
     stderr, and data are free for your handler's use.
     """
-    def __init__(self, conn, inputStreamClass):
+    def __init__(self, conn, inputStreamClass, timeout):
         self._conn = conn
+        self._timeout = timeout
 
         self.server = conn.server
         self.params = {}
@@ -552,8 +556,20 @@
         self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
         self.data = inputStreamClass(conn)
 
+    def timeout_handler(self, signum, frame):
+        self.stderr.write('Timeout Exceeded\n')
+        self.stderr.write("\n".join(traceback.format_stack(frame)))
+        self.stderr.flush()
+
+        raise TimeoutException
+
     def run(self):
         """Runs the handler, flushes the streams, and ends the request."""
+        # If there is a timeout
+        if self._timeout:
+            old_alarm = signal.signal(signal.SIGALRM, self.timeout_handler)
+            signal.alarm(self._timeout)
+            
         try:
             protocolStatus, appStatus = self.server.handler(self)
         except:
@@ -567,6 +583,11 @@
         if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
                              (protocolStatus, appStatus))
 
+        # Restore old handler if timeout was given
+        if self._timeout:
+            signal.alarm(0)
+            signal.signal(signal.SIGALRM, old_alarm)
+
         try:
             self._flush()
             self._end(appStatus, protocolStatus)
@@ -596,6 +617,7 @@
         self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
         self.stderr = sys.stderr
         self.data = StringIO.StringIO()
+        self._timeout = 0
         
     def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
         sys.exit(appStatus)
@@ -615,10 +637,11 @@
     _multiplexed = False
     _inputStreamClass = InputStream
 
-    def __init__(self, sock, addr, server):
+    def __init__(self, sock, addr, server, timeout):
         self._sock = sock
         self._addr = addr
         self.server = server
+        self._timeout = timeout
 
         # Active Requests for this Connection, mapped by request ID.
         self._requests = {}
@@ -739,7 +762,8 @@
         """Handle an FCGI_BEGIN_REQUEST from the web server."""
         role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)
 
-        req = self.server.request_class(self, self._inputStreamClass)
+        req = self.server.request_class(self, self._inputStreamClass,
+                                        self._timeout)
         req.requestId, req.role, req.flags = inrec.requestId, role, flags
         req.aborted = False
 
@@ -807,8 +831,9 @@
     _multiplexed = True
     _inputStreamClass = MultiplexedInputStream
 
-    def __init__(self, sock, addr, server):
-        super(MultiplexedConnection, self).__init__(sock, addr, server)
+    def __init__(self, sock, addr, server, timeout):
+        super(MultiplexedConnection, self).__init__(sock, addr, server,
+                                                    timeout)
 
         # Used to arbitrate access to self._requests.
         lock = threading.RLock()
@@ -862,7 +887,10 @@
             self._lock.release()
 
     def _start_request(self, req):
-        thread.start_new_thread(req.run, ())
+        try:
+            thread.start_new_thread(req.run, ())
+        except thread.error, e:
+            self.end_request(req, 0L, FCGI_OVERLOADED, remove=True)
 
     def _do_params(self, inrec):
         self._lock.acquire()
--- a/MoinMoin/support/flup/server/fcgi_fork.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/fcgi_fork.py	Sat Feb 20 03:01:45 2010 +0100
@@ -70,7 +70,8 @@
     """
     def __init__(self, application, environ=None,
                  bindAddress=None, umask=None, multiplexed=False,
-                 debug=False, roles=(FCGI_RESPONDER,), forceCGI=False, **kw):
+                 debug=False, roles=(FCGI_RESPONDER,), forceCGI=False,
+                 timeout=None, **kw):
         """
         environ, if present, must be a dictionary-like object. Its
         contents will be copied into application's environ. Useful
@@ -99,7 +100,7 @@
             if kw.has_key(key):
                 del kw[key]
         PreforkServer.__init__(self, jobClass=self._connectionClass,
-                               jobArgs=(self,), **kw)
+                               jobArgs=(self, timeout), **kw)
 
         try:
             import resource
--- a/MoinMoin/support/flup/server/paste_factory.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/paste_factory.py	Sat Feb 20 03:01:45 2010 +0100
@@ -47,18 +47,29 @@
 
 def run_ajp_fork(wsgi_app, global_conf,
                  scriptName='', host='localhost', port='8009',
-                 allowedServers='127.0.0.1', debug=NoDefault):
+                 allowedServers='127.0.0.1', debug=NoDefault,
+                 minSpare=None, maxSpare=None,
+                 maxChildren=None, maxRequests=None):
     import flup.server.ajp_fork
     addr = (host, int(port))
     if debug is NoDefault:
         debug = global_conf.get('debug', False)
     debug = asbool(debug)
+    prefork_args = {}
+    if minSpare is not None:
+        prefork_args['minSpare'] = int(minSpare)
+    if maxSpare is not None:
+        prefork_args['maxSpare'] = int(maxSpare)
+    if maxChildren is not None:
+        prefork_args['maxChildren'] = int(maxChildren)
+    if maxRequests is not None:
+        prefork_args['maxRequests'] = int(maxRequests)
     s = flup.server.ajp_fork.WSGIServer(
         wsgi_app,
         scriptName=scriptName,
         bindAddress=addr,
         allowedServers=aslist(allowedServers),
-        debug=debug,
+        debug=debug, **prefork_args
         )
     s.run()
 
@@ -91,7 +102,9 @@
                   host=None, port=None,
                   socket=None, umask=None,
                   multiplexed=False,
-                  debug=NoDefault):
+                  debug=NoDefault,
+                  minSpare=None, maxSpare=None,
+                  maxChildren=None, maxRequests=None):
     import flup.server.fcgi_fork
     if socket:
         assert host is None and port is None
@@ -106,11 +119,21 @@
     if debug is NoDefault:
         debug = global_conf.get('debug', False)
     debug = asbool(debug)
+    prefork_args = {}
+    if minSpare is not None:
+        prefork_args['minSpare'] = int(minSpare)
+    if maxSpare is not None:
+        prefork_args['maxSpare'] = int(maxSpare)
+    if maxChildren is not None:
+        prefork_args['maxChildren'] = int(maxChildren)
+    if maxRequests is not None:
+        prefork_args['maxRequests'] = int(maxRequests)
     s = flup.server.fcgi_fork.WSGIServer(
         wsgi_app,
         bindAddress=sock, umask=umask,
         multiplexed=asbool(multiplexed),
-        debug=debug)
+        debug=debug, **prefork_args
+        )
     s.run()
 
 def run_scgi_thread(wsgi_app, global_conf,
@@ -134,18 +157,29 @@
 def run_scgi_fork(wsgi_app, global_conf,
                   scriptName=NoDefault, host='localhost', port='4000',
                   allowedServers='127.0.0.1',
-                  debug=NoDefault):
+                  debug=NoDefault,
+                  minSpare=None, maxSpare=None,
+                  maxChildren=None, maxRequests=None):
     import flup.server.scgi_fork
     addr = (host, int(port))
     if debug is NoDefault:
         debug = global_conf.get('debug', False)
     debug = asbool(debug)
+    prefork_args = {}
+    if minSpare is not None:
+        prefork_args['minSpare'] = int(minSpare)
+    if maxSpare is not None:
+        prefork_args['maxSpare'] = int(maxSpare)
+    if maxChildren is not None:
+        prefork_args['maxChildren'] = int(maxChildren)
+    if maxRequests is not None:
+        prefork_args['maxRequests'] = int(maxRequests)
     s = flup.server.scgi_fork.WSGIServer(
         wsgi_app,
         scriptName=scriptName,
         bindAddress=addr,
         allowedServers=aslist(allowedServers),
-        debug=debug,
+        debug=debug, **prefork_args
         )
     s.run()
 
--- a/MoinMoin/support/flup/server/preforkserver.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/preforkserver.py	Sat Feb 20 03:01:45 2010 +0100
@@ -146,7 +146,9 @@
                 # children that need to die.
                 timeout = 2
 
-            w = (time.time() > self._last_purge + 10) and self._children_to_purge or []
+            w = []
+            if (time.time() > self._last_purge + 10):
+                w = [x for x in self._children_to_purge if x.fileno() != -1]
             try:
                 r, w, e = select.select(r, w, [], timeout)
             except select.error, e:
@@ -263,6 +265,7 @@
             if self._children.has_key(pid):
                 del self._children[pid]
 
+        signal.alarm(0)
         signal.signal(signal.SIGALRM, oldSIGALRM)
 
         # Forcefully kill any remaining children.
@@ -287,6 +290,7 @@
             if self._children.has_key(pid): # Sanity check.
                 if self._children[pid]['file'] is not None:
                     self._children[pid]['file'].close()
+                    self._children[pid]['file'] = None
                 del self._children[pid]
 
     def _spawnChild(self, sock):
--- a/MoinMoin/support/flup/server/scgi.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/scgi.py	Sat Feb 20 03:01:45 2010 +0100
@@ -138,8 +138,8 @@
         for key in ('jobClass', 'jobArgs'):
             if kw.has_key(key):
                 del kw[key]
-        ThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,),
-                                **kw)
+        ThreadedServer.__init__(self, jobClass=Connection,
+                                jobArgs=(self, None), **kw)
 
     def run(self):
         """
--- a/MoinMoin/support/flup/server/scgi_base.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/scgi_base.py	Sat Feb 20 03:01:45 2010 +0100
@@ -37,6 +37,7 @@
 import datetime
 import os
 import warnings
+import traceback
 
 # Threads are required. If you want a non-threaded (forking) version, look at
 # SWAP <http://www.idyll.org/~t/www-tools/wsgi/>.
@@ -197,18 +198,28 @@
                           handlerTime.seconds +
                           handlerTime.microseconds / 1000000.0)
 
+class TimeoutException(Exception):
+    pass
+
 class Connection(object):
     """
     Represents a single client (web server) connection. A single request
     is handled, after which the socket is closed.
     """
-    def __init__(self, sock, addr, server):
+    def __init__(self, sock, addr, server, timeout):
         self._sock = sock
         self._addr = addr
         self.server = server
+        self._timeout = timeout
 
         self.logger = logging.getLogger(LoggerName)
 
+    def timeout_handler(self, signum, frame):
+        self.logger.error('Timeout Exceeded')
+        self.logger.error("\n".join(traceback.format_stack(frame)))
+
+        raise TimeoutException
+
     def run(self):
         if len(self._addr) == 2:
             self.logger.debug('Connection starting up (%s:%d)',
@@ -263,12 +274,23 @@
         # Allocate Request
         req = Request(self, environ, input, output)
 
+        # If there is a timeout
+        if self._timeout:
+            old_alarm = signal.signal(signal.SIGALRM, self.timeout_handler)
+            signal.alarm(self._timeout)
+            
         # Run it.
         req.run()
 
         output.close()
         input.close()
 
+        # Restore old handler if timeout was given
+        if self._timeout:
+            signal.alarm(0)
+            signal.signal(signal.SIGALRM, old_alarm)
+
+
 class BaseSCGIServer(object):
     # What Request class to use.
     requestClass = Request
--- a/MoinMoin/support/flup/server/scgi_fork.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/scgi_fork.py	Sat Feb 20 03:01:45 2010 +0100
@@ -97,7 +97,7 @@
     def __init__(self, application, scriptName=NoDefault, environ=None,
                  bindAddress=('localhost', 4000), umask=None,
                  allowedServers=None,
-                 loggingLevel=logging.INFO, debug=False, **kw):
+                 loggingLevel=logging.INFO, debug=False, timeout=None, **kw):
         """
         scriptName is the initial portion of the URL path that "belongs"
         to your application. It is used to determine PATH_INFO (which doesn't
@@ -137,7 +137,9 @@
         for key in ('multithreaded', 'multiprocess', 'jobClass', 'jobArgs'):
             if kw.has_key(key):
                 del kw[key]
-        PreforkServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw)
+        
+        PreforkServer.__init__(self, jobClass=Connection,
+                               jobArgs=(self, timeout), **kw)
 
     def run(self):
         """
--- a/MoinMoin/support/flup/server/threadpool.py	Sat Feb 20 02:53:25 2010 +0100
+++ b/MoinMoin/support/flup/server/threadpool.py	Sat Feb 20 03:01:45 2010 +0100
@@ -68,7 +68,7 @@
         self._lock.release()
 
         # wait for all threads to finish
-        for t in self._threads:
+        for t in self._threads[:]:
             t.join()
 
     def addJob(self, job, allowQueuing=True):
@@ -89,9 +89,12 @@
             # Maintain minimum number of spares.
             while self._idleCount < self._minSpare and \
                   self._workerCount < self._maxThreads:
+                try:
+                    self._start_new_thread()
+                except thread.error:
+                    return False
                 self._workerCount += 1
                 self._idleCount += 1
-                self._start_new_thread()
 
             # Hand off the job.
             if self._idleCount or allowQueuing:
@@ -140,6 +143,7 @@
 
             # Die off...
             assert self._workerCount > self._maxSpare
+            self._threads.remove(threading.currentThread())
             self._workerCount -= 1
         finally:
             self._lock.release()