Deferred Processing To Queues

As of the 0.9.2 release there is preliminary support for deferring handling of
a mail message to a queue for another process to deal with in a separate
handler. This support is rough at this time, but still useful and not too
difficult to configure. As the feature gets more use it will improve and
hopefully turn into a generic “defer to queue” system in Lamson.

What is meant by “defer to queue” is simply that you take messages your state
function receives and you dump them into a maildir queue. You then have
another separate process read from this queue and do the real work.
Potentially you could have many processes deal with this work, and they could
even be on multiple computers.

A More Concrete Example

Imagine that you have a blog posting system and you want to update a big “front
page index” that shows recent posts by your users. However, you don’t want to
generate this index on every blog post users make, since that could involve
expensive computation and hold up other threads that need to deal with more
urgent email.

The solution is to do the minimum quick processing you can in your POSTING
state function, and then use the
lamson.queue.Queue
to queue up messages meant for “front page indexing”. Here’s how that code
might go:


route("(post_name)osb\\.(host)”)
def POSTING: # do the regular posting to blog thing name, address = parseaddr(message['from’]) post.post(post_name, address, message) msg = view.respond('page_ready.msg’, locals()) relay.deliver(msg)

# drop the message off into the 'posts’ queue for later index_q = queue.Queue(“run/posts”) index_q.push(message) return POSTING

You can see that you just drop it into the queue with push(message) and it’s
done. What you don’t see is how this then gets picked up by another process to
actually do somehing with this email.

Configuring A config/queue.py

In Lamson you are given control over how your software boots, which gives you
the ability to configure extra services how you need. By default the lamson gen command just outputs a basic config/boot.py and config/testing.py file
so you can get working, and these will work for most development purposes.

In this tutorial you get to write a new boot configuration and tell Lamson how
to use it. We’ll be copying the original boot file over first:

$ cp config/boot.py config/queue.py

Next you want to edit this file so that instead of running an
SMTPReceiver
it will use a
QueueReceiver
configured to pull out of the run/posts queue you are using in your POSTING
handler.


...
  1. where to listen for incoming messages

settings.receiver = QueueReceiver(settings.queue_config['queue’], settings.queue_config['sleep’])

settings.database = configure_database(settings.database_config, also_create=False)

Router.defaults(**settings.router_defaults)

  1. NOTE: this is using a different handlers variable in settings

Router.load(settings.queue_handlers)
Router.RELOAD=True

I’ve removed the code above the … and below it since it’s the same in the two
files. Notice that you have a QueueReceiver now, and that you are telling
the
Router
that it will use settings.queue_handlers for the list of handlers to load and
run.

You now add these two lines to your config/settings.py:


...
  1. this is for when you run the config.queue boot

queue_config = {'queue’: 'run/posts’, 'sleep’: 10}

queue_handlers = ['app.handlers.index’]

The queue_config variable is read by the config/queue.py file for the
QueueReceiver and the queue_handlers is fed to the Router as described
above.

Writing The Index Handler

You now have to write a new handler that is in app/handlers/index.py so that
this config.queue boot setup will load it and run it whenever a message hits
the run/queue. Here’s the code:


from lamson import queue
from lamson.routing import route, stateless
import logging

route("(post_name)osb\\.(host)”)
@stateless
def START: logging.debug(“Got message from %s”, message['from’])

This simple demonstration will just log what messages it receives so you can
make sure it is working.

There are two points to notice about this handler. First, it is marked
stateless because it will run independent of the regular Lamson server, and
you don’t want its parallel operations to interfere with your normal server’s
state operations. Second, it uses a Router.defaults named post_name that
you would add to your config.settings.router_defaults.

Once you have all this slightly complicated setup done you are ready to test
it.

Also note that the examples in the source releases have code
that does a deferred queue similar to this. Go look there for more code to
steal.

Running Your Queue Receiver

Run your logger and lamson server like normal:

$ lamson log
$ lamson start

Next, go look in your logs and make sure it works by running your unit
tests:


$ nosetests
….............
———————————————————————————————————
Ran 16 tests in 1.346s

OK

Your logs should look normal, but now you should see some files in the
run/posts/new directory:

$ ls run/posts/new/
1244080328.M408474P3147Q4.mycomputer.local

That’s the results of your POSTING handler putting the messages it receives
into your run/posts maildir queue.

Finally, you’ll want to run your queue receiver:

$ lamson start -boot config.queue -pid run/queue.pid

If you’re running the code given above then you should see this in the
logs/lamson.log file:

...
DEBUG:root:Sleeping for 10 seconds…
DEBUG:root:Pulled message with key:
’1244080328.M408474P3147Q4.zed-shaws-macbook.local’ off
DEBUG:root:Message received from Peer: 'run/posts’, From:
'sender-1244080328.22@sender.com’, to To
['test.blog.1244080328@osb.test.com’].
DEBUG:root:Got message from sender-1244080328.22@sender.com
DEBUG:root:Message to test.blog.1244080328@osb.test.com was handled by
app.handlers.index.START

Which means your queue receiver is running. You could in theory run as many
of these as you wanted, as long as their handlers are stateless.

When you’re done you can stop the whole setup with the following command:

$ lamson stop -ALL run
Stopping processes with the following PID files:
['run/log.pid’, 'run/queue.pid’, 'run/smtp.pid’]
Attempting to stop lamson at pid 3092
Attempting to stop lamson at pid 3157
Attempting to stop lamson at pid 3096

Further Advanced Usage

This configuration is debatable whether it is very usable or not, but it works
and will improve as the project continues. To give you some ideas of what you
can do with it:

  1. Defer activity to other machines or processes.
  2. Receive messages from other mail systems that know maildir.
  3. Deliver messages to other maildir aware systems.
  4. Process messages from a web application, and possibly even generic work.

It might also be possible to actually make your state functions transition to
the queue handler states by simply having the function return the
module.FUNCTION that should be next. Take care with this though as it means
your end user’s actions are effectively blocked for that event until the next
run of the queue receiver.

Call For Suggestions

Feel free to offer suggestions in improving this setup (or even better code).