Skip to content

Commit 788d41c

Browse files
committed
bugfixes for concurrent partitioning
1 parent 0fe96ca commit 788d41c

File tree

2 files changed

+25
-17
lines changed

2 files changed

+25
-17
lines changed

src/pathman_workers.c

Lines changed: 22 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -512,7 +512,7 @@ bgw_main_concurrent_part(Datum main_arg)
512512
if (failures_count++ >= PART_WORKER_MAX_ATTEMPTS)
513513
{
514514
/* Mark slot as FREE */
515-
cps_set_status(part_slot, WS_FREE);
515+
cps_set_status(part_slot, CPS_FREE);
516516

517517
elog(LOG,
518518
"Concurrent partitioning worker has canceled the task because "
@@ -555,7 +555,7 @@ bgw_main_concurrent_part(Datum main_arg)
555555
}
556556

557557
/* If other backend requested to stop us, quit */
558-
if (cps_check_status(part_slot) == WS_STOPPING)
558+
if (cps_check_status(part_slot) == CPS_STOPPING)
559559
break;
560560
}
561561
while(rows > 0 || failed); /* do while there's still rows to be relocated */
@@ -564,7 +564,7 @@ bgw_main_concurrent_part(Datum main_arg)
564564
pfree(sql);
565565

566566
/* Mark slot as FREE */
567-
cps_set_status(part_slot, WS_FREE);
567+
cps_set_status(part_slot, CPS_FREE);
568568
}
569569

570570

@@ -603,7 +603,8 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
603603

604604
SpinLockAcquire(&cur_slot->mutex);
605605

606-
if (empty_slot_idx < 0)
606+
/* Should we take this slot into account? */
607+
if (empty_slot_idx < 0 && cur_slot->worker_status == CPS_FREE)
607608
{
608609
empty_slot_idx = i;
609610
keep_this_lock = true;
@@ -630,7 +631,7 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
630631
{
631632
/* Initialize concurrent part slot */
632633
InitConcurrentPartSlot(&concurrent_part_slots[empty_slot_idx],
633-
GetAuthenticatedUserId(), WS_WORKING,
634+
GetAuthenticatedUserId(), CPS_WORKING,
634635
MyDatabaseId, relid, 1000, 1.0);
635636

636637
SpinLockRelease(&concurrent_part_slots[empty_slot_idx].mutex);
@@ -707,12 +708,13 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
707708
for (i = userctx->cur_idx; i < PART_WORKER_SLOTS; i++)
708709
{
709710
ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
711+
HeapTuple htup = NULL;
710712

713+
HOLD_INTERRUPTS();
711714
SpinLockAcquire(&cur_slot->mutex);
712715

713-
if (cur_slot->worker_status != WS_FREE)
716+
if (cur_slot->worker_status != CPS_FREE)
714717
{
715-
HeapTuple tuple;
716718
Datum values[Natts_pathman_cp_tasks];
717719
bool isnull[Natts_pathman_cp_tasks] = { 0 };
718720

@@ -725,12 +727,12 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
725727
/* Now build a status string */
726728
switch(cur_slot->worker_status)
727729
{
728-
case WS_WORKING:
730+
case CPS_WORKING:
729731
values[Anum_pathman_cp_tasks_status - 1] =
730732
PointerGetDatum(cstring_to_text("working"));
731733
break;
732734

733-
case WS_STOPPING:
735+
case CPS_STOPPING:
734736
values[Anum_pathman_cp_tasks_status - 1] =
735737
PointerGetDatum(cstring_to_text("stopping"));
736738
break;
@@ -741,15 +743,18 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
741743
}
742744

743745
/* Form output tuple */
744-
tuple = heap_form_tuple(funcctx->tuple_desc, values, isnull);
746+
htup = heap_form_tuple(funcctx->tuple_desc, values, isnull);
745747

746748
/* Switch to next worker */
747749
userctx->cur_idx = i + 1;
748-
749-
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
750750
}
751751

752752
SpinLockRelease(&cur_slot->mutex);
753+
RESUME_INTERRUPTS();
754+
755+
/* Return tuple if needed */
756+
if (htup)
757+
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(htup));
753758
}
754759

755760
SRF_RETURN_DONE(funcctx);
@@ -770,19 +775,22 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
770775
{
771776
ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
772777

778+
HOLD_INTERRUPTS();
773779
SpinLockAcquire(&cur_slot->mutex);
774780

775-
if (cur_slot->worker_status != WS_FREE &&
781+
if (cur_slot->worker_status != CPS_FREE &&
776782
cur_slot->relid == relid &&
777783
cur_slot->dbid == MyDatabaseId)
778784
{
779785
elog(NOTICE, "Worker will stop after it finishes current batch");
780786

781-
cur_slot->worker_status = WS_STOPPING;
787+
/* Change worker's state & set 'worker_found' */
788+
cur_slot->worker_status = CPS_STOPPING;
782789
worker_found = true;
783790
}
784791

785792
SpinLockRelease(&cur_slot->mutex);
793+
RESUME_INTERRUPTS();
786794
}
787795

788796
if (worker_found)

src/pathman_workers.h

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -44,9 +44,9 @@ typedef struct
4444

4545
typedef enum
4646
{
47-
WS_FREE = 0, /* slot is empty */
48-
WS_WORKING, /* occupied by live worker */
49-
WS_STOPPING /* worker is going to shutdown */
47+
CPS_FREE = 0, /* slot is empty */
48+
CPS_WORKING, /* occupied by live worker */
49+
CPS_STOPPING /* worker is going to shutdown */
5050

5151
} ConcurrentPartSlotStatus;
5252

0 commit comments

Comments
 (0)