Skip to content

Potential Quadratic Complexity Vulnerabilities in the email Module #136063

@kexinoh

Description

@kexinoh

Bug Description:
A series of simple quadratic complexity vulnerabilities has been identified in the email package. After confirmation by CPython's security team, these low-threat DOS vulnerabilities can be fixed with community assistance.

Vulnerability Locations (All Fixed):

  1. def _parseparam(s):

    2.
    def get_phrase(value):

    3.
    while value and (value[0]=='\\' or value[0] not in PHRASE_ENDS):

    4.
    value = value[1:]

    5.
    value = value[1:]

    6.
    value = value[1:]

    7.
    value = value[1:]

    8.
    value = value[1:]

    9.
    value = value[1:]

    10.
    value = value[1:]

    11.
    value = value[1:]

    12.
    value = value[1:]

    13.
    value = value[1:]

    14.
    to_encode = to_encode[1:]

Below are the newly identified DoS risk points in the email module, added on 2026/01/23, updated from #144133:

  1. def _refold_parse_tree(parse_tree, *, policy):
  2. def _fold_mime_parameters(part, lines, maxlen, encoding):
  3. def set_boundary(self, boundary):
  4. def __add__(self, other):
  5. def __isub__(self, other):
  6. def all_mailboxes(self):
  7. def mailboxes(self):

more:

TokenList.all_defects:

@property
def all_defects(self):
return sum((x.all_defects for x in self), self.defects)

AddressList.mailboxes:
@property
def mailboxes(self):
return sum((x.mailboxes
for x in self if x.token_type=='address'), [])

get_encoded_word:
def get_encoded_word(value, terminal_type='vtext'):
""" encoded-word = "=?" charset "?" encoding "?" encoded-text "?="
"""
ew = EncodedWord()
if not value.startswith('=?'):
raise errors.HeaderParseError(
"expected encoded word but found {}".format(value))
tok, *remainder = value[2:].split('?=', 1)
if tok == value[2:]:
raise errors.HeaderParseError(
"expected encoded word but found {}".format(value))
remstr = ''.join(remainder)
if (len(remstr) > 1 and
remstr[0] in hexdigits and
remstr[1] in hexdigits and
tok.count('?') < 2):
# The ? after the CTE was followed by an encoded word escape (=XX).
rest, *remainder = remstr.split('?=', 1)
tok = tok + '?=' + rest
if len(tok.split()) > 1:
ew.defects.append(errors.InvalidHeaderDefect(
"whitespace inside encoded word"))
ew.cte = value
value = ''.join(remainder)
try:
text, charset, lang, defects = _ew.decode('=?' + tok + '?=')
except (ValueError, KeyError):
raise _InvalidEwError(
"encoded word format invalid: '{}'".format(ew.cte))
ew.charset = charset
ew.lang = lang
ew.defects.extend(defects)
while text:
if text[0] in WSP:
token, text = get_fws(text)
ew.append(token)
continue
chars, *remainder = _wsp_splitter(text, 1)
vtext = ValueTerminal(chars, terminal_type)
_validate_xtext(vtext)
ew.append(vtext)
text = ''.join(remainder)
# Encoded words should be followed by a WS
if value and value[0] not in WSP:
ew.defects.append(errors.InvalidHeaderDefect(
"missing trailing whitespace after encoded-word"))
return ew, value

get_unstructured:
def get_unstructured(value):
"""unstructured = (*([FWS] vchar) *WSP) / obs-unstruct
obs-unstruct = *((*LF *CR *(obs-utext) *LF *CR)) / FWS)
obs-utext = %d0 / obs-NO-WS-CTL / LF / CR
obs-NO-WS-CTL is control characters except WSP/CR/LF.
So, basically, we have printable runs, plus control characters or nulls in
the obsolete syntax, separated by whitespace. Since RFC 2047 uses the
obsolete syntax in its specification, but requires whitespace on either
side of the encoded words, I can see no reason to need to separate the
non-printable-non-whitespace from the printable runs if they occur, so we
parse this into xtext tokens separated by WSP tokens.
Because an 'unstructured' value must by definition constitute the entire
value, this 'get' routine does not return a remaining value, only the
parsed TokenList.
"""
# XXX: but what about bare CR and LF? They might signal the start or
# end of an encoded word. YAGNI for now, since our current parsers
# will never send us strings with bare CR or LF.
unstructured = UnstructuredTokenList()
while value:
if value[0] in WSP:
token, value = get_fws(value)
unstructured.append(token)
continue
valid_ew = True
if value.startswith('=?'):
try:
token, value = get_encoded_word(value, 'utext')
except _InvalidEwError:
valid_ew = False
except errors.HeaderParseError:
# XXX: Need to figure out how to register defects when
# appropriate here.
pass
else:
have_ws = True
if len(unstructured) > 0:
if unstructured[-1].token_type != 'fws':
unstructured.defects.append(errors.InvalidHeaderDefect(
"missing whitespace before encoded word"))
have_ws = False
if have_ws and len(unstructured) > 1:
if unstructured[-2].token_type == 'encoded-word':
unstructured[-1] = EWWhiteSpaceTerminal(
unstructured[-1], 'fws')
unstructured.append(token)
continue
tok, *remainder = _wsp_splitter(value, 1)
# Split in the middle of an atom if there is a rfc2047 encoded word
# which does not have WSP on both sides. The defect will be registered
# the next time through the loop.
# This needs to only be performed when the encoded word is valid;
# otherwise, performing it on an invalid encoded word can cause
# the parser to go in an infinite loop.
if valid_ew and rfc2047_matcher.search(tok):
tok, *remainder = value.partition('=?')
vtext = ValueTerminal(tok, 'utext')
_validate_xtext(vtext)
unstructured.append(vtext)
value = ''.join(remainder)
return unstructured

