view jabberbot/ @ 6133:a6283e189869 tip

fixup: remove nonexisting passlib.utils._blowfish this was removed by the passlib 1.7.1 upgrade.
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Thu, 01 Jun 2017 18:10:19 +0200
parents b397401dda7f
line wrap: on
line source
# -*- coding: iso-8859-1 -*-
"""
    MoinMoin - a xmlrpc server and client for the notification bot

    @copyright: 2007 by Karol Nowak <>
    @license: GNU GPL, see COPYING for details.
"""

import logging, xmlrpclib, Queue
from SimpleXMLRPCServer import SimpleXMLRPCServer
from threading import Thread

import jabberbot.commands as cmd

class ConfigurationError(Exception):
    """Raised when the bot configuration cannot be used (e.g. no secret set)

    @param message: human-readable description of the configuration problem
    """

    def __init__(self, message):
        # Hand the message to Exception as well, so that str(exc), exc.args
        # and tracebacks show it instead of an empty string.
        Exception.__init__(self, message)
        self.message = message

def _xmlrpc_decorator(function):
    """A decorator function, which adds some maintenance code

    This function takes care of preparing a MultiCall object and
    an authentication token, and deleting them at the end.

    def wrapped_func(self, command):
        # Dummy function, so that the string appears in a .po file
        _ = lambda x: x

        self.token = None
        self.multicall = xmlrpclib.MultiCall(self.connection)
        jid = command.jid
        if type(jid) is not list:
            jid = [jid]

                if self.token:

                function(self, command)

            except xmlrpclib.Fault, fault:
                msg = _("Your request has failed. The reason is:\n%(error)s")
                self.report_error(jid, msg, {'error': fault.faultString})
            except xmlrpclib.Error, err:
                msg = _("A serious error occurred while processing your request:\n%(error)s")
                self.report_error(jid, msg, {'error': str(err)})
            except Exception, exc:
                msg = _("An internal error has occurred, please contact the administrator.")
                self.report_error(jid, msg)

            del self.token
            del self.multicall

    return wrapped_func

class XMLRPCClient(Thread):
    """XMLRPC Client

    It's responsible for performing XMLRPC operations on
    a wiki, as inctructed by command objects received from
    the XMPP component"""

    def __init__(self, config, commands_in, commands_out):
        """A constructor

        @param commands_out: an output command queue (to xmpp)
        @param commands_in: an input command queue (from xmpp)

        self.log = logging.getLogger(__name__)

        if not config.secret:
            error = "You must set a (long) secret string!"
            raise ConfigurationError(error)

        self.commands_in = commands_in
        self.commands_out = commands_out
        self.config = config
        self.url = config.wiki_url + "?action=xmlrpc2"
        self.connection = self.create_connection()
        self.token = None
        self.multicall = None
        self.stopping = False

        self._cmd_handlers = {cmd.GetPage: self.get_page,
                              cmd.GetPageHTML: self.get_page_html,
                              cmd.GetPageList: self.get_page_list,
                              cmd.GetPageInfo: self.get_page_info,
                              cmd.GetUserLanguage: self.get_language_by_jid,
                              cmd.Search: self.do_search,
                              cmd.RevertPage: self.do_revert}

    def run(self):
        """Starts the server / thread"""
        while True:
            if self.stopping:

                command = self.commands_in.get(True, 2)
            except Queue.Empty:

    def stop(self):
        """Stop the thread"""
        self.stopping = True

    def create_connection(self):
        return xmlrpclib.ServerProxy(self.url, allow_none=True, verbose=self.config.verbose)

    def execute_command(self, command):
        """Execute commands coming from the XMPP component"""

        cmd_name = command.__class__

            handler = self._cmd_handlers[cmd_name]
        except KeyError:
            self.log.debug("No such command: " + cmd_name.__name__)


    def report_error(self, jid, text, data={}):
        """Reports an internal error

        @param jid: Jabber ID that should be informed about the error condition
        @param text: description of the error
        @param data: dictionary used to substitute strings in translated message
        @type data: dict

        # Dummy function, so that the string appears in a .po file
        _ = lambda x: x

        cmddata = {'text': text, 'data': data}
        report = cmd.NotificationCommandI18n(jid, cmddata, msg_type=u"chat", async=False)

    def get_auth_token(self, jid):
        """Get an auth token using user's Jabber ID

        @type jid: unicode
        # We have to use a bare JID
        jid = jid.split('/')[0]
        token = self.connection.getJabberAuthToken(jid, self.config.secret)
        if token:
            self.token = token

    def warn_no_credentials(self, jid):
        """Warn a given JID that credentials check failed

        @param jid: full JID to notify about failure
        @type jid: str

        # Dummy function, so that the string appears in a .po file
        _ = lambda x: x

        cmddata = {'text': _("Credentials check failed, you might be unable to see all information.")}
        warning = cmd.NotificationCommandI18n([jid], cmddata, async=False)

    def _get_multicall_result(self, jid):
        """Returns multicall results and issues a warning if there's an auth error

        @param jid: a full JID to use if there's an error
        @type jid: str


        if not self.token:
            result = self.multicall()[0]
            token_result = u"FAILURE"
            token_result, result = self.multicall()

        if token_result != u"SUCCESS":

        return result

    def get_page(self, command):
        """Returns a raw page"""

        self.multicall.getPage(command.pagename) = self._get_multicall_result(command.jid)

    get_page = _xmlrpc_decorator(get_page)

    def get_page_html(self, command):
        """Returns a html-formatted page"""

        self.multicall.getPageHTML(command.pagename) = self._get_multicall_result(command.jid)

    get_page_html = _xmlrpc_decorator(get_page_html)

    def get_page_list(self, command):
        """Returns a list of all accesible pages"""

        # Dummy function, so that the string appears in a .po file
        _ = lambda x: x

        cmd_data = {'text': _("This command may take a while to complete, please be patient...")}
        info = cmd.NotificationCommandI18n([command.jid], cmd_data, async=False, msg_type=u"chat")

        self.multicall.getAllPages() = self._get_multicall_result(command.jid)

    get_page_list = _xmlrpc_decorator(get_page_list)

    def get_page_info(self, command):
        """Returns detailed information about a given page"""

        self.multicall.getPageInfo(command.pagename) = self._get_multicall_result(command.jid)

    get_page_info = _xmlrpc_decorator(get_page_info)

    def do_search(self, command):
        """Performs a search"""

        # Dummy function, so that the string appears in a .po file
        _ = lambda x: x

        cmd_data = {'text': _("This command may take a while to complete, please be patient...")}
        info = cmd.NotificationCommandI18n([command.jid], cmd_data, async=False, msg_type=u"chat")

        c = command
        self.multicall.searchPagesEx(c.term, c.search_type, 30,, c.mtime, c.regexp) = self._get_multicall_result(command.jid)

    do_search = _xmlrpc_decorator(do_search)

    def do_revert(self, command):
        """Performs a page revert"""

        # Dummy function, so that the string appears in a .po file
        _ = lambda x: x

        self.multicall.revertPage(command.pagename, command.revision)
        data = self._get_multicall_result(command.jid)

        if type(data) == bool and data:
            cmd_data = {'text': _("Page has been reverted.")}
        elif isinstance(str, data) or isinstance(unicode, data):
            cmd_data = {'text': _("Revert failed: %(reason)s" % {'reason': data})}
            cmd_data = {'text': _("Revert failed.")}

        info = cmd.NotificationCommandI18n([command.jid], cmd_data, async=False, msg_type=u"chat")

    do_revert = _xmlrpc_decorator(do_revert)

    def get_language_by_jid(self, command):
        """Returns language of the a user identified by the given JID"""

        server = xmlrpclib.ServerProxy(self.config.wiki_url + "?action=xmlrpc2")
        language = "en"

            language = server.getUserLanguageByJID(command.jid)
        except xmlrpclib.Fault, fault:
        except xmlrpclib.Error, err:
        except Exception, exc:

        command.language = language

