MoinMoin/_tests/ldap_testbase.py
author Thomas Waldmann <tw AT waldmann-edv DOT de>
Sun, 01 Jun 2008 21:44:00 +0200
changeset 3661 2d5a325ae247
parent 3651 df024fd0a129
child 3908 4353a470388e
permissions -rw-r--r--
ldap tests: rename base class so py.test doesn't find it, skip tests if slapd is not started
     1 # -*- coding: utf-8 -*-
     2 """
     3     LDAPTestBase: LDAP testing support for py.test based unit tests
     4 
     5     Features
     6     --------
     7 
     8     * setup_class
     9       * automatic creation of a temporary LDAP server environment
    10       * automatic creation of a LDAP server process (slapd)
    11 
    12     * teardown_class
    13       * LDAP server process will be killed and termination will be waited for
    14       * temporary LDAP environment will be removed
    15 
    16     Usage
    17     -----
    18 
    Write your own test class and derive from LDAPTstBase:

    class TestLdap(LDAPTstBase):
    22         def testFunction(self):
    23             server_url = self.ldap_env.slapd.url
    24             lo = ldap.initialize(server_url)
    25             lo.simple_bind_s('', '')
    26 
    27     Notes
    28     -----
    29 
    30     On Ubuntu 8.04 there is apparmor imposing some restrictions on /usr/sbin/slapd,
    31     so you need to disable apparmor by invoking this as root:
    32 
    33     # /etc/init.d/apparmor stop
    34 
    35     @copyright: 2008 by Thomas Waldmann
    36     @license: GNU GPL, see COPYING for details.
    37 """
    38 
    39 SLAPD_EXECUTABLE = 'slapd'  # filename of LDAP server executable - if it is not
    40                             # in your PATH, you have to give full path/filename.
    41 
    42 import os, shutil, tempfile, time
    43 from StringIO import StringIO
    44 import signal
    45 
    46 try:
    47     import subprocess  # needs Python 2.4
    48 except ImportError:
    49     subprocess = None
    50 
    51 try:
    52     import ldap, ldif, ldap.modlist  # needs python-ldap
    53 except ImportError:
    54     ldap = None
    55 
    56 
    57 def check_environ():
    58     """ Check the system environment whether we are able to run.
    59         Either return some failure reason if we can't or None if everything
    60         looks OK.
    61     """
    62     if subprocess is None:
    63         return "You need at least python 2.4 to use ldap_testbase."
    64     if ldap is None:
    65         return "You need python-ldap installed to use ldap_testbase."
    66     slapd = False
    67     try:
    68         p = subprocess.Popen([SLAPD_EXECUTABLE, '-V'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    69         pid = p.pid
    70         rc = p.wait()
    71         if pid and rc == 1:
    72             slapd = True  # it works
    73     except OSError, err:
    74         import errno
    75         if not (err.errno == errno.ENOENT or
    76                 (err.errno == 3 and os.name == 'nt')):
    77             raise
    78     if not slapd:
    79         return "Can't start %s (see SLAPD_EXECUTABLE)." % SLAPD_EXECUTABLE
    80     return None
    81 
    82 
    83 class Slapd(object):
    84     """ Manage a slapd process for testing purposes """
    85     def __init__(self,
    86                  config=None,  # config filename for -f
    87                  executable=SLAPD_EXECUTABLE,
    88                  debug_flags='', # None,  # for -d stats,acl,args,trace,sync,config
    89                  proto='ldap', ip='127.0.0.1', port=3890,  # use -h proto://ip:port
    90                  service_name=''  # defaults to -n executable:port, use None to not use -n
    91                 ):
    92         self.executable = executable
    93         self.config = config
    94         self.debug_flags = debug_flags
    95         self.proto = proto
    96         self.ip = ip
    97         self.port = port
    98         self.url = '%s://%s:%d' % (proto, ip, port) # can be used for ldap.initialize() call
    99         if service_name == '':
   100             self.service_name = '%s:%d' % (executable, port)
   101         else:
   102             self.service_name = service_name
   103 
   104     def start(self, timeout=0):
   105         """ start a slapd process and optionally wait up to timeout seconds until it responds """
   106         args = [self.executable, '-h', self.url, ]
   107         if self.config is not None:
   108             args.extend(['-f', self.config])
   109         if self.debug_flags is not None:
   110             args.extend(['-d', self.debug_flags])
   111         if self.service_name:
   112             args.extend(['-n', self.service_name])
   113         self.process = subprocess.Popen(args)
   114         started = None
   115         if timeout:
   116             lo = ldap.initialize(self.url)
   117             ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
   118             started = False
   119             wait_until = time.time() + timeout
   120             while time.time() < wait_until:
   121                 try:
   122                     lo.simple_bind_s('', '')
   123                     started = True
   124                 except ldap.SERVER_DOWN, err:
   125                     time.sleep(0.1)
   126                 else:
   127                     break
   128         return started
   129 
   130     def stop(self):
   131         """ stop this slapd process and wait until it has terminated """
   132         pid = self.process.pid
   133         os.kill(pid, signal.SIGTERM)
   134         os.waitpid(pid, 0)
   135 
   136 
   137 class LdapEnvironment(object):
   138     """ Manage a (temporary) environment for running a slapd in it """
   139 
   140     # default DB_CONFIG bdb configuration file contents
   141     DB_CONFIG = """\
   142 # STRANGE: if i use those settings, after the test slapd goes to 100% and doesn't terminate on SIGTERM
   143 # Set the database in memory cache size.
   144 #set_cachesize 0 10000000 1
   145 
   146 # Set log values.
   147 #set_lg_regionmax 262144
   148 #set_lg_bsize 262144
   149 #set_lg_max 10485760
   150 
   151 #set_tas_spins 0
   152 """
   153 
   154     def __init__(self,
   155                  basedn,
   156                  rootdn, rootpw,
   157                  instance=0,  # use different values when running multiple LdapEnvironments
   158                  schema_dir='/etc/ldap/schema',  # directory with schemas
   159                  coding='utf-8',  # coding used for config files
   160                  timeout=10,  # how long to wait for slapd starting [s]
   161                 ):
   162         self.basedn = basedn
   163         self.rootdn = rootdn
   164         self.rootpw = rootpw
   165         self.instance = instance
   166         self.schema_dir = schema_dir
   167         self.coding = coding
   168         self.ldap_dir = None
   169         self.slapd_conf = None
   170         self.timeout = timeout
   171 
   172     def create_env(self, slapd_config, db_config=DB_CONFIG):
   173         """ create a temporary LDAP server environment in a temp. directory,
   174             including writing a slapd.conf (see configure_slapd) and a
   175             DB_CONFIG there.
   176         """
   177         # create directories
   178         self.ldap_dir = tempfile.mkdtemp(prefix='LdapEnvironment-%d.' % self.instance)
   179         self.ldap_db_dir = os.path.join(self.ldap_dir, 'db')
   180         os.mkdir(self.ldap_db_dir)
   181 
   182         # create DB_CONFIG for bdb backend
   183         db_config_fname = os.path.join(self.ldap_db_dir, 'DB_CONFIG')
   184         f = open(db_config_fname, 'w')
   185         f.write(db_config)
   186         f.close()
   187 
   188         # create slapd.conf from content template in slapd_config
   189         slapd_config = slapd_config % {
   190             'ldap_dir': self.ldap_dir,
   191             'ldap_db_dir': self.ldap_db_dir,
   192             'schema_dir': self.schema_dir,
   193             'basedn': self.basedn,
   194             'rootdn': self.rootdn,
   195             'rootpw': self.rootpw,
   196         }
   197         if isinstance(slapd_config, unicode):
   198             slapd_config = slapd_config.encode(self.coding)
   199         self.slapd_conf = os.path.join(self.ldap_dir, "slapd.conf")
   200         f = open(self.slapd_conf, 'w')
   201         f.write(slapd_config)
   202         f.close()
   203 
   204     def start_slapd(self):
   205         """ start a slapd and optionally wait until it talks with us """
   206         self.slapd = Slapd(config=self.slapd_conf, port=3890+self.instance)
   207         started = self.slapd.start(timeout=self.timeout)
   208         return started
   209 
   210     def load_directory(self, ldif_content):
   211         """ load the directory with the ldif_content (str) """
   212         lo = ldap.initialize(self.slapd.url)
   213         ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
   214         lo.simple_bind_s(self.rootdn, self.rootpw)
   215 
   216         class LDIFLoader(ldif.LDIFParser):
   217             def handle(self, dn, entry):
   218                 lo.add_s(dn, ldap.modlist.addModlist(entry))
   219 
   220         loader = LDIFLoader(StringIO(ldif_content))
   221         loader.parse()
   222 
   223     def stop_slapd(self):
   224         """ stop a slapd """
   225         self.slapd.stop()
   226 
   227     def destroy_env(self):
   228         """ remove the temporary LDAP server environment """
   229         shutil.rmtree(self.ldap_dir)
   230 
   231 try:
   232     import py.test
   233 
   234     class LDAPTstBase:
   235         """ Test base class for py.test based tests which need a LDAP server to talk to.
   236 
   237             Inherit your test class from this base class to test LDAP stuff.
   238         """
   239 
   240         # You MUST define these in your derived class:
   241         slapd_config = None  # a string with your slapd.conf template
   242         ldif_content = None  # a string with your ldif contents
   243         basedn = None  # your base DN
   244         rootdn = None  # root DN
   245         rootpw = None  # root password
   246 
   247         def setup_class(self):
   248             """ Create LDAP server environment, start slapd """
   249             self.ldap_env = LdapEnvironment(self.basedn, self.rootdn, self.rootpw)
   250             self.ldap_env.create_env(slapd_config=self.slapd_config)
   251             started = self.ldap_env.start_slapd()
   252             if not started:
   253                 py.test.skip("Failed to start %s process, please see your syslog / log files"
   254                              " (and check if stopping apparmor helps, in case you use it)." % SLAPD_EXECUTABLE)
   255             self.ldap_env.load_directory(ldif_content=self.ldif_content)
   256 
   257         def teardown_class(self):
   258             """ Stop slapd, remove LDAP server environment """
   259             self.ldap_env.stop_slapd()
   260             self.ldap_env.destroy_env()
   261 
   262 except ImportError:
   263     pass  # obviously py.test not in use
   264