get_bare_quoted_string:
def get_bare_quoted_string(value):
"""bare-quoted-string = DQUOTE *([FWS] qcontent) [FWS] DQUOTE
A quoted-string without the leading or trailing white space. Its
value is the text between the quote marks, with whitespace
preserved and quoted pairs decoded.
"""
if not value or value[0] != '"':
raise errors.HeaderParseError(
"expected '\"' but found '{}'".format(value))
bare_quoted_string = BareQuotedString()
value = value[1:]
if value and value[0] == '"':
token, value = get_qcontent(value)
bare_quoted_string.append(token)
while value and value[0] != '"':
if value[0] in WSP:
token, value = get_fws(value)
elif value[:2] == '=?':
valid_ew = False
try:
token, value = get_encoded_word(value)
bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
"encoded word inside quoted string"))
valid_ew = True
except errors.HeaderParseError:
token, value = get_qcontent(value)
# Collapse the whitespace between two encoded words that occur in a
# bare-quoted-string.
if valid_ew and len(bare_quoted_string) > 1:
if (bare_quoted_string[-1].token_type == 'fws' and
bare_quoted_string[-2].token_type == 'encoded-word'):
bare_quoted_string[-1] = EWWhiteSpaceTerminal(
bare_quoted_string[-1], 'fws')
else:
token, value = get_qcontent(value)
bare_quoted_string.append(token)
if not value:
bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
"end of header inside quoted string"))
return bare_quoted_string, value
return bare_quoted_string, value[1:]

get_comment:
def get_comment(value):
"""comment = "(" *([FWS] ccontent) [FWS] ")"
ccontent = ctext / quoted-pair / comment
We handle nested comments here, and quoted-pair in our qp-ctext routine.
"""
if value and value[0] != '(':
raise errors.HeaderParseError(
"expected '(' but found '{}'".format(value))
comment = Comment()
value = value[1:]
while value and value[0] != ")":
if value[0] in WSP:
token, value = get_fws(value)
elif value[0] == '(':
token, value = get_comment(value)
else:
token, value = get_qp_ctext(value)
comment.append(token)
if not value:
comment.defects.append(errors.InvalidHeaderDefect(
"end of header inside comment"))
return comment, value
return comment, value[1:]

get_dot_atom_text:
def get_dot_atom_text(value):
""" dot-text = 1*atext *("." 1*atext)
"""
dot_atom_text = DotAtomText()
if not value or value[0] in ATOM_ENDS:
raise errors.HeaderParseError("expected atom at a start of "
"dot-atom-text but found '{}'".format(value))
while value and value[0] not in ATOM_ENDS:
token, value = get_atext(value)
dot_atom_text.append(token)
if value and value[0] == '.':
dot_atom_text.append(DOT)
value = value[1:]
if dot_atom_text[-1] is DOT:
raise errors.HeaderParseError("expected atom at end of dot-atom-text "
"but found '{}'".format('.'+value))
return dot_atom_text, value

get_dot_atom:
def get_dot_atom(value):
""" dot-atom = [CFWS] dot-atom-text [CFWS]
Any place we can have a dot atom, we could instead have an rfc2047 encoded
word.
"""
dot_atom = DotAtom()
if value[0] in CFWS_LEADER:
token, value = get_cfws(value)
dot_atom.append(token)
if value.startswith('=?'):
try:
token, value = get_encoded_word(value)
except errors.HeaderParseError:
# XXX: need to figure out how to register defects when
# appropriate here.
token, value = get_dot_atom_text(value)
else:
token, value = get_dot_atom_text(value)
dot_atom.append(token)
if value and value[0] in CFWS_LEADER:
token, value = get_cfws(value)
dot_atom.append(token)
return dot_atom, value

