diff MoinMoin/action/SyncPages.py @ 1404:e6bf86b4a735

Migrated code to new RPC-Aggregator technology. Fixed merging bug that affected deleted pages in scenarios where the page was not synced before.
author Alexander Schremmer <alex AT alexanderweb DOT de>
date Wed, 23 Aug 2006 21:20:10 +0200
parents e6e054247a58
children e1419b1f9679
line wrap: on
line diff
--- a/MoinMoin/action/SyncPages.py	Wed Aug 23 20:52:26 2006 +0200
+++ b/MoinMoin/action/SyncPages.py	Wed Aug 23 21:20:10 2006 +0200
@@ -27,10 +27,10 @@
 from MoinMoin.wikisync import TagStore, UnsupportedWikiException, SyncPage, NotAllowedException
 from MoinMoin.wikisync import MoinLocalWiki, MoinRemoteWiki, UP, DOWN, BOTH, MIMETYPE_MOIN
 from MoinMoin.util.bdiff import decompress, patch, compress, textdiff
-from MoinMoin.util import diff3
+from MoinMoin.util import diff3, rpc_aggregator
 
 
-debug = False
+debug = True
 
 
 # map sync directions
@@ -219,179 +219,187 @@
             m_pages = SyncPage.filter(m_pages, params["pageMatch"].match)
             self.log_status(self.INFO, _("After filtering: %s pages"), (str(len(m_pages)), ))
 
-        def handle_page(sp):
-            # XXX add locking, acquire read-lock on sp
-            if debug:
-                self.log_status(ActionClass.INFO, raw_suffix="Processing %r" % sp)
-
-            local_pagename = sp.local_name
-            current_page = PageEditor(self.request, local_pagename) # YYY direct access
-            comment = u"Local Merge - %r" % (remote.get_interwiki_name() or remote.get_iwid())
-
-            tags = TagStore(current_page)
-
-            matching_tags = tags.fetch(iwid_full=remote.iwid_full, direction=match_direction)
-            matching_tags.sort()
-            if debug:
-                self.log_status(ActionClass.INFO, raw_suffix="Tags: %r [[BR]] All: %r" % (matching_tags, tags.tags))
-
-            # some default values for non matching tags
-            normalised_name = None
-            remote_rev = None
-            local_rev = sp.local_rev # merge against the newest version
-            old_contents = ""
-
-            if matching_tags:
-                newest_tag = matching_tags[-1]
-
-                local_change = newest_tag.current_rev != sp.local_rev
-                remote_change = newest_tag.remote_rev != sp.remote_rev
+        class handle_page(rpc_aggregator.RPCYielder):
+            def run(yielder, sp):
+                # XXX add locking, acquire read-lock on sp
+                if debug:
+                    self.log_status(ActionClass.INFO, raw_suffix="Processing %r" % sp)
+    
+                local_pagename = sp.local_name
+                current_page = PageEditor(self.request, local_pagename) # YYY direct access
+                comment = u"Local Merge - %r" % (remote.get_interwiki_name() or remote.get_iwid())
 
-                # handle some cases where we cannot continue for this page
-                if not remote_change and (direction == DOWN or not local_change):
-                    return # no changes done, next page
-                if sp.local_deleted and sp.remote_deleted:
-                    return
-                if sp.remote_deleted and not local_change:
-                    msg = local.delete_page(sp.local_name, comment)
-                    if not msg:
-                        self.log_status(ActionClass.INFO, _("Deleted page %s locally."), (sp.name, ))
-                    else:
-                        self.log_status(ActionClass.ERROR, _("Error while deleting page %s locally:"), (sp.name, ), msg)
-                    return
-                if sp.local_deleted and not remote_change:
-                    if direction == DOWN:
+                tags = TagStore(current_page)
+
+                matching_tags = tags.fetch(iwid_full=remote.iwid_full, direction=match_direction)
+                matching_tags.sort()
+                if debug:
+                    self.log_status(ActionClass.INFO, raw_suffix="Tags: %r [[BR]] All: %r" % (matching_tags, tags.tags))
+
+                # some default values for non matching tags
+                normalised_name = None
+                remote_rev = None
+                local_rev = sp.local_rev # merge against the newest version
+                old_contents = ""
+
+                if matching_tags:
+                    newest_tag = matching_tags[-1]
+    
+                    local_change = newest_tag.current_rev != sp.local_rev
+                    remote_change = newest_tag.remote_rev != sp.remote_rev
+
+                    # handle some cases where we cannot continue for this page
+                    if not remote_change and (direction == DOWN or not local_change):
+                        return # no changes done, next page
+                    if sp.local_deleted and sp.remote_deleted:
                         return
