1. Simple scheduled message queue (with threads)

    Here’s a more flexible version of the message queue from my last post. This version uses the standard-library threading module instead of the processing package, so it has no external dependencies. See the new example after the code.

    """
    Simple message queue.
    
    Messages are scheduled and processed in a single worker thread spawned
    from the main process.  Thus, events are enqueued asynchronously, but
    processed in a linear fashion.
    
    """
    import time
    import sched
    from Queue import Queue, Empty
    from threading import Thread
    
    
    def delay_put(duration, queue, message):
        """Sleep for `duration` seconds, then put `message` on `queue`.

        The call blocks the calling thread for the whole sleep, so run it in
        a helper thread when the caller must not wait.
        """
        # Wait first: the message must not become visible before the delay
        # has elapsed.
        time.sleep(duration)
        queue.put(message)
    
    def run_scheduler(scheduler):
        """Thread entry point: call `scheduler.run()` and return when it does."""
        scheduler.run()
    
    class Scheduler(sched.scheduler):
        """A `sched.scheduler` whose delay function doubles as a queue reader.

        Instead of sleeping while it waits for the next event, the scheduler
        blocks on the message queue.  A message put on the queue by another
        thread is therefore turned into a scheduled call to `handler` as soon
        as it arrives, and the wait is then re-evaluated by the base class.

        A message is a dict-like object.  Its optional 'timestamp' key (an
        absolute time as returned by `time.time`; default: now) and optional
        'priority' key (default: 1) decide when `handler(message)` runs.  A
        `None` message schedules nothing; it only wakes the scheduler up.

        Subclasses may override `startup`, `shutdown` and `timed_out`.
        """

        def __init__(self, queue, handler, timeout):
            """Create a scheduler fed by `queue`.

            queue   -- queue object the messages are read from.
            handler -- callable invoked with each message when it is due.
            timeout -- seconds to wait for a new message once no events are
                       pending; when it expires `timed_out` is called.
            """
            self.message_queue = queue
            self.handler = handler
            self.timeout = timeout
            sched.scheduler.__init__(self, time.time, self.delay)

        def delay(self, duration):
            """Wait up to `duration` seconds, returning early on a message.

            This is the scheduler's delay function.  The base class calls it
            with the time remaining until the next event and, after running
            an event, with 0.  Three cases follow from that:

            * duration > 0: block on the queue for at most `duration`.
              Running out of time is the normal wake-up, not an error.
            * duration == 0 and events are still pending: only pick up a
              message that is already waiting; never block, so the pending
              events are not held up.
            * duration == 0 and nothing is pending: the scheduler is idle.
              Block for `self.timeout` and call `timed_out` if nothing shows
              up.

            The timeout of `Queue.get` does the waiting, so no helper threads
            are needed to wake the scheduler.
            """
            queue = self.message_queue
            idle = False
            try:
                if duration > 0:
                    message = queue.get(True, duration)  # Block!
                elif self.empty():
                    idle = True
                    message = queue.get(True, self.timeout)  # Block!
                else:
                    message = queue.get(False)
            except Empty:
                if idle:
                    self.timed_out()
                return
            if message is None:
                # Wake-up sentinel: nothing to schedule.
                return
            # A message was enqueued during the delay.
            timestamp = message.get('timestamp', self.timefunc())
            priority = message.get('priority', 1)
            self.enterabs(timestamp, priority, self.handler, (message,))

        def timed_out(self):
            """Hook called when the idle wait for a message expires."""
            print("Timed out.")

        def startup(self):
            """Hook run as the first scheduled event of `run`."""
            print("Starting scheduler!")

        def shutdown(self):
            """Hook called by `run` after the last event has been processed."""
            print("Scheduler done.")

        def run(self):
            """Process events until none are left, then call `shutdown`."""
            # Schedule the `startup` event to trigger `delayfunc`: the base
            # class only calls the delay function while events exist, and
            # the delay function is where the first message is picked up.
            self.enter(0, 0, self.startup, ())
            sched.scheduler.run(self)
            self.shutdown()
    
    class MessageQueue(object):
        """Front end that accepts messages and keeps a worker thread running.

        Messages are put on an internal queue that is shared with a scheduler
        instance; the scheduler runs in a worker thread that is started on
        demand by `enqueue`.
        """

        def __init__(self, handler, timeout=10, scheduler_class=Scheduler):
            """Create the queue and its scheduler; no thread is started yet.

            handler         -- passed to the scheduler as its message handler.
            timeout         -- passed to the scheduler as its timeout.
            scheduler_class -- called as scheduler_class(queue, handler,
                               timeout) to build the scheduler.
            """
            self.queue = Queue()
            self.scheduler = scheduler_class(self.queue, handler, timeout)
            self.worker = None

        def enqueue(self, message):
            """Put `message` on the queue, starting a worker if none is alive."""
            self.queue.put(message)
            if self.working():
                return
            self.start_worker()

        def start_worker(self):
            """Start a new thread that runs the scheduler."""
            worker = Thread(target=run_scheduler, args=(self.scheduler,))
            self.worker = worker
            worker.start()

        def working(self):
            """Return True if a worker thread exists and is still alive."""
            worker = self.worker
            if worker is None:
                return False
            return worker.isAlive()
    

    >>> import time
    >>> def my_handler(message):
    ...     print time.time(), message
    
    >>> mq = MessageQueue(my_handler)
    >>> for i in range(1, 10):
    ...     now = time.time()
    ...     mq.enqueue({'data': i, 'timestamp': now + i})
    
    Starting scheduler!
    1224341361.32 {'timestamp': 1224341361.2808199, 'data': 1}
    1224341362.3 {'timestamp': 1224341362.2912149, 'data': 2}
    1224341363.31 {'timestamp': 1224341363.2913051, 'data': 3}
    1224341364.32 {'timestamp': 1224341364.2913489, 'data': 4}
    1224341365.32 {'timestamp': 1224341365.291404, 'data': 5}
    1224341366.3 {'timestamp': 1224341366.291467, 'data': 6}
    1224341367.32 {'timestamp': 1224341367.291549, 'data': 7}
    1224341368.34 {'timestamp': 1224341368.291626, 'data': 8}
    1224341369.34 {'timestamp': 1224341369.2921841, 'data': 9}
    Timed out.
    Scheduler done.