Skip to content

Commit 4810082

Browse files
committed
clean init.sql up, improve function partition_table_concurrently()
1 parent 477b6d6 commit 4810082

File tree

4 files changed

+46
-26
lines changed

4 files changed

+46
-26
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,11 @@ Performs RANGE-partitioning from specified range for `relation` by partitioning
124124
### Data migration
125125

126126
```plpgsql
127-
partition_table_concurrently(relation REGCLASS)
127+
partition_table_concurrently(relation REGCLASS,
128+
batch_size INTEGER DEFAULT 1000,
129+
sleep_time FLOAT8 DEFAULT 1.0)
128130
```
129-
Starts a background worker to move data from parent table to partitions. The worker utilizes short transactions to copy small batches of data (up to 10K rows per transaction) and thus doesn't significantly interfere with user's activity.
131+
Starts a background worker to move data from parent table to partitions. The worker utilizes short transactions to copy small batches of data (up to 10K rows per transaction) and thus doesn't significantly interfere with user's activity. If the worker is unable to lock rows of a batch, it sleeps for `sleep_time` seconds up to 60 times before the next attempt, and quits if it's still unable to lock the batch.
130132

131133
```plpgsql
132134
stop_concurrent_part_task(relation REGCLASS)

expected/pathman_basic.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1409,7 +1409,7 @@ CREATE TABLE replies(id SERIAL PRIMARY KEY, message_id INTEGER REFERENCES messag
14091409
INSERT INTO messages SELECT g, md5(g::text) FROM generate_series(1, 10) as g;
14101410
INSERT INTO replies SELECT g, g, md5(g::text) FROM generate_series(1, 10) as g;
14111411
SELECT create_range_partitions('messages', 'id', 1, 100, 2);
1412-
WARNING: foreign key 'replies_message_id_fkey' references relation 'messages'
1412+
WARNING: foreign key "replies_message_id_fkey" references relation "messages"
14131413
ERROR: relation "messages" is referenced from other relations
14141414
ALTER TABLE replies DROP CONSTRAINT replies_message_id_fkey;
14151415
SELECT create_range_partitions('messages', 'id', 1, 100, 2);

init.sql

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -225,21 +225,27 @@ GRANT SELECT ON @extschema@.pathman_concurrent_part_tasks TO PUBLIC;
225225
/*
226226
* Partition table using ConcurrentPartWorker.
227227
*/
228-
CREATE OR REPLACE FUNCTION @extschema@.partition_table_concurrently(relation regclass)
229-
RETURNS VOID AS 'pg_pathman', 'partition_table_concurrently' LANGUAGE C STRICT;
228+
CREATE OR REPLACE FUNCTION @extschema@.partition_table_concurrently(
229+
relation REGCLASS,
230+
batch_size INTEGER DEFAULT 1000,
231+
sleep_time FLOAT8 DEFAULT 1.0)
232+
RETURNS VOID AS 'pg_pathman', 'partition_table_concurrently'
233+
LANGUAGE C STRICT;
230234

231235
/*
232236
* Stop concurrent partitioning task.
233237
*/
234-
CREATE OR REPLACE FUNCTION @extschema@.stop_concurrent_part_task(relation regclass)
235-
RETURNS BOOL AS 'pg_pathman', 'stop_concurrent_part_task' LANGUAGE C STRICT;
238+
CREATE OR REPLACE FUNCTION @extschema@.stop_concurrent_part_task(
239+
relation REGCLASS)
240+
RETURNS BOOL AS 'pg_pathman', 'stop_concurrent_part_task'
241+
LANGUAGE C STRICT;
236242

237243

238244
/*
239245
* Copy rows to partitions concurrently.
240246
*/
241247
CREATE OR REPLACE FUNCTION @extschema@._partition_data_concurrent(
242-
p_relation REGCLASS,
248+
relation REGCLASS,
243249
p_min ANYELEMENT DEFAULT NULL::text,
244250
p_max ANYELEMENT DEFAULT NULL::text,
245251
p_limit INT DEFAULT NULL,
@@ -254,7 +260,7 @@ DECLARE
254260

255261
BEGIN
256262
SELECT attname INTO v_attr
257-
FROM @extschema@.pathman_config WHERE partrel = p_relation;
263+
FROM @extschema@.pathman_config WHERE partrel = relation;
258264

259265
p_total := 0;
260266

@@ -282,15 +288,15 @@ BEGIN
282288
/* Lock rows and copy data */
283289
RAISE NOTICE 'Copying data to partitions...';
284290
EXECUTE format('SELECT array(SELECT ctid FROM ONLY %1$s %2$s %3$s FOR UPDATE NOWAIT)',
285-
p_relation, v_where_clause, v_limit_clause)
291+
relation, v_where_clause, v_limit_clause)
286292
USING p_min, p_max
287293
INTO ctids;
288294

289295
EXECUTE format('
290296
WITH data AS (
291297
DELETE FROM ONLY %1$s WHERE ctid = ANY($1) RETURNING *)
292298
INSERT INTO %1$s SELECT * FROM data',
293-
p_relation)
299+
relation)
294300
USING ctids;
295301

296302
/* Get number of inserted rows */
@@ -376,7 +382,7 @@ LANGUAGE plpgsql;
376382
* Suitable for every partitioning type.
377383
*/
378384
CREATE OR REPLACE FUNCTION @extschema@.common_relation_checks(
379-
p_relation REGCLASS,
385+
relation REGCLASS,
380386
p_attribute TEXT)
381387
RETURNS BOOLEAN AS
382388
$$
@@ -388,33 +394,33 @@ DECLARE
388394
BEGIN
389395
/* Ignore temporary tables */
390396
SELECT relpersistence FROM pg_catalog.pg_class
391-
WHERE oid = p_relation INTO rel_persistence;
397+
WHERE oid = relation INTO rel_persistence;
392398

393399
IF rel_persistence = 't'::CHAR THEN
394400
RAISE EXCEPTION 'temporary table "%" cannot be partitioned',
395-
p_relation::TEXT;
401+
relation::TEXT;
396402
END IF;
397403

398404
IF EXISTS (SELECT * FROM @extschema@.pathman_config
399-
WHERE partrel = p_relation) THEN
400-
RAISE EXCEPTION 'relation "%" has already been partitioned', p_relation;
405+
WHERE partrel = relation) THEN
406+
RAISE EXCEPTION 'relation "%" has already been partitioned', relation;
401407
END IF;
402408

403-
IF @extschema@.is_attribute_nullable(p_relation, p_attribute) THEN
409+
IF @extschema@.is_attribute_nullable(relation, p_attribute) THEN
404410
RAISE EXCEPTION 'partitioning key ''%'' must be NOT NULL', p_attribute;
405411
END IF;
406412

407413
/* Check if there are foreign keys that reference the relation */
408-
FOR v_rec IN (SELECT *
409-
FROM pg_constraint WHERE confrelid = p_relation::regclass::oid)
414+
FOR v_rec IN (SELECT * FROM pg_catalog.pg_constraint
415+
WHERE confrelid = relation::REGCLASS::OID)
410416
LOOP
411417
is_referenced := TRUE;
412-
RAISE WARNING 'foreign key ''%'' references relation ''%''',
413-
v_rec.conname, p_relation;
418+
RAISE WARNING 'foreign key "%" references relation "%"',
419+
v_rec.conname, relation;
414420
END LOOP;
415421

416422
IF is_referenced THEN
417-
RAISE EXCEPTION 'relation "%" is referenced from other relations', p_relation;
423+
RAISE EXCEPTION 'relation "%" is referenced from other relations', relation;
418424
END IF;
419425

420426
RETURN TRUE;
@@ -627,7 +633,7 @@ BEGIN
627633
LOOP
628634
EXECUTE format('ALTER TABLE %s ADD %s',
629635
partition::TEXT,
630-
pg_get_constraintdef(rec.conid));
636+
pg_catalog.pg_get_constraintdef(rec.conid));
631637
END LOOP;
632638
END
633639
$$ LANGUAGE plpgsql STRICT;
@@ -743,7 +749,8 @@ RETURNS BOOLEAN AS 'pg_pathman', 'add_to_pathman_config'
743749
LANGUAGE C;
744750

745751
CREATE OR REPLACE FUNCTION @extschema@.invalidate_relcache(relid OID)
746-
RETURNS VOID AS 'pg_pathman' LANGUAGE C STRICT;
752+
RETURNS VOID AS 'pg_pathman'
753+
LANGUAGE C STRICT;
747754

748755

749756
/*

src/pathman_workers.c

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,9 +574,20 @@ Datum
574574
partition_table_concurrently(PG_FUNCTION_ARGS)
575575
{
576576
Oid relid = PG_GETARG_OID(0);
577+
int32 batch_size = PG_GETARG_INT32(1);
578+
float8 sleep_time = PG_GETARG_FLOAT8(2);
577579
int empty_slot_idx = -1, /* do we have a slot for BGWorker? */
578580
i;
579581

582+
/* Check batch_size */
583+
if (batch_size < 1 || batch_size > 10000)
584+
elog(ERROR, "\"batch_size\" should not be less than 1 "
585+
"or greater than 10000");
586+
587+
/* Check sleep_time */
588+
if (sleep_time < 0.5)
589+
elog(ERROR, "\"sleep_time\" should not be less than 0.5");
590+
580591
/* Check if relation is a partitioned table */
581592
shout_if_prel_is_invalid(relid,
582593
/* We also lock the parent relation */
@@ -631,8 +642,8 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
631642
{
632643
/* Initialize concurrent part slot */
633644
InitConcurrentPartSlot(&concurrent_part_slots[empty_slot_idx],
634-
GetUserId(), CPS_WORKING,
635-
MyDatabaseId, relid, 1000, 1.0);
645+
GetUserId(), CPS_WORKING, MyDatabaseId,
646+
relid, batch_size, sleep_time);
636647

637648
/* Now we can safely unlock slot for new BGWorker */
638649
SpinLockRelease(&concurrent_part_slots[empty_slot_idx].mutex);

0 commit comments

Comments
 (0)