changeset 5281:5f0ec1f315bc

xapian: new locking, removed threading and signing (details below)

Locking: The old moin-based locking code didn't work reliably; often a
leftover lock blocked index updates until someone removed it manually. That
code was removed. Moin now relies on Xapian's builtin locking, which is
active between conn = IndexerConnection() and conn.close(). If one tries to
create another IndexerConnection while the lock is held, a
XapianDatabaseLockError is raised.

Signing: The only purpose of this code (creating / removing a "completed"
file in the xapian dir) was to signal whether indexing had completed. This is
not really needed, and full index rebuilds will be done differently soon
anyway.

Threading: Removed to simplify the code and its testing.

Also cleaned up docstrings.
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Sun, 15 Nov 2009 12:45:30 +0100
parents d25574af97eb
children bee5567d7084
files MoinMoin/search/Xapian/__init__.py MoinMoin/search/Xapian/indexing.py MoinMoin/search/Xapian/search.py MoinMoin/search/_tests/test_search.py MoinMoin/search/builtin.py
diffstat 5 files changed, 129 insertions(+), 209 deletions(-)
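
To illustrate the new locking described in the message above, a minimal
sketch (xappy is assumed to be installed; the index path is a placeholder,
the real one lives under the wiki's cache dir):

    import xappy

    index_dir = '/path/to/xapian/index'  # placeholder location

    conn = xappy.IndexerConnection(index_dir)  # takes Xapian's builtin write lock
    try:
        pass  # add / replace / delete documents here
    finally:
        conn.close()  # releases the lock

    # While conn is open, any second writer fails immediately:
    # xappy.IndexerConnection(index_dir) raises xappy.XapianDatabaseLockError.

Since the lock is managed by Xapian itself, a crashed indexer can no longer
leave a stale moin-level lock file behind.
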
--- a/MoinMoin/search/Xapian/__init__.py	Sun Nov 15 10:23:51 2009 +0100
+++ b/MoinMoin/search/Xapian/__init__.py	Sun Nov 15 12:45:30 2009 +0100
@@ -2,10 +2,10 @@
 """
     MoinMoin - xapian search engine
 
-    @copyright: 2006-2008 MoinMoin:ThomasWaldmann,
+    @copyright: 2006-2009 MoinMoin:ThomasWaldmann,
                 2006 MoinMoin:FranzPletz
     @license: GNU GPL, see COPYING for details.
 """
 
-from MoinMoin.search.Xapian.indexing import XapianIndex, Query, MoinSearchConnection, MoinIndexerConnection
+from MoinMoin.search.Xapian.indexing import XapianIndex, Query, MoinSearchConnection, MoinIndexerConnection, XapianDatabaseLockError
 from MoinMoin.search.Xapian.tokenizer import WikiAnalyzer
--- a/MoinMoin/search/Xapian/indexing.py	Sun Nov 15 10:23:51 2009 +0100
+++ b/MoinMoin/search/Xapian/indexing.py	Sun Nov 15 12:45:30 2009 +0100
@@ -66,6 +66,8 @@
         return self.get_all_documents(query)
 
 
+XapianDatabaseLockError = xappy.XapianDatabaseLockError
+
 class MoinIndexerConnection(xappy.IndexerConnection):
 
     def __init__(self, *args, **kwargs):
@@ -164,31 +166,44 @@
         self.request.cfg.xapian_searchers.append((searcher, timestamp))
         return hits
 
-    def _do_queued_updates(self, request, amount=5):
-        """ Index <amount> entries from the indexer queue. """
-        self.touch()
-        connection = MoinIndexerConnection(self.dir)
+    def do_queued_updates(self, amount=-1):
+        """ Index <amount> entries from the indexer queue.
+
+            @param amount: amount of queue entries to process (default: -1 == all)
+        """
         try:
-            for i in range(amount):
-                try:
-                    pagename, attachmentname, revno = self.update_queue.get()
-                except IndexError:
-                    # queue empty
-                    break
-                else:
-                    logging.debug("got from indexer queue: %r %r %r" % (pagename, attachmentname, revno))
-                    if not attachmentname:
-                        if revno is None:
-                            # generic "index this page completely, with attachments" request
-                            self._index_page(request, connection, pagename, mode='update')
+            request = self._indexingRequest(self.request)
+            connection = MoinIndexerConnection(self.dir)
+            self.touch()
+            try:
+                done_count = 0
+                while amount:
+                    # trick: if amount starts from -1, it will never get 0
+                    amount -= 1
+                    try:
+                        pagename, attachmentname, revno = self.update_queue.get()
+                    except IndexError:
+                        # queue empty
+                        break
+                    else:
+                        logging.debug("got from indexer queue: %r %r %r" % (pagename, attachmentname, revno))
+                        if not attachmentname:
+                            if revno is None:
+                                # generic "index this page completely, with attachments" request
+                                self._index_page(request, connection, pagename, mode='update')
+                            else:
+                                # "index this page revision" request
+                                self._index_page_rev(request, connection, pagename, revno, mode='update')
                         else:
-                            # "index this page revision" request
-                            self._index_page_rev(request, connection, pagename, revno, mode='update')
-                    else:
-                        # "index this attachment" request
-                        self._index_attachment(request, connection, pagename, attachmentname, mode='update')
-        finally:
-            connection.close()
+                            # "index this attachment" request
+                            self._index_attachment(request, connection, pagename, attachmentname, mode='update')
+                        done_count += 1
+            finally:
+                logging.debug("updated xapian index with %d queued updates" % done_count)
+                connection.close()
+        except XapianDatabaseLockError:
+            # another indexer has locked the index, we can retry it later...
+            logging.debug("can't lock xapian index, not doing queued updates now")
 
     def _get_document(self, connection, doc_id, mtime, mode):
         do_index = False
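
The amount countdown in do_queued_updates above is a small trick that is easy
to misread, so here it is in isolation (a standalone sketch with hypothetical
names, not MoinMoin code):

    def process_queue(queue, amount=-1):
        # amount == -1 processes everything; amount == N processes at most N items
        while amount:            # only amount == 0 stops the loop here
            amount -= 1          # starting from -1: -2, -3, ... never reaches 0,
                                 # so only an empty queue ends the loop
            try:
                item = queue.pop(0)
            except IndexError:
                break            # queue empty
            print('processing %r' % (item, ))

    process_queue(['a', 'b', 'c'], amount=2)  # processes 'a' and 'b'
    process_queue(['a', 'b', 'c'])            # processes all three
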
@@ -203,6 +218,8 @@
                 do_index = mtime > docmtime
         elif mode == 'add':
             do_index = True
+        else:
+            raise ValueError("mode must be 'update' or 'add'")
 
         if do_index:
             document = xappy.UnprocessedDocument()
@@ -292,13 +309,15 @@
 
         Index all revisions (if wanted by configuration) and all attachments.
 
-        @arg connection: the Indexer connection object
-        @arg pagename: a page name
-        @arg mode: 'add' = just add, no checks
-                   'update' = check if already in index and update if needed (mtime)
+        @param request: request suitable for indexing
+        @param connection: the Indexer connection object
+        @param pagename: a page name
+        @param mode: 'add' = just add, no checks
+                     'update' = check if already in index and update if needed (mtime)
         """
         page = Page(request, pagename)
         revlist = page.getRevList() # recent revs first, does not include deleted revs
+        logging.debug("indexing page %r, %d revs found" % (pagename, len(revlist)))
 
         if not revlist:
             # we have an empty revision list, that means the page is not there any more,
@@ -343,11 +362,12 @@
     def _index_page_rev(self, request, connection, pagename, revno, mode='update'):
         """ Index a page revision.
 
-        @arg connection: the Indexer connection object
-        @arg pagename: the page name
-        @arg revno: page revision number (int)
-        @arg mode: 'add' = just add, no checks
-                   'update' = check if already in index and update if needed (mtime)
+        @param request: request suitable for indexing
+        @param connection: the Indexer connection object
+        @param pagename: the page name
+        @param revno: page revision number (int)
+        @param mode: 'add' = just add, no checks
+                     'update' = check if already in index and update if needed (mtime)
         """
         page = Page(request, pagename, rev=revno)
         request.page = page # XXX for what is this needed?
@@ -358,8 +378,6 @@
         mtime = page.mtime_usecs()
 
         doc = self._get_document(connection, itemid, mtime, mode)
-        logging.debug("%s %r" % (pagename, doc))
-
         if doc:
             mimetype = 'text/%s' % page.pi['format']  # XXX improve this
 
@@ -393,11 +411,10 @@
     def _remove_page_rev(self, request, connection, pagename, revno):
         """ Remove a page revision from the index.
 
-        @arg connection: the Indexer connection object
-        @arg pagename: the page name
-        @arg revno: a real revision number (int), > 0
-        @arg mode: 'add' = just add, no checks
-                   'update' = check if already in index and update if needed (mtime)
+        @param request: request suitable for indexing
+        @param connection: the Indexer connection object
+        @param pagename: the page name
+        @param revno: a real revision number (int), > 0
         """
         wikiname = request.cfg.interwikiname or u"Self"
         revision = str(revno)
@@ -407,6 +424,13 @@
 
     def _index_attachment(self, request, connection, pagename, attachmentname, mode='update'):
         """ Index an attachment
+
+        @param request: request suitable for indexing
+        @param connection: the Indexer connection object
+        @param pagename: the page name
+        @param attachmentname: the attachment's name
+        @param mode: 'add' = just add, no checks
+                     'update' = check if already in index and update if needed (mtime)
         """
         from MoinMoin.action import AttachFile
         wikiname = request.cfg.interwikiname or u"Self"
@@ -447,7 +471,14 @@
             logging.debug('attachment %s (page %s) removed from index' % (attachmentname, pagename))
 
     def _index_file(self, request, connection, filename, mode='update'):
-        """ index files (that are NOT attachments, just arbitrary files) """
+        """ index files (that are NOT attachments, just arbitrary files)
+
+        @param request: request suitable for indexing
+        @param connection: the Indexer connection object
+        @param filename: a filesystem file name
+        @param mode: 'add' = just add, no checks
+                     'update' = check if already in index and update if needed (mtime)
+        """
         wikiname = request.cfg.interwikiname or u"Self"
         fs_rootpage = 'FS' # XXX FS hardcoded
 
@@ -481,15 +512,15 @@
             logging.exception("_index_file crashed:")
 
     def _index_pages(self, request, files=None, mode='update', pages=None):
-        """ Index pages (and all given files)
-
-        This should be called from indexPages or indexPagesInNewThread only!
+        """ Index all (given) pages (and all given files)
 
-        @param request: the current request
+        This should be called from indexPages only!
+
+        @param request: request suitable for indexing
         @param files: an optional list of files to index
-        @param mode: how to index the files, either 'add', 'update' or 'rebuild'
+        @param mode: 'add' = just add, no checks
+                     'update' = check if already in index and update if needed (mtime)
         @param pages: list of pages to index, if not given, all pages are indexed
-
         """
         if pages is None:
             # Index all pages
@@ -501,17 +532,20 @@
                 os.unlink(os.path.join(self.dir, fname))
             mode = 'add'
 
-        self.touch()
-        connection = MoinIndexerConnection(self.dir)
         try:
-            logging.debug("indexing all (%d) pages..." % len(pages))
-            for pagename in pages:
-                self._index_page(request, connection, pagename, mode=mode)
-            if files:
-                logging.debug("indexing all files...")
-                for fname in files:
-                    fname = fname.strip()
-                    self._index_file(request, connection, fname, mode)
-        finally:
-            connection.close()
+            connection = MoinIndexerConnection(self.dir)
+            self.touch()
+            try:
+                logging.info("indexing %d pages..." % len(pages))
+                for pagename in pages:
+                    self._index_page(request, connection, pagename, mode=mode)
+                if files:
+                    logging.info("indexing all files...")
+                    for fname in files:
+                        fname = fname.strip()
+                        self._index_file(request, connection, fname, mode)
+            finally:
+                connection.close()
+        except XapianDatabaseLockError:
+            logging.warning("xapian index is locked, can't index.")
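
Note the shape shared by do_queued_updates and _index_pages after this change:
the connection (and with it the lock) is acquired inside the outer try, the
work happens inside an inner try, and close() is guaranteed by the finally.
Condensed, with the actual work elided:

    try:
        connection = MoinIndexerConnection(self.dir)  # may raise XapianDatabaseLockError
        self.touch()
        try:
            pass  # index pages / process queued updates here
        finally:
            connection.close()  # always hand Xapian's builtin lock back
    except XapianDatabaseLockError:
        # another indexer holds the lock; skip now, retry later
        logging.warning("xapian index is locked, can't index.")

Creating the connection inside the outer try is what makes the lock failure
catchable; self.touch() runs only once the lock is actually held.
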
 
--- a/MoinMoin/search/Xapian/search.py	Sun Nov 15 10:23:51 2009 +0100
+++ b/MoinMoin/search/Xapian/search.py	Sun Nov 15 12:45:30 2009 +0100
@@ -5,7 +5,7 @@
     @copyright: 2005 MoinMoin:FlorianFesti,
                 2005 MoinMoin:NirSoffer,
                 2005 MoinMoin:AlexanderSchremmer,
-                2006-2008 MoinMoin:ThomasWaldmann,
+                2006-2009 MoinMoin:ThomasWaldmann,
                 2006 MoinMoin:FranzPletz
     @license: GNU GPL, see COPYING for details
 """
@@ -49,13 +49,10 @@
         index = self.index
 
         clock.start('_xapianSearch')
-        try:
-            clock.start('_xapianQuery')
-            search_results = index.search(self.query, sort=self.sort, historysearch=self.historysearch)
-            clock.stop('_xapianQuery')
-            logging.debug("_xapianSearch: finds: %r" % search_results)
-        except BaseIndex.LockedException:
-            pass
+        clock.start('_xapianQuery')
+        search_results = index.search(self.query, sort=self.sort, historysearch=self.historysearch)
+        clock.stop('_xapianQuery')
+        logging.debug("_xapianSearch: finds: %r" % search_results)
 
         # Note: .data is (un)pickled inside xappy, so we get back exactly what
         #       we had put into it at indexing time (including unicode objects).
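
The LockedException handling can go away on the search side because Xapian
uses a single-writer / multiple-readers model: a read-only search connection
never takes the write lock, so queries keep working while an indexer holds it.
Schematically (assuming xappy's SearchConnection API):

    import xappy

    sconn = xappy.SearchConnection(index_dir)  # read-only, no write lock involved
    # searching proceeds even while another process holds an IndexerConnection
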
--- a/MoinMoin/search/_tests/test_search.py	Sun Nov 15 10:23:51 2009 +0100
+++ b/MoinMoin/search/_tests/test_search.py	Sun Nov 15 12:45:30 2009 +0100
@@ -3,12 +3,14 @@
     MoinMoin - MoinMoin.search Tests
 
     @copyright: 2005 by Nir Soffer <nirs@freeshell.org>,
-                2007 by MoinMoin:ThomasWaldmann
+                2007-2009 by MoinMoin:ThomasWaldmann
     @license: GNU GPL, see COPYING for details.
 """
 
 
-import py, os, StringIO
+import os, StringIO, time
+
+import py
 
 from MoinMoin.search import QueryError, _get_searcher
 from MoinMoin.search.queryparser import QueryParser
@@ -101,7 +103,7 @@
 
     searcher_class = None
 
-    def _wait_for_index_update(self):
+    def _index_update(self):
         pass
 
     @classmethod
@@ -283,13 +285,15 @@
         self.pages['TestCreatePage'] = 'some text' # Moin search must search this page
         try:
             create_page(self.request, 'TestCreatePage', self.pages['TestCreatePage'])
-            self._wait_for_index_update()
+            self._index_update()
             result = self.search(u'TestCreatePage')
             assert len(result.hits) == 1
         finally:
             nuke_page(self.request, 'TestCreatePage')
-            self._wait_for_index_update()
+            self._index_update()
             del self.pages['TestCreatePage']
+            result = self.search(u'TestCreatePage')
+            assert len(result.hits) == 0
 
     def test_attachment(self):
         page_name = u'TestAttachment'
@@ -306,16 +310,15 @@
             create_page(self.request, page_name, self.pages[page_name])
             AttachFile.add_attachment(self.request, page_name, filename, filecontent, True)
             append_page(self.request, page_name, '[[attachment:%s]]' % filename)
-            self._wait_for_index_update()
+            self._index_update()
             result = self.search(filename)
             assert len(result.hits) > 0
         finally:
             nuke_page(self.request, page_name)
             del self.pages[page_name]
-            self._wait_for_index_update()
-
-        result = self.search(filename)
-        assert len(result.hits) == 0
+            self._index_update()
+            result = self.search(filename)
+            assert len(result.hits) == 0
 
     def test_get_searcher(self):
         assert isinstance(_get_searcher(self.request, ''), self.searcher_class)
@@ -344,11 +347,12 @@
 
         xapian_search = True
 
-    def _wait_for_index_update(self):
+    def _index_update(self):
+        # for xapian, we queue index updates so they can get indexed later.
+        # here we make sure the queue will be processed completely,
+        # before we continue:
         from MoinMoin.search.Xapian import XapianIndex
-        index = XapianIndex(self.request)
-        index.lock.acquire()
-        index.lock.release()
+        XapianIndex(self.request).do_queued_updates()
 
     def get_searcher(self, query):
         from MoinMoin.search.Xapian.search import XapianSearch
@@ -438,27 +442,6 @@
         assert len(result.hits) == 2
 
 
-class TestXapianIndexingInNewThread(object):
-    """ search: test Xapian indexing """
-
-    class Config(wikiconfig.Config):
-
-        xapian_search = True
-
-    def test_index_in_new_thread(self):
-        """ search: kicks off indexing for a single pages in Xapian """
-        try:
-            from MoinMoin.search.Xapian import XapianIndex
-        except ImportError:
-            py.test.skip('xapian is not installed')
-
-        nuke_xapian_index(self.request)
-        index = XapianIndex(self.request)
-        index.indexPagesInNewThread(mode='add')
-
-        nuke_xapian_index(self.request)
-
-
 class TestGetSearcher(object):
 
     class Config(wikiconfig.Config):
@@ -466,7 +449,7 @@
         xapian_search = True
 
     def test_get_searcher(self):
-        assert isinstance(_get_searcher(self.request, ''), MoinSearch), 'Xapian index is not created, despite the configuration, MoinSearch mist be used!'
+        assert isinstance(_get_searcher(self.request, ''), MoinSearch), 'Xapian index is not created, despite the configuration, MoinSearch must be used!'
 
 coverage_modules = ['MoinMoin.search']
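
With the indexing thread gone, these tests become deterministic: _index_update()
drains the queue synchronously, so an assertion placed right after it sees the
final index state instead of racing a background thread. The pattern,
schematically (create_page and self.search are helpers from this test module):

    create_page(self.request, 'TestCreatePage', 'some text')
    XapianIndex(self.request).do_queued_updates()     # index synchronously
    assert len(self.search(u'TestCreatePage').hits) == 1
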
 
--- a/MoinMoin/search/builtin.py	Sun Nov 15 10:23:51 2009 +0100
+++ b/MoinMoin/search/builtin.py	Sun Nov 15 12:45:30 2009 +0100
@@ -17,7 +17,7 @@
 
 from MoinMoin import wikiutil, config, caching
 from MoinMoin.Page import Page
-from MoinMoin.util import lock, filesys
+from MoinMoin.util import filesys
 from MoinMoin.search.results import getSearchResults, Match, TextMatch, TitleMatch, getSearchResults
 
 ##############################################################################
@@ -96,9 +96,6 @@
 class BaseIndex(object):
     """ Represents a search engine index """
 
-    class LockedException(Exception):
-        pass
-
     def __init__(self, request):
         """
         @param request: current request
@@ -108,9 +105,6 @@
         self.dir = os.path.join(main_dir, 'index')
         if not os.path.exists(self.dir):
             os.makedirs(self.dir)
-        self.sig_file = os.path.join(main_dir, 'complete')
-        lock_dir = os.path.join(main_dir, 'index-lock')
-        self.lock = lock.WriteLock(lock_dir, timeout=3600.0, readlocktimeout=60.0)
         self.update_queue = IndexerQueue(request, main_dir, 'indexer-queue')
 
     def _main_dir(self):
@@ -118,7 +112,7 @@
 
     def exists(self):
         """ Check if index exists """
-        return os.path.exists(self.sig_file)
+        return os.path.exists(self.dir)
 
     def mtime(self):
         """ Modification time of the index """
@@ -152,60 +146,25 @@
         """
         self.update_queue.put(pagename, attachmentname, revno)
         if now:
-            self._do_queued_updates_InNewThread()
+            self.do_queued_updates()
 
     def indexPages(self, files=None, mode='update', pages=None):
         """ Index pages (and files, if given)
 
-        Can be called only from a script. To index pages during a user
-        request, use indexPagesInNewThread.
-
         @param files: iterator or list of files to index additionally
         @param mode: set the mode of indexing the pages, either 'update', 'add' or 'rebuild'
         @param pages: list of pages to index, if not given, all pages are indexed
         """
-        if not self.lock.acquire(1.0):
-            logging.warning("can't index: can't acquire lock")
-            return
-        try:
-            self._unsign()
-            start = time.time()
-            request = self._indexingRequest(self.request)
-            self._index_pages(request, files, mode, pages=pages)
-            logging.info("indexing completed successfully in %0.2f seconds." %
-                        (time.time() - start))
-            self._sign()
-        finally:
-            self.lock.release()
-
-    def indexPagesInNewThread(self, files=None, mode='update', pages=None):
-        """ Index pages in a new thread
-
-        Should be called from a user request. From a script, use indexPages.
-        """
-        # Prevent rebuilding the index just after it was finished
-        if self.exists():
-            return
-
-        from threading import Thread
-        indexThread = Thread(target=self._index_pages, args=(self.request, files, mode, pages))
-        indexThread.setDaemon(True)
-
-        # Join the index thread after current request finish, prevent
-        # Apache CGI from killing the process.
-        def joinDecorator(finish):
-            def func():
-                finish()
-                indexThread.join()
-            return func
-
-        self.request.finish = joinDecorator(self.request.finish)
-        indexThread.start()
+        start = time.time()
+        request = self._indexingRequest(self.request)
+        self._index_pages(request, files, mode, pages=pages)
+        logging.info("indexing completed successfully in %0.2f seconds." %
+                    (time.time() - start))
 
     def _index_pages(self, request, files=None, mode='update', pages=None):
         """ Index all pages (and all given files)
 
-        This should be called from indexPages or indexPagesInNewThread only!
+        This should be called from indexPages only!
 
         @param request: current request
         @param files: iterator or list of files to index additionally
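
From a maintenance script, the indexPages entry point above boils down to
(a sketch; 'request' stands for a suitable script request object):

    from MoinMoin.search.Xapian import XapianIndex

    index = XapianIndex(request)
    index.indexPages(mode='rebuild')  # removes the old *.DB files, then re-adds all pages
    # if another process holds Xapian's lock, _index_pages logs a warning and skips
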
@@ -216,48 +175,11 @@
         """
         raise NotImplemented('...')
 
-    def _do_queued_updates_InNewThread(self):
-        """ do queued index updates in a new thread
-
-        Should be called from a user request. From a script, use indexPages.
-        """
-        if not self.lock.acquire(1.0):
-            logging.warning("can't index: can't acquire lock")
-            return
-        try:
-            def lockedDecorator(f):
-                def func(*args, **kwargs):
-                    try:
-                        return f(*args, **kwargs)
-                    finally:
-                        self.lock.release()
-                return func
-
-            from threading import Thread
-            indexThread = Thread(
-                    target=lockedDecorator(self._do_queued_updates),
-                    args=(self._indexingRequest(self.request), ))
-            indexThread.setDaemon(True)
-
-            # Join the index thread after current request finish, prevent
-            # Apache CGI from killing the process.
-            def joinDecorator(finish):
-                def func():
-                    finish()
-                    indexThread.join()
-                return func
-
-            self.request.finish = joinDecorator(self.request.finish)
-            indexThread.start()
-        except:
-            self.lock.release()
-            raise
-
-    def _do_queued_updates(self, request, amount=5):
+    def do_queued_updates(self, amount=-1):
         """ Perform updates in the queues
 
         @param request: the current request
-        @keyword amount: how many updates to perform at once (default: 5)
+        @keyword amount: how many updates to perform at once (default: -1 == all)
         """
         raise NotImplemented('...')
 
@@ -311,22 +233,6 @@
         r.editlog = editlog.EditLog(r)
         return r
 
-    def _unsign(self):
-        """ Remove sig file - assume write lock acquired """
-        try:
-            os.remove(self.sig_file)
-        except OSError, err:
-            if err.errno != errno.ENOENT:
-                raise
-
-    def _sign(self):
-        """ Add sig file - assume write lock acquired """
-        f = file(self.sig_file, 'w')
-        try:
-            f.write('')
-        finally:
-            f.close()
-
 
 ##############################################################################
 ### Searching
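
Finally, the queue contract that do_queued_updates consumes: IndexerQueue.put()
stores (pagename, attachmentname, revno) triples and get() raises IndexError
when empty; the three triple shapes encode the three kinds of requests handled
in indexing.py. A self-contained stand-in (the real IndexerQueue also persists
its entries, which this toy does not):

    class ToyIndexerQueue(object):
        """In-memory stand-in for IndexerQueue's put/get contract."""
        def __init__(self):
            self._items = []

        def put(self, pagename, attachmentname, revno):
            self._items.append((pagename, attachmentname, revno))

        def get(self):
            if not self._items:
                raise IndexError('queue empty')  # same signal the real queue gives
            return self._items.pop(0)

    q = ToyIndexerQueue()
    q.put(u'FrontPage', None, None)         # index this page completely, with attachments
    q.put(u'FrontPage', None, 42)           # index this page revision (revno 42)
    q.put(u'FrontPage', u'logo.png', None)  # index this attachment
    while True:
        try:
            print('%r' % (q.get(), ))
        except IndexError:
            break
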