get_word:
def get_word(value):
"""word = atom / quoted-string
Either atom or quoted-string may start with CFWS. We have to peel off this
CFWS first to determine which type of word to parse. Afterward we splice
the leading CFWS, if any, into the parsed sub-token.
If neither an atom or a quoted-string is found before the next special, a
HeaderParseError is raised.
The token returned is either an Atom or a QuotedString, as appropriate.
This means the 'word' level of the formal grammar is not represented in the
parse tree; this is because having that extra layer when manipulating the
parse tree is more confusing than it is helpful.
"""
if value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
else:
leader = None
if not value:
raise errors.HeaderParseError(
"Expected 'atom' or 'quoted-string' but found nothing.")
if value[0]=='"':
token, value = get_quoted_string(value)
elif value[0] in SPECIALS:
raise errors.HeaderParseError("Expected 'atom' or 'quoted-string' "
"but found '{}'".format(value))
else:
token, value = get_atom(value)
if leader is not None:
token[:0] = [leader]
return token, value

get_local_part:
def get_local_part(value):
""" local-part = dot-atom / quoted-string / obs-local-part
"""
local_part = LocalPart()
leader = None
if value and value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value:
raise errors.HeaderParseError(
"expected local-part but found '{}'".format(value))
try:
token, value = get_dot_atom(value)
except errors.HeaderParseError:
try:
token, value = get_word(value)
except errors.HeaderParseError:
if value[0] != '\\' and value[0] in PHRASE_ENDS:
raise
token = TokenList()
if leader is not None:
token[:0] = [leader]
local_part.append(token)
if value and (value[0]=='\\' or value[0] not in PHRASE_ENDS):
obs_local_part, value = get_obs_local_part(str(local_part) + value)
if obs_local_part.token_type == 'invalid-obs-local-part':
local_part.defects.append(errors.InvalidHeaderDefect(
"local-part is not dot-atom, quoted-string, or obs-local-part"))
else:
local_part.defects.append(errors.ObsoleteHeaderDefect(
"local-part is not a dot-atom (contains CFWS)"))
local_part[0] = obs_local_part
try:
local_part.value.encode('ascii')
except UnicodeEncodeError:
local_part.defects.append(errors.NonASCIILocalPartDefect(
"local-part contains non-ASCII characters)"))
return local_part, value

get_domain:
def get_domain(value):
""" domain = dot-atom / domain-literal / obs-domain
obs-domain = atom *("." atom))
"""
domain = Domain()
leader = None
if value and value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value:
raise errors.HeaderParseError(
"expected domain but found '{}'".format(value))
if value[0] == '[':
token, value = get_domain_literal(value)
if leader is not None:
token[:0] = [leader]
domain.append(token)
return domain, value
try:
token, value = get_dot_atom(value)
except errors.HeaderParseError:
token, value = get_atom(value)
if value and value[0] == '@':
raise errors.HeaderParseError('Invalid Domain')
if leader is not None:
token[:0] = [leader]
domain.append(token)
if value and value[0] == '.':
domain.defects.append(errors.ObsoleteHeaderDefect(
"domain is not a dot-atom (contains CFWS)"))
if domain[0].token_type == 'dot-atom':
domain[:] = domain[0]
while value and value[0] == '.':
domain.append(DOT)
token, value = get_atom(value[1:])
domain.append(token)
return domain, value

get_addr_spec:
def get_addr_spec(value):
""" addr-spec = local-part "@" domain
"""
addr_spec = AddrSpec()
token, value = get_local_part(value)
addr_spec.append(token)
if not value or value[0] != '@':
addr_spec.defects.append(errors.InvalidHeaderDefect(
"addr-spec local part with no domain"))
return addr_spec, value
addr_spec.append(ValueTerminal('@', 'address-at-symbol'))
token, value = get_domain(value[1:])
addr_spec.append(token)
return addr_spec, value

get_angle_addr:
def get_angle_addr(value):
""" angle-addr = [CFWS] "<" addr-spec ">" [CFWS] / obs-angle-addr
obs-angle-addr = [CFWS] "<" obs-route addr-spec ">" [CFWS]
"""
angle_addr = AngleAddr()
if value and value[0] in CFWS_LEADER:
token, value = get_cfws(value)
angle_addr.append(token)
if not value or value[0] != '<':
raise errors.HeaderParseError(
"expected angle-addr but found '{}'".format(value))
angle_addr.append(ValueTerminal('<', 'angle-addr-start'))
value = value[1:]
# Although it is not legal per RFC5322, SMTP uses '<>' in certain
# circumstances.
if value and value[0] == '>':
angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
angle_addr.defects.append(errors.InvalidHeaderDefect(
"null addr-spec in angle-addr"))
value = value[1:]
return angle_addr, value
try:
token, value = get_addr_spec(value)
except errors.HeaderParseError:
try:
token, value = get_obs_route(value)
angle_addr.defects.append(errors.ObsoleteHeaderDefect(
"obsolete route specification in angle-addr"))
except errors.HeaderParseError:
raise errors.HeaderParseError(
"expected addr-spec or obs-route but found '{}'".format(value))
angle_addr.append(token)
token, value = get_addr_spec(value)
angle_addr.append(token)
if value and value[0] == '>':
value = value[1:]
else:
angle_addr.defects.append(errors.InvalidHeaderDefect(
"missing trailing '>' on angle-addr"))
angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
if value and value[0] in CFWS_LEADER:
token, value = get_cfws(value)
angle_addr.append(token)
return angle_addr, value

