Skip to content

Commit 78bae38

Browse files
schmiddy authored and dvarrazzo committed
Take an ACCESS SHARE LOCK on the target table, in an initial attempt to prevent concurrent DDL.
This is a first pass at Daniele's suggestion in Issue #8, although it is definitely still buggy -- it is still possible for another transaction to acquire an ACCESS EXCLUSIVE lock and perform DDL either before the ACCESS SHARE lock is acquired or immediately after it is released.
1 parent e028116 commit 78bae38

File tree

3 files changed

+69
-7
lines changed

3 files changed

+69
-7
lines changed

bin/pg_repack.c

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,20 @@ const char *PROGRAM_VERSION = "unknown";
3535
*/
3636
#define APPLY_COUNT 1000
3737

38-
/* The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
38+
/* Record the PIDs of any possibly-conflicting transactions. Ignore the PID
39+
* of our primary connection, and our second connection holding an
40+
* ACCESS SHARE table lock.
41+
* The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
3942
* servers. See GH ticket #1.
4043
*/
4144
#define SQL_XID_SNAPSHOT \
4245
"SELECT repack.array_accum(virtualtransaction) FROM pg_locks"\
43-
" WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()"\
46+
" WHERE locktype = 'virtualxid' AND pid NOT IN (pg_backend_pid(), $1)"\
4447
" AND (virtualxid, virtualtransaction) <> ('1/1', '-1/0')"
4548

49+
/* Later, check whether any of the transactions we saw before are still
50+
* alive, and wait for them to go away.
51+
*/
4652
#define SQL_XID_ALIVE \
4753
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
4854
" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
@@ -89,7 +95,7 @@ static void repack_cleanup(bool fatal, void *userdata);
8995

9096
static char *getstr(PGresult *res, int row, int col);
9197
static Oid getoid(PGresult *res, int row, int col);
92-
static void lock_exclusive(const char *relid, const char *lock_query);
98+
static void lock_exclusive(const char *relid, const char *lock_query, bool release_conn2);
9399

94100
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
95101
#define SQLSTATE_QUERY_CANCELED "57014"
@@ -384,6 +390,7 @@ repack_one_table(const repack_table *table, const char *orderby)
384390
int i;
385391
int num_waiting = 0;
386392
char *vxid;
393+
char *lock_conn_pid;
387394
char buffer[12];
388395
StringInfoData sql;
389396

@@ -414,7 +421,8 @@ repack_one_table(const repack_table *table, const char *orderby)
414421
* 1. Setup workspaces and a trigger.
415422
*/
416423
elog(DEBUG2, "---- setup ----");
417-
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table);
424+
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, FALSE);
425+
418426

419427
/*
420428
* Check z_repack_trigger is the trigger executed at last so that
@@ -438,6 +446,37 @@ repack_one_table(const repack_table *table, const char *orderby)
438446
command(sql.data, 0, NULL);
439447
command("COMMIT", 0, NULL);
440448

449+
PQclear(PQexec(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED"));
450+
elog(DEBUG2, "Obtaining ACCESS SHARE lock for %s", table->target_name);
451+
452+
/* XXX: table name escaping? */
453+
printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
454+
table->target_name);
455+
res = PQexec(conn2, sql.data);
456+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
457+
{
458+
printf("%s", PQerrorMessage(conn2));
459+
PQclear(res);
460+
exit(1);
461+
}
462+
PQclear(res);
463+
464+
elog(DEBUG2, "Obtained ACCESS SHARE lock of %s", table->target_name);
465+
466+
/* store the backend PID of our connection keeping an ACCESS SHARE
467+
lock on the target table.
468+
*/
469+
res = PQexec(conn2, "SELECT pg_backend_pid()");
470+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
471+
{
472+
printf("%s", PQerrorMessage(conn2));
473+
PQclear(res);
474+
exit(1);
475+
}
476+
lock_conn_pid = strdup(PQgetvalue(res, 0, 0));
477+
elog(WARNING, "Have backend PID: %s", lock_conn_pid);
478+
PQclear(res);
479+
441480
/*
442481
* Register the table to be dropped on error. We use pktype as
443482
* an advisory lock. The registration should be done after
@@ -455,9 +494,14 @@ repack_one_table(const repack_table *table, const char *orderby)
455494
command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL);
456495
if (orderby && !orderby[0])
457496
command("SET LOCAL synchronize_seqscans = off", 0, NULL);
458-
res = execute(SQL_XID_SNAPSHOT, 0, NULL);
497+
498+
/* Fetch an array of Virtual IDs of all transactions active right now.
499+
*/
500+
params[0] = lock_conn_pid;
501+
res = execute(SQL_XID_SNAPSHOT, 1, params);
459502
vxid = strdup(PQgetvalue(res, 0, 0));
460503
PQclear(res);
504+
461505
command(table->delete_log, 0, NULL);
462506
command(table->create_table, 0, NULL);
463507
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
@@ -554,7 +598,7 @@ repack_one_table(const repack_table *table, const char *orderby)
554598
* 5. Swap.
555599
*/
556600
elog(DEBUG2, "---- swap ----");
557-
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table);
601+
lock_exclusive(utoa(table->target_oid, buffer), table->lock_table, TRUE);
558602
apply_log(table, 0);
559603
params[0] = utoa(table->target_oid, buffer);
560604
command("SELECT repack.repack_swap($1)", 1, params);
@@ -593,9 +637,15 @@ repack_one_table(const repack_table *table, const char *orderby)
593637

594638
/*
595639
* Try to acquire a table lock, but avoid long time locks when there is a conflict.
640+
* Arguments:
641+
*
642+
* relid: OID of relation
643+
* lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
644+
* release_conn2: whether we should issue a COMMIT in conn2 to release
645+
* its lock.
596646
*/
597647
static void
598-
lock_exclusive(const char *relid, const char *lock_query)
648+
lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
599649
{
600650
time_t start = time(NULL);
601651
int i;
@@ -634,6 +684,12 @@ lock_exclusive(const char *relid, const char *lock_query)
634684
command(cancel_query, 1, &relid);
635685
}
636686

687+
/* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE
688+
* lock.
689+
*/
690+
if (release_conn2)
691+
PQclear(PQexec(conn2, "COMMIT"));
692+
637693
/* wait for a while to lock the table. */
638694
wait_msec = Min(1000, i * 100);
639695
snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
@@ -683,6 +739,9 @@ repack_cleanup(bool fatal, void *userdata)
683739
const char *params[1];
684740

685741
/* Rollback current transaction */
742+
if (conn2)
743+
PQclear(PQexec(conn2, "ROLLBACK"));
744+
686745
if (connection)
687746
command("ROLLBACK", 0, NULL);
688747

bin/pgut/pgut-fe.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ char *password = NULL;
2424
YesNo prompt_password = DEFAULT;
2525

2626
PGconn *connection = NULL;
27+
PGconn *conn2 = NULL;
2728

2829
static bool parse_pair(const char buffer[], char key[], char value[]);
2930
static char *get_username(void);
@@ -51,6 +52,7 @@ reconnect(int elevel)
5152
appendStringInfo(&buf, "password=%s ", password);
5253

5354
connection = pgut_connect(buf.data, prompt_password, elevel);
55+
conn2 = pgut_connect(buf.data, prompt_password, elevel);
5456

5557
/* update password */
5658
if (connection)

bin/pgut/pgut-fe.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ extern char *password;
5656
extern YesNo prompt_password;
5757

5858
extern PGconn *connection;
59+
extern PGconn *conn2;
5960

6061
extern void pgut_help(bool details);
6162
extern void help(bool details);

0 commit comments

Comments
 (0)