Package lamson :: Module server
[hide private]
[frames] | no frames]

Source Code for Module lamson.server

  1  """ 
  2  The majority of the server related things Lamson needs to run, like receivers,  
  3  relays, and queue processors. 
  4  """ 
  5   
  6  import smtplib 
  7  import smtpd 
  8  import asyncore 
  9  import threading 
 10  import socket 
 11  import logging 
 12  from lamson import queue, mail, routing 
 13  import time 
 14  import traceback 
 15  from lamson.bounce import PRIMARY_STATUS_CODES, SECONDARY_STATUS_CODES, COMBINED_STATUS_CODES 
 16   
 17   
18 -def undeliverable_message(raw_message, failure_type):
19 """ 20 Used universally in this file to shove totally screwed messages 21 into the routing.Router.UNDELIVERABLE_QUEUE (if it's set). 22 """ 23 if routing.Router.UNDELIVERABLE_QUEUE: 24 key = routing.Router.UNDELIVERABLE_QUEUE.push(raw_message) 25 26 logging.error("Failed to deliver message because of %r, put it in " 27 "undeliverable queue with key %r", failure_type, key)
28
29 -class SMTPError(Exception):
30 """ 31 You can raise this error when you want to abort with a SMTP error code to 32 the client. This is really only relevant when you're using the 33 SMTPReceiver and the client understands the error. 34 35 If you give a message than it'll use that, but it'll also produce a 36 consistent error message based on your code. It uses the errors in 37 lamson.bounce to produce them. 38 """
39 - def __init__(self, code, message=None):
40 self.code = code 41 self.message = message or self.error_for_code(code) 42 43 Exception.__init__(self, "%d %s" % (self.code, self.message))
44
45 - def error_for_code(self, code):
46 primary, secondary, tertiary = str(code) 47 48 primary = PRIMARY_STATUS_CODES.get(primary, "") 49 secondary = SECONDARY_STATUS_CODES.get(secondary, "") 50 combined = COMBINED_STATUS_CODES.get(primary + secondary, "") 51 52 return " ".join([primary, secondary, combined]).strip()
53 54
55 -class Relay(object):
56 """ 57 Used to talk to your "relay server" or smart host, this is probably the most 58 important class in the handlers next to the lamson.routing.Router. 59 It supports a few simple operations for sending mail, replying, and can 60 log the protocol it uses to stderr if you set debug=1 on __init__. 61 """
62 - def __init__(self, host='127.0.0.1', port=25, username=None, password=None, 63 ssl=False, starttls=False, debug=0):
64 """ 65 The hostname and port we're connecting to, and the debug level (default to 0). 66 Optional username and password for smtp authentication. 67 If ssl is True smtplib.SMTP_SSL will be used. 68 If starttls is True (and ssl False), smtp connection will be put in TLS mode. 69 It does the hard work of delivering messages to the relay host. 70 """ 71 self.hostname = host 72 self.port = port 73 self.debug = debug 74 self.username = username 75 self.password = password 76 self.ssl = ssl 77 self.starttls = starttls
78
79 - def configure_relay(self, hostname):
80 if self.ssl: 81 relay_host = smtplib.SMTP_SSL(hostname, self.port) 82 else: 83 relay_host = smtplib.SMTP(hostname, self.port) 84 85 relay_host.set_debuglevel(self.debug) 86 87 if self.starttls: 88 relay_host.starttls() 89 if self.username and self.password: 90 relay_host.login(self.username, self.password) 91 92 assert relay_host, 'Code error, tell Zed.' 93 return relay_host
94
95 - def deliver(self, message, To=None, From=None):
96 """ 97 Takes a fully formed email message and delivers it to the 98 configured relay server. 99 100 You can pass in an alternate To and From, which will be used in the 101 SMTP send lines rather than what's in the message. 102 """ 103 recipient = To or message['To'] 104 sender = From or message['From'] 105 106 hostname = self.hostname or self.resolve_relay_host(recipient) 107 108 try: 109 relay_host = self.configure_relay(hostname) 110 except socket.error: 111 logging.exception("Failed to connect to host %s:%d" % (hostname, self.port)) 112 return 113 114 relay_host.sendmail(sender, recipient, str(message)) 115 relay_host.quit()
116
117 - def resolve_relay_host(self, To):
118 import DNS 119 address, target_host = To.split('@') 120 mx_hosts = DNS.mxlookup(target_host) 121 122 if not mx_hosts: 123 logging.debug("Domain %r does not have an MX record, using %r instead.", target_host, target_host) 124 return target_host 125 else: 126 logging.debug("Delivering to MX record %r for target %r", mx_hosts[0], target_host) 127 return mx_hosts[0][1]
128 129
130 - def __repr__(self):
131 """Used in logging and debugging to indicate where this relay goes.""" 132 return "<Relay to (%s:%d)>" % (self.hostname, self.port)
133 134
135 - def reply(self, original, From, Subject, Body):
136 """Calls self.send but with the from and to of the original message reversed.""" 137 self.send(original['from'], From=From, Subject=Subject, Body=Body)
138
139 - def send(self, To, From, Subject, Body):
140 """ 141 Does what it says, sends an email. If you need something more complex 142 then look at lamson.mail.MailResponse. 143 """ 144 msg = mail.MailResponse(To=To, From=From, Subject=Subject, Body=Body) 145 self.deliver(msg)
146 147 148
149 -class SMTPReceiver(smtpd.SMTPServer):
150 """Receives emails and hands it to the Router for further processing.""" 151
152 - def __init__(self, host='127.0.0.1', port=8825):
153 """ 154 Initializes to bind on the given port and host/ipaddress. Typically 155 in deployment you'd give 0.0.0.0 for "all internet devices" but consult 156 your operating system. 157 158 This uses smtpd.SMTPServer in the __init__, which means that you have to 159 call this far after you use python-daemonize or else daemonize will 160 close the socket. 161 """ 162 self.host = host 163 self.port = port 164 smtpd.SMTPServer.__init__(self, (self.host, self.port), None)
165
166 - def start(self):
167 """ 168 Kicks everything into gear and starts listening on the port. This 169 fires off threads and waits until they are done. 170 """ 171 logging.info("SMTPReceiver started on %s:%d." % (self.host, self.port)) 172 self.poller = threading.Thread(target=asyncore.loop, 173 kwargs={'timeout':0.1, 'use_poll':True}) 174 self.poller.start()
175
176 - def process_message(self, Peer, From, To, Data):
177 """ 178 Called by smtpd.SMTPServer when there's a message received. 179 """ 180 181 try: 182 logging.debug("Message received from Peer: %r, From: %r, to To %r." % (Peer, From, To)) 183 routing.Router.deliver(mail.MailRequest(Peer, From, To, Data)) 184 except SMTPError, err: 185 # looks like they want to return an error, so send it out 186 return str(err) 187 undeliverable_message(Data, "Handler raised SMTPError on purpose: %s" % err) 188 except: 189 logging.exception("Exception while processing message from Peer: %r, From: %r, to To %r." % 190 (Peer, From, To)) 191 undeliverable_message(Data, "Error in message %r:%r:%r, look in logs." % (Peer, From, To))
192 193
194 - def close(self):
195 """Doesn't do anything except log who called this, since nobody should. Ever.""" 196 logging.error(traceback.format_exc())
197 198
199 -class QueueReceiver(object):
200 """ 201 Rather than listen on a socket this will watch a queue directory and 202 process messages it recieves from that. It works in almost the exact 203 same way otherwise. 204 """ 205
206 - def __init__(self, queue_dir, sleep=10, size_limit=0, oversize_dir=None):
207 """ 208 The router should be fully configured and ready to work, the 209 queue_dir can be a fully qualified path or relative. 210 """ 211 self.queue = queue.Queue(queue_dir, pop_limit=size_limit, 212 oversize_dir=oversize_dir) 213 self.queue_dir = queue_dir 214 self.sleep = sleep
215
216 - def start(self, one_shot=False):
217 """ 218 Start simply loops indefinitely sleeping and pulling messages 219 off for processing when they are available. 220 221 If you give one_shot=True it will run once rather than do a big 222 while loop with a sleep. 223 """ 224 225 logging.info("Queue receiver started on queue dir %s" % 226 (self.queue_dir)) 227 logging.debug("Sleeping for %d seconds..." % self.sleep) 228 229 inq = queue.Queue(self.queue_dir) 230 231 while True: 232 key = None 233 234 try: 235 key, msg = inq.pop() 236 237 while key: 238 logging.debug("Pulled message with key: %r off", key) 239 self.process_message(msg) 240 key, msg = inq.pop() 241 242 if one_shot: 243 return 244 else: 245 time.sleep(self.sleep) 246 247 except: 248 logging.exception("Error popping from the queue, this might be a problem.") 249 undeliverable_message(exc.data, exc._message) 250 time.sleep(self.sleep)
251
252 - def process_message(self, msg):
253 """ 254 Exactly the same as SMTPReceiver.process_message but just designed for the queue's 255 quirks. 256 """ 257 258 try: 259 Peer = self.queue_dir # this is probably harmless but I should check it 260 From = msg['from'] 261 To = [msg['to']] 262 263 logging.debug("Message received from Peer: %r, From: %r, to To %r." % (Peer, From, To)) 264 routing.Router.deliver(msg) 265 except SMTPError, err: 266 # looks like they want to return an error, so send it out 267 logging.exception("Raising SMTPError when running in a QueueReceiver is unsupported.") 268 undeliverable_message(msg.original, err.message) 269 except: 270 logging.exception("Exception while processing message from Peer: " 271 "%r, From: %r, to To %r." % (Peer, From, To)) 272 undeliverable_message(msg.original, "Router failed to catch exception.")
273