Skip to content

Commit c9f9831

Browse files
committed
fix partition creation callback invocation
1 parent b94d661 commit c9f9831

File tree

6 files changed

+62
-85
lines changed

6 files changed

+62
-85
lines changed

expected/pathman_callbacks.out

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,11 @@
22
CREATE EXTENSION pg_pathman;
33
CREATE SCHEMA callbacks;
44
/* Check callbacks */
5-
CREATE TABLE callbacks.log(id serial, message text);
6-
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_range_callback(
5+
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_callback(
76
args JSONB)
87
RETURNS VOID AS $$
9-
DECLARE
10-
start_value TEXT := args->>'start';
11-
end_value TEXT := args->'end';
128
BEGIN
13-
INSERT INTO callbacks.log(message)
14-
VALUES (start_value || '-' || end_value);
15-
END
16-
$$ language plpgsql;
17-
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_hash_callback(
18-
args JSONB)
19-
RETURNS VOID AS $$
20-
BEGIN
21-
RAISE WARNING 'callback: partition %', args->'partition';
9+
RAISE WARNING 'callback arg: %', args::TEXT;
2210
END
2311
$$ language plpgsql;
2412
/* set callback to be called on RANGE partitions */
@@ -31,7 +19,7 @@ NOTICE: sequence "abc_seq" does not exist, skipping
3119
(1 row)
3220

3321
SELECT set_part_init_callback('callbacks.abc',
34-
'callbacks.abc_on_part_created_range_callback');
22+
'callbacks.abc_on_part_created_callback');
3523
set_part_init_callback
3624
------------------------
3725

@@ -40,32 +28,26 @@ SELECT set_part_init_callback('callbacks.abc',
4028
INSERT INTO callbacks.abc VALUES (123, 1);
4129
INSERT INTO callbacks.abc VALUES (223, 1);
4230
SELECT append_range_partition('callbacks.abc');
31+
WARNING: callback arg: {"parent": "abc", "parttype": "2", "partition": "abc_4", "range_max": "401", "range_min": "301"}
4332
append_range_partition
4433
------------------------
4534
callbacks.abc_4
4635
(1 row)
4736

4837
SELECT prepend_range_partition('callbacks.abc');
38+
WARNING: callback arg: {"parent": "abc", "parttype": "2", "partition": "abc_5", "range_max": "1", "range_min": "-99"}
4939
prepend_range_partition
5040
-------------------------
5141
callbacks.abc_5
5242
(1 row)
5343

5444
SELECT add_range_partition('callbacks.abc', 401, 502);
45+
WARNING: callback arg: {"parent": "abc", "parttype": "2", "partition": "abc_6", "range_max": "502", "range_min": "401"}
5546
add_range_partition
5647
---------------------
5748
callbacks.abc_6
5849
(1 row)
5950

60-
SELECT message FROM callbacks.log ORDER BY id;
61-
message
62-
-----------
63-
201-"301"
64-
301-"401"
65-
-99-"1"
66-
401-"502"
67-
(4 rows)
68-
6951
SELECT drop_partitions('callbacks.abc');
7052
NOTICE: function callbacks.abc_upd_trig_func() does not exist, skipping
7153
NOTICE: 0 rows copied from callbacks.abc_1
@@ -81,23 +63,23 @@ NOTICE: 0 rows copied from callbacks.abc_6
8163

8264
/* set callback to be called on HASH partitions */
8365
SELECT set_part_init_callback('callbacks.abc',
84-
'callbacks.abc_on_part_created_hash_callback');
66+
'callbacks.abc_on_part_created_callback');
8567
set_part_init_callback
8668
------------------------
8769

8870
(1 row)
8971

