Skip to content

Commit 11be0fe

Browse files
committed
ConcurrentPartWorker connects to DB using a given role, new macros for pathman_concurrent_part_tasks, fixes
1 parent 4ddfbb5 commit 11be0fe

File tree

4 files changed

+158
-70
lines changed

4 files changed

+158
-70
lines changed

init.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ LANGUAGE plpgsql;
158158
*/
159159
CREATE OR REPLACE FUNCTION @extschema@.show_concurrent_part_tasks()
160160
RETURNS TABLE (
161+
userid REGROLE,
161162
pid INT,
162163
dbid OID,
163164
relid REGCLASS,
@@ -188,7 +189,7 @@ RETURNS BOOL AS 'pg_pathman', 'stop_concurrent_part_task' LANGUAGE C STRICT;
188189
* Copy rows to partitions concurrently.
189190
*/
190191
CREATE OR REPLACE FUNCTION @extschema@._partition_data_concurrent(
191-
p_relation regclass,
192+
p_relation REGCLASS,
192193
p_min ANYELEMENT DEFAULT NULL::text,
193194
p_max ANYELEMENT DEFAULT NULL::text,
194195
p_limit INT DEFAULT NULL,

src/pathman_workers.c

Lines changed: 123 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "pathman_workers.h"
1919
#include "relation_info.h"
2020
#include "utils.h"
21+
#include "xact_handling.h"
2122

2223
#include "access/htup_details.h"
2324
#include "access/xact.h"
@@ -31,6 +32,7 @@
3132
#include "storage/latch.h"
3233
#include "utils/builtins.h"
3334
#include "utils/datum.h"
35+
#include "utils/memutils.h"
3436
#include "utils/lsyscache.h"
3537
#include "utils/typcache.h"
3638
#include "utils/resowner.h"
@@ -351,6 +353,17 @@ bgw_main_spawn_partitions(Datum main_arg)
351353
DebugPrintDatum(value, args->value_type), MyProcPid);
352354
#endif
353355

356+
/* Check again if there's a conflicting lock */
357+
if (xact_conflicting_lock_exists(args->partitioned_table))
358+
{
359+
elog(LOG, "%s: there's a conflicting lock on relation \"%s\"",
360+
spawn_partitions_bgw,
361+
get_rel_name_or_relid(args->partitioned_table));
362+
363+
dsm_detach(segment);
364+
return; /* exit quickly */
365+
}
366+
354367
/* Create partitions and save the Oid of the last one */
355368
args->result = create_partitions_internal(args->partitioned_table,
356369
value, /* unpacked Datum */
@@ -378,45 +391,51 @@ bgw_main_spawn_partitions(Datum main_arg)
378391
static void
379392
bgw_main_concurrent_part(Datum main_arg)
380393
{
381-
ConcurrentPartSlot *args;
382-
Oid types[2] = { OIDOID, INT4OID };
383-
Datum vals[2];
384-
bool nulls[2] = { false, false };
385394
int rows;
386-
int slot_idx = DatumGetInt32(main_arg);
387-
MemoryContext worker_context = CurrentMemoryContext;
388-
int failures_count = 0;
389395
bool failed;
396+
int failures_count = 0;
390397
char *sql = NULL;
391-
392-
/* Create resource owner */
393-
CurrentResourceOwner = ResourceOwnerCreate(NULL, "PartitionDataWorker");
394-
395-
args = &concurrent_part_slots[slot_idx];
396-
args->pid = MyProcPid;
397-
vals[0] = args->relid;
398-
vals[1] = 10000;
398+
ConcurrentPartSlot *part_slot;
399399

400400
/* Establish signal handlers before unblocking signals. */
401401
pqsignal(SIGTERM, handle_sigterm);
402402

403403
/* We're now ready to receive signals */
404404
BackgroundWorkerUnblockSignals();
405405

406+
/* Create resource owner */
407+
CurrentResourceOwner = ResourceOwnerCreate(NULL, concurrent_part_bgw);
408+
409+
/* Update concurrent part slot */
410+
part_slot = &concurrent_part_slots[DatumGetInt32(main_arg)];
411+
part_slot->pid = MyProcPid;
412+
406413
/* Establish connection and start transaction */
407-
BackgroundWorkerInitializeConnectionByOid(args->dbid, InvalidOid);
414+
BackgroundWorkerInitializeConnectionByOid(part_slot->dbid, part_slot->userid);
408415

416+
/* Initialize pg_pathman's local config */
417+
StartTransactionCommand();
418+
bg_worker_load_config(concurrent_part_bgw);
419+
CommitTransactionCommand();
420+
421+
/* Do the job */
409422
do
410423
{
424+
Oid types[2] = { OIDOID, INT4OID };
425+
Datum vals[2] = { part_slot->relid, part_slot->batch_size };
426+
bool nulls[2] = { false, false };
427+
428+
/* Reset loop variables */
411429
failed = false;
412430
rows = 0;
431+
432+
/* Start new transaction (syscache access etc.) */
413433
StartTransactionCommand();
414-
bg_worker_load_config("PartitionDataWorker");
415434

416435
SPI_connect();
417436
PushActiveSnapshot(GetTransactionSnapshot());
418437

419-
/* Do some preparation within the first iteration */
438+
/* Prepare the query if needed */
420439
if (sql == NULL)
421440
{
422441
MemoryContext oldcontext;
@@ -425,78 +444,104 @@ bgw_main_concurrent_part(Datum main_arg)
425444
* Allocate as SQL query in top memory context because current
426445
* context will be destroyed after transaction finishes
427446
*/
428-
oldcontext = MemoryContextSwitchTo(worker_context);
447+
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
429448
sql = psprintf("SELECT %s._partition_data_concurrent($1::oid, p_limit:=$2)",
430-
get_namespace_name(get_pathman_schema()));
449+
get_namespace_name(get_pathman_schema()));
431450
MemoryContextSwitchTo(oldcontext);
432451
}
433452

453+
/* Exec ret = _partition_data_concurrent() */
434454
PG_TRY();
435455
{
436456
int ret;
437457
bool isnull;
438458

439459
ret = SPI_execute_with_args(sql, 2, types, vals, nulls, false, 0);
440-
if (ret > 0)
460+
if (ret == SPI_OK_SELECT)
441461
{
442462
TupleDesc tupdesc = SPI_tuptable->tupdesc;
443463
HeapTuple tuple = SPI_tuptable->vals[0];
444464

445-
Assert(SPI_processed == 1);
465+
Assert(SPI_processed == 1); /* there should be 1 result at most */
446466

447467
rows = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 1, &isnull));
468+
469+
Assert(!isnull); /* ... and ofc it must not be NULL */
448470
}
449471
}
450472
PG_CATCH();
451473
{
452474
ErrorData *error;
475+
453476
EmitErrorReport();
477+
454478
error = CopyErrorData();
455-
elog(LOG, "Worker error: %s", error->message);
479+
elog(LOG, "%s: %s", concurrent_part_bgw, error->message);
456480
FlushErrorState();
481+
FreeErrorData(error);
457482

458483
/*
459484
* The most common exception we can catch here is a deadlock with
460485
* concurrent user queries. Check that attempts count doesn't exceed
461486
* some reasonable value
462487
*/
463-
if (100 <= failures_count++)
488+
if (failures_count++ > PART_WORKER_MAX_ATTEMPTS)
464489
{
465-
pfree(sql);
466-
args->worker_status = WS_FREE;
490+
/* Mark slot as FREE */
491+
part_slot->worker_status = WS_FREE;
492+
467493
elog(LOG,
468-
"The concurrent partitioning worker exiting because the "
469-
"maximum attempts count exceeded. See the error message below");
470-
exit(1);
494+
"Concurrent partitioning worker has canceled the task because "
495+
"maximum amount of attempts (%d) had been exceeded. "
496+
"See the error message below",
497+
PART_WORKER_MAX_ATTEMPTS);
498+
499+
return; /* exit quickly */
471500
}
501+
502+
/* Set 'failed' flag */
472503
failed = true;
473504
}
474505
PG_END_TRY();
475506

