Source code for fsleyes_props.callqueue

#!/usr/bin/env python
#
# callqueue.py - A queue for calling functions sequentially.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides the :class:`CallQueue` class, which is used by
:class:`.PropertyValue` instances to enqueue and execute property listener
callback functions.
"""


import logging

import queue

import fsl.utils.idle  as idle


log = logging.getLogger(__name__)


[docs] class Call: """A little class which is used to represent function calls that are on the queue. """ def __init__(self, func, name, args, kwargs): self.func = func self.name = name self.args = args self.kwargs = kwargs self.execute = True
# The CallQueue.dequeue method sets the # above execute attribute to False for # calls which are to be dequeued - this # causees the CallQueue.__call method # to skip over the call.
[docs] class CallQueue: """A queue of functions to be called. Functions can be enqueued via the :meth:`call` or :meth:`callAll` methods. """ def __init__(self, skipDuplicates=False): """Create a ``CallQueue`` instance. If ``skipDuplicates`` is ``True``, a function which is already on the queue will be silently dropped if an attempt is made to add it again. .. note:: The ``skipDuplicates`` test is based solely on the name of the function. This means that the ``CallQueue`` does not support enqueueing the same function with different arguments. Testing for function *and* argument equality is a difficult task: - I can't take the hash of the arguments, as I can't assume that they are hashable (e.g. ``numpy`` arrays). - I can't test for identity, as things which have the same value may not have the same id (e.g. strings). - I can't test argument equality, because in some cases the argument may be a mutable type (e.g. a ``list``), and its value may have changed between the time the function was queued, and the time it is called. *And* the arguments might be big (again, ``numpy`` arrays), so an equality test could be expensive. So this is quite a pickle. Something to come back to if things are breaking because of it. **Holding the queue** The :meth:`hold` method temporarily stops the ``CallQueue`` from queueing and executing functions. Any functions which are enqueued while the queue is held are kept in a separate queue. The queue is released via the :meth:`release` method, after which any held functions may be accessed via the :meth:`clearHeld` method (which also clears the internal queue of held functions). Once the queue has been released, these held functions can be re-queued as normal via the :meth:`call` or :meth:`callAll` methods. """ # The queue is a queue of Call instances # # The queued dict contains mappings of # {name : [List of Call instances]} # self.__queue = queue.Queue() self.__queued = {} self.__skipDuplicates = skipDuplicates self.__calling = False # Every call to hold will increment # this count, and every call to # release will decrement it. self.__holding = 0 # If the queue is being held, enqueued # functions are added to this list self.__held = []
[docs] @idle.mutex def dequeue(self, name): """If the specified function is on the queue, it is (effectively) dequeued, and not executed. If ``skipDuplicates`` is ``False``, and more than one function of the same name is enqueued, they are all dequeued. """ # Get all calls with the specified name calls = self.__queued.get(name, []) # Set each of their execute flags to # False - the __call method will skip # over Call instances with execute=False for call in calls: self.__debug(call, 'Dequeueing function', 'from queue') call.execute = False # Check the held queue as well for call in self.__held: if call.name == name: self.__debug(call, 'Dequeueing held function', 'from queue') call.execute = False
[docs] def call(self, func, name, *args, **kwargs): """Enqueues the given function, and calls all functions in the queue (unless the call to this method was as a result another function being called from the queue). """ if self.__push(Call(func, name, args, kwargs)): self.__call()
[docs] def callAll(self, funcs): """Enqueues all of the given functions, and calls all functions in the queue. (unless the call to this method was as a result another function being called from the queue). Assumes that the given ``funcs`` parameter is a list of ``(function, name, args, kwargs)`` tuples. """ anyEnqueued = False for func in funcs: if self.__push(Call(*func)): anyEnqueued = True if anyEnqueued: self.__call()
[docs] @idle.mutex def hold(self): """Holds the queue. For every call to ``hold``, the :meth:`release` method must be called once before the queue will be truly released. """ self.__holding += 1
[docs] @idle.mutex def release(self): """Releases the queue. """ self.__holding -= 1
[docs] @idle.mutex def clearHeld(self): """Clears and returns the list of held functions. """ if self.__holding > 0: return [] held = self.__held self.__held = [] return [(c.func, c.name, c.args, c.kwargs) for c in held if c.execute]
def __call(self): """Call all of the functions which are currently enqueued. This method is not re-entrant - if a call to one of the functions in the queue triggers another call to this method, this second call will return immediately without doing anything. """ if self.__calling: return self.__calling = True while True: try: call = self.__pop() if not call.execute: self.__debug(call, 'Skipping dequeued function') continue self.__debug(call, 'Calling function') try: call.func(*call.args, **call.kwargs) except Exception as e: import traceback log.warning('Function {} raised exception: {}'.format( call.name, e), exc_info=True) traceback.print_stack() except queue.Empty: break self.__calling = False @idle.mutex def __push(self, call): """Enqueues the given ``Call`` instance. If the queue has been held (see :meth:`hold`), the call is stored, and ``False`` is returned. If ``True`` was passed in for the ``skipDuplicates`` parameter during initialisation, and the function is already enqueued, it is not added to the queue, and this method returns ``False``. Otherwise, this method returnes ``True``. """ enqueued = self.__queued.get(call.name, []) if self.__holding > 0: self.__debug(call, 'Holding function') self.__held.append(call) return False # Skip this function if there are already # functions in the queue with the same name if self.__skipDuplicates and (len(enqueued) > 0): self.__debug(call, 'Skipping function') return False self.__debug(call, 'Queueing function', 'to queue') self.__queue.put_nowait(call) self.__queued[call.name] = enqueued + [call] return True @idle.mutex def __pop(self): """Pops the next function from the queue and returns the ``Call`` instance which encapsulates it. """ call = self.__queue.get_nowait() enqueued = self.__queued[call.name] # TODO shouldn't the call always # be at the end of the list? # # This will be a little bit faster # if you remove the call by index. enqueued.remove(call) if len(enqueued) == 0: self.__queued.pop(call.name) return call def __debug(self, call, prefix, postfix=None): """Prints a standardised log message.""" if log.getEffectiveLevel() != logging.DEBUG: return funcName, modName = self.__getCallbackDetails(call.func) if postfix is None: postfix = ' ' else: postfix = ' {} '.format(postfix) log.debug('{} {} [{}.{}]{}({} in queue)'.format( prefix, call.name, modName, funcName, postfix, self.__queue.qsize())) def __getCallbackDetails(self, cb): """Returns the function name and module name of the given function reference. Used purely for debug log statements. """ import inspect if log.getEffectiveLevel() != logging.DEBUG: return '', '' members = inspect.getmembers(cb) funcName = '' modName = '' for name, val in members: if name == '__func__': funcName = val.__name__ modName = val.__module__ break return funcName, modName