MoinMoin/_tests/ldap_testbase.py
author Thomas Waldmann <tw AT waldmann-edv DOT de>
Tue, 11 Nov 2008 22:00:33 +0100
changeset 4199 f414aece63e0
parent 3908 4353a470388e
permissions -rw-r--r--
send_page has no msg param any more, use theme.add_msg
     1 # -*- coding: utf-8 -*-
     2 """
     3     LDAPTestBase: LDAP testing support for py.test based unit tests
     4 
     5     Features
     6     --------
     7 
     8     * setup_class
     9       * automatic creation of a temporary LDAP server environment
    10       * automatic creation of a LDAP server process (slapd)
    11 
    12     * teardown_class
    13       * LDAP server process will be killed and termination will be waited for
    14       * temporary LDAP environment will be removed
    15 
    16     Usage
    17     -----
    18 
    19     Write your own test class and derive from LDAPTstBase:
    20 
    21     class TestLdap(LDAPTstBase):
    22         def testFunction(self):
    23             server_url = self.ldap_env.slapd.url
    24             lo = ldap.initialize(server_url)
    25             lo.simple_bind_s('', '')
    26 
    27     Notes
    28     -----
    29 
    30     On Ubuntu 8.04 there is apparmor imposing some restrictions on /usr/sbin/slapd,
    31     so you need to disable apparmor by invoking this as root:
    32 
    33     # /etc/init.d/apparmor stop
    34 
    35     Requires Python 2.4 (for subprocess module).
    36 
    37     @copyright: 2008 by Thomas Waldmann
    38     @license: GNU GPL, see COPYING for details.
    39 """
    40 
