view MoinMoin/search/Xapian/ @ 5277:110efceb9e28

xapian index: remove redundant and unused fulltitle field
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Sat, 14 Nov 2009 13:33:45 +0100
parents 195db0fdbb80
children 66f7fd87b2ae
line wrap: on
line source
# -*- coding: iso-8859-1 -*-
"""
    MoinMoin - xapian search engine indexing

    @copyright: 2006-2009 MoinMoin:ThomasWaldmann,
                2006 MoinMoin:FranzPletz,
                2009 MoinMoin:DmitrijsMilajevs
    @license: GNU GPL, see COPYING for details.
"""

import os
import re
import xapian

from MoinMoin import log
logging = log.getLogger(__name__)

# NOTE(review): the package paths of the next three imports were lost in the
# source ("from import xappy"); they are restored here following the
# MoinMoin 1.9 package layout -- please confirm against the repository.
from MoinMoin.support import xappy
from MoinMoin.search.builtin import BaseIndex
from MoinMoin.search.Xapian.tokenizer import WikiAnalyzer

from MoinMoin.Page import Page
from MoinMoin import config, wikiutil

class Query(xapian.Query):
    """ Plain subclass of xapian.Query, adds no behaviour of its own """

class UnicodeQuery(xapian.Query):
    """ Xapian query object which automatically encodes unicode strings """

    def __init__(self, *args, **kwargs):
        """
        Encode all unicode terms, then initialize the xapian query with them.

        @param args: query terms; unicode objects (also inside lists/tuples)
                     get encoded, everything else is passed through unchanged
        @keyword encoding: specify the encoding manually (default: value of config.charset)
        """
        # 'encoding' is consumed here and must not reach the base constructor
        self.encoding = kwargs.pop('encoding', config.charset)

        nargs = []
        for term in args:
            if isinstance(term, unicode):
                term = term.encode(self.encoding)
            elif isinstance(term, (list, tuple)):
                term = [self._encode(t) for t in term]
            # keep the (possibly encoded) term, otherwise the query would be empty
            nargs.append(term)

        # this class derives from xapian.Query directly, so initialize that base
        xapian.Query.__init__(self, *nargs, **kwargs)

    def _encode(self, term):
        """ Return term encoded with self.encoding if it is unicode, else unchanged """
        if isinstance(term, unicode):
            return term.encode(self.encoding)
        return term

class MoinSearchConnection(xappy.SearchConnection):
    """ xappy search connection with helpers to fetch all matching documents """

    def get_all_documents(self, query=None):
        """
        Return all the documents in the index (that match query, if given).

        @param query: xappy query object; if not given, query_all() is used
        @return: the search results for ranks 0 .. number of documents in the index
        """
        # the document count is the upper bound for the number of possible hits
        document_count = self.get_doccount()
        query = query or self.query_all()
        hits = self.search(query, 0, document_count)
        return hits

    def get_all_documents_with_fields(self, **fields):
        """
        Return all the documents in the index (that match the field=value kwargs given).

        All given field=value pairs are combined with OP_AND.
        """
        field_queries = [self.query_field(field, value) for field, value in fields.iteritems()]
        query = self.query_composite(self.OP_AND, field_queries)
        return self.get_all_documents(query)

class MoinIndexerConnection(xappy.IndexerConnection):
    """ xappy indexer connection that knows the fields of the moin index """

    def __init__(self, *args, **kwargs):
        """ Open the indexer connection and register the field actions.

        All arguments are passed on unchanged to xappy.IndexerConnection.
        """
        super(MoinIndexerConnection, self).__init__(*args, **kwargs)
        # without this call the field definitions below would never be applied
        self._define_fields_actions()

    def _define_fields_actions(self):
        """ Register the xappy field actions for every field used in the index """
        SORTABLE = xappy.FieldActions.SORTABLE
        INDEX_EXACT = xappy.FieldActions.INDEX_EXACT
        INDEX_FREETEXT = xappy.FieldActions.INDEX_FREETEXT
        STORE_CONTENT = xappy.FieldActions.STORE_CONTENT

        self.add_field_action('wikiname', INDEX_EXACT)
        self.add_field_action('wikiname', STORE_CONTENT)
        self.add_field_action('pagename', INDEX_EXACT)
        self.add_field_action('pagename', STORE_CONTENT)
        self.add_field_action('pagename', SORTABLE)
        self.add_field_action('attachment', INDEX_EXACT)
        self.add_field_action('attachment', STORE_CONTENT)
        self.add_field_action('mtime', INDEX_EXACT)
        self.add_field_action('mtime', STORE_CONTENT)
        self.add_field_action('revision', STORE_CONTENT)
        self.add_field_action('revision', INDEX_EXACT)
        self.add_field_action('mimetype', INDEX_EXACT)
        self.add_field_action('mimetype', STORE_CONTENT)
        self.add_field_action('title', INDEX_FREETEXT, weight=100)
        self.add_field_action('title', STORE_CONTENT)
        # content is indexed as free text (with spelling data), but not stored
        self.add_field_action('content', INDEX_FREETEXT, spell=True)
        self.add_field_action('domain', INDEX_EXACT)
        self.add_field_action('domain', STORE_CONTENT)
        self.add_field_action('lang', INDEX_EXACT)
        self.add_field_action('lang', STORE_CONTENT)
        self.add_field_action('stem_lang', INDEX_EXACT)
        self.add_field_action('author', INDEX_EXACT)
        self.add_field_action('linkto', INDEX_EXACT)
        self.add_field_action('linkto', STORE_CONTENT)
        self.add_field_action('category', INDEX_EXACT)
        self.add_field_action('category', STORE_CONTENT)

