view MoinMoin/_tests/ldap_testbase.py @ 6028:1893da1d5213

userid lookup caches: use 1 on-disk cache file, update cache rather than rebuild. Before this, we maintained one cache file per attribute (e.g. name2id, openid2id, ...) - the related code did multiple passes over all user profiles to rebuild these cache files. Now doing a one-pass rebuild, writing all attribute -> userid mappings into one on-disk cache file called "lookup". In addition to "name" and "openids", fast lookup for "email" and "jid" is supported as well. On profile save, we used to just kill the cache and let it rebuild. Now the cache is read, updated and written back (which is much less expensive for wikis with more than a few users). Also did some refactoring, reducing duplication and breaking down the code into smaller functions / methods.
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Wed, 12 Feb 2014 18:22:10 +0100
parents 8a3c0c726d66
children
line wrap: on
line source

# -*- coding: utf-8 -*-
"""
    LDAPTestBase: LDAP testing support for py.test based unit tests

    Features
    --------

    * setup_class
      * automatic creation of a temporary LDAP server environment
      * automatic creation of a LDAP server process (slapd)

    * teardown_class
      * LDAP server process will be killed and termination will be waited for
      * temporary LDAP environment will be removed

    Usage
    -----

    Write your own test class and derive from LDAPTstBase:

    class TestLdap(LDAPTstBase):
        def testFunction(self):
            server_url = self.ldap_env.slapd.url
            lo = ldap.initialize(server_url)
            lo.simple_bind_s('', '')

    Notes
    -----

    On Ubuntu 8.04 there is apparmor imposing some restrictions on /usr/sbin/slapd,
    so you need to disable apparmor by invoking this as root:

    # /etc/init.d/apparmor stop

    @copyright: 2008 by Thomas Waldmann
    @license: GNU GPL, see COPYING for details.
"""

# Filename of the LDAP server executable. If it is not in your PATH, you have
# to give the full path/filename here. Used by check_environ() and as the
# default executable of Slapd.
SLAPD_EXECUTABLE = 'slapd'

import os, shutil, tempfile, time, base64, md5
from StringIO import StringIO
import signal
import subprocess

try:
    import ldap, ldif, ldap.modlist  # needs python-ldap
except ImportError:
    ldap = None