get_display_name:
def get_display_name(value):
""" display-name = phrase
Because this is simply a name-rule, we don't return a display-name
token containing a phrase, but rather a display-name token with
the content of the phrase.
"""
display_name = DisplayName()
token, value = get_phrase(value)
display_name.extend(token[:])
display_name.defects = token.defects[:]
return display_name, value

get_name_addr:
def get_name_addr(value):
""" name-addr = [display-name] angle-addr
"""
name_addr = NameAddr()
# Both the optional display name and the angle-addr can start with cfws.
leader = None
if not value:
raise errors.HeaderParseError(
"expected name-addr but found '{}'".format(value))
if value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value:
raise errors.HeaderParseError(
"expected name-addr but found '{}'".format(leader))
if value[0] != '<':
if value[0] in PHRASE_ENDS:
raise errors.HeaderParseError(
"expected name-addr but found '{}'".format(value))
token, value = get_display_name(value)
if not value:
raise errors.HeaderParseError(
"expected name-addr but found '{}'".format(token))
if leader is not None:
if isinstance(token[0], TokenList):
token[0][:0] = [leader]
else:
token[:0] = [leader]
leader = None
name_addr.append(token)
token, value = get_angle_addr(value)
if leader is not None:
token[:0] = [leader]
name_addr.append(token)
return name_addr, value

get_mailbox:
def get_mailbox(value):
""" mailbox = name-addr / addr-spec
"""
# The only way to figure out if we are dealing with a name-addr or an
# addr-spec is to try parsing each one.
mailbox = Mailbox()
try:
token, value = get_name_addr(value)
except errors.HeaderParseError:
try:
token, value = get_addr_spec(value)
except errors.HeaderParseError:
raise errors.HeaderParseError(
"expected mailbox but found '{}'".format(value))
if any(isinstance(x, errors.InvalidHeaderDefect)
for x in token.all_defects):
mailbox.token_type = 'invalid-mailbox'
mailbox.append(token)
return mailbox, value

get_mailbox_list:
def get_mailbox_list(value):
""" mailbox-list = (mailbox *("," mailbox)) / obs-mbox-list
obs-mbox-list = *([CFWS] ",") mailbox *("," [mailbox / CFWS])
For this routine we go outside the formal grammar in order to improve error
handling. We recognize the end of the mailbox list only at the end of the
value or at a ';' (the group terminator). This is so that we can turn
invalid mailboxes into InvalidMailbox tokens and continue parsing any
remaining valid mailboxes. We also allow all mailbox entries to be null,
and this condition is handled appropriately at a higher level.
"""
mailbox_list = MailboxList()
while value and value[0] != ';':
try:
token, value = get_mailbox(value)
mailbox_list.append(token)
except errors.HeaderParseError:
leader = None
if value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value or value[0] in ',;':
mailbox_list.append(leader)
mailbox_list.defects.append(errors.ObsoleteHeaderDefect(
"empty element in mailbox-list"))
else:
token, value = get_invalid_mailbox(value, ',;')
if leader is not None:
token[:0] = [leader]
mailbox_list.append(token)
mailbox_list.defects.append(errors.InvalidHeaderDefect(
"invalid mailbox in mailbox-list"))
elif value[0] == ',':
mailbox_list.defects.append(errors.ObsoleteHeaderDefect(
"empty element in mailbox-list"))
else:
token, value = get_invalid_mailbox(value, ',;')
if leader is not None:
token[:0] = [leader]
mailbox_list.append(token)
mailbox_list.defects.append(errors.InvalidHeaderDefect(
"invalid mailbox in mailbox-list"))
if value and value[0] not in ',;':
# Crap after mailbox; treat it as an invalid mailbox.
# The mailbox info will still be available.
mailbox = mailbox_list[-1]
mailbox.token_type = 'invalid-mailbox'
token, value = get_invalid_mailbox(value, ',;')
mailbox.extend(token)
mailbox_list.defects.append(errors.InvalidHeaderDefect(
"invalid mailbox in mailbox-list"))
if value and value[0] == ',':
mailbox_list.append(ListSeparator)
value = value[1:]
return mailbox_list, value

