1 """
2 The majority of the server related things Lamson needs to run, like receivers,
3 relays, and queue processors.
4 """
5
6 import smtplib
7 import smtpd
8 import asyncore
9 import threading
10 import socket
11 import logging
12 from lamson import queue, mail, routing
13 import time
14 import traceback
15 from lamson.bounce import PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES, COMBINED_STATUS_CODES
16
17
19 """
20 Used universally in this file to shove totally screwed messages
21 into the routing.Router.UNDELIVERABLE_QUEUE (if it's set).
22 """
23 if routing.Router.UNDELIVERABLE_QUEUE:
24 key = routing.Router.UNDELIVERABLE_QUEUE.push(raw_message)
25
26 logging.error("Failed to deliver message because of %r, put it in "
27 "undeliverable queue with key %r", failure_type, key)
28
30 """
31 You can raise this error when you want to abort with a SMTP error code to
32 the client. This is really only relevant when you're using the
33 SMTPReceiver and the client understands the error.
34
35 If you give a message than it'll use that, but it'll also produce a
36 consistent error message based on your code. It uses the errors in
37 lamson.bounce to produce them.
38 """
40 self.code = code
41 self.message = message or self.error_for_code(code)
42
43 Exception.__init__(self, "%d %s" % (self.code, self.message))
44
53
54
56 """
57 Used to talk to your "relay server" or smart host, this is probably the most
58 important class in the handlers next to the lamson.routing.Router.
59 It supports a few simple operations for sending mail, replying, and can
60 log the protocol it uses to stderr if you set debug=1 on __init__.
61 """
62 - def __init__(self, host='127.0.0.1', port=25, username=None, password=None,
63 ssl=False, starttls=False, debug=0):
64 """
65 The hostname and port we're connecting to, and the debug level (default to 0).
66 Optional username and password for smtp authentication.
67 If ssl is True smtplib.SMTP_SSL will be used.
68 If starttls is True (and ssl False), smtp connection will be put in TLS mode.
69 It does the hard work of delivering messages to the relay host.
70 """
71 self.hostname = host
72 self.port = port
73 self.debug = debug
74 self.username = username
75 self.password = password
76 self.ssl = ssl
77 self.starttls = starttls
78
94
95 - def deliver(self, message, To=None, From=None):
96 """
97 Takes a fully formed email message and delivers it to the
98 configured relay server.
99
100 You can pass in an alternate To and From, which will be used in the
101 SMTP send lines rather than what's in the message.
102 """
103 recipient = To or message['To']
104 sender = From or message['From']
105
106 hostname = self.hostname or self.resolve_relay_host(recipient)
107
108 try:
109 relay_host = self.configure_relay(hostname)
110 except socket.error:
111 logging.exception("Failed to connect to host %s:%d" % (hostname, self.port))
112 return
113
114 relay_host.sendmail(sender, recipient, str(message))
115 relay_host.quit()
116
118 import DNS
119 address, target_host = To.split('@')
120 mx_hosts = DNS.mxlookup(target_host)
121
122 if not mx_hosts:
123 logging.debug("Domain %r does not have an MX record, using %r instead.", target_host, target_host)
124 return target_host
125 else:
126 logging.debug("Delivering to MX record %r for target %r", mx_hosts[0], target_host)
127 return mx_hosts[0][1]
128
129
131 """Used in logging and debugging to indicate where this relay goes."""
132 return "<Relay to (%s:%d)>" % (self.hostname, self.port)
133
134
135 - def reply(self, original, From, Subject, Body):
136 """Calls self.send but with the from and to of the original message reversed."""
137 self.send(original['from'], From=From, Subject=Subject, Body=Body)
138
139 - def send(self, To, From, Subject, Body):
140 """
141 Does what it says, sends an email. If you need something more complex
142 then look at lamson.mail.MailResponse.
143 """
144 msg = mail.MailResponse(To=To, From=From, Subject=Subject, Body=Body)
145 self.deliver(msg)
146
147
148
150 """Receives emails and hands it to the Router for further processing."""
151
152 - def __init__(self, host='127.0.0.1', port=8825):
153 """
154 Initializes to bind on the given port and host/ipaddress. Typically
155 in deployment you'd give 0.0.0.0 for "all internet devices" but consult
156 your operating system.
157
158 This uses smtpd.SMTPServer in the __init__, which means that you have to
159 call this far after you use python-daemonize or else daemonize will
160 close the socket.
161 """
162 self.host = host
163 self.port = port
164 smtpd.SMTPServer.__init__(self, (self.host, self.port), None)
165
167 """
168 Kicks everything into gear and starts listening on the port. This
169 fires off threads and waits until they are done.
170 """
171 logging.info("SMTPReceiver started on %s:%d." % (self.host, self.port))
172 self.poller = threading.Thread(target=asyncore.loop,
173 kwargs={'timeout':0.1, 'use_poll':True})
174 self.poller.start()
175
177 """
178 Called by smtpd.SMTPServer when there's a message received.
179 """
180
181 try:
182 logging.debug("Message received from Peer: %r, From: %r, to To %r." % (Peer, From, To))
183 routing.Router.deliver(mail.MailRequest(Peer, From, To, Data))
184 except SMTPError, err:
185
186 return str(err)
187 undeliverable_message(Data, "Handler raised SMTPError on purpose: %s" % err)
188 except:
189 logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r." %
190 (Peer, From, To))
191 undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))
192
193
195 """Doesn't do anything except log who called this, since nobody should. Ever."""
196 logging.error(traceback.format_exc())
197
198
200 """
201 Rather than listen on a socket this will watch a queue directory and
202 process messages it recieves from that. It works in almost the exact
203 same way otherwise.
204 """
205
206 - def __init__(self, queue_dir, sleep=10, size_limit=0, oversize_dir=None):
207 """
208 The router should be fully configured and ready to work, the
209 queue_dir can be a fully qualified path or relative.
210 """
211 self.queue = queue.Queue(queue_dir, pop_limit=size_limit,
212 oversize_dir=oversize_dir)
213 self.queue_dir = queue_dir
214 self.sleep = sleep
215
216 - def start(self, one_shot=False):
217 """
218 Start simply loops indefinitely sleeping and pulling messages
219 off for processing when they are available.
220
221 If you give one_shot=True it will run once rather than do a big
222 while loop with a sleep.
223 """
224
225 logging.info("Queue receiver started on queue dir %s" %
226 (self.queue_dir))
227 logging.debug("Sleeping for %d seconds..." % self.sleep)
228
229 inq = queue.Queue(self.queue_dir)
230
231 while True:
232 key = None
233
234 try:
235 key, msg = inq.pop()
236
237 while key:
238 logging.debug("Pulled message with key: %r off", key)
239 self.process_message(msg)
240 key, msg = inq.pop()
241
242 if one_shot:
243 return
244 else:
245 time.sleep(self.sleep)
246
247 except:
248 logging.exception("Error popping from the queue, this might be a problem.")
249 undeliverable_message(exc.data, exc._message)
250 time.sleep(self.sleep)
251
253 """
254 Exactly the same as SMTPReceiver.process_message but just designed for the queue's
255 quirks.
256 """
257
258 try:
259 Peer = self.queue_dir
260 From = msg['from']
261 To = [msg['to']]
262
263 logging.debug("Message received from Peer: %r, From: %r, to To %r." % (Peer, From, To))
264 routing.Router.deliver(msg)
265 except SMTPError, err:
266
267 logging.exception("Raising SMTPError when running in a QueueReceiver is unsupported.")
268 undeliverable_message(msg.original, err.message)
269 except:
270 logging.exception("Exception while processing message from Peer: "
271 "%r, From: %r, to To %r." % (Peer, From, To))
272 undeliverable_message(msg.original, "Router failed to catch exception.")
273