|
7 | 7 | """
|
8 | 8 |
|
9 | 9 | import unittest
|
| 10 | +import math |
10 | 11 | from testgres import get_new_node, stop_all
|
11 | 12 | import time
|
12 | 13 | import os
|
| 14 | +import re |
| 15 | +import subprocess |
13 | 16 | import threading
|
14 | 17 |
|
15 | 18 |
|
@@ -708,6 +711,56 @@ def con2_thread():
|
708 | 711 | node.stop()
|
709 | 712 | node.cleanup()
|
710 | 713 |
|
| 714 | + def test_concurrent_detach(self): |
| 715 | + """Test concurrent detach partition with contiguous tuple inserting and spawning new partitions""" |
| 716 | + |
| 717 | + # Init parameters |
| 718 | + num_insert_workers = 8 |
| 719 | + detach_timeout = 0.1 # time in sec between successive inserts and detachs |
| 720 | + num_detachs = 100 # estimated number of detachs |
| 721 | + inserts_advance = 1 # abvance in sec of inserts process under detachs |
| 722 | + test_interval = int(math.ceil(detach_timeout * num_detachs)) |
| 723 | + |
| 724 | + # Create and start new instance |
| 725 | + node = self.start_new_pathman_cluster(allows_streaming=False) |
| 726 | + |
| 727 | + # Create partitioned table for testing that spawns new partition on each next *detach_timeout* sec |
| 728 | + with node.connect() as con0: |
| 729 | + con0.begin() |
| 730 | + con0.execute('create table ts_range_partitioned(ts timestamp not null)') |
| 731 | + con0.execute("select create_range_partitions('ts_range_partitioned', 'ts', current_timestamp, interval '%f', 1)" % detach_timeout) |
| 732 | + con0.commit() |
| 733 | + |
| 734 | + # Run in background inserts and detachs processes |
| 735 | + FNULL = open(os.devnull, 'w') |
| 736 | + inserts = node.pgbench(stdout=FNULL, stderr=subprocess.PIPE, options=[ |
| 737 | + "-j", "%i" % num_insert_workers, |
| 738 | + "-c", "%i" % num_insert_workers, |
| 739 | + "-f", "pgbench_scripts/insert_current_timestamp.pgbench", |
| 740 | + "-T", "%i" % (test_interval+inserts_advance) |
| 741 | + ]) |
| 742 | + time.sleep(inserts_advance) |
| 743 | + detachs = node.pgbench(stdout=FNULL, stderr=subprocess.PIPE, options=[ |
| 744 | + "-D", "timeout=%f" % detach_timeout, |
| 745 | + "-f", "pgbench_scripts/detachs_in_timeout.pgbench", |
| 746 | + "-T", "%i" % test_interval |
| 747 | + ]) |
| 748 | + |
| 749 | + # Wait for completion of processes |
| 750 | + inserts.wait() |
| 751 | + detachs.wait() |
| 752 | + |
| 753 | + # Obtain error log from inserts process |
| 754 | + inserts_errors = inserts.stderr.read() |
| 755 | + |
| 756 | + self.assertIsNone( |
| 757 | + re.search("ERROR: constraint", inserts_errors), |
| 758 | + msg="Race condition between detach and concurrent inserts with append partition is expired") |
| 759 | + |
| 760 | + # Stop instance and finish work |
| 761 | + node.stop() |
| 762 | + node.cleanup() |
| 763 | + |
711 | 764 |
|
712 | 765 | if __name__ == "__main__":
|
713 | 766 | unittest.main()
|
0 commit comments