get_group_list:
def get_group_list(value):
""" group-list = mailbox-list / CFWS / obs-group-list
obs-group-list = 1*([CFWS] ",") [CFWS]
"""
group_list = GroupList()
if not value:
group_list.defects.append(errors.InvalidHeaderDefect(
"end of header before group-list"))
return group_list, value
leader = None
if value and value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value:
# This should never happen in email parsing, since CFWS-only is a
# legal alternative to group-list in a group, which is the only
# place group-list appears.
group_list.defects.append(errors.InvalidHeaderDefect(
"end of header in group-list"))
group_list.append(leader)
return group_list, value
if value[0] == ';':
group_list.append(leader)
return group_list, value
token, value = get_mailbox_list(value)
if len(token.all_mailboxes)==0:
if leader is not None:
group_list.append(leader)
group_list.extend(token)
group_list.defects.append(errors.ObsoleteHeaderDefect(
"group-list with empty entries"))
return group_list, value
if leader is not None:
token[:0] = [leader]
group_list.append(token)
return group_list, value

get_group:
def get_group(value):
""" group = display-name ":" [group-list] ";" [CFWS]
"""
group = Group()
token, value = get_display_name(value)
if not value or value[0] != ':':
raise errors.HeaderParseError("expected ':' at end of group "
"display name but found '{}'".format(value))
group.append(token)
group.append(ValueTerminal(':', 'group-display-name-terminator'))
value = value[1:]
if value and value[0] == ';':
group.append(ValueTerminal(';', 'group-terminator'))
return group, value[1:]
token, value = get_group_list(value)
group.append(token)
if not value:
group.defects.append(errors.InvalidHeaderDefect(
"end of header in group"))
elif value[0] != ';':
raise errors.HeaderParseError(
"expected ';' at end of group but found {}".format(value))
group.append(ValueTerminal(';', 'group-terminator'))
value = value[1:]
if value and value[0] in CFWS_LEADER:
token, value = get_cfws(value)
group.append(token)
return group, value

get_address:
def get_address(value):
""" address = mailbox / group
Note that counter-intuitively, an address can be either a single address or
a list of addresses (a group). This is why the returned Address object has
a 'mailboxes' attribute which treats a single address as a list of length
one. When you need to differentiate between to two cases, extract the single
element, which is either a mailbox or a group token.
"""
# The formal grammar isn't very helpful when parsing an address. mailbox
# and group, especially when allowing for obsolete forms, start off very
# similarly. It is only when you reach one of @, <, or : that you know
# what you've got. So, we try each one in turn, starting with the more
# likely of the two. We could perhaps make this more efficient by looking
# for a phrase and then branching based on the next character, but that
# would be a premature optimization.
address = Address()
try:
token, value = get_group(value)
except errors.HeaderParseError:
try:
token, value = get_mailbox(value)
except errors.HeaderParseError:
raise errors.HeaderParseError(
"expected address but found '{}'".format(value))
address.append(token)
return address, value

get_address_list:
def get_address_list(value):
""" address_list = (address *("," address)) / obs-addr-list
obs-addr-list = *([CFWS] ",") address *("," [address / CFWS])
We depart from the formal grammar here by continuing to parse until the end
of the input, assuming the input to be entirely composed of an
address-list. This is always true in email parsing, and allows us
to skip invalid addresses to parse additional valid ones.
"""
address_list = AddressList()
while value:
try:
token, value = get_address(value)
address_list.append(token)
except errors.HeaderParseError:
leader = None
if value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value or value[0] == ',':
address_list.append(leader)
address_list.defects.append(errors.ObsoleteHeaderDefect(
"address-list entry with no content"))
else:
token, value = get_invalid_mailbox(value, ',')
if leader is not None:
token[:0] = [leader]
address_list.append(Address([token]))
address_list.defects.append(errors.InvalidHeaderDefect(
"invalid address in address-list"))
elif value[0] == ',':
address_list.defects.append(errors.ObsoleteHeaderDefect(
"empty element in address-list"))
else:
token, value = get_invalid_mailbox(value, ',')
if leader is not None:
token[:0] = [leader]
address_list.append(Address([token]))
address_list.defects.append(errors.InvalidHeaderDefect(
"invalid address in address-list"))
if value and value[0] != ',':
# Crap after address; treat it as an invalid mailbox.
# The mailbox info will still be available.
mailbox = address_list[-1][0]
mailbox.token_type = 'invalid-mailbox'
token, value = get_invalid_mailbox(value, ',')
mailbox.extend(token)
address_list.defects.append(errors.InvalidHeaderDefect(
"invalid address in address-list"))
if value: # Must be a , at this point.
address_list.append(ListSeparator)
value = value[1:]
return address_list, value