class StemmedField(xappy.Field):
    """ xappy field built from the (word, stemmed) pairs WikiAnalyzer yields """

    def __init__(self, name, value, request):
        """
        @param name: field name, passed on to xappy.Field
        @param value: text that gets tokenized by WikiAnalyzer
        @param request: current request, its cfg.language_default is the analyzer language
        """
        tokenizer = WikiAnalyzer(request=request, language=request.cfg.language_default)
        pieces = []
        for word, stemmed in tokenizer.tokenize(value):
            pair = unicode('%s %s' % (word, stemmed))
            pieces.append(pair.strip())
        super(StemmedField, self).__init__(name, ' '.join(pieces))

class XapianIndex(BaseIndex):

    def _main_dir(self):
        """ Get the directory of the xapian index

        @return: a directory below cfg.xapian_index_dir if that is configured,
                 else the 'xapian' directory below cfg.cache_dir
        """
        cfg = self.request.cfg
        if cfg.xapian_index_dir:
            # NOTE(review): the second path component was lost in the source;
            # cfg.siteid is restored following MoinMoin 1.9 -- please confirm.
            return os.path.join(cfg.xapian_index_dir, cfg.siteid)
        else:
            return os.path.join(cfg.cache_dir, 'xapian')

    def exists(self):
        """ Tell whether the Xapian index exists.

        @return: the (false) result of BaseIndex.exists() if that check fails,
                 else the listing of the index directory (empty means: no index)
        """
        base_result = BaseIndex.exists(self)
        if not base_result:
            return base_result
        return os.listdir(self.dir)

    def _search(self, query, sort='weight', historysearch=0):
        """
        Perform the search using xapian

        @param query: the search query objects
        @param sort: the sorting of the results (default: 'weight')
        @param historysearch: whether to search in all page revisions (default: 0) TODO: use/implement this
        @return: the hits returned by the xappy search connection
        """
        searchers = self.request.cfg.xapian_searchers
        while True:
            try:
                searcher, timestamp = searchers.pop()
            except IndexError:
                # no pooled searcher left, open a new connection
                searcher = MoinSearchConnection(self.dir)
                timestamp = self.mtime()
                break
            if timestamp == self.mtime():
                break
            # pooled searcher was created for an older index state, drop it
            searcher.close()

        # Refresh connection, since it may be outdated.
        searcher.reopen()
        query = query.xapian_term(self.request, searcher)

        # Get maximum possible amount of hits from xappy, which is number of documents in the index.
        document_count = searcher.get_doccount()

        kw = {}
        if sort == 'page_name':
            kw['sortby'] = 'pagename'

        hits = searcher.search(query, 0, document_count, **kw)

        # give the searcher back to the pool for reuse
        searchers.append((searcher, timestamp))
        return hits

    def _do_queued_updates(self, request, amount=5):
        """ Process queued index updates - assumes that the write lock is acquired

        @param request: the current request
        @param amount: maximum number of queue entries to process in this call
        """
        connection = MoinIndexerConnection(self.dir)
        try:
            for i in range(amount):
                try:
                    pagename, attachmentname, revno = self.update_queue.get()
                except IndexError:
                    # queue empty
                    break
                else:
                    logging.debug("got from indexer queue: %r %r %r" % (pagename, attachmentname, revno))
                    if not attachmentname:
                        if revno is None:
                            # generic "index this page completely, with attachments" request
                            self._index_page(request, connection, pagename, mode='update')
                        else:
                            # "index this page revision" request
                            self._index_page_rev(request, connection, pagename, revno, mode='update')
                    else:
                        # "index this attachment" request
                        self._index_attachment(request, connection, pagename, attachmentname, mode='update')
        finally:
            # always release the indexer connection, even if indexing failed
            connection.close()

    def _get_document(self, connection, doc_id, mtime, mode):
        """ Create a new document for doc_id, if it needs to be (re)indexed

        @param connection: the Indexer connection object
        @param doc_id: id of the document in the index
        @param mtime: modification time of the item that shall be indexed
        @param mode: 'add' = always create a document, no checks
                     'update' = create one only if doc_id is not in the index
                                yet or mtime is newer than the indexed mtime
        @return: a fresh xappy.UnprocessedDocument with its id set, or None
                 if nothing has to be indexed (also for any other mode value)
        """
        do_index = False

        if mode == 'update':
            try:
                doc = connection.get_document(doc_id)
                docmtime = long(doc.data['mtime'][0])
            except KeyError:
                # not in the index yet (or no mtime stored for it)
                do_index = True
            else:
                do_index = mtime > docmtime
        elif mode == 'add':
            do_index = True

        if do_index:
            document = xappy.UnprocessedDocument()
            document.id = doc_id
        else:
            document = None
        return document

    def _add_fields_to_document(self, request, document, fields=None, multivalued_fields=None):
        """ Append xappy fields to document.fields

        @param request: the current request (needed to build the stemmed fields)
        @param document: the document that gets the fields appended
        @param fields: dict field name -> value (optional)
        @param multivalued_fields: dict field name -> iterable of values (optional)
        """
        stemmed_names = ['title', 'content']

        if fields is None:
            fields = {}
        if multivalued_fields is None:
            multivalued_fields = {}

        for name, text in fields.iteritems():
            document.fields.append(xappy.Field(name, text))
            # title and content additionally get a stemmed variant
            if name in stemmed_names:
                stemmed = StemmedField(name, text, request)
                document.fields.append(stemmed)

        for name, texts in multivalued_fields.iteritems():
            # one separate field instance per value
            for text in texts:
                document.fields.append(xappy.Field(name, text))

    def _get_languages(self, page):
        """ Get language of a page and the language to stem it in

        @param page: the page instance
        @return: tuple (language of the page, language to stem in)
        """
        lang = None
        default_lang = page.request.cfg.language_default

        # if we should stem, we check if we have stemmer for the language available
        if page.request.cfg.xapian_stemming:
            lang = page.pi['language']
            try:
                xapian.Stem(lang)
                # if there is no exception, lang is stemmable
                return (lang, lang)
            except xapian.InvalidArgumentError:
                # lang is not stemmable
                pass

        if not lang:
            # no lang found at all.. fallback to default language
            lang = default_lang

        # return actual lang and lang to stem in
        return (lang, default_lang)

    def _get_categories(self, page):
        """ Get all categories the page belongs to through the old
            regular expression

        Only the text after the last separator line (4 or more dashes) of the
        raw page body is searched for categories.

        @param page: the page instance
        @return: list of the matched category names, [] if the body has no
                 separator line
        """
        body = page.get_raw_body()

        # find the end of the last separator line
        pos = None
        for separator in re.finditer(r'-----*\s*\r?\n', body):
            pos = separator.end()

        if pos is None:
            return []
        # for CategoryFoo, group 'all' matched CategoryFoo, group 'key' matched just Foo
        category_re = self.request.cfg.cache.page_category_regex
        return [m.group('all') for m in category_re.finditer(body[pos:])]

    def _get_domains(self, page):
        """ Returns a generator with all the domains the page belongs to

        Possible domains are 'underlay', 'standard' and 'system'; a page can
        be in more than one of them.

        @param page: page
        """
        if page.isUnderlayPage():
            yield 'underlay'
        if page.isStandardPage():
            yield 'standard'
        if wikiutil.isSystemPage(self.request, page.page_name):
            yield 'system'

    def _index_page(self, request, connection, pagename, mode='update'):
        """ Index a page - assumes that the write lock is acquired

        Index all revisions (if wanted by configuration) and all attachments.

        @arg connection: the Indexer connection object
        @arg pagename: a page name
        @arg mode: 'add' = just add, no checks
                   'update' = check if already in index and update if needed (mtime)
        """
        page = Page(request, pagename)
        revlist = page.getRevList() # recent revs first, does not include deleted revs

        if not revlist:
            # we have an empty revision list, that means the page is not there any more,
            # likely it (== all of its revisions, all of its attachments) got either renamed or nuked
            wikiname = request.cfg.interwikiname or u'Self'

            sc = MoinSearchConnection(self.dir)
            try:
                # any page rev, any attachment
                docs_to_delete = sc.get_all_documents_with_fields(wikiname=wikiname, pagename=pagename)
                for doc in docs_to_delete:
                    connection.delete(doc.id)
            finally:
                # the search connection is only needed for this lookup
                sc.close()
            logging.debug('page %s (all revs, all attachments) removed from xapian index' % pagename)

        else:
            if request.cfg.xapian_index_history:
                index_revs, remove_revs = revlist, []
            else:
                if page.exists(): # is current rev not deleted?
                    index_revs, remove_revs = revlist[:1], revlist[1:]
                else:
                    index_revs, remove_revs = [], revlist

            for revno in index_revs:
                updated = self._index_page_rev(request, connection, pagename, revno, mode=mode)
                logging.debug("updated page %r rev %d (updated==%r)" % (pagename, revno, updated))
                if not updated:
                    # we reached the revisions that are already present in the index
                    break

            for revno in remove_revs:
                # XXX remove_revs can be rather long for pages with many revs and
                # XXX most page revs usually will be already deleted. optimize?
                self._remove_page_rev(request, connection, pagename, revno)
                logging.debug("removed page %r rev %d" % (pagename, revno))

            from MoinMoin.action import AttachFile
            for attachmentname in AttachFile._get_files(request, pagename):
                self._index_attachment(request, connection, pagename, attachmentname, mode)

    def _index_page_rev(self, request, connection, pagename, revno, mode='update'):
        """ Index a page revision - assumes that the write lock is acquired

        @arg connection: the Indexer connection object
        @arg pagename: the page name
        @arg revno: page revision number (int)
        @arg mode: 'add' = just add, no checks
                   'update' = check if already in index and update if needed (mtime)
        page = Page(request, pagename, rev=revno) = page # XXX for what is this needed?

        wikiname = request.cfg.interwikiname or u"Self"
        revision = str(page.get_real_rev())
        itemid = "%s:%s:%s" % (wikiname, pagename, revision)
        mtime = page.mtime_usecs()

        doc = self._get_document(connection, itemid, mtime, mode)
        logging.debug("%s %r" % (pagename, doc))

        if doc:
            mimetype = 'text/%s' % page.pi['format']  # XXX improve this

            fields = {}
            fields['wikiname'] = wikiname
            fields['pagename'] = pagename
            fields['attachment'] = '' # this is a real page, not an attachment
            fields['mtime'] = str(mtime)
            fields['revision'] = revision
            fields['title'] = pagename
            fields['content'] = page.get_raw_body()
            fields['lang'], fields['stem_lang'] = self._get_languages(page)
            fields['author'] = page.edit_info().get('editor', '?')

            multivalued_fields = {}
            multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
            multivalued_fields['domain'] = self._get_domains(page)
            multivalued_fields['linkto'] = page.getPageLinks(request)
            multivalued_fields['category'] = self._get_categories(page)

            self._add_fields_to_document(request, doc, fields, multivalued_fields)

            except xappy.IndexerError, err:
                logging.warning("IndexerError at %r %r %r (%s)" % (
                    wikiname, pagename, revision, str(err)))

        return bool(doc)

    def _remove_page_rev(self, request, connection, pagename, revno):
        """ Remove a page revision from the index - assumes that the write lock is acquired

        @arg connection: the Indexer connection object
        @arg pagename: the page name
        @arg revno: a real revision number (int), > 0
        """
        wikiname = request.cfg.interwikiname or u"Self"
        revision = str(revno)
        # same id scheme as used when indexing a page revision
        itemid = "%s:%s:%s" % (wikiname, pagename, revision)
        connection.delete(itemid)
        logging.debug('page %s, revision %d removed from index' % (pagename, revno))

    def _index_attachment(self, request, connection, pagename, attachmentname, mode='update'):
        """ Index an attachment

        If the attachment file does not exist any more, its document gets
        removed from the index instead.

        @arg connection: the Indexer connection object
        @arg pagename: name of the page the attachment belongs to
        @arg attachmentname: name of the attachment
        @arg mode: 'add' = just add, no checks
                   'update' = check if already in index and update if needed (mtime)
        """
        from MoinMoin.action import AttachFile
        wikiname = request.cfg.interwikiname or u"Self"
        itemid = "%s:%s//%s" % (wikiname, pagename, attachmentname)

        filename = AttachFile.getFilename(request, pagename, attachmentname)
        # check if the file is still there. as we might be doing queued index updates,
        # the file could be gone meanwhile...
        if os.path.exists(filename):
            mtime = wikiutil.timestamp2version(os.path.getmtime(filename))
            doc = self._get_document(connection, itemid, mtime, mode)
            logging.debug("%s %s %r" % (pagename, attachmentname, doc))
            if doc:
                page = Page(request, pagename)
                mimetype, att_content = self.contentfilter(filename)

                fields = {}
                fields['wikiname'] = wikiname
                fields['pagename'] = pagename
                fields['attachment'] = attachmentname
                fields['mtime'] = str(mtime)
                fields['revision'] = '0'
                fields['title'] = '%s/%s' % (pagename, attachmentname)
                fields['content'] = att_content
                # language and domains are taken from the page owning the attachment
                fields['lang'], fields['stem_lang'] = self._get_languages(page)

                multivalued_fields = {}
                # full mimetype plus its major and minor part
                multivalued_fields['mimetype'] = [mimetype] + mimetype.split('/')
                multivalued_fields['domain'] = self._get_domains(page)

                self._add_fields_to_document(request, doc, fields, multivalued_fields)

                connection.replace(doc)
                logging.debug('attachment %s (page %s) updated in index' % (attachmentname, pagename))
        else:
            # attachment file was deleted, remove it from index also
            connection.delete(itemid)
            logging.debug('attachment %s (page %s) removed from index' % (attachmentname, pagename))

    def _index_file(self, request, connection, filename, mode='update'):
        """ index a file as it were a page named pagename
            Assumes that the write lock is acquired

        OSError, IOError and UnicodeError get logged and are not re-raised.

        @arg connection: the Indexer connection object
        @arg filename: path of the file to index
        @arg mode: 'add' = just add, no checks
                   'update' = check if already in index and update if needed (mtime)
        """
        wikiname = request.cfg.interwikiname or u"Self"
        fs_rootpage = 'FS' # XXX FS hardcoded

        try:
            itemid = "%s:%s" % (wikiname, os.path.join(fs_rootpage, filename))
            mtime = wikiutil.timestamp2version(os.path.getmtime(filename))

            doc = self._get_document(connection, itemid, mtime, mode)
            logging.debug("%s %r" % (filename, doc))

            if doc:
                mimetype, file_content = self.contentfilter(filename)

                fields = {}
                fields['wikiname'] = wikiname
                fields['pagename'] = fs_rootpage
                fields['attachment'] = filename # XXX we should treat files like real pages, not attachments
                fields['mtime'] = str(mtime)
                fields['revision'] = '0'
                fields['title'] = " ".join(os.path.join(fs_rootpage, filename).split("/"))
                fields['content'] = file_content

                multivalued_fields = {}
                # full mimetype plus its major and minor part
                multivalued_fields['mimetype'] = [mimetype] + mimetype.split('/')

                self._add_fields_to_document(request, doc, fields, multivalued_fields)

                # NOTE(review): the statement storing the document was lost in
                # the source; replace() is used as for pages and attachments
                # (it also adds documents that are not in the index yet) -- confirm.
                connection.replace(doc)

        except (OSError, IOError, UnicodeError):
            logging.exception("_index_file crashed:")

    def _index_pages(self, request, files=None, mode='update', pages=None):
        """ Index pages (and all given files)

        This should be called from indexPages or indexPagesInNewThread only!

        When called in a new thread, lock is acquired before the call,
        and this method must release it when it finishes or fails.

        @param request: the current request
        @param files: an optional list of files to index
        @param mode: how to index the files, either 'add', 'update' or 'rebuild'
        @param pages: list of pages to index, if not given, all pages are indexed

        if pages is None:
            # Index all pages
            pages = request.rootpage.getPageList(user='', exists=1)

        # rebuilding the DB: delete it and add everything
        if mode == 'rebuild':
            for fname in os.listdir(self.dir):
                os.unlink(os.path.join(self.dir, fname))
            mode = 'add'

        connection = MoinIndexerConnection(self.dir)
            logging.debug("indexing all (%d) pages..." % len(pages))
            for pagename in pages:
                self._index_page(request, connection, pagename, mode=mode)
            if files:
                logging.debug("indexing all files...")
                for fname in files:
                    fname = fname.strip()
                    self._index_file(request, connection, fname, mode)