-                    msg = remote.delete_page(sp.remote_name, sp.remote_rev, local_full_iwid)
-                    if not msg:
-                        self.log_status(ActionClass.INFO, _("Deleted page %s remotely."), (sp.name, ))
+                    if sp.remote_deleted and not local_change:
+                        msg = local.delete_page(sp.local_name, comment)
+                        if not msg:
+                            self.log_status(ActionClass.INFO, _("Deleted page %s locally."), (sp.name, ))
+                        else:
+                            self.log_status(ActionClass.ERROR, _("Error while deleting page %s locally:"), (sp.name, ), msg)
+                        return
+                    if sp.local_deleted and not remote_change:
+                        if direction == DOWN:
+                            return
+                        yield remote.delete_page_pre(sp.remote_name, sp.remote_rev, local_full_iwid)
+                        msg = remote.delete_page_post(yielder.fetch_result())
+                        if not msg:
+                            self.log_status(ActionClass.INFO, _("Deleted page %s remotely."), (sp.name, ))
+                        else:
+                            self.log_status(ActionClass.ERROR, _("Error while deleting page %s remotely:"), (sp.name, ), msg)
+                        return
+                    if sp.local_mime_type != MIMETYPE_MOIN and not (local_change ^ remote_change):
+                        self.log_status(ActionClass.WARN, _("The item %s cannot be merged but was changed in both wikis. Please delete it in one of both wikis and try again."), (sp.name, ))
+                        return
+                    if sp.local_mime_type != sp.remote_mime_type:
+                        self.log_status(ActionClass.WARN, _("The item %s has different mime types in both wikis and cannot be merged. Please delete it in one of both wikis or unify the mime type, and try again."), (sp.name, ))
+                        return
+                    if newest_tag.normalised_name != sp.name:
+                        self.log_status(ActionClass.WARN, _("The item %s was renamed locally. This is not implemented yet. Therefore the full syncronisation history is lost for this page."), (sp.name, )) # XXX implement renames
                     else:
-                        self.log_status(ActionClass.ERROR, _("Error while deleting page %s remotely:"), (sp.name, ), msg)
-                    return
-                if sp.local_mime_type != MIMETYPE_MOIN and not (local_change ^ remote_change):
-                    self.log_status(ActionClass.WARN, _("The item %s cannot be merged but was changed in both wikis. Please delete it in one of both wikis and try again."), (sp.name, ))
-                    return
-                if sp.local_mime_type != sp.remote_mime_type:
-                    self.log_status(ActionClass.WARN, _("The item %s has different mime types in both wikis and cannot be merged. Please delete it in one of both wikis or unify the mime type, and try again."), (sp.name, ))
-                    return
-                if newest_tag.normalised_name != sp.name:
-                    self.log_status(ActionClass.WARN, _("The item %s was renamed locally. This is not implemented yet. Therefore the full syncronisation history is lost for this page."), (sp.name, )) # XXX implement renames
+                        normalised_name = newest_tag.normalised_name
+                        local_rev = newest_tag.current_rev
+                        remote_rev = newest_tag.remote_rev
+                        old_contents = Page(self.request, local_pagename, rev=newest_tag.current_rev).get_raw_body_str() # YYY direct access
                 else:
