Simple scheduled message queue in Python
Here’s a very simple message queue built on Python’s sched module and the processing package (included in the standard library as multiprocessing starting with Python 2.6). It lets you asynchronously schedule events to occur at a specific time. It would be very easy to modify this to process messages with a pool of workers, or to use threading instead of processing. There is one thing I could use lazyweb’s help with: finding places in the code where I need to use a lock or where I am ignoring these guidelines.
Update: Here’s a cleaned up version using threads.
"""
Simple message queue.
Messages are scheduled and processed in a single worker process spawned
from the main process. Thus, events are enqueued asynchronously, but
processed in a linear fashion.
"""
import sched
import time
from processing import Queue, Process
from processing.queue import Empty
def delay_put(duration, queue, message):
    """Sleep for `duration` seconds, then put `message` on `queue`.

    Used as the target of the short-lived helper process that the scheduler's
    delay function spawns. The queue is closed right after the single `put`,
    since this function writes nothing else to it.
    """
    time.sleep(duration)
    queue.put(message)
    queue.close()
class Scheduler(sched.scheduler):
    """A `sched.scheduler` whose delay function doubles as the queue reader.

    Rather than just sleeping, the delay function blocks on the message
    queue. Any message that shows up while waiting is scheduled as a call to
    the handler, at the message's 'timestamp' (default: now) and 'priority'
    (default: 1).
    """

    def __init__(self, queue, handler):
        """Set up the base scheduler with `time.time` and a queue-backed delay.

        queue   -- queue object messages are read from.
        handler -- callable invoked with each message when it comes due.
        """
        sched.scheduler.__init__(
            self, time.time, self.make_delay_func(queue, handler))

    def make_delay_func(self, queue, handler):
        """Return the delay callable bound to `queue` and `handler`."""

        def delay(duration):
            if duration > 0:
                # Spawn a helper process that sleeps for `duration`, enqueues
                # None and exits; the None wakes up the blocking `get` below.
                Process(
                    target=delay_put,
                    args=(duration, queue, None),
                ).start()
            try:
                # Blocks until something is enqueued or the wait expires.
                message = queue.get(True, duration + TIMEOUT)
            except Empty:
                print("Timed out.")
                return
            if message is None:
                # Only a wake-up marker; there is nothing to schedule.
                return
            # A real message was enqueued during the delay: schedule it.
            when = message.get('timestamp', time.time())
            rank = message.get('priority', 1)
            self.enterabs(when, rank, handler, (message,))

        return delay

    def startup(self):
        """First scheduled event; it only announces that the scheduler runs."""
        print("Starting scheduler!")

    def run(self):
        """Run the scheduler, seeding it with the `startup` event first."""
        # The immediate `startup` event is what gets the base `run` loop to
        # invoke the delay function, which in turn reads the queue.
        self.enter(0, 0, self.startup, ())
        sched.scheduler.run(self)
def handle(message):
    """Default message handler: print `message` prefixed by the current time."""
    line = "[%s] MESSAGE: %s" % (time.time(), message)
    print(line)
def run_scheduler(scheduler):
    """Process entry point: run `scheduler` until it returns, then say so."""
    scheduler.run()
    print("Scheduler done.")
# Module-level state shared by `enqueue` and the scheduler process.
QUEUE = Queue() # Message queue. Use `enqueue` to add messages.
# TIMEOUT is added on top of the requested delay when the scheduler's delay
# function blocks on the queue; if nothing arrives in that window it prints
# "Timed out.".
TIMEOUT = 10 # Seconds for scheduler to wait for items in queue.
# Built after QUEUE and `handle`, which it takes as arguments.
SCHEDULER = Scheduler(QUEUE, handle) # Message handler scheduler.
# Rebound by `enqueue`; stays None until the first scheduler process starts.
PROCESS = None # Process running the scheduler.
def enqueue(message):
    """Put `message` on QUEUE and make sure a scheduler process is running.

    The scheduler reads the optional 'timestamp' and 'priority' keys of
    `message` via `.get`, so a dict is expected. A new scheduler process is
    started when none has been started yet or when the previous one has
    already exited (its exit code is no longer None).
    """
    global PROCESS
    QUEUE.put(message)
    still_running = PROCESS is not None and PROCESS.getExitCode() is None
    if still_running:
        return
    # There is no scheduler process running; start one.
    PROCESS = Process(target=run_scheduler, args=(SCHEDULER,))
    PROCESS.start()
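Here’s a usage example (the timestamps in the output are shown formatted for readability; handle as written prints the raw time.time() value):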
>>> import time
>>> enqueue({'data': 1})
Starting scheduler!
[2008-10-17 14:33:56.212] MESSAGE: {'data': 1}
>>> enqueue({'data': 3, 'timestamp': time.time() + 10})
>>> enqueue({'data': 2, 'timestamp': time.time() + 7})
>>> enqueue({'data': 4, 'timestamp': time.time() + 15})
>>> time.sleep(26)
[2008-10-17 14:34:03.221] MESSAGE: {'timestamp': 1224268443.219, 'data': 2}
[2008-10-17 14:34:06.217] MESSAGE: {'timestamp': 1224268446.215, 'data': 3}
[2008-10-17 14:34:11.225] MESSAGE: {'timestamp': 1224268451.222, 'data': 4}
Timed out.
Scheduler done.
>>> enqueue({'data': 5, 'timestamp': time.time() + 5})
Starting scheduler!
[2008-10-17 14:34:27.233] MESSAGE: {'timestamp': 1224268467.232, 'data': 5}
Timed out.
Scheduler done.