# -*- coding: iso-8859-1 -*-
"""
    MoinMoin - lupy indexing search engine

    @copyright: 2005 by Florian Festi, Nir Soffer
    @license: GNU GPL, see COPYING for details.
"""

import os, re, codecs, errno, time

from MoinMoin.Page import Page
from MoinMoin import config
from MoinMoin.util import filesys, lock
from MoinMoin.support.lupy.index.term import Term
from MoinMoin.support.lupy import document
from MoinMoin.support.lupy.index.indexwriter import IndexWriter
from MoinMoin.support.lupy.search.indexsearcher import IndexSearcher

##############################################################################
### Tokenizer
##############################################################################

word_re = re.compile(r"\w+", re.U)
wikiword_re = re.compile(r"^([%(u)s][%(l)s]+)+$" % {'u': config.chars_upper,
                                                'l': config.chars_lower}, re.U)
singleword_re = re.compile(r"[%(u)s][%(l)s]+" % {'u': config.chars_upper,
                                             'l': config.chars_lower}, re.U)

token_re = re.compile(
    r"(?P<company>\w+[&@]\w+)|" + #company names like AT&T and Excite@Home.
    r"(?P<email>\w+([.-]\w+)*@\w+([.-]\w+)*)|" +    # email addresses
    r"(?P<hostname>\w+(\.\w+)+)|" +                 # hostnames
    r"(?P<num>(\w+[-/.,])*\w*\d\w*([-/.,]\w+)*)|" + # version numbers
    r"(?P<acronym>(\w\.)+)|" +          # acronyms: U.S.A., I.B.M., etc.
    r"(?P<word>\w+)",                   # words
    re.U)

dot_re = re.compile(r"[-_/,.]")
mail_re = re.compile(r"[-_/,.]|(@)")

def tokenizer(value):
    """Yield a stream of lower cased words from a string."""
    if isinstance(value, list): # used for page links
        for v in value:
            yield v
    else:
        tokenstream = re.finditer(token_re, value)
        for m in tokenstream:
            if m.group("acronym"):
                yield m.group("acronym").replace('.','').lower()
            elif m.group("company"):
                yield m.group("company").lower()
            elif m.group("email"):
                for word in mail_re.split(m.group("email").lower()):
                    if word:
                        yield word
            elif m.group("hostname"):                
                for word in dot_re.split(m.group("hostname").lower()):
                    yield word
            elif m.group("num"):
                for word in dot_re.split(m.group("num").lower()):
                    yield word
            elif m.group("word"):
                if wikiword_re.match(m.group("word")):
                    for sm in re.finditer(singleword_re, m.group()):
                        yield sm.group().lower()
                else:
                    yield m.group("word").lower()

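# Example (an illustrative sketch, not part of the original module): feeding a
# unicode string through tokenizer() yields lower cased word tokens; WikiWords
# are split into their parts and dotted tokens are split on punctuation.  The
# exact result depends on config.chars_upper/config.chars_lower:
#
#   >>> list(tokenizer(u"MoinMoin indexes HelloWorld and version 1.5"))
#   [u'moin', u'moin', u'indexes', u'hello', u'world', u'and', u'version', u'1', u'5']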

#############################################################################
### Indexing
#############################################################################

class UpdateQueue:
    """ Queue of pages waiting to be indexed

    Pages that cannot be indexed right away (e.g. because the index is
    locked) are appended here and indexed later in small batches.
    """
    def __init__(self, file, lock_dir):
        self.file = file
        self.writeLock = lock.WriteLock(lock_dir, timeout=10.0)
        self.readLock = lock.ReadLock(lock_dir, timeout=10.0)

    def exists(self):
        return os.path.exists(self.file)

    def append(self, pagename):
        """ Append a page to the queue

        TODO: tune timeout
        """
        if not self.writeLock.acquire(60.0):
            # No request object is available here, so the failure cannot be
            # logged; the page is silently dropped from the update queue.
            return
        try:
            f = codecs.open(self.file, 'a', config.charset)
            try:
                f.write(pagename + "\n")
            finally:
                f.close()                
        finally:
            self.writeLock.release()

    def pages(self):
        """ Return list of pages in the queue 
        
        TODO: tune timeout
        """
        if self.readLock.acquire(1.0):
            try:
                return self._decode(self._read())
            finally:
                self.readLock.release()            
        return []

    def remove(self, pages):
        """ Remove pages from the queue
        
        When the queue is empty, the queue file is removed, so exists()
        can tell if there is something waiting in the queue.
        
        TODO: tune the timeout
        """
        if self.writeLock.acquire(30.0):
            try:
                queue = self._decode(self._read())
                for page in pages:
                    try:
                        queue.remove(page)
                    except ValueError:
                        pass
                if queue:
                    self._write(queue)
                else:
                    self._removeFile()
                return True
            finally:
                self.writeLock.release()
        return False

    # Private -------------------------------------------------------

    def _decode(self, data):
        """ Decode queue data """
        pages = data.splitlines()
        return self._filterDuplicates(pages)

    def _filterDuplicates(self, pages):
        """ Filter duplicates in page list, keeping the order """
        unique = []
        seen = {}
        for name in pages:
            if name in seen:
                continue
            unique.append(name)
            seen[name] = 1
        return unique

    def _read(self):
        """ Read and return queue data
        
        This does not do anything with the data so we can release the
        lock as soon as possible, enabling others to update the queue.
        """
        try:
            f = codecs.open(self.file, 'r', config.charset)
            try:
                return f.read()
            finally:
                f.close()
        except (OSError, IOError), err:
            if err.errno != errno.ENOENT:
                raise
            return ''

    def _write(self, pages):
        """ Write pages to queue file
        
        Require queue write locking.
        """
        # XXX use tmpfile/move for atomic replace on real operating systems
        data = '\n'.join(pages) + '\n'
        f = codecs.open(self.file, 'w', config.charset)
        try:
            f.write(data)
        finally:
            f.close()            

    def _removeFile(self):
        """ Remove queue file 
        
        Require write locking.
        """
        try:
            os.remove(self.file)
        except OSError, err:
            if err.errno != errno.ENOENT:
                raise

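# Usage sketch (illustrative only; the paths below are hypothetical): the
# queue stores one page name per line and is safe to use from several
# processes thanks to the read/write locks.
#
#   queue = UpdateQueue('/wiki/data/cache/__update_queue__',
#                       '/wiki/data/cache/lupy_queue_lock')
#   queue.append(u'FrontPage')       # remember a page for later indexing
#   for name in queue.pages():       # unique names, original order kept
#       pass                         # reindex the page here
#   queue.remove(queue.pages())      # drop the names that were handled
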
class Index:
    """ Lupy full text index of the wiki pages """

    class LockedException(Exception):
        pass
    
    def __init__(self, request):
        self.request = request
        cache_dir = request.cfg.cache_dir
        self.dir = os.path.join(cache_dir, 'lupy_index')
        filesys.makeDirs(self.dir)
        self.sig_file = os.path.join(self.dir, '__complete__')
        self.segments_file = os.path.join(self.dir, 'segments')
        lock_dir = os.path.join(cache_dir, 'lupy_index_lock')
        self.lock = lock.WriteLock(lock_dir,
                                   timeout=3600.0, readlocktimeout=60.0)
        self.read_lock = lock.ReadLock(lock_dir, timeout=3600.0)
        self.queue = UpdateQueue(os.path.join(self.dir, "__update_queue__"),
                                 os.path.join(cache_dir, 'lupy_queue_lock'))
        
        # Disabled until we have a sane way to build the index with a
        # queue in small steps.
        ## if not self.exists():
        ##    self.indexPagesInNewThread()

    def exists(self):
        """ Check if the index exists and is complete """
        return os.path.exists(self.sig_file)
                
    def mtime(self):
        return os.path.getmtime(self.segments_file)

    def search(self, query):
        if not self.read_lock.acquire(1.0):
            raise self.LockedException
        try:
            while True:
                try:
                    searcher, timestamp = self.request.cfg.lupy_searchers.pop()
                    if timestamp != self.mtime():
                        searcher.close()
                    else:
                        break
                except IndexError:
                    searcher = IndexSearcher(self.dir)
                    timestamp = self.mtime()
                    break
                
            hits = list(searcher.search(query))
            self.request.cfg.lupy_searchers.append((searcher, timestamp))
        finally:
            self.read_lock.release()
        return hits
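    # Note on the searcher pool used above: request.cfg.lupy_searchers is
    # assumed to be a shared list initialized elsewhere on the config object.
    # It caches open IndexSearcher instances together with the index mtime
    # they were opened against; a pooled searcher that is older than the
    # current index is closed and replaced by a fresh one.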

    def update_page(self, page):
        if not self.lock.acquire(1.0):
            self.queue.append(page.page_name)
            return
        self.request.clock.start('update_page')
        try:
            self._do_queued_updates()
            self._update_page(page)
        finally:
            self.lock.release()
        self.request.clock.stop('update_page')

    def indexPages(self):
        """ Index all pages
        
        Can be called only from a script. To index pages during a user
        request, use indexPagesInNewThread. 
        
        TODO: tune the acquire timeout
        """
        if not self.lock.acquire(1.0):
            self.request.log("can't index: can't acquire lock")
            return
        try:
            self._index_pages(self._indexingRequest(self.request))
        finally:
            self.lock.release()
    
    def indexPagesInNewThread(self):
        """ Index all pages in a new thread
        
        Should be called from a user request. From a script, use
        indexPages.

        TODO: tune the acquire timeout
        """
        if not self.lock.acquire(1.0):
            self.request.log("can't index: can't acquire lock")
            return
        try:
            # Prevent rebuilding the index just after it was finished
            if self.exists():
                self.lock.release()
                return
            from threading import Thread
            indexThread = Thread(target=self._index_pages,
                args=(self._indexingRequest(self.request), self.lock))
            indexThread.setDaemon(True)
            
            # Join the index thread after the current request finishes, to
            # prevent Apache CGI from killing the process.
            def joinDecorator(finish):
                def func():
                    finish()
                    indexThread.join()
                return func
                
            self.request.finish = joinDecorator(self.request.finish)        
            indexThread.start()
        except:
            self.lock.release()
            raise

    def optimize(self):
        """ Optimize the index
        
        This may take from few seconds to few hours, depending on the
        size of the wiki. Currently it's usable only from a script.
        
        TODO: needs special locking, so the index is readable until the
        optimization is finished.
        """
        if not self.exists():
            raise RuntimeError("Index does not exist or is not finished")
        if not self.lock.acquire(1.0):
            self.request.log("can't lock the index for optimization")
            return
        try:
            self._optimize(self.request)
        finally:
            self.lock.release()

    # -------------------------------------------------------------------
    # Private

    def _do_queued_updates(self, amount=5):
        """ Assumes that the write lock is acquired """
        pages = self.queue.pages()[:amount]
        for name in pages:
            self._update_page(Page(self.request, name))
        self.queue.remove(pages)

    def _update_page(self, page):
        """ Assumes that the write lock is acquired """
        reader = IndexSearcher(self.dir)
        reader.reader.deleteTerm(Term('pagename', page.page_name))
        reader.close()
        if page.exists():
            writer = IndexWriter(self.dir, False, tokenizer)
            self._index_page(writer, page)
            writer.close()
        
    def _index_page(self, writer, page):
        """ Assumes that the write lock is acquired """
        d = document.Document()
        d.add(document.Keyword('pagename', page.page_name))
        d.add(document.Text('title', page.page_name, store=False))        
        d.add(document.Text('text', page.get_raw_body(), store=False))
        
        links = page.getPageLinks(page.request)
        t = document.Text('links', '', store=False)
        t.stringVal = links
        d.add(t)
        d.add(document.Text('link_text', ' '.join(links), store=False))

        writer.addDocument(d)

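    # Overview of the fields built by _index_page() above (a description of
    # the code, not additional behaviour):
    #
    #   pagename  - document.Keyword; also the term used by _update_page()
    #               to delete an outdated document (Term('pagename', ...))
    #   title     - document.Text, the page name, not stored
    #   text      - document.Text, the raw page body, not stored
    #   links     - document.Text whose stringVal is set to the link list;
    #               the tokenizer passes list values through unchanged
    #   link_text - document.Text, the link names joined with spaces
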
    def _index_pages(self, request, lock=None):
        """ Index all pages
        
        This should be called from indexPages or indexPagesInNewThread
        only!
        
        This may take few minutes up to few hours, depending on the
        size of the wiki.

        When called in a new thread, lock is acquired before the call,
        and this method must release it when it finishes or fails.
        """
        try:
            self._unsign()
            start = time.time()
            writer = IndexWriter(self.dir, True, tokenizer)
            writer.mergeFactor = 200
            pages = request.rootpage.getPageList(user='', exists=1)
            request.log("indexing all (%d) pages..." % len(pages))
            for pagename in pages:
                # Some code assumes request.page
                request.page = Page(request, pagename)
                self._index_page(writer, request.page)
            writer.close()
            request.log("indexing completed successfully in %0.2f seconds." % 
                        (time.time() - start))
            self._optimize(request)
            self._sign()
        finally:
            if lock:
                lock.release()

    def _optimize(self, request):
        """ Optimize the index """
        start = time.time()
        request.log("optimizing index...")
        writer = IndexWriter(self.dir, False, tokenizer)
        writer.optimize()
        writer.close()
        request.log("optimizing completed successfully in %0.2f seconds." % 
                    (time.time() - start))

    def _indexingRequest(self, request):
        """ Return a new request that can be used for index building.
        
        This request uses a security policy that lets the current user
        read any page. Without this policy some pages will not render,
        which will create broken pagelinks index.        
        """
        from MoinMoin.request import RequestCLI
        from MoinMoin.security import Permissions        
        request = RequestCLI(request.url)
        class SecurityPolicy(Permissions):            
            def read(*args, **kw):
                return True        
        request.user.may = SecurityPolicy(request.user)
        return request

    def _unsign(self):
        """ Remove sig file - assume write lock acquired """
        try:
            os.remove(self.sig_file)
        except OSError, err:
            if err.errno != errno.ENOENT:
                raise

    def _sign(self):
        """ Add sig file - assume write lock acquired """
        f = file(self.sig_file, 'w')
        try:
            f.write('')
        finally:
            f.close()
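
# Typical use (an illustrative sketch, not part of the module): 'request' is a
# live MoinMoin request object and 'query' is a lupy query object built by the
# caller, for example by the MoinMoin search code.
#
#   index = Index(request)
#   if not index.exists():
#       index.indexPages()                          # full build, script only
#   index.update_page(Page(request, u'FrontPage'))  # incremental update
#   hits = index.search(query)                      # list of lupy hits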