Skip to content

Commit a805ccd

Browse files
author
Arthur Gautier
committed
Tests gtid
1 parent 7cbbad4 commit a805ccd

File tree

1 file changed

+78
-1
lines changed

1 file changed

+78
-1
lines changed

pymysqlreplication/tests/test_basic.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from pymysqlreplication.constants.BINLOG import *
77
from pymysqlreplication.row_event import *
88

9-
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader"]
9+
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestGtidBinLogStreamReader"]
1010

1111

1212
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
@@ -355,6 +355,83 @@ def test_delete_multiple_row_event(self):
355355
self.assertEqual(event.rows[1]["values"]["data"], "World")
356356

357357

358+
class TestGtidBinLogStreamReader(base.PyMySQLReplicationTestCase):
359+
def test_read_query_event(self):
360+
query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
361+
self.execute(query)
362+
query = "SELECT @@global.gtid_executed;"
363+
gtid = self.execute(query).fetchone()[0]
364+
365+
self.stream.close()
366+
self.stream = BinLogStreamReader(connection_settings=self.database,
367+
blocking=True,
368+
auto_position=gtid)
369+
370+
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
371+
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
372+
373+
# Insert first event
374+
query = "BEGIN;"
375+
self.execute(query)
376+
query = "INSERT INTO test (id, data) VALUES(1, 'Hello');"
377+
self.execute(query)
378+
query = "COMMIT;"
379+
self.execute(query)
380+
381+
firstevent = self.stream.fetchone()
382+
self.assertIsInstance(firstevent, GtidEvent)
383+
384+
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
385+
self.assertIsInstance(self.stream.fetchone(), TableMapEvent)
386+
self.assertIsInstance(self.stream.fetchone(), WriteRowsEvent)
387+
self.assertIsInstance(self.stream.fetchone(), XidEvent)
388+
389+
# Insert second event
390+
query = "BEGIN;"
391+
self.execute(query)
392+
query = "INSERT INTO test (id, data) VALUES(2, 'Hello');"
393+
self.execute(query)
394+
query = "COMMIT;"
395+
self.execute(query)
396+
397+
secondevent = self.stream.fetchone()
398+
self.assertIsInstance(secondevent, GtidEvent)
399+
400+
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
401+
self.assertIsInstance(self.stream.fetchone(), TableMapEvent)
402+
self.assertIsInstance(self.stream.fetchone(), WriteRowsEvent)
403+
self.assertIsInstance(self.stream.fetchone(), XidEvent)
404+
405+
self.assertEqual(secondevent.gno, firstevent.gno + 1)
406+
407+
def test_position_gtid(self):
408+
query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
409+
self.execute(query)
410+
query = "BEGIN;"
411+
self.execute(query)
412+
query = "INSERT INTO test (id, data) VALUES(1, 'Hello');"
413+
self.execute(query)
414+
query = "COMMIT;"
415+
self.execute(query)
416+
417+
query = "CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
418+
self.execute(query)
419+
query = "SELECT @@global.gtid_executed;"
420+
gtid = self.execute(query).fetchone()[0]
421+
422+
self.stream.close()
423+
self.stream = BinLogStreamReader(connection_settings=self.database,
424+
blocking=True,
425+
auto_position=gtid)
426+
427+
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
428+
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
429+
self.assertIsInstance(self.stream.fetchone(), GtidEvent)
430+
event = self.stream.fetchone()
431+
432+
self.assertEqual(event.query, 'CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))');
433+
434+
358435
if __name__ == "__main__":
359436
import unittest
360437
unittest.main()

0 commit comments

Comments
 (0)