476507
SPI_finish();
477508
PopActiveSnapshot();
509+
478510
if (failed)
479511
{
480-
/* abort transaction and sleep for a second */
512+
#ifdef USE_ASSERT_CHECKING
513+
elog(DEBUG2, "%s: could not relocate batch, total: %lu [%u]",
514+
concurrent_part_bgw, part_slot->total_rows, MyProcPid);
515+
#endif
516+
517+
/* Abort transaction and sleep for a second */
481518
AbortCurrentTransaction();
482-
DirectFunctionCall1(pg_sleep, Float8GetDatum(1));
519+
DirectFunctionCall1(pg_sleep, Float8GetDatum(part_slot->sleep_time));
483520
}
484521
else
485522
{
486-
/* Reset failures counter and commit transaction */
523+
/* Commit transaction and reset 'failures_count' */
487524
CommitTransactionCommand();
488525
failures_count = 0;
489-
args->total_rows += rows;
526+
527+
/* Add rows to total_rows */
528+
part_slot->total_rows += rows;
529+
530+
#ifdef USE_ASSERT_CHECKING
531+
elog(DEBUG2, "%s: relocated %d rows, total: %lu [%u]",
532+
concurrent_part_bgw, rows, part_slot->total_rows, MyProcPid);
533+
#endif
490534
}
491535