get_msg_id:
def get_msg_id(value):
"""msg-id = [CFWS] "<" id-left '@' id-right ">" [CFWS]
id-left = dot-atom-text / obs-id-left
id-right = dot-atom-text / no-fold-literal / obs-id-right
no-fold-literal = "[" *dtext "]"
"""
msg_id = MsgID()
if value and value[0] in CFWS_LEADER:
token, value = get_cfws(value)
msg_id.append(token)
if not value or value[0] != '<':
raise errors.HeaderParseError(
"expected msg-id but found '{}'".format(value))
msg_id.append(ValueTerminal('<', 'msg-id-start'))
value = value[1:]
# Parse id-left.
try:
token, value = get_dot_atom_text(value)
except errors.HeaderParseError:
try:
# obs-id-left is same as local-part of add-spec.
token, value = get_obs_local_part(value)
msg_id.defects.append(errors.ObsoleteHeaderDefect(
"obsolete id-left in msg-id"))
except errors.HeaderParseError:
raise errors.HeaderParseError(
"expected dot-atom-text or obs-id-left"
" but found '{}'".format(value))
msg_id.append(token)
if not value or value[0] != '@':
msg_id.defects.append(errors.InvalidHeaderDefect(
"msg-id with no id-right"))
# Even though there is no id-right, if the local part
# ends with `>` let's just parse it too and return
# along with the defect.
if value and value[0] == '>':
msg_id.append(ValueTerminal('>', 'msg-id-end'))
value = value[1:]
return msg_id, value
msg_id.append(ValueTerminal('@', 'address-at-symbol'))
value = value[1:]
# Parse id-right.
try:
token, value = get_dot_atom_text(value)
except errors.HeaderParseError:
try:
token, value = get_no_fold_literal(value)
except errors.HeaderParseError:
try:
token, value = get_domain(value)
msg_id.defects.append(errors.ObsoleteHeaderDefect(
"obsolete id-right in msg-id"))
except errors.HeaderParseError:
raise errors.HeaderParseError(
"expected dot-atom-text, no-fold-literal or obs-id-right"
" but found '{}'".format(value))
msg_id.append(token)
if value and value[0] == '>':
value = value[1:]
else:
msg_id.defects.append(errors.InvalidHeaderDefect(
"missing trailing '>' on msg-id"))
msg_id.append(ValueTerminal('>', 'msg-id-end'))
if value and value[0] in CFWS_LEADER:
token, value = get_cfws(value)
msg_id.append(token)
return msg_id, value

parse_message_id:
def parse_message_id(value):
"""message-id = "Message-ID:" msg-id CRLF
"""
message_id = MessageID()
try:
token, value = get_msg_id(value)
message_id.append(token)
except errors.HeaderParseError as ex:
token = get_unstructured(value)
message_id = InvalidMessageID(token)
message_id.defects.append(
errors.InvalidHeaderDefect("Invalid msg-id: {!r}".format(ex)))
else:
# Value after parsing a valid msg_id should be None.
if value:
message_id.defects.append(errors.InvalidHeaderDefect(
"Unexpected {!r}".format(value)))
return message_id

parse_mime_parameters:
def parse_mime_parameters(value):
""" parameter *( ";" parameter )
That BNF is meant to indicate this routine should only be called after
finding and handling the leading ';'. There is no corresponding rule in
the formal RFC grammar, but it is more convenient for us for the set of
parameters to be treated as its own TokenList.
This is 'parse' routine because it consumes the remaining value, but it
would never be called to parse a full header. Instead it is called to
parse everything after the non-parameter value of a specific MIME header.
"""
mime_parameters = MimeParameters()
while value:
try:
token, value = get_parameter(value)
mime_parameters.append(token)
except errors.HeaderParseError:
leader = None
if value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value:
mime_parameters.append(leader)
return mime_parameters
if value[0] == ';':
if leader is not None:
mime_parameters.append(leader)
mime_parameters.defects.append(errors.InvalidHeaderDefect(
"parameter entry with no content"))
else:
token, value = get_invalid_parameter(value)
if leader:
token[:0] = [leader]
mime_parameters.append(token)
mime_parameters.defects.append(errors.InvalidHeaderDefect(
"invalid parameter {!r}".format(token)))
if value and value[0] != ';':
# Junk after the otherwise valid parameter. Mark it as
# invalid, but it will have a value.
param = mime_parameters[-1]
param.token_type = 'invalid-parameter'
token, value = get_invalid_parameter(value)
param.extend(token)
mime_parameters.defects.append(errors.InvalidHeaderDefect(
"parameter with invalid trailing text {!r}".format(token)))
if value:
# Must be a ';' at this point.
mime_parameters.append(ValueTerminal(';', 'parameter-separator'))
value = value[1:]
return mime_parameters

