Simple scheduled message queue (with threads)
Here’s a more flexible version of the message queue from my last post. This version uses the threading module instead of the processing module, so it has no external dependencies. See the new example after the code.
"""
Simple message queue.
Messages are scheduled and processed in a single worker thread spawned
from the main process. Thus, events are enqueued asynchronously, but
processed in a linear fashion.
"""
import time
import sched
from Queue import Queue, Empty
from threading import Thread
def delay_put(duration, queue, message):
    """Wait `duration` seconds, then place `message` on `queue`.

    Used as a thread target so that the wait happens off the caller's
    thread; the enqueued `message` then wakes whoever is blocked on
    `queue.get()`.
    """
    time.sleep(duration)
    queue.put(message)
def run_scheduler(scheduler):
    """Thread entry point: drive `scheduler` until its `run()` returns."""
    scheduler.run()
class Scheduler(sched.scheduler):
    """A `sched.scheduler` whose delay function doubles as a message pump.

    Instead of sleeping, the delay function blocks on a message queue.
    Any message that arrives while waiting is turned into a scheduled
    call of `handler(message)`.
    """

    def __init__(self, queue, handler, timeout):
        """Set up the scheduler.

        queue   -- queue object providing a blocking `get(block, timeout)`.
        handler -- callable invoked with each message when it comes due.
        timeout -- extra seconds to wait for a new message before
                   `timed_out()` is called.
        """
        self.message_queue = queue
        self.handler = handler
        self.timeout = timeout
        # Use our own `delay` as the scheduler's delay function.
        sched.scheduler.__init__(self, time.time, self.delay)

    def delay(self, duration):
        """Wait up to `duration` (+ `timeout`) seconds for a queue item.

        If a message (anything other than None) is received, it is
        scheduled for `handler` at `message['timestamp']` (default: now)
        with `message['priority']` (default: 1). A None item only serves
        to end the wait.
        """
        queue = self.message_queue
        if duration > 0:
            # Spawn a helper thread that will sleep, enqueue None, and
            # exit; that None wakes the blocking get() below once
            # `duration` has elapsed.
            Thread(target=delay_put, args=(duration, queue, None)).start()
        try:
            message = queue.get(True, duration + self.timeout)  # Block!
        except Empty:
            # Nothing arrived within the allowed time.
            self.timed_out()
        else:
            if message is not None:
                # A message was enqueued during the delay.
                timestamp = message.get('timestamp', self.timefunc())
                priority = message.get('priority', 1)
                self.enterabs(timestamp, priority, self.handler, (message,))

    def timed_out(self):
        """Hook called when no queue item arrived before the timeout."""
        # Parenthesized form prints the same text on Python 2 and 3.
        print("Timed out.")

    def startup(self):
        """Hook run as the first scheduled event of every `run()`."""
        print("Starting scheduler!")

    def shutdown(self):
        """Hook called after the scheduler's event loop has finished."""
        print("Scheduler done.")

    def run(self):
        """Run the event loop until no scheduled events remain."""
        # Schedule the `startup` event to trigger `delayfunc`.
        self.enter(0, 0, self.startup, ())
        sched.scheduler.run(self)
        self.shutdown()
class MessageQueue(object):
    """Front end that feeds messages to a scheduler run in a worker thread.

    With the default `Scheduler`, each message is a dict; its optional
    'timestamp' and 'priority' keys control when and in what order
    `handler(message)` is called.
    """

    def __init__(self, handler, timeout=10, scheduler_class=Scheduler):
        """Create the queue and its scheduler; no thread is started yet.

        handler         -- callable invoked with each message.
        timeout         -- seconds passed to the scheduler as its timeout.
        scheduler_class -- called as `scheduler_class(queue, handler,
                           timeout)` to build the scheduler.
        """
        self.queue = Queue()
        self.scheduler = scheduler_class(self.queue, handler, timeout)
        self.worker = None

    def enqueue(self, message):
        """Queue `message` and make sure a worker thread is running."""
        self.queue.put(message)
        # NOTE(review): this check-then-start is not guarded by a lock;
        # confirm that callers enqueue from a single thread.
        if not self.working():
            self.start_worker()

    def start_worker(self):
        """Start a new thread that runs the scheduler."""
        self.worker = Thread(target=run_scheduler, args=(self.scheduler,))
        self.worker.start()

    def working(self):
        """Return True if a worker thread exists and is still alive."""
        worker = self.worker
        if worker is None:
            return False
        # `Thread.isAlive` was removed in Python 3.9; prefer `is_alive`
        # (available since Python 2.6) and fall back to the old spelling.
        probe = getattr(worker, 'is_alive', None)
        if probe is None:
            probe = worker.isAlive
        return probe()
>>> import time
>>> def my_handler(message):
...     print time.time(), message
>>> mq = MessageQueue(my_handler)
>>> for i in range(1, 10):
...     now = time.time()
...     mq.enqueue({'data': i, 'timestamp': now + i})
Starting scheduler!
1224341361.32 {'timestamp': 1224341361.2808199, 'data': 1}
1224341362.3 {'timestamp': 1224341362.2912149, 'data': 2}
1224341363.31 {'timestamp': 1224341363.2913051, 'data': 3}
1224341364.32 {'timestamp': 1224341364.2913489, 'data': 4}
1224341365.32 {'timestamp': 1224341365.291404, 'data': 5}
1224341366.3 {'timestamp': 1224341366.291467, 'data': 6}
1224341367.32 {'timestamp': 1224341367.291549, 'data': 7}
1224341368.34 {'timestamp': 1224341368.291626, 'data': 8}
1224341369.34 {'timestamp': 1224341369.2921841, 'data': 9}
Timed out.
Scheduler done.