Skip to content

Commit cf25780

Browse files
schmiddy authored and dvarrazzo committed
Further improvements to concurrent-DDL guard.
Fix table locking so that race conditions don't exist between lock release in primary conn, and lock acquisition in conn2. Also, have conn2 be in charge of performing the table swap step, to avoid a similar race. Part of work for Issue #8.
1 parent 3606e0a commit cf25780

File tree

1 file changed

+119
-40
lines changed

1 file changed

+119
-40
lines changed

bin/pg_repack.c

Lines changed: 119 additions & 40 deletions
Original file line number · Diff line number · Diff line change
@@ -53,6 +53,24 @@ const char *PROGRAM_VERSION = "unknown";
5353
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
5454
" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
5555

56+
/* To be run while our main connection holds an AccessExclusive lock on the
57+
* target table, and our secondary conn is attempting to grab an AccessShare
58+
* lock. We know that "granted" must be false for these queries because
59+
* we already hold the AccessExclusive lock. Also, we only care about other
60+
* transactions trying to grab an ACCESS EXCLUSIVE lock, because that lock
61+
* level is needed for any of the disallowed DDL commands, e.g. ALTER TABLE
62+
* or TRUNCATE.
63+
*/
64+
#define CANCEL_COMPETING_LOCKS \
65+
"SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
66+
" AND granted = false AND relation = %u"\
67+
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
68+
69+
/* Escalation of CANCEL_COMPETING_LOCKS: terminate the competing backends
 * outright (pg_terminate_backend) instead of merely canceling their current
 * queries (pg_cancel_backend).  The caller issues this only after a cancel
 * pass, and only when PQserverVersion() >= 80400, since
 * pg_terminate_backend() was introduced in PostgreSQL 8.4.
 * Note: %u is filled in with the target relation's OID via printfStringInfo().
 */
#define KILL_COMPETING_LOCKS \
"SELECT pg_terminate_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
" AND granted = false AND relation = %u"\
" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
73+
5674
/*
5775
* per-table information
5876
*/
@@ -95,7 +113,7 @@ static void repack_cleanup(bool fatal, void *userdata);
95113

96114
static char *getstr(PGresult *res, int row, int col);
97115
static Oid getoid(PGresult *res, int row, int col);
98-
static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2);
116+
static void lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
99117

100118
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
101119
#define SQLSTATE_QUERY_CANCELED "57014"
@@ -356,7 +374,7 @@ repack_one_database(const char *orderby, const char *table)
356374
}
357375

358376
static int
359-
apply_log(const repack_table *table, int count)
377+
apply_log(PGconn *conn, const repack_table *table, int count)
360378
{
361379
int result;
362380
PGresult *res;
@@ -370,8 +388,9 @@ apply_log(const repack_table *table, int count)
370388
params[4] = table->sql_pop;
371389
params[5] = utoa(count, buffer);
372390

373-
res = execute("SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
374-
6, params);
391+
res = pgut_execute(conn,
392+
"SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
393+
6, params);
375394
result = atoi(PQgetvalue(res, 0, 0));
376395
PQclear(res);
377396

@@ -421,8 +440,7 @@ repack_one_table(const repack_table *table, const char *orderby)
421440
* 1. Setup workspaces and a trigger.
422441
*/
423442
elog(DEBUG2, "---- setup ----");
424-
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE);
425-
443+
lock_exclusive(connection, utoa(table->target_oid, buffer), table->lock_table, TRUE);
426444

427445
/*
428446
* Check z_repack_trigger is the trigger executed at last so that
@@ -444,39 +462,101 @@ repack_one_table(const repack_table *table, const char *orderby)
444462
command(table->enable_trigger, 0, NULL);
445463
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
446464
command(sql.data, 0, NULL);
447-
command("COMMIT", 0, NULL);
448465

466+
/* While we are still holding an AccessExclusive lock on the table, submit
467+
* the request for an AccessShare lock asynchronously from conn2.
468+
* We want to submit this query in conn2 while connection's
469+
* transaction still holds its lock, so that no DDL may sneak in
470+
* between the time that connection commits and conn2 gets its lock.
471+
*
472+
*/
449473
pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
450-
elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name);
451474

452-
/* XXX: table name escaping? */
453-
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
454-
table->target_name);
455-
res = pgut_execute(conn2, sql.data, 0, NULL);
456-
if (PQresultStatus(res) != PGRES_COMMAND_OK)
475+
/* grab the backend PID of conn2; we'll need this when querying
476+
* pg_locks momentarily.
477+
*/
478+
res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
479+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
457480
{
458481
printf("%s", PQerrorMessage(conn2));
459482
PQclear(res);
460483
exit(1);
461484
}
485+
lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
462486
PQclear(res);
463487

464-
elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name);
488+
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
489+
table->target_name);
490+
elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
491+
if (!(PQsendQuery(conn2, sql.data))) {
492+
printf("Error sending async query: %s\n%s", sql.data, PQerrorMessage(conn2));
493+
exit(1);
494+
}
465495

