Skip to content

Commit 641dc1a

Browse files
committed
test concurrent partition creation on INSERT
1 parent 2e6d2f8 commit 641dc1a

File tree

1 file changed

+59
-4
lines changed

1 file changed

+59
-4
lines changed

tests/python/partitioning_test.py

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from testgres import get_new_node, stop_all
1111
import time
1212
import os
13+
import threading
1314

1415

1516
def if_fdw_enabled(func):
@@ -26,15 +27,13 @@ class PartitioningTests(unittest.TestCase):
2627

2728
def setUp(self):
2829
self.setup_cmd = [
29-
# 'create extension pg_pathman',
3030
'create table abc(id serial, t text)',
3131
'insert into abc select generate_series(1, 300000)',
3232
'select create_hash_partitions(\'abc\', \'id\', 3, partition_data := false)',
3333
]
3434

3535
def tearDown(self):
3636
stop_all()
37-
# clean_all()
3837

3938
def start_new_pathman_cluster(self, name='test', allows_streaming=False):
4039
node = get_new_node(name)
@@ -571,7 +570,7 @@ def ordered(obj):
571570
]
572571
}
573572
}
574-
]
573+
]
575574
""")
576575
self.assertEqual(ordered(plan), ordered(expected))
577576

@@ -596,7 +595,6 @@ def ordered(obj):
596595
]
597596
""")
598597
self.assertEqual(ordered(plan), ordered(expected))
599-
# import ipdb; ipdb.set_trace()
600598

601599
# Remove all objects for testing
602600
node.psql('postgres', 'drop table range_partitioned cascade')
@@ -607,6 +605,63 @@ def ordered(obj):
607605
node.stop()
608606
node.cleanup()
609607

608+
def test_conc_part_creation_insert(self):
609+
"""Test concurrent partition creation on INSERT"""
610+
611+
# Create and start new instance
612+
node = self.start_new_pathman_cluster(allows_streaming=False)
613+
614+
# Create table 'ins_test' and partition it
615+
with node.connect() as con0:
616+
con0.begin()
617+
con0.execute('create table ins_test(val int not null)')
618+
con0.execute('insert into ins_test select generate_series(1, 50)')
619+
con0.execute("select create_range_partitions('ins_test', 'val', 1, 10)")
620+
con0.commit()
621+
622+
# Create two separate connections for this test
623+
with node.connect() as con1, node.connect() as con2:
624+
625+
# Thread for connection #2 (it has to wait)
626+
def con2_thread():
627+
con2.execute('insert into ins_test values(51)')
628+
629+
# Step 1: lock partitioned table in con1
630+
con1.begin()
631+
con1.execute('lock table ins_test in share update exclusive mode')
632+
633+
# Step 2: try inserting new value in con2 (waiting)
634+
t = threading.Thread(target=con2_thread)
635+
t.start()
636+
637+
# Step 3: try inserting new value in con1 (success, unlock)
638+
con1.execute('insert into ins_test values(52)')
639+
con1.commit()
640+
641+
# Step 4: wait for con2
642+
t.join()
643+
644+
rows = con1.execute("""
645+
select * from pathman_partition_list
646+
where parent = 'ins_test'::regclass
647+
order by range_min, range_max
648+
""")
649+
650+
# check number of partitions
651+
self.assertEqual(len(rows), 6)
652+
653+
# check range_max of partitions
654+
self.assertEqual(int(rows[0][5]), 11)
655+
self.assertEqual(int(rows[1][5]), 21)
656+
self.assertEqual(int(rows[2][5]), 31)
657+
self.assertEqual(int(rows[3][5]), 41)
658+
self.assertEqual(int(rows[4][5]), 51)
659+
self.assertEqual(int(rows[5][5]), 61)
660+
661+
# Stop instance and finish work
662+
node.stop()
663+
node.cleanup()
664+
610665

611666
if __name__ == "__main__":
612667
unittest.main()

0 commit comments

Comments
 (0)