Skip to content

Commit 6fa240f

Browse files
committed
Integrate FDW with XTM
1 parent dee9a9a commit 6fa240f

File tree

4 files changed

+87
-37
lines changed

4 files changed

+87
-37
lines changed

contrib/pg_xtm/pg_dtm.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,8 @@ TransactionId
327327
DtmGetNewTransactionId(bool isSubXact)
328328
{
329329
TransactionId xid;
330+
331+
XTM_INFO("%d: GetNewTransactionId\n", getpid());
330332

331333
/*
332334
* Workers synchronize transaction state at the beginning of each parallel
@@ -580,9 +582,9 @@ static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn)
580582

581583
static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
582584
{
583-
XTM_TRACE("XTM: DtmSetTransactionStatus %u = %u \n", xid, status);
585+
XTM_INFO("%d: DtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
584586
if (!RecoveryInProgress()) {
585-
if (TransactionIdIsValid(DtmNextXid)) {
587+
if (!DtmIsGlobalTransaction && TransactionIdIsValid(DtmNextXid)) {
586588
/* Already should be IN_PROGRESS */
587589
/* CLOGTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_IN_PROGRESS, lsn); */
588590
CurrentTransactionSnapshot = NULL;
@@ -638,6 +640,8 @@ static void DtmInitialize()
638640
dtm->minXid = InvalidTransactionId;
639641
dtm->activeSnapshot.xip = (TransactionId*)ShmemAlloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
640642
dtm->activeSnapshot.subxip = (TransactionId*)ShmemAlloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
643+
644+
RegisterXactCallback(DtmXactCallback, NULL);
641645
}
642646
LWLockRelease(AddinShmemInitLock);
643647

@@ -652,7 +656,6 @@ static void DtmInitialize()
652656
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
653657
);
654658

655-
RegisterXactCallback(DtmXactCallback, NULL);
656659

657660
TM = &DtmTM;
658661
}

contrib/postgres_fdw/connection.c

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include "utils/hsearch.h"
2121
#include "utils/memutils.h"
2222

23+
#undef DEBUG3
24+
#define DEBUG3 WARNING
2325