466-
/* store the backend PID of our connection keeping an ACCESS SHARE
467-
lock on the target table.
496+
/* Now that we've submitted the LOCK TABLE request through conn2,
497+
* look for and cancel any (potentially dangerous) DDL commands which
498+
* might also be waiting on our table lock at this point --
499+
* it's not safe to let them wait, because they may grab their
500+
* AccessExclusive lock before conn2 gets its AccessShare lock,
501+
* and perform unsafe DDL on the table.
502+
*
503+
* XXX: maybe we should use a loop canceling queries, as in
504+
* lock_exclusive().
468505
*/
469-
res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
506+
printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, table->target_oid);
507+
res = execute(sql.data, 0, NULL);
470508
if (PQresultStatus(res) != PGRES_TUPLES_OK)
471509
{
472-
printf("%s", PQerrorMessage(conn2));
510+
printf("Error canceling competing queries: %s", PQerrorMessage(connection));
473511
PQclear(res);
474512
exit(1);
475513
}
476-
lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
477-
elog(WARNING, "Have backend PID: %s", lock_conn_pid);
514+
if (PQntuples(res) > 0)
515+
{
516+
elog(WARNING, "Canceled %d unsafe queries. Terminating any remaining PIDs.", PQntuples(res));
517+
518+
if (PQserverVersion(connection) >= 80400)
519+
{
520+
PQclear(res);
521+
printfStringInfo(&sql, KILL_COMPETING_LOCKS, table->target_oid);
522+
res = execute(sql.data, 0, NULL);
523+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
524+
{
525+
printf("Error killing competing queries: %s", PQerrorMessage(connection));
526+
PQclear(res);
527+
exit(1);
528+
}
529+
}
530+
531+
}
532+
else
533+
{
534+
elog(DEBUG2, "No competing DDL to cancel.");
535+
}
478536
PQclear(res);
479537

538+
539+
/* We're finished killing off any unsafe DDL. COMMIT in our main
540+
* connection, so that conn2 may get its AccessShare lock.
541+
*/
542+
command("COMMIT", 0, NULL);
543+
544+
/* Keep looping PQgetResult() calls until it returns NULL, indicating the
545+
* command is done and we have obtained our lock.
546+
*/
547+
while ((res = PQgetResult(conn2)))
548+
{
549+
elog(DEBUG2, "Waiting on ACCESS SHARE lock...");
550+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
551+
{
552+
printf("Error with LOCK TABLE: %s", PQerrorMessage(conn2));
553+
PQclear(res);
554+
exit(1);
555+
}
556+
PQclear(res);
557+
}
558+
559+
480560
/*
481561
* Register the table to be dropped on error. We use pktype as
482562
* an advisory lock. The registration should be done after
@@ -558,7 +638,7 @@ repack_one_table(const repack_table *table, const char *orderby)
558638
*/
559639
for (;;)
560640
{
561-
num = apply_log(table, APPLY_COUNT);
641+
num = apply_log(connection, table, APPLY_COUNT);
562642
if (num > 0)
563643
continue; /* there might be still some tuples, repeat. */
564644

@@ -595,14 +675,17 @@ repack_one_table(const repack_table *table, const char *orderby)
595675
}
596676

597677
/*
598-
* 5. Swap.
678+
* 5. Swap: will be done with conn2, since it already holds an
679+
* AccessShare lock.
599680
*/
600681
elog(DEBUG2, "---- swap ----");
601-
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE);
602-
apply_log(table, 0);
682+
/* Bump our existing AccessShare lock to AccessExclusive */
683+
lock_exclusive(conn2, utoa(table->target_oid, buffer), table->lock_table,
684+
FALSE);
685+
apply_log(conn2, table, 0);
603686
params[0] = utoa(table->target_oid, buffer);
604-
command("SELECT repack.repack_swap($1)", 1, params);
605-
command("COMMIT", 0, NULL);
687+
pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params);
688+
pgut_command(conn2, "COMMIT", 0, NULL);
606689

607690
/*
608691
* 6. Drop.
@@ -616,6 +699,7 @@ repack_one_table(const repack_table *table, const char *orderby)
616699

617700
pgut_atexit_pop(&repack_cleanup, (void *) table);
618701
free(vxid);
702+
free(lock_conn_pid);
619703

620704
/*
621705
* 7. Analyze.
@@ -639,31 +723,32 @@ repack_one_table(const repack_table *table, const char *orderby)
639723
* Try acquire a table lock but avoid long time locks when conflict.
640724
* Arguments:
641725
*
726+
* conn: connection to use
642727
* relid: OID of relation
643728
* lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
644-
* release_conn2: whether we should issue a COMMIT in conn2 to release
645-
* its lock.
729+
* start_xact: whether we need to issue a BEGIN;
646730
*/
647731
static void
648-
lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
732+
lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact)
649733
{
650734
time_t start = time(NULL);
651735
int i;
652-
736+
653737
for (i = 1; ; i++)
654738
{
655739
time_t duration;
656740
char sql[1024];
657741
PGresult *res;
658742
int wait_msec;
659743

660-
command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
744+
if (start_xact)
745+
pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
661746

662747
duration = time(NULL) - start;
663748
if (duration > wait_timeout)
664749
{
665750
const char *cancel_query;
666-
if (PQserverVersion(connection) >= 80400 &&
751+
if (PQserverVersion(conn) >= 80400 &&
667752
duration > wait_timeout * 2)
668753
{
669754
elog(WARNING, "terminating conflicted backends");
@@ -681,21 +766,15 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
681766
" AND relation = $1 AND pid <> pg_backend_pid()";
682767
}
683768

684-
command(cancel_query, 1, &relid);
769+
pgut_command(conn, cancel_query, 1, &relid);
685770
}
686771

687-
/* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE
688-
* lock.
689-
*/
690-
if (release_conn2)
691-
pgut_command(conn2, "COMMIT", 0, NULL);
692-
693772
/* wait for a while to lock the table. */
694773
wait_msec = Min(1000, i * 100);
695774
snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
696775
command(sql, 0, NULL);
697776

698-
res = execute_elevel(lock_query, 0, NULL, DEBUG2);
777+
res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2);
699778
if (PQresultStatus(res) == PGRES_COMMAND_OK)
700779
{
701780
PQclear(res);

0 commit comments

Comments
 (0)