diff --git a/Lib/email/_header_value_parser.py b/Lib/email/_header_value_parser.py
index f11fa83d45ed2d..235b180473b3b0 100644
--- a/Lib/email/_header_value_parser.py
+++ b/Lib/email/_header_value_parser.py
@@ -82,11 +82,14 @@
 
 WSP = set(' \t')
 CFWS_LEADER = WSP | set('(')
+CFWS_LEADER_WITH_DOT = CFWS_LEADER | set('.')
 SPECIALS = set(r'()<>@,:;.\"[]')
 ATOM_ENDS = SPECIALS | WSP
 DOT_ATOM_ENDS = ATOM_ENDS - set('.')
 # '.', '"', and '(' do not end phrases in order to support obs-phrase
 PHRASE_ENDS = SPECIALS - set('."(')
+PHRASE_ENDS_CHARS = ''.join(PHRASE_ENDS)
+PHRASE_ENDS_CHARS_NO_SEMICOLON = PHRASE_ENDS_CHARS.replace(';', '')
 TSPECIALS = (SPECIALS | set('/?=')) - set('.')
 TOKEN_ENDS = TSPECIALS | WSP
 ASPECIALS = TSPECIALS | set("*'%")
@@ -1300,6 +1303,18 @@ def get_cfws(value):
         cfws.append(token)
     return cfws, value
 
+def get_cfws_digits(value, leader_set):
+    """Split value at the first character that is in leader_set.
+
+    Return a (prefix, rest) pair where prefix is the leading run of
+    characters not in leader_set.  The prefix is not validated here;
+    callers check that it consists of digits.
+    """
+    ind = 0
+    while ind < len(value) and value[ind] not in leader_set:
+        ind += 1
+    return value[:ind], value[ind:]
+
 def get_quoted_string(value):
     """quoted-string = [CFWS] <bare-quoted-string> [CFWS]
 
@@ -1443,11 +1458,14 @@ def get_phrase(value):
         phrase.defects.append(errors.InvalidHeaderDefect(
             "phrase does not start with word"))
     while value and value[0] not in PHRASE_ENDS:
-        if value[0]=='.':
-            phrase.append(DOT)
-            phrase.defects.append(errors.ObsoleteHeaderDefect(
-                "period in 'phrase'"))
-            value = value[1:]
+        if value[0] == '.':
+            tmpvalue = value.lstrip('.')
+            n = len(value) - len(tmpvalue)
+            phrase.extend(DOT for _ in range(n))
+            phrase.defects.extend(
+                errors.ObsoleteHeaderDefect("period in 'phrase'")
+                for _ in range(n))
+            value = tmpvalue
         else:
             try:
                 token, value = get_word(value)
@@ -1461,6 +1479,31 @@ def get_phrase(value):
             phrase.append(token)
     return phrase, value
 
+def _find_phrase(reslist, value, phrase_ends, phrase_end_chars, endchars):
+    """Append phrases and misplaced specials found in value to reslist.
+
+    Consume value until it is empty or starts with a character in
+    endchars.  When value starts with a character in phrase_ends, the
+    leading run of characters in phrase_end_chars is appended to reslist
+    as one 'misplaced-special' ValueTerminal per character; otherwise a
+    phrase is parsed with get_phrase() and appended.  Return the rest of
+    value.
+
+    phrase_end_chars must contain every character of phrase_ends that is
+    not in endchars, else the loop cannot advance past such a character.
+    """
+    while value and value[0] not in endchars:
+        if value[0] in phrase_ends:
+            tmpvalue = value.lstrip(phrase_end_chars)
+            reslist.extend(
+                ValueTerminal(value[i], 'misplaced-special')
+                for i in range(len(value) - len(tmpvalue)))
+            value = tmpvalue
+        else:
+            token, value = get_phrase(value)
+            reslist.append(token)
+    return value
+
 def get_local_part(value):
     """ local-part = dot-atom / quoted-string / obs-local-part
 
