1. Simple scheduled message queue in Python

    Here’s a very simple message queue using Python’s sched module and processing (available as multiprocessing in Python 2.6). This lets you asynchronously schedule events to occur at a specific time. It would be very easy to modify this to process messages with a pool of workers, or use threading instead of processing. There is one thing I could use lazyweb’s help with: find places in the code where I need to use a lock or where I am ignoring these guidelines.

    Update: Here’s a cleaned-up version using threads.

    """
    Simple message queue.
    
    Messages are scheduled and processed in a single worker process spawned
    from the main process.  Thus, events are enqueued asynchronously, but
    processed in a linear fashion.
    
    """
    import sched
    import time
    from processing import Queue, Process
    from processing.queue import Empty
    
    
    def delay_put(duration, queue, message):
        """Wait *duration* seconds, then put *message* on *queue* and close it.

        This is the target of the helper process spawned by the scheduler's
        delay function; there it is called with ``message=None`` to wake the
        blocking ``queue.get``.

        duration -- seconds to wait; zero or a negative value means "no wait".
        queue    -- object providing ``put(item)`` and ``close()``.
        message  -- item to enqueue once the wait is over.
        """
        # time.sleep() raises on a negative argument, so a deadline that has
        # already passed is clamped to "deliver immediately".
        time.sleep(max(duration, 0))
        queue.put(message)
        queue.close()
    
    class Scheduler(sched.scheduler):
        """`sched.scheduler` whose delay function also reads the message queue.

        Instead of simply sleeping, the delay function blocks on ``queue.get``.
        Any message (a dict) received while waiting is scheduled for
        ``handler`` at its ``'timestamp'`` (default: now) with its
        ``'priority'`` (default: 1).  A ``None`` item is only a wake-up signal
        and is discarded.
        """

        def __init__(self, queue, handler, timeout=None):
            """Build the scheduler around *queue* and *handler*.

            queue   -- object providing a blocking ``get(block, timeout)``.
            handler -- callable invoked with each message when it comes due.
            timeout -- extra seconds to wait for a queue item beyond the
                       requested delay; ``None`` (the default) uses the
                       module-level ``TIMEOUT``.
            """
            self.timeout = timeout
            delayfunc = self.make_delay_func(queue, handler)
            sched.scheduler.__init__(self, time.time, delayfunc)

        def make_delay_func(self, queue, handler):
            """Return the delay function passed to `sched.scheduler`."""
            def delay(duration):
                if duration > 0:
                    # Spawn a process that will sleep, enqueue None, and exit.
                    Process(target=delay_put, args=(duration, queue, None)).start()
                # Resolve the idle timeout at call time so the module-level
                # TIMEOUT is still honoured when no override was given.
                idle = getattr(self, 'timeout', None)
                if idle is None:
                    idle = TIMEOUT
                try:
                    message = queue.get(True, duration + idle)  # Block!
                except Empty:
                    # Single-argument print(...) behaves the same on
                    # Python 2 and Python 3.
                    print("Timed out.")
                else:
                    if message is not None:
                        # A message was enqueued during the delay.
                        timestamp = message.get('timestamp', time.time())
                        priority = message.get('priority', 1)
                        self.enterabs(timestamp, priority, handler, (message,))
            return delay

        def startup(self):
            """Initial event; it only announces that the scheduler started."""
            print("Starting scheduler!")

        def run(self):
            """Run until no events remain scheduled."""
            # Schedule the `startup` event to trigger `delayfunc`.
            self.enter(0, 0, self.startup, ())
            sched.scheduler.run(self)
    
    def handle(message):
        """Default message handler: print the current time and *message*.

        The time printed is the raw ``time.time()`` value at the moment of
        handling, not the message's own timestamp.
        """
        # Single-argument print(...) produces the same output on Python 2
        # and Python 3.
        print("[%s] MESSAGE: %s" % (time.time(), message))
    
    def run_scheduler(scheduler):
        """Run *scheduler* until its ``run()`` returns, then report completion.

        Used as the target of the scheduler process started by `enqueue`.
        """
        scheduler.run()
        # Single-argument print(...) produces the same output on Python 2
        # and Python 3.
        print("Scheduler done.")
    
    # Seconds for scheduler to wait for items in queue.
    TIMEOUT = 10
    # Process running the scheduler (None until `enqueue` starts one).
    PROCESS = None
    # Message queue.  Use `enqueue` to add messages.
    QUEUE = Queue()
    # Message handler scheduler: reads QUEUE and dispatches to `handle`.
    SCHEDULER = Scheduler(QUEUE, handle)
    
    def enqueue(message):
        """Put *message* on the shared queue and make sure a scheduler runs.

        The message is enqueued before the liveness check, so it is already
        on the queue when a new scheduler process is started.
        """
        global PROCESS
        QUEUE.put(message)
        scheduler_running = PROCESS is not None and PROCESS.getExitCode() is None
        if scheduler_running:
            return
        # There is no scheduler process running (never started, or it has
        # already exited); start one.
        PROCESS = Process(target=run_scheduler, args=(SCHEDULER,))
        PROCESS.start()
    

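    Here’s a usage example (the timestamps in brackets are shown formatted for readability; `handle` as written above prints the raw `time.time()` float):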

    >>> import time
    >>> enqueue({'data': 1})
    Starting scheduler!
    [2008-10-17 14:33:56.212] MESSAGE: {'data': 1}
    
    >>> enqueue({'data': 3, 'timestamp': time.time() + 10})
    >>> enqueue({'data': 2, 'timestamp': time.time() + 7})
    >>> enqueue({'data': 4, 'timestamp': time.time() + 15})
    >>> time.sleep(26)
    [2008-10-17 14:34:03.221] MESSAGE: {'timestamp': 1224268443.219, 'data': 2}
    [2008-10-17 14:34:06.217] MESSAGE: {'timestamp': 1224268446.215, 'data': 3}
    [2008-10-17 14:34:11.225] MESSAGE: {'timestamp': 1224268451.222, 'data': 4}
    Timed out.
    Scheduler done.
    
    >>> enqueue({'data': 5, 'timestamp': time.time() + 5})
    Starting scheduler!
    [2008-10-17 14:34:27.233] MESSAGE: {'timestamp': 1224268467.232, 'data': 5}
    Timed out.
    Scheduler done.