AddrlistClass.getaddress:
def getaddress(self):
"""Parse the next address."""
self.commentlist = []
self.gotonext()
oldpos = self.pos
oldcl = self.commentlist
plist = self.getphraselist()
self.gotonext()
returnlist = []
if self.pos >= len(self.field):
# Bad email address technically, no domain.
if plist:
returnlist = [(SPACE.join(self.commentlist), plist[0])]
elif self.field[self.pos] in '.@':
# email address is just an addrspec
# this isn't very efficient since we start over
self.pos = oldpos
self.commentlist = oldcl
addrspec = self.getaddrspec()
returnlist = [(SPACE.join(self.commentlist), addrspec)]
elif self.field[self.pos] == ':':
# address is a group
returnlist = []
fieldlen = len(self.field)
self.pos += 1
while self.pos < len(self.field):
self.gotonext()
if self.pos < fieldlen and self.field[self.pos] == ';':
self.pos += 1
break
returnlist = returnlist + self.getaddress()
elif self.field[self.pos] == '<':
# Address is a phrase then a route addr
routeaddr = self.getrouteaddr()
if self.commentlist:
returnlist = [(SPACE.join(plist) + ' (' +
' '.join(self.commentlist) + ')', routeaddr)]
else:
returnlist = [(SPACE.join(plist), routeaddr)]
else:
if plist:
returnlist = [(SPACE.join(self.commentlist), plist[0])]
elif self.field[self.pos] in self.specials:
self.pos += 1
self.gotonext()
if self.pos < len(self.field) and self.field[self.pos] == ',':
self.pos += 1
return returnlist

Charset.header_encode_lines:
def header_encode_lines(self, string, maxlengths):
"""Header-encode a string by converting it first to bytes.
This is similar to `header_encode()` except that the string is fit
into maximum line lengths as given by the argument.
:param string: A unicode string for the header. It must be possible
to encode this string to bytes using the character set's
output codec.
:param maxlengths: Maximum line length iterator. Each element
returned from this iterator will provide the next maximum line
length. This parameter is used as an argument to built-in next()
and should never be exhausted. The maximum line lengths should
not count the RFC 2047 chrome. These line lengths are only a
hint; the splitter does the best it can.
:return: Lines of encoded strings, each with RFC 2047 chrome.
"""
# See which encoding we should use.
codec = self.output_codec or 'us-ascii'
header_bytes = _encode(string, codec)
encoder_module = self._get_encoder(header_bytes)
encoder = partial(encoder_module.header_encode, charset=codec)
# Calculate the number of characters that the RFC 2047 chrome will
# contribute to each line.
charset = self.get_output_charset()
extra = len(charset) + RFC2047_CHROME_LEN
# Now comes the hard part. We must encode bytes but we can't split on
# bytes because some character sets are variable length and each
# encoded word must stand on its own. So the problem is you have to
# encode to bytes to figure out this word's length, but you must split
# on characters. This causes two problems: first, we don't know how
# many octets a specific substring of unicode characters will get
# encoded to, and second, we don't know how many ASCII characters
# those octets will get encoded to. Unless we try it. Which seems
# inefficient. In the interest of being correct rather than fast (and
# in the hope that there will be few encoded headers in any such
# message), brute force it. :(
lines = []
current_line = []
maxlen = next(maxlengths) - extra
for character in string:
current_line.append(character)
this_line = EMPTYSTRING.join(current_line)
length = encoder_module.header_length(_encode(this_line, charset))
if length > maxlen:
# This last character doesn't fit so pop it off.
current_line.pop()
# Does nothing fit on the first line?
if not lines and not current_line:
lines.append(None)
else:
joined_line = EMPTYSTRING.join(current_line)
header_bytes = _encode(joined_line, codec)
lines.append(encoder(header_bytes))
current_line = [character]
maxlen = next(maxlengths) - extra
joined_line = EMPTYSTRING.join(current_line)
header_bytes = _encode(joined_line, codec)
lines.append(encoder(header_bytes))
return lines

decode_header:
def decode_header(header):
"""Decode a message header value without converting charset.
For historical reasons, this function may return either:
1. A list of length 1 containing a pair (str, None).
2. A list of (bytes, charset) pairs containing each of the decoded
parts of the header. Charset is None for non-encoded parts of the header,
otherwise a lower-case string containing the name of the character set
specified in the encoded string.
header may be a string that may or may not contain RFC2047 encoded words,
or it may be a Header object.
An email.errors.HeaderParseError may be raised when certain decoding error
occurs (e.g. a base64 decoding exception).
This function exists for backwards compatibility only. For new code, we
recommend using email.headerregistry.HeaderRegistry instead.
"""
# If it is a Header object, we can just return the encoded chunks.
if hasattr(header, '_chunks'):
return [(_charset._encode(string, str(charset)), str(charset))
for string, charset in header._chunks]
# If no encoding, just return the header with no charset.
if not ecre.search(header):
return [(header, None)]
# First step is to parse all the encoded parts into triplets of the form
# (encoded_string, encoding, charset). For unencoded strings, the last
# two parts will be None.
words = []
for line in header.splitlines():
parts = ecre.split(line)
first = True
while parts:
unencoded = parts.pop(0)
if first:
unencoded = unencoded.lstrip()
first = False
if unencoded:
words.append((unencoded, None, None))
if parts:
charset = parts.pop(0).lower()
encoding = parts.pop(0).lower()
encoded = parts.pop(0)
words.append((encoded, encoding, charset))
# Now loop over words and remove words that consist of whitespace
# between two encoded strings.
droplist = []
for n, w in enumerate(words):
if n>1 and w[1] and words[n-2][1] and words[n-1][0].isspace():
droplist.append(n-1)
for d in reversed(droplist):
del words[d]
# The next step is to decode each encoded word by applying the reverse
# base64 or quopri transformation. decoded_words is now a list of the
# form (decoded_word, charset).
decoded_words = []
for encoded_string, encoding, charset in words:
if encoding is None:
# This is an unencoded word.
decoded_words.append((encoded_string, charset))
elif encoding == 'q':
word = email.quoprimime.header_decode(encoded_string)
decoded_words.append((word, charset))
elif encoding == 'b':
paderr = len(encoded_string) % 4 # Postel's law: add missing padding
if paderr:
encoded_string += '==='[:4 - paderr]
try:
word = email.base64mime.decode(encoded_string)
except binascii.Error:
raise HeaderParseError('Base64 decoding error')
else:
decoded_words.append((word, charset))
else:
raise AssertionError('Unexpected encoding: ' + encoding)
# Now convert all words to bytes and collapse consecutive runs of
# similarly encoded words.
collapsed = []
last_word = last_charset = None
for word, charset in decoded_words:
if isinstance(word, str):
word = bytes(word, 'raw-unicode-escape')
if last_word is None:
last_word = word
last_charset = charset
elif charset != last_charset:
collapsed.append((last_word, last_charset))
last_word = word
last_charset = charset
elif last_charset is None:
last_word += BSPACE + word
else:
last_word += word
collapsed.append((last_word, last_charset))
return collapsed