@@ -1842,14 +1885,10 @@ def get_invalid_mailbox(value, endchars):
 
     """
     invalid_mailbox = InvalidMailbox()
-    while value and value[0] not in endchars:
-        if value[0] in PHRASE_ENDS:
-            invalid_mailbox.append(ValueTerminal(value[0],
-                                                 'misplaced-special'))
-            value = value[1:]
-        else:
-            token, value = get_phrase(value)
-            invalid_mailbox.append(token)
+    # lstrip() should not strip stuff in 'endchars'
+    phrase_end_chars = ''.join(PHRASE_ENDS - set(endchars))
+    value = _find_phrase(invalid_mailbox, value,
+                         PHRASE_ENDS, phrase_end_chars, endchars)
     return invalid_mailbox, value
 
 def get_mailbox_list(value):
@@ -2196,10 +2235,7 @@ def parse_mime_version(value):
         if not value:
             mime_version.defects.append(errors.HeaderMissingRequiredValue(
                 "Expected MIME version number but found only CFWS"))
-    digits = ''
-    while value and value[0] != '.' and value[0] not in CFWS_LEADER:
-        digits += value[0]
-        value = value[1:]
+    digits, value = get_cfws_digits(value, CFWS_LEADER_WITH_DOT)
     if not digits.isdigit():
         mime_version.defects.append(errors.InvalidHeaderDefect(
             "Expected MIME major version number but found {!r}".format(digits)))
@@ -2227,10 +2263,7 @@ def parse_mime_version(value):
             mime_version.defects.append(errors.InvalidHeaderDefect(
                 "Incomplete MIME version; found only major number"))
         return mime_version
-    digits = ''
-    while value and value[0] not in CFWS_LEADER:
-        digits += value[0]
-        value = value[1:]
+    digits, value = get_cfws_digits(value, CFWS_LEADER)
     if not digits.isdigit():
         mime_version.defects.append(errors.InvalidHeaderDefect(
             "Expected MIME minor version number but found {!r}".format(digits)))
@@ -2255,14 +2288,8 @@ def get_invalid_parameter(value):
 
     """
     invalid_parameter = InvalidParameter()
-    while value and value[0] != ';':
-        if value[0] in PHRASE_ENDS:
-            invalid_parameter.append(ValueTerminal(value[0],
-                                                   'misplaced-special'))
-            value = value[1:]
-        else:
-            token, value = get_phrase(value)
-            invalid_parameter.append(token)
+    value = _find_phrase(invalid_parameter, value,
+                         PHRASE_ENDS, PHRASE_ENDS_CHARS_NO_SEMICOLON, ';')
     return invalid_parameter, value
 
 def get_ttext(value):
@@ -2407,10 +2434,10 @@ def get_section(value):
     if not value or not value[0].isdigit():
         raise errors.HeaderParseError("Expected section number but "
                                       "found {}".format(value))
-    digits = ''
-    while value and value[0].isdigit():
-        digits += value[0]
-        value = value[1:]
+    # Default to len(value) so that an all-digit value is consumed entirely.
+    ind = next((i for i, ch in enumerate(value) if not ch.isdigit()),
+               len(value))
+    digits, value = value[:ind], value[ind:]
     if digits[0] == '0' and digits != '0':
         section.defects.append(errors.InvalidHeaderDefect(
                 "section number has an invalid leading 0"))
@@ -2567,12 +2594,15 @@ def get_parameter(value):
         while value:
             if value[0] in WSP:
                 token, value = get_fws(value)
+                v.append(token)
             elif value[0] == '"':
-                token = ValueTerminal('"', 'DQUOTE')
-                value = value[1:]
+                tmpvalue = value.lstrip('"')
+                n = len(value) - len(tmpvalue)
+                v.extend(ValueTerminal('"', 'DQUOTE') for _ in range(n))
+                value = tmpvalue
             else:
                 token, value = get_qcontent(value)
