1 """
2 Simpler queue management than the regular mailbox.Maildir stuff. You
3 do get a lot more features from the Python library, so if you need
4 to do some serious surgery go use that. This works as a good
5 API for the 90% case of "put mail in, get mail out" queues.
6 """
7
8 import mailbox
9 from lamson import mail
10 import hashlib
11 import socket
12 import time
13 import os
14 import errno
15 import logging
16
17
18
19 HASHED_HOSTNAME = hashlib.md5(socket.gethostname()).hexdigest()
20
23 now = time.time()
24 uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
25 mailbox.Maildir._count, HASHED_HOSTNAME)
26 path = os.path.join(self._path, 'tmp', uniq)
27 try:
28 os.stat(path)
29 except OSError, e:
30 if e.errno == errno.ENOENT:
31 mailbox.Maildir._count += 1
32 try:
33 return mailbox._create_carefully(path)
34 except OSError, e:
35 if e.errno != errno.EEXIST:
36 raise
37 else:
38 raise
39
40
41 raise mailbox.ExternalClashError('Name clash prevented file creation: %s' % path)
42
43
45
47 Exception.__init__(self, msg)
48 self._message = msg
49 self.data = data
50
51
53 """
54 Provides a simplified API for dealing with 'queues' in Lamson.
55 It currently just supports maildir queues since those are the
56 most robust, but could implement others later.
57 """
58
59 - def __init__(self, queue_dir, safe=False, pop_limit=0, oversize_dir=None):
60 """
61 This gives the Maildir queue directory to use, and whether you want
62 this Queue to use the SafeMaildir variant which hashes the hostname
63 so you can expose it publicly.
64
65 The pop_limit and oversize_queue both set a upper limit on the mail
66 you pop out of the queue. The size is checked before any Lamson
67 processing is done and is based on the size of the file on disk. The
68 purpose is to prevent people from sending 10MB attachments. If a
69 message is over the pop_limit then it is placed into the
70 oversize_dir (which should be a maildir).
71
72 The oversize protection only works on pop messages off, not
73 putting them in, get, or any other call. If you use get you can
74 use self.oversize to also check if it's oversize manually.
75 """
76 self.dir = queue_dir
77
78 if safe:
79 self.mbox = SafeMaildir(queue_dir)
80 else:
81 self.mbox = mailbox.Maildir(queue_dir)
82
83 self.pop_limit = pop_limit
84
85 if oversize_dir:
86 if not os.path.exists(oversize_dir):
87 osmb = mailbox.Maildir(oversize_dir)
88
89 self.oversize_dir = os.path.join(oversize_dir, "new")
90
91 if not os.path.exists(self.oversize_dir):
92 os.mkdir(self.oversize_dir)
93 else:
94 self.oversize_dir = None
95
96 - def push(self, message):
97 """
98 Pushes the message onto the queue. Remember the order is probably
99 not maintained. It returns the key that gets created.
100 """
101 return self.mbox.add(str(message))
102
104 """
105 Pops a message off the queue, order is not really maintained
106 like a stack.
107
108 It returns a (key, message) tuple for that item.
109 """
110 for key in self.mbox.iterkeys():
111 over, over_name = self.oversize(key)
112
113 if over:
114 if self.oversize_dir:
115 logging.info("Message key %s over size limit %d, moving to %s.",
116 key, self.pop_limit, self.oversize_dir)
117 os.rename(over_name, os.path.join(self.oversize_dir, key))
118 else:
119 logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).",
120 key, self.pop_limit)
121 os.unlink(over_name)
122 else:
123 try:
124 msg = self.get(key)
125 except QueueError, exc:
126 logging.exception("Failed to parse message %r garbage.", key)
127 finally:
128 self.remove(key)
129 return key, msg
130
131 return None, None
132
133 - def get(self, key):
134 """
135 Get the specific message referenced by the key. The message is NOT
136 removed from the queue.
137 """
138 msg_file = self.mbox.get_file(key)
139
140 if not msg_file:
141 return None
142
143 msg_data = msg_file.read()
144
145 try:
146 return mail.MailRequest(self.dir, None, None, msg_data)
147 except Exception, exc:
148 raise QueueError("Failed to decode message: %s" % exc, msg_data)
149
150
152 """Removes the queue, but not returned."""
153 self.mbox.remove(key)
154
156 """Returns the number of messages in the queue."""
157 return len(self.mbox)
158
160 """
161 Clears out the contents of the entire queue.
162 Warning: This could be horribly inefficient since it
163 basically pops until the queue is empty.
164 """
165
166 while self.count() > 0:
167 self.pop()
168
170 """
171 Returns the keys in the queue.
172 """
173 return self.mbox.keys()
174
176 if self.pop_limit:
177 file_name = os.path.join(self.dir, "new", key)
178 return os.path.getsize(file_name) > self.pop_limit, file_name
179 else:
180 return False, None
181