10
10
from testgres import get_new_node , stop_all
11
11
import time
12
12
import os
13
+ import threading
13
14
14
15
15
16
def if_fdw_enabled (func ):
@@ -26,15 +27,13 @@ class PartitioningTests(unittest.TestCase):
26
27
27
28
def setUp (self ):
28
29
self .setup_cmd = [
29
- # 'create extension pg_pathman',
30
30
'create table abc(id serial, t text)' ,
31
31
'insert into abc select generate_series(1, 300000)' ,
32
32
'select create_hash_partitions(\' abc\' , \' id\' , 3, partition_data := false)' ,
33
33
]
34
34
35
35
def tearDown (self ):
36
36
stop_all ()
37
- # clean_all()
38
37
39
38
def start_new_pathman_cluster (self , name = 'test' , allows_streaming = False ):
40
39
node = get_new_node (name )
@@ -571,7 +570,7 @@ def ordered(obj):
571
570
]
572
571
}
573
572
}
574
- ]
573
+ ]
575
574
""" )
576
575
self .assertEqual (ordered (plan ), ordered (expected ))
577
576
@@ -596,7 +595,6 @@ def ordered(obj):
596
595
]
597
596
""" )
598
597
self .assertEqual (ordered (plan ), ordered (expected ))
599
- # import ipdb; ipdb.set_trace()
600
598
601
599
# Remove all objects for testing
602
600
node .psql ('postgres' , 'drop table range_partitioned cascade' )
@@ -607,6 +605,63 @@ def ordered(obj):
607
605
node .stop ()
608
606
node .cleanup ()
609
607
608
+ def test_conc_part_creation_insert (self ):
609
+ """Test concurrent partition creation on INSERT"""
610
+
611
+ # Create and start new instance
612
+ node = self .start_new_pathman_cluster (allows_streaming = False )
613
+
614
+ # Create table 'ins_test' and partition it
615
+ with node .connect () as con0 :
616
+ con0 .begin ()
617
+ con0 .execute ('create table ins_test(val int not null)' )
618
+ con0 .execute ('insert into ins_test select generate_series(1, 50)' )
619
+ con0 .execute ("select create_range_partitions('ins_test', 'val', 1, 10)" )
620
+ con0 .commit ()
621
+
622
+ # Create two separate connections for this test
623
+ with node .connect () as con1 , node .connect () as con2 :
624
+
625
+ # Thread for connection #2 (it has to wait)
626
+ def con2_thread ():
627
+ con2 .execute ('insert into ins_test values(51)' )
628
+
629
+ # Step 1: lock partitioned table in con1
630
+ con1 .begin ()
631
+ con1 .execute ('lock table ins_test in share update exclusive mode' )
632
+
633
+ # Step 2: try inserting new value in con2 (waiting)
634
+ t = threading .Thread (target = con2_thread )
635
+ t .start ()
636
+
637
+ # Step 3: try inserting new value in con1 (success, unlock)
638
+ con1 .execute ('insert into ins_test values(52)' )
639
+ con1 .commit ()
640
+
641
+ # Step 4: wait for con2
642
+ t .join ()
643
+
644
+ rows = con1 .execute ("""
645
+ select * from pathman_partition_list
646
+ where parent = 'ins_test'::regclass
647
+ order by range_min, range_max
648
+ """ )
649
+
650
+ # check number of partitions
651
+ self .assertEqual (len (rows ), 6 )
652
+
653
+ # check range_max of partitions
654
+ self .assertEqual (int (rows [0 ][5 ]), 11 )
655
+ self .assertEqual (int (rows [1 ][5 ]), 21 )
656
+ self .assertEqual (int (rows [2 ][5 ]), 31 )
657
+ self .assertEqual (int (rows [3 ][5 ]), 41 )
658
+ self .assertEqual (int (rows [4 ][5 ]), 51 )
659
+ self .assertEqual (int (rows [5 ][5 ]), 61 )
660
+
661
+ # Stop instance and finish work
662
+ node .stop ()
663
+ node .cleanup ()
664
+
610
665
611
666
if __name__ == "__main__" :
612
667
unittest .main ()
0 commit comments