-
Simple scheduled message queue in Python
Here’s a very simple message queue using Python’s sched module and processing (available as multiprocessing in Python 2.6). This lets you asynchronously schedule events to occur at a specific time. It would be very easy to modify this to process messages with a pool of workers, or use
threadinginstead ofprocessing. There is one thing I could use lazyweb’s help with: find places in the code where I need to use a lock or where I am ignoring these guidelines.Update: Here’s a cleaned up version using threads.
""" Simple message queue. Messages are scheduled and processed in a single worker process spawned from the main process. Thus, events are enqueued asynchronously, but processed in a linear fashion. """ import sched import time from processing import Queue, Process from processing.queue import Empty def delay_put(duration, queue, message): time.sleep(duration) queue.put(message) queue.close() class Scheduler(sched.scheduler): def __init__(self, queue, handler): delayfunc = self.make_delay_func(queue, handler) sched.scheduler.__init__(self, time.time, delayfunc) def make_delay_func(self, queue, handler): def delay(duration): if duration > 0: # Spawn a process that will sleep, enqueue None, and exit. Process(target=delay_put, args=(duration, queue, None)).start() try: message = queue.get(True, duration + TIMEOUT) # Block! except Empty: print "Timed out." else: if message is not None: # A message was enqueued during the delay. timestamp = message.get('timestamp', time.time()) priority = message.get('priority', 1) self.enterabs(timestamp, priority, handler, (message,)) return delay def startup(self): print "Starting scheduler!" def run(self): # Schedule the `startup` event to trigger `delayfunc`. self.enter(0, 0, self.startup, ()) sched.scheduler.run(self) def handle(message): print "[%s] MESSAGE: %s" % (time.time(), message) def run_scheduler(scheduler): scheduler.run() print "Scheduler done." QUEUE = Queue() # Message queue. Use `enqueue` to add messages. TIMEOUT = 10 # Seconds for scheduler to wait for items in queue. SCHEDULER = Scheduler(QUEUE, handle) # Message handler scheduler. PROCESS = None # Process running the scheduler. def enqueue(message): global PROCESS QUEUE.put(message) if PROCESS is None or PROCESS.getExitCode() is not None: # There is no scheduler process running; start one. PROCESS = Process(target=run_scheduler, args=(SCHEDULER,)) PROCESS.start()Here’s a usage example:
>>> import time >>> enqueue({'data': 1}) Starting scheduler! [2008-10-17 14:33:56.212] MESSAGE: {'data': 1} >>> enqueue({'data': 3, 'timestamp': time.time() + 10}) >>> enqueue({'data': 2, 'timestamp': time.time() + 7}) >>> enqueue({'data': 4, 'timestamp': time.time() + 15}) >>> time.sleep(26) [2008-10-17 14:34:03.221] MESSAGE: {'timestamp': 1224268443.219, 'data': 2} [2008-10-17 14:34:06.217] MESSAGE: {'timestamp': 1224268446.215, 'data': 3} [2008-10-17 14:34:11.225] MESSAGE: {'timestamp': 1224268451.222, 'data': 4} Timed out. Scheduler done. >>> enqueue({'data': 5, 'timestamp': time.time() + 5}) Starting scheduler! [2008-10-17 14:34:27.233] MESSAGE: {'timestamp': 1224268467.232, 'data': 5} Timed out. Scheduler done.