diff MoinMoin/support/werkzeug/test.py @ 5801:8de563c487be

upgrade werkzeug to 0.8.1, document current bundled version and current minimum requirement (0.6, for py 2.7 compatibility)
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Thu, 01 Dec 2011 01:34:45 +0100
parents 7cb92118a93e
children 9f12f41504fc
line wrap: on
line diff
--- a/MoinMoin/support/werkzeug/test.py	Sat Mar 12 23:48:11 2011 +0100
+++ b/MoinMoin/support/werkzeug/test.py	Thu Dec 01 01:34:45 2011 +0100
@@ -5,7 +5,7 @@
 
     This module implements a client to WSGI applications for testing.
 
-    :copyright: (c) 2009 by the Werkzeug Team, see AUTHORS for more details.
+    :copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details.
     :license: BSD, see LICENSE for more details.
 """
 import sys
@@ -19,12 +19,13 @@
 from cookielib import CookieJar
 from urllib2 import Request as U2Request
 
-from werkzeug._internal import _empty_stream
+from werkzeug._internal import _empty_stream, _get_environ
 from werkzeug.wrappers import BaseRequest
-from werkzeug.utils import create_environ, run_wsgi_app, get_current_url, \
-     url_encode, url_decode, FileStorage, get_host
+from werkzeug.urls import url_encode, url_fix, iri_to_uri, _unquote
+from werkzeug.wsgi import get_host, get_current_url, ClosingIterator
+from werkzeug.utils import dump_cookie
 from werkzeug.datastructures import FileMultiDict, MultiDict, \
-     CombinedMultiDict, Headers
+     CombinedMultiDict, Headers, FileStorage
 
 
 def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500,
@@ -85,7 +86,7 @@
             else:
                 if isinstance(value, unicode):
                     value = value.encode(charset)
-                write('\r\n\r\n' + value)
+                write('\r\n\r\n' + str(value))
             write('\r\n')
     write('--%s--\r\n' % boundary)
 
@@ -152,7 +153,7 @@
         for cookie in self:
             cvals.append('%s=%s' % (cookie.name, cookie.value))
         if cvals:
-            environ['HTTP_COOKIE'] = ','.join(cvals)
+            environ['HTTP_COOKIE'] = '; '.join(cvals)
 
     def extract_wsgi(self, environ, headers):
         """Extract the server's set-cookie headers as cookies into the
@@ -174,8 +175,12 @@
             for value in values:
                 yield key, value
     else:
-        for item in data.iteritems():
-            yield item
+        for key, values in data.iteritems():
+            if isinstance(values, list):
+                for value in values:
+                    yield key, value
+            else:
+                yield key, values
 
 
 class EnvironBuilder(object):
@@ -198,13 +203,17 @@
         the :attr:`content_length` is set and you have to provide a
         :attr:`content_type`.
     -   a `dict`: If it's a dict the keys have to be strings and the values
-        and of the following objects:
+        any of the following objects:
 
         -   a :class:`file`-like object.  These are converted into
             :class:`FileStorage` objects automatically.
         -   a tuple.  The :meth:`~FileMultiDict.add_file` method is called
             with the tuple items as positional arguments.
 
+    .. versionadded:: 0.6
+       `path` and `base_url` can now be unicode strings that are encoded using
+       the :func:`iri_to_uri` function.
+
     :param path: the path of the request.  In the WSGI environment this will
                  end up as `PATH_INFO`.  If the `query_string` is not defined
                  and there is a question mark in the `path` everything after
@@ -253,7 +262,14 @@
         if query_string is None and '?' in path:
             path, query_string = path.split('?', 1)
         self.charset = charset
+        if isinstance(path, unicode):
+            path = iri_to_uri(path, charset)
         self.path = path
+        if base_url is not None:
+            if isinstance(base_url, unicode):
+                base_url = iri_to_uri(base_url, charset)
+            else:
+                base_url = url_fix(base_url, charset)
         self.base_url = base_url
         if isinstance(query_string, basestring):
             self.query_string = query_string
@@ -295,7 +311,7 @@
                        hasattr(value, 'read'):
                         self._add_file_from_data(key, value)
                     else:
-                        self.form[key] = value
+                        self.form.setlistdefault(key).append(value)
 
     def _add_file_from_data(self, key, value):
         """Called in the EnvironBuilder to add files from the data dict."""
@@ -305,8 +321,7 @@
             from warnings import warn
             warn(DeprecationWarning('it\'s no longer possible to pass dicts '
                                     'as `data`.  Use tuples or FileStorage '
-                                    'objects intead'), stacklevel=2)
-            args = v
+                                    'objects instead'), stacklevel=2)
             value = dict(value)
             mimetype = value.pop('mimetype', None)
             if mimetype is not None:
@@ -343,7 +358,7 @@
     def _get_content_type(self):
         ct = self.headers.get('Content-Type')
         if ct is None and not self._input_stream:
-            if self.method in ('POST', 'PUT'):
+            if self.method in ('POST', 'PUT', 'PATCH'):
                 if self._files:
                     return 'multipart/form-data'
                 return 'application/x-www-form-urlencoded'
@@ -507,15 +522,15 @@
         if self.environ_base:
             result.update(self.environ_base)
 
-        def _encode(x):
+        def _path_encode(x):
             if isinstance(x, unicode):
-                return x.encode(self.charset)
-            return x
+                x = x.encode(self.charset)
+            return _unquote(x)
 
         result.update({
             'REQUEST_METHOD':       self.method,
-            'SCRIPT_NAME':          _encode(self.script_root),
-            'PATH_INFO':            _encode(self.path),
+            'SCRIPT_NAME':          _path_encode(self.script_root),
+            'PATH_INFO':            _path_encode(self.path),
             'QUERY_STRING':         self.query_string,
             'SERVER_NAME':          self.server_name,
             'SERVER_PORT':          str(self.server_port),
@@ -548,6 +563,13 @@
         return cls(self.get_environ())
 
 
+class ClientRedirectError(Exception):
+    """
+    If a redirect loop is detected when using follow_redirects=True with
+    the :cls:`Client`, then this exception is raised.
+    """
+
+
 class Client(object):
     """This class allows to send requests to a wrapped application.
 
@@ -566,12 +588,17 @@
     sent for subsequent requests. This is True by default, but passing False
     will disable this behaviour.
 
+    If you want to request some subdomain of your application you may set
+    `allow_subdomain_redirects` to `True` as if not no external redirects
+    are allowed.
+
     .. versionadded:: 0.5
        `use_cookies` is new in this version.  Older versions did not provide
        builtin cookie support.
     """
 
-    def __init__(self, application, response_wrapper=None, use_cookies=True):
+    def __init__(self, application, response_wrapper=None, use_cookies=True,
+                 allow_subdomain_redirects=False):
         self.application = application
         if response_wrapper is None:
             response_wrapper = lambda a, s, h: (a, s, h)
@@ -580,6 +607,27 @@
             self.cookie_jar = _TestCookieJar()
         else:
             self.cookie_jar = None
+        self.redirect_client = None
+        self.allow_subdomain_redirects = allow_subdomain_redirects
+
+    def set_cookie(self, server_name, key, value='', max_age=None,
+                   expires=None, path='/', domain=None, secure=None,
+                   httponly=False, charset='utf-8'):
+        """Sets a cookie in the client's cookie jar.  The server name
+        is required and has to match the one that is also passed to
+        the open call.
+        """
+        assert self.cookie_jar is not None, 'cookies disabled'
+        header = dump_cookie(key, value, max_age, expires, path, domain,
+                             secure, httponly, charset)
+        environ = create_environ(path, base_url='http://' + server_name)
+        headers = [('Set-Cookie', header)]
+        self.cookie_jar.extract_wsgi(environ, headers)
+
+    def delete_cookie(self, server_name, key, path='/', domain=None):
+        """Deletes a cookie in the test client."""
+        self.set_cookie(server_name, key, expires=0, max_age=0,
+                        path=path, domain=domain)
 
     def open(self, *args, **kwargs):
         """Takes the same arguments as the :class:`EnvironBuilder` class with
@@ -600,7 +648,7 @@
         Additional parameters:
 
         :param as_tuple: Returns a tuple in the form ``(environ, result)``
-        :param buffered: Set this to true to buffer the application run.
+        :param buffered: Set this to True to buffer the application run.
                          This will automatically close the application for
                          you as well.
         :param follow_redirects: Set this to True if the `Client` should
@@ -632,29 +680,49 @@
         redirect_chain = []
         status_code = int(rv[1].split(None, 1)[0])
         while status_code in (301, 302, 303, 305, 307) and follow_redirects:
+            if not self.redirect_client:
+                # assume that we're not using the user defined response wrapper
+                # so that we don't need any ugly hacks to get the status
+                # code from the response.
+                self.redirect_client = Client(self.application)
+                self.redirect_client.cookie_jar = self.cookie_jar
+
             redirect = dict(rv[2])['Location']
-            host = get_host(create_environ('/', redirect))
-            if get_host(environ).split(':', 1)[0] != host:
+
+            scheme, netloc, script_root, qs, anchor = urlparse.urlsplit(redirect)
+            base_url = urlparse.urlunsplit((scheme, netloc, '', '', '')).rstrip('/') + '/'
+
+            cur_server_name = netloc.split(':', 1)[0].split('.')
+            real_server_name = get_host(environ).split(':', 1)[0].split('.')
+
+            if self.allow_subdomain_redirects:
+                allowed = cur_server_name[-len(real_server_name):] == real_server_name
+            else:
+                allowed = cur_server_name == real_server_name
+
+            if not allowed:
                 raise RuntimeError('%r does not support redirect to '
                                    'external targets' % self.__class__)
 
-            scheme, netloc, script_root, qs, anchor = urlparse.urlsplit(redirect)
             redirect_chain.append((redirect, status_code))
 
-            kwargs.update({
-                'base_url':         urlparse.urlunsplit((scheme, host,
-                                    script_root, '', '')).rstrip('/') + '/',
+            # the redirect request should be a new request, and not be based on
+            # the old request
+
+            redirect_kwargs = {
+                'path':             script_root,
+                'base_url':         base_url,
                 'query_string':     qs,
-                'as_tuple':         as_tuple,
+                'as_tuple':         True,
                 'buffered':         buffered,
-                'follow_redirects': False
-            })
-            rv = self.open(*args, **kwargs)
+                'follow_redirects': False,
+            }
+            environ, rv = self.redirect_client.open(**redirect_kwargs)
             status_code = int(rv[1].split(None, 1)[0])
 
             # Prevent loops
-            if redirect_chain[-1] in redirect_chain[0:-1]:
-                break
+            if redirect_chain[-1] in redirect_chain[:-1]:
+                raise ClientRedirectError("loop detected")
 
         response = self.response_wrapper(*rv)
         if as_tuple:
@@ -666,6 +734,11 @@
         kw['method'] = 'GET'
         return self.open(*args, **kw)
 
+    def patch(self, *args, **kw):
+        """Like open but method is enforced to PATCH."""
+        kw['method'] = 'PATCH'
+        return self.open(*args, **kw)
+
     def post(self, *args, **kw):
         """Like open but method is enforced to POST."""
         kw['method'] = 'POST'
@@ -732,6 +805,7 @@
     :param buffered: set to `True` to enforce buffering.
     :return: tuple in the form ``(app_iter, status, headers)``
     """
+    environ = _get_environ(environ)
     response = []
     buffer = []
 
@@ -743,7 +817,7 @@
 
     app_iter = app(environ, start_response)
 
-    # when buffering we emit the close call early and conver the
+    # when buffering we emit the close call early and convert the
     # application iterator into a regular list
     if buffered:
         close_func = getattr(app_iter, 'close', None)
@@ -761,8 +835,8 @@
         while not response:
             buffer.append(app_iter.next())
         if buffer:
+            close_func = getattr(app_iter, 'close', None)
             app_iter = chain(buffer, app_iter)
-            close_func = getattr(app_iter, 'close', None)
             if close_func is not None:
                 app_iter = ClosingIterator(app_iter, close_func)