Skip to content

Cleanup #927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pymysql/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ def escape_float(value, mapping=None):


_escape_table = [chr(x) for x in range(128)]
_escape_table[0] = u"\\0"
_escape_table[ord("\\")] = u"\\\\"
_escape_table[ord("\n")] = u"\\n"
_escape_table[ord("\r")] = u"\\r"
_escape_table[ord("\032")] = u"\\Z"
_escape_table[ord('"')] = u'\\"'
_escape_table[ord("'")] = u"\\'"
_escape_table[0] = "\\0"
_escape_table[ord("\\")] = "\\\\"
_escape_table[ord("\n")] = "\\n"
_escape_table[ord("\r")] = "\\r"
_escape_table[ord("\032")] = "\\Z"
_escape_table[ord('"')] = '\\"'
_escape_table[ord("'")] = "\\'"


def escape_string(value, mapping=None):
Expand Down
14 changes: 7 additions & 7 deletions pymysql/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,31 +182,31 @@ def read_struct(self, fmt):

def is_ok_packet(self):
# https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
return self._data[0:1] == b"\0" and len(self._data) >= 7
return self._data[0] == 0 and len(self._data) >= 7

def is_eof_packet(self):
# http://dev.mysql.com/doc/internals/en/generic-response-packets.html#packet-EOF_Packet
# Caution: \xFE may be LengthEncodedInteger.
# If \xFE is LengthEncodedInteger header, 8bytes followed.
return self._data[0:1] == b"\xfe" and len(self._data) < 9
return self._data[0] == 0xFE and len(self._data) < 9

def is_auth_switch_request(self):
# http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchRequest
return self._data[0:1] == b"\xfe"
return self._data[0] == 0xFE

def is_extra_auth_data(self):
# https://dev.mysql.com/doc/internals/en/successful-authentication.html
return self._data[0:1] == b"\x01"
return self._data[0] == 1

def is_resultset_packet(self):
field_count = ord(self._data[0:1])
field_count = self._data[0]
return 1 <= field_count <= 250

def is_load_local_packet(self):
return self._data[0:1] == b"\xfb"
return self._data[0] == 0xFB

def is_error_packet(self):
return self._data[0:1] == b"\xff"
return self._data[0] == 0xFF

def check_error(self):
if self.is_error_packet():
Expand Down
8 changes: 4 additions & 4 deletions pymysql/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_datatypes(self):
123456789012,
5.7,
"hello'\" world",
u"Espa\xc3\xb1ol",
"Espa\xc3\xb1ol",
"binary\x00data".encode(conn.encoding),
datetime.date(1988, 2, 2),
datetime.datetime(2014, 5, 15, 7, 45, 57),
Expand Down Expand Up @@ -147,9 +147,9 @@ def test_untyped(self):
conn = self.connect()
c = conn.cursor()
c.execute("select null,''")
self.assertEqual((None, u""), c.fetchone())
self.assertEqual((None, ""), c.fetchone())
c.execute("select '',null")
self.assertEqual((u"", None), c.fetchone())
self.assertEqual(("", None), c.fetchone())

def test_timedelta(self):
""" test timedelta conversion """
Expand Down Expand Up @@ -300,7 +300,7 @@ def test_json(self):
)
cur = conn.cursor()

json_str = u'{"hello": "こんにちは"}'
json_str = '{"hello": "こんにちは"}'
cur.execute("INSERT INTO test_json (id, `json`) values (42, %s)", (json_str,))
cur.execute("SELECT `json` from `test_json` WHERE `id`=42")
res = cur.fetchone()[0]
Expand Down
12 changes: 6 additions & 6 deletions pymysql/tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ class TestAuthentication(base.PyMySQLTestCase):
del db["user"]
cur.execute("SHOW PLUGINS")
for r in cur:
if (r[1], r[2]) != (u"ACTIVE", u"AUTHENTICATION"):
if (r[1], r[2]) != ("ACTIVE", "AUTHENTICATION"):
continue
if r[3] == u"auth_socket.so" or r[0] == u"unix_socket":
if r[3] == "auth_socket.so" or r[0] == "unix_socket":
socket_plugin_name = r[0]
socket_found = True
elif r[3] == u"dialog_examples.so":
elif r[3] == "dialog_examples.so":
if r[0] == "two_questions":
two_questions_found = True
elif r[0] == "three_attempts":
three_attempts_found = True
elif r[0] == u"pam":
elif r[0] == "pam":
pam_found = True
pam_plugin_name = r[3].split(".")[0]
if pam_plugin_name == "auth_pam":
Expand All @@ -92,9 +92,9 @@ class TestAuthentication(base.PyMySQLTestCase):
# https://mariadb.com/kb/en/mariadb/pam-authentication-plugin/

# Names differ but functionality is close
elif r[0] == u"mysql_old_password":
elif r[0] == "mysql_old_password":
mysql_old_password_found = True
elif r[0] == u"sha256_password":
elif r[0] == "sha256_password":
sha256_password_found = True
# else:
# print("plugin: %r" % r[0])
Expand Down
2 changes: 1 addition & 1 deletion pymysql/tests/test_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class TestConverter(TestCase):
def test_escape_string(self):
self.assertEqual(converters.escape_string(u"foo\nbar"), u"foo\\nbar")
self.assertEqual(converters.escape_string("foo\nbar"), "foo\\nbar")

def test_convert_datetime(self):
expected = datetime.datetime(2007, 2, 24, 23, 6, 20)
Expand Down
30 changes: 15 additions & 15 deletions pymysql/tests/test_issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ def test_issue_15(self):
c.execute("drop table if exists issue15")
c.execute("create table issue15 (t varchar(32))")
try:
c.execute("insert into issue15 (t) values (%s)", (u"\xe4\xf6\xfc",))
c.execute("insert into issue15 (t) values (%s)", ("\xe4\xf6\xfc",))
c.execute("select t from issue15")
self.assertEqual(u"\xe4\xf6\xfc", c.fetchone()[0])
self.assertEqual("\xe4\xf6\xfc", c.fetchone()[0])
finally:
c.execute("drop table issue15")

Expand Down Expand Up @@ -189,12 +189,12 @@ def test_issue_34(self):
def test_issue_33(self):
conn = pymysql.connect(charset="utf8", **self.databases[0])
self.safe_create_table(
conn, u"hei\xdfe", u"create table hei\xdfe (name varchar(32))"
conn, "hei\xdfe", "create table hei\xdfe (name varchar(32))"
)
c = conn.cursor()
c.execute(u"insert into hei\xdfe (name) values ('Pi\xdfata')")
c.execute(u"select name from hei\xdfe")
self.assertEqual(u"Pi\xdfata", c.fetchone()[0])
c.execute("insert into hei\xdfe (name) values ('Pi\xdfata')")
c.execute("select name from hei\xdfe")
self.assertEqual("Pi\xdfata", c.fetchone()[0])

@pytest.mark.skip("This test requires manual intervention")
def test_issue_35(self):
Expand Down Expand Up @@ -408,18 +408,18 @@ def test_issue_321(self):
)
sql_select = "select * from issue321 where " "value_1 in %s and value_2=%s"
data = [
[(u"a",), u"\u0430"],
[[u"b"], u"\u0430"],
{"value_1": [[u"c"]], "value_2": u"\u0430"},
[("a",), "\u0430"],
[["b"], "\u0430"],
{"value_1": [["c"]], "value_2": "\u0430"},
]
cur = conn.cursor()
self.assertEqual(cur.execute(sql_insert, data[0]), 1)
self.assertEqual(cur.execute(sql_insert, data[1]), 1)
self.assertEqual(cur.execute(sql_dict_insert, data[2]), 1)
self.assertEqual(cur.execute(sql_select, [(u"a", u"b", u"c"), u"\u0430"]), 3)
self.assertEqual(cur.fetchone(), (u"a", u"\u0430"))
self.assertEqual(cur.fetchone(), (u"b", u"\u0430"))
self.assertEqual(cur.fetchone(), (u"c", u"\u0430"))
self.assertEqual(cur.execute(sql_select, [("a", "b", "c"), "\u0430"]), 3)
self.assertEqual(cur.fetchone(), ("a", "\u0430"))
self.assertEqual(cur.fetchone(), ("b", "\u0430"))
self.assertEqual(cur.fetchone(), ("c", "\u0430"))

def test_issue_364(self):
""" Test mixed unicode/binary arguments in executemany. """
Expand All @@ -432,8 +432,8 @@ def test_issue_364(self):
)

sql = "insert into issue364 (value_1, value_2) values (_binary %s, %s)"
usql = u"insert into issue364 (value_1, value_2) values (_binary %s, %s)"
values = [pymysql.Binary(b"\x00\xff\x00"), u"\xe4\xf6\xfc"]
usql = "insert into issue364 (value_1, value_2) values (_binary %s, %s)"
values = [pymysql.Binary(b"\x00\xff\x00"), "\xe4\xf6\xfc"]

# test single insert and select
cur = conn.cursor()
Expand Down