-            v.append(token)
+                v.append(token)
         token = v
     else:
         token, value = get_value(value)
@@ -2638,17 +2668,11 @@ def _find_mime_parameters(tokenlist, value):
     """Do our best to find the parameters in an invalid MIME header
 
     """
-    while value and value[0] != ';':
-        if value[0] in PHRASE_ENDS:
-            tokenlist.append(ValueTerminal(value[0], 'misplaced-special'))
-            value = value[1:]
-        else:
-            token, value = get_phrase(value)
-            tokenlist.append(token)
-    if not value:
-        return
-    tokenlist.append(ValueTerminal(';', 'parameter-separator'))
-    tokenlist.append(parse_mime_parameters(value[1:]))
+    value = _find_phrase(tokenlist, value,
+                         PHRASE_ENDS, PHRASE_ENDS_CHARS_NO_SEMICOLON, ';')
+    if value:
+        tokenlist.append(ValueTerminal(';', 'parameter-separator'))
+        tokenlist.append(parse_mime_parameters(value[1:]))
 
 def parse_content_type_header(value):
     """ maintype "/" subtype *( ";" parameter )
@@ -2757,12 +2781,16 @@ def parse_content_transfer_encoding_header(value):
     if not value:
         return cte_header
     while value:
-        cte_header.defects.append(errors.InvalidHeaderDefect(
-            "Extra text after content transfer encoding"))
         if value[0] in PHRASE_ENDS:
-            cte_header.append(ValueTerminal(value[0], 'misplaced-special'))
-            value = value[1:]
+            tmpvalue = value.lstrip(PHRASE_ENDS_CHARS)
+            for i in range(len(value) - len(tmpvalue)):
+                cte_header.defects.append(errors.InvalidHeaderDefect(
+                    "Extra text after content transfer encoding"))
+                cte_header.append(ValueTerminal(value[i], 'misplaced-special'))
+            value = tmpvalue
         else:
+            cte_header.defects.append(errors.InvalidHeaderDefect(
+                "Extra text after content transfer encoding"))
             token, value = get_phrase(value)
             cte_header.append(token)
     return cte_header
diff --git a/Lib/test/test_email/test__header_value_parser.py b/Lib/test/test_email/test__header_value_parser.py
index fd4ac2c404ce47..3d91537191c69b 100644
--- a/Lib/test/test_email/test__header_value_parser.py
+++ b/Lib/test/test_email/test__header_value_parser.py
@@ -2676,6 +2676,14 @@ def test_invalid_content_transfer_encoding(self):
             ";foo", ";foo", ";foo", [errors.InvalidHeaderDefect]*3
         )
 
+    def test_invalid_content_transfer_encoding_misplaced_special(self):
+        cte = parser.parse_content_transfer_encoding_header("foo;;;;;")
+        self.assertEqual(len(cte), 6)
+        self.assertEqual(cte[0].value, "foo")
+        self.assertEqual(cte[0].token_type, "token")
+        terminal = parser.ValueTerminal(";", "misplaced-special")
+        self.assertEqual(cte[1:], [terminal] * 5)
+
     # get_msg_id
 
     def test_get_msg_id_empty(self):
diff --git a/Misc/NEWS.d/next/Security/2025-05-30-20-08-38.gh-issue-134873.6Z5xUC.rst b/Misc/NEWS.d/next/Security/2025-05-30-20-08-38.gh-issue-134873.6Z5xUC.rst
new file mode 100644
index 00000000000000..93389a64ee2ead
--- /dev/null
+++ b/Misc/NEWS.d/next/Security/2025-05-30-20-08-38.gh-issue-134873.6Z5xUC.rst
@@ -0,0 +1,2 @@
+Fix worst-case quadratic time complexity in various :mod:`email` header
+value parsing routines. Patch by Bénédikt Tran.