# Copyright (C) 2002-2007 Python Software Foundation
# Author: Ben Gertzfield, Barry Warsaw
# Contact: email-sig@python.org

"""Header encoding and decoding functionality."""

__all__ = [
    'Header',
    'decode_header',
    'make_header',
    ]

import re
import binascii

import email.quoprimime
import email.base64mime

from email.errors import HeaderParseError
from email import charset as _charset
Charset = _charset.Charset

NL = '\n'
SPACE = ' '
BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''
MAXLINELEN = 78
FWS = ' \t'

USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')

# Match encoded-word strings in the form =?charset?q?Hello_World?=
ecre = re.compile(r'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.IGNORECASE | re.MULTILINE)

# Field name regexp, including trailing colon, but not separating whitespace,
# according to RFC 2822.  Character range is from exclamation mark to tilde.
# For use with .match()
fcre = re.compile(r'[\041-\176]+:$')

# Find a header embedded in a putative header value.  Used to check for
# header injection attack.
_embeded_header = re.compile(r'\n[^ \t]+:')



# Helpers
_max_append = email.quoprimime._max_append



def decode_header(header):
    """Decode a message header value without converting charset.

    Returns a list of (string, charset) pairs containing each of the decoded
    parts of the header.  Charset is None for non-encoded parts of the header,
    otherwise a lower-case string containing the name of the character set
    specified in the encoded string.

    header may be a string that may or may not contain RFC2047 encoded words,
    or it may be a Header object.

    When the header contains at least one encoded word, every returned part
    is a bytes object; when it contains none, the single returned part is the
    header string itself, unchanged.

    An email.errors.HeaderParseError may be raised when certain decoding error
    occurs (e.g. a base64 decoding exception).
    """
    # If it is a Header object, we can just return the encoded chunks.
    if hasattr(header, '_chunks'):
        return [(_charset._encode(string, str(charset)), str(charset))
                    for string, charset in header._chunks]
    # If no encoding, just return the header with no charset.
    if not ecre.search(header):
        return [(header, None)]
    # First step is to parse all the encoded parts into triplets of the form
    # (encoded_string, encoding, charset).  For unencoded strings, the last
    # two parts will be None.
    words = []
    for line in header.splitlines():
        # ecre has three groups, so split() yields
        # [text, charset, encoding, payload, text, charset, ...]: one leading
        # text item followed by zero or more groups of four.
        parts = ecre.split(line)
        for i in range(0, len(parts), 4):
            unencoded = parts[i]
            if i == 0:
                # Only the leading text of each physical line is stripped.
                unencoded = unencoded.lstrip()
            if unencoded:
                words.append((unencoded, None, None))
            if i + 3 < len(parts):
                charset, encoding, encoded = parts[i + 1:i + 4]
                words.append((encoded, encoding.lower(), charset.lower()))
    # Now loop over words and remove words that consist of whitespace
    # between two encoded strings.
    droplist = []
    for n, w in enumerate(words):
        if n>1 and w[1] and words[n-2][1] and words[n-1][0].isspace():
            droplist.append(n-1)
    # Delete from the end so the earlier indexes stay valid.
    for d in reversed(droplist):
        del words[d]

    # The next step is to decode each encoded word by applying the reverse
    # base64 or quopri transformation.  decoded_words is now a list of the
    # form (decoded_word, charset).
    decoded_words = []
    for encoded_string, encoding, charset in words:
        if encoding is None:
            # This is an unencoded word.
            decoded_words.append((encoded_string, charset))
        elif encoding == 'q':
            word = email.quoprimime.header_decode(encoded_string)
            decoded_words.append((word, charset))
        elif encoding == 'b':
            paderr = len(encoded_string) % 4   # Postel's law: add missing padding
            if paderr:
                encoded_string += '==='[:4 - paderr]
            try:
                word = email.base64mime.decode(encoded_string)
            except binascii.Error:
                raise HeaderParseError('Base64 decoding error')
            else:
                decoded_words.append((word, charset))
        else:
            raise AssertionError('Unexpected encoding: ' + encoding)
    # Now convert all words to bytes and collapse consecutive runs of
    # similarly encoded words.
    collapsed = []
    last_word = last_charset = None
    for word, charset in decoded_words:
        if isinstance(word, str):
            word = bytes(word, 'raw-unicode-escape')
        if last_word is None:
            last_word = word
            last_charset = charset
        elif charset != last_charset:
            collapsed.append((last_word, last_charset))
            last_word = word
            last_charset = charset
        elif last_charset is None:
            # Adjacent unencoded runs (from separate lines) are space-joined.
            last_word += BSPACE + word
        else:
            last_word += word
    collapsed.append((last_word, last_charset))
    return collapsed



def make_header(decoded_seq, maxlinelen=None, header_name=None,
                continuation_ws=' '):
    """Create a Header from a sequence of pairs as returned by decode_header()

    decode_header() takes a header value string and returns a sequence of
    pairs of the format (decoded_string, charset) where charset is the string
    name of the character set.

    This function takes one of those sequence of pairs and returns a Header
    instance.  Optional maxlinelen, header_name, and continuation_ws are as in
    the Header constructor.
    """
    h = Header(maxlinelen=maxlinelen, header_name=header_name,
               continuation_ws=continuation_ws)
    for s, charset in decoded_seq:
        # None means us-ascii but we can simply pass it on to h.append()
        if charset is not None and not isinstance(charset, Charset):
            charset = Charset(charset)
        h.append(s, charset)
    return h



class Header:
    """A header value built from (string, Charset) chunks.

    Chunks are accumulated with append() and rendered either as plain text
    with str() or as a folded, RFC 2047 encoded value with encode().
    """

    def __init__(self, s=None, charset=None,
                 maxlinelen=None, header_name=None,
                 continuation_ws=' ', errors='strict'):
        """Create a MIME-compliant header that can contain many character sets.

        Optional s is the initial header value.  If None, the initial header
        value is not set.  You can later append to the header with .append()
        method calls.  s may be a byte string or a Unicode string, but see the
        .append() documentation for semantics.

        Optional charset serves two purposes: it has the same meaning as the
        charset argument to the .append() method.  It also sets the default
        character set for all subsequent .append() calls that omit the charset
        argument.  If charset is not provided in the constructor, the us-ascii
        charset is used both as s's initial charset and as the default for
        subsequent .append() calls.

        The maximum line length can be specified explicitly via maxlinelen. For
        splitting the first line to a shorter value (to account for the field
        header which isn't included in s, e.g. `Subject') pass in the name of
        the field in header_name.  The default maxlinelen is 78 as recommended
        by RFC 2822.

        continuation_ws must be RFC 2822 compliant folding whitespace (usually
        either a space or a hard tab) which will be prepended to continuation
        lines.

        errors is passed through to the .append() call.
        """
        if charset is None:
            charset = USASCII
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        self._charset = charset
        self._continuation_ws = continuation_ws
        self._chunks = []
        if s is not None:
            self.append(s, charset, errors)
        if maxlinelen is None:
            maxlinelen = MAXLINELEN
        self._maxlinelen = maxlinelen
        if header_name is None:
            self._headerlen = 0
        else:
            # Take the separating colon and space into account.
            self._headerlen = len(header_name) + 2

    def __str__(self):
        """Return the string value of the header."""
        self._normalize()
        uchunks = []
        lastcs = None
        lastspace = None
        for string, charset in self._chunks:
            # We must preserve spaces between encoded and non-encoded word
            # boundaries, which means for us we need to add a space when we go
            # from a charset to None/us-ascii, or from None/us-ascii to a
            # charset.  Only do this for the second and subsequent chunks.
            # Don't add a space if the None/us-ascii string already has
            # a space (trailing or leading depending on transition)
            nextcs = charset
            if nextcs == _charset.UNKNOWN8BIT:
                # Recover the surrogate-escaped bytes and show undecodable
                # ones with the replacement character.
                original_bytes = string.encode('ascii', 'surrogateescape')
                string = original_bytes.decode('ascii', 'replace')
            if uchunks:
                hasspace = string and self._nonctext(string[0])
                if lastcs not in (None, 'us-ascii'):
                    if nextcs in (None, 'us-ascii') and not hasspace:
                        uchunks.append(SPACE)
                        nextcs = None
                elif nextcs not in (None, 'us-ascii') and not lastspace:
                    uchunks.append(SPACE)
            lastspace = string and self._nonctext(string[-1])
            lastcs = nextcs
            uchunks.append(string)
        return EMPTYSTRING.join(uchunks)

    # Rich comparison operators for equality only.  BAW: does it make sense to
    # have or explicitly disable <, <=, >, >= operators?
    def __eq__(self, other):
        """Compare the unencoded string value of this header with other."""
        # other may be a Header or a string.  Both are fine so coerce
        # ourselves to a unicode (of the unencoded header value), swap the
        # args and do another comparison.
        return other == str(self)

    def __ne__(self, other):
        """Return the negation of the equality comparison with other."""
        return not self == other

    def append(self, s, charset=None, errors='strict'):
        """Append a string to the MIME header.

        Optional charset, if given, should be a Charset instance or the name
        of a character set (which will be converted to a Charset instance).  A
        value of None (the default) means that the charset given in the
        constructor is used.

        s may be a byte string or a Unicode string.  If it is a byte string
        (i.e. isinstance(s, str) is false), then charset is the encoding of
        that byte string, and a UnicodeError will be raised if the string
        cannot be decoded with that charset.  If s is a Unicode string, then
        charset is a hint specifying the character set of the characters in
        the string.  In either case, when producing an RFC 2822 compliant
        header using RFC 2047 rules, the string will be encoded using the
        output codec of the charset.  If the string cannot be encoded to the
        output codec, a UnicodeError will be raised.

        Optional `errors' is passed as the errors argument to the decode
        call if s is a byte string.
        """
        if charset is None:
            charset = self._charset
        elif not isinstance(charset, Charset):
            charset = Charset(charset)
        if not isinstance(s, str):
            input_charset = charset.input_codec or 'us-ascii'
            if input_charset == _charset.UNKNOWN8BIT:
                s = s.decode('us-ascii', 'surrogateescape')
            else:
                s = s.decode(input_charset, errors)
        # Ensure that the bytes we're storing can be decoded to the output
        # character set, otherwise an early error is raised.
        output_charset = charset.output_codec or 'us-ascii'
        if output_charset != _charset.UNKNOWN8BIT:
            try:
                s.encode(output_charset, errors)
            except UnicodeEncodeError:
                if output_charset!='us-ascii':
                    raise
                # Non-ASCII text given with an ASCII charset is stored as
                # utf-8 instead of being rejected.
                charset = UTF8
        self._chunks.append((s, charset))

    def _nonctext(self, s):
        """True if string s is not a ctext character of RFC822.
        """
        return s.isspace() or s in ('(', ')', '\\')

    def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
        r"""Encode a message header into an RFC-compliant format.

        There are many issues involved in converting a given string for use in
        an email header.  Only certain character sets are readable in most
        email clients, and as header strings can only contain a subset of
        7-bit ASCII, care must be taken to properly convert and encode (with
        Base64 or quoted-printable) header strings.  In addition, there is a
        75-character length limit on any given encoded header field, so
        line-wrapping must be performed, even with double-byte character sets.

        Optional maxlinelen specifies the maximum length of each generated
        line, exclusive of the linesep string.  Individual lines may be longer
        than maxlinelen if a folding point cannot be found.  The first line
        will be shorter by the length of the header name plus ": " if a header
        name was specified at Header construction time.  The default value for
        maxlinelen is determined at header construction time.

        Optional splitchars is a string containing characters which should be
        given extra weight by the splitting algorithm during normal header
        wrapping.  This is in very rough support of RFC 2822's `higher level
        syntactic breaks':  split points preceded by a splitchar are preferred
        during line splitting, with the characters preferred in the order in
        which they appear in the string.  Space and tab may be included in the
        string to indicate whether preference should be given to one over the
        other as a split point when other split chars do not appear in the line
        being split.  Splitchars does not affect RFC 2047 encoded lines.

        Optional linesep is a string to be used to separate the lines of
        the value.  The default value is the most useful for typical
        Python applications, but it can be set to \r\n to produce RFC-compliant
        line separators when needed.

        Raises email.errors.HeaderParseError if the encoded value appears to
        contain an embedded header.
        """
        self._normalize()
        if maxlinelen is None:
            maxlinelen = self._maxlinelen
        # A maxlinelen of 0 means don't wrap.  For all practical purposes,
        # choosing a huge number here accomplishes that and makes the
        # _ValueFormatter algorithm much simpler.
        if maxlinelen == 0:
            maxlinelen = 1000000
        formatter = _ValueFormatter(self._headerlen, maxlinelen,
                                    self._continuation_ws, splitchars)
        lastcs = None
        hasspace = lastspace = None
        for string, charset in self._chunks:
            # hasspace is None only for the first chunk, which never needs a
            # separating transition in front of it.
            if hasspace is not None:
                hasspace = string and self._nonctext(string[0])
                if lastcs not in (None, 'us-ascii'):
                    if not hasspace or charset not in (None, 'us-ascii'):
                        formatter.add_transition()
                elif charset not in (None, 'us-ascii') and not lastspace:
                    formatter.add_transition()
            lastspace = string and self._nonctext(string[-1])
            lastcs = charset
            hasspace = False
            lines = string.splitlines()
            if lines:
                formatter.feed('', lines[0], charset)
            else:
                formatter.feed('', '', charset)
            for line in lines[1:]:
                formatter.newline()
                if charset.header_encoding is not None:
                    formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
                                   charset)
                else:
                    # Keep the line's own leading whitespace as its folding
                    # whitespace.
                    sline = line.lstrip()
                    fws = line[:len(line)-len(sline)]
                    formatter.feed(fws, sline, charset)
            if len(lines) > 1:
                formatter.newline()
        if self._chunks:
            formatter.add_transition()
        value = formatter._str(linesep)
        if _embeded_header.search(value):
            raise HeaderParseError("header value appears to contain "
                "an embedded header: {!r}".format(value))
        return value

    def _normalize(self):
        """Merge adjacent chunks that share a charset, joining with a space."""
        # Step 1: Normalize the chunks so that all runs of identical charsets
        # get collapsed into a single unicode string.
        chunks = []
        last_charset = None
        last_chunk = []
        for string, charset in self._chunks:
            if charset == last_charset:
                last_chunk.append(string)
            else:
                if last_charset is not None:
                    chunks.append((SPACE.join(last_chunk), last_charset))
                last_chunk = [string]
                last_charset = charset
        if last_chunk:
            chunks.append((SPACE.join(last_chunk), last_charset))
        self._chunks = chunks