2426
/*
2527
* Connection cache hash table entry
@@ -68,6 +70,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
6870
static void check_conn_params(const char **keywords, const char **values);
6971
static void configure_remote_session(PGconn *conn);
7072
static void do_sql_command(PGconn *conn, const char *sql);
73+
static void do_sql_send_command(PGconn *conn, const char *sql);
74+
static void do_sql_wait_command(PGconn *conn, const char *sql);
7175
static void begin_remote_xact(ConnCacheEntry *entry);
7276
static void pgfdw_xact_callback(XactEvent event, void *arg);
7377
static void pgfdw_subxact_callback(SubXactEvent event,
@@ -358,6 +362,27 @@ do_sql_command(PGconn *conn, const char *sql)
358362
PQclear(res);
359363
}
360364

365+
static void
366+
do_sql_send_command(PGconn *conn, const char *sql)
367+
{
368+
if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK) {
369+
PGresult *res = PQgetResult(conn);
370+
pgfdw_report_error(ERROR, res, conn, true, sql);
371+
PQclear(res);
372+
}
373+
}
374+
375+
static void
376+
do_sql_wait_command(PGconn *conn, const char *sql)
377+
{
378+
PGresult *res;
379+
while ((res = PQgetResult(conn)) != NULL) {
380+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
381+
pgfdw_report_error(ERROR, res, conn, true, sql);
382+
PQclear(res);
383+
}
384+
}
385+
361386
/*
362387
* Start remote transaction or subtransaction, if needed.
363388
*
@@ -541,16 +566,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
541566
/* If it has an open remote transaction, try to close it */
542567
if (entry->xact_depth > 0)
543568
{
544-
elog(DEBUG3, "closing remote transaction on connection %p",
545-
entry->conn);
569+
elog(DEBUG3, "closing remote transaction on connection %p event %d",
570+
entry->conn, event);
546571

547572
switch (event)
548573
{
549574
case XACT_EVENT_PARALLEL_PRE_COMMIT:
550575
case XACT_EVENT_PRE_COMMIT:
551576
/* Commit all remote transactions during pre-commit */
552-
do_sql_command(entry->conn, "COMMIT TRANSACTION");
577+
do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
578+
continue;
579+
case XACT_EVENT_PRE_PREPARE:
553580

581+
/*
582+
* We disallow remote transactions that modified anything,
583+
* since it's not very reasonable to hold them open until
584+
* the prepared transaction is committed. For the moment,
585+
* throw error unconditionally; later we might allow
586+
* read-only cases. Note that the error will cause us to
587+
* come right back here with event == XACT_EVENT_ABORT, so
588+
* we'll clean up the connection state at that point.
589+
*/
590+
ereport(ERROR,
591+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
592+
errmsg("cannot prepare a transaction that modified remote tables")));
593+
break;
594+
case XACT_EVENT_PARALLEL_COMMIT:
595+
case XACT_EVENT_COMMIT:
596+
case XACT_EVENT_PREPARE:
597+
do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
554598
/*
555599
* If there were any errors in subtransactions, and we
556600
* made prepared statements, do a DEALLOCATE ALL to make
@@ -574,27 +618,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
574618
entry->have_prep_stmt = false;
575619
entry->have_error = false;
576620
break;
577-
case XACT_EVENT_PRE_PREPARE:
578-
579-
/*
580-
* We disallow remote transactions that modified anything,
581-
* since it's not very reasonable to hold them open until
582-
* the prepared transaction is committed. For the moment,
583-
* throw error unconditionally; later we might allow
584-
* read-only cases. Note that the error will cause us to
585-
* come right back here with event == XACT_EVENT_ABORT, so
586-
* we'll clean up the connection state at that point.
587-
*/
588-
ereport(ERROR,
589-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
590-
errmsg("cannot prepare a transaction that modified remote tables")));
591-
break;
592-
case XACT_EVENT_PARALLEL_COMMIT:
593-
case XACT_EVENT_COMMIT:
594-
case XACT_EVENT_PREPARE:
595-
/* Pre-commit should have closed the open transaction */
596-
elog(ERROR, "missed cleaning up connection during pre-commit");
597-
break;
598621
case XACT_EVENT_PARALLEL_ABORT:
599622
case XACT_EVENT_ABORT:
600623
/* Assume we might have lost track of prepared statements */
@@ -631,21 +654,23 @@ pgfdw_xact_callback(XactEvent event, void *arg)
631654
if (PQstatus(entry->conn) != CONNECTION_OK ||
632655
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
633656
{
634-
elog(DEBUG3, "discarding connection %p", entry->conn);
657+
elog(DEBUG3, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
635658
PQfinish(entry->conn);
636659
entry->conn = NULL;
637660
}
638661
}
639662

640-
/*
641-
* Regardless of the event type, we can now mark ourselves as out of the
642-
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
643-
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
644-
*/
645-
xact_got_connection = false;
646-
647-
/* Also reset cursor numbering for next transaction */
648-
cursor_number = 0;
663+
if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) {
664+
/*
665+
* Regardless of the event type, we can now mark ourselves as out of the
666+
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
667+
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
668+
*/
669+
xact_got_connection = false;
670+
671+
/* Also reset cursor numbering for next transaction */
672+
cursor_number = 0;
673+
}
649674
}
650675

651676
/*

contrib/postgres_fdw/postgres_fdw--1.0.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
1616
CREATE FOREIGN DATA WRAPPER postgres_fdw
1717
HANDLER postgres_fdw_handler
1818
VALIDATOR postgres_fdw_validator;
19+
20+
CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring)
21+
RETURNS void
22+
AS 'MODULE_PATHNAME'
23+
LANGUAGE C STRICT;

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ typedef struct
230230
* SQL functions
231231
*/
232232
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
233+
PG_FUNCTION_INFO_V1(postgres_fdw_exec);
233234

234235
/*
235236
* FDW callback routines
@@ -2994,3 +2995,19 @@ conversion_error_callback(void *arg)
29942995
NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
29952996
RelationGetRelationName(errpos->rel));
29962997
}
2998+
2999+
Datum
3000+
postgres_fdw_exec(PG_FUNCTION_ARGS)
3001+
{
3002+
Oid relid = PG_GETARG_OID(0);
3003+
char const* sql = PG_GETARG_CSTRING(1);
3004+
Oid userid = GetUserId();
3005+
ForeignTable *table = GetForeignTable(relid);
3006+
ForeignServer *server = GetForeignServer(table->serverid);
3007+
UserMapping *user = GetUserMapping(userid, server->serverid);
3008+
PGconn* conn = GetConnection(server, user, false);
3009+
PGresult* res = PQexec(conn, sql);
3010+
PQclear(res);
3011+
ReleaseConnection(conn);
3012+
PG_RETURN_VOID();
3013+
}

0 commit comments

Comments
 (0)