���ѧۧݧ�ӧ�� �ާ֧ߧ֧էا֧� - ���֧էѧܧ�ڧ��ӧѧ�� - /home/ukubnwwtacc0unt/chapelbellstudios.com/uploads/cover/email.tar
���ѧ٧ѧ�
message.py 0000644 00000074003 15204103444 0006543 0 ustar 00 # Copyright (C) 2001-2006 Python Software Foundation # Author: Barry Warsaw # Contact: email-sig@python.org """Basic message object for the email package object model.""" __all__ = ['Message'] import re import uu import binascii import warnings from cStringIO import StringIO # Intrapackage imports import email.charset from email import utils from email import errors SEMISPACE = '; ' # Regular expression that matches `special' characters in parameters, the # existence of which force quoting of the parameter value. tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]') # Helper functions def _splitparam(param): # Split header parameters. BAW: this may be too simple. It isn't # strictly RFC 2045 (section 5.1) compliant, but it catches most headers # found in the wild. We may eventually need a full fledged parser # eventually. a, sep, b = param.partition(';') if not sep: return a.strip(), None return a.strip(), b.strip() def _formatparam(param, value=None, quote=True): """Convenience function to format and return a key=value pair. This will quote the value if needed or if quote is true. If value is a three tuple (charset, language, value), it will be encoded according to RFC2231 rules. """ if value is not None and len(value) > 0: # A tuple is used for RFC 2231 encoded parameter values where items # are (charset, language, value). charset is a string, not a Charset # instance. if isinstance(value, tuple): # Encode as per RFC 2231 param += '*' value = utils.encode_rfc2231(value[2], value[0], value[1]) # BAW: Please check this. I think that if quote is set it should # force quoting even if not necessary. if quote or tspecials.search(value): return '%s="%s"' % (param, utils.quote(value)) else: return '%s=%s' % (param, value) else: return param def _parseparam(s): plist = [] while s[:1] == ';': s = s[1:] end = s.find(';') while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2: end = s.find(';', end + 1) if end < 0: end = len(s) f = s[:end] if '=' in f: i = f.index('=') f = f[:i].strip().lower() + '=' + f[i+1:].strip() plist.append(f.strip()) s = s[end:] return plist def _unquotevalue(value): # This is different than utils.collapse_rfc2231_value() because it doesn't # try to convert the value to a unicode. Message.get_param() and # Message.get_params() are both currently defined to return the tuple in # the face of RFC 2231 parameters. if isinstance(value, tuple): return value[0], value[1], utils.unquote(value[2]) else: return utils.unquote(value) class Message: """Basic message object. A message object is defined as something that has a bunch of RFC 2822 headers and a payload. It may optionally have an envelope header (a.k.a. Unix-From or From_ header). If the message is a container (i.e. a multipart or a message/rfc822), then the payload is a list of Message objects, otherwise it is a string. Message objects implement part of the `mapping' interface, which assumes there is exactly one occurrence of the header per message. Some headers do in fact appear multiple times (e.g. Received) and for those headers, you must use the explicit API to set or get all the headers. Not all of the mapping methods are implemented. """ def __init__(self): self._headers = [] self._unixfrom = None self._payload = None self._charset = None # Defaults for multipart messages self.preamble = self.epilogue = None self.defects = [] # Default content type self._default_type = 'text/plain' def __str__(self): """Return the entire formatted message as a string. This includes the headers, body, and envelope header. """ return self.as_string(unixfrom=True) def as_string(self, unixfrom=False): """Return the entire formatted message as a string. Optional `unixfrom' when True, means include the Unix From_ envelope header. This is a convenience method and may not generate the message exactly as you intend because by default it mangles lines that begin with "From ". For more flexibility, use the flatten() method of a Generator instance. """ from email.generator import Generator fp = StringIO() g = Generator(fp) g.flatten(self, unixfrom=unixfrom) return fp.getvalue() def is_multipart(self): """Return True if the message consists of multiple parts.""" return isinstance(self._payload, list) # # Unix From_ line # def set_unixfrom(self, unixfrom): self._unixfrom = unixfrom def get_unixfrom(self): return self._unixfrom # # Payload manipulation. # def attach(self, payload): """Add the given payload to the current payload. The current payload will always be a list of objects after this method is called. If you want to set the payload to a scalar object, use set_payload() instead. """ if self._payload is None: self._payload = [payload] else: self._payload.append(payload) def get_payload(self, i=None, decode=False): """Return a reference to the payload. The payload will either be a list object or a string. If you mutate the list object, you modify the message's payload in place. Optional i returns that index into the payload. Optional decode is a flag indicating whether the payload should be decoded or not, according to the Content-Transfer-Encoding header (default is False). When True and the message is not a multipart, the payload will be decoded if this header's value is `quoted-printable' or `base64'. If some other encoding is used, or the header is missing, or if the payload has bogus data (i.e. bogus base64 or uuencoded data), the payload is returned as-is. If the message is a multipart and the decode flag is True, then None is returned. """ if i is None: payload = self._payload elif not isinstance(self._payload, list): raise TypeError('Expected list, got %s' % type(self._payload)) else: payload = self._payload[i] if decode: if self.is_multipart(): return None cte = self.get('content-transfer-encoding', '').lower() if cte == 'quoted-printable': return utils._qdecode(payload) elif cte == 'base64': try: return utils._bdecode(payload) except binascii.Error: # Incorrect padding return payload elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'): sfp = StringIO() try: uu.decode(StringIO(payload+'\n'), sfp, quiet=True) payload = sfp.getvalue() except uu.Error: # Some decoding problem return payload # Everything else, including encodings with 8bit or 7bit are returned # unchanged. return payload def set_payload(self, payload, charset=None): """Set the payload to the given value. Optional charset sets the message's default character set. See set_charset() for details. """ self._payload = payload if charset is not None: self.set_charset(charset) def set_charset(self, charset): """Set the charset of the payload to a given character set. charset can be a Charset instance, a string naming a character set, or None. If it is a string it will be converted to a Charset instance. If charset is None, the charset parameter will be removed from the Content-Type field. Anything else will generate a TypeError. The message will be assumed to be of type text/* encoded with charset.input_charset. It will be converted to charset.output_charset and encoded properly, if needed, when generating the plain text representation of the message. MIME headers (MIME-Version, Content-Type, Content-Transfer-Encoding) will be added as needed. """ if charset is None: self.del_param('charset') self._charset = None return if isinstance(charset, basestring): charset = email.charset.Charset(charset) if not isinstance(charset, email.charset.Charset): raise TypeError(charset) # BAW: should we accept strings that can serve as arguments to the # Charset constructor? self._charset = charset if 'MIME-Version' not in self: self.add_header('MIME-Version', '1.0') if 'Content-Type' not in self: self.add_header('Content-Type', 'text/plain', charset=charset.get_output_charset()) else: self.set_param('charset', charset.get_output_charset()) if isinstance(self._payload, unicode): self._payload = self._payload.encode(charset.output_charset) if str(charset) != charset.get_output_charset(): self._payload = charset.body_encode(self._payload) if 'Content-Transfer-Encoding' not in self: cte = charset.get_body_encoding() try: cte(self) except TypeError: self._payload = charset.body_encode(self._payload) self.add_header('Content-Transfer-Encoding', cte) def get_charset(self): """Return the Charset instance associated with the message's payload. """ return self._charset # # MAPPING INTERFACE (partial) # def __len__(self): """Return the total number of headers, including duplicates.""" return len(self._headers) def __getitem__(self, name): """Get a header value. Return None if the header is missing instead of raising an exception. Note that if the header appeared multiple times, exactly which occurrence gets returned is undefined. Use get_all() to get all the values matching a header field name. """ return self.get(name) def __setitem__(self, name, val): """Set the value of a header. Note: this does not overwrite an existing header with the same field name. Use __delitem__() first to delete any existing headers. """ self._headers.append((name, val)) def __delitem__(self, name): """Delete all occurrences of a header, if present. Does not raise an exception if the header is missing. """ name = name.lower() newheaders = [] for k, v in self._headers: if k.lower() != name: newheaders.append((k, v)) self._headers = newheaders def __contains__(self, name): return name.lower() in [k.lower() for k, v in self._headers] def has_key(self, name): """Return true if the message contains the header.""" missing = object() return self.get(name, missing) is not missing def keys(self): """Return a list of all the message's header field names. These will be sorted in the order they appeared in the original message, or were added to the message, and may contain duplicates. Any fields deleted and re-inserted are always appended to the header list. """ return [k for k, v in self._headers] def values(self): """Return a list of all the message's header values. These will be sorted in the order they appeared in the original message, or were added to the message, and may contain duplicates. Any fields deleted and re-inserted are always appended to the header list. """ return [v for k, v in self._headers] def items(self): """Get all the message's header fields and values. These will be sorted in the order they appeared in the original message, or were added to the message, and may contain duplicates. Any fields deleted and re-inserted are always appended to the header list. """ return self._headers[:] def get(self, name, failobj=None): """Get a header value. Like __getitem__() but return failobj instead of None when the field is missing. """ name = name.lower() for k, v in self._headers: if k.lower() == name: return v return failobj # # Additional useful stuff # def get_all(self, name, failobj=None): """Return a list of all the values for the named field. These will be sorted in the order they appeared in the original message, and may contain duplicates. Any fields deleted and re-inserted are always appended to the header list. If no such fields exist, failobj is returned (defaults to None). """ values = [] name = name.lower() for k, v in self._headers: if k.lower() == name: values.append(v) if not values: return failobj return values def add_header(self, _name, _value, **_params): """Extended header setting. name is the header field to add. keyword arguments can be used to set additional parameters for the header field, with underscores converted to dashes. Normally the parameter will be added as key="value" unless value is None, in which case only the key will be added. If a parameter value contains non-ASCII characters it must be specified as a three-tuple of (charset, language, value), in which case it will be encoded according to RFC2231 rules. Example: msg.add_header('content-disposition', 'attachment', filename='bud.gif') """ parts = [] for k, v in _params.items(): if v is None: parts.append(k.replace('_', '-')) else: parts.append(_formatparam(k.replace('_', '-'), v)) if _value is not None: parts.insert(0, _value) self._headers.append((_name, SEMISPACE.join(parts))) def replace_header(self, _name, _value): """Replace a header. Replace the first matching header found in the message, retaining header order and case. If no matching header was found, a KeyError is raised. """ _name = _name.lower() for i, (k, v) in zip(range(len(self._headers)), self._headers): if k.lower() == _name: self._headers[i] = (k, _value) break else: raise KeyError(_name) # # Use these three methods instead of the three above. # def get_content_type(self): """Return the message's content type. The returned string is coerced to lower case of the form `maintype/subtype'. If there was no Content-Type header in the message, the default type as given by get_default_type() will be returned. Since according to RFC 2045, messages always have a default type this will always return a value. RFC 2045 defines a message's default type to be text/plain unless it appears inside a multipart/digest container, in which case it would be message/rfc822. """ missing = object() value = self.get('content-type', missing) if value is missing: # This should have no parameters return self.get_default_type() ctype = _splitparam(value)[0].lower() # RFC 2045, section 5.2 says if its invalid, use text/plain if ctype.count('/') != 1: return 'text/plain' return ctype def get_content_maintype(self): """Return the message's main content type. This is the `maintype' part of the string returned by get_content_type(). """ ctype = self.get_content_type() return ctype.split('/')[0] def get_content_subtype(self): """Returns the message's sub-content type. This is the `subtype' part of the string returned by get_content_type(). """ ctype = self.get_content_type() return ctype.split('/')[1] def get_default_type(self): """Return the `default' content type. Most messages have a default content type of text/plain, except for messages that are subparts of multipart/digest containers. Such subparts have a default content type of message/rfc822. """ return self._default_type def set_default_type(self, ctype): """Set the `default' content type. ctype should be either "text/plain" or "message/rfc822", although this is not enforced. The default content type is not stored in the Content-Type header. """ self._default_type = ctype def _get_params_preserve(self, failobj, header): # Like get_params() but preserves the quoting of values. BAW: # should this be part of the public interface? missing = object() value = self.get(header, missing) if value is missing: return failobj params = [] for p in _parseparam(';' + value): try: name, val = p.split('=', 1) name = name.strip() val = val.strip() except ValueError: # Must have been a bare attribute name = p.strip() val = '' params.append((name, val)) params = utils.decode_params(params) return params def get_params(self, failobj=None, header='content-type', unquote=True): """Return the message's Content-Type parameters, as a list. The elements of the returned list are 2-tuples of key/value pairs, as split on the `=' sign. The left hand side of the `=' is the key, while the right hand side is the value. If there is no `=' sign in the parameter the value is the empty string. The value is as described in the get_param() method. Optional failobj is the object to return if there is no Content-Type header. Optional header is the header to search instead of Content-Type. If unquote is True, the value is unquoted. """ missing = object() params = self._get_params_preserve(missing, header) if params is missing: return failobj if unquote: return [(k, _unquotevalue(v)) for k, v in params] else: return params def get_param(self, param, failobj=None, header='content-type', unquote=True): """Return the parameter value if found in the Content-Type header. Optional failobj is the object to return if there is no Content-Type header, or the Content-Type header has no such parameter. Optional header is the header to search instead of Content-Type. Parameter keys are always compared case insensitively. The return value can either be a string, or a 3-tuple if the parameter was RFC 2231 encoded. When it's a 3-tuple, the elements of the value are of the form (CHARSET, LANGUAGE, VALUE). Note that both CHARSET and LANGUAGE can be None, in which case you should consider VALUE to be encoded in the us-ascii charset. You can usually ignore LANGUAGE. Your application should be prepared to deal with 3-tuple return values, and can convert the parameter to a Unicode string like so: param = msg.get_param('foo') if isinstance(param, tuple): param = unicode(param[2], param[0] or 'us-ascii') In any case, the parameter value (either the returned string, or the VALUE item in the 3-tuple) is always unquoted, unless unquote is set to False. """ if header not in self: return failobj for k, v in self._get_params_preserve(failobj, header): if k.lower() == param.lower(): if unquote: return _unquotevalue(v) else: return v return failobj def set_param(self, param, value, header='Content-Type', requote=True, charset=None, language=''): """Set a parameter in the Content-Type header. If the parameter already exists in the header, its value will be replaced with the new value. If header is Content-Type and has not yet been defined for this message, it will be set to "text/plain" and the new parameter and value will be appended as per RFC 2045. An alternate header can be specified in the header argument, and all parameters will be quoted as necessary unless requote is False. If charset is specified, the parameter will be encoded according to RFC 2231. Optional language specifies the RFC 2231 language, defaulting to the empty string. Both charset and language should be strings. """ if not isinstance(value, tuple) and charset: value = (charset, language, value) if header not in self and header.lower() == 'content-type': ctype = 'text/plain' else: ctype = self.get(header) if not self.get_param(param, header=header): if not ctype: ctype = _formatparam(param, value, requote) else: ctype = SEMISPACE.join( [ctype, _formatparam(param, value, requote)]) else: ctype = '' for old_param, old_value in self.get_params(header=header, unquote=requote): append_param = '' if old_param.lower() == param.lower(): append_param = _formatparam(param, value, requote) else: append_param = _formatparam(old_param, old_value, requote) if not ctype: ctype = append_param else: ctype = SEMISPACE.join([ctype, append_param]) if ctype != self.get(header): del self[header] self[header] = ctype def del_param(self, param, header='content-type', requote=True): """Remove the given parameter completely from the Content-Type header. The header will be re-written in place without the parameter or its value. All values will be quoted as necessary unless requote is False. Optional header specifies an alternative to the Content-Type header. """ if header not in self: return new_ctype = '' for p, v in self.get_params(header=header, unquote=requote): if p.lower() != param.lower(): if not new_ctype: new_ctype = _formatparam(p, v, requote) else: new_ctype = SEMISPACE.join([new_ctype, _formatparam(p, v, requote)]) if new_ctype != self.get(header): del self[header] self[header] = new_ctype def set_type(self, type, header='Content-Type', requote=True): """Set the main type and subtype for the Content-Type header. type must be a string in the form "maintype/subtype", otherwise a ValueError is raised. This method replaces the Content-Type header, keeping all the parameters in place. If requote is False, this leaves the existing header's quoting as is. Otherwise, the parameters will be quoted (the default). An alternative header can be specified in the header argument. When the Content-Type header is set, we'll always also add a MIME-Version header. """ # BAW: should we be strict? if not type.count('/') == 1: raise ValueError # Set the Content-Type, you get a MIME-Version if header.lower() == 'content-type': del self['mime-version'] self['MIME-Version'] = '1.0' if header not in self: self[header] = type return params = self.get_params(header=header, unquote=requote) del self[header] self[header] = type # Skip the first param; it's the old type. for p, v in params[1:]: self.set_param(p, v, header, requote) def get_filename(self, failobj=None): """Return the filename associated with the payload if present. The filename is extracted from the Content-Disposition header's `filename' parameter, and it is unquoted. If that header is missing the `filename' parameter, this method falls back to looking for the `name' parameter. """ missing = object() filename = self.get_param('filename', missing, 'content-disposition') if filename is missing: filename = self.get_param('name', missing, 'content-type') if filename is missing: return failobj return utils.collapse_rfc2231_value(filename).strip() def get_boundary(self, failobj=None): """Return the boundary associated with the payload if present. The boundary is extracted from the Content-Type header's `boundary' parameter, and it is unquoted. """ missing = object() boundary = self.get_param('boundary', missing) if boundary is missing: return failobj # RFC 2046 says that boundaries may begin but not end in w/s return utils.collapse_rfc2231_value(boundary).rstrip() def set_boundary(self, boundary): """Set the boundary parameter in Content-Type to 'boundary'. This is subtly different than deleting the Content-Type header and adding a new one with a new boundary parameter via add_header(). The main difference is that using the set_boundary() method preserves the order of the Content-Type header in the original message. HeaderParseError is raised if the message has no Content-Type header. """ missing = object() params = self._get_params_preserve(missing, 'content-type') if params is missing: # There was no Content-Type header, and we don't know what type # to set it to, so raise an exception. raise errors.HeaderParseError('No Content-Type header found') newparams = [] foundp = False for pk, pv in params: if pk.lower() == 'boundary': newparams.append(('boundary', '"%s"' % boundary)) foundp = True else: newparams.append((pk, pv)) if not foundp: # The original Content-Type header had no boundary attribute. # Tack one on the end. BAW: should we raise an exception # instead??? newparams.append(('boundary', '"%s"' % boundary)) # Replace the existing Content-Type header with the new value newheaders = [] for h, v in self._headers: if h.lower() == 'content-type': parts = [] for k, v in newparams: if v == '': parts.append(k) else: parts.append('%s=%s' % (k, v)) newheaders.append((h, SEMISPACE.join(parts))) else: newheaders.append((h, v)) self._headers = newheaders def get_content_charset(self, failobj=None): """Return the charset parameter of the Content-Type header. The returned string is always coerced to lower case. If there is no Content-Type header, or if that header has no charset parameter, failobj is returned. """ missing = object() charset = self.get_param('charset', missing) if charset is missing: return failobj if isinstance(charset, tuple): # RFC 2231 encoded, so decode it, and it better end up as ascii. pcharset = charset[0] or 'us-ascii' try: # LookupError will be raised if the charset isn't known to # Python. UnicodeError will be raised if the encoded text # contains a character not in the charset. charset = unicode(charset[2], pcharset).encode('us-ascii') except (LookupError, UnicodeError): charset = charset[2] # charset character must be in us-ascii range try: if isinstance(charset, str): charset = unicode(charset, 'us-ascii') charset = charset.encode('us-ascii') except UnicodeError: return failobj # RFC 2046, $4.1.2 says charsets are not case sensitive return charset.lower() def get_charsets(self, failobj=None): """Return a list containing the charset(s) used in this message. The returned list of items describes the Content-Type headers' charset parameter for this message and all the subparts in its payload. Each item will either be a string (the value of the charset parameter in the Content-Type header of that part) or the value of the 'failobj' parameter (defaults to None), if the part does not have a main MIME type of "text", or the charset is not defined. The list will contain one string for each part of the message, plus one for the container message (i.e. self), so that a non-multipart message will still return a list of length 1. """ return [part.get_content_charset(failobj) for part in self.walk()] # I.e. def walk(self): ... from email.iterators import walk __init__.py 0000644 00000005450 15204103444 0006656 0 ustar 00 # Copyright (C) 2001-2006 Python Software Foundation # Author: Barry Warsaw # Contact: email-sig@python.org """A package for parsing, handling, and generating email messages.""" __version__ = '4.0.3' __all__ = [ # Old names 'base64MIME', 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'Message', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Parser', 'quopriMIME', 'Utils', 'message_from_string', 'message_from_file', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'mime', 'parser', 'quoprimime', 'utils', ] # Some convenience routines. Don't import Parser and Message as side-effects # of importing email since those cascadingly import most of the rest of the # email package. def message_from_string(s, *args, **kws): """Parse a string into a Message object model. Optional _class and strict are passed to the Parser constructor. """ from email.parser import Parser return Parser(*args, **kws).parsestr(s) def message_from_file(fp, *args, **kws): """Read a file and parse its contents into a Message object model. Optional _class and strict are passed to the Parser constructor. """ from email.parser import Parser return Parser(*args, **kws).parse(fp) # Lazy loading to provide name mapping from new-style names (PEP 8 compatible # email 4.0 module names), to old-style names (email 3.0 module names). import sys class LazyImporter(object): def __init__(self, module_name): self.__name__ = 'email.' + module_name def __getattr__(self, name): __import__(self.__name__) mod = sys.modules[self.__name__] self.__dict__.update(mod.__dict__) return getattr(mod, name) _LOWERNAMES = [ # email.<old name> -> email.<new name is lowercased old name> 'Charset', 'Encoders', 'Errors', 'FeedParser', 'Generator', 'Header', 'Iterators', 'Message', 'Parser', 'Utils', 'base64MIME', 'quopriMIME', ] _MIMENAMES = [ # email.MIME<old name> -> email.mime.<new name is lowercased old name> 'Audio', 'Base', 'Image', 'Message', 'Multipart', 'NonMultipart', 'Text', ] for _name in _LOWERNAMES: importer = LazyImporter(_name.lower()) sys.modules['email.' + _name] = importer setattr(sys.modules['email'], _name, importer) import email.mime for _name in _MIMENAMES: importer = LazyImporter('mime.' + _name.lower()) sys.modules['email.MIME' + _name] = importer setattr(sys.modules['email'], 'MIME' + _name, importer) setattr(sys.modules['email.mime'], _name, importer) headerregistry.py 0000644 00000050513 15204103444 0010140 0 ustar 00 """Representing and manipulating email headers via custom objects. This module provides an implementation of the HeaderRegistry API. The implementation is designed to flexibly follow RFC5322 rules. Eventually HeaderRegistry will be a public API, but it isn't yet, and will probably change some before that happens. """ from types import MappingProxyType from email import utils from email import errors from email import _header_value_parser as parser class Address: def __init__(self, display_name='', username='', domain='', addr_spec=None): """Create an object representing a full email address. An address can have a 'display_name', a 'username', and a 'domain'. In addition to specifying the username and domain separately, they may be specified together by using the addr_spec keyword *instead of* the username and domain keywords. If an addr_spec string is specified it must be properly quoted according to RFC 5322 rules; an error will be raised if it is not. An Address object has display_name, username, domain, and addr_spec attributes, all of which are read-only. The addr_spec and the string value of the object are both quoted according to RFC5322 rules, but without any Content Transfer Encoding. """ inputs = ''.join(filter(None, (display_name, username, domain, addr_spec))) if '\r' in inputs or '\n' in inputs: raise ValueError("invalid arguments; address parts cannot contain CR or LF") # This clause with its potential 'raise' may only happen when an # application program creates an Address object using an addr_spec # keyword. The email library code itself must always supply username # and domain. if addr_spec is not None: if username or domain: raise TypeError("addrspec specified when username and/or " "domain also specified") a_s, rest = parser.get_addr_spec(addr_spec) if rest: raise ValueError("Invalid addr_spec; only '{}' " "could be parsed from '{}'".format( a_s, addr_spec)) if a_s.all_defects: raise a_s.all_defects[0] username = a_s.local_part domain = a_s.domain self._display_name = display_name self._username = username self._domain = domain @property def display_name(self): return self._display_name @property def username(self): return self._username @property def domain(self): return self._domain @property def addr_spec(self): """The addr_spec (username@domain) portion of the address, quoted according to RFC 5322 rules, but with no Content Transfer Encoding. """ nameset = set(self.username) if len(nameset) > len(nameset-parser.DOT_ATOM_ENDS): lp = parser.quote_string(self.username) else: lp = self.username if self.domain: return lp + '@' + self.domain if not lp: return '<>' return lp def __repr__(self): return "{}(display_name={!r}, username={!r}, domain={!r})".format( self.__class__.__name__, self.display_name, self.username, self.domain) def __str__(self): nameset = set(self.display_name) if len(nameset) > len(nameset-parser.SPECIALS): disp = parser.quote_string(self.display_name) else: disp = self.display_name if disp: addr_spec = '' if self.addr_spec=='<>' else self.addr_spec return "{} <{}>".format(disp, addr_spec) return self.addr_spec def __eq__(self, other): if type(other) != type(self): return False return (self.display_name == other.display_name and self.username == other.username and self.domain == other.domain) class Group: def __init__(self, display_name=None, addresses=None): """Create an object representing an address group. An address group consists of a display_name followed by colon and a list of addresses (see Address) terminated by a semi-colon. The Group is created by specifying a display_name and a possibly empty list of Address objects. A Group can also be used to represent a single address that is not in a group, which is convenient when manipulating lists that are a combination of Groups and individual Addresses. In this case the display_name should be set to None. In particular, the string representation of a Group whose display_name is None is the same as the Address object, if there is one and only one Address object in the addresses list. """ self._display_name = display_name self._addresses = tuple(addresses) if addresses else tuple() @property def display_name(self): return self._display_name @property def addresses(self): return self._addresses def __repr__(self): return "{}(display_name={!r}, addresses={!r}".format( self.__class__.__name__, self.display_name, self.addresses) def __str__(self): if self.display_name is None and len(self.addresses)==1: return str(self.addresses[0]) disp = self.display_name if disp is not None: nameset = set(disp) if len(nameset) > len(nameset-parser.SPECIALS): disp = parser.quote_string(disp) adrstr = ", ".join(str(x) for x in self.addresses) adrstr = ' ' + adrstr if adrstr else adrstr return "{}:{};".format(disp, adrstr) def __eq__(self, other): if type(other) != type(self): return False return (self.display_name == other.display_name and self.addresses == other.addresses) # Header Classes # class BaseHeader(str): """Base class for message headers. Implements generic behavior and provides tools for subclasses. A subclass must define a classmethod named 'parse' that takes an unfolded value string and a dictionary as its arguments. The dictionary will contain one key, 'defects', initialized to an empty list. After the call the dictionary must contain two additional keys: parse_tree, set to the parse tree obtained from parsing the header, and 'decoded', set to the string value of the idealized representation of the data from the value. (That is, encoded words are decoded, and values that have canonical representations are so represented.) The defects key is intended to collect parsing defects, which the message parser will subsequently dispose of as appropriate. The parser should not, insofar as practical, raise any errors. Defects should be added to the list instead. The standard header parsers register defects for RFC compliance issues, for obsolete RFC syntax, and for unrecoverable parsing errors. The parse method may add additional keys to the dictionary. In this case the subclass must define an 'init' method, which will be passed the dictionary as its keyword arguments. The method should use (usually by setting them as the value of similarly named attributes) and remove all the extra keys added by its parse method, and then use super to call its parent class with the remaining arguments and keywords. The subclass should also make sure that a 'max_count' attribute is defined that is either None or 1. XXX: need to better define this API. """ def __new__(cls, name, value): kwds = {'defects': []} cls.parse(value, kwds) if utils._has_surrogates(kwds['decoded']): kwds['decoded'] = utils._sanitize(kwds['decoded']) self = str.__new__(cls, kwds['decoded']) del kwds['decoded'] self.init(name, **kwds) return self def init(self, name, *, parse_tree, defects): self._name = name self._parse_tree = parse_tree self._defects = defects @property def name(self): return self._name @property def defects(self): return tuple(self._defects) def __reduce__(self): return ( _reconstruct_header, ( self.__class__.__name__, self.__class__.__bases__, str(self), ), self.__dict__) @classmethod def _reconstruct(cls, value): return str.__new__(cls, value) def fold(self, *, policy): """Fold header according to policy. The parsed representation of the header is folded according to RFC5322 rules, as modified by the policy. If the parse tree contains surrogateescaped bytes, the bytes are CTE encoded using the charset 'unknown-8bit". Any non-ASCII characters in the parse tree are CTE encoded using charset utf-8. XXX: make this a policy setting. The returned value is an ASCII-only string possibly containing linesep characters, and ending with a linesep character. The string includes the header name and the ': ' separator. """ # At some point we need to put fws here if it was in the source. header = parser.Header([ parser.HeaderLabel([ parser.ValueTerminal(self.name, 'header-name'), parser.ValueTerminal(':', 'header-sep')]), ]) if self._parse_tree: header.append( parser.CFWSList([parser.WhiteSpaceTerminal(' ', 'fws')])) header.append(self._parse_tree) return header.fold(policy=policy) def _reconstruct_header(cls_name, bases, value): return type(cls_name, bases, {})._reconstruct(value) class UnstructuredHeader: max_count = None value_parser = staticmethod(parser.get_unstructured) @classmethod def parse(cls, value, kwds): kwds['parse_tree'] = cls.value_parser(value) kwds['decoded'] = str(kwds['parse_tree']) class UniqueUnstructuredHeader(UnstructuredHeader): max_count = 1 class DateHeader: """Header whose value consists of a single timestamp. Provides an additional attribute, datetime, which is either an aware datetime using a timezone, or a naive datetime if the timezone in the input string is -0000. Also accepts a datetime as input. The 'value' attribute is the normalized form of the timestamp, which means it is the output of format_datetime on the datetime. """ max_count = None # This is used only for folding, not for creating 'decoded'. value_parser = staticmethod(parser.get_unstructured) @classmethod def parse(cls, value, kwds): if not value: kwds['defects'].append(errors.HeaderMissingRequiredValue()) kwds['datetime'] = None kwds['decoded'] = '' kwds['parse_tree'] = parser.TokenList() return if isinstance(value, str): value = utils.parsedate_to_datetime(value) kwds['datetime'] = value kwds['decoded'] = utils.format_datetime(kwds['datetime']) kwds['parse_tree'] = cls.value_parser(kwds['decoded']) def init(self, *args, **kw): self._datetime = kw.pop('datetime') super().init(*args, **kw) @property def datetime(self): return self._datetime class UniqueDateHeader(DateHeader): max_count = 1 class AddressHeader: max_count = None @staticmethod def value_parser(value): address_list, value = parser.get_address_list(value) assert not value, 'this should not happen' return address_list @classmethod def parse(cls, value, kwds): if isinstance(value, str): # We are translating here from the RFC language (address/mailbox) # to our API language (group/address). kwds['parse_tree'] = address_list = cls.value_parser(value) groups = [] for addr in address_list.addresses: groups.append(Group(addr.display_name, [Address(mb.display_name or '', mb.local_part or '', mb.domain or '') for mb in addr.all_mailboxes])) defects = list(address_list.all_defects) else: # Assume it is Address/Group stuff if not hasattr(value, '__iter__'): value = [value] groups = [Group(None, [item]) if not hasattr(item, 'addresses') else item for item in value] defects = [] kwds['groups'] = groups kwds['defects'] = defects kwds['decoded'] = ', '.join([str(item) for item in groups]) if 'parse_tree' not in kwds: kwds['parse_tree'] = cls.value_parser(kwds['decoded']) def init(self, *args, **kw): self._groups = tuple(kw.pop('groups')) self._addresses = None super().init(*args, **kw) @property def groups(self): return self._groups @property def addresses(self): if self._addresses is None: self._addresses = tuple(address for group in self._groups for address in group.addresses) return self._addresses class UniqueAddressHeader(AddressHeader): max_count = 1 class SingleAddressHeader(AddressHeader): @property def address(self): if len(self.addresses)!=1: raise ValueError(("value of single address header {} is not " "a single address").format(self.name)) return self.addresses[0] class UniqueSingleAddressHeader(SingleAddressHeader): max_count = 1 class MIMEVersionHeader: max_count = 1 value_parser = staticmethod(parser.parse_mime_version) @classmethod def parse(cls, value, kwds): kwds['parse_tree'] = parse_tree = cls.value_parser(value) kwds['decoded'] = str(parse_tree) kwds['defects'].extend(parse_tree.all_defects) kwds['major'] = None if parse_tree.minor is None else parse_tree.major kwds['minor'] = parse_tree.minor if parse_tree.minor is not None: kwds['version'] = '{}.{}'.format(kwds['major'], kwds['minor']) else: kwds['version'] = None def init(self, *args, **kw): self._version = kw.pop('version') self._major = kw.pop('major') self._minor = kw.pop('minor') super().init(*args, **kw) @property def major(self): return self._major @property def minor(self): return self._minor @property def version(self): return self._version class ParameterizedMIMEHeader: # Mixin that handles the params dict. Must be subclassed and # a property value_parser for the specific header provided. max_count = 1 @classmethod def parse(cls, value, kwds): kwds['parse_tree'] = parse_tree = cls.value_parser(value) kwds['decoded'] = str(parse_tree) kwds['defects'].extend(parse_tree.all_defects) if parse_tree.params is None: kwds['params'] = {} else: # The MIME RFCs specify that parameter ordering is arbitrary. kwds['params'] = {utils._sanitize(name).lower(): utils._sanitize(value) for name, value in parse_tree.params} def init(self, *args, **kw): self._params = kw.pop('params') super().init(*args, **kw) @property def params(self): return MappingProxyType(self._params) class ContentTypeHeader(ParameterizedMIMEHeader): value_parser = staticmethod(parser.parse_content_type_header) def init(self, *args, **kw): super().init(*args, **kw) self._maintype = utils._sanitize(self._parse_tree.maintype) self._subtype = utils._sanitize(self._parse_tree.subtype) @property def maintype(self): return self._maintype @property def subtype(self): return self._subtype @property def content_type(self): return self.maintype + '/' + self.subtype class ContentDispositionHeader(ParameterizedMIMEHeader): value_parser = staticmethod(parser.parse_content_disposition_header) def init(self, *args, **kw): super().init(*args, **kw) cd = self._parse_tree.content_disposition self._content_disposition = cd if cd is None else utils._sanitize(cd) @property def content_disposition(self): return self._content_disposition class ContentTransferEncodingHeader: max_count = 1 value_parser = staticmethod(parser.parse_content_transfer_encoding_header) @classmethod def parse(cls, value, kwds): kwds['parse_tree'] = parse_tree = cls.value_parser(value) kwds['decoded'] = str(parse_tree) kwds['defects'].extend(parse_tree.all_defects) def init(self, *args, **kw): super().init(*args, **kw) self._cte = utils._sanitize(self._parse_tree.cte) @property def cte(self): return self._cte class MessageIDHeader: max_count = 1 value_parser = staticmethod(parser.parse_message_id) @classmethod def parse(cls, value, kwds): kwds['parse_tree'] = parse_tree = cls.value_parser(value) kwds['decoded'] = str(parse_tree) kwds['defects'].extend(parse_tree.all_defects) # The header factory # _default_header_map = { 'subject': UniqueUnstructuredHeader, 'date': UniqueDateHeader, 'resent-date': DateHeader, 'orig-date': UniqueDateHeader, 'sender': UniqueSingleAddressHeader, 'resent-sender': SingleAddressHeader, 'to': UniqueAddressHeader, 'resent-to': AddressHeader, 'cc': UniqueAddressHeader, 'resent-cc': AddressHeader, 'bcc': UniqueAddressHeader, 'resent-bcc': AddressHeader, 'from': UniqueAddressHeader, 'resent-from': AddressHeader, 'reply-to': UniqueAddressHeader, 'mime-version': MIMEVersionHeader, 'content-type': ContentTypeHeader, 'content-disposition': ContentDispositionHeader, 'content-transfer-encoding': ContentTransferEncodingHeader, 'message-id': MessageIDHeader, } class HeaderRegistry: """A header_factory and header registry.""" def __init__(self, base_class=BaseHeader, default_class=UnstructuredHeader, use_default_map=True): """Create a header_factory that works with the Policy API. base_class is the class that will be the last class in the created header class's __bases__ list. default_class is the class that will be used if "name" (see __call__) does not appear in the registry. use_default_map controls whether or not the default mapping of names to specialized classes is copied in to the registry when the factory is created. The default is True. """ self.registry = {} self.base_class = base_class self.default_class = default_class if use_default_map: self.registry.update(_default_header_map) def map_to_type(self, name, cls): """Register cls as the specialized class for handling "name" headers. """ self.registry[name.lower()] = cls def __getitem__(self, name): cls = self.registry.get(name.lower(), self.default_class) return type('_'+cls.__name__, (cls, self.base_class), {}) def __call__(self, name, value): """Create a header instance for header 'name' from 'value'. Creates a header instance by creating a specialized class for parsing and representing the specified header by combining the factory base_class with a specialized class from the registry or the default_class, and passing the name and value to the constructed class's constructor. """ return self[name](name, value) charset.py 0000644 00000037254 15204103444 0006557 0 ustar 00 # Copyright (C) 2001-2006 Python Software Foundation # Author: Ben Gertzfield, Barry Warsaw # Contact: email-sig@python.org __all__ = [ 'Charset', 'add_alias', 'add_charset', 'add_codec', ] import codecs import email.base64mime import email.quoprimime from email import errors from email.encoders import encode_7or8bit # Flags for types of header encodings QP = 1 # Quoted-Printable BASE64 = 2 # Base64 SHORTEST = 3 # the shorter of QP and base64, but only for headers # In "=?charset?q?hello_world?=", the =?, ?q?, and ?= add up to 7 MISC_LEN = 7 DEFAULT_CHARSET = 'us-ascii' # Defaults CHARSETS = { # input header enc body enc output conv 'iso-8859-1': (QP, QP, None), 'iso-8859-2': (QP, QP, None), 'iso-8859-3': (QP, QP, None), 'iso-8859-4': (QP, QP, None), # iso-8859-5 is Cyrillic, and not especially used # iso-8859-6 is Arabic, also not particularly used # iso-8859-7 is Greek, QP will not make it readable # iso-8859-8 is Hebrew, QP will not make it readable 'iso-8859-9': (QP, QP, None), 'iso-8859-10': (QP, QP, None), # iso-8859-11 is Thai, QP will not make it readable 'iso-8859-13': (QP, QP, None), 'iso-8859-14': (QP, QP, None), 'iso-8859-15': (QP, QP, None), 'iso-8859-16': (QP, QP, None), 'windows-1252':(QP, QP, None), 'viscii': (QP, QP, None), 'us-ascii': (None, None, None), 'big5': (BASE64, BASE64, None), 'gb2312': (BASE64, BASE64, None), 'euc-jp': (BASE64, None, 'iso-2022-jp'), 'shift_jis': (BASE64, None, 'iso-2022-jp'), 'iso-2022-jp': (BASE64, None, None), 'koi8-r': (BASE64, BASE64, None), 'utf-8': (SHORTEST, BASE64, 'utf-8'), # We're making this one up to represent raw unencoded 8-bit '8bit': (None, BASE64, 'utf-8'), } # Aliases for other commonly-used names for character sets. Map # them to the real ones used in email. ALIASES = { 'latin_1': 'iso-8859-1', 'latin-1': 'iso-8859-1', 'latin_2': 'iso-8859-2', 'latin-2': 'iso-8859-2', 'latin_3': 'iso-8859-3', 'latin-3': 'iso-8859-3', 'latin_4': 'iso-8859-4', 'latin-4': 'iso-8859-4', 'latin_5': 'iso-8859-9', 'latin-5': 'iso-8859-9', 'latin_6': 'iso-8859-10', 'latin-6': 'iso-8859-10', 'latin_7': 'iso-8859-13', 'latin-7': 'iso-8859-13', 'latin_8': 'iso-8859-14', 'latin-8': 'iso-8859-14', 'latin_9': 'iso-8859-15', 'latin-9': 'iso-8859-15', 'latin_10':'iso-8859-16', 'latin-10':'iso-8859-16', 'cp949': 'ks_c_5601-1987', 'euc_jp': 'euc-jp', 'euc_kr': 'euc-kr', 'ascii': 'us-ascii', } # Map charsets to their Unicode codec strings. CODEC_MAP = { 'gb2312': 'eucgb2312_cn', 'big5': 'big5_tw', # Hack: We don't want *any* conversion for stuff marked us-ascii, as all # sorts of garbage might be sent to us in the guise of 7-bit us-ascii. # Let that stuff pass through without conversion to/from Unicode. 'us-ascii': None, } # Convenience functions for extending the above mappings def add_charset(charset, header_enc=None, body_enc=None, output_charset=None): """Add character set properties to the global registry. charset is the input character set, and must be the canonical name of a character set. Optional header_enc and body_enc is either Charset.QP for quoted-printable, Charset.BASE64 for base64 encoding, Charset.SHORTEST for the shortest of qp or base64 encoding, or None for no encoding. SHORTEST is only valid for header_enc. It describes how message headers and message bodies in the input charset are to be encoded. Default is no encoding. Optional output_charset is the character set that the output should be in. Conversions will proceed from input charset, to Unicode, to the output charset when the method Charset.convert() is called. The default is to output in the same character set as the input. Both input_charset and output_charset must have Unicode codec entries in the module's charset-to-codec mapping; use add_codec(charset, codecname) to add codecs the module does not know about. See the codecs module's documentation for more information. """ if body_enc == SHORTEST: raise ValueError('SHORTEST not allowed for body_enc') CHARSETS[charset] = (header_enc, body_enc, output_charset) def add_alias(alias, canonical): """Add a character set alias. alias is the alias name, e.g. latin-1 canonical is the character set's canonical name, e.g. iso-8859-1 """ ALIASES[alias] = canonical def add_codec(charset, codecname): """Add a codec that map characters in the given charset to/from Unicode. charset is the canonical name of a character set. codecname is the name of a Python codec, as appropriate for the second argument to the unicode() built-in, or to the encode() method of a Unicode string. """ CODEC_MAP[charset] = codecname class Charset: """Map character sets to their email properties. This class provides information about the requirements imposed on email for a specific character set. It also provides convenience routines for converting between character sets, given the availability of the applicable codecs. Given a character set, it will do its best to provide information on how to use that character set in an email in an RFC-compliant way. Certain character sets must be encoded with quoted-printable or base64 when used in email headers or bodies. Certain character sets must be converted outright, and are not allowed in email. Instances of this module expose the following information about a character set: input_charset: The initial character set specified. Common aliases are converted to their `official' email names (e.g. latin_1 is converted to iso-8859-1). Defaults to 7-bit us-ascii. header_encoding: If the character set must be encoded before it can be used in an email header, this attribute will be set to Charset.QP (for quoted-printable), Charset.BASE64 (for base64 encoding), or Charset.SHORTEST for the shortest of QP or BASE64 encoding. Otherwise, it will be None. body_encoding: Same as header_encoding, but describes the encoding for the mail message's body, which indeed may be different than the header encoding. Charset.SHORTEST is not allowed for body_encoding. output_charset: Some character sets must be converted before they can be used in email headers or bodies. If the input_charset is one of them, this attribute will contain the name of the charset output will be converted to. Otherwise, it will be None. input_codec: The name of the Python codec used to convert the input_charset to Unicode. If no conversion codec is necessary, this attribute will be None. output_codec: The name of the Python codec used to convert Unicode to the output_charset. If no conversion codec is necessary, this attribute will have the same value as the input_codec. """ def __init__(self, input_charset=DEFAULT_CHARSET): # RFC 2046, $4.1.2 says charsets are not case sensitive. We coerce to # unicode because its .lower() is locale insensitive. If the argument # is already a unicode, we leave it at that, but ensure that the # charset is ASCII, as the standard (RFC XXX) requires. try: if isinstance(input_charset, unicode): input_charset.encode('ascii') else: input_charset = unicode(input_charset, 'ascii') except UnicodeError: raise errors.CharsetError(input_charset) input_charset = input_charset.lower().encode('ascii') # Set the input charset after filtering through the aliases and/or codecs if not (input_charset in ALIASES or input_charset in CHARSETS): try: input_charset = codecs.lookup(input_charset).name except LookupError: pass self.input_charset = ALIASES.get(input_charset, input_charset) # We can try to guess which encoding and conversion to use by the # charset_map dictionary. Try that first, but let the user override # it. henc, benc, conv = CHARSETS.get(self.input_charset, (SHORTEST, BASE64, None)) if not conv: conv = self.input_charset # Set the attributes, allowing the arguments to override the default. self.header_encoding = henc self.body_encoding = benc self.output_charset = ALIASES.get(conv, conv) # Now set the codecs. If one isn't defined for input_charset, # guess and try a Unicode codec with the same name as input_codec. self.input_codec = CODEC_MAP.get(self.input_charset, self.input_charset) self.output_codec = CODEC_MAP.get(self.output_charset, self.output_charset) def __str__(self): return self.input_charset.lower() __repr__ = __str__ def __eq__(self, other): return str(self) == str(other).lower() def __ne__(self, other): return not self.__eq__(other) def get_body_encoding(self): """Return the content-transfer-encoding used for body encoding. This is either the string `quoted-printable' or `base64' depending on the encoding used, or it is a function in which case you should call the function with a single argument, the Message object being encoded. The function should then set the Content-Transfer-Encoding header itself to whatever is appropriate. Returns "quoted-printable" if self.body_encoding is QP. Returns "base64" if self.body_encoding is BASE64. Returns "7bit" otherwise. """ assert self.body_encoding != SHORTEST if self.body_encoding == QP: return 'quoted-printable' elif self.body_encoding == BASE64: return 'base64' else: return encode_7or8bit def convert(self, s): """Convert a string from the input_codec to the output_codec.""" if self.input_codec != self.output_codec: return unicode(s, self.input_codec).encode(self.output_codec) else: return s def to_splittable(self, s): """Convert a possibly multibyte string to a safely splittable format. Uses the input_codec to try and convert the string to Unicode, so it can be safely split on character boundaries (even for multibyte characters). Returns the string as-is if it isn't known how to convert it to Unicode with the input_charset. Characters that could not be converted to Unicode will be replaced with the Unicode replacement character U+FFFD. """ if isinstance(s, unicode) or self.input_codec is None: return s try: return unicode(s, self.input_codec, 'replace') except LookupError: # Input codec not installed on system, so return the original # string unchanged. return s def from_splittable(self, ustr, to_output=True): """Convert a splittable string back into an encoded string. Uses the proper codec to try and convert the string from Unicode back into an encoded format. Return the string as-is if it is not Unicode, or if it could not be converted from Unicode. Characters that could not be converted from Unicode will be replaced with an appropriate character (usually '?'). If to_output is True (the default), uses output_codec to convert to an encoded format. If to_output is False, uses input_codec. """ if to_output: codec = self.output_codec else: codec = self.input_codec if not isinstance(ustr, unicode) or codec is None: return ustr try: return ustr.encode(codec, 'replace') except LookupError: # Output codec not installed return ustr def get_output_charset(self): """Return the output character set. This is self.output_charset if that is not None, otherwise it is self.input_charset. """ return self.output_charset or self.input_charset def encoded_header_len(self, s): """Return the length of the encoded header string.""" cset = self.get_output_charset() # The len(s) of a 7bit encoding is len(s) if self.header_encoding == BASE64: return email.base64mime.base64_len(s) + len(cset) + MISC_LEN elif self.header_encoding == QP: return email.quoprimime.header_quopri_len(s) + len(cset) + MISC_LEN elif self.header_encoding == SHORTEST: lenb64 = email.base64mime.base64_len(s) lenqp = email.quoprimime.header_quopri_len(s) return min(lenb64, lenqp) + len(cset) + MISC_LEN else: return len(s) def header_encode(self, s, convert=False): """Header-encode a string, optionally converting it to output_charset. If convert is True, the string will be converted from the input charset to the output charset automatically. This is not useful for multibyte character sets, which have line length issues (multibyte characters must be split on a character, not a byte boundary); use the high-level Header class to deal with these issues. convert defaults to False. The type of encoding (base64 or quoted-printable) will be based on self.header_encoding. """ cset = self.get_output_charset() if convert: s = self.convert(s) # 7bit/8bit encodings return the string unchanged (modulo conversions) if self.header_encoding == BASE64: return email.base64mime.header_encode(s, cset) elif self.header_encoding == QP: return email.quoprimime.header_encode(s, cset, maxlinelen=None) elif self.header_encoding == SHORTEST: lenb64 = email.base64mime.base64_len(s) lenqp = email.quoprimime.header_quopri_len(s) if lenb64 < lenqp: return email.base64mime.header_encode(s, cset) else: return email.quoprimime.header_encode(s, cset, maxlinelen=None) else: return s def body_encode(self, s, convert=True): """Body-encode a string and convert it to output_charset. If convert is True (the default), the string will be converted from the input charset to output charset automatically. Unlike header_encode(), there are no issues with byte boundaries and multibyte charsets in email bodies, so this is usually pretty safe. The type of encoding (base64 or quoted-printable) will be based on self.body_encoding. """ if convert: s = self.convert(s) # 7bit/8bit encodings return the string unchanged (module conversions) if self.body_encoding is BASE64: return email.base64mime.body_encode(s) elif self.body_encoding is QP: return email.quoprimime.body_encode(s) else: return s iterators.py 0000644 00000004232 15204103444 0007130 0 ustar 00 # Copyright (C) 2001-2006 Python Software Foundation # Author: Barry Warsaw # Contact: email-sig@python.org """Various types of useful iterators and generators.""" __all__ = [ 'body_line_iterator', 'typed_subpart_iterator', 'walk', # Do not include _structure() since it's part of the debugging API. ] import sys from cStringIO import StringIO # This function will become a method of the Message class def walk(self): """Walk over the message tree, yielding each subpart. The walk is performed in depth-first order. This method is a generator. """ yield self if self.is_multipart(): for subpart in self.get_payload(): for subsubpart in subpart.walk(): yield subsubpart # These two functions are imported into the Iterators.py interface module. def body_line_iterator(msg, decode=False): """Iterate over the parts, returning string payloads line-by-line. Optional decode (default False) is passed through to .get_payload(). """ for subpart in msg.walk(): payload = subpart.get_payload(decode=decode) if isinstance(payload, basestring): for line in StringIO(payload): yield line def typed_subpart_iterator(msg, maintype='text', subtype=None): """Iterate over the subparts with a given MIME type. Use `maintype' as the main MIME type to match against; this defaults to "text". Optional `subtype' is the MIME subtype to match against; if omitted, only the main type is matched. """ for subpart in msg.walk(): if subpart.get_content_maintype() == maintype: if subtype is None or subpart.get_content_subtype() == subtype: yield subpart def _structure(msg, fp=None, level=0, include_default=False): """A handy debugging aid""" if fp is None: fp = sys.stdout tab = ' ' * (level * 4) print >> fp, tab + msg.get_content_type(), if include_default: print >> fp, '[%s]' % msg.get_default_type() else: print >> fp if msg.is_multipart(): for subpart in msg.get_payload(): _structure(subpart, fp, level+1, include_default) _parseaddr.py 0000644 00000037412 15204103444 0007226 0 ustar 00 # Copyright (C) 2002-2007 Python Software Foundation # Contact: email-sig@python.org """Email address parsing code. Lifted directly from rfc822.py. This should eventually be rewritten. """ __all__ = [ 'mktime_tz', 'parsedate', 'parsedate_tz', 'quote', ] import time, calendar SPACE = ' ' EMPTYSTRING = '' COMMASPACE = ', ' # Parse a date field _monthnames = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec', 'january', 'february', 'march', 'april', 'may', 'june', 'july', 'august', 'september', 'october', 'november', 'december'] _daynames = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] # The timezone table does not include the military time zones defined # in RFC822, other than Z. According to RFC1123, the description in # RFC822 gets the signs wrong, so we can't rely on any such time # zones. RFC1123 recommends that numeric timezone indicators be used # instead of timezone names. _timezones = {'UT':0, 'UTC':0, 'GMT':0, 'Z':0, 'AST': -400, 'ADT': -300, # Atlantic (used in Canada) 'EST': -500, 'EDT': -400, # Eastern 'CST': -600, 'CDT': -500, # Central 'MST': -700, 'MDT': -600, # Mountain 'PST': -800, 'PDT': -700 # Pacific } def parsedate_tz(data): """Convert a date string to a time tuple. Accounts for military timezones. """ data = data.split() # The FWS after the comma after the day-of-week is optional, so search and # adjust for this. if data[0].endswith(',') or data[0].lower() in _daynames: # There's a dayname here. Skip it del data[0] else: i = data[0].rfind(',') if i >= 0: data[0] = data[0][i+1:] if len(data) == 3: # RFC 850 date, deprecated stuff = data[0].split('-') if len(stuff) == 3: data = stuff + data[1:] if len(data) == 4: s = data[3] i = s.find('+') if i > 0: data[3:] = [s[:i], s[i+1:]] else: data.append('') # Dummy tz if len(data) < 5: return None data = data[:5] [dd, mm, yy, tm, tz] = data mm = mm.lower() if mm not in _monthnames: dd, mm = mm, dd.lower() if mm not in _monthnames: return None mm = _monthnames.index(mm) + 1 if mm > 12: mm -= 12 if dd[-1] == ',': dd = dd[:-1] i = yy.find(':') if i > 0: yy, tm = tm, yy if yy[-1] == ',': yy = yy[:-1] if not yy[0].isdigit(): yy, tz = tz, yy if tm[-1] == ',': tm = tm[:-1] tm = tm.split(':') if len(tm) == 2: [thh, tmm] = tm tss = '0' elif len(tm) == 3: [thh, tmm, tss] = tm else: return None try: yy = int(yy) dd = int(dd) thh = int(thh) tmm = int(tmm) tss = int(tss) except ValueError: return None # Check for a yy specified in two-digit format, then convert it to the # appropriate four-digit format, according to the POSIX standard. RFC 822 # calls for a two-digit yy, but RFC 2822 (which obsoletes RFC 822) # mandates a 4-digit yy. For more information, see the documentation for # the time module. if yy < 100: # The year is between 1969 and 1999 (inclusive). if yy > 68: yy += 1900 # The year is between 2000 and 2068 (inclusive). else: yy += 2000 tzoffset = None tz = tz.upper() if tz in _timezones: tzoffset = _timezones[tz] else: try: tzoffset = int(tz) except ValueError: pass # Convert a timezone offset into seconds ; -0500 -> -18000 if tzoffset: if tzoffset < 0: tzsign = -1 tzoffset = -tzoffset else: tzsign = 1 tzoffset = tzsign * ( (tzoffset//100)*3600 + (tzoffset % 100)*60) # Daylight Saving Time flag is set to -1, since DST is unknown. return yy, mm, dd, thh, tmm, tss, 0, 1, -1, tzoffset def parsedate(data): """Convert a time string to a time tuple.""" t = parsedate_tz(data) if isinstance(t, tuple): return t[:9] else: return t def mktime_tz(data): """Turn a 10-tuple as returned by parsedate_tz() into a POSIX timestamp.""" if data[9] is None: # No zone info, so localtime is better assumption than GMT return time.mktime(data[:8] + (-1,)) else: t = calendar.timegm(data) return t - data[9] def quote(str): """Prepare string to be used in a quoted string. Turns backslash and double quote characters into quoted pairs. These are the only characters that need to be quoted inside a quoted string. Does not add the surrounding double quotes. """ return str.replace('\\', '\\\\').replace('"', '\\"') class AddrlistClass: """Address parser class by Ben Escoto. To understand what this class does, it helps to have a copy of RFC 2822 in front of you. Note: this class interface is deprecated and may be removed in the future. Use rfc822.AddressList instead. """ def __init__(self, field): """Initialize a new instance. `field' is an unparsed address header field, containing one or more addresses. """ self.specials = '()<>@,:;.\"[]' self.pos = 0 self.LWS = ' \t' self.CR = '\r\n' self.FWS = self.LWS + self.CR self.atomends = self.specials + self.LWS + self.CR # Note that RFC 2822 now specifies `.' as obs-phrase, meaning that it # is obsolete syntax. RFC 2822 requires that we recognize obsolete # syntax, so allow dots in phrases. self.phraseends = self.atomends.replace('.', '') self.field = field self.commentlist = [] def gotonext(self): """Parse up to the start of the next address.""" while self.pos < len(self.field): if self.field[self.pos] in self.LWS + '\n\r': self.pos += 1 elif self.field[self.pos] == '(': self.commentlist.append(self.getcomment()) else: break def getaddrlist(self): """Parse all addresses. Returns a list containing all of the addresses. """ result = [] while self.pos < len(self.field): ad = self.getaddress() if ad: result += ad else: result.append(('', '')) return result def getaddress(self): """Parse the next address.""" self.commentlist = [] self.gotonext() oldpos = self.pos oldcl = self.commentlist plist = self.getphraselist() self.gotonext() returnlist = [] if self.pos >= len(self.field): # Bad email address technically, no domain. if plist: returnlist = [(SPACE.join(self.commentlist), plist[0])] elif self.field[self.pos] in '.@': # email address is just an addrspec # this isn't very efficient since we start over self.pos = oldpos self.commentlist = oldcl addrspec = self.getaddrspec() returnlist = [(SPACE.join(self.commentlist), addrspec)] elif self.field[self.pos] == ':': # address is a group returnlist = [] fieldlen = len(self.field) self.pos += 1 while self.pos < len(self.field): self.gotonext() if self.pos < fieldlen and self.field[self.pos] == ';': self.pos += 1 break returnlist = returnlist + self.getaddress() elif self.field[self.pos] == '<': # Address is a phrase then a route addr routeaddr = self.getrouteaddr() if self.commentlist: returnlist = [(SPACE.join(plist) + ' (' + ' '.join(self.commentlist) + ')', routeaddr)] else: returnlist = [(SPACE.join(plist), routeaddr)] else: if plist: returnlist = [(SPACE.join(self.commentlist), plist[0])] elif self.field[self.pos] in self.specials: self.pos += 1 self.gotonext() if self.pos < len(self.field) and self.field[self.pos] == ',': self.pos += 1 return returnlist def getrouteaddr(self): """Parse a route address (Return-path value). This method just skips all the route stuff and returns the addrspec. """ if self.field[self.pos] != '<': return expectroute = False self.pos += 1 self.gotonext() adlist = '' while self.pos < len(self.field): if expectroute: self.getdomain() expectroute = False elif self.field[self.pos] == '>': self.pos += 1 break elif self.field[self.pos] == '@': self.pos += 1 expectroute = True elif self.field[self.pos] == ':': self.pos += 1 else: adlist = self.getaddrspec() self.pos += 1 break self.gotonext() return adlist def getaddrspec(self): """Parse an RFC 2822 addr-spec.""" aslist = [] self.gotonext() while self.pos < len(self.field): if self.field[self.pos] == '.': aslist.append('.') self.pos += 1 elif self.field[self.pos] == '"': aslist.append('"%s"' % quote(self.getquote())) elif self.field[self.pos] in self.atomends: break else: aslist.append(self.getatom()) self.gotonext() if self.pos >= len(self.field) or self.field[self.pos] != '@': return EMPTYSTRING.join(aslist) aslist.append('@') self.pos += 1 self.gotonext() domain = self.getdomain() if not domain: # Invalid domain, return an empty address instead of returning a # local part to denote failed parsing. return EMPTYSTRING return EMPTYSTRING.join(aslist) + domain def getdomain(self): """Get the complete domain name from an address.""" sdlist = [] while self.pos < len(self.field): if self.field[self.pos] in self.LWS: self.pos += 1 elif self.field[self.pos] == '(': self.commentlist.append(self.getcomment()) elif self.field[self.pos] == '[': sdlist.append(self.getdomainliteral()) elif self.field[self.pos] == '.': self.pos += 1 sdlist.append('.') elif self.field[self.pos] == '@': # bpo-34155: Don't parse domains with two `@` like # `a@malicious.org@important.com`. return EMPTYSTRING elif self.field[self.pos] in self.atomends: break else: sdlist.append(self.getatom()) return EMPTYSTRING.join(sdlist) def getdelimited(self, beginchar, endchars, allowcomments=True): """Parse a header fragment delimited by special characters. `beginchar' is the start character for the fragment. If self is not looking at an instance of `beginchar' then getdelimited returns the empty string. `endchars' is a sequence of allowable end-delimiting characters. Parsing stops when one of these is encountered. If `allowcomments' is non-zero, embedded RFC 2822 comments are allowed within the parsed fragment. """ if self.field[self.pos] != beginchar: return '' slist = [''] quote = False self.pos += 1 while self.pos < len(self.field): if quote: slist.append(self.field[self.pos]) quote = False elif self.field[self.pos] in endchars: self.pos += 1 break elif allowcomments and self.field[self.pos] == '(': slist.append(self.getcomment()) continue # have already advanced pos from getcomment elif self.field[self.pos] == '\\': quote = True else: slist.append(self.field[self.pos]) self.pos += 1 return EMPTYSTRING.join(slist) def getquote(self): """Get a quote-delimited fragment from self's field.""" return self.getdelimited('"', '"\r', False) def getcomment(self): """Get a parenthesis-delimited fragment from self's field.""" return self.getdelimited('(', ')\r', True) def getdomainliteral(self): """Parse an RFC 2822 domain-literal.""" return '[%s]' % self.getdelimited('[', ']\r', False) def getatom(self, atomends=None): """Parse an RFC 2822 atom. Optional atomends specifies a different set of end token delimiters (the default is to use self.atomends). This is used e.g. in getphraselist() since phrase endings must not include the `.' (which is legal in phrases).""" atomlist = [''] if atomends is None: atomends = self.atomends while self.pos < len(self.field): if self.field[self.pos] in atomends: break else: atomlist.append(self.field[self.pos]) self.pos += 1 return EMPTYSTRING.join(atomlist) def getphraselist(self): """Parse a sequence of RFC 2822 phrases. A phrase is a sequence of words, which are in turn either RFC 2822 atoms or quoted-strings. Phrases are canonicalized by squeezing all runs of continuous whitespace into one space. """ plist = [] while self.pos < len(self.field): if self.field[self.pos] in self.FWS: self.pos += 1 elif self.field[self.pos] == '"': plist.append(self.getquote()) elif self.field[self.pos] == '(': self.commentlist.append(self.getcomment()) elif self.field[self.pos] in self.phraseends: break else: plist.append(self.getatom(self.phraseends)) return plist class AddressList(AddrlistClass): """An AddressList encapsulates a list of parsed RFC 2822 addresses.""" def __init__(self, field): AddrlistClass.__init__(self, field) if field: self.addresslist = self.getaddrlist() else: self.addresslist = [] def __len__(self): return len(self.addresslist) def __add__(self, other): # Set union newaddr = AddressList(None) newaddr.addresslist = self.addresslist[:] for x in other.addresslist: if not x in self.addresslist: newaddr.addresslist.append(x) return newaddr def __iadd__(self, other): # Set union, in-place for x in other.addresslist: if not x in self.addresslist: self.addresslist.append(x) return self def __sub__(self, other): # Set difference newaddr = AddressList(None) for x in self.addresslist: if not x in other.addresslist: newaddr.addresslist.append(x) return newaddr def __isub__(self, other): # Set difference, in-place for x in other.addresslist: if x in self.addresslist: self.addresslist.remove(x) return self def __getitem__(self, index): # Make indexing, slices, and 'in' work return self.addresslist[index] _encoded_words.py 0000644 00000020514 15204103444 0010073 0 ustar 00 """ Routines for manipulating RFC2047 encoded words. This is currently a package-private API, but will be considered for promotion to a public API if there is demand. """ # An ecoded word looks like this: # # =?charset[*lang]?cte?encoded_string?= # # for more information about charset see the charset module. Here it is one # of the preferred MIME charset names (hopefully; you never know when parsing). # cte (Content Transfer Encoding) is either 'q' or 'b' (ignoring case). In # theory other letters could be used for other encodings, but in practice this # (almost?) never happens. There could be a public API for adding entries # to the CTE tables, but YAGNI for now. 'q' is Quoted Printable, 'b' is # Base64. The meaning of encoded_string should be obvious. 'lang' is optional # as indicated by the brackets (they are not part of the syntax) but is almost # never encountered in practice. # # The general interface for a CTE decoder is that it takes the encoded_string # as its argument, and returns a tuple (cte_decoded_string, defects). The # cte_decoded_string is the original binary that was encoded using the # specified cte. 'defects' is a list of MessageDefect instances indicating any # problems encountered during conversion. 'charset' and 'lang' are the # corresponding strings extracted from the EW, case preserved. # # The general interface for a CTE encoder is that it takes a binary sequence # as input and returns the cte_encoded_string, which is an ascii-only string. # # Each decoder must also supply a length function that takes the binary # sequence as its argument and returns the length of the resulting encoded # string. # # The main API functions for the module are decode, which calls the decoder # referenced by the cte specifier, and encode, which adds the appropriate # RFC 2047 "chrome" to the encoded string, and can optionally automatically # select the shortest possible encoding. See their docstrings below for # details. import re import base64 import binascii import functools from string import ascii_letters, digits from email import errors __all__ = ['decode_q', 'encode_q', 'decode_b', 'encode_b', 'len_q', 'len_b', 'decode', 'encode', ] # # Quoted Printable # # regex based decoder. _q_byte_subber = functools.partial(re.compile(br'=([a-fA-F0-9]{2})').sub, lambda m: bytes.fromhex(m.group(1).decode())) def decode_q(encoded): encoded = encoded.replace(b'_', b' ') return _q_byte_subber(encoded), [] # dict mapping bytes to their encoded form class _QByteMap(dict): safe = b'-!*+/' + ascii_letters.encode('ascii') + digits.encode('ascii') def __missing__(self, key): if key in self.safe: self[key] = chr(key) else: self[key] = "={:02X}".format(key) return self[key] _q_byte_map = _QByteMap() # In headers spaces are mapped to '_'. _q_byte_map[ord(' ')] = '_' def encode_q(bstring): return ''.join(_q_byte_map[x] for x in bstring) def len_q(bstring): return sum(len(_q_byte_map[x]) for x in bstring) # # Base64 # def decode_b(encoded): # First try encoding with validate=True, fixing the padding if needed. # This will succeed only if encoded includes no invalid characters. pad_err = len(encoded) % 4 missing_padding = b'==='[:4-pad_err] if pad_err else b'' try: return ( base64.b64decode(encoded + missing_padding, validate=True), [errors.InvalidBase64PaddingDefect()] if pad_err else [], ) except binascii.Error: # Since we had correct padding, this is likely an invalid char error. # # The non-alphabet characters are ignored as far as padding # goes, but we don't know how many there are. So try without adding # padding to see if it works. try: return ( base64.b64decode(encoded, validate=False), [errors.InvalidBase64CharactersDefect()], ) except binascii.Error: # Add as much padding as could possibly be necessary (extra padding # is ignored). try: return ( base64.b64decode(encoded + b'==', validate=False), [errors.InvalidBase64CharactersDefect(), errors.InvalidBase64PaddingDefect()], ) except binascii.Error: # This only happens when the encoded string's length is 1 more # than a multiple of 4, which is invalid. # # bpo-27397: Just return the encoded string since there's no # way to decode. return encoded, [errors.InvalidBase64LengthDefect()] def encode_b(bstring): return base64.b64encode(bstring).decode('ascii') def len_b(bstring): groups_of_3, leftover = divmod(len(bstring), 3) # 4 bytes out for each 3 bytes (or nonzero fraction thereof) in. return groups_of_3 * 4 + (4 if leftover else 0) _cte_decoders = { 'q': decode_q, 'b': decode_b, } def decode(ew): """Decode encoded word and return (string, charset, lang, defects) tuple. An RFC 2047/2243 encoded word has the form: =?charset*lang?cte?encoded_string?= where '*lang' may be omitted but the other parts may not be. This function expects exactly such a string (that is, it does not check the syntax and may raise errors if the string is not well formed), and returns the encoded_string decoded first from its Content Transfer Encoding and then from the resulting bytes into unicode using the specified charset. If the cte-decoded string does not successfully decode using the specified character set, a defect is added to the defects list and the unknown octets are replaced by the unicode 'unknown' character \\uFDFF. The specified charset and language are returned. The default for language, which is rarely if ever encountered, is the empty string. """ _, charset, cte, cte_string, _ = ew.split('?') charset, _, lang = charset.partition('*') cte = cte.lower() # Recover the original bytes and do CTE decoding. bstring = cte_string.encode('ascii', 'surrogateescape') bstring, defects = _cte_decoders[cte](bstring) # Turn the CTE decoded bytes into unicode. try: string = bstring.decode(charset) except UnicodeError: defects.append(errors.UndecodableBytesDefect("Encoded word " "contains bytes not decodable using {} charset".format(charset))) string = bstring.decode(charset, 'surrogateescape') except LookupError: string = bstring.decode('ascii', 'surrogateescape') if charset.lower() != 'unknown-8bit': defects.append(errors.CharsetError("Unknown charset {} " "in encoded word; decoded as unknown bytes".format(charset))) return string, charset, lang, defects _cte_encoders = { 'q': encode_q, 'b': encode_b, } _cte_encode_length = { 'q': len_q, 'b': len_b, } def encode(string, charset='utf-8', encoding=None, lang=''): """Encode string using the CTE encoding that produces the shorter result. Produces an RFC 2047/2243 encoded word of the form: =?charset*lang?cte?encoded_string?= where '*lang' is omitted unless the 'lang' parameter is given a value. Optional argument charset (defaults to utf-8) specifies the charset to use to encode the string to binary before CTE encoding it. Optional argument 'encoding' is the cte specifier for the encoding that should be used ('q' or 'b'); if it is None (the default) the encoding which produces the shortest encoded sequence is used, except that 'q' is preferred if it is up to five characters longer. Optional argument 'lang' (default '') gives the RFC 2243 language string to specify in the encoded word. """ if charset == 'unknown-8bit': bstring = string.encode('ascii', 'surrogateescape') else: bstring = string.encode(charset) if encoding is None: qlen = _cte_encode_length['q'](bstring) blen = _cte_encode_length['b'](bstring) # Bias toward q. 5 is arbitrary. encoding = 'q' if qlen - blen < 5 else 'b' encoded = _cte_encoders[encoding](bstring) if lang: lang = '*' + lang return "=?{}{}?{}?{}?=".format(charset, lang, encoding, encoded) architecture.rst 0000644 00000022531 15204103444 0007760 0 ustar 00 :mod:`email` Package Architecture ================================= Overview -------- The email package consists of three major components: Model An object structure that represents an email message, and provides an API for creating, querying, and modifying a message. Parser Takes a sequence of characters or bytes and produces a model of the email message represented by those characters or bytes. Generator Takes a model and turns it into a sequence of characters or bytes. The sequence can either be intended for human consumption (a printable unicode string) or bytes suitable for transmission over the wire. In the latter case all data is properly encoded using the content transfer encodings specified by the relevant RFCs. Conceptually the package is organized around the model. The model provides both "external" APIs intended for use by application programs using the library, and "internal" APIs intended for use by the Parser and Generator components. This division is intentionally a bit fuzzy; the API described by this documentation is all a public, stable API. This allows for an application with special needs to implement its own parser and/or generator. In addition to the three major functional components, there is a third key component to the architecture: Policy An object that specifies various behavioral settings and carries implementations of various behavior-controlling methods. The Policy framework provides a simple and convenient way to control the behavior of the library, making it possible for the library to be used in a very flexible fashion while leveraging the common code required to parse, represent, and generate message-like objects. For example, in addition to the default :rfc:`5322` email message policy, we also have a policy that manages HTTP headers in a fashion compliant with :rfc:`2616`. Individual policy controls, such as the maximum line length produced by the generator, can also be controlled individually to meet specialized application requirements. The Model --------- The message model is implemented by the :class:`~email.message.Message` class. The model divides a message into the two fundamental parts discussed by the RFC: the header section and the body. The `Message` object acts as a pseudo-dictionary of named headers. Its dictionary interface provides convenient access to individual headers by name. However, all headers are kept internally in an ordered list, so that the information about the order of the headers in the original message is preserved. The `Message` object also has a `payload` that holds the body. A `payload` can be one of two things: data, or a list of `Message` objects. The latter is used to represent a multipart MIME message. Lists can be nested arbitrarily deeply in order to represent the message, with all terminal leaves having non-list data payloads. Message Lifecycle ----------------- The general lifecycle of a message is: Creation A `Message` object can be created by a Parser, or it can be instantiated as an empty message by an application. Manipulation The application may examine one or more headers, and/or the payload, and it may modify one or more headers and/or the payload. This may be done on the top level `Message` object, or on any sub-object. Finalization The Model is converted into a unicode or binary stream, or the model is discarded. Header Policy Control During Lifecycle -------------------------------------- One of the major controls exerted by the Policy is the management of headers during the `Message` lifecycle. Most applications don't need to be aware of this. A header enters the model in one of two ways: via a Parser, or by being set to a specific value by an application program after the Model already exists. Similarly, a header exits the model in one of two ways: by being serialized by a Generator, or by being retrieved from a Model by an application program. The Policy object provides hooks for all four of these pathways. The model storage for headers is a list of (name, value) tuples. The Parser identifies headers during parsing, and passes them to the :meth:`~email.policy.Policy.header_source_parse` method of the Policy. The result of that method is the (name, value) tuple to be stored in the model. When an application program supplies a header value (for example, through the `Message` object `__setitem__` interface), the name and the value are passed to the :meth:`~email.policy.Policy.header_store_parse` method of the Policy, which returns the (name, value) tuple to be stored in the model. When an application program retrieves a header (through any of the dict or list interfaces of `Message`), the name and value are passed to the :meth:`~email.policy.Policy.header_fetch_parse` method of the Policy to obtain the value returned to the application. When a Generator requests a header during serialization, the name and value are passed to the :meth:`~email.policy.Policy.fold` method of the Policy, which returns a string containing line breaks in the appropriate places. The :meth:`~email.policy.Policy.cte_type` Policy control determines whether or not Content Transfer Encoding is performed on the data in the header. There is also a :meth:`~email.policy.Policy.binary_fold` method for use by generators that produce binary output, which returns the folded header as binary data, possibly folded at different places than the corresponding string would be. Handling Binary Data -------------------- In an ideal world all message data would conform to the RFCs, meaning that the parser could decode the message into the idealized unicode message that the sender originally wrote. In the real world, the email package must also be able to deal with badly formatted messages, including messages containing non-ASCII characters that either have no indicated character set or are not valid characters in the indicated character set. Since email messages are *primarily* text data, and operations on message data are primarily text operations (except for binary payloads of course), the model stores all text data as unicode strings. Un-decodable binary inside text data is handled by using the `surrogateescape` error handler of the ASCII codec. As with the binary filenames the error handler was introduced to handle, this allows the email package to "carry" the binary data received during parsing along until the output stage, at which time it is regenerated in its original form. This carried binary data is almost entirely an implementation detail. The one place where it is visible in the API is in the "internal" API. A Parser must do the `surrogateescape` encoding of binary input data, and pass that data to the appropriate Policy method. The "internal" interface used by the Generator to access header values preserves the `surrogateescaped` bytes. All other interfaces convert the binary data either back into bytes or into a safe form (losing information in some cases). Backward Compatibility ---------------------- The :class:`~email.policy.Policy.Compat32` Policy provides backward compatibility with version 5.1 of the email package. It does this via the following implementation of the four+1 Policy methods described above: header_source_parse Splits the first line on the colon to obtain the name, discards any spaces after the colon, and joins the remainder of the line with all of the remaining lines, preserving the linesep characters to obtain the value. Trailing carriage return and/or linefeed characters are stripped from the resulting value string. header_store_parse Returns the name and value exactly as received from the application. header_fetch_parse If the value contains any `surrogateescaped` binary data, return the value as a :class:`~email.header.Header` object, using the character set `unknown-8bit`. Otherwise just returns the value. fold Uses :class:`~email.header.Header`'s folding to fold headers in the same way the email5.1 generator did. binary_fold Same as fold, but encodes to 'ascii'. New Algorithm ------------- header_source_parse Same as legacy behavior. header_store_parse Same as legacy behavior. header_fetch_parse If the value is already a header object, returns it. Otherwise, parses the value using the new parser, and returns the resulting object as the value. `surrogateescaped` bytes get turned into unicode unknown character code points. fold Uses the new header folding algorithm, respecting the policy settings. surrogateescaped bytes are encoded using the ``unknown-8bit`` charset for ``cte_type=7bit`` or ``8bit``. Returns a string. At some point there will also be a ``cte_type=unicode``, and for that policy fold will serialize the idealized unicode message with RFC-like folding, converting any surrogateescaped bytes into the unicode unknown character glyph. binary_fold Uses the new header folding algorithm, respecting the policy settings. surrogateescaped bytes are encoded using the `unknown-8bit` charset for ``cte_type=7bit``, and get turned back into bytes for ``cte_type=8bit``. Returns bytes. At some point there will also be a ``cte_type=unicode``, and for that policy binary_fold will serialize the message according to :rfc:``5335``. __pycache__/utils.cpython-38.opt-2.pyc 0000644 00000014114 15204103444 0013502 0 ustar 00 U e5d�4 � @ sf d ddddddddd d ddd dgZ ddlZddlZddlZddlZddlZddlZddlZddl m Z ddl mZ ddl m Z ddl mZmZmZ ddlmZ dZdZdZdZdZe�d�Ze�d�Zdd� Zdd� Zd6d!d�Zd"d� Zd#d$� Zd7d&d�Z d8d'd�Z!d9d(d�Z"d)d � Z#d*d � Z$d+d� Z%d,d� Z&d:d-d�Z'e�d.ej(�Z)d/d� Z*d;d2d �Z+d<d4d5�Z,dS )=�collapse_rfc2231_value� decode_params�decode_rfc2231�encode_rfc2231� formataddr� formatdate�format_datetime�getaddresses� make_msgid� mktime_tz� parseaddr� parsedate�parsedate_tz�parsedate_to_datetime�unquote� N)�quote)�AddressList)r )r r � _parsedate_tz)�Charsetz, � z �'z[][\\()<>@,:;".]z[\\"]c C s* z| � � W dS tk r$ Y dS X d S )NFT)�encode�UnicodeEncodeError)�s� r �#/usr/lib64/python3.8/email/utils.py�_has_surrogates3 s r c C s | � dd�}|�dd�S )N�utf-8�surrogateescape�replace)r �decode)�stringZoriginal_bytesr r r � _sanitize@ s r"