Package lamson :: Module queue
[hide private]
[frames] | no frames]

Source Code for Module lamson.queue

  1  """ 
  2  Simpler queue management than the regular mailbox.Maildir stuff.  You 
  3  do get a lot more features from the Python library, so if you need 
  4  to do some serious surgery go use that.  This works as a good 
  5  API for the 90% case of "put mail in, get mail out" queues. 
  6  """ 
  7   
  8  import mailbox 
  9  from lamson import mail 
 10  import hashlib 
 11  import socket 
 12  import time 
 13  import os 
 14  import errno 
 15  import logging 
 16   
 17  # we calculate this once, since the hostname shouldn't change for every 
 18  # email we put in a queue 
 19  HASHED_HOSTNAME = hashlib.md5(socket.gethostname()).hexdigest() 
 20   
21 -class SafeMaildir(mailbox.Maildir):
22 - def _create_tmp(self):
23 now = time.time() 24 uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(), 25 mailbox.Maildir._count, HASHED_HOSTNAME) 26 path = os.path.join(self._path, 'tmp', uniq) 27 try: 28 os.stat(path) 29 except OSError, e: 30 if e.errno == errno.ENOENT: 31 mailbox.Maildir._count += 1 32 try: 33 return mailbox._create_carefully(path) 34 except OSError, e: 35 if e.errno != errno.EEXIST: 36 raise 37 else: 38 raise 39 40 # Fall through to here if stat succeeded or open raised EEXIST. 41 raise mailbox.ExternalClashError('Name clash prevented file creation: %s' % path)
42 43
44 -class QueueError(Exception):
45
46 - def __init__(self, msg, data):
47 Exception.__init__(self, msg) 48 self._message = msg 49 self.data = data
50 51
52 -class Queue(object):
53 """ 54 Provides a simplified API for dealing with 'queues' in Lamson. 55 It currently just supports maildir queues since those are the 56 most robust, but could implement others later. 57 """ 58
59 - def __init__(self, queue_dir, safe=False, pop_limit=0, oversize_dir=None):
60 """ 61 This gives the Maildir queue directory to use, and whether you want 62 this Queue to use the SafeMaildir variant which hashes the hostname 63 so you can expose it publicly. 64 65 The pop_limit and oversize_queue both set a upper limit on the mail 66 you pop out of the queue. The size is checked before any Lamson 67 processing is done and is based on the size of the file on disk. The 68 purpose is to prevent people from sending 10MB attachments. If a 69 message is over the pop_limit then it is placed into the 70 oversize_dir (which should be a maildir). 71 72 The oversize protection only works on pop messages off, not 73 putting them in, get, or any other call. If you use get you can 74 use self.oversize to also check if it's oversize manually. 75 """ 76 self.dir = queue_dir 77 78 if safe: 79 self.mbox = SafeMaildir(queue_dir) 80 else: 81 self.mbox = mailbox.Maildir(queue_dir) 82 83 self.pop_limit = pop_limit 84 85 if oversize_dir: 86 if not os.path.exists(oversize_dir): 87 osmb = mailbox.Maildir(oversize_dir) 88 89 self.oversize_dir = os.path.join(oversize_dir, "new") 90 91 if not os.path.exists(self.oversize_dir): 92 os.mkdir(self.oversize_dir) 93 else: 94 self.oversize_dir = None
95
96 - def push(self, message):
97 """ 98 Pushes the message onto the queue. Remember the order is probably 99 not maintained. It returns the key that gets created. 100 """ 101 return self.mbox.add(str(message))
102
103 - def pop(self):
104 """ 105 Pops a message off the queue, order is not really maintained 106 like a stack. 107 108 It returns a (key, message) tuple for that item. 109 """ 110 for key in self.mbox.iterkeys(): 111 over, over_name = self.oversize(key) 112 113 if over: 114 if self.oversize_dir: 115 logging.info("Message key %s over size limit %d, moving to %s.", 116 key, self.pop_limit, self.oversize_dir) 117 os.rename(over_name, os.path.join(self.oversize_dir, key)) 118 else: 119 logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).", 120 key, self.pop_limit) 121 os.unlink(over_name) 122 else: 123 try: 124 msg = self.get(key) 125 except QueueError, exc: 126 logging.exception("Failed to parse message %r garbage.", key) 127 finally: 128 self.remove(key) 129 return key, msg 130 131 return None, None
132
133 - def get(self, key):
134 """ 135 Get the specific message referenced by the key. The message is NOT 136 removed from the queue. 137 """ 138 msg_file = self.mbox.get_file(key) 139 140 if not msg_file: 141 return None 142 143 msg_data = msg_file.read() 144 145 try: 146 return mail.MailRequest(self.dir, None, None, msg_data) 147 except Exception, exc: 148 raise QueueError("Failed to decode message: %s" % exc, msg_data)
149 150
151 - def remove(self, key):
152 """Removes the queue, but not returned.""" 153 self.mbox.remove(key)
154
155 - def count(self):
156 """Returns the number of messages in the queue.""" 157 return len(self.mbox)
158
159 - def clear(self):
160 """ 161 Clears out the contents of the entire queue. 162 Warning: This could be horribly inefficient since it 163 basically pops until the queue is empty. 164 """ 165 # man this is probably a really bad idea 166 while self.count() > 0: 167 self.pop()
168
169 - def keys(self):
170 """ 171 Returns the keys in the queue. 172 """ 173 return self.mbox.keys()
174
175 - def oversize(self, key):
176 if self.pop_limit: 177 file_name = os.path.join(self.dir, "new", key) 178 return os.path.getsize(file_name) > self.pop_limit, file_name 179 else: 180 return False, None
181