changeset 5276:195db0fdbb80

Fixed and cleaned up Xapian based search (details below) Queue(s) for delayed indexing: We had 2 queues: update-queue and remove-queue. Problem: keeping correct order, doing the right thing. The new 1 queue system fixes and simplifies this by just storing hints like (pagename, attachname, revno) and the indexer then later finds out itself what it has to do (depending on the state the hinted item has THEN: update item in index, remove item from index, etc.). To queue a hint, just call XapianIndex.update_item(pagename, filename, revno). The new IndexerQueue has a simpler interface: put() to add hints to the queue, get() to get (and remove) a hint from the queue. tests: fixed some typos General cleanup, better API consistency, rename some methods *_page to *_item.
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Sat, 14 Nov 2009 13:30:20 +0100
parents adb58cd3ece2
children 110efceb9e28
files MoinMoin/action/AttachFile.py MoinMoin/events/xapian_index.py MoinMoin/search/Xapian/indexing.py MoinMoin/search/_tests/test_search.py MoinMoin/search/builtin.py
diffstat 5 files changed, 199 insertions(+), 198 deletions(-) [+]
line wrap: on
line diff
--- a/MoinMoin/action/AttachFile.py	Fri Nov 13 07:25:08 2009 +0100
+++ b/MoinMoin/action/AttachFile.py	Sat Nov 14 13:30:20 2009 +0100
@@ -627,7 +627,7 @@
         from MoinMoin.search.Xapian import XapianIndex
         index = XapianIndex(request)
         if index.exists:
-            index.remove_item(pagename, filename)
+            index.update_item(pagename, filename)
 
     upload_form(pagename, request, msg=_("Attachment '%(filename)s' deleted.") % {'filename': filename})
 
--- a/MoinMoin/events/xapian_index.py	Fri Nov 13 07:25:08 2009 +0100
+++ b/MoinMoin/events/xapian_index.py	Sat Nov 14 13:30:20 2009 +0100
@@ -22,8 +22,8 @@
     if request.cfg.xapian_search:
         index = _get_index(request)
         if index and index.exists():
-            index.remove_item(event.old_page.page_name, now=0)
-            index.update_page(event.page.page_name)
+            index.update_item(event.old_page.page_name, now=False)
+            index.update_item(event.page.page_name)
 
 
 def handle_copied(event):
@@ -34,9 +34,9 @@
     if request.cfg.xapian_search:
         index = _get_index(request)
         if index and index.exists():
-            index.update_page(event.page.page_name)
+            index.update_item(event.page.page_name)
 
-def handle_changed(event, deleted=False):
+def handle_changed(event):
     """Updates Xapian index when a page is changed"""
 
     request = event.request
@@ -44,16 +44,13 @@
     if request.cfg.xapian_search:
         index = _get_index(request)
         if index and index.exists():
-            if deleted:
-                index.remove_item(event.page.page_name)
-            else:
-                index.update_page(event.page.page_name)
+            index.update_item(event.page.page_name)
 
 
 def handle_deleted(event):
     """Updates Xapian index when a page is deleted"""
     event = ev.PageChangedEvent(event.request, event.page, event.comment)
-    handle_changed(event, deleted=True)
+    handle_changed(event)
 
 
 def handle_attached(event):
@@ -64,7 +61,7 @@
     if request.cfg.xapian_search:
         index = _get_index(request)
         if index and index.exists():
-            index.update_page(event.pagename)
+            index.update_item(event.pagename, event.filename)
 
 
 def handle(event):
--- a/MoinMoin/search/Xapian/indexing.py	Fri Nov 13 07:25:08 2009 +0100
+++ b/MoinMoin/search/Xapian/indexing.py	Sat Nov 14 13:30:20 2009 +0100
@@ -48,20 +48,22 @@
 
 class MoinSearchConnection(xappy.SearchConnection):
 
-    def get_all_documents(self):
+    def get_all_documents(self, query=None):
         """
-        Return all the documents in the xapian index.
+        Return all the documents in the index (that match query, if given).
         """
         document_count = self.get_doccount()
