Commit 9b80448

concurrent partitioning subsystem now uses spinlocks
1 parent: 67ff4a8

2 files changed: +102 −65 lines changed


src/pathman_workers.c

Lines changed: 69 additions & 56 deletions
```diff
@@ -106,7 +106,7 @@ init_concurrent_part_task_slots(void)
 		memset(concurrent_part_slots, 0, size);
 
 		for (i = 0; i < PART_WORKER_SLOTS; i++)
-			pg_atomic_init_flag_impl(&concurrent_part_slots[i].slot_used);
+			SpinLockInit(&concurrent_part_slots[i].mutex);
 	}
 }
 
@@ -235,10 +235,10 @@ start_bg_worker(const char bgworker_name[BGW_MAXLEN],
 static dsm_segment *
 create_partitions_bg_worker_segment(Oid relid, Datum value, Oid value_type)
 {
-	TypeCacheEntry         *typcache;
-	Size                    datum_size;
-	Size                    segment_size;
-	dsm_segment            *segment;
+	TypeCacheEntry     *typcache;
+	Size                datum_size;
+	Size                segment_size;
+	dsm_segment        *segment;
 	SpawnPartitionArgs *args;
 
 	typcache = lookup_type_cache(value_type, 0);
@@ -314,10 +314,10 @@ create_partitions_bg_worker(Oid relid, Datum value, Oid value_type)
 static void
 bgw_main_spawn_partitions(Datum main_arg)
 {
-	dsm_handle              handle = DatumGetUInt32(main_arg);
-	dsm_segment            *segment;
-	SpawnPartitionArgs     *args;
-	Datum                   value;
+	dsm_handle          handle = DatumGetUInt32(main_arg);
+	dsm_segment        *segment;
+	SpawnPartitionArgs *args;
+	Datum               value;
 
 	/* Establish signal handlers before unblocking signals. */
 	pqsignal(SIGTERM, handle_sigterm);
@@ -512,8 +512,7 @@ bgw_main_concurrent_part(Datum main_arg)
 			if (failures_count++ >= PART_WORKER_MAX_ATTEMPTS)
 			{
 				/* Mark slot as FREE */
-				part_slot->worker_status = WS_FREE;
-				pg_atomic_clear_flag(&part_slot->slot_used);
+				cps_set_status(part_slot, WS_FREE);
 
 				elog(LOG,
 					 "Concurrent partitioning worker has canceled the task because "
@@ -534,14 +533,6 @@ bgw_main_concurrent_part(Datum main_arg)
 
 		if (failed)
 		{
-#ifdef USE_ASSERT_CHECKING
-			elog(DEBUG1, "%s: could not relocate batch (%d/%d), total: %lu [%u]",
-				 concurrent_part_bgw,
-				 failures_count, PART_WORKER_MAX_ATTEMPTS, /* current/max */
-				 part_slot->total_rows,
-				 MyProcPid);
-#endif
-
 			/* Abort transaction and sleep for a second */
 			AbortCurrentTransaction();
 			DirectFunctionCall1(pg_sleep, Float8GetDatum(part_slot->sleep_time));
@@ -553,26 +544,27 @@ bgw_main_concurrent_part(Datum main_arg)
 			failures_count = 0;
 
 			/* Add rows to total_rows */
+			SpinLockAcquire(&part_slot->mutex);
 			part_slot->total_rows += rows;
-
+			/* Report debug message */
 #ifdef USE_ASSERT_CHECKING
 			elog(DEBUG1, "%s: relocated %d rows, total: %lu [%u]",
 				 concurrent_part_bgw, rows, part_slot->total_rows, MyProcPid);
 #endif
+			SpinLockRelease(&part_slot->mutex);
 		}
 
 		/* If other backend requested to stop us, quit */
-		if (part_slot->worker_status == WS_STOPPING)
+		if (cps_check_status(part_slot) == WS_STOPPING)
 			break;
 	}
 	while(rows > 0 || failed); /* do while there's still rows to be relocated */
 
 	/* Reclaim the resources */
 	pfree(sql);
 
-	/* Set slot free */
-	part_slot->worker_status = WS_FREE;
-	pg_atomic_clear_flag(&part_slot->slot_used);
+	/* Mark slot as FREE */
+	cps_set_status(part_slot, WS_FREE);
 }
 
 
@@ -589,12 +581,11 @@ bgw_main_concurrent_part(Datum main_arg)
 Datum
 partition_table_concurrently(PG_FUNCTION_ARGS)
 {
-#define tostr(str) ( #str )
+#define tostr(str) ( #str ) /* convert function's name to literal */
 
-	Oid                 relid = PG_GETARG_OID(0);
-	ConcurrentPartSlot *my_slot = NULL;
-	int                 empty_slot_idx = -1;
-	int                 i;
+	Oid		relid = PG_GETARG_OID(0);
+	int		empty_slot_idx = -1;
+	int		i;
 
 	/* Check if relation is a partitioned table */
 	shout_if_prel_is_invalid(relid,
@@ -607,38 +598,43 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
 	 */
 	for (i = 0; i < PART_WORKER_SLOTS; i++)
 	{
-		/*
-		 * Attempt to acquire the flag. If it has alread been used then skip
-		 * this slot and try another one
-		 */
-		if (!pg_atomic_test_set_flag(&concurrent_part_slots[i].slot_used))
-			continue;
+		ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
+		bool				keep_this_lock = false;
 
-		/* If atomic flag wasn't used then status should be WS_FREE */
-		Assert(concurrent_part_slots[i].worker_status == WS_FREE);
+		SpinLockAcquire(&cur_slot->mutex);
 
 		if (empty_slot_idx < 0)
 		{
-			my_slot = &concurrent_part_slots[i];
 			empty_slot_idx = i;
+			keep_this_lock = true;
 		}
 
-		if (concurrent_part_slots[i].relid == relid &&
-			concurrent_part_slots[i].dbid == MyDatabaseId)
+		if (cur_slot->relid == relid &&
+			cur_slot->dbid == MyDatabaseId)
 		{
+			if (empty_slot_idx >= 0)
+				SpinLockRelease(&cur_slot->mutex);
+
 			elog(ERROR,
 				 "Table \"%s\" is already being partitioned",
 				 get_rel_name(relid));
 		}
+
+		if (!keep_this_lock)
+			SpinLockRelease(&cur_slot->mutex);
 	}
 
-	if (my_slot == NULL)
+	if (empty_slot_idx < 0)
 		elog(ERROR, "No empty worker slots found");
+	else
+	{
+		/* Initialize concurrent part slot */
+		InitConcurrentPartSlot(&concurrent_part_slots[empty_slot_idx],
+							   GetAuthenticatedUserId(), WS_WORKING,
+							   MyDatabaseId, relid, 1000, 1.0);
 
-	/* Initialize concurrent part slot */
-	InitConcurrentPartSlot(my_slot, GetAuthenticatedUserId(),
-						   WS_WORKING, MyDatabaseId, relid,
-						   1000, 1.0);
+		SpinLockRelease(&concurrent_part_slots[empty_slot_idx].mutex);
+	}
 
 	/* Start worker (we should not wait) */
 	start_bg_worker(concurrent_part_bgw,
@@ -712,11 +708,13 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
 	{
 		ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
 
+		SpinLockAcquire(&cur_slot->mutex);
+
 		if (cur_slot->worker_status != WS_FREE)
 		{
 			HeapTuple	tuple;
 			Datum		values[Natts_pathman_cp_tasks];
-			bool		isnull[Natts_pathman_cp_tasks] = { 0, 0, 0, 0, 0, 0 };
+			bool		isnull[Natts_pathman_cp_tasks] = { 0 };
 
 			values[Anum_pathman_cp_tasks_userid - 1] = cur_slot->userid;
 			values[Anum_pathman_cp_tasks_pid - 1] = cur_slot->pid;
@@ -750,6 +748,8 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
 
 			SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
 		}
+
+		SpinLockRelease(&cur_slot->mutex);
 	}
 
 	SRF_RETURN_DONE(funcctx);
@@ -763,22 +763,35 @@ Datum
 stop_concurrent_part_task(PG_FUNCTION_ARGS)
 {
 	Oid		relid = PG_GETARG_OID(0);
+	bool	worker_found = false;
 	int		i;
-	ConcurrentPartSlot *slot;
 
-	for (i = 0; i < PART_WORKER_SLOTS; i++)
-		slot = &concurrent_part_slots[i];
+	for (i = 0; i < PART_WORKER_SLOTS && !worker_found; i++)
+	{
+		ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
+
+		SpinLockAcquire(&cur_slot->mutex);
 
-		if (slot->worker_status != WS_FREE &&
-			slot->relid == relid &&
-			slot->dbid == MyDatabaseId)
+		if (cur_slot->worker_status != WS_FREE &&
+			cur_slot->relid == relid &&
+			cur_slot->dbid == MyDatabaseId)
 		{
-			slot->worker_status = WS_STOPPING;
 			elog(NOTICE, "Worker will stop after it finishes current batch");
 
-			PG_RETURN_BOOL(true);
+			cur_slot->worker_status = WS_STOPPING;
+			worker_found = true;
 		}
 
-	elog(ERROR, "Cannot find worker for relation \"%s\"",
-		 get_rel_name_or_relid(relid));
+		SpinLockRelease(&cur_slot->mutex);
+	}
+
+	if (worker_found)
+		PG_RETURN_BOOL(true);
+	else
+	{
+		elog(ERROR, "Cannot find worker for relation \"%s\"",
			 get_rel_name_or_relid(relid));
+
+		PG_RETURN_BOOL(false); /* keep compiler happy */
+	}
 }
```
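The locking discipline adopted above can be summed up in a short sketch. This is illustrative only (DemoSlot, demo_slot_init and demo_add_rows are invented names, not pg_pathman code): every access to a shared slot's fields is bracketed by SpinLockAcquire()/SpinLockRelease() on the slot's slock_t, and the critical sections stay tiny, with no error reporting or long-running work while the lock is held, as PostgreSQL's spinlock rules require.

```c
/*
 * Minimal sketch of the per-slot spinlock pattern used above.
 * DemoSlot, demo_slot_init() and demo_add_rows() are illustrative
 * names only, not part of pg_pathman.
 */
#include "postgres.h"
#include "storage/spin.h"

typedef struct
{
	slock_t		mutex;			/* protects the fields below */
	int			worker_status;	/* e.g. free / working / stopping */
	uint64		total_rows;		/* progress counter read by other backends */
} DemoSlot;

static void
demo_slot_init(DemoSlot *slot)
{
	SpinLockInit(&slot->mutex);
	slot->worker_status = 0;
	slot->total_rows = 0;
}

static void
demo_add_rows(DemoSlot *slot, uint64 rows)
{
	/* keep the critical section short: just the field updates */
	SpinLockAcquire(&slot->mutex);
	slot->total_rows += rows;
	SpinLockRelease(&slot->mutex);
}
```

A spinlock fits this case because the slots live in shared memory and the protected sections are a handful of assignments; the previous pg_atomic_flag only guarded slot acquisition, while reads of the other fields went unprotected.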

src/pathman_workers.h

Lines changed: 33 additions & 9 deletions
```diff
@@ -18,6 +18,7 @@
 #define PATHMAN_WORKERS_H
 
 #include "postgres.h"
+#include "storage/spin.h"
 
 
 /*
@@ -41,22 +42,24 @@ typedef struct
 } SpawnPartitionArgs;
 
 
+typedef enum
+{
+	WS_FREE = 0,	/* slot is empty */
+	WS_WORKING,		/* occupied by live worker */
+	WS_STOPPING		/* worker is going to shutdown */
+
+} ConcurrentPartSlotStatus;
+
 /*
  * Store args and execution status of a single ConcurrentPartWorker.
  */
 typedef struct
 {
-	pg_atomic_flag	slot_used;		/* flag for atomic slot acquirement */
-	Oid				userid;			/* connect as a specified user */
+	slock_t			mutex;			/* protect slot from race conditions */
 
-	enum
-	{
-		WS_FREE = 0,	/* slot is empty */
-		WS_WORKING,		/* occupied by live worker */
-		WS_STOPPING		/* worker is going to shutdown */
-
-	} worker_status;	/* status of a particular worker */
+	ConcurrentPartSlotStatus worker_status;	/* status of a particular worker */
 
+	Oid				userid;			/* connect as a specified user */
 	pid_t			pid;			/* worker's PID */
 	Oid				dbid;			/* database which contains the relation */
 	Oid				relid;			/* table to be partitioned concurrently */
@@ -78,6 +81,27 @@ typedef struct
 		(slot)->sleep_time = (sleep_t); \
 	} while (0)
 
+static inline ConcurrentPartSlotStatus
+cps_check_status(ConcurrentPartSlot *slot)
+{
+	ConcurrentPartSlotStatus status;
+
+	SpinLockAcquire(&slot->mutex);
+	status = slot->worker_status;
+	SpinLockRelease(&slot->mutex);
+
+	return status;
+}
+
+static inline void
+cps_set_status(ConcurrentPartSlot *slot, ConcurrentPartSlotStatus status)
+{
+	SpinLockAcquire(&slot->mutex);
+	slot->worker_status = status;
+	SpinLockRelease(&slot->mutex);
+}
+
+
 
 /* Number of worker slots for concurrent partitioning */
 #define PART_WORKER_SLOTS	10
```
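To illustrate how the new header-level helpers are meant to be called, here is a hedged usage sketch: demo_worker_step() is invented for the example, while cps_check_status(), cps_set_status(), ConcurrentPartSlot and the WS_* values come from the header above.

```c
/*
 * Illustrative sketch only: a simplified worker step showing the intended
 * use of cps_check_status()/cps_set_status(). `part_slot` is assumed to
 * point at a ConcurrentPartSlot in shared memory.
 */
#include "pathman_workers.h"

static void
demo_worker_step(ConcurrentPartSlot *part_slot)
{
	/* Another backend may have requested a stop via stop_concurrent_part_task() */
	if (cps_check_status(part_slot) == WS_STOPPING)
	{
		cps_set_status(part_slot, WS_FREE);		/* hand the slot back */
		return;
	}

	/* ... relocate the next batch of rows here ... */
}
```

Since both helpers take and release the spinlock internally, callers must not already hold the slot's mutex when invoking them.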
