diff MoinMoin/support/werkzeug/test.py @ 6094:9f12f41504fc

upgrade werkzeug from 0.8.3 to 0.11.11 no other changes, does not work like this, see next commit.
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Mon, 05 Sep 2016 23:25:59 +0200
parents 8de563c487be
children 7f12cf241d5e
line wrap: on
line diff
--- a/MoinMoin/support/werkzeug/test.py	Fri Jan 09 20:17:10 2015 +0100
+++ b/MoinMoin/support/werkzeug/test.py	Mon Sep 05 23:25:59 2016 +0200
@@ -5,27 +5,37 @@
 
     This module implements a client to WSGI applications for testing.
 
-    :copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details.
+    :copyright: (c) 2014 by the Werkzeug Team, see AUTHORS for more details.
     :license: BSD, see LICENSE for more details.
 """
 import sys
-import urlparse
 import mimetypes
 from time import time
 from random import random
 from itertools import chain
 from tempfile import TemporaryFile
-from cStringIO import StringIO
-from cookielib import CookieJar
-from urllib2 import Request as U2Request
+from io import BytesIO
 
+try:
+    from urllib2 import Request as U2Request
+except ImportError:
+    from urllib.request import Request as U2Request
+try:
+    from http.cookiejar import CookieJar
+except ImportError:  # Py2
+    from cookielib import CookieJar
+
+from werkzeug._compat import iterlists, iteritems, itervalues, to_bytes, \
+    string_types, text_type, reraise, wsgi_encoding_dance, \
+    make_literal_wrapper
 from werkzeug._internal import _empty_stream, _get_environ
 from werkzeug.wrappers import BaseRequest
-from werkzeug.urls import url_encode, url_fix, iri_to_uri, _unquote
+from werkzeug.urls import url_encode, url_fix, iri_to_uri, url_unquote, \
+    url_unparse, url_parse
 from werkzeug.wsgi import get_host, get_current_url, ClosingIterator
 from werkzeug.utils import dump_cookie
 from werkzeug.datastructures import FileMultiDict, MultiDict, \
-     CombinedMultiDict, Headers, FileStorage
+    CombinedMultiDict, Headers, FileStorage
 
 
 def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500,
@@ -36,10 +46,10 @@
     """
     if boundary is None:
         boundary = '---------------WerkzeugFormPart_%s%s' % (time(), random())
-    _closure = [StringIO(), 0, False]
+    _closure = [BytesIO(), 0, False]
 
     if use_tempfile:
-        def write(string):
+        def write_binary(string):
             stream, total_length, on_disk = _closure
             if on_disk:
                 stream.write(string)
@@ -55,12 +65,15 @@
                     _closure[2] = True
                 _closure[1] = total_length + length
     else:
-        write = _closure[0].write
+        write_binary = _closure[0].write
+
+    def write(string):
+        write_binary(string.encode(charset))
 
     if not isinstance(values, MultiDict):
         values = MultiDict(values)
 
-    for key, values in values.iterlists():
+    for key, values in iterlists(values):
         for value in values:
             write('--%s\r\nContent-Disposition: form-data; name="%s"' %
                   (boundary, key))
@@ -82,11 +95,14 @@
                     chunk = reader(16384)
                     if not chunk:
                         break
-                    write(chunk)
+                    write_binary(chunk)
             else:
-                if isinstance(value, unicode):
-                    value = value.encode(charset)
-                write('\r\n\r\n' + str(value))
+                if not isinstance(value, string_types):
+                    value = str(value)
+
+                value = to_bytes(value, charset)
+                write('\r\n\r\n')
+                write_binary(value)
             write('\r\n')
     write('--%s--\r\n' % boundary)
 
@@ -113,6 +129,7 @@
 
 
 class _TestCookieHeaders(object):
+
     """A headers adapter for cookielib
     """
 
@@ -127,8 +144,16 @@
                 headers.append(v)
         return headers
 
+    def get_all(self, name, default=None):
+        rv = []
+        for k, v in self.headers:
+            if k.lower() == name.lower():
+                rv.append(v)
+        return rv or default or []
+
 
 class _TestCookieResponse(object):
+
     """Something that looks like a httplib.HTTPResponse, but is actually just an
     adapter for our test responses to make them available for cookielib.
     """
@@ -141,6 +166,7 @@
 
 
 class _TestCookieJar(CookieJar):
+
     """A cookielib.CookieJar modified to inject and read cookie headers from
     and to wsgi environments, and wsgi application responses.
     """
@@ -171,11 +197,11 @@
     :class:`EnvironBuilder`.
     """
     if isinstance(data, MultiDict):
-        for key, values in data.iterlists():
+        for key, values in iterlists(data):
             for value in values:
                 yield key, value
     else:
-        for key, values in data.iteritems():
+        for key, values in iteritems(data):
             if isinstance(values, list):
                 for value in values:
                     yield key, value
@@ -184,6 +210,7 @@
 
 
 class EnvironBuilder(object):
+
     """This class can be used to conveniently create a WSGI environment
     for testing purposes.  It can be used to quickly create WSGI environments
     or request objects from arbitrary data.