-        query = self.query_all()
+        query = query or self.query_all()
         hits = self.search(query, 0, document_count)
         return hits
 
-    def get_all_documents_with_field(self, field, field_value):
-        document_count = self.get_doccount()
-        query = self.query_field(field, field_value)
-        hits = self.search(query, 0, document_count)
-        return hits
+    def get_all_documents_with_fields(self, **fields):
+        """
+        Return all the documents in the index (that match the field=value kwargs given).
+        """
+        field_queries = [self.query_field(field, value) for field, value in fields.iteritems()]
+        query = self.query_composite(self.OP_AND, field_queries)
+        return self.get_all_documents(query)
 
 
 class MoinIndexerConnection(xappy.IndexerConnection):
@@ -169,22 +171,24 @@
         self.touch()
         connection = MoinIndexerConnection(self.dir)
         try:
-            # do all page updates
-            pages = self.update_queue.pages()[:amount]
-            logging.debug("update queue: updating %d pages+attachments in index..." % len(pages))
-            for name in pages:
-                self._index_page(request, connection, name, mode='update')
-                self.update_queue.remove([name])
-
-            # do page/attachment removals
-            items = self.remove_queue.pages()[:amount]
-            logging.debug("remove queue: removing %d pages/attachments from index..." % len(items))
-            for item in items:
-                assert len(item.split('//')) == 2
-                pagename, attachment = item.split('//')
-                page = Page(request, pagename)
-                self._remove_item(request, connection, page, attachment)
-                self.remove_queue.remove([item])
+            for i in range(amount):
+                try:
+                    pagename, attachmentname, revno = self.update_queue.get()
+                except IndexError:
+                    # queue empty
+                    break
+                else:
+                    logging.debug("got from indexer queue: %r %r %r" % (pagename, attachmentname, revno))
+                    if not attachmentname:
+                        if revno is None:
+                            # generic "index this page completely, with attachments" request
+                            self._index_page(request, connection, pagename, mode='update')
+                        else:
+                            # "index this page revision" request
+                            self._index_page_rev(request, connection, pagename, revno, mode='update')
+                    else:
+                        # "index this attachment" request
+                        self._index_attachment(request, connection, pagename, attachmentname, mode='update')
         finally:
             connection.close()
 
@@ -227,43 +231,6 @@
             for value in values:
                 document.fields.append(xappy.Field(field, value))
 
-    def _index_file(self, request, connection, filename, mode='update'):
-        """ index a file as it were a page named pagename
-            Assumes that the write lock is acquired
-        """
-        fields = {}
-        multivalued_fields = {}
-
-        wikiname = request.cfg.interwikiname or u"Self"
-        fs_rootpage = 'FS' # XXX FS hardcoded
-
-        try:
-            itemid = "%s:%s" % (wikiname, os.path.join(fs_rootpage, filename))
-            mtime = wikiutil.timestamp2version(os.path.getmtime(filename))
-
-            doc = self._get_document(connection, itemid, mtime, mode)
-            logging.debug("%s %r" % (filename, doc))
-
-            if doc:
-                mimetype, file_content = self.contentfilter(filename)
-
-                fields['wikiname'] = wikiname
-                fields['pagename'] = fs_rootpage
-                fields['attachment'] = filename # XXX we should treat files like real pages, not attachments
-                fields['mtime'] = str(mtime)
-                fields['revision'] = '0'
-                fields['title'] = " ".join(os.path.join(fs_rootpage, filename).split("/"))
-                fields['content'] = file_content
-
-                multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
-
-                self._add_fields_to_document(request, doc, fields, multivalued_fields)
-
-                connection.replace(doc)
-
-        except (OSError, IOError, UnicodeError):
-            logging.exception("_index_file crashed:")
-
     def _get_languages(self, page):
         """ Get language of a page and the language to stem it in
 
@@ -325,74 +292,67 @@
     def _index_page(self, request, connection, pagename, mode='update'):
         """ Index a page - assumes that the write lock is acquired
 