-                    normalised_name = newest_tag.normalised_name
-                    local_rev = newest_tag.current_rev
-                    remote_rev = newest_tag.remote_rev
-                    old_contents = Page(self.request, local_pagename, rev=newest_tag.current_rev).get_raw_body_str() # YYY direct access
-
-            self.log_status(ActionClass.INFO, _("Synchronising page %s with remote page %s ..."), (local_pagename, sp.remote_name))
+                    if (sp.local_deleted and not sp.remote_rev) or (
+                        sp.remote_deleted and not sp.local_rev):
+                        return
 
-            if direction == DOWN:
-                remote_rev = None # always fetch the full page, ignore remote conflict check
-                patch_base_contents = ""
-            else:
-                patch_base_contents = old_contents
+                self.log_status(ActionClass.INFO, _("Synchronising page %s with remote page %s ..."), (local_pagename, sp.remote_name))
 
-            if remote_rev != sp.remote_rev:
-                if sp.remote_deleted: # ignore remote changes
-                    current_remote_rev = sp.remote_rev
-                    is_remote_conflict = False
+                if direction == DOWN:
+                    remote_rev = None # always fetch the full page, ignore remote conflict check
+                    patch_base_contents = ""
+                else:
+                    patch_base_contents = old_contents
+
+                if remote_rev != sp.remote_rev:
+                    if sp.remote_deleted: # ignore remote changes
+                        current_remote_rev = sp.remote_rev
+                        is_remote_conflict = False
+                        diff = None
+                        self.log_status(ActionClass.WARN, _("The page %s was deleted remotely but changed locally."), (sp.name, ))
+                    else:
+                        yield remote.get_diff_pre(sp.remote_name, remote_rev, None, normalised_name)
+                        diff_result = remote.get_diff_post(yielder.fetch_result())
+                        if diff_result is None:
+                            self.log_status(ActionClass.ERROR, _("The page %s could not be synced. The remote page was renamed. This is not supported yet. You may want to delete one of the pages to get it synced."), (sp.remote_name, ))
+                            return
+                        is_remote_conflict = diff_result["conflict"]
+                        assert diff_result["diffversion"] == 1
+                        diff = diff_result["diff"]
+                        current_remote_rev = diff_result["current"]
+                else:
+                    current_remote_rev = remote_rev
+                    if sp.local_mime_type == MIMETYPE_MOIN:
+                        is_remote_conflict = wikiutil.containsConflictMarker(old_contents.decode("utf-8"))
+                    else:
+                        is_remote_conflict = NotImplemented
                     diff = None
-                    self.log_status(ActionClass.WARN, _("The page %s was deleted remotely but changed locally."), (sp.name, ))
-                else:
-                    diff_result = remote.get_diff(sp.remote_name, remote_rev, None, normalised_name)
-                    if diff_result is None:
-                        self.log_status(ActionClass.ERROR, _("The page %s could not be synced. The remote page was renamed. This is not supported yet. You may want to delete one of the pages to get it synced."), (sp.remote_name, ))
-                        return
-                    is_remote_conflict = diff_result["conflict"]
-                    assert diff_result["diffversion"] == 1
-                    diff = diff_result["diff"]
-                    current_remote_rev = diff_result["current"]
-            else:
-                current_remote_rev = remote_rev
-                if sp.local_mime_type == MIMETYPE_MOIN:
-                    is_remote_conflict = wikiutil.containsConflictMarker(old_contents.decode("utf-8"))
-                else:
-                    is_remote_conflict = NotImplemented
-                diff = None
-
-            # do not sync if the conflict is remote and local, or if it is local
-            # and the page has never been syncronised
-            if (sp.local_mime_type == MIMETYPE_MOIN and wikiutil.containsConflictMarker(current_page.get_raw_body()) # YYY direct access
-                and (remote_rev is None or is_remote_conflict)):
-                self.log_status(ActionClass.WARN, _("Skipped page %s because of a locally or remotely unresolved conflict."), (local_pagename, ))
-                return
-
-            if remote_rev is None and direction == BOTH:
-                self.log_status(ActionClass.INFO, _("This is the first synchronisation between the local and the remote wiki for the page %s."), (sp.name, ))
-
-            if sp.remote_deleted:
-                remote_contents = ""
-            elif diff is None:
-                remote_contents = old_contents
-            else:
-                remote_contents = patch(patch_base_contents, decompress(diff))
 
