Oct 17

Simple scheduled message queue in Python

Here’s a very simple message queue built on Python’s `sched` module and the `processing` package (included in the standard library as `multiprocessing` starting with Python 2.6). It lets you asynchronously schedule events to occur at a specific time. It would be easy to modify this to process messages with a pool of workers, or to use threading instead of processing. There is one thing I could use the lazyweb’s help with: finding places in the code where I need to use a lock, or where I am ignoring the `processing` package’s programming guidelines.

Update: Here’s a cleaned-up version using threads.

"""
Simple message queue.

Messages are scheduled and processed in a single worker process spawned
from the main process.  Thus, events are enqueued asynchronously, but
processed in a linear fashion.

"""
import sched
import time
from processing import Queue, Process
from processing.queue import Empty


def delay_put(duration, queue, message):
    """Block for ``duration`` seconds, then deliver ``message`` to ``queue``.

    The queue is closed afterwards, telling it that this process will put
    nothing further on it.  Used as the target of short-lived helper
    processes that act as timed wake-ups.
    """
    pause = duration
    time.sleep(pause)
    target = queue
    target.put(message)
    # Done with the queue in this process; nothing else will be sent.
    target.close()

class Scheduler(sched.scheduler):
    """Event scheduler that feeds itself from a message queue.

    Each message read from ``queue`` is scheduled to call
    ``handler(message)`` at ``message['timestamp']`` (default: the moment it
    is received) with ``message['priority']`` (default 1).  Whenever the
    scheduler would otherwise sleep, it waits on ``queue`` instead, so
    messages enqueued while it is running are picked up right away.  Once no
    events remain and nothing arrives for ``timeout`` seconds (the
    module-level ``TIMEOUT`` when ``timeout`` is None), ``run`` returns.
    """

    def __init__(self, queue, handler, timeout=None):
        # Idle wait in seconds.  None means "use the module-level TIMEOUT",
        # looked up each time the scheduler goes idle.
        self.timeout = timeout
        delayfunc = self.make_delay_func(queue, handler)
        sched.scheduler.__init__(self, time.time, delayfunc)

    def make_delay_func(self, queue, handler):
        """Return the ``delayfunc`` handed to ``sched.scheduler``.

        ``sched`` calls it with the number of seconds until the next event,
        or with 0 right after running an event.  Instead of sleeping, it
        waits on ``queue``, so a new message ends the wait early, and it
        schedules any message it receives.  A ``None`` message is treated as
        a bare wake-up and ignored.
        """
        def delay(duration):
            # Idle means no events are left.  If this wait then times out,
            # the heap stays empty and sched.scheduler.run returns.
            idle = duration <= 0 and self.empty()
            try:
                if duration > 0:
                    # Timed wait for the next event; a message ends it early.
                    # No helper process is needed, so no stale wake-up
                    # sentinels are left behind on the queue.
                    message = queue.get(True, duration)
                elif idle:
                    message = queue.get(True, self._idle_timeout())
                else:
                    # Called with 0 after an action while events are still
                    # pending: take only what is already waiting.  Blocking
                    # here could delay the next event.
                    message = queue.get(False)
            except Empty:
                if idle:
                    print("Timed out.")
                return
            if message is not None:
                # A message was enqueued during the delay.
                timestamp = message.get('timestamp', time.time())
                priority = message.get('priority', 1)
                self.enterabs(timestamp, priority, handler, (message,))
        return delay

    def _idle_timeout(self):
        """Seconds to wait for a new message once no events are pending."""
        if self.timeout is None:
            return TIMEOUT
        return self.timeout

    def startup(self):
        """First scheduled event; announces that the scheduler is running."""
        print("Starting scheduler!")

    def run(self):
        """Process events and incoming messages until the scheduler idles out."""
        # Seed the heap with `startup` so sched.scheduler.run has an event to
        # execute; the delayfunc(0) that follows it is the first queue read.
        self.enter(0, 0, self.startup, ())
        sched.scheduler.run(self)

def handle(message):
    """Default message handler: print ``message`` tagged with the current Unix time."""
    now = time.time()
    print("[%s] MESSAGE: %s" % (now, message))

def run_scheduler(scheduler):
    """Worker-process target: run ``scheduler`` until it returns, then report it."""
    runner = scheduler.run
    runner()
    print("Scheduler done.")

# Module-level state shared by `enqueue` and the scheduler process.  All of it
# is created at import time.  The scheduler process receives SCHEDULER (whose
# delay function holds QUEUE) as the argument to `run_scheduler`.
QUEUE = Queue() # Message queue.  Use `enqueue` to add messages.
TIMEOUT = 10 # Seconds the scheduler waits for items in QUEUE before timing out; read by Scheduler's delay function each time it runs.
SCHEDULER = Scheduler(QUEUE, handle) # Scheduler that passes each queued message to `handle`.
PROCESS = None # Process running the scheduler; `enqueue` rebinds it when it starts a new one.

def enqueue(message):
    """Put ``message`` on QUEUE, starting a scheduler process if none is live.

    A scheduler process counts as live while ``PROCESS.getExitCode()`` is
    None.  Otherwise a new process running ``run_scheduler(SCHEDULER)`` is
    started and stored in PROCESS.

    NOTE(review): the check-then-start below is not atomic.  If the
    scheduler has just timed out but has not exited yet, its exit code is
    still None, so no new scheduler is started.  The message then waits in
    QUEUE until a later `enqueue` call.  Likewise, two threads calling this
    at once could both see no live process and start two schedulers.
    """
    global PROCESS
    QUEUE.put(message)
    scheduler_alive = PROCESS is not None and PROCESS.getExitCode() is None
    if scheduler_alive:
        return
    # No scheduler process is running; spawn a fresh one.
    PROCESS = Process(target=run_scheduler, args=(SCHEDULER,))
    PROCESS.start()

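Here’s a usage example. Note that `handle`, as listed above, prints the raw `time.time()` value, so the bracketed timestamps in real output are Unix-time floats rather than the formatted dates shown below: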

>>> import time
>>> enqueue({'data': 1})
Starting scheduler!
[2008-10-17 14:33:56.212] MESSAGE: {'data': 1}

>>> enqueue({'data': 3, 'timestamp': time.time() + 10})
>>> enqueue({'data': 2, 'timestamp': time.time() + 7})
>>> enqueue({'data': 4, 'timestamp': time.time() + 15})
>>> time.sleep(26)
[2008-10-17 14:34:03.221] MESSAGE: {'timestamp': 1224268443.219, 'data': 2}
[2008-10-17 14:34:06.217] MESSAGE: {'timestamp': 1224268446.215, 'data': 3}
[2008-10-17 14:34:11.225] MESSAGE: {'timestamp': 1224268451.222, 'data': 4}
Timed out.
Scheduler done.

>>> enqueue({'data': 5, 'timestamp': time.time() + 5})
Starting scheduler!
[2008-10-17 14:34:27.233] MESSAGE: {'timestamp': 1224268467.232, 'data': 5}
Timed out.
Scheduler done.
Page 1 of 1