def check_environ():
    """ Check the system environment whether we are able to run.

        Two things are checked: python-ldap could be imported and running
        "SLAPD_EXECUTABLE -V" gives the expected exit status.

        @return: some failure reason (str) if we can't run, or None if
                 everything looks OK.
        @raise OSError: if launching the executable fails for any other
                        reason than the executable not being found.
    """
    if ldap is None:
        return "You need python-ldap installed to use ldap_testbase."
    slapd = False
    try:
        p = subprocess.Popen([SLAPD_EXECUTABLE, '-V'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() reads both pipes until EOF, closes them and waits for
        # the child. A bare wait() never drains the pipes, so it can block
        # forever once the child has filled one of them.
        p.communicate()
        # an exit status of 1 is what this check accepts as "slapd -V worked"
        if p.pid and p.returncode == 1:
            slapd = True  # it works
    except OSError as err:
        import errno
        # "executable not found" is reported via the return value, not raised.
        # NOTE(review): errno 3 on 'nt' presumably is the Windows
        # "path not found" error code - confirm.
        if not (err.errno == errno.ENOENT or
                (err.errno == 3 and os.name == 'nt')):
            raise
    if not slapd:
        return "Can't start %s (see SLAPD_EXECUTABLE)." % SLAPD_EXECUTABLE
    return None


class Slapd(object):
    """ Manage a slapd process for testing purposes.

        Typical usage: create an instance, call start(timeout) to launch the
        server, talk to it via the url attribute, call stop() when done.
    """
    def __init__(self,
                 config=None,  # config filename for -f
                 executable=SLAPD_EXECUTABLE,
                 debug_flags='', # None,  # for -d stats,acl,args,trace,sync,config
                 proto='ldap', ip='127.0.0.1', port=3890,  # use -h proto://ip:port
                 service_name=''  # defaults to -n executable:port, use None to not use -n
                ):
        """ Remember the settings, nothing is started here.

            NOTE(review): debug_flags defaults to '' (not None), so "-d ''" is
            always passed to slapd. Presumably this keeps slapd from
            detaching, so that the launched process is the server itself -
            confirm against the slapd manual page.
        """
        self.executable = executable
        self.config = config
        self.debug_flags = debug_flags
        self.proto = proto
        self.ip = ip
        self.port = port
        self.url = '%s://%s:%d' % (proto, ip, port) # can be used for ldap.initialize() call
        if service_name == '':
            self.service_name = '%s:%d' % (executable, port)
        else:
            self.service_name = service_name
        self.process = None  # subprocess.Popen object, set by start()

    def start(self, timeout=0):
        """ start a slapd process and optionally wait up to timeout seconds until it responds

            @param timeout: how long to wait for slapd to answer [s],
                            0 means: do not wait and do not check at all.
            @return: None if timeout is 0 (nothing was checked), otherwise
                     True if slapd answered an anonymous bind in time, False
                     if it did not.
        """
        args = [self.executable, '-h', self.url, ]
        if self.config is not None:
            args.extend(['-f', self.config])
        if self.debug_flags is not None:
            args.extend(['-d', self.debug_flags])
        if self.service_name:
            args.extend(['-n', self.service_name])
        self.process = subprocess.Popen(args)
        if not timeout:
            return None
        return self._wait_until_responding(timeout)

    def _wait_until_responding(self, timeout):
        """ poll the server with anonymous binds, return True as soon as one succeeds """
        lo = ldap.initialize(self.url)
        ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
        started = False
        wait_until = time.time() + timeout
        try:
            while time.time() < wait_until:
                rc = self.process.poll()
                if rc is not None and rc != 0:
                    # the launched process died with an error status, it will
                    # never answer - no point in waiting for the timeout
                    break
                try:
                    lo.simple_bind_s('', '')
                except ldap.SERVER_DOWN:
                    time.sleep(0.1)  # not listening yet, try again soon
                else:
                    started = True
                    break
        finally:
            # always release the probing connection (best effort: failing to
            # close it must not hide the result or a pending exception)
            try:
                lo.unbind_s()
            except ldap.LDAPError:
                pass
        return started

    def stop(self):
        """ stop this slapd process and wait until it has terminated

            Does nothing if start() was never called. A process that has
            exited already is just waited for.
        """
        process = self.process
        if process is None:
            return
        if process.poll() is None:
            process.terminate()  # still running: ask it to shut down
        # wait via the Popen object, so it records the exit status itself
        process.wait()


class LdapEnvironment(object):
    """ Manage a (temporary) environment for running a slapd in it.

        Intended call order: create_env(), start_slapd(), load_directory(),
        ... run tests ..., stop_slapd(), destroy_env().
    """

    # default DB_CONFIG bdb configuration file contents
    DB_CONFIG = """\
# STRANGE: if i use those settings, after the test slapd goes to 100% and doesn't terminate on SIGTERM
# Set the database in memory cache size.
#set_cachesize 0 10000000 1

# Set log values.
#set_lg_regionmax 262144
#set_lg_bsize 262144
#set_lg_max 10485760

#set_tas_spins 0
"""

    def __init__(self,
                 basedn,
                 rootdn, rootpw,
                 instance=0,  # use different values when running multiple LdapEnvironments
                 schema_dir='/etc/ldap/schema',  # directory with schemas
                 coding='utf-8',  # coding used for config files
                 timeout=10,  # how long to wait for slapd starting [s]
                ):
        """ Remember the settings, nothing is created or started here. """
        self.basedn = basedn
        self.rootdn = rootdn
        self.rootpw = rootpw
        self.instance = instance
        self.schema_dir = schema_dir
        self.coding = coding
        self.ldap_dir = None  # temp. directory, set by create_env
        self.ldap_db_dir = None  # 'db' directory below ldap_dir, set by create_env
        self.slapd_conf = None  # filename of slapd.conf, set by create_env
        self.slapd = None  # Slapd object, set by start_slapd
        self.timeout = timeout

    def create_env(self, slapd_config, db_config=DB_CONFIG):
        """ create a temporary LDAP server environment in a temp. directory,
            including writing a slapd.conf (see configure_slapd) and a
            DB_CONFIG there.

            @param slapd_config: slapd.conf content template, %-formatted with
                                 the keys ldap_dir, ldap_db_dir, schema_dir,
                                 basedn, rootdn and rootpw (the hashed one).
            @param db_config: content of the DB_CONFIG file
        """
        # local import: hashlib replaces the deprecated md5 module here
        import hashlib

        # create directories
        self.ldap_dir = tempfile.mkdtemp(prefix='LdapEnvironment-%d.' % self.instance)
        self.ldap_db_dir = os.path.join(self.ldap_dir, 'db')
        os.mkdir(self.ldap_db_dir)

        # create DB_CONFIG for bdb backend
        db_config_fname = os.path.join(self.ldap_db_dir, 'DB_CONFIG')
        with open(db_config_fname, 'w') as f:
            f.write(db_config)

        # the config gets the password as '{MD5}' + base64 of the md5 digest;
        # md5 works on bytes, so a text password is encoded first (otherwise
        # a non-ascii unicode password would blow up here)
        rootpw = self.rootpw
        if not isinstance(rootpw, bytes):
            rootpw = rootpw.encode(self.coding)
        digest = base64.b64encode(hashlib.md5(rootpw).digest())
        if not isinstance(digest, str):
            digest = digest.decode('ascii')
        rootpw = '{MD5}' + digest

        # create slapd.conf from content template in slapd_config
        slapd_config = slapd_config % {
            'ldap_dir': self.ldap_dir,
            'ldap_db_dir': self.ldap_db_dir,
            'schema_dir': self.schema_dir,
            'basedn': self.basedn,
            'rootdn': self.rootdn,
            'rootpw': rootpw,
        }
        if not isinstance(slapd_config, bytes):
            slapd_config = slapd_config.encode(self.coding)
        self.slapd_conf = os.path.join(self.ldap_dir, "slapd.conf")
        # binary mode: the content is encoded bytes at this point
        with open(self.slapd_conf, 'wb') as f:
            f.write(slapd_config)

    def start_slapd(self):
        """ start a slapd and optionally wait until it talks with us

            The port used is 3890 + instance.

            @return: whatever Slapd.start returns for our timeout
        """
        self.slapd = Slapd(config=self.slapd_conf, port=3890+self.instance)
        started = self.slapd.start(timeout=self.timeout)
        return started

    def load_directory(self, ldif_content):
        """ load the directory with the ldif_content (str)

            Binds as rootdn / rootpw and adds every entry of the ldif.
        """
        lo = ldap.initialize(self.slapd.url)
        ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
        try:
            lo.simple_bind_s(self.rootdn, self.rootpw)

            class LDIFLoader(ldif.LDIFParser):
                def handle(self, dn, entry):
                    # called by the parser for each entry: add it to the directory
                    lo.add_s(dn, ldap.modlist.addModlist(entry))

            loader = LDIFLoader(StringIO(ldif_content))
            loader.parse()
        finally:
            # always release the connection (best effort: failing to close it
            # must not hide an exception raised while loading)
            try:
                lo.unbind_s()
            except ldap.LDAPError:
                pass

    def stop_slapd(self):
        """ stop a slapd (does nothing if start_slapd was never called) """
        if self.slapd is not None:
            self.slapd.stop()

    def destroy_env(self):
        """ remove the temporary LDAP server environment

            Does nothing if there is no environment (create_env was not
            called or destroy_env was called already).
        """
        if self.ldap_dir is None:
            return
        shutil.rmtree(self.ldap_dir)
        # forget the paths, they do not exist any more
        self.ldap_dir = None
        self.ldap_db_dir = None
        self.slapd_conf = None

try:
    import py.test

    class LDAPTstBase:
        """ Base class for py.test based test classes that need a LDAP server to talk to.

            Derive your test class from it: setup_class prepares a
            LdapEnvironment with a running slapd (available as ldap_env),
            teardown_class stops and removes it again.
        """

        # A derived class MUST override all of these:
        slapd_config = None  # slapd.conf template (str)
        ldif_content = None  # ldif contents (str) loaded into the directory
        basedn = None  # base DN
        rootdn = None  # root DN
        rootpw = None  # root password

        def setup_class(self):
            """ Build the LDAP server environment, launch slapd, load the ldif.

                py.test.skip() is called if slapd could not be started.
            """
            env = self.ldap_env = LdapEnvironment(self.basedn, self.rootdn, self.rootpw)
            env.create_env(slapd_config=self.slapd_config)
            if not env.start_slapd():
                py.test.skip("Failed to start %s process, please see your syslog / log files"
                             " (and check if stopping apparmor helps, in case you use it)." % SLAPD_EXECUTABLE)
            env.load_directory(ldif_content=self.ldif_content)

        def teardown_class(self):
            """ Shut down slapd, then delete the LDAP server environment """
            env = self.ldap_env
            env.stop_slapd()
            env.destroy_env()

except ImportError:
    pass  # obviously py.test not in use