|
17 | 17 | #include "access/xact.h"
|
18 | 18 | #include "mb/pg_wchar.h"
|
19 | 19 | #include "miscadmin.h"
|
| 20 | +#include "storage/latch.h" |
20 | 21 | #include "utils/hsearch.h"
|
21 | 22 | #include "utils/memutils.h"
|
22 | 23 |
|
@@ -447,6 +448,78 @@ GetPrepStmtNumber(PGconn *conn)
|
447 | 448 | return ++prep_stmt_number;
|
448 | 449 | }
|
449 | 450 |
|
| 451 | +/* |
| 452 | + * Submit a query and wait for the result. |
| 453 | + * |
| 454 | + * This function is interruptible by signals. |
| 455 | + * |
| 456 | + * Caller is responsible for the error handling on the result. |
| 457 | + */ |
| 458 | +PGresult * |
| 459 | +pgfdw_exec_query(PGconn *conn, const char *query) |
| 460 | +{ |
| 461 | + /* |
| 462 | + * Submit a query. Since we don't use non-blocking mode, this also can |
| 463 | + * block. But its risk is relatively small, so we ignore that for now. |
| 464 | + */ |
| 465 | + if (!PQsendQuery(conn, query)) |
| 466 | + pgfdw_report_error(ERROR, NULL, conn, false, query); |
| 467 | + |
| 468 | + /* Wait for the result. */ |
| 469 | + return pgfdw_get_result(conn, query); |
| 470 | +} |
| 471 | + |
| 472 | +/* |
| 473 | + * Wait for the result from a prior asynchronous execution function call. |
| 474 | + * |
| 475 | + * This function offers quick responsiveness by checking for any interruptions. |
| 476 | + * |
| 477 | + * This function emulates the PQexec()'s behavior of returning the last result |
| 478 | + * when there are many. |
| 479 | + * |
| 480 | + * Caller is responsible for the error handling on the result. |
| 481 | + */ |
| 482 | +PGresult * |
| 483 | +pgfdw_get_result(PGconn *conn, const char *query) |
| 484 | +{ |
| 485 | + PGresult *last_res = NULL; |
| 486 | + |
| 487 | + for (;;) |
| 488 | + { |
| 489 | + PGresult *res; |
| 490 | + |
| 491 | + while (PQisBusy(conn)) |
| 492 | + { |
| 493 | + int wc; |
| 494 | + |
| 495 | + /* Sleep until there's something to do */ |
| 496 | + wc = WaitLatchOrSocket(MyLatch, |
| 497 | + WL_LATCH_SET | WL_SOCKET_READABLE, |
| 498 | + PQsocket(conn), |
| 499 | + -1L); |
| 500 | + ResetLatch(MyLatch); |
| 501 | + |
| 502 | + CHECK_FOR_INTERRUPTS(); |
| 503 | + |
| 504 | + /* Data available in socket */ |
| 505 | + if (wc & WL_SOCKET_READABLE) |
| 506 | + { |
| 507 | + if (!PQconsumeInput(conn)) |
| 508 | + pgfdw_report_error(ERROR, NULL, conn, false, query); |
| 509 | + } |
| 510 | + } |
| 511 | + |
| 512 | + res = PQgetResult(conn); |
| 513 | + if (res == NULL) |
| 514 | + break; /* query is complete */ |
| 515 | + |
| 516 | + PQclear(last_res); |
| 517 | + last_res = res; |
| 518 | + } |
| 519 | + |
| 520 | + return last_res; |
| 521 | +} |
| 522 | + |
450 | 523 | /*
|
451 | 524 | * Report an error we got from the remote server.
|
452 | 525 | *
|
@@ -598,6 +671,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
|
598 | 671 | case XACT_EVENT_ABORT:
|
599 | 672 | /* Assume we might have lost track of prepared statements */
|
600 | 673 | entry->have_error = true;
|
| 674 | + |
| 675 | + /* |
| 676 | + * If a command has been submitted to the remote server by |
| 677 | + * using an asynchronous execution function, the command |
| 678 | + * might not have yet completed. Check to see if a command |
| 679 | + * is still being processed by the remote server, and if so, |
| 680 | + * request cancellation of the command; if not, abort |
| 681 | + * gracefully. |
| 682 | + */ |
| 683 | + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) |
| 684 | + { |
| 685 | + PGcancel *cancel; |
| 686 | + char errbuf[256]; |
| 687 | + |
| 688 | + if ((cancel = PQgetCancel(entry->conn))) |
| 689 | + { |
| 690 | + if (!PQcancel(cancel, errbuf, sizeof(errbuf))) |
| 691 | + ereport(WARNING, |
| 692 | + (errcode(ERRCODE_CONNECTION_FAILURE), |
| 693 | + errmsg("could not send cancel request: %s", |
| 694 | + errbuf))); |
| 695 | + PQfreeCancel(cancel); |
| 696 | + } |
| 697 | + break; |
| 698 | + } |
| 699 | + |
601 | 700 | /* If we're aborting, abort all remote transactions too */
|
602 | 701 | res = PQexec(entry->conn, "ABORT TRANSACTION");
|
603 | 702 | /* Note: can't throw ERROR, it would be infinite loop */
|
|
0 commit comments