changeset 1929:aa6aa944246b

introduce per-user and per-session secrets that are used for the cookie signature to avoid cookies being stolen and re-used after logging out
author Johannes Berg <johannes AT sipsolutions DOT net>
date Tue, 03 Apr 2007 18:19:24 +0200
parents 57cbc30bfa31
children 3b25f0f60ede
files MoinMoin/auth/__init__.py MoinMoin/config/multiconfig.py
diffstat 2 files changed, 126 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/MoinMoin/auth/__init__.py	Tue Apr 03 18:18:27 2007 +0200
+++ b/MoinMoin/auth/__init__.py	Tue Apr 03 18:19:24 2007 +0200
@@ -45,11 +45,94 @@
 
 import time, Cookie
 from MoinMoin import user
+from MoinMoin.caching import CacheEntry
 
 # cookie names
 MOIN_SESSION = 'MOIN_SESSION'
 
+# maximum number of stored secrets, i.e. maximum number of
+# different machines a user can use concurrently without having
+# to log in again
+MAX_STORED_SECRETS = 20
+
 import hmac, random
+from sha import sha
+
+class UserSecurityStringCache:
+    """UserSecurityStringCache -- cache a list of secrets for user cookies
+
+    In order to avoid cookie stealing even after a user has logged out we
+    keep a list of secrets (in the cache) associated with a user and verify
+    that the cookie matches the right one.
+
+    This class manages the secrets and their LRU expiry.
+    """
+    def __init__(self, request, userid):
+        # we use 'farm' scope but hash the user_dir into the
+        # secret cache name to make both shared and non-shared
+        # user_dir in a farm work properly
+        cache_name = sha(userid + request.cfg.user_dir).hexdigest()
+        self.ce = CacheEntry(request, 'ussc', cache_name, 'farm', use_pickle=True)
+
+    def _get(self):
+        """Internal: get string dict and LRU list from cache"""
+        if self.ce.exists():
+            return self.ce.content()
+        return {}, []
+
+    def update(self, secidx):
+        """ tell the secret string cache that the secret identified
+            was used
+
+        @param secidx: the index of that secret or None if a new one
+                       shall be assigned
+        """
+        secrets, lru = self._get()
+        # just move this secret to the front of the LRU queue
+        lru.remove(secidx)
+        lru.insert(0, secidx)
+        self.ce.update((secrets, lru))
+
+    def insert(self, secstring):
+        """ insert a new secret string into the cache
+
+        @param secstring: the new secret string
+        @rtype: int
+        @return: the new secret index
+        """
+        secrets, lru = self._get()
+        # find a new unused index
+        # try one that we'll expire first
+        if len(lru) >= MAX_STORED_SECRETS:
+            secidx = lru[-1]
+        else:
+            # select an unused index
+            secidx = random.randint(0, MAX_STORED_SECRETS*5)
+            while secidx in lru:
+                secidx = random.randint(0, MAX_STORED_SECRETS*5)
+        for idx in lru[MAX_STORED_SECRETS-1:]:
+            del secrets[idx]
+        lru = lru[:MAX_STORED_SECRETS-1]
+        lru.insert(0, secidx)
+        secrets[secidx] = secstring
+        self.ce.update((secrets, lru))
+        return secidx
+
+    def remove(self, secidx):
+        """ remove a given secret from the cache
+
+        @param secidx: the index of the secret to be removed
+        """
+        secrets, lru = self._get()
+        del secrets[secidx]
+        lru = [idx for idx in lru if idx != secidx]
+        self.ce.update((secrets,lru))
+
+    def getsecret(self, secidx):
+        secrets, lru = self._get()
+        if secidx in secrets:
+            return secrets[secidx]
+        return ''
 
 def generate_security_string(length):
     """ generate a random length (length/2 .. length) string with random content """
@@ -115,15 +198,25 @@
     # IMPORTANT: Prevent caching of current page and cookie
     request.disableHttpCaching()
 
-def setSessionCookie(request, u):
+def setSessionCookie(request, u, secret=None, securitystringcache=None, secidx=None):
     """ Set moin_session cookie for user obj u """
     import base64
     maxage = getCookieLifetime(request, u)
     expires = time.time() + maxage
     enc_username = base64.encodestring(u.auth_username).replace('\n', '')
     enc_id = base64.encodestring(u.id).replace('\n', '')