@@ -259,19 +286,15 @@
                  content_length=None, errors_stream=None, multithread=False,
                  multiprocess=False, run_once=False, headers=None, data=None,
                  environ_base=None, environ_overrides=None, charset='utf-8'):
-        if query_string is None and '?' in path:
-            path, query_string = path.split('?', 1)
+        path_s = make_literal_wrapper(path)
+        if query_string is None and path_s('?') in path:
+            path, query_string = path.split(path_s('?'), 1)
         self.charset = charset
-        if isinstance(path, unicode):
-            path = iri_to_uri(path, charset)
-        self.path = path
+        self.path = iri_to_uri(path)
         if base_url is not None:
-            if isinstance(base_url, unicode):
-                base_url = iri_to_uri(base_url, charset)
-            else:
-                base_url = url_fix(base_url, charset)
+            base_url = url_fix(iri_to_uri(base_url, charset), charset)
         self.base_url = base_url
-        if isinstance(query_string, basestring):
+        if isinstance(query_string, (bytes, text_type)):
             self.query_string = query_string
         else:
             if query_string is None:
@@ -285,7 +308,8 @@
         elif not isinstance(headers, Headers):
             headers = Headers(headers)
         self.headers = headers
-        self.content_type = content_type
+        if content_type is not None:
+            self.content_type = content_type
         if errors_stream is None:
             errors_stream = sys.stderr
         self.errors_stream = errors_stream
@@ -301,8 +325,10 @@
         if data:
             if input_stream is not None:
                 raise TypeError('can\'t provide input stream and data')
-            if isinstance(data, basestring):
-                self.input_stream = StringIO(data)
+            if isinstance(data, text_type):
+                data = data.encode(self.charset)
+            if isinstance(data, bytes):
+                self.input_stream = BytesIO(data)
                 if self.content_length is None:
                     self.content_length = len(data)
             else:
@@ -331,17 +357,16 @@
             self.files.add_file(key, value)
 
     def _get_base_url(self):
-        return urlparse.urlunsplit((self.url_scheme, self.host,
-                                    self.script_root, '', '')).rstrip('/') + '/'
+        return url_unparse((self.url_scheme, self.host,
+                            self.script_root, '', '')).rstrip('/') + '/'
 
     def _set_base_url(self, value):
         if value is None:
             scheme = 'http'
             netloc = 'localhost'
-            scheme = 'http'
             script_root = ''
         else:
-            scheme, netloc, script_root, qs, anchor = urlparse.urlsplit(value)
+            scheme, netloc, script_root, qs, anchor = url_parse(value)
             if qs or anchor:
                 raise ValueError('base url must not contain a query string '
                                  'or fragment')
@@ -358,9 +383,9 @@
     def _get_content_type(self):
         ct = self.headers.get('Content-Type')
         if ct is None and not self._input_stream:
-            if self.method in ('POST', 'PUT', 'PATCH'):
-                if self._files:
-                    return 'multipart/form-data'
+            if self._files:
+                return 'multipart/form-data'
+            elif self._form:
                 return 'application/x-www-form-urlencoded'
             return None
         return ct
@@ -394,6 +419,7 @@
 
     def form_property(name, storage, doc):
         key = '_' + name
+
         def getter(self):
             if self._input_stream is not None:
                 raise AttributeError('an input stream is defined')
@@ -401,7 +427,9 @@
             if rv is None:
                 rv = storage()
                 setattr(self, key, rv)
+
             return rv
+
         def setter(self, value):
             self._input_stream = None
             setattr(self, key, value)
@@ -474,7 +502,10 @@
         return 80
 
     def __del__(self):