class _ValueFormatter:
    """Accumulate header text and fold it into lines of limited length."""

    def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
        """Set up an empty formatter.

        headerlen is the size reserved on the first line, maxlen the line
        length limit, continuation_ws the prefix used for continuation lines
        of encoded text, and splitchars the preferred split characters in
        priority order.
        """
        self._maxlen = maxlen
        self._continuation_ws = continuation_ws
        self._continuation_ws_len = len(continuation_ws)
        self._splitchars = splitchars
        self._lines = []
        self._current_line = _Accumulator(headerlen)

    def _str(self, linesep):
        """Flush the current line and return all lines joined by linesep."""
        self.newline()
        return linesep.join(self._lines)

    def __str__(self):
        return self._str(NL)

    def newline(self):
        """Finish the current line and start a new, empty one."""
        # Drop a trailing transition marker (see add_transition); anything
        # else that was popped goes back on the line.
        end_of_line = self._current_line.pop()
        if end_of_line != (' ', ''):
            self._current_line.push(*end_of_line)
        if len(self._current_line) > 0:
            # A whitespace-only line is glued onto the previous line, but
            # only when there is a previous line; otherwise indexing
            # self._lines[-1] would raise IndexError on an empty list.
            if self._current_line.is_onlyws() and self._lines:
                self._lines[-1] += str(self._current_line)
            else:
                self._lines.append(str(self._current_line))
        self._current_line.reset()

    def add_transition(self):
        """Push a single-space marker separating two chunks."""
        self._current_line.push(' ', '')

    def feed(self, fws, string, charset):
        """Add string, preceded by whitespace fws, folding as needed.

        Text whose charset has no header encoding is split on whitespace;
        other text is encoded by the charset into one or more lines.
        """
        # If the charset has no header encoding (i.e. it is an ASCII encoding)
        # then we must split the header at the "highest level syntactic break"
        # possible. Note that we don't have a lot of smarts about field
        # syntax; we just try to break on semi-colons, then commas, then
        # whitespace.  Eventually, this should be pluggable.
        if charset.header_encoding is None:
            self._ascii_split(fws, string, self._splitchars)
            return
        # Otherwise, we're doing either a Base64 or a quoted-printable
        # encoding which means we don't need to split the line on syntactic
        # breaks.  We can basically just find enough characters to fit on the
        # current line, minus the RFC 2047 chrome.  What makes this trickier
        # though is that we have to split at octet boundaries, not character
        # boundaries but it's only safe to split at character boundaries so at
        # best we can only get close.
        encoded_lines = charset.header_encode_lines(string, self._maxlengths())
        # The first element extends the current line, but if it's None then
        # nothing more fit on the current line so start a new line.
        try:
            first_line = encoded_lines.pop(0)
        except IndexError:
            # There are no encoded lines, so we're done.
            return
        if first_line is not None:
            self._append_chunk(fws, first_line)
        try:
            last_line = encoded_lines.pop()
        except IndexError:
            # There was only one line.
            return
        self.newline()
        self._current_line.push(self._continuation_ws, last_line)
        # Everything else are full lines in themselves.
        for line in encoded_lines:
            self._lines.append(self._continuation_ws + line)

    def _maxlengths(self):
        """Yield the room left on the current line, then on each new line."""
        # The first line's length.
        yield self._maxlen - len(self._current_line)
        while True:
            yield self._maxlen - self._continuation_ws_len

    def _ascii_split(self, fws, string, splitchars):
        """Split fws+string on whitespace runs and append each piece."""
        # The RFC 2822 header folding algorithm is simple in principle but
        # complex in practice.  Lines may be folded any place where "folding
        # white space" appears by inserting a linesep character in front of the
        # FWS.  The complication is that not all spaces or tabs qualify as FWS,
        # and we are also supposed to prefer to break at "higher level
        # syntactic breaks".  We can't do either of these without intimate
        # knowledge of the structure of structured headers, which we don't have
        # here.  So the best we can do here is prefer to break at the specified
        # splitchars, and hope that we don't choose any spaces or tabs that
        # aren't legal FWS.  (This is at least better than the old algorithm,
        # where we would sometimes *introduce* FWS after a splitchar, or the
        # algorithm before that, where we would turn all white space runs into
        # single spaces or tabs.)
        parts = re.split("(["+FWS+"]+)", fws+string)
        # Make parts an alternating [fws, text, fws, text, ...] sequence.
        if parts[0]:
            parts[:0] = ['']
        else:
            parts.pop(0)
        # Walk the list two items at a time as (fws, text) pairs.
        for fws, part in zip(*[iter(parts)]*2):
            self._append_chunk(fws, part)

    def _append_chunk(self, fws, string):
        """Push (fws, string) on the current line, folding if it overflows."""
        self._current_line.push(fws, string)
        if len(self._current_line) > self._maxlen:
            # Find the best split point, working backward from the end.
            # There might be none, on a long first line.
            for ch in self._splitchars:
                for i in range(self._current_line.part_count()-1, 0, -1):
                    if ch.isspace():
                        fws = self._current_line[i][0]
                        if fws and fws[0]==ch:
                            break
                    prevpart = self._current_line[i-1][1]
                    if prevpart and prevpart[-1]==ch:
                        break
                else:
                    # No split point for this character; try the next one.
                    continue
                break
            else:
                # No split point at all: the part just pushed stays whole.
                fws, part = self._current_line.pop()
                if self._current_line._initial_size > 0:
                    # There will be a header, so leave it on a line by itself.
                    self.newline()
                    if not fws:
                        # We don't use continuation_ws here because the whitespace
                        # after a header should always be a space.
                        fws = ' '
                self._current_line.push(fws, part)
                return
            # Split at part i: everything from i onward starts the next line.
            remainder = self._current_line.pop_from(i)
            self._lines.append(str(self._current_line))
            self._current_line.reset(remainder)


