MoinMoin/storage/middleware/indexing.py
author Thomas Waldmann <tw AT waldmann-edv DOT de>
Sat, 08 Sep 2012 21:21:57 +0200
changeset 1779 082581e8688c
parent 1778 d56cd193cca0
child 1780 4a054063fbb0
permissions -rw-r--r--
indexing: move creation of the whoosh storage object to get_storage method

this way, we can more easily support other whoosh storage backends; currently
only FileStorage is supported.

added some notes about backend-specific methods of the indexing middleware.

removed an exception handler that might be too FileStorage-specific to fit
other backends.
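As a hedged sketch (hypothetical code, not part of this changeset), the refactoring
means a subclass could swap in another whoosh storage, e.g. the in-memory RamStorage
that whoosh ships, by overriding just one method:

    from whoosh.filedb.filestore import RamStorage

    class RamIndexingMiddleware(IndexingMiddleware):
        def get_storage(self, tmp=False, create=False):
            # no directory handling needed for an in-memory storage
            return RamStorage()
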
# Copyright: 2011 MoinMoin:RonnyPfannschmidt
# Copyright: 2011 MoinMoin:ThomasWaldmann
# Copyright: 2011 MoinMoin:MichaelMayorov
# License: GNU GPL v2 (or any later version), see LICENSE.txt for details.

"""
MoinMoin - indexing middleware

The backends and stores moin uses are rather simple: mostly just an
unsorted / unordered bunch of revisions (meta and data) with iteration.

The indexer middleware adds the needed power: after all metadata and data
is indexed, we can do all sorts of operations on the indexer level:
* searching
* lookup by name, uuid, ...
* selecting
* listing

Using Whoosh (a fast pure-Python indexing and search library), we build,
maintain and use 2 indexes:

* "all revisions" index (big, needed for history search)
* "latest revisions" index (smaller, just the current revisions)

When creating or destroying revisions, the indexes are automatically updated.

There is also code to do a full index rebuild in case it gets damaged, lost
or needs rebuilding for other reasons, as well as index update code to do a
quick "intelligent" update of a "mostly ok" index, which just adds, updates
or deletes stuff that differs between the backend and the current index.

Indexing is the only layer that can easily deal with **names** (it can
easily translate names to UUIDs and vice versa) and with **items** (it
knows the current revision and can easily list and order historical
revisions), using the index.

The layers below use UUIDs to identify revision meta and data:

* revid (metaid) - a UUID identifying a specific revision (revision metadata)
* dataid - a UUID identifying some specific revision data (optional), it is
  just stored into revision metadata.
* itemid - a UUID identifying an item (== a set of revisions), it is just
  stored into revision metadata. itemid is only easily usable on the indexing
  level.

Many methods provided by the indexing middleware will be fast, because they
do not access the layers below (like the backend), but just the index files -
usually even just the small and thus quick latest-revs index.

For a rough usage sketch, see the example function below the INDEXES definition.
"""


from __future__ import absolute_import, division

import os
import shutil
import itertools
import time
import datetime
from StringIO import StringIO

from flask import request
from flask import g as flaskg
from flask import current_app as app

from whoosh.fields import Schema, TEXT, ID, IDLIST, NUMERIC, DATETIME, KEYWORD, BOOLEAN
from whoosh.index import EmptyIndexError
from whoosh.writing import AsyncWriter
from whoosh.qparser import QueryParser, MultifieldParser, RegexPlugin, \
                           PseudoFieldPlugin
from whoosh.qparser import WordNode
from whoosh.query import Every, Term
from whoosh.sorting import FieldFacet

from MoinMoin import log
logging = log.getLogger(__name__)

from MoinMoin.config import WIKINAME, NAME, NAME_EXACT, MTIME, CONTENTTYPE, TAGS, \
                            LANGUAGE, USERID, ADDRESS, HOSTNAME, SIZE, ACTION, COMMENT, SUMMARY, \
                            CONTENT, EXTERNALLINKS, ITEMLINKS, ITEMTRANSCLUSIONS, ACL, EMAIL, OPENID, \
                            ITEMID, REVID, CURRENT, PARENTID, \
                            PTIME, \
                            LATEST_REVS, ALL_REVS, \
                            CONTENTTYPE_USER
from MoinMoin.constants import keys
from MoinMoin.constants.keys import ITEMTYPE

from MoinMoin import user
from MoinMoin.search.analyzers import item_name_analyzer, MimeTokenizer, AclTokenizer
from MoinMoin.themes import utctimestamp
from MoinMoin.util.crypto import make_uuid
from MoinMoin.storage.middleware.validation import ContentMetaSchema, UserMetaSchema
from MoinMoin.storage.error import NoSuchItemError, ItemAlreadyExistsError


INDEXES = [LATEST_REVS, ALL_REVS, ]


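# A hedged usage sketch (hypothetical helper, not part of the original module)
# tying the module docstring together: create both indexes, fill them from a
# backend, then search. Assumes a ready-to-use moin backend object.
def _example_build_and_search(index_dir, backend):
    imw = IndexingMiddleware(index_dir, backend, wiki_name=u'MyWiki')
    imw.create()    # create empty LATEST_REVS and ALL_REVS indexes
    imw.rebuild()   # index all revisions found in the backend
    imw.open()
    return list(imw.search(Every()))  # consume fully so the index files get closed

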
def backend_to_index(meta, content, schema, wikiname):
    """
    Convert backend metadata/data to a whoosh document.

    :param meta: revision meta from moin backend
    :param content: revision data converted to indexable content
    :param schema: whoosh schema
    :param wikiname: interwikiname of this wiki
    :returns: document to put into whoosh index
    """
    doc = dict([(str(key), value)
                for key, value in meta.items()
                if key in schema])
    for key in [MTIME, PTIME]:
        if key in doc:
            # we have a UNIX UTC timestamp (int), whoosh wants a datetime
            doc[key] = datetime.datetime.utcfromtimestamp(doc[key])
    doc[NAME_EXACT] = doc[NAME]
    doc[WIKINAME] = wikiname
    doc[CONTENT] = content
    return doc


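# A hedged, minimal demonstration (hypothetical schema and metadata, not part
# of the original module) of backend_to_index(): the UNIX timestamp in MTIME
# comes out as a datetime, NAME is mirrored into NAME_EXACT, and the given
# content and wikiname get attached.
def _example_backend_to_index():
    schema = Schema(**{NAME: TEXT(stored=True), NAME_EXACT: ID,
                       WIKINAME: ID(stored=True), MTIME: DATETIME(stored=True),
                       CONTENT: TEXT(stored=True)})
    meta = {NAME: u'HomePage', MTIME: 0}
    return backend_to_index(meta, u'some indexable text', schema, u'MyWiki')

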
from MoinMoin.util.mime import Type, type_moin_document
from MoinMoin.util.tree import moin_page
from MoinMoin.converter import default_registry
from MoinMoin.util.iri import Iri

def convert_to_indexable(meta, data, is_new=False):
    """
    Convert revision data to indexable content.

    :param meta: revision metadata (gets updated as a side effect)
    :param data: revision data (file-like)
                 please make sure that the content file is
                 ready to read all indexable content from it. if you have just
                 written that content or already read from it, you need to call
                 data.seek(0) before calling convert_to_indexable(meta, data).
    :param is_new: if True, this is a new revision and we shall modify
                   metadata as a side effect
    :returns: indexable content, text/plain, unicode object
    """
    class PseudoRev(object):
        def __init__(self, meta, data):
            self.meta = meta
            self.data = data
            self.revid = meta.get(REVID)
            class PseudoItem(object):
                def __init__(self, name):
                    self.name = name
            self.item = PseudoItem(meta.get(NAME))
        def read(self, *args, **kw):
            return self.data.read(*args, **kw)
        def seek(self, *args, **kw):
            return self.data.seek(*args, **kw)
        def tell(self, *args, **kw):
            return self.data.tell(*args, **kw)

    rev = PseudoRev(meta, data)
    try:
        # TODO use different converter mode?
        # Maybe we want some special mode for the input converters (so they emit
        # different output than for normal rendering), esp. for the non-markup
        # content types (images, etc.).
        input_contenttype = meta[CONTENTTYPE]
        output_contenttype = 'text/plain'
        type_input_contenttype = Type(input_contenttype)
        type_output_contenttype = Type(output_contenttype)
        reg = default_registry
        # first try a direct conversion (this could be useful for extraction
        # of (meta)data from binary types, like from images or audio):
        conv = reg.get(type_input_contenttype, type_output_contenttype)
        if conv:
            doc = conv(rev, input_contenttype)
            return doc
        # otherwise try via DOM as intermediate format (this is useful if the
        # input type is markup, to get rid of the markup):
        input_conv = reg.get(type_input_contenttype, type_moin_document)
        refs_conv = reg.get(type_moin_document, type_moin_document, items='refs')
        output_conv = reg.get(type_moin_document, type_output_contenttype)
        if input_conv and output_conv:
            doc = input_conv(rev, input_contenttype)
            # We do not convert smileys, includes, macros, links, because
            # it does not improve search results or even makes results worse.
            # We do run the refs converter, though, to extract links and
            # transclusions.
            if is_new:
                # we can only modify new, uncommitted revisions, not stored revs
                i = Iri(scheme='wiki', authority='', path='/' + meta[NAME])
                doc.set(moin_page.page_href, unicode(i))
                refs_conv(doc)
                # side effect: we update some metadata:
                meta[ITEMLINKS] = refs_conv.get_links()
                meta[ITEMTRANSCLUSIONS] = refs_conv.get_transclusions()
                meta[EXTERNALLINKS] = refs_conv.get_external_links()
            doc = output_conv(doc)
            return doc
        # no way
        raise TypeError("No converter for {0} --> {1}".format(input_contenttype, output_contenttype))
    except Exception as e: # catch all exceptions, we don't want to break an indexing run
        logging.exception("Exception happened in conversion of item {0!r} rev {1} contenttype {2}:".format(meta[NAME], meta.get(REVID, 'new'), meta.get(CONTENTTYPE, '')))
        doc = u'ERROR [{0!s}]'.format(e)
        return doc


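# A hedged sketch (hypothetical item, not part of the original module) of
# convert_to_indexable(), assuming the converter registry can handle
# text/plain input; note that a fresh (i.e. rewound) file-like object is
# passed in, as required by the docstring above:
def _example_convert_to_indexable():
    meta = {CONTENTTYPE: u'text/plain;charset=utf-8', NAME: u'SomeItem'}
    data = StringIO('Hello, indexer!')
    return convert_to_indexable(meta, data, is_new=False)

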
class IndexingMiddleware(object):
    def __init__(self, index_dir, backend, wiki_name=None, acl_rights_contents=[], **kw):
        """
        Store params, create schemas.
        """
        self.index_dir = index_dir
        self.index_dir_tmp = index_dir + '.temp'
        self.backend = backend
        self.wikiname = wiki_name
        self.ix = {}  # open indexes
        self.schemas = {}  # existing schemas

        common_fields = {
            # wikiname so we can have a shared index in a wiki farm, always check this!
            WIKINAME: ID(stored=True),
            # tokenized NAME from metadata - use this for manual searching from UI
            NAME: TEXT(stored=True, multitoken_query="and", analyzer=item_name_analyzer(), field_boost=2.0),
            # unmodified NAME from metadata - use this for precise lookup by the code.
            # also needed for wildcard search, so the original string as well as the query
            # (with the wildcard) is not cut into pieces.
            NAME_EXACT: ID(field_boost=3.0),
            # revision id (aka meta id)
            REVID: ID(unique=True, stored=True),
            # parent revision id
            PARENTID: ID(stored=True),
            # MTIME from revision metadata (converted to UTC datetime)
            MTIME: DATETIME(stored=True),
            # publish time from metadata (converted to UTC datetime)
            PTIME: DATETIME(stored=True),
            # ITEMTYPE from metadata, always matched exactly hence ID
            ITEMTYPE: ID(stored=True),
            # tokenized CONTENTTYPE from metadata
            CONTENTTYPE: TEXT(stored=True, multitoken_query="and", analyzer=MimeTokenizer()),
            # unmodified list of TAGS from metadata
            TAGS: ID(stored=True),
            LANGUAGE: ID(stored=True),
            # USERID from metadata
            USERID: ID(stored=True),
            # ADDRESS from metadata
            ADDRESS: ID(stored=True),
            # HOSTNAME from metadata
            HOSTNAME: ID(stored=True),
            # SIZE from metadata
            SIZE: NUMERIC(stored=True),
            # ACTION from metadata
            ACTION: ID(stored=True),
            # tokenized COMMENT from metadata
            COMMENT: TEXT(stored=True),
            # SUMMARY from metadata
            SUMMARY: TEXT(stored=True),
            # data (content), converted to text/plain and tokenized
            CONTENT: TEXT(stored=True),
        }

        latest_revs_fields = {
            # ITEMID from metadata - as we only have the latest rev of an item here, it is unique
            ITEMID: ID(unique=True, stored=True),
            # unmodified list of ITEMLINKS from metadata
            ITEMLINKS: ID(stored=True),
            # unmodified list of ITEMTRANSCLUSIONS from metadata
            ITEMTRANSCLUSIONS: ID(stored=True),
            # tokenized ACL from metadata
            ACL: TEXT(analyzer=AclTokenizer(acl_rights_contents), multitoken_query="and", stored=True),
        }
        latest_revs_fields.update(**common_fields)

        userprofile_fields = {
            # Note: email / openid (if given) should be unique, but we might
            # have lots of empty values if it is not given and thus it is NOT
            # unique overall! Wrongly declaring it unique would lead to whoosh
            # removing other users from the index when update_document() is called!
            EMAIL: ID(stored=True),
            OPENID: ID(stored=True),
        }
        latest_revs_fields.update(**userprofile_fields)

        # XXX This is a highly ad-hoc way to support indexing of ticket items.
        ticket_fields = {
            'effort': NUMERIC(stored=True),
            'difficulty': NUMERIC(stored=True),
            'severity': NUMERIC(stored=True),
            'priority': NUMERIC(stored=True),
            'status': ID(stored=True),
            'assigned_to': ID(stored=True),
            'superseded_by': ID(stored=True),
            'depends_on': ID(stored=True),
        }
        latest_revs_fields.update(**ticket_fields)

        blog_entry_fields = {
        }
        latest_revs_fields.update(**blog_entry_fields)

        all_revs_fields = {
            ITEMID: ID(stored=True),
        }
        all_revs_fields.update(**common_fields)

        latest_revisions_schema = Schema(**latest_revs_fields)
        all_revisions_schema = Schema(**all_revs_fields)

        # Define dynamic fields
        dynamic_fields = [("*_id", ID(stored=True)),
                          ("*_text", TEXT(stored=True)),
                          ("*_keyword", KEYWORD(stored=True)),
                          ("*_numeric", NUMERIC(stored=True)),
                          ("*_datetime", DATETIME(stored=True)),
                          ("*_boolean", BOOLEAN(stored=True)),
                         ]

        # Adding dynamic fields to schemas
        for glob, field_type in dynamic_fields:
            latest_revisions_schema.add(glob, field_type, glob=True)
            all_revisions_schema.add(glob, field_type, glob=True)

        # schemas are needed by the query parser and for index creation
        self.schemas[ALL_REVS] = all_revisions_schema
        self.schemas[LATEST_REVS] = latest_revisions_schema

        # fields that whoosh result documents can have in common (no matter
        # whether from the all-revs index or the latest-revs index):
        self.common_fields = set(latest_revs_fields.keys()) & set(all_revs_fields.keys())

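    # A hedged illustration (hypothetical metadata, not part of the original
    # module) of the dynamic fields registered above: any metadata key whose
    # name matches one of the globs is indexed with the matching field type,
    # without extending the schemas here, e.g.:
    #     meta[u'verified_boolean'] = True      # indexed as BOOLEAN
    #     meta[u'deadline_datetime'] = dt       # indexed as DATETIME
    #     meta[u'reviewer_id'] = some_itemid    # indexed as ID
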
    def get_storage(self, tmp=False, create=False):
        """
        Get the whoosh storage (whoosh supports different kinds of storage,
        e.g. to filesystem or to GAE).
        Currently we only support the FileStorage.
        """
        from whoosh.filedb.filestore import FileStorage
        index_dir = self.index_dir_tmp if tmp else self.index_dir
        if create:
            try:
                os.mkdir(index_dir)
            except OSError:
                # ignore the exception; we'll get another exception below
                # in case there are real problems with the index_dir
                pass
        storage = FileStorage(index_dir)
        return storage

    def open(self):
        """
        Open all indexes.
        """
        storage = self.get_storage()
        for name in INDEXES:
            self.ix[name] = storage.open_index(name)

    def close(self):
        """
        Close all indexes.
        """
        for name in self.ix:
            self.ix[name].close()
        self.ix = {}

    def create(self, tmp=False):
        """
        Create all indexes (empty).
        """
        storage = self.get_storage(tmp, create=True)
        for name in INDEXES:
            storage.create_index(self.schemas[name], indexname=name)

    def destroy(self, tmp=False):
        """
        Destroy all indexes.
        """
        # XXX this is whoosh backend specific and currently only works for FileStorage.
        index_dir = self.index_dir_tmp if tmp else self.index_dir
        if os.path.exists(index_dir):
            shutil.rmtree(index_dir)

    def move_index(self):
        """
        Move the freshly built indexes from index_dir_tmp to index_dir.
        """
        # XXX this is whoosh backend specific and currently only works for FileStorage.
        self.destroy()
        os.rename(self.index_dir_tmp, self.index_dir)

    def index_revision(self, meta, content, async=True):
        """
        Index a single revision, add it to the all-revs and latest-revs indexes.

        :param meta: metadata dict
        :param content: preprocessed (filtered) indexable content
        :param async: if True, use the AsyncWriter, otherwise use the normal writer
        """
        doc = backend_to_index(meta, content, self.schemas[ALL_REVS], self.wikiname)
        if async:
            writer = AsyncWriter(self.ix[ALL_REVS])
        else:
            writer = self.ix[ALL_REVS].writer()
        with writer as writer:
            writer.update_document(**doc)  # update, because store_revision() may give us an existing revid
        doc = backend_to_index(meta, content, self.schemas[LATEST_REVS], self.wikiname)
        if async:
            writer = AsyncWriter(self.ix[LATEST_REVS])
        else:
            writer = self.ix[LATEST_REVS].writer()
        with writer as writer:
            writer.update_document(**doc)

    def remove_revision(self, revid, async=True):
        """
        Remove a single revision from the indexes.
        """
        if async:
            writer = AsyncWriter(self.ix[ALL_REVS])
        else:
            writer = self.ix[ALL_REVS].writer()
        with writer as writer:
            writer.delete_by_term(REVID, revid)
        if async:
            writer = AsyncWriter(self.ix[LATEST_REVS])
        else:
            writer = self.ix[LATEST_REVS].writer()
        with writer as writer:
            # find out the itemid related to the revid we want to remove:
            with self.ix[LATEST_REVS].searcher() as searcher:
                docnum_remove = searcher.document_number(revid=revid)
                if docnum_remove is not None:
                    itemid = searcher.stored_fields(docnum_remove)[ITEMID]
            if docnum_remove is not None:
                # we are removing a revid that is in the latest revs index
                latest_names_revids = self._find_latest_names_revids(self.ix[ALL_REVS], Term(ITEMID, itemid))
                if latest_names_revids:
                    # we have a latest revision, just update the document in the index:
                    assert len(latest_names_revids) == 1  # this item must have only one latest revision
                    latest_name_revid = latest_names_revids[0]
                    # we must fetch from the backend because the schema for LATEST_REVS differs from ALL_REVS
                    # (and we can't be sure we have all fields stored, either)
                    meta, _ = self.backend.retrieve(*latest_name_revid)
                    # we only use meta (not data), because we do not want to transform data->content again (this
                    # is potentially expensive) as we already have the transformed content stored in the ALL_REVS index:
                    with self.ix[ALL_REVS].searcher() as searcher:
                        doc = searcher.document(revid=latest_name_revid[1])
                        content = doc[CONTENT]
                    doc = backend_to_index(meta, content, self.schemas[LATEST_REVS], self.wikiname)
                    writer.update_document(**doc)
                else:
                    # there is no revision left in this item that could be the new "latest rev", just kill the rev
                    writer.delete_document(docnum_remove)

    def _modify_index(self, index, schema, wikiname, revids, mode='add', procs=1, limitmb=256):
        """
        Modify index contents - add, update or delete the indexed documents for all given revids.

        Note: mode == 'add' is faster, but you need to make sure not to create duplicate
              documents in the index.
        """
        with index.writer(procs=procs, limitmb=limitmb) as writer:
            for mountpoint, revid in revids:
                if mode in ['add', 'update', ]:
                    meta, data = self.backend.retrieve(mountpoint, revid)
                    content = convert_to_indexable(meta, data, is_new=False)
                    doc = backend_to_index(meta, content, schema, wikiname)
                if mode == 'update':
                    writer.update_document(**doc)
                elif mode == 'add':
                    writer.add_document(**doc)
                elif mode == 'delete':
                    writer.delete_by_term(REVID, revid)
                else:
                    raise ValueError("mode must be 'update', 'add' or 'delete', not '{0}'".format(mode))

    def _find_latest_names_revids(self, index, query=None):
        """
        Find the latest revids using the all-revs index.

        :param index: an up-to-date and open ALL_REVS index
        :param query: query to search only specific revisions (optional, default: all items/revisions)
        :returns: a list of tuples (name, latest revid)
        """
        if query is None:
            query = Every()
        with index.searcher() as searcher:
            result = searcher.search(query, groupedby=ITEMID, sortedby=FieldFacet(MTIME, reverse=True))
            by_item = result.groups(ITEMID)
            # the values in each v list are in the same relative order as in the results,
            # so the latest MTIME comes first:
            latest_names_revids = [(searcher.stored_fields(v[0])[NAME],
                                    searcher.stored_fields(v[0])[REVID])
                                   for v in by_item.values()]
        return latest_names_revids

    def rebuild(self, tmp=False, procs=1, limitmb=256):
        """
        Add all items/revisions from the backends of this wiki to the index
        (which is expected to have no items/revisions from this wiki yet).

        Note: the index might be shared by multiple wikis, so the usual sequences are:
              create, rebuild wiki1, rebuild wiki2, ...
              create (tmp), rebuild wiki1, rebuild wiki2, ..., move
        """
        storage = self.get_storage(tmp)
        index = storage.open_index(ALL_REVS)
        try:
            # build an index of all we have (so we know what we have)
            all_revids = self.backend # the backend is an iterator over all revids
            self._modify_index(index, self.schemas[ALL_REVS], self.wikiname, all_revids, 'add', procs, limitmb)
            latest_names_revids = self._find_latest_names_revids(index)
        finally:
            index.close()
        # now build the index of the latest revisions:
        index = storage.open_index(LATEST_REVS)
        try:
            self._modify_index(index, self.schemas[LATEST_REVS], self.wikiname, latest_names_revids, 'add', procs, limitmb)
        finally:
            index.close()

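    # A rough driver sketch (hypothetical calls, not part of this class) for
    # the two rebuild sequences mentioned in rebuild()'s docstring:
    #     imw.create(); imw.rebuild(); imw.open()       # build at the normal location
    #     imw.create(tmp=True); imw.rebuild(tmp=True)   # build at the tmp location,
    #     imw.move_index(); imw.update()                # then switch over and pick up
    #                                                   # revisions created meanwhile
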
    def update(self, tmp=False):
        """
        Make sure the index reflects the current backend state, adding missing stuff
        and removing outdated stuff.

        This is intended to be used:
        * after a full rebuild that was done at the tmp location
        * after the wiki is made read-only or taken offline
        * after the index was moved to the normal index location

        Reason: new revisions that were created after the rebuild started might be missing from the new index.

        :returns: index changed (bool)
        """
        storage = self.get_storage(tmp)
        index_all = storage.open_index(ALL_REVS)
        try:
            # NOTE: the self.backend iterator gives (mountpoint, revid) tuples, which is NOT
            # the same as (name, revid), thus we do the set operations just on the revids.
            # first update the ALL_REVS index:
            revids_mountpoints = dict((revid, mountpoint) for mountpoint, revid in self.backend)
            backend_revids = set(revids_mountpoints)
            with index_all.searcher() as searcher:
                ix_revids_names = dict((doc[REVID], doc[NAME]) for doc in searcher.all_stored_fields())
            revids_mountpoints.update(ix_revids_names) # this is needed for stuff that was deleted from storage
            ix_revids = set(ix_revids_names)
            add_revids = backend_revids - ix_revids
            del_revids = ix_revids - backend_revids
            changed = bool(add_revids or del_revids)
            add_revids = [(revids_mountpoints[revid], revid) for revid in add_revids]
            del_revids = [(revids_mountpoints[revid], revid) for revid in del_revids]
            self._modify_index(index_all, self.schemas[ALL_REVS], self.wikiname, add_revids, 'add')
            self._modify_index(index_all, self.schemas[ALL_REVS], self.wikiname, del_revids, 'delete')

            backend_latest_names_revids = set(self._find_latest_names_revids(index_all))
        finally:
            index_all.close()
        index_latest = storage.open_index(LATEST_REVS)
        try:
            # now update the LATEST_REVS index:
            with index_latest.searcher() as searcher:
                ix_revids = set(doc[REVID] for doc in searcher.all_stored_fields())
            backend_latest_revids = set(revid for name, revid in backend_latest_names_revids)
            upd_revids = backend_latest_revids - ix_revids
            upd_revids = [(revids_mountpoints[revid], revid) for revid in upd_revids]
            self._modify_index(index_latest, self.schemas[LATEST_REVS], self.wikiname, upd_revids, 'update')
            self._modify_index(index_latest, self.schemas[LATEST_REVS], self.wikiname, del_revids, 'delete')
        finally:
            index_latest.close()
        return changed

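    # Set-arithmetic sketch for update() (hypothetical revids): with
    # backend = {r1, r2, r3} and index = {r2, r3, r4}, update() adds r1 to
    # the index, deletes r4 and leaves r2/r3 alone; the LATEST_REVS index is
    # then patched analogously, using the freshly computed latest revids.
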
    def optimize_backend(self):
        """
        Optimize the backend / collect garbage to save space:

        * deleted items: destroy them? use a deleted_max_age?
        * user profiles: only keep the latest revision?
        * normal wiki items: keep by max_revisions_count / max_age
        * deduplicate data (determine dataids with same hash, fix references to point to one of them)
        * remove unreferenced dataids (destroyed revisions, deduplicated stuff)
        """
        # TODO

    def optimize_index(self, tmp=False):
        """
        Optimize the whoosh index.
        """
        storage = self.get_storage(tmp)
        for name in INDEXES:
            ix = storage.open_index(name)
            try:
                ix.optimize()
            finally:
                ix.close()

    def dump(self, tmp=False, idx_name=LATEST_REVS):
        """
        Yield key/value tuple lists for all documents in the indexes, with fields sorted.
        """
        storage = self.get_storage(tmp)
        ix = storage.open_index(idx_name)
        try:
            with ix.searcher() as searcher:
                for doc in searcher.all_stored_fields():
                    name = doc.pop(NAME, u"")
                    content = doc.pop(CONTENT, u"")
                    yield [(NAME, name), ] + sorted(doc.items()) + [(CONTENT, content), ]
        finally:
            ix.close()

    def query_parser(self, default_fields, idx_name=LATEST_REVS):
        """
        Build a query parser for a list of default fields.
        """
        schema = self.schemas[idx_name]
        if len(default_fields) > 1:
            qp = MultifieldParser(default_fields, schema=schema)
        elif len(default_fields) == 1:
            qp = QueryParser(default_fields[0], schema=schema)
        else:
            raise ValueError("default_fields list must contain at least one field name")
        qp.add_plugin(RegexPlugin())
        def userid_pseudo_field_factory(fieldname):
            """generate a translator function that searches for the userid
               in the given fieldname when provided with the username
            """
            def userid_pseudo_field(node):
                username = node.text
                users = user.search_users(**{NAME_EXACT: username})
                if users:
                    userid = users[0].meta[ITEMID]
                    node = WordNode(userid)
                    node.set_fieldname(fieldname)
                    return node
                return node
            return userid_pseudo_field
        qp.add_plugin(PseudoFieldPlugin(dict(
            # username:JoeDoe searches for revisions modified by JoeDoe
            username=userid_pseudo_field_factory(keys.USERID),
            # assigned:JoeDoe searches for tickets assigned to JoeDoe
            assigned=userid_pseudo_field_factory('assigned_to'), # XXX should be keys.ASSIGNED_TO
        )))
        return qp

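    # A parsing sketch (hypothetical query, not part of this class):
    #     qp = imw.query_parser([NAME, CONTENT])
    #     q = qp.parse(u"username:JoeDoe")
    # the username pseudo field resolves JoeDoe to his userid, so the query
    # actually searches the USERID field for that userid.
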
    def search(self, q, idx_name=LATEST_REVS, **kw):
        """
        Search with query q, yield Revisions.
        """
        with self.ix[idx_name].searcher() as searcher:
            # Note: callers must consume everything we yield, so the for loop
            # ends and the "with" is left to close the index files.
            for hit in searcher.search(q, **kw):
                doc = hit.fields()
                latest_doc = doc if idx_name == LATEST_REVS else None
                item = Item(self, latest_doc=latest_doc, itemid=doc[ITEMID])
                yield item.get_revision(doc[REVID], doc=doc)

    def search_page(self, q, idx_name=LATEST_REVS, pagenum=1, pagelen=10, **kw):
        """
        Same as search, but with paging support.
        """
        with self.ix[idx_name].searcher() as searcher:
            # Note: callers must consume everything we yield, so the for loop
            # ends and the "with" is left to close the index files.
            for hit in searcher.search_page(q, pagenum, pagelen=pagelen, **kw):
                doc = hit.fields()
                latest_doc = doc if idx_name == LATEST_REVS else None
                item = Item(self, latest_doc=latest_doc, itemid=doc[ITEMID])
                yield item.get_revision(doc[REVID], doc=doc)

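    # A consumption sketch (hypothetical caller code): search() and
    # search_page() are generators that hold a searcher open, so exhaust
    # them, e.g. revs = list(imw.search(Term(CONTENT, u"foo"))), rather
    # than breaking out of the loop early.
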
    def documents(self, idx_name=LATEST_REVS, **kw):
        """
        Yield Revisions matching the kw args.
        """
        for doc in self._documents(idx_name, **kw):
            latest_doc = doc if idx_name == LATEST_REVS else None
            item = Item(self, latest_doc=latest_doc, itemid=doc[ITEMID])
            yield item.get_revision(doc[REVID], doc=doc)

    def _documents(self, idx_name=LATEST_REVS, **kw):
        """
        Yield documents matching the kw args (internal use only).

        If no kw args are given, this yields all documents.
        """
        with self.ix[idx_name].searcher() as searcher:
            # Note: callers must consume everything we yield, so the for loop
            # ends and the "with" is left to close the index files.
            for doc in searcher.documents(**kw):
                yield doc

    def document(self, idx_name=LATEST_REVS, **kw):
        """
        Return a Revision matching the kw args.
        """
        doc = self._document(idx_name, **kw)
        if doc:
            latest_doc = doc if idx_name == LATEST_REVS else None
            item = Item(self, latest_doc=latest_doc, itemid=doc[ITEMID])
            return item.get_revision(doc[REVID], doc=doc)

    def _document(self, idx_name=LATEST_REVS, **kw):
        """
        Return a document matching the kw args (internal use only).
        """
        with self.ix[idx_name].searcher() as searcher:
            return searcher.document(**kw)

    def has_item(self, name):
        item = self[name]
        return bool(item)

    def __getitem__(self, name):
        """
        Return item with <name> (may be a new or existing item).
        """
        return Item(self, name_exact=name)

    def get_item(self, **query):
        """
        Return item identified by the query (may be a new or existing item).

        :kwargs **query: e.g. name_exact=u"Foo" or itemid="..." or ...
                         (must be a unique fieldname=value for the latest-revs index)
        """
        return Item(self, **query)

    def create_item(self, **query):
        """
        Return item identified by the query (must be a new item).

        :kwargs **query: e.g. name_exact=u"Foo" or itemid="..." or ...
                         (must be a unique fieldname=value for the latest-revs index)
        """
        return Item.create(self, **query)

    def existing_item(self, **query):
        """
        Return item identified by query (must be an existing item).

        :kwargs **query: e.g. name_exact=u"Foo" or itemid="..." or ...
                         (must be a unique fieldname=value for the latest-revs index)
        """
        return Item.existing(self, **query)


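# A hedged roundtrip sketch (hypothetical function, not part of the original
# module): store one revision of an item via an opened IndexingMiddleware
# instance imw and look its indexed metadata up again. Assumes the validation
# schemas fill in the missing metadata (itemid, revid, mtime, ...).
def _example_store_and_lookup(imw):
    item = imw[u'WikiSandBox']  # new or existing item
    rev = item.store_revision({keys.NAME: u'WikiSandBox',
                               keys.CONTENTTYPE: u'text/x.moin.wiki;charset=utf-8'},
                              StringIO('= sandbox =\n'),
                              trusted=True, wikiname=u'MyWiki')
    return imw.document(revid=rev.revid).meta[keys.NAME]

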
class Item(object):
    def __init__(self, indexer, latest_doc=None, **query):
        """
        :param indexer: indexer middleware instance
        :param latest_doc: if the caller already has a latest-revs index whoosh
                           document, it can be given here, to avoid fetching the
                           same doc again from the index
        :kwargs **query: any unique fieldname=value for the latest-revs index, e.g.:
                         name_exact="foo" or itemid="....." to fetch the item's current
                         doc from the index (if not given via latest_doc).
        """
        self.indexer = indexer
        self.backend = self.indexer.backend
        if latest_doc is None:
            # we need to call the method without acl check to avoid endless recursion:
            latest_doc = self.indexer._document(**query) or {}
        self._current = latest_doc

    def _get_itemid(self):
        return self._current.get(ITEMID)
    def _set_itemid(self, value):
        self._current[ITEMID] = value
    itemid = property(_get_itemid, _set_itemid)

    @property
    def acl(self):
        return self._current.get(ACL)

    @property
    def ptime(self):
        dt = self._current.get(PTIME)
        if dt is not None:
            return utctimestamp(dt)

    @property
    def mtime(self):
        dt = self._current.get(MTIME)
        if dt is not None:
            return utctimestamp(dt)

    @property
    def name(self):
        return self._current.get(NAME, 'DoesNotExist')

    @classmethod
    def create(cls, indexer, **query):
        """
        Create a new item and return it, raise an exception if it already exists.
        """
        item = cls(indexer, **query)
        if not item:
            return item
        raise ItemAlreadyExistsError(repr(query))

    @classmethod
    def existing(cls, indexer, **query):
        """
        Get an existing item and return it, raise an exception if it does not exist.
        """
        item = cls(indexer, **query)
        if item:
            return item
        raise NoSuchItemError(repr(query))

    def __nonzero__(self):
        """
        Does the item exist (== have at least one revision)?
        """
        return self.itemid is not None

    def iter_revs(self):
        """
        Iterate over the Revisions belonging to this item.
        """
        if self:
            for rev in self.indexer.documents(idx_name=ALL_REVS, itemid=self.itemid):
                yield rev

    def __getitem__(self, revid):
        """
        Get the Revision with revision id <revid>.
        """
        return Revision(self, revid)

    def get_revision(self, revid, doc=None):
        """
        Similar to item[revid], but you can optionally give an already existing
        whoosh result document for the given revid to avoid backend accesses for some use cases.
        """
        return Revision(self, revid, doc)

    def preprocess(self, meta, data):
        """
        Preprocess a revision before it gets stored and put into the index.
        """
        content = convert_to_indexable(meta, data, is_new=True)
        return meta, data, content

    def store_revision(self, meta, data, overwrite=False,
                       trusted=False, # True for loading a serialized representation or other trusted sources
                       name=None, # TODO name we decoded from URL path
                       action=u'SAVE',
                       remote_addr=None,
                       userid=None,
                       wikiname=None,
                       contenttype_current=None,
                       contenttype_guessed=None,
                       acl_parent=None,
                       ):
        """
        Store a revision into the backend, write metadata and data to it.

        Usually this will be a new revision, either of an existing item or
        a new item. With overwrite mode, we can also store over existing
        revisions.

        :type meta: dict
        :type data: open file (file must be closed by caller)
        :param overwrite: if True, allow overwriting of existing revs.
        :returns: a Revision instance of the just created revision
        """
        if remote_addr is None:
            try:
                # if we get here outside a request, this won't work:
                remote_addr = unicode(request.remote_addr)
            except Exception:
                pass
        if userid is None:
            try:
                # if we get here outside a request, this won't work:
                userid = flaskg.user.valid and flaskg.user.itemid or None
            except Exception:
                pass
        if wikiname is None:
            wikiname = app.cfg.interwikiname
        state = {'trusted': trusted,
                 keys.NAME: name,
                 keys.ACTION: action,
                 keys.ADDRESS: remote_addr,
                 keys.USERID: userid,
                 keys.WIKINAME: wikiname,
                 keys.ITEMID: self.itemid, # real itemid or None
                 'contenttype_current': contenttype_current,
                 'contenttype_guessed': contenttype_guessed,
                 'acl_parent': acl_parent,
                }
        ct = meta.get(keys.CONTENTTYPE)
        if ct == CONTENTTYPE_USER:
            Schema = UserMetaSchema
        else:
            Schema = ContentMetaSchema
        m = Schema(meta)
        valid = m.validate(state)
        # TODO: currently we just log validation results. in the end we should
        # reject invalid stuff in some comfortable way.
        if not valid:
            logging.warning("metadata validation failed, see below")
            for e in m.children:
                logging.warning("{0}, {1}".format(e.valid, e))

        # we do not have anything in m that is not defined in the schema,
        # e.g. userdefined meta keys or stuff we do not validate. thus, we
        # just update the meta dict with the validated stuff:
        meta.update(dict(m.value.items()))
        # we do not want None / empty values:
        meta = dict([(k, v) for k, v in meta.items() if v not in [None, []]])

        if self.itemid is None:
            self.itemid = meta[ITEMID]
        backend = self.backend
        if not overwrite:
            revid = meta.get(REVID)
            if revid is not None and revid in backend:
                raise ValueError('need overwrite=True to overwrite existing revisions')
        meta, data, content = self.preprocess(meta, data)
        data.seek(0)  # rewind file
        revid = backend.store(meta, data)
        meta[REVID] = revid
        self.indexer.index_revision(meta, content)
        if not overwrite:
            self._current = self.indexer._document(revid=revid)
        return Revision(self, revid)

    def store_all_revisions(self, meta, data):
        """
        Store over all revisions of this item.
        """
        for rev in self.iter_revs():
            meta[REVID] = rev.revid
            self.store_revision(meta, data, overwrite=True)

    def destroy_revision(self, revid):
        """
        Destroy revision <revid>.
        """
        rev = Revision(self, revid)
        self.backend.remove(rev.name, revid)
        self.indexer.remove_revision(revid)

    def destroy_all_revisions(self):
        """
        Destroy all revisions of this item.
        """
        for rev in self.iter_revs():
            self.destroy_revision(rev.revid)


class Revision(object):
    """
    An existing revision (exists in the backend).
    """
    def __init__(self, item, revid, doc=None):
        is_current = revid == CURRENT
        if doc is None:
            if is_current:
                doc = item._current
            else:
                doc = item.indexer._document(idx_name=ALL_REVS, revid=revid)
                if doc is None:
                    raise KeyError
        if is_current:
            revid = doc.get(REVID)
            if revid is None:
                raise KeyError
        self.item = item
        self.revid = revid
        self.backend = item.backend
        self._doc = doc
        self.meta = Meta(self, self._doc)
        self._data = None
        # Note: this no longer raises a KeyError immediately for non-existing revs;
        # accessing data or meta will, though.

    @property
    def name(self):
        return self.meta.get(NAME, 'DoesNotExist')

    def _load(self):
        meta, data = self.backend.retrieve(self._doc[NAME], self.revid) # raises KeyError if the rev does not exist
        self.meta = Meta(self, self._doc, meta)
        self._data = data
        return meta, data

    @property
    def data(self):
        if self._data is None:
            self._load()
        return self._data

    def close(self):
        if self._data is not None:
            self._data.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

    def __cmp__(self, other):
        return cmp(self.meta, other.meta)


from collections import Mapping

class Meta(Mapping):
    def __init__(self, revision, doc, meta=None):
        self.revision = revision
        self._doc = doc or {}
        self._meta = meta or {}
        self._common_fields = revision.item.indexer.common_fields

    def __contains__(self, key):
        try:
            self[key]
        except KeyError:
            return False
        else:
            return True

    def __iter__(self):
        self._meta, _ = self.revision._load()
        return iter(self._meta)

    def __getitem__(self, key):
        if self._meta:
            # we have real metadata (e.g. from storage)
            return self._meta[key]
        elif self._doc and key in self._common_fields:
            # we have a result document from whoosh, which has quite a lot
            # of the usually wanted metadata, avoid storage access, use this.
            value = self._doc[key]
            if key in [MTIME, PTIME]:
                # whoosh has a datetime object, but we want a UNIX timestamp
                value = utctimestamp(value)
            return value
        else:
            self._meta, _ = self.revision._load()
            return self._meta[key]

    def __cmp__(self, other):
        if self[REVID] == other[REVID]:
            return 0
        return cmp(self[MTIME], other[MTIME])

    def __len__(self):
        return 0 # XXX

    def __repr__(self):
        return "Meta _doc: {0!r} _meta: {1!r}".format(self._doc, self._meta)
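

# A hedged sketch (hypothetical revision object, not part of the original
# module) of Meta's lazy loading: fields stored in the whoosh document (the
# "common fields") are served straight from the index, anything else triggers
# a backend retrieve() via Revision._load().
def _example_meta_access(rev):
    size = rev.meta[SIZE]      # cheap: SIZE is stored in the index document
    all_keys = list(rev.meta)  # expensive: iterating loads the real metadata
    return size, all_keys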