Skip to content

Commit 76af751

Browse files
committed
pathman: concurrent partitions creation issue fixed
1 parent e5caa24 commit 76af751

File tree

6 files changed

+148
-36
lines changed

6 files changed

+148
-36
lines changed

contrib/pg_pathman/init.sql

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ CREATE OR REPLACE FUNCTION @extschema@.get_range_by_idx(
4444
parent_relid OID, idx INTEGER, dummy ANYELEMENT)
4545
RETURNS ANYARRAY AS 'pg_pathman', 'get_range_by_idx' LANGUAGE C STRICT;
4646

47+
/*
48+
* Returns min value of the first range for relation
49+
*/
50+
CREATE OR REPLACE FUNCTION @extschema@.get_min_range_value(
51+
parent_relid OID, dummy ANYELEMENT)
52+
RETURNS ANYELEMENT AS 'pg_pathman', 'get_min_range_value' LANGUAGE C STRICT;
53+
54+
/*
55+
* Returns max value of the last range for relation
56+
*/
57+
CREATE OR REPLACE FUNCTION @extschema@.get_max_range_value(
58+
parent_relid OID, dummy ANYELEMENT)
59+
RETURNS ANYELEMENT AS 'pg_pathman', 'get_max_range_value' LANGUAGE C STRICT;
4760

4861
/*
4962
* Copy rows to partitions
@@ -182,3 +195,15 @@ LANGUAGE plpgsql;
182195
CREATE EVENT TRIGGER pathman_ddl_trigger
183196
ON sql_drop
184197
EXECUTE PROCEDURE @extschema@.pathman_ddl_trigger_func();
198+
199+
/*
200+
* Acquire partitions lock to prevent concurrent partitions creation
201+
*/
202+
CREATE OR REPLACE FUNCTION @extschema@.acquire_partitions_lock()
203+
RETURNS VOID AS 'pg_pathman', 'acquire_partitions_lock' LANGUAGE C STRICT;
204+
205+
/*
206+
* Release partitions lock
207+
*/
208+
CREATE OR REPLACE FUNCTION @extschema@.release_partitions_lock()
209+
RETURNS VOID AS 'pg_pathman', 'release_partitions_lock' LANGUAGE C STRICT;

contrib/pg_pathman/pathman.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ bool irange_list_find(List *rangeset, int index, bool *lossy);
128128

129129
LWLock *load_config_lock;
130130
LWLock *dsm_init_lock;
131+
LWLock *edit_partitions_lock;
131132

132133

133134
/* Dynamic shared memory functions */