-    cookie_body = "username=%s:id=%s:expires=%d" % (enc_username, enc_id, expires)
-    cookie_hash = sign_cookie_data(request, cookie_body)
+    if secret is None and secidx is None:
+        secret = generate_security_string(32)
+    if securitystringcache is None:
+        securitystringcache = UserSecurityStringCache(request, u.id)
+    if secret is None:
+        # secidx must be assigned
+        securitystringcache.update(secidx)
+        secret = securitystringcache.getsecret(secidx)
+    else:
+        secidx = securitystringcache.insert(secret)
+    cookie_body = "username=%s:id=%s:expires=%d:secidx=%d" % (enc_username, enc_id, expires, secidx)
+    cookie_hash = sign_cookie_data(request, cookie_body, securitystring=secret)
     cookie_string = ':'.join([cookie_hash, cookie_body])
     setCookie(request, u, MOIN_SESSION, cookie_string, maxage, expires)
 
@@ -230,25 +323,36 @@
         if verbose: request.log("invalid cookie format: (%s)" % cookie[cookie_name].value)
         return user_obj, True
 
-    if cookie_hash != sign_cookie_data(request, cookie_body):
-        # Invalid cookie
+    # Parse cookie, be careful
+    params = {'username': '', 'id': '', 'expires': 0, 'secidx': -1, }
+    cookie_pairs = cookie_body.split(":")
+    for key, value in [pair.split("=", 1) for pair in cookie_pairs]:
+        try:
+            if isinstance(params[key], str):
+                params[key] = base64.decodestring(value)
+            elif isinstance(params[key], int):
+                params[key] = int(value)
+        except Exception:
+            # ignore any errors from parsing the values
+            pass
+    # This may seem odd, but checking expiry is cheaper
+    # than checking the signature.
+    if params['expires'] < time.time():
+        # XXX Cookie clear here???
+        if verbose: request.log("cookie expired")
+        return user_obj, True
+
+    secidx = params['secidx']
+
+    ussc = UserSecurityStringCache(request, params['id'])
+    secstring = ussc.getsecret(secidx)
+    if cookie_hash != sign_cookie_data(request, cookie_body, securitystring=secstring):
         # XXX Cookie clear here???
         if verbose: request.log("cookie recovered had invalid hash")
         return user_obj, True
 
-    # We can trust the cookie
     if verbose: request.log("Cookie OK, authenticated.")
-    params = {'username': '', 'id': '', 'expires': 0, }
-    cookie_pairs = cookie_body.split(":")
-    for key, value in [pair.split("=", 1) for pair in cookie_pairs]:
-        if isinstance(params[key], str):
-            params[key] = base64.decodestring(value)
-        elif isinstance(params[key], int):
-            params[key] = int(value)
-    if params['expires'] < time.time():
-        # XXX Cookie clear here???
-        if verbose: request.log("cookie expired")
-        return user_obj, True
+
     # XXX Should name be in auth_attribs?
     u = user.User(request,
                   id=params['id'],
@@ -260,10 +364,13 @@
     if logout:
         if verbose: request.log("Logout requested, setting u invalid and 'deleting' cookie")
         u.valid = 0 # just make user invalid, but remember him
+        # delete secret for this cookie
+        ussc.remove(secidx)
         deleteCookie(request, cookie_name)
         return u, True # we return a invalidated user object, so that
                        # following auth methods can get the name of
                        # the user who logged out
-    setSessionCookie(request, u) # refreshes cookie lifetime
+    # refresh cookie lifetime
+    setSessionCookie(request, u, securitystringcache=ussc, secidx=secidx)
     return u, True # use True to get other methods called, too
 
--- a/MoinMoin/config/multiconfig.py	Tue Apr 03 18:18:27 2007 +0200
+++ b/MoinMoin/config/multiconfig.py	Tue Apr 03 18:19:24 2007 +0200
@@ -555,7 +555,7 @@
         for dirname in ('user', 'cache', 'plugin'):
             name = dirname + '_dir'
             if not getattr(self, name, None):
-                setattr(self, name, os.path.join(data_dir, dirname))
+                setattr(self, name, os.path.abspath(os.path.join(data_dir, dirname)))
 
         # Try to decode certain names which allow unicode
         self._decode()