MoinMoin/_tests/ldap_testbase.py
author Thomas Waldmann <tw AT waldmann-edv DOT de>
Tue, 11 Nov 2008 22:00:33 +0100
changeset 4199 f414aece63e0
parent 3661 2d5a325ae247
permissions -rw-r--r--
send_page has no msg param any more, use theme.add_msg
tw@3650
     1
# -*- coding: utf-8 -*-
tw@3650
     2
"""
tw@3650
     3
    LDAPTestBase: LDAP testing support for py.test based unit tests
tw@3650
     4
tw@3650
     5
    Features
tw@3650
     6
    --------
tw@3650
     7
tw@3650
     8
    * setup_class
tw@3650
     9
      * automatic creation of a temporary LDAP server environment
tw@3650
    10
      * automatic creation of a LDAP server process (slapd)
tw@3650
    11
tw@3650
    12
    * teardown_class
tw@3650
    13
      * LDAP server process will be killed and termination will be waited for
tw@3650
    14
      * temporary LDAP environment will be removed
tw@3650
    15
tw@3650
    16
    Usage
tw@3650
    17
    -----
tw@3650
    18
tw@3650
    19
    Write your own test class and derive from LDAPTestBase:
tw@3650
    20
tw@3650
    21
    class TestLdap(LDAPTestBase):
tw@3650
    22
        def testFunction(self):
tw@3650
    23
            server_url = self.ldap_env.slapd.url
tw@3650
    24
            lo = ldap.initialize(server_url)
tw@3650
    25
            lo.simple_bind_s('', '')
tw@3650
    26
tw@3650
    27
    Notes
tw@3650
    28
    -----
tw@3650
    29
tw@3650
    30
    On Ubuntu 8.04 there is apparmor imposing some restrictions on /usr/sbin/slapd,
tw@3650
    31
    so you need to disable apparmor by invoking this as root:
tw@3650
    32
tw@3650
    33
    # /etc/init.d/apparmor stop
tw@3650
    34
tw@3908
    35
    Requires Python 2.4 (for subprocess module).
tw@3908
    36
tw@3650
    37
    @copyright: 2008 by Thomas Waldmann
tw@3650
    38
    @license: GNU GPL, see COPYING for details.
tw@3650
    39
"""
tw@3650
    40
tw@3651
    41
SLAPD_EXECUTABLE = 'slapd'  # filename of LDAP server executable - if it is not
tw@3651
    42
                            # in your PATH, you have to give full path/filename.
tw@3651
    43
tw@3650
    44
import os, shutil, tempfile, time
tw@3650
    45
from StringIO import StringIO
tw@3650
    46
import signal
tw@3650
    47
tw@3651
    48
try:
tw@3651
    49
    import subprocess  # needs Python 2.4
tw@3651
    50
except ImportError:
tw@3651
    51
    subprocess = None
tw@3651
    52
tw@3651
    53
try:
tw@3651
    54
    import ldap, ldif, ldap.modlist  # needs python-ldap
tw@3651
    55
except ImportError:
tw@3651
    56
    ldap = None
tw@3651
    57
tw@3651
    58
tw@3651
    59
def check_environ():
tw@3651
    60
    """ Check the system environment whether we are able to run.
tw@3651
    61
        Either return some failure reason if we can't or None if everything
tw@3651
    62
        looks OK.
tw@3651
    63
    """
tw@3651
    64
    if subprocess is None:
tw@3651
    65
        return "You need at least python 2.4 to use ldap_testbase."
tw@3651
    66
    if ldap is None:
tw@3651
    67
        return "You need python-ldap installed to use ldap_testbase."
tw@3651
    68
    slapd = False
tw@3651
    69
    try:
tw@3651
    70
        p = subprocess.Popen([SLAPD_EXECUTABLE, '-V'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
tw@3651
    71
        pid = p.pid
tw@3651
    72
        rc = p.wait()
tw@3651
    73
        if pid and rc == 1:
tw@3651
    74
            slapd = True  # it works
tw@3651
    75
    except OSError, err:
tw@3651
    76
        import errno
tw@3651
    77
        if not (err.errno == errno.ENOENT or
tw@3651
    78
                (err.errno == 3 and os.name == 'nt')):
tw@3651
    79
            raise
tw@3651
    80
    if not slapd:
tw@3651
    81
        return "Can't start %s (see SLAPD_EXECUTABLE)." % SLAPD_EXECUTABLE
tw@3651
    82
    return None
tw@3651
    83
tw@3650
    84
tw@3650
    85
class Slapd(object):
tw@3650
    86
    """ Manage a slapd process for testing purposes """
tw@3650
    87
    def __init__(self,
tw@3650
    88
                 config=None,  # config filename for -f
tw@3651
    89
                 executable=SLAPD_EXECUTABLE,
tw@3650
    90
                 debug_flags='', # None,  # for -d stats,acl,args,trace,sync,config
tw@3650
    91
                 proto='ldap', ip='127.0.0.1', port=3890,  # use -h proto://ip:port
tw@3650
    92
                 service_name=''  # defaults to -n executable:port, use None to not use -n
tw@3650
    93
                ):
tw@3650
    94
        self.executable = executable
tw@3650
    95
        self.config = config
tw@3650
    96
        self.debug_flags = debug_flags
tw@3650
    97
        self.proto = proto
tw@3650
    98
        self.ip = ip
tw@3650
    99
        self.port = port
tw@3650
   100
        self.url = '%s://%s:%d' % (proto, ip, port) # can be used for ldap.initialize() call
tw@3650
   101
        if service_name == '':
tw@3650
   102
            self.service_name = '%s:%d' % (executable, port)
tw@3650
   103
        else:
tw@3650
   104
            self.service_name = service_name
tw@3650
   105
tw@3650
   106
    def start(self, timeout=0):
tw@3650
   107
        """ start a slapd process and optionally wait up to timeout seconds until it responds """
tw@3650
   108
        args = [self.executable, '-h', self.url, ]
tw@3650
   109
        if self.config is not None:
tw@3650
   110
            args.extend(['-f', self.config])
tw@3650
   111
        if self.debug_flags is not None:
tw@3650
   112
            args.extend(['-d', self.debug_flags])
tw@3650
   113
        if self.service_name:
tw@3650
   114
            args.extend(['-n', self.service_name])
tw@3650
   115
        self.process = subprocess.Popen(args)
tw@3650
   116
        started = None
tw@3650
   117
        if timeout:
tw@3650
   118
            lo = ldap.initialize(self.url)
tw@3650
   119
            ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
tw@3650
   120
            started = False
tw@3650
   121
            wait_until = time.time() + timeout
tw@3650
   122
            while time.time() < wait_until:
tw@3650
   123
                try:
tw@3650
   124
                    lo.simple_bind_s('', '')
tw@3650
   125
                    started = True
tw@3650
   126
                except ldap.SERVER_DOWN, err:
tw@3650
   127
                    time.sleep(0.1)
tw@3650
   128
                else:
tw@3650
   129
                    break
tw@3650
   130
        return started
tw@3650
   131
tw@3650
   132
    def stop(self):
tw@3650
   133
        """ stop this slapd process and wait until it has terminated """
tw@3650
   134
        pid = self.process.pid
tw@3650
   135
        os.kill(pid, signal.SIGTERM)
tw@3650
   136
        os.waitpid(pid, 0)
tw@3650
   137
tw@3650
   138
tw@3650
   139
class LdapEnvironment(object):
tw@3650
   140
    """ Manage a (temporary) environment for running a slapd in it """
tw@3650
   141
tw@3650
   142
    # default DB_CONFIG bdb configuration file contents
tw@3650
   143
    DB_CONFIG = """\
tw@3650
   144
# STRANGE: if i use those settings, after the test slapd goes to 100% and doesn't terminate on SIGTERM
tw@3650
   145
# Set the database in memory cache size.
tw@3650
   146
#set_cachesize 0 10000000 1
tw@3650
   147
tw@3650
   148
# Set log values.
tw@3650
   149
#set_lg_regionmax 262144
tw@3650
   150
#set_lg_bsize 262144
tw@3650
   151
#set_lg_max 10485760
tw@3650
   152
tw@3650
   153
#set_tas_spins 0
tw@3650
   154
"""
tw@3650
   155
tw@3650
   156
    def __init__(self,
tw@3650
   157
                 basedn,
tw@3650
   158
                 rootdn, rootpw,
tw@3650
   159
                 instance=0,  # use different values when running multiple LdapEnvironments
tw@3650
   160
                 schema_dir='/etc/ldap/schema',  # directory with schemas
tw@3650
   161
                 coding='utf-8',  # coding used for config files
tw@3650
   162
                 timeout=10,  # how long to wait for slapd starting [s]
tw@3650
   163
                ):
tw@3650
   164
        self.basedn = basedn
tw@3650
   165
        self.rootdn = rootdn
tw@3650
   166
        self.rootpw = rootpw
tw@3650
   167
        self.instance = instance
tw@3650
   168
        self.schema_dir = schema_dir
tw@3650
   169
        self.coding = coding
tw@3650
   170
        self.ldap_dir = None
tw@3650
   171
        self.slapd_conf = None
tw@3650
   172
        self.timeout = timeout
tw@3650
   173
tw@3650
   174
    def create_env(self, slapd_config, db_config=DB_CONFIG):
tw@3650
   175
        """ create a temporary LDAP server environment in a temp. directory,
tw@3650
   176
            including writing a slapd.conf (see configure_slapd) and a
tw@3650
   177
            DB_CONFIG there.
tw@3650
   178
        """
tw@3650
   179
        # create directories
tw@3650
   180
        self.ldap_dir = tempfile.mkdtemp(prefix='LdapEnvironment-%d.' % self.instance)
tw@3650
   181
        self.ldap_db_dir = os.path.join(self.ldap_dir, 'db')
tw@3650
   182
        os.mkdir(self.ldap_db_dir)
tw@3650
   183
tw@3650
   184
        # create DB_CONFIG for bdb backend
tw@3650
   185
        db_config_fname = os.path.join(self.ldap_db_dir, 'DB_CONFIG')
tw@3650
   186
        f = open(db_config_fname, 'w')
tw@3650
   187
        f.write(db_config)
tw@3650
   188
        f.close()
tw@3650
   189
tw@3650
   190
        # create slapd.conf from content template in slapd_config
tw@3650
   191
        slapd_config = slapd_config % {
tw@3650
   192
            'ldap_dir': self.ldap_dir,
tw@3650
   193
            'ldap_db_dir': self.ldap_db_dir,
tw@3650
   194
            'schema_dir': self.schema_dir,
tw@3650
   195
            'basedn': self.basedn,
tw@3650
   196
            'rootdn': self.rootdn,
tw@3650
   197
            'rootpw': self.rootpw,
tw@3650
   198
        }
tw@3650
   199
        if isinstance(slapd_config, unicode):
tw@3650
   200
            slapd_config = slapd_config.encode(self.coding)
tw@3650
   201
        self.slapd_conf = os.path.join(self.ldap_dir, "slapd.conf")
tw@3650
   202
        f = open(self.slapd_conf, 'w')
tw@3650
   203
        f.write(slapd_config)
tw@3650
   204
        f.close()
tw@3650
   205
tw@3650
   206
    def start_slapd(self):
tw@3650
   207
        """ start a slapd and optionally wait until it talks with us """
tw@3650
   208
        self.slapd = Slapd(config=self.slapd_conf, port=3890+self.instance)
tw@3651
   209
        started = self.slapd.start(timeout=self.timeout)
tw@3651
   210
        return started
tw@3650
   211
tw@3650
   212
    def load_directory(self, ldif_content):
tw@3650
   213
        """ load the directory with the ldif_content (str) """
tw@3650
   214
        lo = ldap.initialize(self.slapd.url)
tw@3650
   215
        ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
tw@3650
   216
        lo.simple_bind_s(self.rootdn, self.rootpw)
tw@3650
   217
tw@3650
   218
        class LDIFLoader(ldif.LDIFParser):
tw@3650
   219
            def handle(self, dn, entry):
tw@3650
   220
                lo.add_s(dn, ldap.modlist.addModlist(entry))
tw@3650
   221
tw@3650
   222
        loader = LDIFLoader(StringIO(ldif_content))
tw@3650
   223
        loader.parse()
tw@3650
   224
tw@3650
   225
    def stop_slapd(self):
tw@3650
   226
        """ stop a slapd """
tw@3650
   227
        self.slapd.stop()
tw@3650
   228
tw@3650
   229
    def destroy_env(self):
tw@3650
   230
        """ remove the temporary LDAP server environment """
tw@3650
   231
        shutil.rmtree(self.ldap_dir)
tw@3650
   232
tw@3651
   233
try:
tw@3651
   234
    import py.test
tw@3650
   235
tw@3661
   236
    class LDAPTstBase:
tw@3651
   237
        """ Test base class for py.test based tests which need a LDAP server to talk to.
tw@3650
   238
tw@3651
   239
            Inherit your test class from this base class to test LDAP stuff.
tw@3651
   240
        """
tw@3650
   241
tw@3651
   242
        # You MUST define these in your derived class:
tw@3651
   243
        slapd_config = None  # a string with your slapd.conf template
tw@3651
   244
        ldif_content = None  # a string with your ldif contents
tw@3651
   245
        basedn = None  # your base DN
tw@3651
   246
        rootdn = None  # root DN
tw@3651
   247
        rootpw = None  # root password
tw@3650
   248
tw@3651
   249
        def setup_class(self):
tw@3651
   250
            """ Create LDAP server environment, start slapd """
tw@3651
   251
            self.ldap_env = LdapEnvironment(self.basedn, self.rootdn, self.rootpw)
tw@3651
   252
            self.ldap_env.create_env(slapd_config=self.slapd_config)
tw@3651
   253
            started = self.ldap_env.start_slapd()
tw@3651
   254
            if not started:
tw@3651
   255
                py.test.skip("Failed to start %s process, please see your syslog / log files"
tw@3651
   256
                             " (and check if stopping apparmor helps, in case you use it)." % SLAPD_EXECUTABLE)
tw@3651
   257
            self.ldap_env.load_directory(ldif_content=self.ldif_content)
tw@3650
   258
tw@3651
   259
        def teardown_class(self):
tw@3651
   260
            """ Stop slapd, remove LDAP server environment """
tw@3651
   261
            self.ldap_env.stop_slapd()
tw@3651
   262
            self.ldap_env.destroy_env()
tw@3650
   263
tw@3651
   264
except ImportError:
tw@3651
   265
    pass  # obviously py.test not in use
tw@3650
   266