Skip to content

Commit dd4b33b

Browse files
committed
Merge branch 'test_concurrent_detach'
2 parents 592efd0 + 5707123 commit dd4b33b

File tree

2 files changed

+54
-1
lines changed

2 files changed

+54
-1
lines changed

range.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1087,7 +1087,7 @@ BEGIN
10871087
parent_relid := @extschema@.get_parent_of_partition(partition);
10881088

10891089
/* Acquire lock on parent */
1090-
PERFORM @extschema@.lock_partitioned_relation(parent_relid);
1090+
PERFORM @extschema@.prevent_relation_modification(parent_relid);
10911091

10921092
v_attname := attname
10931093
FROM @extschema@.pathman_config

tests/python/partitioning_test.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
"""
88

99
import unittest
10+
import math
1011
from testgres import get_new_node, stop_all
1112
import time
1213
import os
14+
import re
15+
import subprocess
1316
import threading
1417

1518

@@ -708,6 +711,56 @@ def con2_thread():
708711
node.stop()
709712
node.cleanup()
710713

714+
def test_concurrent_detach(self):
715+
"""Test concurrent detach partition with contiguous tuple inserting and spawning new partitions"""
716+
717+
# Init parameters
718+
num_insert_workers = 8
719+
detach_timeout = 0.1 # time in sec between successive inserts and detachs
720+
num_detachs = 100 # estimated number of detachs
721+
inserts_advance = 1 # abvance in sec of inserts process under detachs
722+
test_interval = int(math.ceil(detach_timeout * num_detachs))
723+
724+
# Create and start new instance
725+
node = self.start_new_pathman_cluster(allows_streaming=False)
726+
727+
# Create partitioned table for testing that spawns new partition on each next *detach_timeout* sec
728+
with node.connect() as con0:
729+
con0.begin()
730+
con0.execute('create table ts_range_partitioned(ts timestamp not null)')
731+
con0.execute("select create_range_partitions('ts_range_partitioned', 'ts', current_timestamp, interval '%f', 1)" % detach_timeout)
732+
con0.commit()
733+
734+
# Run in background inserts and detachs processes
735+
FNULL = open(os.devnull, 'w')
736+
inserts = node.pgbench(stdout=FNULL, stderr=subprocess.PIPE, options=[
737+
"-j", "%i" % num_insert_workers,
738+
"-c", "%i" % num_insert_workers,
739+
"-f", "pgbench_scripts/insert_current_timestamp.pgbench",
740+
"-T", "%i" % (test_interval+inserts_advance)
741+
])
742+
time.sleep(inserts_advance)
743+
detachs = node.pgbench(stdout=FNULL, stderr=subprocess.PIPE, options=[
744+
"-D", "timeout=%f" % detach_timeout,
745+
"-f", "pgbench_scripts/detachs_in_timeout.pgbench",
746+
"-T", "%i" % test_interval
747+
])
748+
749+
# Wait for completion of processes
750+
inserts.wait()
751+
detachs.wait()
752+
753+
# Obtain error log from inserts process
754+
inserts_errors = inserts.stderr.read()
755+
756+
self.assertIsNone(
757+
re.search("ERROR: constraint", inserts_errors),
758+
msg="Race condition between detach and concurrent inserts with append partition is expired")
759+
760+
# Stop instance and finish work
761+
node.stop()
762+
node.cleanup()
763+
711764

712765
if __name__ == "__main__":
713766
unittest.main()

0 commit comments

Comments
 (0)