@@ -95,6 +95,13 @@ static uint32 pgfdw_we_get_result = 0;
95
95
*/
96
96
#define CONNECTION_CLEANUP_TIMEOUT 30000
97
97
98
+ /*
99
+ * Milliseconds to wait before issuing another cancel request. This covers
100
+ * the race condition where the remote session ignored our cancel request
101
+ * because it arrived while idle.
102
+ */
103
+ #define RETRY_CANCEL_TIMEOUT 1000
104
+
98
105
/* Macro for constructing abort command to be sent */
99
106
#define CONSTRUCT_ABORT_COMMAND (sql , entry , toplevel ) \
100
107
do { \
@@ -145,6 +152,7 @@ static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
145
152
static bool pgfdw_cancel_query (PGconn * conn );
146
153
static bool pgfdw_cancel_query_begin (PGconn * conn , TimestampTz endtime );
147
154
static bool pgfdw_cancel_query_end (PGconn * conn , TimestampTz endtime ,
155
+ TimestampTz retrycanceltime ,
148
156
bool consume_input );
149
157
static bool pgfdw_exec_cleanup_query (PGconn * conn , const char * query ,
150
158
bool ignore_errors );
@@ -154,6 +162,7 @@ static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
154
162
bool consume_input ,
155
163
bool ignore_errors );
156
164
static bool pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime ,
165
+ TimestampTz retrycanceltime ,
157
166
PGresult * * result , bool * timed_out );
158
167
static void pgfdw_abort_cleanup (ConnCacheEntry * entry , bool toplevel );
159
168
static bool pgfdw_abort_cleanup_begin (ConnCacheEntry * entry , bool toplevel ,
@@ -1322,18 +1331,25 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1322
1331
static bool
1323
1332
pgfdw_cancel_query (PGconn * conn )
1324
1333
{
1334
+ TimestampTz now = GetCurrentTimestamp ();
1325
1335
TimestampTz endtime ;
1336
+ TimestampTz retrycanceltime ;
1326
1337
1327
1338
/*
1328
1339
* If it takes too long to cancel the query and discard the result, assume
1329
1340
* the connection is dead.
1330
1341
*/
1331
- endtime = TimestampTzPlusMilliseconds (GetCurrentTimestamp (),
1332
- CONNECTION_CLEANUP_TIMEOUT );
1342
+ endtime = TimestampTzPlusMilliseconds (now , CONNECTION_CLEANUP_TIMEOUT );
1343
+
1344
+ /*
1345
+ * Also, lose patience and re-issue the cancel request after a little bit.
1346
+ * (This serves to close some race conditions.)
1347
+ */
1348
+ retrycanceltime = TimestampTzPlusMilliseconds (now , RETRY_CANCEL_TIMEOUT );
1333
1349
1334
1350
if (!pgfdw_cancel_query_begin (conn , endtime ))
1335
1351
return false;
1336
- return pgfdw_cancel_query_end (conn , endtime , false);
1352
+ return pgfdw_cancel_query_end (conn , endtime , retrycanceltime , false);
1337
1353
}
1338
1354
1339
1355
/*
@@ -1359,9 +1375,10 @@ pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1359
1375
}
1360
1376
1361
1377
static bool
1362
- pgfdw_cancel_query_end (PGconn * conn , TimestampTz endtime , bool consume_input )
1378
+ pgfdw_cancel_query_end (PGconn * conn , TimestampTz endtime ,
1379
+ TimestampTz retrycanceltime , bool consume_input )
1363
1380
{
1364
- PGresult * result = NULL ;
1381
+ PGresult * result ;
1365
1382
bool timed_out ;
1366
1383
1367
1384
/*
@@ -1380,7 +1397,8 @@ pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1380
1397
}
1381
1398
1382
1399
/* Get and discard the result of the query. */
1383
- if (pgfdw_get_cleanup_result (conn , endtime , & result , & timed_out ))
1400
+ if (pgfdw_get_cleanup_result (conn , endtime , retrycanceltime ,
1401
+ & result , & timed_out ))
1384
1402
{
1385
1403
if (timed_out )
1386
1404
ereport (WARNING ,
@@ -1453,7 +1471,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1453
1471
TimestampTz endtime , bool consume_input ,
1454
1472
bool ignore_errors )
1455
1473
{
1456
- PGresult * result = NULL ;
1474
+ PGresult * result ;
1457
1475
bool timed_out ;
1458
1476
1459
1477
Assert (query != NULL );
@@ -1471,7 +1489,7 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1471
1489
}
1472
1490
1473
1491
/* Get the result of the query. */
1474
- if (pgfdw_get_cleanup_result (conn , endtime , & result , & timed_out ))
1492
+ if (pgfdw_get_cleanup_result (conn , endtime , endtime , & result , & timed_out ))
1475
1493
{
1476
1494
if (timed_out )
1477
1495
ereport (WARNING ,
@@ -1495,28 +1513,36 @@ pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1495
1513
}
1496
1514
1497
1515
/*
1498
- * Get, during abort cleanup, the result of a query that is in progress. This
1499
- * might be a query that is being interrupted by transaction abort, or it might
1500
- * be a query that was initiated as part of transaction abort to get the remote
1501
- * side back to the appropriate state.
1516
+ * Get, during abort cleanup, the result of a query that is in progress.
1517
+ * This might be a query that is being interrupted by a cancel request or by
1518
+ * transaction abort, or it might be a query that was initiated as part of
1519
+ * transaction abort to get the remote side back to the appropriate state.
1520
+ *
1521
+ * endtime is the time at which we should give up and assume the remote side
1522
+ * is dead. retrycanceltime is the time at which we should issue a fresh
1523
+ * cancel request (pass the same value as endtime if this is not wanted).
1502
1524
*
1503
- * endtime is the time at which we should give up and assume the remote
1504
- * side is dead. Returns true if the timeout expired or connection trouble
1505
- * occurred, false otherwise. Sets *result except in case of a timeout.
1506
- * Sets timed_out to true only when the timeout expired.
1525
+ * Returns true if the timeout expired or connection trouble occurred,
1526
+ * false otherwise. Sets *result except in case of a true result.
1527
+ * Sets *timed_out to true only when the timeout expired.
1507
1528
*/
1508
1529
static bool
1509
- pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime , PGresult * * result ,
1530
+ pgfdw_get_cleanup_result (PGconn * conn , TimestampTz endtime ,
1531
+ TimestampTz retrycanceltime ,
1532
+ PGresult * * result ,
1510
1533
bool * timed_out )
1511
1534
{
1512
1535
volatile bool failed = false;
1513
1536
PGresult * volatile last_res = NULL ;
1514
1537
1538
+ * result = NULL ;
1515
1539
* timed_out = false;
1516
1540
1517
1541
/* In what follows, do not leak any PGresults on an error. */
1518
1542
PG_TRY ();
1519
1543
{
1544
+ int canceldelta = RETRY_CANCEL_TIMEOUT * 2 ;
1545
+
1520
1546
for (;;)
1521
1547
{
1522
1548
PGresult * res ;
@@ -1527,8 +1553,33 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
1527
1553
TimestampTz now = GetCurrentTimestamp ();
1528
1554
long cur_timeout ;
1529
1555
1556
+ /* If timeout has expired, give up. */
1557
+ if (now >= endtime )
1558
+ {
1559
+ * timed_out = true;
1560
+ failed = true;
1561
+ goto exit ;
1562
+ }
1563
+
1564
+ /* If we need to re-issue the cancel request, do that. */
1565
+ if (now >= retrycanceltime )
1566
+ {
1567
+ /* We ignore failure to issue the repeated request. */
1568
+ (void ) libpqsrv_cancel (conn , endtime );
1569
+
1570
+ /* Recompute "now" in case that took measurable time. */
1571
+ now = GetCurrentTimestamp ();
1572
+
1573
+ /* Adjust re-cancel timeout in increasing steps. */
1574
+ retrycanceltime = TimestampTzPlusMilliseconds (now ,
1575
+ canceldelta );
1576
+ canceldelta += canceldelta ;
1577
+ }
1578
+
1530
1579
/* If timeout has expired, give up, else get sleep time. */
1531
- cur_timeout = TimestampDifferenceMilliseconds (now , endtime );
1580
+ cur_timeout = TimestampDifferenceMilliseconds (now ,
1581
+ Min (endtime ,
1582
+ retrycanceltime ));
1532
1583
if (cur_timeout <= 0 )
1533
1584
{
1534
1585
* timed_out = true;
@@ -1849,7 +1900,9 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1849
1900
foreach (lc , cancel_requested )
1850
1901
{
1851
1902
ConnCacheEntry * entry = (ConnCacheEntry * ) lfirst (lc );
1903
+ TimestampTz now = GetCurrentTimestamp ();
1852
1904
TimestampTz endtime ;
1905
+ TimestampTz retrycanceltime ;
1853
1906
char sql [100 ];
1854
1907
1855
1908
Assert (entry -> changing_xact_state );
@@ -1863,10 +1916,13 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1863
1916
* remaining entries in the list, leading to slamming that entry's
1864
1917
* connection shut.
1865
1918
*/
1866
- endtime = TimestampTzPlusMilliseconds (GetCurrentTimestamp () ,
1919
+ endtime = TimestampTzPlusMilliseconds (now ,
1867
1920
CONNECTION_CLEANUP_TIMEOUT );
1921
+ retrycanceltime = TimestampTzPlusMilliseconds (now ,
1922
+ RETRY_CANCEL_TIMEOUT );
1868
1923
1869
- if (!pgfdw_cancel_query_end (entry -> conn , endtime , true))
1924
+ if (!pgfdw_cancel_query_end (entry -> conn , endtime ,
1925
+ retrycanceltime , true))
1870
1926
{
1871
1927
/* Unable to cancel running query */
1872
1928
pgfdw_reset_xact_state (entry , toplevel );
0 commit comments