+        Index all revisions (if wanted by configuration) and all attachments.
+
         @arg connection: the Indexer connection object
         @arg pagename: a page name
         @arg mode: 'add' = just add, no checks
                    'update' = check if already in index and update if needed (mtime)
         """
         page = Page(request, pagename)
-        if request.cfg.xapian_index_history:
-            for rev in page.getRevList():
-                updated = self._index_page_rev(request, connection, Page(request, pagename, rev=rev), mode=mode)
-                logging.debug("updated page %r rev %d (updated==%r)" % (pagename, rev, updated))
+        revlist = page.getRevList() # recent revs first, does not include deleted revs
+
+        if not revlist:
+            # we have an empty revision list, that means the page is not there any more,
+            # likely it (== all of its revisions, all of its attachments) got either renamed or nuked
+            wikiname = request.cfg.interwikiname or u'Self'
+
+            sc = MoinSearchConnection(self.dir)
+            docs_to_delete = sc.get_all_documents_with_fields(wikiname=wikiname, pagename=pagename)
+                                                              # any page rev, any attachment
+            sc.close()
+
+            for doc in docs_to_delete:
+                connection.delete(doc.id)
+            logging.debug('page %s (all revs, all attachments) removed from xapian index' % pagename)
+
+        else:
+            if request.cfg.xapian_index_history:
+                index_revs, remove_revs = revlist, []
+            else:
+                if page.exists(): # is current rev not deleted?
+                    index_revs, remove_revs = revlist[:1], revlist[1:]
+                else:
+                    index_revs, remove_revs = [], revlist
+
+            for revno in index_revs:
+                updated = self._index_page_rev(request, connection, pagename, revno, mode=mode)
+                logging.debug("updated page %r rev %d (updated==%r)" % (pagename, revno, updated))
                 if not updated:
                     # we reached the revisions that are already present in the index
                     break
-        else:
-            self._index_page_rev(request, connection, page, mode=mode)
-
-        self._index_attachments(request, connection, pagename, mode)
-
-    def _index_attachments(self, request, connection, pagename, mode='update'):
-        from MoinMoin.action import AttachFile
-
-        fields = {}
-        multivalued_fields = {}
-
-        wikiname = request.cfg.interwikiname or u"Self"
-        page = Page(request, pagename)
-
-        for att in AttachFile._get_files(request, pagename):
-            itemid = "%s:%s//%s" % (wikiname, pagename, att)
-            filename = AttachFile.getFilename(request, pagename, att)
-            mtime = wikiutil.timestamp2version(os.path.getmtime(filename))
 
-            doc = self._get_document(connection, itemid, mtime, mode)
-            logging.debug("%s %s %r" % (pagename, att, doc))
-
-            if doc:
-                mimetype, att_content = self.contentfilter(filename)
+            for revno in remove_revs:
+                # XXX remove_revs can be rather long for pages with many revs and
+                # XXX most page revs usually will be already deleted. optimize?
+                self._remove_page_rev(request, connection, pagename, revno)
+                logging.debug("removed page %r rev %d" % (pagename, revno))
 
-                fields['wikiname'] = wikiname
-                fields['pagename'] = pagename
-                fields['attachment'] = att
-                fields['mtime'] = str(mtime)
-                fields['revision'] = '0'
-                fields['title'] = '%s/%s' % (pagename, att)
-                fields['content'] = att_content
-                fields['fulltitle'] = pagename
-                fields['lang'], fields['stem_lang'] = self._get_languages(page)
+            from MoinMoin.action import AttachFile
+            for attachmentname in AttachFile._get_files(request, pagename):
+                self._index_attachment(request, connection, pagename, attachmentname, mode)
 
-                multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
-                multivalued_fields['domain'] = self._get_domains(page)
-
-                self._add_fields_to_document(request, doc, fields, multivalued_fields)
-
-                connection.replace(doc)
-
-    def _index_page_rev(self, request, connection, page, mode='update'):
+    def _index_page_rev(self, request, connection, pagename, revno, mode='update'):
         """ Index a page revision - assumes that the write lock is acquired
 
         @arg connection: the Indexer connection object