-            if sp.local_mime_type == MIMETYPE_MOIN:
-                remote_contents_unicode = remote_contents.decode("utf-8")
-                # here, the actual 3-way merge happens
-                merged_text = diff3.text_merge(old_contents.decode("utf-8"), remote_contents_unicode, current_page.get_raw_body(), 1, *conflict_markers) # YYY direct access
-                if debug:
-                    self.log_status(ActionClass.INFO, raw_suffix="Merging %r, %r and %r into %r" % (old_contents.decode("utf-8"), remote_contents_unicode, current_page.get_raw_body(), merged_text))
-                merged_text_raw = merged_text.encode("utf-8")
-            else:
-                if diff is None:
-                    merged_text_raw = remote_contents
-                else:
-                    merged_text_raw = current_page.get_raw_body_str() # YYY direct access
-
-            diff = textdiff(remote_contents, merged_text_raw)
-            if debug:
-                self.log_status(ActionClass.INFO, raw_suffix="Diff against %r" % remote_contents)
+                # do not sync if the conflict is remote and local, or if it is local
+                # and the page has never been syncronised
+                if (sp.local_mime_type == MIMETYPE_MOIN and wikiutil.containsConflictMarker(current_page.get_raw_body()) # YYY direct access
+                    and (remote_rev is None or is_remote_conflict)):
+                    self.log_status(ActionClass.WARN, _("Skipped page %s because of a locally or remotely unresolved conflict."), (local_pagename, ))
+                    return
 
-            # XXX upgrade to write lock
-            try:
-                local_change_done = True
-                current_page.saveText(merged_text, sp.local_rev or 0, comment=comment) # YYY direct access
-            except PageEditor.Unchanged:
-                local_change_done = False
-            except PageEditor.EditConflict:
-                local_change_done = False
-                assert False, "You stumbled on a problem with the current storage system - I cannot lock pages"
-
-            new_local_rev = current_page.get_real_rev() # YYY direct access
+                if remote_rev is None and direction == BOTH:
+                    self.log_status(ActionClass.INFO, _("This is the first synchronisation between the local and the remote wiki for the page %s."), (sp.name, ))
 
-            def rollback_local_change(): # YYY direct local access
-                rev = new_local_rev - 1
-                revstr = '%08d' % rev
-                oldpg = Page(self.request, sp.local_name, rev=rev)
-                pg = PageEditor(self.request, sp.local_name)
-                savemsg = pg.saveText(oldpg.get_raw_body(), 0, comment=u"Wikisync rollback", extra=revstr, action="SAVE/REVERT")
+                if sp.remote_deleted:
+                    remote_contents = ""
+                elif diff is None:
+                    remote_contents = old_contents
+                else:
+                    remote_contents = patch(patch_base_contents, decompress(diff))
 
