-
Simple scheduled message queue (with threads)
Here’s a more flexible version of the message queue in my last post. This version uses the threading module instead of processing, so it has no dependencies. See the new example after the code.
""" Simple message queue. Messages are scheduled and processed in a single worker thread spawned from the main process. Thus, events are enqueued asynchronously, but processed in a linear fashion. """ import time import sched from Queue import Queue, Empty from threading import Thread def delay_put(duration, queue, message): time.sleep(duration) queue.put(message) def run_scheduler(scheduler): scheduler.run() class Scheduler(sched.scheduler): def __init__(self, queue, handler, timeout): self.message_queue = queue self.handler = handler self.timeout = timeout sched.scheduler.__init__(self, time.time, self.delay) def delay(self, duration): queue = self.message_queue if duration > 0: # Spawn a process that will sleep, enqueue None, and exit. Thread(target=delay_put, args=(duration, queue, None)).start() try: message = queue.get(True, duration + self.timeout) # Block! except Empty: self.timed_out() else: if message is not None: # A message was enqueued during the delay. timestamp = message.get('timestamp', self.timefunc()) priority = message.get('priority', 1) self.enterabs(timestamp, priority, self.handler, (message,)) def timed_out(self): print "Timed out." def startup(self): print "Starting scheduler!" def shutdown(self): print "Scheduler done." def run(self): # Schedule the `startup` event to trigger `delayfunc`. self.enter(0, 0, self.startup, ()) sched.scheduler.run(self) self.shutdown() class MessageQueue(object): def __init__(self, handler, timeout=10, scheduler_class=Scheduler): self.queue = Queue() self.scheduler = scheduler_class(self.queue, handler, timeout) self.worker = None def enqueue(self, message): self.queue.put(message) if not self.working(): self.start_worker() def start_worker(self): self.worker = Thread(target=run_scheduler, args=(self.scheduler,)) self.worker.start() def working(self): return self.worker is not None and self.worker.isAlive()>>> import time >>> def my_handler(message): ... print time.time(), message >>> mq = MessageQueue(my_handler) >>> for i in range(1, 10): ... now = time.time() ... mq.enqueue({'data': i, 'timestamp': now + i}) Starting scheduler! 1224341361.32 {'timestamp': 1224341361.2808199, 'data': 1} 1224341362.3 {'timestamp': 1224341362.2912149, 'data': 2} 1224341363.31 {'timestamp': 1224341363.2913051, 'data': 3} 1224341364.32 {'timestamp': 1224341364.2913489, 'data': 4} 1224341365.32 {'timestamp': 1224341365.291404, 'data': 5} 1224341366.3 {'timestamp': 1224341366.291467, 'data': 6} 1224341367.32 {'timestamp': 1224341367.291549, 'data': 7} 1224341368.34 {'timestamp': 1224341368.291626, 'data': 8} 1224341369.34 {'timestamp': 1224341369.2921841, 'data': 9} Timed out. Scheduler done.