-        @arg page: a page object
+        @arg pagename: the page name
+        @arg revno: page revision number (int)
         @arg mode: 'add' = just add, no checks
                    'update' = check if already in index and update if needed (mtime)
         """
-        request.page = page
-        pagename = page.page_name
-
-        fields = {}
-        multivalued_fields = {}
+        page = Page(request, pagename, rev=revno)
+        request.page = page # XXX for what is this needed?
 
         wikiname = request.cfg.interwikiname or u"Self"
         revision = str(page.get_real_rev())
@@ -405,6 +365,7 @@
         if doc:
             mimetype = 'text/%s' % page.pi['format']  # XXX improve this
 
+            fields = {}
             fields['wikiname'] = wikiname
             fields['pagename'] = pagename
             fields['attachment'] = '' # this is a real page, not an attachment
@@ -416,6 +377,7 @@
             fields['lang'], fields['stem_lang'] = self._get_languages(page)
             fields['author'] = page.edit_info().get('editor', '?')
 
+            multivalued_fields = {}
             multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
             multivalued_fields['domain'] = self._get_domains(page)
             multivalued_fields['linkto'] = page.getPageLinks(request)
@@ -431,25 +393,98 @@
 
         return bool(doc)
 
-    def _remove_item(self, request, connection, page, attachment=None):
-        wikiname = request.cfg.interwikiname or u'Self'
-        pagename = page.page_name
+    def _remove_page_rev(self, request, connection, pagename, revno):
+        """ Remove a page revision from the index - assumes that the write lock is acquired
 
-        if not attachment:
-            search_connection = MoinSearchConnection(self.dir)
-            docs_to_delete = search_connection.get_all_documents_with_field('fulltitle', pagename)
-            ids_to_delete = [d.id for d in docs_to_delete]
-            search_connection.close()
+        @arg connection: the Indexer connection object
+        @arg pagename: the page name
+        @arg revno: a real revision number (int), > 0
+
+        Note: the revision's document is deleted unconditionally (no mtime check).
+        """
+        wikiname = request.cfg.interwikiname or u"Self"
+        revision = str(revno)
+        itemid = "%s:%s:%s" % (wikiname, pagename, revision)
+        connection.delete(itemid)
+        logging.debug('page %s, revision %d removed from index' % (pagename, revno))
 
-            for id_ in ids_to_delete:
-                connection.delete(id_)
-                logging.debug('page %s removed from xapian index' % pagename)
+    def _index_attachment(self, request, connection, pagename, attachmentname, mode='update'):
+        """ Index an attachment
+        """
+        from MoinMoin.action import AttachFile
+        wikiname = request.cfg.interwikiname or u"Self"
+        itemid = "%s:%s//%s" % (wikiname, pagename, attachmentname)
+
+        filename = AttachFile.getFilename(request, pagename, attachmentname)
+        # check if the file is still there. as we might be doing queued index updates,
+        # the file could be gone meanwhile...
+        if os.path.exists(filename):
+            mtime = wikiutil.timestamp2version(os.path.getmtime(filename))
+            doc = self._get_document(connection, itemid, mtime, mode)
+            logging.debug("%s %s %r" % (pagename, attachmentname, doc))
+            if doc:
+                page = Page(request, pagename)
+                mimetype, att_content = self.contentfilter(filename)
+
+                fields = {}
+                fields['wikiname'] = wikiname
+                fields['pagename'] = pagename
+                fields['attachment'] = attachmentname
+                fields['mtime'] = str(mtime)
+                fields['revision'] = '0'
+                fields['title'] = '%s/%s' % (pagename, attachmentname)
+                fields['content'] = att_content
+                fields['fulltitle'] = pagename
+                fields['lang'], fields['stem_lang'] = self._get_languages(page)
+
+                multivalued_fields = {}
+                multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
+                multivalued_fields['domain'] = self._get_domains(page)
+
+                self._add_fields_to_document(request, doc, fields, multivalued_fields)
+
+                connection.replace(doc)
+                logging.debug('attachment %s (page %s) updated in index' % (attachmentname, pagename))
         else:
-            # Only remove a single attachment
-            id_ = "%s:%s//%s" % (wikiname, pagename, attachment)
-            connection.delete(id_)
+            # attachment file was deleted, remove it from index also
+            connection.delete(itemid)
+            logging.debug('attachment %s (page %s) removed from index' % (attachmentname, pagename))
 
-            logging.debug('attachment %s (page %s) removed from index' % (attachment, pagename))
+    def _index_file(self, request, connection, filename, mode='update'):
+        """ index a file as it were a page named pagename
+            Assumes that the write lock is acquired
+        """
+        wikiname = request.cfg.interwikiname or u"Self"
+        fs_rootpage = 'FS' # XXX FS hardcoded
+
+        try:
+            itemid = "%s:%s" % (wikiname, os.path.join(fs_rootpage, filename))
+            mtime = wikiutil.timestamp2version(os.path.getmtime(filename))
+
+            doc = self._get_document(connection, itemid, mtime, mode)
+            logging.debug("%s %r" % (filename, doc))
+
+            if doc:
+                mimetype, file_content = self.contentfilter(filename)
+
+                fields = {}
+                fields['wikiname'] = wikiname
+                fields['pagename'] = fs_rootpage
+                fields['attachment'] = filename # XXX we should treat files like real pages, not attachments
+                fields['mtime'] = str(mtime)
+                fields['revision'] = '0'
+                fields['title'] = " ".join(os.path.join(fs_rootpage, filename).split("/"))
+                fields['content'] = file_content
+
+                multivalued_fields = {}
+                multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
+
+                self._add_fields_to_document(request, doc, fields, multivalued_fields)
+
+                connection.replace(doc)
+
+        except (OSError, IOError, UnicodeError):
+            logging.exception("_index_file crashed:")
 
     def _index_pages(self, request, files=None, mode='update', pages=None):
         """ Index pages (and all given files)