-            try:
-                if direction == BOTH:
+                if sp.local_mime_type == MIMETYPE_MOIN:
+                    remote_contents_unicode = remote_contents.decode("utf-8")
+                    # here, the actual 3-way merge happens
+                    merged_text = diff3.text_merge(old_contents.decode("utf-8"), remote_contents_unicode, current_page.get_raw_body(), 1, *conflict_markers) # YYY direct access
+                    if debug:
+                        self.log_status(ActionClass.INFO, raw_suffix="Merging %r, %r and %r into %r" % (old_contents.decode("utf-8"), remote_contents_unicode, current_page.get_raw_body(), merged_text))
+                    merged_text_raw = merged_text.encode("utf-8")
+                else:
+                    if diff is None:
+                        merged_text_raw = remote_contents
+                    else:
+                        merged_text_raw = current_page.get_raw_body_str() # YYY direct access
+
+                diff = textdiff(remote_contents, merged_text_raw)
+                if debug:
+                    self.log_status(ActionClass.INFO, raw_suffix="Diff against %r" % remote_contents)
+
+                # XXX upgrade to write lock
+                try:
+                    local_change_done = True
+                    current_page.saveText(merged_text, sp.local_rev or 0, comment=comment) # YYY direct access
+                except PageEditor.Unchanged:
+                    local_change_done = False
+                except PageEditor.EditConflict:
+                    local_change_done = False
+                    assert False, "You stumbled on a problem with the current storage system - I cannot lock pages"
+
+                new_local_rev = current_page.get_real_rev() # YYY direct access
+
+                def rollback_local_change(): # YYY direct local access
+                    rev = new_local_rev - 1
+                    revstr = '%08d' % rev
+                    oldpg = Page(self.request, sp.local_name, rev=rev)
+                    pg = PageEditor(self.request, sp.local_name)
                     try:
-                        very_current_remote_rev = remote.merge_diff(sp.remote_name, compress(diff), new_local_rev, current_remote_rev, current_remote_rev, local_full_iwid, sp.name)
-                    except NotAllowedException:
-                        self.log_status(ActionClass.ERROR, _("The page %s could not be merged because you are not allowed to modify the page in the remote wiki."), (sp.name, ))
-                        return
+                        savemsg = pg.saveText(oldpg.get_raw_body(), 0, comment=u"Wikisync rollback", extra=revstr, action="SAVE/REVERT")
+                    except PageEditor.Unchanged:
+                        pass
+
+                if direction == BOTH:
+                    yield remote.merge_diff_pre(sp.remote_name, compress(diff), new_local_rev, current_remote_rev, current_remote_rev, local_full_iwid, sp.name)
+                    try:
+                        try:
+                            very_current_remote_rev = remote.merge_diff_post(yielder.fetch_result())
+                        except NotAllowedException:
+                            self.log_status(ActionClass.ERROR, _("The page %s could not be merged because you are not allowed to modify the page in the remote wiki."), (sp.name, ))
+                            return
+                    finally:
+                        if local_change_done:
+                            rollback_local_change()
                 else:
                     very_current_remote_rev = current_remote_rev
 
-                local_change_done = False # changes are committed remotely, all is fine
-            finally:
-                if local_change_done:
-                    rollback_local_change()
-
-            tags.add(remote_wiki=remote_full_iwid, remote_rev=very_current_remote_rev, current_rev=new_local_rev, direction=direction, normalised_name=sp.name)
+                tags.add(remote_wiki=remote_full_iwid, remote_rev=very_current_remote_rev, current_rev=new_local_rev, direction=direction, normalised_name=sp.name)
 
-            if sp.local_mime_type != MIMETYPE_MOIN or not wikiutil.containsConflictMarker(merged_text):
-                self.log_status(ActionClass.INFO, _("Page %s successfully merged."), (sp.name, ))
-            else:
-                self.log_status(ActionClass.WARN, _("Page %s merged with conflicts."), (sp.name, ))
+                if sp.local_mime_type != MIMETYPE_MOIN or not wikiutil.containsConflictMarker(merged_text):
+                    self.log_status(ActionClass.INFO, _("Page %s successfully merged."), (sp.name, ))
+                else:
+                    self.log_status(ActionClass.WARN, _("Page %s merged with conflicts."), (sp.name, ))
 
-            # XXX release lock
+                # XXX release lock
 
-        for sp in m_pages:
-            handle_page(sp)
+        rpc_aggregator.scheduler(remote.create_multicall_object, handle_page, m_pages, 8)
 
 
 def execute(pagename, request):