-        self.close()
+        try:
+            self.close()
+        except Exception:
+            pass
 
     def close(self):
         """Closes all files.  If you put real :class:`file` objects into the
@@ -484,13 +515,13 @@
         if self.closed:
             return
         try:
-            files = self.files.itervalues()
+            files = itervalues(self.files)
         except AttributeError:
             files = ()
         for f in files:
             try:
                 f.close()
-            except Exception, e:
+            except Exception:
                 pass
         self.closed = True
 
@@ -512,9 +543,11 @@
                 stream_encode_multipart(values, charset=self.charset)
             content_type += '; boundary="%s"' % boundary
         elif content_type == 'application/x-www-form-urlencoded':
+            # XXX: py2v3 review
             values = url_encode(self.form, charset=self.charset)
+            values = values.encode('ascii')
             content_length = len(values)
-            input_stream = StringIO(values)
+            input_stream = BytesIO(values)
         else:
             input_stream = _empty_stream
 
@@ -523,15 +556,15 @@
             result.update(self.environ_base)
 
         def _path_encode(x):
-            if isinstance(x, unicode):
-                x = x.encode(self.charset)
-            return _unquote(x)
+            return wsgi_encoding_dance(url_unquote(x, self.charset), self.charset)
+
+        qs = wsgi_encoding_dance(self.query_string)
 
         result.update({
             'REQUEST_METHOD':       self.method,
             'SCRIPT_NAME':          _path_encode(self.script_root),
             'PATH_INFO':            _path_encode(self.path),
-            'QUERY_STRING':         self.query_string,
+            'QUERY_STRING':         qs,
             'SERVER_NAME':          self.server_name,
             'SERVER_PORT':          str(self.server_port),
             'HTTP_HOST':            self.host,
@@ -546,7 +579,7 @@
             'wsgi.multiprocess':    self.multiprocess,
             'wsgi.run_once':        self.run_once
         })
-        for key, value in self.headers.to_list(self.charset):
+        for key, value in self.headers.to_wsgi_list():
             result['HTTP_%s' % key.upper().replace('-', '_')] = value
         if self.environ_overrides:
             result.update(self.environ_overrides)
@@ -564,6 +597,7 @@
 
 
 class ClientRedirectError(Exception):
+
     """
     If a redirect loop is detected when using follow_redirects=True with
     the :cls:`Client`, then this exception is raised.
@@ -571,6 +605,7 @@
 
 
 class Client(object):
+
     """This class allows to send requests to a wrapped application.
 
     The response wrapper can be a class or factory function that takes
@@ -600,14 +635,11 @@
     def __init__(self, application, response_wrapper=None, use_cookies=True,
                  allow_subdomain_redirects=False):
         self.application = application
-        if response_wrapper is None:
-            response_wrapper = lambda a, s, h: (a, s, h)
         self.response_wrapper = response_wrapper
         if use_cookies:
             self.cookie_jar = _TestCookieJar()
         else:
             self.cookie_jar = None
-        self.redirect_client = None
         self.allow_subdomain_redirects = allow_subdomain_redirects
 
     def set_cookie(self, server_name, key, value='', max_age=None,
@@ -629,6 +661,52 @@
         self.set_cookie(server_name, key, expires=0, max_age=0,
                         path=path, domain=domain)
 
+    def run_wsgi_app(self, environ, buffered=False):
+        """Runs the wrapped WSGI app with the given environment."""
+        if self.cookie_jar is not None:
+            self.cookie_jar.inject_wsgi(environ)
+        rv = run_wsgi_app(self.application, environ, buffered=buffered)
+        if self.cookie_jar is not None:
+            self.cookie_jar.extract_wsgi(environ, rv[2])
+        return rv
+
+    def resolve_redirect(self, response, new_location, environ, buffered=False):
+        """Resolves a single redirect and triggers the request again
+        directly on this redirect client.
+        """
+        scheme, netloc, script_root, qs, anchor = url_parse(new_location)
+        base_url = url_unparse((scheme, netloc, '', '', '')).rstrip('/') + '/'
+
+        cur_server_name = netloc.split(':', 1)[0].split('.')
+        real_server_name = get_host(environ).rsplit(':', 1)[0].split('.')
+
+        if self.allow_subdomain_redirects:
+            allowed = cur_server_name[-len(real_server_name):] == real_server_name
+        else:
+            allowed = cur_server_name == real_server_name
+
+        if not allowed:
+            raise RuntimeError('%r does not support redirect to '
+                               'external targets' % self.__class__)
+
+        status_code = int(response[1].split(None, 1)[0])
+        if status_code == 307:
+            method = environ['REQUEST_METHOD']
+        else:
+            method = 'GET'
+
+        # For redirect handling we temporarily disable the response
+        # wrapper.  This is not threadsafe but not a real concern
+        # since the test client must not be shared anyways.
+        old_response_wrapper = self.response_wrapper
+        self.response_wrapper = None
+        try:
+            return self.open(path=script_root, base_url=base_url,
+                             query_string=qs, as_tuple=True,
+                             buffered=buffered, method=method)
+        finally:
+            self.response_wrapper = old_response_wrapper
+
     def open(self, *args, **kwargs):
         """Takes the same arguments as the :class:`EnvironBuilder` class with
         some additions:  You can provide a :class:`EnvironBuilder` or a WSGI
@@ -670,61 +748,26 @@
             finally:
                 builder.close()
 
-        if self.cookie_jar is not None:
-            self.cookie_jar.inject_wsgi(environ)
-        rv = run_wsgi_app(self.application, environ, buffered=buffered)
-        if self.cookie_jar is not None:
-            self.cookie_jar.extract_wsgi(environ, rv[2])
+        response = self.run_wsgi_app(environ, buffered=buffered)
 
         # handle redirects
         redirect_chain = []
