Skip to content

Commit fd4405d

Browse files
committed
make sure PartitionFilter is enabled while performing [concurrent] partitioning, fixes for ConcurrentPartWorker: pythonish tests pass
1 parent 0042c46 commit fd4405d

File tree

4 files changed

+44
-20
lines changed

4 files changed

+44
-20
lines changed

init.sql

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ BEGIN
247247
RETURN;
248248
END
249249
$$
250-
LANGUAGE plpgsql;
250+
LANGUAGE plpgsql
251+
SET pg_pathman.enable_partitionfilter = on; /* ensures that PartitionFilter is ON */
251252

252253
/*
253254
* Old school way to distribute rows to partitions.
@@ -275,7 +276,8 @@ BEGIN
275276
RETURN;
276277
END
277278
$$
278-
LANGUAGE plpgsql;
279+
LANGUAGE plpgsql
280+
SET pg_pathman.enable_partitionfilter = on; /* ensures that PartitionFilter is ON */
279281

280282
/*
281283
* Disable pathman partitioning for specified relation.
@@ -541,7 +543,7 @@ BEGIN
541543
RETURN v_part_count;
542544
END
543545
$$ LANGUAGE plpgsql
544-
SET pg_pathman.enable_partitionfilter = off;
546+
SET pg_pathman.enable_partitionfilter = off; /* ensures that PartitionFilter is OFF */
545547

546548

547549

src/pathman_workers.c

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -421,9 +421,11 @@ bgw_main_concurrent_part(Datum main_arg)
421421
/* Do the job */
422422
do
423423
{
424-
Oid types[2] = { OIDOID, INT4OID };
425-
Datum vals[2] = { part_slot->relid, part_slot->batch_size };
426-
bool nulls[2] = { false, false };
424+
MemoryContext old_mcxt;
425+
426+
Oid types[2] = { OIDOID, INT4OID };
427+
Datum vals[2] = { part_slot->relid, part_slot->batch_size };
428+
bool nulls[2] = { false, false };
427429

428430
/* Reset loop variables */
429431
failed = false;
@@ -432,22 +434,25 @@ bgw_main_concurrent_part(Datum main_arg)
432434
/* Start new transaction (syscache access etc.) */
433435
StartTransactionCommand();
434436

437+
/* We'll need this to recover from errors */
438+
old_mcxt = CurrentMemoryContext;
439+
435440
SPI_connect();
436441
PushActiveSnapshot(GetTransactionSnapshot());
437442

438443
/* Prepare the query if needed */
439444
if (sql == NULL)
440445
{
441-
MemoryContext oldcontext;
446+
MemoryContext current_mcxt;
442447

443448
/*
444449
* Allocate the SQL query in the top memory context because the current
445450
* context will be destroyed after transaction finishes
446451
*/
447-
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
452+
current_mcxt = MemoryContextSwitchTo(TopMemoryContext);
448453
sql = psprintf("SELECT %s._partition_data_concurrent($1::oid, p_limit:=$2)",
449454
get_namespace_name(get_pathman_schema()));
450-
MemoryContextSwitchTo(oldcontext);
455+
MemoryContextSwitchTo(current_mcxt);
451456
}
452457

453458
/* Exec ret = _partition_data_concurrent() */
@@ -471,21 +476,33 @@ bgw_main_concurrent_part(Datum main_arg)
471476
}
472477
PG_CATCH();
473478
{
474-
ErrorData *error;
475-
476-
EmitErrorReport();
479+
ErrorData *error;
480+
char *sleep_time_str;
477481

482+
/* Switch to the original context & copy edata */
483+
MemoryContextSwitchTo(old_mcxt);
478484
error = CopyErrorData();
479-
elog(LOG, "%s: %s", concurrent_part_bgw, error->message);
480485
FlushErrorState();
486+
487+
/* Print message for this BGWorker to server log */
488+
sleep_time_str = datum_to_cstring(Float8GetDatum(part_slot->sleep_time),
489+
FLOAT8OID);
490+
ereport(LOG,
491+
(errmsg("%s: %s", concurrent_part_bgw, error->message),
492+
errdetail("Attempt: %d/%d, sleep time: %s",
493+
failures_count + 1,
494+
PART_WORKER_MAX_ATTEMPTS,
495+
sleep_time_str)));
496+
pfree(sleep_time_str); /* free the time string */
497+
481498
FreeErrorData(error);
482499

483500
/*
484501
* The most common exception we can catch here is a deadlock with
485502
* concurrent user queries. Check that attempts count doesn't exceed
486503
* some reasonable value
487504
*/
488-
if (failures_count++ > PART_WORKER_MAX_ATTEMPTS)
505+
if (failures_count++ >= PART_WORKER_MAX_ATTEMPTS)
489506
{
490507
/* Mark slot as FREE */
491508
part_slot->worker_status = WS_FREE;
@@ -510,8 +527,11 @@ bgw_main_concurrent_part(Datum main_arg)
510527
if (failed)
511528
{
512529
#ifdef USE_ASSERT_CHECKING
513-
elog(DEBUG2, "%s: could not relocate batch, total: %lu [%u]",
514-
concurrent_part_bgw, part_slot->total_rows, MyProcPid);
530+
elog(DEBUG1, "%s: could not relocate batch (%d/%d), total: %lu [%u]",
531+
concurrent_part_bgw,
532+
failures_count, PART_WORKER_MAX_ATTEMPTS, /* current/max */
533+
part_slot->total_rows,
534+
MyProcPid);
515535
#endif
516536

517537
/* Abort transaction and sleep for a second */
@@ -528,7 +548,7 @@ bgw_main_concurrent_part(Datum main_arg)
528548
part_slot->total_rows += rows;
529549

530550
#ifdef USE_ASSERT_CHECKING
531-
elog(DEBUG2, "%s: relocated %d rows, total: %lu [%u]",
551+
elog(DEBUG1, "%s: relocated %d rows, total: %lu [%u]",
532552
concurrent_part_bgw, rows, part_slot->total_rows, MyProcPid);
533553
#endif
534554
}

src/pathman_workers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ typedef struct
8282
#define PART_WORKER_SLOTS 10
8383

8484
/* Max number of attempts per batch */
85-
#define PART_WORKER_MAX_ATTEMPTS 100
85+
#define PART_WORKER_MAX_ATTEMPTS 60
8686

8787

8888
/*

tests/partitioning_test.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ def test_concurrent(self):
5353
while True:
5454
# update some rows to check for deadlocks
5555
node.safe_psql('postgres',
56-
'''update abc set t = 'test'
57-
where id in (select (random() * 300000)::int from generate_series(1, 3000))''')
56+
'''
57+
update abc set t = 'test'
58+
where id in (select (random() * 300000)::int from generate_series(1, 3000))
59+
''')
5860

5961
count = node.execute('postgres', 'select count(*) from pathman_concurrent_part_tasks')
6062

0 commit comments

Comments
 (0)