class XMLRPCServer(Thread):
    """XMLRPC Server

    It waits for notifications requests coming from wiki,
    creates command objects and puts them on a queue for
    later processing by the XMPP component

    @param commands: an input command queue

    def __init__(self, config, commands):
        self.commands = commands
        self.verbose = config.verbose
        self.log = logging.getLogger(__name__)
        self.config = config

        if config.secret:
            self.secret = config.secret
            error = "You must set a (long) secret string"
            raise ConfigurationError(error)

        self.server = None

    def run(self):
        """Starts the server / thread"""

        self.server = SimpleXMLRPCServer((self.config.xmlrpc_host, self.config.xmlrpc_port))

        # Register methods having an "export" attribute as XML RPC functions and
        # decorate them with a check for a shared (wiki-bot) secret.
        items = self.__class__.__dict__.items()
        methods = [(name, func) for (name, func) in items if callable(func)
                   and "export" in func.__dict__]

        for name, func in methods:
            self.server.register_function(self.secret_check(func), name)


    def secret_check(self, function):
        """Adds a check for a secret to a given function

        Using this one does not have to worry about checking for the secret
        in every XML RPC function.
        def protected_func(secret, *args):
            if secret != self.secret:
                raise xmlrpclib.Fault(1, "You are not allowed to use this bot!")
                return function(self, *args)

        return protected_func

    def send_notification(self, jids, notification):
        """Instructs the XMPP component to send a notification

        The notification dict has following entries:
        'text' - notification text (REQUIRED)
        'subject' - notification subject
        'url_list' - a list of dicts describing attached URLs

        @param jids: a list of JIDs to send a message to (bare JIDs)
        @type jids: a list of str or unicode
        @param notification: dictionary with notification data
        @type notification: dict

        command = cmd.NotificationCommand(jids, notification, async=True)
        return True
    send_notification.export = True

    def addJIDToRoster(self, jid):
        """Instructs the XMPP component to add a new JID to its roster

        @param jid: a jid to add, this must be a bare jid
        @type jid: str or unicode,

        command = cmd.AddJIDToRosterCommand(jid)
        return True
    addJIDToRoster.export = True

    def removeJIDFromRoster(self, jid):
        """Instructs the XMPP component to remove a JID from its roster

        @param jid: a jid to remove, this must be a bare jid
        @type jid: str or unicode

        command = cmd.RemoveJIDFromRosterCommand(jid)
        return True
    removeJIDFromRoster.export = True