-        status_code = int(rv[1].split(None, 1)[0])
-        while status_code in (301, 302, 303, 305, 307) and follow_redirects:
-            if not self.redirect_client:
-                # assume that we're not using the user defined response wrapper
-                # so that we don't need any ugly hacks to get the status
-                # code from the response.
-                self.redirect_client = Client(self.application)
-                self.redirect_client.cookie_jar = self.cookie_jar
-
-            redirect = dict(rv[2])['Location']
-
-            scheme, netloc, script_root, qs, anchor = urlparse.urlsplit(redirect)
-            base_url = urlparse.urlunsplit((scheme, netloc, '', '', '')).rstrip('/') + '/'
-
-            cur_server_name = netloc.split(':', 1)[0].split('.')
-            real_server_name = get_host(environ).split(':', 1)[0].split('.')
-
-            if self.allow_subdomain_redirects:
-                allowed = cur_server_name[-len(real_server_name):] == real_server_name
-            else:
-                allowed = cur_server_name == real_server_name
+        while 1:
+            status_code = int(response[1].split(None, 1)[0])
+            if status_code not in (301, 302, 303, 305, 307) \
+               or not follow_redirects:
+                break
+            new_location = response[2]['location']
+            new_redirect_entry = (new_location, status_code)
+            if new_redirect_entry in redirect_chain:
+                raise ClientRedirectError('loop detected')
+            redirect_chain.append(new_redirect_entry)
+            environ, response = self.resolve_redirect(response, new_location,
+                                                      environ,
+                                                      buffered=buffered)
 
-            if not allowed:
-                raise RuntimeError('%r does not support redirect to '
-                                   'external targets' % self.__class__)
-
-            redirect_chain.append((redirect, status_code))
-
-            # the redirect request should be a new request, and not be based on
-            # the old request
-
-            redirect_kwargs = {
-                'path':             script_root,
-                'base_url':         base_url,
-                'query_string':     qs,
-                'as_tuple':         True,
-                'buffered':         buffered,
-                'follow_redirects': False,
-            }
-            environ, rv = self.redirect_client.open(**redirect_kwargs)
-            status_code = int(rv[1].split(None, 1)[0])
-
-            # Prevent loops
-            if redirect_chain[-1] in redirect_chain[:-1]:
-                raise ClientRedirectError("loop detected")
-
-        response = self.response_wrapper(*rv)
+        if self.response_wrapper is not None:
+            response = self.response_wrapper(*response)
         if as_tuple:
             return environ, response
         return response
@@ -759,6 +802,16 @@
         kw['method'] = 'DELETE'
         return self.open(*args, **kw)
 
+    def options(self, *args, **kw):
+        """Like open but method is enforced to OPTIONS."""
+        kw['method'] = 'OPTIONS'
+        return self.open(*args, **kw)
+
+    def trace(self, *args, **kw):
+        """Like open but method is enforced to TRACE."""
+        kw['method'] = 'TRACE'
+        return self.open(*args, **kw)
+
     def __repr__(self):
         return '<%s %r>' % (
             self.__class__.__name__,
@@ -811,33 +864,33 @@
 
     def start_response(status, headers, exc_info=None):
         if exc_info is not None:
-            raise exc_info[0], exc_info[1], exc_info[2]
+            reraise(*exc_info)
         response[:] = [status, headers]
         return buffer.append
 
-    app_iter = app(environ, start_response)
+    app_rv = app(environ, start_response)
+    close_func = getattr(app_rv, 'close', None)
+    app_iter = iter(app_rv)
 
     # when buffering we emit the close call early and convert the
     # application iterator into a regular list
     if buffered:
-        close_func = getattr(app_iter, 'close', None)
         try:
             app_iter = list(app_iter)
         finally:
             if close_func is not None:
                 close_func()
 
-    # otherwise we iterate the application iter until we have
-    # a response, chain the already received data with the already
-    # collected data and wrap it in a new `ClosingIterator` if
-    # we have a close callable.
+    # otherwise we iterate the application iter until we have a response, chain
+    # the already received data with the already collected data and wrap it in
+    # a new `ClosingIterator` if we need to restore a `close` callable from the
+    # original return value.
     else:
         while not response:
-            buffer.append(app_iter.next())
+            buffer.append(next(app_iter))
         if buffer:
-            close_func = getattr(app_iter, 'close', None)
             app_iter = chain(buffer, app_iter)
-            if close_func is not None:
-                app_iter = ClosingIterator(app_iter, close_func)
+        if close_func is not None and app_iter is not app_rv:
+            app_iter = ClosingIterator(app_iter, close_func)
 
-    return app_iter, response[0], response[1]
+    return app_iter, response[0], Headers(response[1])