|
tw@3650
|
1 |
# -*- coding: utf-8 -*-
|
|
tw@3650
|
2 |
"""
|
|
tw@3650
|
3 |
LDAPTestBase: LDAP testing support for py.test based unit tests
|
|
tw@3650
|
4 |
|
|
tw@3650
|
5 |
Features
|
|
tw@3650
|
6 |
--------
|
|
tw@3650
|
7 |
|
|
tw@3650
|
8 |
* setup_class
|
|
tw@3650
|
9 |
* automatic creation of a temporary LDAP server environment
|
|
tw@3650
|
10 |
* automatic creation of a LDAP server process (slapd)
|
|
tw@3650
|
11 |
|
|
tw@3650
|
12 |
* teardown_class
|
|
tw@3650
|
13 |
* LDAP server process will be killed and termination will be waited for
|
|
tw@3650
|
14 |
* temporary LDAP environment will be removed
|
|
tw@3650
|
15 |
|
|
tw@3650
|
16 |
Usage
|
|
tw@3650
|
17 |
-----
|
|
tw@3650
|
18 |
|
|
tw@3650
|
19 |
Write your own test class and derive from LDAPTstBase:
|
|
tw@3650
|
20 |
|
|
tw@3650
|
21 |
class TestLdap(LDAPTstBase):
|
|
tw@3650
|
22 |
def testFunction(self):
|
|
tw@3650
|
23 |
server_url = self.ldap_env.slapd.url
|
|
tw@3650
|
24 |
lo = ldap.initialize(server_url)
|
|
tw@3650
|
25 |
lo.simple_bind_s('', '')
|
|
tw@3650
|
26 |
|
|
tw@3650
|
27 |
Notes
|
|
tw@3650
|
28 |
-----
|
|
tw@3650
|
29 |
|
|
tw@3650
|
30 |
On Ubuntu 8.04 there is apparmor imposing some restrictions on /usr/sbin/slapd,
|
|
tw@3650
|
31 |
so you need to disable apparmor by invoking this as root:
|
|
tw@3650
|
32 |
|
|
tw@3650
|
33 |
# /etc/init.d/apparmor stop
|
|
tw@3650
|
34 |
|
|
tw@3908
|
35 |
Requires Python 2.4 (for subprocess module).
|
|
tw@3908
|
36 |
|
|
tw@3650
|
37 |
@copyright: 2008 by Thomas Waldmann
|
|
tw@3650
|
38 |
@license: GNU GPL, see COPYING for details.
|
|
tw@3650
|
39 |
"""
|
|
tw@3650
|
40 |
|
|
tw@3651
|
41 |
SLAPD_EXECUTABLE = 'slapd' # filename of LDAP server executable - if it is not
|
|
tw@3651
|
42 |
# in your PATH, you have to give full path/filename.
|
|
tw@3651
|
43 |
|
|
tw@3650
|
44 |
import os, shutil, tempfile, time
|
|
tw@3650
|
45 |
from StringIO import StringIO
|
|
tw@3650
|
46 |
import signal
|
|
tw@3650
|
47 |
|
|
tw@3651
|
48 |
try:
|
|
tw@3651
|
49 |
import subprocess # needs Python 2.4
|
|
tw@3651
|
50 |
except ImportError:
|
|
tw@3651
|
51 |
subprocess = None
|
|
tw@3651
|
52 |
|
|
tw@3651
|
53 |
try:
|
|
tw@3651
|
54 |
import ldap, ldif, ldap.modlist # needs python-ldap
|
|
tw@3651
|
55 |
except ImportError:
|
|
tw@3651
|
56 |
ldap = None
|
|
tw@3651
|
57 |
|
|
tw@3651
|
58 |
|
|
tw@3651
|
59 |
def check_environ():
    """ Check the system environment whether we are able to run.

        Returns a failure reason (str) if we can't run, or None if everything
        looks OK.

        Raises OSError if launching the slapd executable fails for a reason
        other than "executable not found".
    """
    if subprocess is None:
        return "You need at least python 2.4 to use ldap_testbase."
    if ldap is None:
        return "You need python-ldap installed to use ldap_testbase."
    slapd = False
    try:
        p = subprocess.Popen([SLAPD_EXECUTABLE, '-V'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting for the child; a plain
        # wait() can block forever if the child fills a pipe nobody reads.
        p.communicate()
        # exit status 1 of "slapd -V" is what this check accepts as "it works"
        if p.pid and p.returncode == 1:
            slapd = True
    except OSError:
        import errno
        import sys
        # fetch the exception this way so the code parses on every Python
        # version (including 2.4, which has no "except ... as ..." syntax)
        err = sys.exc_info()[1]
        # "executable not found" is an expected outcome and reported below;
        # NOTE(review): errno 3 on 'nt' is presumably "path not found" - confirm
        not_found = (err.errno == errno.ENOENT or
                     (err.errno == 3 and os.name == 'nt'))
        if not not_found:
            raise
    if not slapd:
        return "Can't start %s (see SLAPD_EXECUTABLE)." % SLAPD_EXECUTABLE
    return None
|
|
tw@3651
|
83 |
|
|
tw@3650
|
84 |
|
|
tw@3650
|
85 |
class Slapd(object):
    """ Manage a slapd process for testing purposes """
    def __init__(self,
                 config=None,  # config filename for -f
                 executable=SLAPD_EXECUTABLE,
                 debug_flags='',  # None,  # for -d stats,acl,args,trace,sync,config
                 proto='ldap', ip='127.0.0.1', port=3890,  # use -h proto://ip:port
                 service_name=''  # defaults to -n executable:port, use None to not use -n
                ):
        self.executable = executable
        self.config = config
        self.debug_flags = debug_flags
        self.proto = proto
        self.ip = ip
        self.port = port
        self.url = '%s://%s:%d' % (proto, ip, port)  # can be used for ldap.initialize() call
        if service_name == '':
            self.service_name = '%s:%d' % (executable, port)
        else:
            self.service_name = service_name
        # subprocess.Popen object of the running server; stays None until
        # start() was called, so stop() can tell there is nothing to stop
        self.process = None

    def start(self, timeout=0):
        """ start a slapd process and optionally wait up to timeout seconds until it responds

            Returns None if no timeout was given (nothing was checked), True if
            an anonymous bind succeeded within timeout seconds, False otherwise.
        """
        args = [self.executable, '-h', self.url, ]
        if self.config is not None:
            args.extend(['-f', self.config])
        # -d is given for every value except None, including the default ''
        # NOTE(review): stop() signals the pid we spawned, which presumably
        # relies on slapd staying in the foreground when -d is used - confirm
        if self.debug_flags is not None:
            args.extend(['-d', self.debug_flags])
        if self.service_name:
            args.extend(['-n', self.service_name])
        self.process = subprocess.Popen(args)
        if not timeout:
            return None
        lo = ldap.initialize(self.url)
        ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3)  # ldap v2 is outdated
        started = False
        wait_until = time.time() + timeout
        while time.time() < wait_until:
            try:
                lo.simple_bind_s('', '')
            except ldap.SERVER_DOWN:
                # server not reachable yet, retry a bit later
                time.sleep(0.1)
            else:
                started = True
                break
        return started

    def stop(self):
        """ stop this slapd process and wait until it has terminated

            Does nothing if start() never created a process. Calling it again
            after the process has terminated is harmless.
        """
        process = self.process
        if process is None:
            return
        # only signal a child that has not terminated yet
        if process.poll() is None:
            os.kill(process.pid, signal.SIGTERM)
        # reap through the Popen object so its returncode gets recorded
        process.wait()
|
|
tw@3650
|
137 |
|
|
tw@3650
|
138 |
|
|
tw@3650
|
139 |
class LdapEnvironment(object):
    """ Manage a (temporary) environment for running a slapd in it """

    # default DB_CONFIG bdb configuration file contents
    DB_CONFIG = """\
# STRANGE: if i use those settings, after the test slapd goes to 100% and doesn't terminate on SIGTERM
# Set the database in memory cache size.
#set_cachesize 0 10000000 1

# Set log values.
#set_lg_regionmax 262144
#set_lg_bsize 262144
#set_lg_max 10485760

#set_tas_spins 0
"""

    def __init__(self,
                 basedn,
                 rootdn, rootpw,
                 instance=0,  # use different values when running multiple LdapEnvironments
                 schema_dir='/etc/ldap/schema',  # directory with schemas
                 coding='utf-8',  # coding used for config files
                 timeout=10,  # how long to wait for slapd starting [s]
                ):
        self.basedn = basedn
        self.rootdn = rootdn
        self.rootpw = rootpw
        self.instance = instance
        self.schema_dir = schema_dir
        self.coding = coding
        self.ldap_dir = None  # temp. directory, set by create_env
        self.ldap_db_dir = None  # database directory below ldap_dir, set by create_env
        self.slapd_conf = None  # filename of the generated slapd.conf, set by create_env
        self.slapd = None  # Slapd object, set by start_slapd
        self.timeout = timeout

    def _write_file(self, fname, content):
        """ write content to the file fname, closing the file even if writing fails """
        f = open(fname, 'w')
        try:
            f.write(content)
        finally:
            f.close()

    def create_env(self, slapd_config, db_config=DB_CONFIG):
        """ create a temporary LDAP server environment in a temp. directory,
            including writing a slapd.conf (see configure_slapd) and a
            DB_CONFIG there.

            slapd_config is a %-template; it can refer to the keys ldap_dir,
            ldap_db_dir, schema_dir, basedn, rootdn and rootpw.
        """
        # create directories
        self.ldap_dir = tempfile.mkdtemp(prefix='LdapEnvironment-%d.' % self.instance)
        self.ldap_db_dir = os.path.join(self.ldap_dir, 'db')
        os.mkdir(self.ldap_db_dir)

        # create DB_CONFIG for bdb backend
        self._write_file(os.path.join(self.ldap_db_dir, 'DB_CONFIG'), db_config)

        # create slapd.conf from content template in slapd_config
        slapd_config = slapd_config % {
            'ldap_dir': self.ldap_dir,
            'ldap_db_dir': self.ldap_db_dir,
            'schema_dir': self.schema_dir,
            'basedn': self.basedn,
            'rootdn': self.rootdn,
            'rootpw': self.rootpw,
        }
        # unicode text (anything that is not a native str) gets encoded
        # using self.coding; native str is written as is
        if not isinstance(slapd_config, str):
            slapd_config = slapd_config.encode(self.coding)
        self.slapd_conf = os.path.join(self.ldap_dir, "slapd.conf")
        self._write_file(self.slapd_conf, slapd_config)

    def start_slapd(self):
        """ start a slapd and wait up to self.timeout seconds until it talks with us

            Returns what Slapd.start() returned.
        """
        # every instance gets its own port, counted upwards from 3890
        self.slapd = Slapd(config=self.slapd_conf, port=3890+self.instance)
        return self.slapd.start(timeout=self.timeout)

    def load_directory(self, ldif_content):
        """ load the directory with the ldif_content (str) """
        lo = ldap.initialize(self.slapd.url)
        ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3)  # ldap v2 is outdated
        lo.simple_bind_s(self.rootdn, self.rootpw)

        class LDIFLoader(ldif.LDIFParser):
            def handle(self, dn, entry):
                # add every parsed entry to the directory
                lo.add_s(dn, ldap.modlist.addModlist(entry))

        try:
            LDIFLoader(StringIO(ldif_content)).parse()
        finally:
            # close the connection explicitly, also if adding an entry failed
            lo.unbind_s()

    def stop_slapd(self):
        """ stop a slapd (does nothing if start_slapd was never called) """
        if self.slapd is not None:
            self.slapd.stop()

    def destroy_env(self):
        """ remove the temporary LDAP server environment

            Does nothing if there is no environment (never created or already
            destroyed).
        """
        if self.ldap_dir is None:
            return
        shutil.rmtree(self.ldap_dir)
        # forget the paths that do not exist any more
        self.ldap_dir = None
        self.ldap_db_dir = None
        self.slapd_conf = None