--- a/MoinMoin/search/_tests/test_search.py	Fri Nov 13 07:25:08 2009 +0100
+++ b/MoinMoin/search/_tests/test_search.py	Sat Nov 14 13:30:20 2009 +0100
@@ -280,7 +280,7 @@
         assert len(result.hits) == 1
 
     def test_create_page(self):
-        self.pages['TestCreatePage'] = 'some text' # Moin serarch must search this page
+        self.pages['TestCreatePage'] = 'some text' # Moin search must search this page
         try:
             create_page(self.request, 'TestCreatePage', self.pages['TestCreatePage'])
             self._wait_for_index_update()
@@ -293,7 +293,7 @@
 
     def test_attachment(self):
         page_name = u'TestAttachment'
-        self.pages[page_name] = 'some text' # Moin serarch must search this page
+        self.pages[page_name] = 'some text' # Moin search must search this page
 
         filename = "AutoCreatedSillyAttachmentForSearching.png"
         data = "Test content"
--- a/MoinMoin/search/builtin.py	Fri Nov 13 07:25:08 2009 +0100
+++ b/MoinMoin/search/builtin.py	Sat Nov 14 13:30:20 2009 +0100
@@ -25,9 +25,15 @@
 ##############################################################################
 
 
-class PageQueue(object):
+class IndexerQueue(object):
     """
-    Represents a locked page queue on the disk
+    Represents a locked on-disk queue with jobs for the xapian indexer
+
+    Each job is a tuple like: (PAGENAME, ATTACHMENTNAME, REVNO)
+    PAGENAME: page name (unicode)
+    ATTACHMENTNAME: attachment name (unicode) or None (for pages)
+    REVNO: revision number (int) - meaning "look at that revision",
+           or None - meaning "look at all revisions"
     """
 
     def __init__(self, request, xapian_dir, queuename, timeout=10.0):
@@ -46,11 +52,6 @@
         return caching.CacheEntry(self.request, self.xapian_dir, self.queuename,
                                   scope='dir', use_pickle=True, do_locking=locking)
 