9072
SELECT create_hash_partitions('callbacks.abc', 'a', 5);
91-
WARNING: callback: partition "abc_0"
92-
WARNING: callback: partition "abc_1"
93-
WARNING: callback: partition "abc_2"
94-
WARNING: callback: partition "abc_3"
95-
WARNING: callback: partition "abc_4"
73+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_0"}
74+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_1"}
75+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_2"}
76+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_3"}
77+
WARNING: callback arg: {"parent": "abc", "parttype": "1", "partition": "abc_4"}
9678
create_hash_partitions
9779
------------------------
9880
5
9981
(1 row)
10082

10183
DROP SCHEMA callbacks CASCADE;
102-
NOTICE: drop cascades to 10 other objects
84+
NOTICE: drop cascades to 8 other objects
10385
DROP EXTENSION pg_pathman CASCADE;

sql/pathman_callbacks.sql

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,12 @@ CREATE EXTENSION pg_pathman;
44
CREATE SCHEMA callbacks;
55

66
/* Check callbacks */
7-
CREATE TABLE callbacks.log(id serial, message text);
87

9-
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_range_callback(
8+
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_callback(
109
args JSONB)
1110
RETURNS VOID AS $$
12-
DECLARE
13-
start_value TEXT := args->>'start';
14-
end_value TEXT := args->'end';
1511
BEGIN
16-
INSERT INTO callbacks.log(message)
17-
VALUES (start_value || '-' || end_value);
18-
END
19-
$$ language plpgsql;
20-
21-
22-
CREATE OR REPLACE FUNCTION callbacks.abc_on_part_created_hash_callback(
23-
args JSONB)
24-
RETURNS VOID AS $$
25-
BEGIN
26-
RAISE WARNING 'callback: partition %', args->'partition';
12+
RAISE WARNING 'callback arg: %', args::TEXT;
2713
END
2814
$$ language plpgsql;
2915

@@ -33,7 +19,7 @@ CREATE TABLE callbacks.abc(a serial, b int);
3319
SELECT create_range_partitions('callbacks.abc', 'a', 1, 100, 2);
3420

3521
SELECT set_part_init_callback('callbacks.abc',
36-
'callbacks.abc_on_part_created_range_callback');
22+
'callbacks.abc_on_part_created_callback');
3723

3824
INSERT INTO callbacks.abc VALUES (123, 1);
3925
INSERT INTO callbacks.abc VALUES (223, 1);
@@ -42,14 +28,12 @@ SELECT append_range_partition('callbacks.abc');
4228
SELECT prepend_range_partition('callbacks.abc');
4329
SELECT add_range_partition('callbacks.abc', 401, 502);
4430

45-
SELECT message FROM callbacks.log ORDER BY id;
46-
4731
SELECT drop_partitions('callbacks.abc');
4832

4933

5034
/* set callback to be called on HASH partitions */
5135
SELECT set_part_init_callback('callbacks.abc',
52-
'callbacks.abc_on_part_created_hash_callback');
36+
'callbacks.abc_on_part_created_callback');
5337
SELECT create_hash_partitions('callbacks.abc', 'a', 5);
5438

5539