|
|
tw@3650
|
232 |
|
|
tw@3651
|
233 |
try:
    import py.test

    class LDAPTstBase:
        """ Test base class for py.test based tests which need a LDAP server to talk to.

            Inherit your test class from this base class to test LDAP stuff.

            setup_class creates a LdapEnvironment, starts slapd in it and loads
            ldif_content into the directory; teardown_class stops slapd and
            removes the environment again.
        """

        # You MUST define these in your derived class:
        slapd_config = None  # a string with your slapd.conf template
        ldif_content = None  # a string with your ldif contents
        basedn = None  # your base DN
        rootdn = None  # root DN
        rootpw = None  # root password

        # the LdapEnvironment in use; only set after setup_class succeeded
        ldap_env = None

        def setup_class(self):
            """ Create LDAP server environment, start slapd

                Skips the tests if slapd can't be started. If anything fails,
                the slapd process and the temporary environment are cleaned
                up here, as nothing of it is recorded for teardown_class.
            """
            env = LdapEnvironment(self.basedn, self.rootdn, self.rootpw)
            launched = False  # True as soon as a slapd process was spawned
            ready = False  # True when the environment is completely usable
            try:
                env.create_env(slapd_config=self.slapd_config)
                started = env.start_slapd()
                launched = True
                if not started:
                    py.test.skip("Failed to start %s process, please see your syslog / log files"
                                 " (and check if stopping apparmor helps, in case you use it)." % SLAPD_EXECUTABLE)
                env.load_directory(ldif_content=self.ldif_content)
                self.ldap_env = env
                ready = True
            finally:
                if not ready:
                    # do not leave a stray slapd or temp. directory behind
                    try:
                        if launched:
                            env.stop_slapd()
                    finally:
                        if env.ldap_dir is not None:
                            env.destroy_env()

        def teardown_class(self):
            """ Stop slapd, remove LDAP server environment """
            env = self.ldap_env
            if env is None:
                return  # setup_class did not succeed, nothing to clean up
            self.ldap_env = None
            try:
                env.stop_slapd()
            finally:
                # remove the temp. directory even if stopping slapd failed
                env.destroy_env()

except ImportError:
    pass  # obviously py.test not in use
|
|
tw@3650
|
266 |
|