view MoinMoin/util/ @ 2286:01f05e74aa9c

Big PEP8 and whitespace cleanup
author Thomas Waldmann <tw AT waldmann-edv DOT de>
date Mon, 02 Jul 2007 23:33:20 +0200
parents bb2e053067fb
line wrap: on
line source

# -*- coding: iso-8859-1 -*-
    MoinMoin - RPC Aggregator

    Aggregates RPC calls into MultiCall batches in order to increase
    the speed.

    @copyright: 2006 MoinMoin:AlexanderSchremmer
    @license: GNU GPL, see COPYING for details.

import xmlrpclib
INVALID = object()

class RPCYielder(object):
    """ If you want to have a batchable function, you need
    to inherit from this class and define a method "run" that
    takes exactly one argument.
    This method has to be a generator that yields (func, arg)
    tuples whereas func is the RPC method name (str).
    You can fetch the calls by calling fetch_call(),
    then you have to return the result by calling set_result(res).

    def __init__(self, arg, raise_fault=False):
        self._comm_slot = [INVALID]
        self.raise_fault = raise_fault
        self._gen =

    def fetch_call(self):
            next_item =
        except StopIteration:
            return None
        return next_item

    def set_result(self, result):
        self._comm_slot[0] = result

    def fetch_result(self):
        result = self._comm_slot[0]
            if result is INVALID:
                return RuntimeError("Invalid state, there is no result to fetch.")
            if self.raise_fault and isinstance(result, xmlrpclib.Fault):
                raise result
                return result
            self._comm_slot[0] = INVALID

    def run(self, arg):
        return NotImplementedError

def scheduler(multicall_func, handler, args, max_calls=10, prepare_multicall_func=None):
    # all generator (or better, RPCYielder) instances
    gens = []
    # those instances that have to be queried in the next step again
    gens_todo = []
    # pending calls, stored as tuples: (generator, (funcname, (args,*)))
    call_list = []

    # instantiate generators
    for arg in args:
    # schedule generators
    while gens:
        for gen in gens:
            if len(call_list) > max_calls:
            call = gen.fetch_call()
            if call is not None:
                call_list.append((gen, call))
        if call_list:
            if prepare_multicall_func is not None:
                pre_calls = [(RPCYielder(0), x) for x in prepare_multicall_func()]
                call_list = pre_calls + call_list

            m = multicall_func()
            gens_result = [] # generators that will get a result
            for gen, (func, args) in call_list:
                getattr(m, func)(*args) # register call
            result = iter(m()) # execute multicall
            for gen in gens_result:
                    item =
                except xmlrpclib.Fault, e:
                    # this exception is reraised by the RPCYielder
                    item = e
            call_list = []
        gens = gens_todo
        gens_todo = []

def scheduler_simple(multicall_func, handler, args):
    for arg in args:
        cur_handler = handler(arg)
        while 1:
            call = cur_handler.fetch_call()
            if call is not None:
                func, arg = call
                m = multicall_func()
                getattr(m, func)(arg) # register call
                result = iter(m()) # execute multicall
                    item =
                except xmlrpclib.Fault, e:
                    # this exception is reraised by the RPCYielder
                    item = e