# Filename of the LDAP server executable - if it is not in your PATH, you have
# to give the full path/filename here. It is used by check_environ() and as the
# default executable of Slapd.
SLAPD_EXECUTABLE = 'slapd'
    43 
    44 import os, shutil, tempfile, time
    45 from StringIO import StringIO
    46 import signal
    47 
    48 try:
    49     import subprocess  # needs Python 2.4
    50 except ImportError:
    51     subprocess = None
    52 
    53 try:
    54     import ldap, ldif, ldap.modlist  # needs python-ldap
    55 except ImportError:
    56     ldap = None
    57 
    58 
    59 def check_environ():
    60     """ Check the system environment whether we are able to run.
    61         Either return some failure reason if we can't or None if everything
    62         looks OK.
    63     """
    64     if subprocess is None:
    65         return "You need at least python 2.4 to use ldap_testbase."
    66     if ldap is None:
    67         return "You need python-ldap installed to use ldap_testbase."
    68     slapd = False
    69     try:
    70         p = subprocess.Popen([SLAPD_EXECUTABLE, '-V'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    71         pid = p.pid
    72         rc = p.wait()
    73         if pid and rc == 1:
    74             slapd = True  # it works
    75     except OSError, err:
    76         import errno
    77         if not (err.errno == errno.ENOENT or
    78                 (err.errno == 3 and os.name == 'nt')):
    79             raise
    80     if not slapd:
    81         return "Can't start %s (see SLAPD_EXECUTABLE)." % SLAPD_EXECUTABLE
    82     return None
    83 
    84 
    85 class Slapd(object):
    86     """ Manage a slapd process for testing purposes """
    87     def __init__(self,
    88                  config=None,  # config filename for -f
    89                  executable=SLAPD_EXECUTABLE,
    90                  debug_flags='', # None,  # for -d stats,acl,args,trace,sync,config
    91                  proto='ldap', ip='127.0.0.1', port=3890,  # use -h proto://ip:port
    92                  service_name=''  # defaults to -n executable:port, use None to not use -n
    93                 ):
    94         self.executable = executable
    95         self.config = config
    96         self.debug_flags = debug_flags
    97         self.proto = proto
    98         self.ip = ip
    99         self.port = port
   100         self.url = '%s://%s:%d' % (proto, ip, port) # can be used for ldap.initialize() call
   101         if service_name == '':
   102             self.service_name = '%s:%d' % (executable, port)
   103         else:
   104             self.service_name = service_name
   105 
   106     def start(self, timeout=0):
   107         """ start a slapd process and optionally wait up to timeout seconds until it responds """
   108         args = [self.executable, '-h', self.url, ]
   109         if self.config is not None:
   110             args.extend(['-f', self.config])
   111         if self.debug_flags is not None:
   112             args.extend(['-d', self.debug_flags])
   113         if self.service_name:
   114             args.extend(['-n', self.service_name])
   115         self.process = subprocess.Popen(args)
   116         started = None
   117         if timeout:
   118             lo = ldap.initialize(self.url)
   119             ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
   120             started = False
   121             wait_until = time.time() + timeout
   122             while time.time() < wait_until:
   123                 try:
   124                     lo.simple_bind_s('', '')
   125                     started = True
   126                 except ldap.SERVER_DOWN, err:
   127                     time.sleep(0.1)
   128                 else:
   129                     break
   130         return started
   131 
   132     def stop(self):
   133         """ stop this slapd process and wait until it has terminated """
   134         pid = self.process.pid
   135         os.kill(pid, signal.SIGTERM)
   136         os.waitpid(pid, 0)
   137 
   138 
   139 class LdapEnvironment(object):
   140     """ Manage a (temporary) environment for running a slapd in it """
   141 
   142     # default DB_CONFIG bdb configuration file contents
   143     DB_CONFIG = """\
   144 # STRANGE: if i use those settings, after the test slapd goes to 100% and doesn't terminate on SIGTERM
   145 # Set the database in memory cache size.
   146 #set_cachesize 0 10000000 1
   147 
   148 # Set log values.
   149 #set_lg_regionmax 262144
   150 #set_lg_bsize 262144
   151 #set_lg_max 10485760
   152 
   153 #set_tas_spins 0
   154 """
   155 
   156     def __init__(self,
   157                  basedn,
   158                  rootdn, rootpw,
   159                  instance=0,  # use different values when running multiple LdapEnvironments
   160                  schema_dir='/etc/ldap/schema',  # directory with schemas
   161                  coding='utf-8',  # coding used for config files
   162                  timeout=10,  # how long to wait for slapd starting [s]
   163                 ):
   164         self.basedn = basedn
   165         self.rootdn = rootdn
   166         self.rootpw = rootpw
   167         self.instance = instance
   168         self.schema_dir = schema_dir
   169         self.coding = coding
   170         self.ldap_dir = None
   171         self.slapd_conf = None
   172         self.timeout = timeout
   173 
   174     def create_env(self, slapd_config, db_config=DB_CONFIG):
   175         """ create a temporary LDAP server environment in a temp. directory,
   176             including writing a slapd.conf (see configure_slapd) and a
   177             DB_CONFIG there.
   178         """
   179         # create directories
   180         self.ldap_dir = tempfile.mkdtemp(prefix='LdapEnvironment-%d.' % self.instance)
   181         self.ldap_db_dir = os.path.join(self.ldap_dir, 'db')
   182         os.mkdir(self.ldap_db_dir)
   183 
   184         # create DB_CONFIG for bdb backend
   185         db_config_fname = os.path.join(self.ldap_db_dir, 'DB_CONFIG')
   186         f = open(db_config_fname, 'w')
   187         f.write(db_config)
   188         f.close()
   189 
   190         # create slapd.conf from content template in slapd_config
   191         slapd_config = slapd_config % {
   192             'ldap_dir': self.ldap_dir,
   193             'ldap_db_dir': self.ldap_db_dir,
   194             'schema_dir': self.schema_dir,
   195             'basedn': self.basedn,
   196             'rootdn': self.rootdn,
   197             'rootpw': self.rootpw,
   198         }
   199         if isinstance(slapd_config, unicode):
   200             slapd_config = slapd_config.encode(self.coding)
   201         self.slapd_conf = os.path.join(self.ldap_dir, "slapd.conf")
   202         f = open(self.slapd_conf, 'w')
   203         f.write(slapd_config)
   204         f.close()
   205 
   206     def start_slapd(self):
   207         """ start a slapd and optionally wait until it talks with us """
   208         self.slapd = Slapd(config=self.slapd_conf, port=3890+self.instance)
   209         started = self.slapd.start(timeout=self.timeout)
   210         return started
   211 
   212     def load_directory(self, ldif_content):
   213         """ load the directory with the ldif_content (str) """
   214         lo = ldap.initialize(self.slapd.url)
   215         ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
   216         lo.simple_bind_s(self.rootdn, self.rootpw)
   217 
   218         class LDIFLoader(ldif.LDIFParser):
   219             def handle(self, dn, entry):
   220                 lo.add_s(dn, ldap.modlist.addModlist(entry))
   221 
   222         loader = LDIFLoader(StringIO(ldif_content))
   223         loader.parse()
   224 
   225     def stop_slapd(self):
   226         """ stop a slapd """
   227         self.slapd.stop()
   228 
   229     def destroy_env(self):
   230         """ remove the temporary LDAP server environment """
   231         shutil.rmtree(self.ldap_dir)
   232 
   233 try:
   234     import py.test
   235 
   236     class LDAPTstBase:
   237         """ Test base class for py.test based tests which need a LDAP server to talk to.
   238 
   239             Inherit your test class from this base class to test LDAP stuff.
   240         """
   241 
   242         # You MUST define these in your derived class:
   243         slapd_config = None  # a string with your slapd.conf template
   244         ldif_content = None  # a string with your ldif contents
   245         basedn = None  # your base DN
   246         rootdn = None  # root DN
   247         rootpw = None  # root password
   248 
   249         def setup_class(self):
   250             """ Create LDAP server environment, start slapd """
   251             self.ldap_env = LdapEnvironment(self.basedn, self.rootdn, self.rootpw)
   252             self.ldap_env.create_env(slapd_config=self.slapd_config)
   253             started = self.ldap_env.start_slapd()
   254             if not started:
   255                 py.test.skip("Failed to start %s process, please see your syslog / log files"
   256                              " (and check if stopping apparmor helps, in case you use it)." % SLAPD_EXECUTABLE)
   257             self.ldap_env.load_directory(ldif_content=self.ldif_content)
   258 
   259         def teardown_class(self):
   260             """ Stop slapd, remove LDAP server environment """
   261             self.ldap_env.stop_slapd()
   262             self.ldap_env.destroy_env()
   263 
   264 except ImportError:
   265     pass  # obviously py.test not in use
   266