src/pathman_workers.c

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ bgw_main_concurrent_part(Datum main_arg)
488488
failures_count++;
489489
ereport(LOG,
490490
(errmsg("%s: %s", concurrent_part_bgw, error->message),
491-
errdetail("Attempt: %d/%d, sleep time: %s",
491+
errdetail("attempt: %d/%d, sleep time: %s",
492492
failures_count,
493493
PART_WORKER_MAX_ATTEMPTS,
494494
sleep_time_str)));
@@ -507,9 +507,9 @@ bgw_main_concurrent_part(Datum main_arg)
507507
cps_set_status(part_slot, CPS_FREE);
508508

509509
elog(LOG,
510-
"Concurrent partitioning worker has canceled the task because "
511-
"maximum amount of attempts (%d) had been exceeded. "
512-
"See the error message below",
510+
"concurrent partitioning worker has canceled the task because "
511+
"maximum amount of attempts (%d) had been exceeded, "
512+
"see the error message below",
513513
PART_WORKER_MAX_ATTEMPTS);
514514

515515
return; /* exit quickly */
@@ -573,11 +573,9 @@ bgw_main_concurrent_part(Datum main_arg)
573573
Datum
574574
partition_table_concurrently(PG_FUNCTION_ARGS)
575575
{
576-
#define tostr(str) ( #str ) /* convert function's name to literal */
577-
578576
Oid relid = PG_GETARG_OID(0);
579-
int empty_slot_idx = -1; /* do we have a slot for BGWorker? */
580-
int i;
577+
int empty_slot_idx = -1, /* do we have a slot for BGWorker? */
578+
i;
581579

582580
/* Check if relation is a partitioned table */
583581
shout_if_prel_is_invalid(relid,
@@ -617,7 +615,7 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
617615
SpinLockRelease(&concurrent_part_slots[empty_slot_idx].mutex);
618616

619617
elog(ERROR,
620-
"Table \"%s\" is already being partitioned",
618+
"table \"%s\" is already being partitioned",
621619
get_rel_name(relid));
622620
}
623621

@@ -628,7 +626,7 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
628626

629627
/* Looks like we could not find an empty slot */
630628
if (empty_slot_idx < 0)
631-
elog(ERROR, "No empty worker slots found");
629+
elog(ERROR, "no empty worker slots found");
632630
else
633631
{
634632
/* Initialize concurrent part slot */
@@ -648,9 +646,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
648646

649647
/* Tell user everything's fine */
650648
elog(NOTICE,
651-
"Worker started. You can stop it "
649+
"worker started, you can stop it "
652650
"with the following command: select %s('%s');",
653-
tostr(stop_concurrent_part_task), /* convert function's name to literal */
651+
CppAsString(stop_concurrent_part_task),
654652
get_rel_name(relid));
655653

656654
PG_RETURN_VOID();
@@ -785,7 +783,7 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
785783
cur_slot->relid == relid &&
786784
cur_slot->dbid == MyDatabaseId)
787785
{
788-
elog(NOTICE, "Worker will stop after it finishes current batch");
786+
elog(NOTICE, "worker will stop after it finishes current batch");
789787

790788
/* Change worker's state & set 'worker_found' */
791789
cur_slot->worker_status = CPS_STOPPING;
@@ -800,7 +798,7 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
800798
PG_RETURN_BOOL(true);
801799
else
802800
{
803-
elog(ERROR, "Cannot find worker for relation \"%s\"",
801+
elog(ERROR, "cannot find worker for relation \"%s\"",
804802
get_rel_name_or_relid(relid));
805803

806804
PG_RETURN_BOOL(false); /* keep compiler happy */

src/pl_funcs.c

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -769,16 +769,25 @@ invoke_on_partition_created_callback(PG_FUNCTION_ARGS)
769769
if (PG_ARGISNULL(ARG_CHILD))
770770
elog(ERROR, "partition should not be null");
771771

772-
/* Both RANGE_START & RANGE_END are not available (HASH) */
773-
if (PG_ARGISNULL(ARG_RANGE_START) && PG_ARGISNULL(ARG_RANGE_START))
774-
part_type = PT_HASH;
772+
switch (PG_NARGS())
773+
{
774+
case 3:
775+
part_type = PT_HASH;
776+
break;
777+
778+
case 5:
779+
{
780+
if (PG_ARGISNULL(ARG_RANGE_START) || PG_ARGISNULL(ARG_RANGE_START))
781+
elog(ERROR, "both bounds must be provided for RANGE partition");
775782

776-
/* Either RANGE_START or RANGE_END is missing */
777-
else if (PG_ARGISNULL(ARG_RANGE_START) || PG_ARGISNULL(ARG_RANGE_START))
778-
elog(ERROR, "both boundaries must be provided for RANGE partition");
783+
part_type = PT_RANGE;
784+
}
785+
break;
779786

780-
/* Both RANGE_START & RANGE_END are provided */
781-
else part_type = PT_RANGE;
787+
default:
788+
elog(ERROR, "error in function \"%s\"",
789+
CppAsString(invoke_on_partition_created_callback));
790+
}
782791

783792
/* Build JSONB according to partitioning type */
784793
switch (part_type)
@@ -791,8 +800,8 @@ invoke_on_partition_created_callback(PG_FUNCTION_ARGS)
791800
JSB_INIT_VAL(&val, WJB_VALUE, get_rel_name_or_relid(parent_oid));
792801
JSB_INIT_VAL(&key, WJB_KEY, "partition");
793802
JSB_INIT_VAL(&val, WJB_VALUE, get_rel_name_or_relid(partition_oid));
794-
JSB_INIT_VAL(&key, WJB_KEY, "part_type");
795-
JSB_INIT_VAL(&val, WJB_VALUE, "HASH");
803+
JSB_INIT_VAL(&key, WJB_KEY, "parttype");
804+
JSB_INIT_VAL(&val, WJB_VALUE, PartTypeToCString(PT_HASH));
796805

797806
result = pushJsonbValue(&jsonb_state, WJB_END_OBJECT, NULL);
798807
}
@@ -814,11 +823,11 @@ invoke_on_partition_created_callback(PG_FUNCTION_ARGS)
814823
JSB_INIT_VAL(&val, WJB_VALUE, get_rel_name_or_relid(parent_oid));
815824
JSB_INIT_VAL(&key, WJB_KEY, "partition");
816825
JSB_INIT_VAL(&val, WJB_VALUE, get_rel_name_or_relid(partition_oid));
817-
JSB_INIT_VAL(&key, WJB_KEY, "part_type");
818-
JSB_INIT_VAL(&val, WJB_VALUE, "RANGE");
819-
JSB_INIT_VAL(&key, WJB_KEY, "start");
826+
JSB_INIT_VAL(&key, WJB_KEY, "parttype");
827+
JSB_INIT_VAL(&val, WJB_VALUE, PartTypeToCString(PT_RANGE));
828+
JSB_INIT_VAL(&key, WJB_KEY, "range_min");
820829
JSB_INIT_VAL(&val, WJB_VALUE, start_value);
821-
JSB_INIT_VAL(&key, WJB_KEY, "end");
830+
JSB_INIT_VAL(&key, WJB_KEY, "range_max");
822831
JSB_INIT_VAL(&val, WJB_VALUE, end_value);
823832

824833
result = pushJsonbValue(&jsonb_state, WJB_END_OBJECT, NULL);

src/relation_info.c

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -627,19 +627,23 @@ DatumGetPartType(Datum datum)
627627
return (PartType) val;
628628
}
629629

630-
Datum
631-
PartTypeGetTextDatum(PartType parttype)
630+
char *
631+
PartTypeToCString(PartType parttype)
632632
{
633-
switch(parttype)
633+
static char *hash_str = "1",
634+
*range_str = "2";
635+
636+
switch (parttype)
634637
{
635638
case PT_HASH:
636-
return CStringGetTextDatum("HASH");
639+
return hash_str;
637640

638641
case PT_RANGE:
639-
return CStringGetTextDatum("RANGE");
642+
return range_str;
640643

641644
default:
642645
elog(ERROR, "Unknown partitioning type %u", parttype);
646+
return NULL; /* keep compiler happy */
643647
}
644648
}
645649

src/relation_info.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ Oid forget_parent_of_partition(Oid partition, PartParentSearch *status);
137137
Oid get_parent_of_partition(Oid partition, PartParentSearch *status);
138138

139139
PartType DatumGetPartType(Datum datum);
140-
Datum PartTypeGetTextDatum(PartType parttype);
140+
char * PartTypeToCString(PartType parttype);
141141

142142
void shout_if_prel_is_invalid(Oid parent_oid,
143143
const PartRelationInfo *prel,

0 commit comments

Comments
 (0)