changeset 5053:508135789e41

UpdateQueue: use caching for on-disk storage of the page queue
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Sat, 29 Aug 2009 23:32:30 +0200
parents 40e3e50d4b47
children f6f530720499
files MoinMoin/search/builtin.py
diffstat 1 files changed, 57 insertions(+), 117 deletions(-)
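
The diff below replaces UpdateQueue's hand-rolled queue file and separate
lock directory with MoinMoin's caching module. A minimal usage sketch of
the reworked API (the caller code is hypothetical and not part of this
changeset; request and xapian_dir are assumed to come from the indexing
setup, see BaseIndex below):

    queue = UpdateQueue(request, xapian_dir, 'update-queue')
    queue.append(u'SomePage')       # enqueue a page for (re)indexing
    pending = queue.pages()         # -> [u'SomePage', ...]
    queue.remove(pending)           # drop handled pages; the cache entry
                                    # is deleted once the queue is empty
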
--- a/MoinMoin/search/builtin.py	Sat Aug 29 22:40:13 2009 +0200
+++ b/MoinMoin/search/builtin.py	Sat Aug 29 23:32:30 2009 +0200
@@ -5,7 +5,7 @@
     @copyright: 2005 MoinMoin:FlorianFesti,
                 2005 MoinMoin:NirSoffer,
                 2005 MoinMoin:AlexanderSchremmer,
-                2006-2008 MoinMoin:ThomasWaldmann,
+                2006-2009 MoinMoin:ThomasWaldmann,
                 2006 MoinMoin:FranzPletz
     @license: GNU GPL, see COPYING for details
 """
@@ -15,7 +15,7 @@
 from MoinMoin import log
 logging = log.getLogger(__name__)
 
-from MoinMoin import wikiutil, config
+from MoinMoin import wikiutil, config, caching
 from MoinMoin.Page import Page
 from MoinMoin.util import lock, filesys
-from MoinMoin.search.results import getSearchResults, Match, TextMatch, TitleMatch, getSearchResults
+from MoinMoin.search.results import getSearchResults, Match, TextMatch, TitleMatch
@@ -26,49 +26,57 @@
 
 
 class UpdateQueue:
-    """ Represents a locked page queue on the disk
-
-        XXX: check whether we just can use the caching module
+    """
+    Represents a locked page queue on the disk
     """
 
-    def __init__(self, f, lock_dir):
+    def __init__(self, request, xapian_dir, queuename, timeout=10.0):
         """
-        @param f: file to write to
-        @param lock_dir: directory to save the lock files
+        @param request: request object
+        @param xapian_dir: the xapian main directory
+        @param queuename: name of the queue (used as caching key)
+        @param timeout: lock acquisition timeout in seconds
         """
-        self.file = f
-        self.writeLock = lock.WriteLock(lock_dir, timeout=10.0)
-        self.readLock = lock.ReadLock(lock_dir, timeout=10.0)
+        self.request = request
+        self.xapian_dir = xapian_dir
+        self.queuename = queuename
+        self.timeout = timeout
+
+    def get_cache(self, locking):
+        return caching.CacheEntry(self.request, self.xapian_dir, self.queuename,
+                                  scope='dir', use_pickle=True, do_locking=locking)
 
     def exists(self):
         """ Checks if the queue exists on the filesystem """
-        return os.path.exists(self.file)
+        cache = self.get_cache(locking=False)
+        return cache.exists()
+
+    def _queue(self, cache):
+        try:
+            queue = cache.content()
+        except caching.CacheError:
+            # likely nothing there yet
+            queue = []
+        return queue
+
+    def pages(self):
+        """ Return list of pages in the queue """
+        cache = self.get_cache(locking=True)
+        return self._queue(cache)
 
     def append(self, pagename):
         """ Append a page to queue
 
         @param pagename: string to save
         """
-        if not self.writeLock.acquire(60.0):
-            logging.warning("can't add %r to xapian update queue: can't lock queue" % pagename)
-            return
+        cache = self.get_cache(locking=False) # we lock manually
+        cache.lock('w', 60.0)
         try:
-            f = codecs.open(self.file, 'a', config.charset)
-            try:
-                f.write(pagename + "\n")
-            finally:
-                f.close()
+            queue = self._queue(cache)
+            queue.append(pagename)
+            cache.update(queue)
         finally:
-            self.writeLock.release()
-
-    def pages(self):
-        """ Return list of pages in the queue """
-        if self.readLock.acquire(1.0):
-            try:
-                return self._decode(self._read())
-            finally:
-                self.readLock.release()
-        return []
+            cache.unlock()
 
     def remove(self, pages):
         """ Remove pages from the queue
@@ -78,89 +86,23 @@
 
         @param pages: list of pagenames to remove
         """
-        if self.writeLock.acquire(30.0):
-            try:
-                queue = self._decode(self._read())
-                for page in pages:
-                    try:
-                        queue.remove(page)
-                    except ValueError:
-                        pass
-                if queue:
-                    self._write(queue)
-                else:
-                    self._removeFile()
-                return True
-            finally:
-                self.writeLock.release()
+        cache = self.get_cache(locking=False) # we lock manually
+        cache.lock('w', 60.0)
+        try:
+            queue = self._queue(cache)
+            for page in pages:
+                try:
+                    queue.remove(page)
+                except ValueError:
+                    pass
+            if queue:
+                cache.update(queue)
+            else:
+                cache.remove()
+            return True
+        finally:
+            cache.unlock()
-        return False
 
-    # Private -------------------------------------------------------
-
-    def _decode(self, data):
-        """ Decode queue data
-
-        @param data: the data to decode
-        """
-        pages = data.splitlines()
-        return self._filterDuplicates(pages)
-
-    def _filterDuplicates(self, pages):
-        """ Filter duplicates in page list, keeping the order
-
-        @param pages: list of pages to filter
-        """
-        unique = []
-        seen = {}
-        for name in pages:
-            if not name in seen:
-                unique.append(name)
-                seen[name] = 1
-        return unique
-
-    def _read(self):
-        """ Read and return queue data
-
-        This does not do anything with the data so we can release the
-        lock as soon as possible, enabling others to update the queue.
-        """
-        try:
-            f = codecs.open(self.file, 'r', config.charset)
-            try:
-                return f.read()
-            finally:
-                f.close()
-        except (OSError, IOError), err:
-            if err.errno != errno.ENOENT:
-                raise
-            return ''
-
-    def _write(self, pages):
-        """ Write pages to queue file
-
-        Requires queue write locking.
-
-        @param pages: list of pages to write
-        """
-        # XXX use tmpfile/move for atomic replace on real operating systems
-        data = '\n'.join(pages) + '\n'
-        f = codecs.open(self.file, 'w', config.charset)
-        try:
-            f.write(data)
-        finally:
-            f.close()
-
-    def _removeFile(self):
-        """ Remove queue file
-
-        Requires queue write locking.
-        """
-        try:
-            os.remove(self.file)
-        except OSError, err:
-            if err.errno != errno.ENOENT:
-                raise
-
 
 class BaseIndex:
     """ Represents a search engine index """
@@ -181,10 +124,8 @@
         lock_dir = os.path.join(main_dir, 'index-lock')
         self.lock = lock.WriteLock(lock_dir, timeout=3600.0, readlocktimeout=60.0)
         #self.read_lock = lock.ReadLock(lock_dir, timeout=3600.0)
-        self.update_queue = UpdateQueue(os.path.join(main_dir, 'update-queue'),
-                                os.path.join(main_dir, 'update-queue-lock'))
-        self.remove_queue = UpdateQueue(os.path.join(main_dir, 'remove-queue'),
-                                os.path.join(main_dir, 'remove-queue-lock'))
+        self.update_queue = UpdateQueue(request, main_dir, 'update-queue')
+        self.remove_queue = UpdateQueue(request, main_dir, 'remove-queue')
 
         # Disabled until we have a sane way to build the index with a
         # queue in small steps.
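
Note: append() and remove() above share one read-modify-write pattern on a
manually locked CacheEntry. A generalized sketch of that pattern, assuming
the caching API behaves as it is used in this changeset (the helper name and
the mutate callback are made up for illustration):

    from MoinMoin import caching

    def modify_queue(request, xapian_dir, queuename, mutate):
        """ Hypothetical helper: apply mutate(queue) under the write lock """
        cache = caching.CacheEntry(request, xapian_dir, queuename,
                                   scope='dir', use_pickle=True,
                                   do_locking=False) # we lock manually
        cache.lock('w', 60.0)
        try:
            try:
                queue = cache.content() # unpickle the current queue
            except caching.CacheError:
                queue = [] # likely nothing there yet
            mutate(queue)
            if queue:
                cache.update(queue) # pickle the modified queue
            else:
                cache.remove() # empty queue: drop the cache entry
        finally:
            cache.unlock()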