#!/usr/bin/env python
"""
Helpful email utility functions.

Public helpers:

* extract_received_pairs -- split a Received header into name/value pairs.
* make_datetime          -- parse an RFC 2822 style date into a datetime.
* get_payload_name       -- file name of a message part, as text.
* collapse_rfc2047_value -- decode an RFC 2047 encoded header into text.
"""

# NOTE: the original file carried "from __future__ import generators" *after*
# the metadata assignments below, which is a SyntaxError (future statements
# must directly follow the module docstring).  Generators have been a
# mandatory feature since Python 2.3 and this module already needs the
# datetime module (2.3+), so the statement was redundant and is dropped.

__version__ = '0.2'
__author__ = 'Eugene Shumulinsky (exshum@gmail.com)'
__copyright__ = 'Copyright (c) 2005 Eugene Shumulinsky'
__license__ = 'Python'

import os
import re
import email
from datetime import datetime
import time
import types

# The helper modules are imported explicitly (the original only imported
# email.Header although it also used email.Utils).  The capitalised names are
# the original spellings; the lowercase ones are the only ones on Python 3.
try:
    import email.Header as _email_header
    import email.Utils as _email_utils
except ImportError:
    import email.header as _email_header
    import email.utils as _email_utils

# The text type: ``unicode`` on Python 2, ``str`` on Python 3.
_TEXT_TYPE = type(u'')

# A (non-nested) parenthesised comment together with the whitespace before it.
RE_FIELD_COMMENT = re.compile(r'\s*\([^)]*\)')

# Tokens that start a new name/value pair inside a Received field.
PAIR_KEYS = ('from', 'by', 'via', 'with', 'id', 'for')


def extract_received_pairs(field):
    """
    Generate a 2-tuple for each name/value pair in a Received field.

    Yields (field_name, field_value).  If the field contains a ';', the text
    after the last one is yielded first under the name "date-time".  The
    remaining pairs follow in the order they appear; a name is one of
    PAIR_KEYS (matched case-sensitively).  Keys that are missing from the
    field, or that have no value, are not yielded at all.

    Comments (parenthesised text) are stripped except from the date-time.
    """
    # Normalize folded whitespace.
    field = ' '.join(field.split())

    # Pull off the date-time from end of string.
    semi = field.rfind(';')
    if semi != -1:
        yield ('date-time', field[semi + 1:].strip())
        field = field[:semi]

    field = RE_FIELD_COMMENT.sub('', field)

    # Tokenize and yield name/value pairs.  Tokens seen before the first
    # key have no name to attach to and are discarded.
    field_name = None
    field_values = []
    for token in field.split():
        if token in PAIR_KEYS:
            if field_name is not None and field_values:
                yield (field_name, ' '.join(field_values))
            field_name = token
            field_values = []
        else:
            field_values.append(token)
    if field_name is not None and field_values:
        yield (field_name, ' '.join(field_values))


def make_datetime(date_string, tz=None):
    """
    Return a datetime object for date_string or None if unparsable.

    date_string is parsed with the email package's parsedate_tz().
    tz is passed through to datetime.fromtimestamp(); with the default of
    None the result is a naive datetime in local time.

    None is also returned when the date parses but lies outside the range
    the platform can convert.
    """
    try:
        date_parts = _email_utils.parsedate_tz(date_string)
        if date_parts is None:
            return None
        timestamp = _email_utils.mktime_tz(date_parts)
        return datetime.fromtimestamp(timestamp, tz)
    except (TypeError, ValueError, OverflowError):
        # TypeError: input that is not a date string at all.
        # ValueError / OverflowError: out-of-range date components.
        return None


def get_payload_name(payload):
    """
    Return an email object's (i.e. the payload) name as a text string.

    Prefers Content-Disposition's "filename" parameter to Content-Type's
    "name" parameter.  Only the base name is returned; any directory part
    is dropped.  An empty string is returned when there is no name.

    A byte-string name is decoded as us-ascii (strictly, so a non-ASCII
    byte string raises UnicodeDecodeError).  A name that is already text,
    e.g. from an RFC 2231 encoded parameter, is returned unchanged; the
    original code raised TypeError for those.
    """
    name = payload.get_filename()
    if not name:
        name = _email_utils.collapse_rfc2231_value(
            payload.get_param('name', ''))
    name = os.path.basename(name)
    if isinstance(name, _TEXT_TYPE):
        return name
    return _TEXT_TYPE(name, 'us-ascii')


def collapse_rfc2047_value(header, errors='replace',
                           fallback_charset='us-ascii'):
    """
    Return decoded RFC2047-encoded email header as a text string.

    header is the header.

    errors is same as built-in unicode() function; defaults to 'replace'.

    fallback_charset specifies the charset to use if the one in the RFC 2047
    header is not known; defaults to 'us-ascii'.

    Parts without a charset are decoded as us-ascii.  Parts that
    decode_header() already returns as text (Python 3, header without any
    encoded words) are used as they are.
    """
    def dec(part):
        value, charset = part
        if isinstance(value, _TEXT_TYPE):
            return value
        try:
            return _TEXT_TYPE(value, charset or 'us-ascii', errors)
        except LookupError:
            # Unknown charset name in the header.
            return _TEXT_TYPE(value, fallback_charset, errors)

    return u''.join(map(dec, _email_header.decode_header(header)))


if __name__ == '__main__':
    import sys
    import unittest

    # True where plain string literals are 8-bit byte strings (Python 2).
    _BYTE_STR_LITERALS = str is not _TEXT_TYPE

    class MakeDateTime(unittest.TestCase):
        """Test make_datetime function. Needs some portable tz tests."""

        def testValidTimestamp(self):
            """make_datetime should return datetime object for valid date"""
            d = make_datetime('Tue Oct 18 19:20:35 2005')
            self.assertEqual(d, datetime(2005, 10, 18, 19, 20, 35))

        def testInvalidTimestamp(self):
            """make_datetime should return None for invalid date"""
            d = make_datetime('foo')
            self.assertEqual(d, None)

    class UDecodeHeader(unittest.TestCase):
        """Test collapse_rfc2047_value function."""

        def testEncodedHeader(self):
            """collapse_rfc2047_value should accept properly encoded headers"""
            u = collapse_rfc2047_value('=?iso-8859-1?Q?fo=f3?=')
            self.assertEqual(u, u'fo\xf3')

        def testPlainHeader(self):
            """collapse_rfc2047_value should accept unencoded ascii strings"""
            u = collapse_rfc2047_value('foo')
            self.assertEqual(u, u'foo')

        # The 8-bit cases need byte-string literals, so they are only
        # defined where 'fo\xf3' is a byte string.
        if _BYTE_STR_LITERALS:
            def test8bitHeaderStrict(self):
                """collapse_rfc2047_value should fail with 8bit strings (strict)"""
                udecode = lambda h: collapse_rfc2047_value(h, 'strict')
                self.assertRaises(UnicodeDecodeError, udecode, 'fo\xf3')

            def test8bitHeaderReplace(self):
                """collapse_rfc2047_value should pass with 8bit strings (replace)"""
                u = collapse_rfc2047_value('fo\xf3')
                self.assertEqual(u, u'fo\ufffd')

    unittest.main()

    # Disabled manual demo, kept from the original as an inert string.
    """
    import email.Parser
    msg = email.Parser.Parser().parse(sys.stdin, True)
    for rh in msg.get_all('received', []):
        fields = dict(extract_received_pairs(rh))
        for k in PAIR_KEYS:
            print '%s = %s' % (k, fields.get(k, 'not set'))
        print
    """