contrib/pg_pathman/pg_pathman.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,10 @@ static void
245245
pathman_shmem_startup(void)
246246
{
247247
/* Initialize locks */
248-
RequestAddinLWLocks(2);
248+
RequestAddinLWLocks(3);
249249
load_config_lock = LWLockAssign();
250250
dsm_init_lock = LWLockAssign();
251+
edit_partitions_lock = LWLockAssign();
251252

252253
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
253254

contrib/pg_pathman/pl_funcs.c

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ PG_FUNCTION_INFO_V1( on_partitions_removed );
1717
PG_FUNCTION_INFO_V1( find_or_create_range_partition);
1818
PG_FUNCTION_INFO_V1( get_range_by_idx );
1919
PG_FUNCTION_INFO_V1( get_partition_range );
20+
PG_FUNCTION_INFO_V1( acquire_partitions_lock );
21+
PG_FUNCTION_INFO_V1( release_partitions_lock );
22+
PG_FUNCTION_INFO_V1( get_min_range_value );
23+
PG_FUNCTION_INFO_V1( get_max_range_value );
2024

2125
/*
2226
* Callbacks
@@ -249,3 +253,66 @@ get_range_by_idx(PG_FUNCTION_ARGS)
249253
construct_array(elems, 2, prel->atttype,
250254
tce->typlen, tce->typbyval, tce->typalign));
251255
}
256+
257+
/*
258+
* Returns min value of the first range for relation
259+
*/
260+
Datum
261+
get_min_range_value(PG_FUNCTION_ARGS)
262+
{
263+
int parent_oid = DatumGetInt32(PG_GETARG_DATUM(0));
264+
PartRelationInfo *prel;
265+
RangeRelation *rangerel;
266+
RangeEntry *ranges;
267+
268+
prel = get_pathman_relation_info(parent_oid, NULL);
269+
rangerel = get_pathman_range_relation(parent_oid, NULL);
270+
271+
if (!prel || !rangerel || prel->parttype != PT_RANGE || rangerel->ranges.length == 0)
272+
PG_RETURN_NULL();
273+
274+
ranges = dsm_array_get_pointer(&rangerel->ranges);
275+
PG_RETURN_DATUM(ranges[0].min);
276+
}
277+
278+
/*
279+
* Returns max value of the last range for relation
280+
*/
281+
Datum
282+
get_max_range_value(PG_FUNCTION_ARGS)
283+
{
284+
int parent_oid = DatumGetInt32(PG_GETARG_DATUM(0));
285+
PartRelationInfo *prel;
286+
RangeRelation *rangerel;
287+
RangeEntry *ranges;
288+
289+
prel = get_pathman_relation_info(parent_oid, NULL);
290+
rangerel = get_pathman_range_relation(parent_oid, NULL);
291+
292+
if (!prel || !rangerel || prel->parttype != PT_RANGE || rangerel->ranges.length == 0)
293+
PG_RETURN_NULL();
294+
295+
ranges = dsm_array_get_pointer(&rangerel->ranges);
296+
PG_RETURN_DATUM(ranges[rangerel->ranges.length-1].max);
297+
}
298+
299+
/*
300+
* Acquire partitions lock
301+
*/
302+
Datum
303+
acquire_partitions_lock(PG_FUNCTION_ARGS)
304+
{
305+
// int relid = DatumGetInt32(PG_GETARG_DATUM(0));
306+
// LockRelationOid(relid, AccessExclusiveLock);
307+
LWLockAcquire(edit_partitions_lock, LW_EXCLUSIVE);
308+
PG_RETURN_NULL();
309+
}
310+
311+
Datum
312+
release_partitions_lock(PG_FUNCTION_ARGS)
313+
{
314+
// int relid = DatumGetInt32(PG_GETARG_DATUM(0));
315+
// UnlockRelationOid(relid, AccessExclusiveLock);
316+
LWLockRelease(edit_partitions_lock);
317+
PG_RETURN_NULL();
318+
}

contrib/pg_pathman/range.sql

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -663,12 +663,25 @@ BEGIN
663663
FROM @extschema@.pathman_config WHERE relname = p_relation;
664664

665665
v_atttype := @extschema@.get_attribute_type_name(p_relation, v_attname);
666+
667+
/* Prevent concurrent partition creation */
668+
PERFORM @extschema@.acquire_partitions_lock();
666669

667670
EXECUTE format('SELECT @extschema@.append_partition_internal($1, $2, $3, ARRAY[]::%s[])', v_atttype)
668671
INTO v_part_name
669672
USING p_relation, v_atttype, v_interval;
670673

674+
/* Release lock */
675+
PERFORM @extschema@.release_partitions_lock();
676+
677+
/* Tell backend to reload configuration */
678+
PERFORM @extschema@.on_create_partitions(p_relation::regclass::oid);
679+
RAISE NOTICE 'Done!';
671680
RETURN v_part_name;
681+
682+
EXCEPTION WHEN others THEN
683+
PERFORM @extschema@.release_partitions_lock();
684+
RAISE EXCEPTION '% %', SQLERRM, SQLSTATE;
672685
END
673686
$$
674687
LANGUAGE plpgsql;
@@ -696,9 +709,6 @@ BEGIN
696709
INTO v_part_name;
697710
END IF;
698711

699-
/* Tell backend to reload configuration */
700-
PERFORM @extschema@.on_create_partitions(p_relation::regclass::oid);
701-
RAISE NOTICE 'Done!';
702712
RETURN v_part_name;
703713
END
704714
$$
@@ -723,11 +733,24 @@ BEGIN
723733
FROM @extschema@.pathman_config WHERE relname = p_relation;
724734
v_atttype := @extschema@.get_attribute_type_name(p_relation, v_attname);
725735

736+
/* Prevent concurrent partition creation */
737+
PERFORM @extschema@.acquire_partitions_lock();
738+
726739
EXECUTE format('SELECT @extschema@.prepend_partition_internal($1, $2, $3, ARRAY[]::%s[])', v_atttype)
727740
INTO v_part_name
728741
USING p_relation, v_atttype, v_interval;
729742

743+
/* Release lock */
744+
PERFORM @extschema@.release_partitions_lock();
745+
746+
/* Tell backend to reload configuration */
747+
PERFORM @extschema@.on_create_partitions(p_relation::regclass::oid);
748+
RAISE NOTICE 'Done!';
730749
RETURN v_part_name;
750+
751+
EXCEPTION WHEN others THEN
752+
PERFORM @extschema@.release_partitions_lock();
753+
RAISE EXCEPTION '% %', SQLERRM, SQLSTATE;
731754
END
732755
$$
733756
LANGUAGE plpgsql;
@@ -759,10 +782,6 @@ BEGIN
759782
INTO v_part_name;
760783
END IF;
761784

762-
/* Tell backend to reload configuration */
763-
PERFORM @extschema@.on_create_partitions(p_relation::regclass::oid);
764-
RAISE NOTICE 'Done!';
765-
766785
RETURN v_part_name;
767786
END
768787
$$
@@ -933,8 +952,6 @@ $$ LANGUAGE plpgsql;
933952
*/
934953
CREATE OR REPLACE FUNCTION @extschema@.append_partitions_on_demand_internal(
935954
p_relid OID
936-
, p_min ANYELEMENT
937-
, p_max ANYELEMENT
938955
, p_new_value ANYELEMENT)
939956
RETURNS OID AS
940957
$$
@@ -945,6 +962,8 @@ DECLARE
945962
v_part TEXT;
946963
v_interval TEXT;
947964
v_attname TEXT;
965+
v_min p_new_value%TYPE;
966+
v_max p_new_value%TYPE;
948967
v_cur_value p_new_value%TYPE;
949968
v_next_value p_new_value%TYPE;
950969
v_is_date BOOLEAN;
@@ -955,16 +974,21 @@ BEGIN
955974
SELECT attname, range_interval INTO v_attname, v_interval
956975
FROM @extschema@.pathman_config WHERE relname = v_relation;
957976

977+
v_min := @extschema@.get_min_range_value(p_relid::regclass::oid, p_new_value);
978+
v_max := @extschema@.get_max_range_value(p_relid::regclass::oid, p_new_value);
979+
958980
v_is_date := @extschema@.is_date(pg_typeof(p_new_value)::regtype);
959981

960-
IF p_new_value >= p_max THEN
961-
v_cur_value := p_max;
982+
IF p_new_value >= v_max THEN
983+
v_cur_value := v_max;
962984
WHILE v_cur_value <= p_new_value AND i < 1000
963985
LOOP
964986
IF v_is_date THEN
965987
v_next_value := v_cur_value + v_interval::interval;
966988
ELSE
967-
v_next_value := v_cur_value + v_interval;
989+
EXECUTE format('SELECT $1 + $2::%s', pg_typeof(p_new_value))
990+
USING v_cur_value, v_interval
991+
INTO v_next_value;
968992
END IF;
969993

970994
v_part := @extschema@.create_single_range_partition(
@@ -975,14 +999,16 @@ BEGIN
975999
v_cur_value := v_next_value;
9761000
RAISE NOTICE 'partition % created', v_part;
9771001
END LOOP;
978-
ELSIF p_new_value <= p_min THEN
979-
v_cur_value := p_min;
1002+
ELSIF p_new_value <= v_min THEN
1003+
v_cur_value := v_min;
9801004
WHILE v_cur_value >= p_new_value AND i < 1000
9811005
LOOP
9821006
IF v_is_date THEN
9831007
v_next_value := v_cur_value - v_interval::interval;
9841008
ELSE
985-
v_next_value := v_cur_value - v_interval;
1009+
EXECUTE format('SELECT $1 - $2::%s', pg_typeof(p_new_value))
1010+
USING v_cur_value, v_interval
1011+
INTO v_next_value;
9861012
END IF;
9871013

9881014
v_part := @extschema@.create_single_range_partition(

contrib/pg_pathman/worker.c

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,9 @@ create_partitions(Oid relid, Datum value, Oid value_type)
141141
{
142142
int ret;
143143
RangeEntry *ranges;
144-
Datum vals[4];
145-
Oid oids[] = {OIDOID, value_type, value_type, value_type};
146-
bool nulls[] = {false, false, false, false};
147-
RangeEntry *re;
148-
int cmp_upper;
149-
int cmp_lower;
144+
Datum vals[2];
145+
Oid oids[] = {OIDOID, value_type};
146+
bool nulls[] = {false, false};
150147
char *sql;
151148
bool found;
152149
int pos;
@@ -167,24 +164,16 @@ create_partitions(Oid relid, Datum value, Oid value_type)
167164
/* Comparison function */
168165
cmp_func = *get_cmp_func(value_type, prel->atttype);
169166

170-
/* Determine nearest range partition */
171-
re = &ranges[rangerel->ranges.length-1];
172-
cmp_upper = FunctionCall2(&cmp_func, value, ranges[rangerel->ranges.length-1].max);
173-
cmp_lower = FunctionCall2(&cmp_func, value, ranges[0].min);
174-
if (cmp_upper > 0)
175-
re = &ranges[rangerel->ranges.length-1];
176-
else if (cmp_lower < 0)
177-
re = &ranges[0];
178-
179167
vals[0] = ObjectIdGetDatum(relid);
180-
vals[1] = re->min;
181-
vals[2] = re->max;
182-
vals[3] = value;
168+
vals[1] = value;
169+
170+
/* Restrict concurrent partition creation */
171+
LWLockAcquire(edit_partitions_lock, LW_EXCLUSIVE);
183172

184173
/* Perform PL procedure */
185-
sql = psprintf("SELECT %s.append_partitions_on_demand_internal($1, $2, $3, $4)",
174+
sql = psprintf("SELECT %s.append_partitions_on_demand_internal($1, $2)",
186175
schema);
187-
ret = SPI_execute_with_args(sql, 4, oids, vals, nulls, false, 0);
176+
ret = SPI_execute_with_args(sql, 2, oids, vals, nulls, false, 0);
188177
if (ret > 0)
189178
{
190179
/* Update relation info */
@@ -195,6 +184,9 @@ create_partitions(Oid relid, Datum value, Oid value_type)
195184
else
196185
elog(WARNING, "Attempt to create new partitions failed");
197186

187+
/* Release lock */
188+
LWLockRelease(edit_partitions_lock);
189+
198190
/* Repeat binary search */
199191
ranges = dsm_array_get_pointer(&rangerel->ranges);
200192
pos = range_binary_search(rangerel, &cmp_func, value, &found);

0 commit comments

Comments
 (0)