I just released a new reusable Django app called django-adminbrowse on GitHub. This was factored out of a Django project I’ve been working on for a while.
Ever wish it was easier to get around in the Django admin? Maybe you’ve deployed the admin site as a management tool for some less technical users? This project lets you easily spiff up changelist pages to include better text and markup. I’ve been using it in production for a while and it’s been a welcome improvement.
In the screenshot, you’ll notice:
URL fields become clickable links.
Foreign key fields link to the change form for the corresponding object.
Related objects get a link to their changelist page, filtered appropriately.
There are a few more features, but the most powerful one is that the code is easy to extend. If you have ever wanted to include custom markup in your changelist columns, this makes it simple.
See the README or use Python’s help() for starters. More advanced documentation is in progress.
Python descriptors are great for customizing access to attributes on a class or instance. They are a big win for tasks like mapping Python objects to data from non-Python sources (such as SQL), since mapped attributes will need to be encoded/decoded and connected to other attributes in some way.
Below is a very simple descriptor; as you can see, accessing it from either the class or the instance invokes the descriptor protocol:
class Test(object):
    pass

class Descriptor(object):
    def __get__(self, instance, owner):
        return "Hello, world."

>>> Test.x = Descriptor()
>>> Test.x
'Hello, world.'
>>> test = Test()
>>> test.x
'Hello, world.'
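The encode/decode use case mentioned above can be sketched with a data descriptor (one that defines both `__get__` and `__set__`). This is only an illustration written for Python 3: `JSONField`, `Record`, and the `row` dict standing in for a database row are hypothetical names, not part of any library.

```python
import json


class JSONField(object):
    """Hypothetical descriptor that keeps a value as JSON text in a row dict."""

    def __init__(self, column):
        self.column = column

    def __get__(self, instance, owner):
        if instance is None:
            # Accessed on the class: return the descriptor itself.
            return self
        # Decode the stored text on every read.
        return json.loads(instance.row[self.column])

    def __set__(self, instance, value):
        # Encode the Python value on every write.
        instance.row[self.column] = json.dumps(value)


class Record(object):
    tags = JSONField('tags')

    def __init__(self, row):
        # `row` stands in for raw data from a non-Python source.
        self.row = row


record = Record({'tags': '["a", "b"]'})
print(record.tags)         # ['a', 'b']
record.tags = ['c']
print(record.row['tags'])  # ["c"]
```

The raw text never leaks out of `record.row`; callers only ever see decoded Python values.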
However, in order to add descriptors to an object, they must be added to the object’s class. Descriptors added to an instance do not invoke the descriptor protocol:
>>> test.y = Descriptor()
>>> test.y
<__main__.Descriptor object at 0x16fe810>
This means that creating an instance with dynamic (determined at runtime) descriptors requires either the heavy-handed approach of generating a class just for that object (since adding descriptors to its class will add them to all other instances of the class), or the ad-hoc approach of redefining getattr/setattr behavior (essentially re-implementing your own descriptor protocol).
It turns out the latter approach is not as messy as it first sounds. Below is a class that enables “instance descriptors”:
class InstanceDescriptorMixin(object):

    def __getattribute__(self, name):
        value = object.__getattribute__(self, name)
        if hasattr(value, '__get__'):
            value = value.__get__(self, self.__class__)
        return value

    def __setattr__(self, name, value):
        try:
            obj = object.__getattribute__(self, name)
        except AttributeError:
            pass
        else:
            if hasattr(obj, '__set__'):
                return obj.__set__(self, value)
        return object.__setattr__(self, name, value)

class Test(InstanceDescriptorMixin):
    pass

>>> test = Test()
>>> test.z = Descriptor()
>>> test.z
'Hello, world.'
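For comparison, the "heavy-handed" alternative of generating a class just for one object might look like the sketch below. It is an assumption about how that approach could be written, not code from the original project: `add_descriptor` and the `_per_instance` marker are hypothetical names.

```python
class Descriptor(object):
    def __get__(self, instance, owner):
        return "Hello, world."


class Test(object):
    pass


def add_descriptor(obj, name, descriptor):
    """Attach `descriptor` to `obj` only, by giving `obj` a private subclass."""
    cls = obj.__class__
    # Hypothetical marker so repeated calls reuse the same private subclass.
    if not cls.__dict__.get('_per_instance', False):
        cls = type(cls.__name__, (cls,), {'_per_instance': True})
        obj.__class__ = cls
    # The descriptor lives on the private class, so the normal protocol applies.
    setattr(cls, name, descriptor)


first, second = Test(), Test()
add_descriptor(first, 'z', Descriptor())
print(first.z)                  # Hello, world.
print(hasattr(second, 'z'))     # False
print(isinstance(first, Test))  # True
```

The object keeps passing `isinstance` checks, but every such instance now carries its own class, which is the cost the mixin above avoids.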
After suffering from over a year of poor maintenance, geopy is finally
getting some love this month. A few other developers and I will be
focusing on geopy at this month’s Cleveland Code Co-op meeting.
We’ve come up with an ambitious todo list, including:
Reverse geocoding support (finding locations near a point)
Higher level Points and Locations (instead of tuples and strings)
Keeping up with third-party geocoder APIs (and hacks)
A “compound” geocoder for querying multiple geocoders (as
fallbacks or for averaging results)
A parser module with support for geotagged documents (including the
Geo microformat), ISO 6709, GPX files, etc.
Geohash encoding/decoding
A formatter module for pretty-printing coordinates, distances, and
ordinal directions (think “south by southwest”)
setuptools entry points to support geocoder plugins and discovery
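To give a feel for the size of one item on the list, here is a minimal sketch of Geohash encoding using the publicly documented algorithm (interleaved longitude/latitude bisection, base-32 output). It is not geopy code; the function name `geohash_encode` and its signature are assumptions made for illustration.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def geohash_encode(latitude, longitude, precision=11):
    """Encode a point as a Geohash string of `precision` characters."""
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    chars = []
    bits = 0
    bit_count = 0
    use_longitude = True  # Bits alternate, starting with longitude.
    while len(chars) < precision:
        if use_longitude:
            rng, value = lon_range, longitude
        else:
            rng, value = lat_range, latitude
        mid = (rng[0] + rng[1]) / 2.0
        if value >= mid:
            bits = (bits << 1) | 1
            rng[0] = mid  # Keep the upper half of the interval.
        else:
            bits = bits << 1
            rng[1] = mid  # Keep the lower half of the interval.
        use_longitude = not use_longitude
        bit_count += 1
        if bit_count == 5:
            # Every five bits become one base-32 character.
            chars.append(BASE32[bits])
            bits = 0
            bit_count = 0
    return "".join(chars)


print(geohash_encode(57.64911, 10.40744, precision=5))  # u4pru
```

Decoding would run the same bisection in reverse, reading bits from the characters instead of producing them.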
I think these features are in line with the “geocoding toolbox” goal
of the project. While there are a lot of features there, I think
geopy will still feel like a nice compact library.
Why does geopy deserve some developer attention? Because it’s being
used in numerous interesting ways, including: directing robots at Carnegie Mellon
University, calculating stream lengths for the U.S. Geological
Survey, and updating address data for the Barack Obama presidential campaign.
We’ll be sprinting on Sunday, November 16th. If anyone would like
to join us in person or on IRC, please get in touch!
Here’s a more flexible version of the message queue in my last post. This version uses the threading module instead of processing, so it has no dependencies. See the new example after the code.
"""
Simple message queue.
Messages are scheduled and processed in a single worker thread spawned
from the main process. Thus, events are enqueued asynchronously, but
processed in a linear fashion.
"""
import time
import sched
from Queue import Queue, Empty
from threading import Thread

def delay_put(duration, queue, message):
    time.sleep(duration)
    queue.put(message)

def run_scheduler(scheduler):
    scheduler.run()

class Scheduler(sched.scheduler):

    def __init__(self, queue, handler, timeout):
        self.message_queue = queue
        self.handler = handler
        self.timeout = timeout
        sched.scheduler.__init__(self, time.time, self.delay)

    def delay(self, duration):
        queue = self.message_queue
        if duration > 0:
            # Spawn a thread that will sleep, enqueue None, and exit.
            Thread(target=delay_put, args=(duration, queue, None)).start()
        try:
            message = queue.get(True, duration + self.timeout) # Block!
        except Empty:
            self.timed_out()
        else:
            if message is not None:
                # A message was enqueued during the delay.
                timestamp = message.get('timestamp', self.timefunc())
                priority = message.get('priority', 1)
                self.enterabs(timestamp, priority, self.handler, (message,))

    def timed_out(self):
        print "Timed out."

    def startup(self):
        print "Starting scheduler!"

    def shutdown(self):
        print "Scheduler done."

    def run(self):
        # Schedule the `startup` event to trigger `delayfunc`.
        self.enter(0, 0, self.startup, ())
        sched.scheduler.run(self)
        self.shutdown()

class MessageQueue(object):

    def __init__(self, handler, timeout=10, scheduler_class=Scheduler):
        self.queue = Queue()
        self.scheduler = scheduler_class(self.queue, handler, timeout)
        self.worker = None

    def enqueue(self, message):
        self.queue.put(message)
        if not self.working():
            self.start_worker()

    def start_worker(self):
        self.worker = Thread(target=run_scheduler, args=(self.scheduler,))
        self.worker.start()

    def working(self):
        return self.worker is not None and self.worker.isAlive()

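A usage sketch follows. Because the listing above targets Python 2, the block carries a condensed Python 3 port of the same classes (with the `print` hooks dropped) so that it runs on its own; treat it as an approximation under that assumption, not as the original example. The `handle` function, the `handled` list, and the short timeout are made up for the demonstration.

```python
import sched
import time
from queue import Queue, Empty
from threading import Thread


def delay_put(duration, queue, message):
    time.sleep(duration)
    queue.put(message)


class Scheduler(sched.scheduler):
    def __init__(self, queue, handler, timeout):
        self.message_queue = queue
        self.handler = handler
        self.timeout = timeout
        sched.scheduler.__init__(self, time.time, self.delay)

    def delay(self, duration):
        queue = self.message_queue
        if duration > 0:
            # Helper thread wakes the blocking get() when the delay is over.
            Thread(target=delay_put, args=(duration, queue, None)).start()
        try:
            message = queue.get(True, duration + self.timeout)
        except Empty:
            return  # Timed out; run() ends once no events remain.
        if message is not None:
            timestamp = message.get('timestamp', self.timefunc())
            priority = message.get('priority', 1)
            self.enterabs(timestamp, priority, self.handler, (message,))

    def run(self):
        # A no-op event at time zero makes run() call `delay` once.
        self.enter(0, 0, lambda: None, ())
        sched.scheduler.run(self)


class MessageQueue(object):
    def __init__(self, handler, timeout=10, scheduler_class=Scheduler):
        self.queue = Queue()
        self.scheduler = scheduler_class(self.queue, handler, timeout)
        self.worker = None

    def enqueue(self, message):
        self.queue.put(message)
        if self.worker is None or not self.worker.is_alive():
            self.worker = Thread(target=self.scheduler.run)
            self.worker.start()


handled = []


def handle(message):
    handled.append(message['body'])


messages = MessageQueue(handle, timeout=0.5)
messages.enqueue({'body': 'now'})
messages.enqueue({'body': 'soon', 'timestamp': time.time() + 0.2})
messages.worker.join()  # Returns after the queue stays empty for `timeout`.
print(handled)  # ['now', 'soon']
```

Messages are plain dicts; the optional `timestamp` and `priority` keys are the only ones the scheduler looks at, and everything else is passed through to the handler untouched.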
Here’s a very simple message queue using Python’s sched module and processing (available as multiprocessing in Python 2.6). This lets you asynchronously schedule events to occur at a specific time. It would be very easy to modify this to process messages with a pool of workers, or use threading instead of processing. There is one thing I could use lazyweb’s help with: find places in the code where I need to use a lock or where I am ignoring these guidelines.
"""
Simple message queue.
Messages are scheduled and processed in a single worker process spawned
from the main process. Thus, events are enqueued asynchronously, but
processed in a linear fashion.
"""
import sched
import time
from processing import Queue, Process
from processing.queue import Empty

def delay_put(duration, queue, message):
    time.sleep(duration)
    queue.put(message)
    queue.close()

class Scheduler(sched.scheduler):

    def __init__(self, queue, handler):
        delayfunc = self.make_delay_func(queue, handler)
        sched.scheduler.__init__(self, time.time, delayfunc)

    def make_delay_func(self, queue, handler):
        def delay(duration):
            if duration > 0:
                # Spawn a process that will sleep, enqueue None, and exit.
                Process(target=delay_put, args=(duration, queue, None)).start()
            try:
                message = queue.get(True, duration + TIMEOUT) # Block!
            except Empty:
                print "Timed out."
            else:
                if message is not None:
                    # A message was enqueued during the delay.
                    timestamp = message.get('timestamp', time.time())
                    priority = message.get('priority', 1)
                    self.enterabs(timestamp, priority, handler, (message,))
        return delay

    def startup(self):
        print "Starting scheduler!"

    def run(self):
        # Schedule the `startup` event to trigger `delayfunc`.
        self.enter(0, 0, self.startup, ())
        sched.scheduler.run(self)

def handle(message):
    print "[%s] MESSAGE: %s" % (time.time(), message)

def run_scheduler(scheduler):
    scheduler.run()
    print "Scheduler done."

QUEUE = Queue() # Message queue. Use `enqueue` to add messages.
TIMEOUT = 10 # Seconds for scheduler to wait for items in queue.
SCHEDULER = Scheduler(QUEUE, handle) # Message handler scheduler.
PROCESS = None # Process running the scheduler.

def enqueue(message):
    global PROCESS
    QUEUE.put(message)
    if PROCESS is None or PROCESS.getExitCode() is not None:
        # There is no scheduler process running; start one.
        PROCESS = Process(target=run_scheduler, args=(SCHEDULER,))
        PROCESS.start()
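The ordering that `enterabs` applies to messages (by `timestamp` first, then by `priority`) can be checked in isolation from the process handling. The sketch below is written for Python 3 and swaps the real clock for a hypothetical `FakeClock`, so it runs instantly and deterministically; the message bodies are invented for the demonstration.

```python
import sched


class FakeClock(object):
    """Hypothetical stand-in for time.time/time.sleep that never really waits."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, duration):
        # "Sleeping" just moves the fake clock forward.
        self.now += duration


clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)
handled = []


def handle(message):
    handled.append((clock.time(), message['body']))


incoming = [
    {'body': 'later', 'timestamp': 5.0},
    {'body': 'low priority', 'timestamp': 1.0, 'priority': 2},
    {'body': 'high priority', 'timestamp': 1.0, 'priority': 1},
]
for message in incoming:
    # Same defaults as the queue code: run now, at priority 1.
    timestamp = message.get('timestamp', clock.time())
    priority = message.get('priority', 1)
    scheduler.enterabs(timestamp, priority, handle, (message,))

scheduler.run()
print(handled)
# [(1.0, 'high priority'), (1.0, 'low priority'), (5.0, 'later')]
```

A lower `priority` number runs first when two messages share a timestamp, which is why the queue code defaults to 1 and reserves 0 for its own `startup` event.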