class _Accumulator(list):
    """A list of (fws, string) parts forming the line under construction.

    len() reports the number of characters, not the number of parts, and
    includes the initial size reserved at construction.  Use part_count()
    for the number of parts.
    """

    def __init__(self, initial_size=0):
        self._initial_size = initial_size
        super().__init__()

    def push(self, fws, string):
        """Append one (fws, string) part."""
        self.append((fws, string))

    def pop_from(self, i=0):
        """Remove and return all parts from index i onward."""
        popped = self[i:]
        self[i:] = []
        return popped

    def pop(self):
        """Remove and return the last part, or ('', '') if there is none."""
        if self.part_count()==0:
            return ('', '')
        return super().pop()

    def __len__(self):
        """Return the total length of all parts plus the initial size."""
        return sum((len(fws)+len(part) for fws, part in self),
                   self._initial_size)

    def __str__(self):
        """Return all parts concatenated in order."""
        return EMPTYSTRING.join(fws + part for fws, part in self)

    def reset(self, startval=None):
        """Replace the contents with startval and clear the initial size."""
        if startval is None:
            startval = []
        self[:] = startval
        self._initial_size = 0

    def is_onlyws(self):
        """True if nothing but whitespace (or nothing at all) is held."""
        return self._initial_size==0 and (not self or str(self).isspace())

    def part_count(self):
        """Return the number of (fws, string) parts held."""
        return super().__len__()