-    def exists(self):
-        """ Checks if the queue exists on the filesystem """
-        cache = self.get_cache(locking=False)
-        return cache.exists()
-
     def _queue(self, cache):
         try:
             queue = cache.content()
@@ -59,50 +60,37 @@
             queue = []
         return queue
 
-    def pages(self):
-        """ Return list of pages in the queue """
-        cache = self.get_cache(locking=True)
-        return self._queue(cache)
+    def put(self, pagename, attachmentname=None, revno=None):
+        """ Put an entry into the queue (append at end)
 
-    def append(self, pagename):
-        """ Append a page to queue
-
-        @param pagename: string to save
+        @param pagename: page name [unicode]
+        @param attachmentname: attachment name [unicode]
+        @param revno: revision number (int) or None (all revs)
         """
         cache = self.get_cache(locking=False) # we lock manually
         cache.lock('w', 60.0)
         try:
             queue = self._queue(cache)
-            queue.append(pagename)
+            entry = (pagename, attachmentname, revno)
+            queue.append(entry)
             cache.update(queue)
         finally:
             cache.unlock()
 
-    def remove(self, pages):
-        """ Remove pages from the queue
+    def get(self):
+        """ Get (and remove) first entry from the queue
 
-        When the queue is empty, the queue file is removed, so exists()
-        can tell if there is something waiting in the queue.
-
-        @param pages: list of pagenames to remove
+        Raises IndexError if queue was empty when calling get().
         """
         cache = self.get_cache(locking=False) # we lock manually
         cache.lock('w', 60.0)
         try:
             queue = self._queue(cache)
-            for page in pages:
-                try:
-                    queue.remove(page)
-                except ValueError:
-                    pass
-            if queue:
-                cache.update(queue)
-            else:
-                cache.remove()
-            return True
+            entry = queue.pop(0)
+            cache.update(queue)
         finally:
             cache.unlock()
-        return False
+        return entry
 
 
 class BaseIndex(object):
@@ -123,8 +111,7 @@
         self.sig_file = os.path.join(main_dir, 'complete')
         lock_dir = os.path.join(main_dir, 'index-lock')
         self.lock = lock.WriteLock(lock_dir, timeout=3600.0, readlocktimeout=60.0)
-        self.update_queue = PageQueue(request, main_dir, 'update-queue')
-        self.remove_queue = PageQueue(request, main_dir, 'remove-queue')
+        self.update_queue = IndexerQueue(request, main_dir, 'indexer-queue')
 
     def _main_dir(self):
         raise NotImplemented('...')
@@ -155,24 +142,15 @@
         """
         return self._search(query, **kw)
 
-    def update_page(self, pagename, now=1):
-        """ Update a single page in the index
+    def update_item(self, pagename, attachmentname=None, revno=None, now=True):
+        """ Update a single item (page or attachment) in the index
 
         @param pagename: the name of the page to update
-        @keyword now: do all updates now (default: 1)
+        @param attachmentname: the name of the attachment to update
+        @param revno: a specific revision number (int) or None (all revs)
+        @param now: do all updates now (default: True)
         """
-        self.update_queue.append(pagename)
-        if now:
-            self._do_queued_updates_InNewThread()
-
-    def remove_item(self, pagename, attachment=None, now=1):
-        """ Removes a page and all its revisions or a single attachment
-
-        @param pagename: name of the page to be removed
-        @keyword attachment: optional, only remove this attachment of the page
-        @keyword now: do all updates now (default: 1)
-        """
-        self.remove_queue.append('%s//%s' % (pagename, attachment or ''))
+        self.update_queue.put(pagename, attachmentname, revno)
         if now:
             self._do_queued_updates_InNewThread()
 
@@ -241,15 +219,6 @@
         """
         raise NotImplemented('...')
 
-    def _remove_item(self, writer, page, attachment=None):
-        """ Remove a page and all its revisions from the index or just
-            an attachment of that page
-
-        @param pagename: name of the page to remove
-        @keyword attachment: optionally, just remove this attachment
-        """
-        raise NotImplemented('...')
-
     def _do_queued_updates_InNewThread(self):
         """ do queued index updates in a new thread