492-
/* If other backend requested to stop worker then quit */
493-
if (args->worker_status == WS_STOPPING)
536+
/* If other backend requested to stop us, quit */
537+
if (part_slot->worker_status == WS_STOPPING)
494538
break;
495539
}
496-
while(rows > 0 || failed); /* do while there is still rows to relocate */
540+
while(rows > 0 || failed); /* do while there are still rows to be relocated */
497541

542+
/* Reclaim the resources */
498543
pfree(sql);
499-
args->worker_status = WS_FREE;
544+
part_slot->worker_status = WS_FREE;
500545
}
501546

502547

@@ -513,6 +558,8 @@ bgw_main_concurrent_part(Datum main_arg)
513558
Datum
514559
partition_table_concurrently(PG_FUNCTION_ARGS)
515560
{
561+
#define tostr(str) ( #str )
562+
516563
Oid relid = PG_GETARG_OID(0);
517564
ConcurrentPartSlot *my_slot = NULL;
518565
int empty_slot_idx = -1;
@@ -550,7 +597,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
550597
elog(ERROR, "No empty worker slots found");
551598

552599
/* Initialize concurrent part slot */
553-
InitConcurrentPartSlot(my_slot, WS_WORKING, MyDatabaseId, relid);
600+
InitConcurrentPartSlot(my_slot, GetAuthenticatedUserId(),
601+
WS_WORKING, MyDatabaseId, relid,
602+
1000, 1.0);
554603

555604
/* Start worker (we should not wait) */
556605
start_bg_worker(concurrent_part_bgw,
@@ -560,8 +609,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
560609

561610
/* Tell user everything's fine */
562611
elog(NOTICE,
563-
"Worker started. You can stop it with the following command: "
564-
"select stop_concurrent_part_task('%s');",
612+
"Worker started. You can stop it "
613+
"with the following command: select %s('%s');",
614+
tostr(stop_concurrent_part_task), /* convert function's name to literal */
565615
get_rel_name(relid));
566616

567617
PG_RETURN_VOID();
@@ -594,12 +644,20 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
594644
userctx->cur_idx = 0;
595645

596646
/* Create tuple descriptor */
597-
tupdesc = CreateTemplateTupleDesc(5, false);
598-
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pid", INT4OID, -1, 0);
599-
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "dbid", OIDOID, -1, 0);
600-
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "relid", REGCLASSOID, -1, 0);
601-
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "processed", INT4OID, -1, 0);
602-
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status", TEXTOID, -1, 0);
647+
tupdesc = CreateTemplateTupleDesc(Natts_pathman_cp_tasks, false);
648+
649+
TupleDescInitEntry(tupdesc, Anum_pathman_cp_tasks_userid,
650+
"userid", REGROLEOID, -1, 0);
651+
TupleDescInitEntry(tupdesc, Anum_pathman_cp_tasks_pid,
652+
"pid", INT4OID, -1, 0);
653+
TupleDescInitEntry(tupdesc, Anum_pathman_cp_tasks_dbid,
654+
"dbid", OIDOID, -1, 0);
655+
TupleDescInitEntry(tupdesc, Anum_pathman_cp_tasks_relid,
656+
"relid", REGCLASSOID, -1, 0);
657+
TupleDescInitEntry(tupdesc, Anum_pathman_cp_tasks_processed,
658+
"processed", INT4OID, -1, 0);
659+
TupleDescInitEntry(tupdesc, Anum_pathman_cp_tasks_status,
660+
"status", TEXTOID, -1, 0);
603661

