Skip to content

Commit 64cfd25

Browse files
author
Maksim Milyutin
committed
Add stress test for concurrent detach
1 parent 592efd0 commit 64cfd25

File tree

1 file changed

+53
-0
lines changed

1 file changed

+53
-0
lines changed

tests/python/partitioning_test.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
"""
88

99
import unittest
10+
import math
1011
from testgres import get_new_node, stop_all
1112
import time
1213
import os
14+
import re
15+
import subprocess
1316
import threading
1417

1518

@@ -708,6 +711,56 @@ def con2_thread():
708711
node.stop()
709712
node.cleanup()
710713

714+
def test_concurrent_detach(self):
715+
"""Test concurrent detach partition with contiguous tuple inserting and spawning new partitions"""
716+
717+
# Init parameters
718+
num_insert_workers = 8
719+
detach_timeout = 0.1 # time in sec between successive inserts and detachs
720+
num_detachs = 100 # estimated number of detachs
721+
inserts_advance = 1 # abvance in sec of inserts process under detachs
722+
test_interval = int(math.ceil(detach_timeout * num_detachs))
723+
724+
# Create and start new instance
725+
node = self.start_new_pathman_cluster(allows_streaming=False)
726+
727+
# Create partitioned table for testing that spawns new partition on each next *detach_timeout* sec
728+
with node.connect() as con0:
729+
con0.begin()
730+
con0.execute('create table ts_range_partitioned(ts timestamp not null)')
731+
con0.execute("select create_range_partitions('ts_range_partitioned', 'ts', current_timestamp, interval '%f', 1)" % detach_timeout)
732+
con0.commit()
733+
734+
# Run in background inserts and detachs processes
735+
FNULL = open(os.devnull, 'w')
736+
inserts = node.pgbench(stdout=FNULL, stderr=subprocess.PIPE, options=[
737+
"-j", "%i" % num_insert_workers,
738+
"-c", "%i" % num_insert_workers,
739+
"-f", "pgbench_scripts/insert_current_timestamp.pgbench",
740+
"-T", "%i" % (test_interval+inserts_advance)
741+
])
742+
time.sleep(inserts_advance)
743+
detachs = node.pgbench(stdout=FNULL, stderr=subprocess.PIPE, options=[
744+
"-D", "timeout=%f" % detach_timeout,
745+
"-f", "pgbench_scripts/detachs_in_timeout.pgbench",
746+
"-T", "%i" % test_interval
747+
])
748+
749+
# Wait for completion of processes
750+
inserts.wait()
751+
detachs.wait()
752+
753+
# Obtain error log from inserts process
754+
inserts_errors = inserts.stderr.read()
755+
756+
self.assertIsNone(
757+
re.search("ERROR: constraint", inserts_errors),
758+
msg="Race condition between detach and concurrent inserts with append partition is expired")
759+
760+
# Stop instance and finish work
761+
node.stop()
762+
node.cleanup()
763+
711764

712765
if __name__ == "__main__":
713766
unittest.main()

0 commit comments

Comments
 (0)