@@ -53,6 +53,24 @@ const char *PROGRAM_VERSION = "unknown";
53
53
"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
54
54
" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
55
55
56
+ /* To be run while our main connection holds an AccessExclusive lock on the
57
+ * target table, and our secondary conn is attempting to grab an AccessShare
58
+ * lock. We know that "granted" must be false for these queries because
59
+ * we already hold the AccessExclusive lock. Also, we only care about other
60
+ * transactions trying to grab an ACCESS EXCLUSIVE lock, because that lock
61
+ * level is needed for any of the disallowed DDL commands, e.g. ALTER TABLE
62
+ * or TRUNCATE.
63
+ */
64
+ #define CANCEL_COMPETING_LOCKS \
65
+ "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
66
+ " AND granted = false AND relation = %u"\
67
+ " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
68
+
69
+ #define KILL_COMPETING_LOCKS \
70
+ "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
71
+ " AND granted = false AND relation = %u"\
72
+ " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
73
+
56
74
/*
57
75
* per-table information
58
76
*/
@@ -95,7 +113,7 @@ static void repack_cleanup(bool fatal, void *userdata);
95
113
96
114
static char * getstr (PGresult * res , int row , int col );
97
115
static Oid getoid (PGresult * res , int row , int col );
98
- static void lock_exclusive (const char * relid , const char * lock_query , bool release_conn2 );
116
+ static void lock_exclusive (PGconn * conn , const char * relid , const char * lock_query , bool start_xact );
99
117
100
118
#define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
101
119
#define SQLSTATE_QUERY_CANCELED "57014"
@@ -356,7 +374,7 @@ repack_one_database(const char *orderby, const char *table)
356
374
}
357
375
358
376
static int
359
- apply_log (const repack_table * table , int count )
377
+ apply_log (PGconn * conn , const repack_table * table , int count )
360
378
{
361
379
int result ;
362
380
PGresult * res ;
@@ -370,8 +388,9 @@ apply_log(const repack_table *table, int count)
370
388
params [4 ] = table -> sql_pop ;
371
389
params [5 ] = utoa (count , buffer );
372
390
373
- res = execute ("SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)" ,
374
- 6 , params );
391
+ res = pgut_execute (conn ,
392
+ "SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)" ,
393
+ 6 , params );
375
394
result = atoi (PQgetvalue (res , 0 , 0 ));
376
395
PQclear (res );
377
396
@@ -421,8 +440,7 @@ repack_one_table(const repack_table *table, const char *orderby)
421
440
* 1. Setup workspaces and a trigger.
422
441
*/
423
442
elog (DEBUG2 , "---- setup ----" );
424
- lock_exclusive (utoa (table -> target_oid , buffer ), table -> lock_table , FALSE);
425
-
443
+ lock_exclusive (connection , utoa (table -> target_oid , buffer ), table -> lock_table , TRUE);
426
444
427
445
/*
428
446
* Check z_repack_trigger is the trigger executed at last so that
@@ -444,39 +462,101 @@ repack_one_table(const repack_table *table, const char *orderby)
444
462
command (table -> enable_trigger , 0 , NULL );
445
463
printfStringInfo (& sql , "SELECT repack.disable_autovacuum('repack.log_%u')" , table -> target_oid );
446
464
command (sql .data , 0 , NULL );
447
- command ("COMMIT" , 0 , NULL );
448
465
466
+ /* While we are still holding an AccessExclusive lock on the table, submit
467
+ * the request for an AccessShare lock asynchronously from conn2.
468
+ * We want to submit this query in conn2 while connection's
469
+ * transaction still holds its lock, so that no DDL may sneak in
470
+ * between the time that connection commits and conn2 gets its lock.
471
+ *
472
+ */
449
473
pgut_command (conn2 , "BEGIN ISOLATION LEVEL READ COMMITTED" , 0 , NULL );
450
- elog (DEBUG2 , "Obtaining ACCESS SHARE lock for %s" , table -> target_name );
451
474
452
- /* XXX: table name escaping? */
453
- printfStringInfo ( & sql , "LOCK TABLE %s IN ACCESS SHARE MODE" ,
454
- table -> target_name );
455
- res = pgut_execute (conn2 , sql . data , 0 , NULL );
456
- if (PQresultStatus (res ) != PGRES_COMMAND_OK )
475
+ /* grab the backend PID of conn2; we'll need this when querying
476
+ * pg_locks momentarily.
477
+ */
478
+ res = pgut_execute (conn2 , "SELECT pg_backend_pid()" , 0 , NULL );
479
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
457
480
{
458
481
printf ("%s" , PQerrorMessage (conn2 ));
459
482
PQclear (res );
460
483
exit (1 );
461
484
}
485
+ lock_conn_pid = strdup (PQgetvalue (res , 0 , 0 ));
462
486
PQclear (res );
463
487
464
- elog (DEBUG2 , "Obtained ACCESS SHARE lock of %s" , table -> target_name );
488
+ printfStringInfo (& sql , "LOCK TABLE %s IN ACCESS SHARE MODE" ,
489
+ table -> target_name );
490
+ elog (DEBUG2 , "LOCK TABLE %s IN ACCESS SHARE MODE" , table -> target_name );
491
+ if (!(PQsendQuery (conn2 , sql .data ))) {
492
+ printf ("Error sending async query: %s\n%s" , sql .data , PQerrorMessage (conn2 ));
493
+ exit (1 );
494
+ }
465
495
466
- /* store the backend PID of our connection keeping an ACCESS SHARE
467
- lock on the target table.
496
+ /* Now that we've submitted the LOCK TABLE request through conn2,
497
+ * look for and cancel any (potentially dangerous) DDL commands which
498
+ * might also be waiting on our table lock at this point --
499
+ * it's not safe to let them wait, because they may grab their
500
+ * AccessExclusive lock before conn2 gets its AccessShare lock,
501
+ * and perform unsafe DDL on the table.
502
+ *
503
+ * XXX: maybe we should use a loop canceling queries, as in
504
+ * lock_exclusive().
468
505
*/
469
- res = pgut_execute (conn2 , "SELECT pg_backend_pid()" , 0 , NULL );
506
+ printfStringInfo (& sql , CANCEL_COMPETING_LOCKS , table -> target_oid );
507
+ res = execute (sql .data , 0 , NULL );
470
508
if (PQresultStatus (res ) != PGRES_TUPLES_OK )
471
509
{
472
- printf ("%s" , PQerrorMessage (conn2 ));
510
+ printf ("Error canceling competing queries: %s" , PQerrorMessage (connection ));
473
511
PQclear (res );
474
512
exit (1 );
475
513
}
476
- lock_conn_pid = strdup (PQgetvalue (res , 0 , 0 ));
477
- elog (WARNING , "Have backend PID: %s" , lock_conn_pid );
514
+ if (PQntuples (res ) > 0 )
515
+ {
516
+ elog (WARNING , "Canceled %d unsafe queries. Terminating any remaining PIDs." , PQntuples (res ));
517
+
518
+ if (PQserverVersion (connection ) >= 80400 )
519
+ {
520
+ PQclear (res );
521
+ printfStringInfo (& sql , KILL_COMPETING_LOCKS , table -> target_oid );
522
+ res = execute (sql .data , 0 , NULL );
523
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
524
+ {
525
+ printf ("Error killing competing queries: %s" , PQerrorMessage (connection ));
526
+ PQclear (res );
527
+ exit (1 );
528
+ }
529
+ }
530
+
531
+ }
532
+ else
533
+ {
534
+ elog (DEBUG2 , "No competing DDL to cancel." );
535
+ }
478
536
PQclear (res );
479
537
538
+
539
+ /* We're finished killing off any unsafe DDL. COMMIT in our main
540
+ * connection, so that conn2 may get its AccessShare lock.
541
+ */
542
+ command ("COMMIT" , 0 , NULL );
543
+
544
+ /* Keep looping PQgetResult() calls until it returns NULL, indicating the
545
+ * command is done and we have obtained our lock.
546
+ */
547
+ while ((res = PQgetResult (conn2 )))
548
+ {
549
+ elog (DEBUG2 , "Waiting on ACCESS SHARE lock..." );
550
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
551
+ {
552
+ printf ("Error with LOCK TABLE: %s" , PQerrorMessage (conn2 ));
553
+ PQclear (res );
554
+ exit (1 );
555
+ }
556
+ PQclear (res );
557
+ }
558
+
559
+
480
560
/*
481
561
* Register the table to be dropped on error. We use pktype as
482
562
* an advisory lock. The registration should be done after
@@ -558,7 +638,7 @@ repack_one_table(const repack_table *table, const char *orderby)
558
638
*/
559
639
for (;;)
560
640
{
561
- num = apply_log (table , APPLY_COUNT );
641
+ num = apply_log (connection , table , APPLY_COUNT );
562
642
if (num > 0 )
563
643
continue ; /* there might be still some tuples, repeat. */
564
644
@@ -595,14 +675,17 @@ repack_one_table(const repack_table *table, const char *orderby)
595
675
}
596
676
597
677
/*
598
- * 5. Swap.
678
+ * 5. Swap: will be done with conn2, since it already holds an
679
+ * AccessShare lock.
599
680
*/
600
681
elog (DEBUG2 , "---- swap ----" );
601
- lock_exclusive (utoa (table -> target_oid , buffer ), table -> lock_table , TRUE);
602
- apply_log (table , 0 );
682
+ /* Bump our existing AccessShare lock to AccessExclusive */
683
+ lock_exclusive (conn2 , utoa (table -> target_oid , buffer ), table -> lock_table ,
684
+ FALSE);
685
+ apply_log (conn2 , table , 0 );
603
686
params [0 ] = utoa (table -> target_oid , buffer );
604
- command ( "SELECT repack.repack_swap($1)" , 1 , params );
605
- command ( "COMMIT" , 0 , NULL );
687
+ pgut_command ( conn2 , "SELECT repack.repack_swap($1)" , 1 , params );
688
+ pgut_command ( conn2 , "COMMIT" , 0 , NULL );
606
689
607
690
/*
608
691
* 6. Drop.
@@ -616,6 +699,7 @@ repack_one_table(const repack_table *table, const char *orderby)
616
699
617
700
pgut_atexit_pop (& repack_cleanup , (void * ) table );
618
701
free (vxid );
702
+ free (lock_conn_pid );
619
703
620
704
/*
621
705
* 7. Analyze.
@@ -639,31 +723,32 @@ repack_one_table(const repack_table *table, const char *orderby)
639
723
* Try acquire a table lock but avoid long time locks when conflict.
640
724
* Arguments:
641
725
*
726
+ * conn: connection to use
642
727
* relid: OID of relation
643
728
* lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
644
- * release_conn2: whether we should issue a COMMIT in conn2 to release
645
- * its lock.
729
+ * start_xact: whether we need to issue a BEGIN;
646
730
*/
647
731
static void
648
- lock_exclusive (const char * relid , const char * lock_query , bool release_conn2 )
732
+ lock_exclusive (PGconn * conn , const char * relid , const char * lock_query , bool start_xact )
649
733
{
650
734
time_t start = time (NULL );
651
735
int i ;
652
-
736
+
653
737
for (i = 1 ; ; i ++ )
654
738
{
655
739
time_t duration ;
656
740
char sql [1024 ];
657
741
PGresult * res ;
658
742
int wait_msec ;
659
743
660
- command ("BEGIN ISOLATION LEVEL READ COMMITTED" , 0 , NULL );
744
+ if (start_xact )
745
+ pgut_command (conn , "BEGIN ISOLATION LEVEL READ COMMITTED" , 0 , NULL );
661
746
662
747
duration = time (NULL ) - start ;
663
748
if (duration > wait_timeout )
664
749
{
665
750
const char * cancel_query ;
666
- if (PQserverVersion (connection ) >= 80400 &&
751
+ if (PQserverVersion (conn ) >= 80400 &&
667
752
duration > wait_timeout * 2 )
668
753
{
669
754
elog (WARNING , "terminating conflicted backends" );
@@ -681,21 +766,15 @@ lock_exclusive(const char *relid, const char *lock_query, bool release_conn2)
681
766
" AND relation = $1 AND pid <> pg_backend_pid()" ;
682
767
}
683
768
684
- command ( cancel_query , 1 , & relid );
769
+ pgut_command ( conn , cancel_query , 1 , & relid );
685
770
}
686
771
687
- /* If necessary, issue a COMMIT in conn2 to release its ACCESS SHARE
688
- * lock.
689
- */
690
- if (release_conn2 )
691
- pgut_command (conn2 , "COMMIT" , 0 , NULL );
692
-
693
772
/* wait for a while to lock the table. */
694
773
wait_msec = Min (1000 , i * 100 );
695
774
snprintf (sql , lengthof (sql ), "SET LOCAL statement_timeout = %d" , wait_msec );
696
775
command (sql , 0 , NULL );
697
776
698
- res = execute_elevel ( lock_query , 0 , NULL , DEBUG2 );
777
+ res = pgut_execute_elevel ( conn , lock_query , 0 , NULL , DEBUG2 );
699
778
if (PQresultStatus (res ) == PGRES_COMMAND_OK )
700
779
{
701
780
PQclear (res );
0 commit comments