Skip to content

Commit 9555f74

Browse files
committed
refactoring, new log messages (DEBUG2), fix function find_inheritance_children_array()
1 parent c2a77f5 commit 9555f74

File tree

6 files changed

+82
-37
lines changed

6 files changed

+82
-37
lines changed

src/init.c

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -486,25 +486,28 @@ find_inheritance_children_array(Oid parentrelId,
486486
SysScanDesc scan;
487487
ScanKeyData key[1];
488488
HeapTuple inheritsTuple;
489-
Oid inhrelid;
489+
490490
Oid *oidarr;
491491
uint32 maxoids,
492-
numoids,
493-
i;
492+
numoids;
493+
494+
Oid *result = NULL;
495+
uint32 nresult = 0;
496+
497+
uint32 i;
498+
499+
Assert(lockmode != NoLock);
500+
501+
/* Init safe return values */
502+
*children_size = 0;
503+
*children = NULL;
494504

495505
/*
496-
* Can skip the scan if pg_class shows the relation has never had a
497-
* subclass.
506+
* Can skip the scan if pg_class shows the
507+
* relation has never had a subclass.
498508
*/
499509
if (!has_subclass(parentrelId))
500-
{
501-
/* Init return values */
502-
*children_size = 0;
503-
children = NULL;
504-
505-
/* Ok, could not find any children */
506510
return FCS_NO_CHILDREN;
507-
}
508511

509512
/*
510513
* Scan pg_inherits and build a working array of subclass OIDs.
@@ -525,6 +528,8 @@ find_inheritance_children_array(Oid parentrelId,
525528

526529
while ((inheritsTuple = systable_getnext(scan)) != NULL)
527530
{
531+
Oid inhrelid;
532+
528533
inhrelid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
529534
if (numoids >= maxoids)
530535
{
@@ -547,12 +552,10 @@ find_inheritance_children_array(Oid parentrelId,
547552
if (numoids > 1)
548553
qsort(oidarr, numoids, sizeof(Oid), oid_cmp);
549554

550-
/*
551-
* Acquire locks and build the result list.
552-
*/
555+
/* Acquire locks and build the result list */
553556
for (i = 0; i < numoids; i++)
554557
{
555-
inhrelid = oidarr[i];
558+
Oid inhrelid = oidarr[i];
556559

557560
if (lockmode != NoLock)
558561
{
@@ -567,9 +570,7 @@ find_inheritance_children_array(Oid parentrelId,
567570
for (j = 0; j < i; j++)
568571
UnlockRelationOid(oidarr[j], lockmode);
569572

570-
/* Init return values */
571-
*children_size = numoids;
572-
*children = oidarr;
573+
pfree(oidarr);
573574

574575
/* We couldn't lock this child, retreat! */
575576
return FCS_COULD_NOT_LOCK;
@@ -586,18 +587,28 @@ find_inheritance_children_array(Oid parentrelId,
586587
{
587588
/* Release useless lock */
588589
UnlockRelationOid(inhrelid, lockmode);
590+
589591
/* And ignore this relation */
590592
continue;
591593
}
592594
}
595+
596+
/* Alloc array if it's the first time */
597+
if (nresult == 0)
598+
result = palloc(numoids * sizeof(Oid));
599+
600+
/* Save Oid of the existing relation */
601+
result[nresult++] = inhrelid;
593602
}
594603

595-
/* Init return values */
596-
*children_size = numoids;
597-
*children = oidarr;
604+
/* Set return values */
605+
*children_size = nresult;
606+
*children = result;
607+
608+
pfree(oidarr);
598609

599-
/* Ok, we have children */
600-
return FCS_FOUND;
610+
/* Do we have children? */
611+
return nresult > 0 ? FCS_FOUND : FCS_NO_CHILDREN;
601612
}
602613

603614
/*

src/partition_creation.c

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,6 @@ create_partitions_for_value(Oid relid, Datum value, Oid value_type)
236236
/* Check that table is partitioned and fetch xmin */
237237
if (pathman_config_contains_relation(relid, NULL, NULL, &rel_xmin))
238238
{
239-
/* Was table partitioned in some previous transaction? */
240-
bool part_in_prev_xact =
241-
TransactionIdPrecedes(rel_xmin, GetCurrentTransactionId()) ||
242-
TransactionIdEquals(rel_xmin, FrozenTransactionId);
243-
244239
/* Take default values */
245240
bool spawn_using_bgw = DEFAULT_SPAWN_USING_BGW,
246241
enable_auto = DEFAULT_AUTO;
@@ -264,7 +259,8 @@ create_partitions_for_value(Oid relid, Datum value, Oid value_type)
264259
* If table has been partitioned in some previous xact AND
265260
* we don't hold any conflicting locks, run BGWorker.
266261
*/
267-
if (spawn_using_bgw && part_in_prev_xact &&
262+
if (spawn_using_bgw &&
263+
xact_object_is_visible(rel_xmin) &&
268264
!xact_bgw_conflicting_lock_exists(relid))
269265
{
270266
elog(DEBUG2, "create_partitions(): chose BGWorker [%u]", MyProcPid);

src/pathman_workers.c

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "pathman_workers.h"
2020
#include "relation_info.h"
2121
#include "utils.h"
22+
#include "xact_handling.h"
2223

2324
#include "access/htup_details.h"
2425
#include "access/xact.h"
@@ -601,11 +602,12 @@ bgw_main_concurrent_part(Datum main_arg)
601602
Datum
602603
partition_table_concurrently(PG_FUNCTION_ARGS)
603604
{
604-
Oid relid = PG_GETARG_OID(0);
605-
int32 batch_size = PG_GETARG_INT32(1);
606-
float8 sleep_time = PG_GETARG_FLOAT8(2);
607-
int empty_slot_idx = -1, /* do we have a slot for BGWorker? */
608-
i;
605+
Oid relid = PG_GETARG_OID(0);
606+
int32 batch_size = PG_GETARG_INT32(1);
607+
float8 sleep_time = PG_GETARG_FLOAT8(2);
608+
int empty_slot_idx = -1, /* do we have a slot for BGWorker? */
609+
i;
610+
TransactionId rel_xmin;
609611

610612
/* Check batch_size */
611613
if (batch_size < 1 || batch_size > 10000)
@@ -622,6 +624,16 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
622624
get_pathman_relation_info_after_lock(relid, true, NULL),
623625
/* Partitioning type does not matter here */
624626
PT_INDIFFERENT);
627+
628+
/* Check that partitioning operation result is visible */
629+
if (pathman_config_contains_relation(relid, NULL, NULL, &rel_xmin))
630+
{
631+
if (!xact_object_is_visible(rel_xmin))
632+
ereport(ERROR, (errmsg("cannot start %s", concurrent_part_bgw),
633+
errdetail("table is being partitioned now")));
634+
}
635+
else elog(ERROR, "cannot find relation %d", relid);
636+
625637
/*
626638
* Look for an empty slot and also check that a concurrent
627639
* partitioning operation for this table hasn't been started yet
@@ -686,7 +698,8 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
686698
/* Tell user everything's fine */
687699
elog(NOTICE,
688700
"worker started, you can stop it "
689-
"with the following command: select %s('%s');",
701+
"with the following command: select %s.%s('%s');",
702+
get_namespace_name(get_pathman_schema()),
690703
CppAsString(stop_concurrent_part_task),
691704
get_rel_name(relid));
692705

src/relation_info.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,17 +162,26 @@ refresh_pathman_relation_info(Oid relid,
162162
{
163163
/* If there's no children at all, remove this entry */
164164
case FCS_NO_CHILDREN:
165+
elog(DEBUG2, "refresh: relation %u has no children [%u]",
166+
relid, MyProcPid);
167+
165168
UnlockRelationOid(relid, lockmode);
166169
remove_pathman_relation_info(relid);
167170
return NULL; /* exit */
168171

169172
/* If can't lock children, leave an invalid entry */
170173
case FCS_COULD_NOT_LOCK:
174+
elog(DEBUG2, "refresh: cannot lock children of relation %u [%u]",
175+
relid, MyProcPid);
176+
171177
UnlockRelationOid(relid, lockmode);
172178
return NULL; /* exit */
173179

174180
/* Found some children, just unlock parent */
175181
case FCS_FOUND:
182+
elog(DEBUG2, "refresh: found children of relation %u [%u]",
183+
relid, MyProcPid);
184+
176185
UnlockRelationOid(relid, lockmode);
177186
break; /* continue */
178187

@@ -200,7 +209,8 @@ refresh_pathman_relation_info(Oid relid,
200209
UnlockRelationOid(prel_children[i], lockmode);
201210
}
202211

203-
pfree(prel_children);
212+
if (prel_children)
213+
pfree(prel_children);
204214

205215
/* Read additional parameters ('enable_parent' and 'auto' at the moment) */
206216
if (read_pathman_params(relid, param_values, param_isnull))
@@ -292,6 +302,9 @@ get_pathman_relation_info(Oid relid)
292302
"Fetching %s record for relation %u from pg_pathman's cache [%u]",
293303
(prel ? "live" : "NULL"), relid, MyProcPid);
294304

305+
/* Make sure that 'prel' is valid */
306+
Assert(PrelIsValid(prel));
307+
295308
return prel;
296309
}
297310

src/xact_handling.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "xact_handling.h"
1212

1313
#include "postgres.h"
14+
#include "access/transam.h"
1415
#include "access/xact.h"
1516
#include "catalog/catalog.h"
1617
#include "miscadmin.h"
@@ -156,6 +157,16 @@ xact_is_set_transaction_stmt(Node *stmt)
156157
return false;
157158
}
158159

160+
/*
161+
* Check if object is visible in newer transactions.
162+
*/
163+
bool
164+
xact_object_is_visible(TransactionId obj_xmin)
165+
{
166+
return TransactionIdPrecedes(obj_xmin, GetCurrentTransactionId()) ||
167+
TransactionIdEquals(obj_xmin, FrozenTransactionId);
168+
}
169+
159170
/*
160171
* Do we hold the specified lock?
161172
*/

src/xact_handling.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ bool xact_bgw_conflicting_lock_exists(Oid relid);
3333
bool xact_is_level_read_committed(void);
3434
bool xact_is_transaction_stmt(Node *stmt);
3535
bool xact_is_set_transaction_stmt(Node *stmt);
36+
bool xact_object_is_visible(TransactionId obj_xmin);
3637

3738

3839
#endif /* XACT_HANDLING_H */

0 commit comments

Comments
 (0)