604662
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
605663
funcctx->user_fctx = (void *) userctx;
@@ -610,35 +668,39 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
610668
funcctx = SRF_PERCALL_SETUP();
611669
userctx = (active_workers_cxt *) funcctx->user_fctx;
612670

613-
/*
614-
* Iterate through worker slots
615-
*/
671+
/* Iterate through worker slots */
616672
for (i = userctx->cur_idx; i < PART_WORKER_SLOTS; i++)
617673
{
618-
if (concurrent_part_slots[i].worker_status != WS_FREE)
674+
ConcurrentPartSlot *cur_slot = &concurrent_part_slots[i];
675+
676+
if (cur_slot->worker_status != WS_FREE)
619677
{
620678
HeapTuple tuple;
621-
Datum values[5];
622-
bool isnull[5] = { false, false, false, false, false };
679+
Datum values[Natts_pathman_cp_tasks];
680+
bool isnull[Natts_pathman_cp_tasks] = { 0, 0, 0, 0, 0, 0 };
623681

624-
values[0] = concurrent_part_slots[i].pid;
625-
values[1] = concurrent_part_slots[i].dbid;
626-
values[2] = concurrent_part_slots[i].relid;
627-
values[3] = concurrent_part_slots[i].total_rows;
682+
values[Anum_pathman_cp_tasks_userid - 1] = cur_slot->userid;
683+
values[Anum_pathman_cp_tasks_pid - 1] = cur_slot->pid;
684+
values[Anum_pathman_cp_tasks_dbid - 1] = cur_slot->dbid;
685+
values[Anum_pathman_cp_tasks_relid - 1] = cur_slot->relid;
686+
values[Anum_pathman_cp_tasks_processed - 1] = cur_slot->total_rows;
628687

629688
/* Now build a status string */
630-
switch(concurrent_part_slots[i].worker_status)
689+
switch(cur_slot->worker_status)
631690
{
632691
case WS_WORKING:
633-
values[4] = PointerGetDatum(pstrdup("working"));
692+
values[Anum_pathman_cp_tasks_status - 1] =
693+
PointerGetDatum(cstring_to_text("working"));
634694
break;
635695

636696
case WS_STOPPING:
637-
values[4] = PointerGetDatum(pstrdup("stopping"));
697+
values[Anum_pathman_cp_tasks_status - 1] =
698+
PointerGetDatum(cstring_to_text("stopping"));
638699
break;
639700

640701
default:
641-
values[4] = PointerGetDatum(pstrdup("[unknown]"));
702+
values[Anum_pathman_cp_tasks_status - 1] =
703+
PointerGetDatum(cstring_to_text("[unknown]"));
642704
}
643705

644706
/* Form output tuple */
@@ -670,7 +732,7 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
670732
concurrent_part_slots[i].dbid == MyDatabaseId)
671733
{
672734
concurrent_part_slots[i].worker_status = WS_STOPPING;
673-
elog(NOTICE, "Worker will stop after current batch's finished");
735+
elog(NOTICE, "Worker will stop after it finishes current batch");
674736

675737
PG_RETURN_BOOL(true);
676738
}

0 commit comments

Comments (0)