Header._normalize:

cpython/Lib/email/header.py

Lines 398 to 414 in 6181b69

def _normalize(self):
# Step 1: Normalize the chunks so that all runs of identical charsets
# get collapsed into a single unicode string.
chunks = []
last_charset = None
last_chunk = []
for string, charset in self._chunks:
if charset == last_charset:
last_chunk.append(string)
else:
if last_charset is not None:
chunks.append((SPACE.join(last_chunk), last_charset))
last_chunk = [string]
last_charset = charset
if last_chunk:
chunks.append((SPACE.join(last_chunk), last_charset))
self._chunks = chunks

Message.set_param:
def set_param(self, param, value, header='Content-Type', requote=True,
charset=None, language='', replace=False):
"""Set a parameter in the Content-Type header.
If the parameter already exists in the header, its value will be
replaced with the new value.
If header is Content-Type and has not yet been defined for this
message, it will be set to "text/plain" and the new parameter and
value will be appended as per RFC 2045.
An alternate header can be specified in the header argument, and all
parameters will be quoted as necessary unless requote is False.
If charset is specified, the parameter will be encoded according to RFC
2231. Optional language specifies the RFC 2231 language, defaulting
to the empty string. Both charset and language should be strings.
"""
if not isinstance(value, tuple) and charset:
value = (charset, language, value)
if header not in self and header.lower() == 'content-type':
ctype = 'text/plain'
else:
ctype = self.get(header)
if not self.get_param(param, header=header):
if not ctype:
ctype = _formatparam(param, value, requote)
else:
ctype = SEMISPACE.join(
[ctype, _formatparam(param, value, requote)])
else:
ctype = ''
for old_param, old_value in self.get_params(header=header,
unquote=requote):
append_param = ''
if old_param.lower() == param.lower():
append_param = _formatparam(param, value, requote)
else:
append_param = _formatparam(old_param, old_value, requote)
if not ctype:
ctype = append_param
else:
ctype = SEMISPACE.join([ctype, append_param])
if ctype != self.get(header):
if replace:
self.replace_header(header, ctype)
else:
del self[header]
self[header] = ctype

Message.del_param:
def del_param(self, param, header='content-type', requote=True):
"""Remove the given parameter completely from the Content-Type header.
The header will be re-written in place without the parameter or its
value. All values will be quoted as necessary unless requote is
False. Optional header specifies an alternative to the Content-Type
header.
"""
if header not in self:
return
new_ctype = ''
for p, v in self.get_params(header=header, unquote=requote):
if p.lower() != param.lower():
if not new_ctype:
new_ctype = _formatparam(p, v, requote)
else:
new_ctype = SEMISPACE.join([new_ctype,
_formatparam(p, v, requote)])
if new_ctype != self.get(header):
del self[header]
self[header] = new_ctype

Repair Status:

Common Information:

  • CPython Version: main branch
  • Operating System: Linux
  • Credits: Finder is kexinoh (Xiangfan Wu) from QI-ANXIN Technology Research Institute.

Linked PRs

Metadata

Metadata

Assignees

No one assigned

    Labels

    3.10only security fixes3.11only security fixes3.12only security fixes3.13bugs and security fixes3.14bugs and security fixes3.15new features, bugs and security fixesstdlibStandard Library Python modules in the Lib/ directorytopic-emailtype-securityA security issue

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions