|
6 | 6 | from pymysqlreplication.constants.BINLOG import *
|
7 | 7 | from pymysqlreplication.row_event import *
|
8 | 8 |
|
9 |
| -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader"] |
| 9 | +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestGtidBinLogStreamReader"] |
10 | 10 |
|
11 | 11 |
|
12 | 12 | class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
|
@@ -355,6 +355,83 @@ def test_delete_multiple_row_event(self):
|
355 | 355 | self.assertEqual(event.rows[1]["values"]["data"], "World")
|
356 | 356 |
|
357 | 357 |
|
| 358 | +class TestGtidBinLogStreamReader(base.PyMySQLReplicationTestCase): |
| 359 | + def test_read_query_event(self): |
| 360 | + query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" |
| 361 | + self.execute(query) |
| 362 | + query = "SELECT @@global.gtid_executed;" |
| 363 | + gtid = self.execute(query).fetchone()[0] |
| 364 | + |
| 365 | + self.stream.close() |
| 366 | + self.stream = BinLogStreamReader(connection_settings=self.database, |
| 367 | + blocking=True, |
| 368 | + auto_position=gtid) |
| 369 | + |
| 370 | + self.assertIsInstance(self.stream.fetchone(), RotateEvent) |
| 371 | + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) |
| 372 | + |
| 373 | + # Insert first event |
| 374 | + query = "BEGIN;" |
| 375 | + self.execute(query) |
| 376 | + query = "INSERT INTO test (id, data) VALUES(1, 'Hello');" |
| 377 | + self.execute(query) |
| 378 | + query = "COMMIT;" |
| 379 | + self.execute(query) |
| 380 | + |
| 381 | + firstevent = self.stream.fetchone() |
| 382 | + self.assertIsInstance(firstevent, GtidEvent) |
| 383 | + |
| 384 | + self.assertIsInstance(self.stream.fetchone(), QueryEvent) |
| 385 | + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) |
| 386 | + self.assertIsInstance(self.stream.fetchone(), WriteRowsEvent) |
| 387 | + self.assertIsInstance(self.stream.fetchone(), XidEvent) |
| 388 | + |
| 389 | + # Insert second event |
| 390 | + query = "BEGIN;" |
| 391 | + self.execute(query) |
| 392 | + query = "INSERT INTO test (id, data) VALUES(2, 'Hello');" |
| 393 | + self.execute(query) |
| 394 | + query = "COMMIT;" |
| 395 | + self.execute(query) |
| 396 | + |
| 397 | + secondevent = self.stream.fetchone() |
| 398 | + self.assertIsInstance(secondevent, GtidEvent) |
| 399 | + |
| 400 | + self.assertIsInstance(self.stream.fetchone(), QueryEvent) |
| 401 | + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) |
| 402 | + self.assertIsInstance(self.stream.fetchone(), WriteRowsEvent) |
| 403 | + self.assertIsInstance(self.stream.fetchone(), XidEvent) |
| 404 | + |
| 405 | + self.assertEqual(secondevent.gno, firstevent.gno + 1) |
| 406 | + |
| 407 | + def test_position_gtid(self): |
| 408 | + query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" |
| 409 | + self.execute(query) |
| 410 | + query = "BEGIN;" |
| 411 | + self.execute(query) |
| 412 | + query = "INSERT INTO test (id, data) VALUES(1, 'Hello');" |
| 413 | + self.execute(query) |
| 414 | + query = "COMMIT;" |
| 415 | + self.execute(query) |
| 416 | + |
| 417 | + query = "CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" |
| 418 | + self.execute(query) |
| 419 | + query = "SELECT @@global.gtid_executed;" |
| 420 | + gtid = self.execute(query).fetchone()[0] |
| 421 | + |
| 422 | + self.stream.close() |
| 423 | + self.stream = BinLogStreamReader(connection_settings=self.database, |
| 424 | + blocking=True, |
| 425 | + auto_position=gtid) |
| 426 | + |
| 427 | + self.assertIsInstance(self.stream.fetchone(), RotateEvent) |
| 428 | + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) |
| 429 | + self.assertIsInstance(self.stream.fetchone(), GtidEvent) |
| 430 | + event = self.stream.fetchone() |
| 431 | + |
| 432 | + self.assertEqual(event.query, 'CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))'); |
| 433 | + |
| 434 | + |
358 | 435 | if __name__ == "__main__":
|
359 | 436 | import unittest
|
360 | 437 | unittest.main()
|
0 commit comments