Skip to content

Commit fe96482

Browse files
committed
use atomic flag for worker slots
1 parent dbcce6e commit fe96482

File tree

3 files changed

+41
-18
lines changed

3 files changed

+41
-18
lines changed

src/init.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ pathman_config_contains_relation(Oid relid, Datum *values, bool *isnull,
622622
}
623623

624624
/*
625-
* Return 'enable_parent' parameter of relation
625+
* Loads additional pathman parameters like 'enable_parent' or 'auto'
626626
*/
627627
bool
628628
read_pathman_params(Oid relid, Datum *values, bool *isnull)

src/pathman_workers.c

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,19 @@ init_concurrent_part_task_slots(void)
9595
{
9696
bool found;
9797
Size size = estimate_concurrent_part_task_slots_size();
98+
int i;
9899

99100
concurrent_part_slots = (ConcurrentPartSlot *)
100101
ShmemInitStruct("array of ConcurrentPartSlots", size, &found);
101102

102103
/* Initialize 'concurrent_part_slots' if needed */
103-
if (!found) memset(concurrent_part_slots, 0, size);
104+
if (!found)
105+
{
106+
memset(concurrent_part_slots, 0, size);
107+
108+
for (i = 0; i < PART_WORKER_SLOTS; i++)
109+
pg_atomic_init_flag_impl(&concurrent_part_slots[i].slot_used);
110+
}
104111
}
105112

106113

@@ -423,9 +430,9 @@ bgw_main_concurrent_part(Datum main_arg)
423430
{
424431
MemoryContext old_mcxt;
425432

426-
Oid types[2] = { OIDOID, INT4OID };
427-
Datum vals[2] = { part_slot->relid, part_slot->batch_size };
428-
bool nulls[2] = { false, false };
433+
Oid types[2] = { OIDOID, INT4OID };
434+
Datum vals[2] = { part_slot->relid, part_slot->batch_size };
435+
bool nulls[2] = { false, false };
429436

430437
/* Reset loop variables */
431438
failed = false;
@@ -506,6 +513,7 @@ bgw_main_concurrent_part(Datum main_arg)
506513
{
507514
/* Mark slot as FREE */
508515
part_slot->worker_status = WS_FREE;
516+
pg_atomic_clear_flag(&part_slot->slot_used);
509517

510518
elog(LOG,
511519
"Concurrent partitioning worker has canceled the task because "
@@ -561,7 +569,10 @@ bgw_main_concurrent_part(Datum main_arg)
561569

562570
/* Reclaim the resources */
563571
pfree(sql);
572+
573+
/* Set slot free */
564574
part_slot->worker_status = WS_FREE;
575+
pg_atomic_clear_flag(&part_slot->slot_used);
565576
}
566577

567578

@@ -596,16 +607,24 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
596607
*/
597608
for (i = 0; i < PART_WORKER_SLOTS; i++)
598609
{
599-
if (concurrent_part_slots[i].worker_status == WS_FREE)
610+
/*
611+
* Attempt to acquire the flag. If it has alread been used then skip
612+
* this slot and try another one
613+
*/
614+
if (!pg_atomic_test_set_flag(&concurrent_part_slots[i].slot_used))
615+
continue;
616+
617+
/* If atomic flag wasn't used then status should be WS_FREE */
618+
Assert(concurrent_part_slots[i].worker_status == WS_FREE);
619+
620+
if (empty_slot_idx < 0)
600621
{
601-
if (empty_slot_idx < 0)
602-
{
603-
my_slot = &concurrent_part_slots[i];
604-
empty_slot_idx = i;
605-
}
622+
my_slot = &concurrent_part_slots[i];
623+
empty_slot_idx = i;
606624
}
607-
else if (concurrent_part_slots[i].relid == relid &&
608-
concurrent_part_slots[i].dbid == MyDatabaseId)
625+
626+
if (concurrent_part_slots[i].relid == relid &&
627+
concurrent_part_slots[i].dbid == MyDatabaseId)
609628
{
610629
elog(ERROR,
611630
"Table \"%s\" is already being partitioned",
@@ -745,13 +764,16 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
745764
{
746765
Oid relid = PG_GETARG_OID(0);
747766
int i;
767+
ConcurrentPartSlot *slot;
748768

749769
for (i = 0; i < PART_WORKER_SLOTS; i++)
750-
if (concurrent_part_slots[i].worker_status != WS_FREE &&
751-
concurrent_part_slots[i].relid == relid &&
752-
concurrent_part_slots[i].dbid == MyDatabaseId)
770+
slot = &concurrent_part_slots[i];
771+
772+
if (slot->worker_status != WS_FREE &&
773+
slot->relid == relid &&
774+
slot->dbid == MyDatabaseId)
753775
{
754-
concurrent_part_slots[i].worker_status = WS_STOPPING;
776+
slot->worker_status = WS_STOPPING;
755777
elog(NOTICE, "Worker will stop after it finishes current batch");
756778

757779
PG_RETURN_BOOL(true);

src/pathman_workers.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ typedef struct
4646
*/
4747
typedef struct
4848
{
49+
pg_atomic_flag slot_used; /* flag for atomic slot acquirement */
4950
Oid userid; /* connect as a specified user */
5051

5152
enum
@@ -57,7 +58,7 @@ typedef struct
5758
} worker_status; /* status of a particular worker */
5859

5960
pid_t pid; /* worker's PID */
60-
Oid dbid; /* database which contains relation 'relid' */
61+
Oid dbid; /* database which contains the relation */
6162
Oid relid; /* table to be partitioned concurrently */
6263
uint64 total_rows; /* total amount of rows processed */
6364

0 commit comments

Comments
 (0)