@@ -35,14 +35,20 @@ const char *PROGRAM_VERSION = "unknown";
35
35
*/
36
36
#define APPLY_COUNT 1000
37
37
38
- /* The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
38
+ /* Record the virtual transaction IDs of any possibly-conflicting transactions. Ignore the PID
39
+ * of our primary connection, and our second connection holding an
40
+ * ACCESS SHARE table lock.
41
+ * The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
39
42
* servers. See GH ticket #1.
40
43
*/
41
44
#define SQL_XID_SNAPSHOT \
42
45
"SELECT repack.array_accum(virtualtransaction) FROM pg_locks"\
43
- " WHERE locktype = 'virtualxid' AND pid <> pg_backend_pid()"\
46
+ " WHERE locktype = 'virtualxid' AND pid NOT IN ( pg_backend_pid(), $1 )"\
44
47
" AND (virtualxid, virtualtransaction) <> ('1/1', '-1/0')"
45
48
49
+ /* Later, check whether any of the transactions we saw before are still
50
+ * alive, and wait for them to go away.
51
+ */
46
52
#define SQL_XID_ALIVE \
47
53
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
48
54
" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
@@ -89,7 +95,7 @@ static void repack_cleanup(bool fatal, void *userdata);
89
95
90
96
static char * getstr (PGresult * res , int row , int col );
91
97
static Oid getoid (PGresult * res , int row , int col );
92
- static void lock_exclusive (const char * relid , const char * lock_query );
98
+ static void lock_exclusive (const char * relid , const char * lock_query , bool release_conn2 );
93
99
94
100
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
95
101
#define SQLSTATE_QUERY_CANCELED "57014"
@@ -384,6 +390,7 @@ repack_one_table(const repack_table *table, const char *orderby)
384
390
int i ;
385
391
int num_waiting = 0 ;
386
392
char * vxid ;
393
+ char * lock_conn_pid ;
387
394
char buffer [12 ];
388
395
StringInfoData sql ;
389
396
@@ -414,7 +421,8 @@ repack_one_table(const repack_table *table, const char *orderby)
414
421
* 1. Setup workspaces and a trigger.
415
422
*/
416
423
elog (DEBUG2 , "---- setup ----" );
417
- lock_exclusive (utoa (table -> target_oid , buffer ), table -> lock_table );
424
+ lock_exclusive (utoa (table -> target_oid , buffer ), table -> lock_table , FALSE);
425
+
418
426
419
427
/*
420
428
* Check z_repack_trigger is the trigger executed at last so that
@@ -438,6 +446,37 @@ repack_one_table(const repack_table *table, const char *orderby)
438
446
command (sql .data , 0 , NULL );
439
447
command ("COMMIT" , 0 , NULL );
440
448
449
+ PQclear (PQexec (conn2 , "BEGIN ISOLATION LEVEL READ COMMITTED" ));
450
+ elog (DEBUG2 , "Obtaining ACCESS SHARE lock for %s" , table -> target_name );
451
+
452
+ /* XXX: target_name is interpolated into the SQL text unescaped -- does it need quoting? */
453
+ printfStringInfo (& sql , "LOCK TABLE %s IN ACCESS SHARE MODE" ,
454
+ table -> target_name );
455
+ res = PQexec (conn2 , sql .data );
456
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
457
+ {
458
+ printf ("%s" , PQerrorMessage (conn2 ));
459
+ PQclear (res );
460
+ exit (1 );
461
+ }
462
+ PQclear (res );
463
+
464
+ elog (DEBUG2 , "Obtained ACCESS SHARE lock of %s" , table -> target_name );
465
+
466
+ /* Store the backend PID of conn2, our connection keeping an ACCESS SHARE
467
+ lock on the target table.
468
+ */
469
+ res = PQexec (conn2 , "SELECT pg_backend_pid()" );
470
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
471
+ {
472
+ printf ("%s" , PQerrorMessage (conn2 ));
473
+ PQclear (res );
474
+ exit (1 );
475
+ }
476
+ lock_conn_pid = strdup (PQgetvalue (res , 0 , 0 ));
477
+ elog (WARNING , "Have backend PID: %s" , lock_conn_pid );
478
+ PQclear (res );
479
+
441
480
/*
442
481
* Register the table to be dropped on error. We use pktype as
443
482
* an advisory lock. The registration should be done after
@@ -455,9 +494,14 @@ repack_one_table(const repack_table *table, const char *orderby)
455
494
command ("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)" , 0 , NULL );
456
495
if (orderby && !orderby [0 ])
457
496
command ("SET LOCAL synchronize_seqscans = off" , 0 , NULL );
458
- res = execute (SQL_XID_SNAPSHOT , 0 , NULL );
497
+
498
+ /* Fetch an array of the virtual transaction IDs of all transactions active right now, other than those of our own two connections.
499
+ */
500
+ params [0 ] = lock_conn_pid ;
501
+ res = execute (SQL_XID_SNAPSHOT , 1 , params );
459
502
vxid = strdup (PQgetvalue (res , 0 , 0 ));
460
503
PQclear (res );
504
+
461
505
command (table -> delete_log , 0 , NULL );
462
506
command (table -> create_table , 0 , NULL );
463
507
printfStringInfo (& sql , "SELECT repack.disable_autovacuum('repack.table_%u')" , table -> target_oid );
@@ -554,7 +598,7 @@ repack_one_table(const repack_table *table, const char *orderby)
554
598
* 5. Swap.
555
599
*/
556
600
elog (DEBUG2 , "---- swap ----" );
557
- lock_exclusive (utoa (table -> target_oid , buffer ), table -> lock_table );
601
+ lock_exclusive (utoa (table -> target_oid , buffer ), table -> lock_table , TRUE );
558
602
apply_log (table , 0 );
559
603
params [0 ] = utoa (table -> target_oid , buffer );
560
604
command ("SELECT repack.repack_swap($1)" , 1 , params );
@@ -593,9 +637,15 @@ repack_one_table(const repack_table *table, const char *orderby)
593
637
594
638
/*
595
639
* Try acquire a table lock but avoid long time locks when conflict.
640
+ * Arguments:
641
+ *
642
+ * relid: OID of relation
643
+ * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
644
+ * release_conn2: whether we should issue a COMMIT in conn2 to release
645
+ * its lock.
596
646
*/
597
647
static void
598
- lock_exclusive (const char * relid , const char * lock_query )
648
+ lock_exclusive (const char * relid , const char * lock_query , bool release_conn2 )
599
649
{
600
650
time_t start = time (NULL );
601
651
int i ;
@@ -634,6 +684,12 @@ lock_exclusive(const char *relid, const char *lock_query)
634
684
command (cancel_query , 1 , & relid );
635
685
}
636
686
687
+ /* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE
688
+ * lock.
689
+ */
690
+ if (release_conn2 )
691
+ PQclear (PQexec (conn2 , "COMMIT" ));
692
+
637
693
/* wait for a while to lock the table. */
638
694
wait_msec = Min (1000 , i * 100 );
639
695
snprintf (sql , lengthof (sql ), "SET LOCAL statement_timeout = %d" , wait_msec );
@@ -683,6 +739,9 @@ repack_cleanup(bool fatal, void *userdata)
683
739
const char * params [1 ];
684
740
685
741
/* Rollback current transaction */
742
+ if (conn2 )
743
+ PQclear (PQexec (conn2 , "ROLLBACK" ));
744
+
686
745
if (connection )
687
746
command ("ROLLBACK" , 0 , NULL );
688
747
0 commit comments