changeset 5750:021c1f6d3272

experimental queued indexing support to work around memory leak

you can use it like this:

#!/bin/sh
# put all pages into indexer queue:
python MoinMoin/script/moin.py index build --mode=makequeue

# take <count> pages out of the indexer queue and index them;
# you need to repeat the following line according to the total number of pages
# in your wiki (doing more indexer calls does no harm, doing fewer means an
# incomplete index):
python MoinMoin/script/moin.py index build --mode=buildnewindexqueued --count=500
python MoinMoin/script/moin.py index build --mode=buildnewindexqueued --count=500
python MoinMoin/script/moin.py index build --mode=buildnewindexqueued --count=500
# ...

# switch to new index:
python MoinMoin/script/moin.py index build --mode=usenewindex
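Since the number of buildnewindexqueued runs depends on your wiki's size, the three
steps can also be scripted. A minimal sketch in Python (the page count N and the
batch size are assumptions to adapt; extra runs are harmless, as noted above):

# hypothetical driver for the queued rebuild described above
import subprocess

N = 12345    # assumption: (estimated) total number of pages in your wiki
BATCH = 500  # queue entries to process per indexer run
moin = ["python", "MoinMoin/script/moin.py", "index", "build"]

subprocess.check_call(moin + ["--mode=makequeue"])
for _ in range((N + BATCH - 1) // BATCH):  # round up; extra runs do no harm
    subprocess.check_call(moin + ["--mode=buildnewindexqueued", "--count=%d" % BATCH])
subprocess.check_call(moin + ["--mode=usenewindex"])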
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Sun, 12 Dec 2010 18:45:12 +0100
parents 5d5ec86e40a2
children e4479bf1c820
files MoinMoin/script/index/build.py MoinMoin/search/Xapian/indexing.py MoinMoin/search/builtin.py
diffstat 3 files changed, 110 insertions(+), 16 deletions(-)
--- a/MoinMoin/script/index/build.py	Wed Dec 08 00:18:35 2010 +0100
+++ b/MoinMoin/script/index/build.py	Sun Dec 12 18:45:12 2010 +0100
@@ -71,6 +71,10 @@
             help="either add (unconditionally add), update (conditional update), rebuild (complete 1-stage index rebuild)"
                  " or buildnewindex and usenewindex (complete 2-stage index rebuild)"
         )
+        self.parser.add_option(
+            "--count", metavar="COUNT", dest="count",
+            help="for queued indexing only: how many queue entries to process in this indexing run"
+        )
 
     def mainloop(self):
         self.init_request()
@@ -87,23 +91,30 @@
     def command(self):
         from MoinMoin.search.Xapian import XapianIndex
         mode = self.options.mode
-        if mode in ('rebuild', 'buildnewindex'):
+        if mode in ['rebuild', 'buildnewindex', 'makequeue', 'buildnewindexqueued', ]:
             # rebuilding the DB into a new index directory, so the rebuild
             # process does not interfere with the currently in-use DB
             idx_mode, idx_name = 'add', 'index.new'
-        elif mode in ('add', 'update'):
+        elif mode in ['add', 'update', ]:
             # update/add in-place
             idx_mode, idx_name = mode, 'index'
-        elif mode == 'usenewindex':
+        elif mode in ['usenewindex', ]:
             pass # nothing to do
         else:
             pass # XXX give error msg about invalid mode
 
-        if mode != 'usenewindex':
+        if mode in ['makequeue', ]:
             idx = XapianIndex(self.request, name=idx_name)
-            idx.indexPages(self.files, idx_mode)
+            idx.queuePages(self.files)
 
-        if mode in ('rebuild', 'usenewindex'):
+        if mode in ['rebuild', 'buildnewindex', 'buildnewindexqueued', ]:
+            idx = XapianIndex(self.request, name=idx_name)
+            if mode == 'buildnewindexqueued':
+                idx.indexPagesQueued(int(self.options.count or -1))  # no --count given: drain the whole queue
+            else:
+                idx.indexPages(self.files, idx_mode)
+
+        if mode in ['rebuild', 'usenewindex', ]:
             # 'rebuild' is still a bit dirty, because just killing old index will
             # fail currently running searches. Thus, maybe do this in a time
             # with little wiki activity or better use 'buildnewindex' and
--- a/MoinMoin/search/Xapian/indexing.py	Wed Dec 08 00:18:35 2010 +0100
+++ b/MoinMoin/search/Xapian/indexing.py	Sun Dec 12 18:45:12 2010 +0100
@@ -194,6 +194,7 @@
             request = self._indexingRequest(self.request)
             connection = self.get_indexer_connection()
             self.touch()
+            total = amount
             try:
                 done_count = 0
                 while amount:
@@ -205,17 +206,23 @@
                         # queue empty
                         break
                     else:
-                        logging.debug("got from indexer queue: %r %r %r" % (pagename, attachmentname, revno))
-                        if not attachmentname:
-                            if revno is None:
-                                # generic "index this page completely, with attachments" request
-                                self._index_page(request, connection, pagename, mode='update')
+                        logging.info("got from indexer queue: %r %r %r [%d/%d]" % (
+                            pagename, attachmentname, revno,
+                            done_count, total))
+                        if pagename:
+                            if not attachmentname:
+                                if revno is None:
+                                    # generic "index this page completely, with attachments" request
+                                    self._index_page(request, connection, pagename, mode='update')
+                                else:
+                                    # "index this page revision" request
+                                    self._index_page_rev(request, connection, pagename, revno, mode='update')
                             else:
-                                # "index this page revision" request
-                                self._index_page_rev(request, connection, pagename, revno, mode='update')
-                        else:
-                            # "index this attachment" request
-                            self._index_attachment(request, connection, pagename, attachmentname, mode='update')
+                                # "index this attachment" request
+                                self._index_attachment(request, connection, pagename, attachmentname, mode='update')
+                        else: # pagename == None
+                            # index an additional filesystem file (full path given in attachmentname)
+                            self._index_file(request, connection, attachmentname, mode='update')
                         done_count += 1
             finally:
                 logging.debug("updated xapian index with %d queued updates" % done_count)
@@ -223,6 +230,8 @@
         except XapianDatabaseLockError:
             # another indexer has locked the index, we can retry it later...
             logging.debug("can't lock xapian index, not doing queued updates now")
+            done_count = 0 # lock not acquired, nothing was processed in this run
+        return done_count
 
     def _get_document(self, connection, doc_id, mtime, mode):
         do_index = False
@@ -528,6 +536,27 @@
 
         except (OSError, IOError, UnicodeError):
             logging.exception("_index_file crashed:")
+
+    def _queue_pages(self, request, files=None, pages=None):
+        """ Put all (given) pages into indexer queue
+
+        This should be called from queuePages only!
+
+        @param request: request suitable for indexing
+        @param files: an optional list of files to index
+        @param pages: list of pages to index, if not given, all pages are indexed
+        """
+        if pages is None:
+            # Index all pages
+            pages = request.rootpage.getPageList(user='', exists=1)
+
+        logging.info("queuing %d pages..." % len(pages))
+        entries = [(pagename, None, None) for pagename in pages]
+        self.update_queue.mput(entries)
+        if files:
+            logging.info("indexing all files...")
+            entries = [(None, fname.strip(), None) for fname in files]
+            self.update_queue.mput(entries)
 
     def _index_pages(self, request, files=None, mode='update', pages=None):
         """ Index all (given) pages (and all given files)
--- a/MoinMoin/search/builtin.py	Wed Dec 08 00:18:35 2010 +0100
+++ b/MoinMoin/search/builtin.py	Sun Dec 12 18:45:12 2010 +0100
@@ -59,6 +59,23 @@
             queue = []
         return queue
 
+    def mput(self, entries):
+        """ Put multiple entries into the queue (append at end)
+
+        @param entries: list of tuples (pagename, attachmentname, revno)
+                        pagename: page name [unicode]
+                        attachmentname: attachment name [unicode or None]
+                        revno: revision number (int) or None (all revs)
+        """
+        cache = self.get_cache(locking=False) # we lock manually
+        cache.lock('w', 60.0)
+        try:
+            queue = self._queue(cache)
+            queue.extend(entries)
+            cache.update(queue)
+        finally:
+            cache.unlock()
+
     def put(self, pagename, attachmentname=None, revno=None):
         """ Put an entry into the queue (append at end)
 
@@ -76,6 +93,22 @@
         finally:
             cache.unlock()
 
+    def mget(self, count):
+        """ Get (and remove) first <count> entries from the queue
+
+        If the queue holds fewer than <count> entries, all remaining entries are returned (an empty list if the queue was empty).
+        """
+        cache = self.get_cache(locking=False) # we lock manually
+        cache.lock('w', 60.0)
+        try:
+            queue = self._queue(cache)
+            entries = queue[:count]
+            queue = queue[count:]
+            cache.update(queue)
+        finally:
+            cache.unlock()
+        return entries
+
     def get(self):
         """ Get (and remove) first entry from the queue
 
@@ -146,6 +179,26 @@
         if now:
             self.do_queued_updates()
 
+    def queuePages(self, files=None, pages=None):
+        """ Put pages (and files, if given) into indexer queue
+
+        @param files: iterator or list of files to index additionally
+        @param pages: list of pages to index, if not given, all pages are indexed
+        """
+        start = time.time()
+        request = self._indexingRequest(self.request)
+        self._queue_pages(request, files, pages)
+        logging.info("queuing completed successfully in %0.2f seconds." %
+                    (time.time() - start))
+
+    def indexPagesQueued(self, count=-1):
+        """ Index <count> queued pages (and/or files)
+        """
+        start = time.time()
+        done_count = self.do_queued_updates(count)
+        logging.info("indexing %d items completed successfully in %0.2f seconds." %
+                    (done_count, time.time() - start))
+
     def indexPages(self, files=None, mode='update', pages=None):
         """ Index pages (and files, if given)