Skip to content

Commit 1477ae0

Browse files
committed
pgindent
1 parent b1a82ba commit 1477ae0

File tree

8 files changed

+118
-773
lines changed

8 files changed

+118
-773
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 105 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ static bool xact_got_connection = false;
6464

6565
typedef long long csn_t;
6666
static csn_t currentGlobalTransactionId = 0;
67-
static int currentLocalTransactionId = 0;
67+
static int currentLocalTransactionId = 0;
6868

6969
/* prototypes of private functions */
7070
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
@@ -367,8 +367,10 @@ do_sql_command(PGconn *conn, const char *sql)
367367
static void
368368
do_sql_send_command(PGconn *conn, const char *sql)
369369
{
370-
if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK) {
371-
PGresult *res = PQgetResult(conn);
370+
if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK)
371+
{
372+
PGresult *res = PQgetResult(conn);
373+
372374
elog(WARNING, "Failed to send command %s", sql);
373375
pgfdw_report_error(ERROR, res, conn, true, sql);
374376
PQclear(res);
@@ -378,8 +380,10 @@ do_sql_send_command(PGconn *conn, const char *sql)
378380
static void
379381
do_sql_wait_command(PGconn *conn, const char *sql)
380382
{
381-
PGresult *res;
382-
while ((res = PQgetResult(conn)) != NULL) {
383+
PGresult *res;
384+
385+
while ((res = PQgetResult(conn)) != NULL)
386+
{
383387
if (PQresultStatus(res) != PGRES_COMMAND_OK)
384388
pgfdw_report_error(ERROR, res, conn, true, sql);
385389
PQclear(res);
@@ -410,9 +414,10 @@ begin_remote_xact(ConnCacheEntry *entry)
410414
elog(DEBUG3, "starting remote transaction on connection %p",
411415
entry->conn);
412416

413-
if (TransactionIdIsValid(gxid)) {
414-
char stmt[64];
415-
PGresult *res;
417+
if (TransactionIdIsValid(gxid))
418+
{
419+
char stmt[64];
420+
PGresult *res;
416421

417422
snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid);
418423
res = PQexec(entry->conn, stmt);
@@ -425,14 +430,15 @@ begin_remote_xact(ConnCacheEntry *entry)
425430
sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
426431
do_sql_command(entry->conn, sql);
427432
entry->xact_depth = 1;
428-
if (UseTsDtmTransactions)
433+
if (UseTsDtmTransactions)
429434
{
430435
if (!currentGlobalTransactionId)
431436
{
432-
PGresult *res = PQexec(entry->conn, psprintf("SELECT public.dtm_extend('%d.%d')",
433-
MyProcPid, ++currentLocalTransactionId));
434-
char* resp;
435-
if (PQresultStatus(res) != PGRES_TUPLES_OK)
437+
PGresult *res = PQexec(entry->conn, psprintf("SELECT public.dtm_extend('%d.%d')",
438+
MyProcPid, ++currentLocalTransactionId));
439+
char *resp;
440+
441+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
436442
{
437443
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
438444
}
@@ -442,13 +448,16 @@ begin_remote_xact(ConnCacheEntry *entry)
442448
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
443449
}
444450
PQclear(res);
445-
} else {
446-
PGresult *res = PQexec(entry->conn, psprintf("SELECT public.dtm_access(%llu, '%d.%d')", currentGlobalTransactionId, MyProcPid, currentLocalTransactionId));
447-
if (PQresultStatus(res) != PGRES_TUPLES_OK)
451+
}
452+
else
453+
{
454+
PGresult *res = PQexec(entry->conn, psprintf("SELECT public.dtm_access(%llu, '%d.%d')", currentGlobalTransactionId, MyProcPid, currentLocalTransactionId));
455+
456+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
448457
{
449458
pgfdw_report_error(ERROR, res, entry->conn, true, sql);
450459
}
451-
PQclear(res);
460+
PQclear(res);
452461
}
453462
}
454463
}
@@ -576,13 +585,14 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
576585
PQclear(res);
577586
}
578587

579-
typedef bool (*DtmCommandResultHandler)(PGresult *result, void* arg);
588+
typedef bool (*DtmCommandResultHandler) (PGresult *result, void *arg);
580589

581-
static bool RunDtmStatement(char const* sql, unsigned expectedStatus, DtmCommandResultHandler handler, void* arg)
590+
static bool
591+
RunDtmStatement(char const * sql, unsigned expectedStatus, DtmCommandResultHandler handler, void *arg)
582592
{
583593
HASH_SEQ_STATUS scan;
584594
ConnCacheEntry *entry;
585-
bool allOk = true;
595+
bool allOk = true;
586596

587597
hash_seq_init(&scan, ConnectionHash);
588598
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
@@ -598,40 +608,47 @@ static bool RunDtmStatement(char const* sql, unsigned expectedStatus, DtmCommand
598608
{
599609
if (entry->xact_depth > 0)
600610
{
601-
PGresult *result = PQgetResult(entry->conn);
611+
PGresult *result = PQgetResult(entry->conn);
612+
602613
if (PQresultStatus(result) != expectedStatus || (handler && !handler(result, arg)))
603614
{
604615
elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
605616
pgfdw_report_error(ERROR, result, entry->conn, true, sql);
606617
allOk = false;
607618
}
608619
PQclear(result);
609-
PQgetResult(entry->conn); /* consume NULL result */
620+
PQgetResult(entry->conn); /* consume NULL result */
610621
}
611622
}
612623
return allOk;
613624
}
614625

615-
static bool RunDtmCommand(char const* sql)
626+
static bool
627+
RunDtmCommand(char const * sql)
616628
{
617629
return RunDtmStatement(sql, PGRES_COMMAND_OK, NULL, NULL);
618630
}
619631

620-
static bool RunDtmFunction(char const* sql)
632+
static bool
633+
RunDtmFunction(char const * sql)
621634
{
622635
return RunDtmStatement(sql, PGRES_TUPLES_OK, NULL, NULL);
623636
}
624637

625638

626-
static bool DtmMaxCSN(PGresult *result, void* arg)
639+
static bool
640+
DtmMaxCSN(PGresult *result, void *arg)
627641
{
628-
char* resp = PQgetvalue(result, 0, 0);
629-
csn_t* maxCSN = (csn_t*)arg;
630-
csn_t csn = 0;
642+
char *resp = PQgetvalue(result, 0, 0);
643+
csn_t *maxCSN = (csn_t *) arg;
644+
csn_t csn = 0;
645+
631646
if (resp == NULL || (*resp) == '\0' || sscanf(resp, "%lld", &csn) != 1)
632647
{
633648
return false;
634-
} else {
649+
}
650+
else
651+
{
635652
if (*maxCSN < csn)
636653
{
637654
*maxCSN = csn;
@@ -657,35 +674,36 @@ pgfdw_xact_callback(XactEvent event, void *arg)
657674
{
658675
switch (event)
659676
{
660-
case XACT_EVENT_PARALLEL_PRE_COMMIT:
661-
case XACT_EVENT_PRE_COMMIT:
662-
{
663-
csn_t maxCSN = 0;
664-
665-
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
666-
MyProcPid, currentLocalTransactionId)) ||
667-
!RunDtmFunction(psprintf("SELECT public.dtm_begin_prepare('%d.%d')",
668-
MyProcPid, currentLocalTransactionId)) ||
669-
!RunDtmStatement(psprintf("SELECT public.dtm_prepare('%d.%d',0)",
670-
MyProcPid, currentLocalTransactionId), PGRES_TUPLES_OK, DtmMaxCSN, &maxCSN) ||
671-
!RunDtmFunction(psprintf("SELECT public.dtm_end_prepare('%d.%d',%lld)",
672-
MyProcPid, currentLocalTransactionId, maxCSN)) ||
673-
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
674-
MyProcPid, currentLocalTransactionId)))
675-
{
676-
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
677-
MyProcPid, currentLocalTransactionId));
678-
ereport(ERROR,
679-
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
680-
errmsg("transaction was aborted at one of the shards")));
681-
break;
682-
}
683-
return;
684-
}
685-
default:
686-
break;
677+
case XACT_EVENT_PARALLEL_PRE_COMMIT:
678+
case XACT_EVENT_PRE_COMMIT:
679+
{
680+
csn_t maxCSN = 0;
681+
682+
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
683+
MyProcPid, currentLocalTransactionId)) ||
684+
!RunDtmFunction(psprintf("SELECT public.dtm_begin_prepare('%d.%d')",
685+
MyProcPid, currentLocalTransactionId)) ||
686+
!RunDtmStatement(psprintf("SELECT public.dtm_prepare('%d.%d',0)",
687+
MyProcPid, currentLocalTransactionId), PGRES_TUPLES_OK, DtmMaxCSN, &maxCSN) ||
688+
!RunDtmFunction(psprintf("SELECT public.dtm_end_prepare('%d.%d',%lld)",
689+
MyProcPid, currentLocalTransactionId, maxCSN)) ||
690+
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
691+
MyProcPid, currentLocalTransactionId)))
692+
{
693+
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
694+
MyProcPid, currentLocalTransactionId));
695+
ereport(ERROR,
696+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
697+
errmsg("transaction was aborted at one of the shards")));
698+
break;
699+
}
700+
return;
701+
}
702+
default:
703+
break;
687704
}
688705
}
706+
689707
/*
690708
* Scan all connection cache entries to find open remote transactions, and
691709
* close them.
@@ -694,27 +712,28 @@ pgfdw_xact_callback(XactEvent event, void *arg)
694712
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
695713
{
696714
PGresult *res;
697-
715+
698716
/* Ignore cache entry if no open connection right now */
699717
if (entry->conn == NULL)
700718
continue;
701-
719+
702720
/* If it has an open remote transaction, try to close it */
703721
if (entry->xact_depth > 0)
704722
{
705723
elog(DEBUG3, "closing remote transaction on connection %p event %d",
706724
entry->conn, event);
707-
725+
708726
switch (event)
709727
{
710-
case XACT_EVENT_PARALLEL_PRE_COMMIT:
711-
case XACT_EVENT_PRE_COMMIT:
712-
/* Commit all remote transactions during pre-commit */
728+
case XACT_EVENT_PARALLEL_PRE_COMMIT:
729+
case XACT_EVENT_PRE_COMMIT:
730+
/* Commit all remote transactions during pre-commit */
713731
do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
714732
continue;
715-
716-
case XACT_EVENT_PRE_PREPARE:
717-
/*
733+
734+
case XACT_EVENT_PRE_PREPARE:
735+
736+
/*
718737
* We disallow remote transactions that modified anything,
719738
* since it's not very reasonable to hold them open until
720739
* the prepared transaction is committed. For the moment,
@@ -723,18 +742,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
723742
* come right back here with event == XACT_EVENT_ABORT, so
724743
* we'll clean up the connection state at that point.
725744
*/
726-
ereport(ERROR,
745+
ereport(ERROR,
727746
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
728747
errmsg("cannot prepare a transaction that modified remote tables")));
729748
break;
730749

731-
case XACT_EVENT_PARALLEL_COMMIT:
750+
case XACT_EVENT_PARALLEL_COMMIT:
732751
case XACT_EVENT_COMMIT:
733-
case XACT_EVENT_PREPARE:
734-
if (!currentGlobalTransactionId)
752+
case XACT_EVENT_PREPARE:
753+
if (!currentGlobalTransactionId)
735754
{
736755
do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
737756
}
757+
738758
/*
739759
* If there were any errors in subtransactions, and we
740760
* made prepared statements, do a DEALLOCATE ALL to make
@@ -758,11 +778,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
758778
entry->have_prep_stmt = false;
759779
entry->have_error = false;
760780
break;
761-
762-
case XACT_EVENT_PARALLEL_ABORT:
763-
case XACT_EVENT_ABORT:
764-
/* Assume we might have lost track of prepared statements */
765-
entry->have_error = true;
781+
782+
case XACT_EVENT_PARALLEL_ABORT:
783+
case XACT_EVENT_ABORT:
784+
/* Assume we might have lost track of prepared statements */
785+
entry->have_error = true;
766786
/* If we're aborting, abort all remote transactions too */
767787
res = PQexec(entry->conn, "ABORT TRANSACTION");
768788
/* Note: can't throw ERROR, it would be infinite loop */
@@ -782,11 +802,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
782802
entry->have_error = false;
783803
}
784804
break;
785-
786-
case XACT_EVENT_START:
787-
case XACT_EVENT_ABORT_PREPARED:
788-
case XACT_EVENT_COMMIT_PREPARED:
789-
break;
805+
806+
case XACT_EVENT_START:
807+
case XACT_EVENT_ABORT_PREPARED:
808+
case XACT_EVENT_COMMIT_PREPARED:
809+
break;
790810
}
791811
}
792812
/* Reset state to show we're out of a transaction */
@@ -804,11 +824,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
804824
entry->conn = NULL;
805825
}
806826
}
807-
if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) {
827+
if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT)
828+
{
808829
/*
809-
* Regardless of the event type, we can now mark ourselves as out of the
810-
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
811-
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
830+
* Regardless of the event type, we can now mark ourselves as out of
831+
* the transaction. (Note: if we are here during PRE_COMMIT or
832+
* PRE_PREPARE, this saves a useless scan of the hashtable during
833+
* COMMIT or PREPARE.)
812834
*/
813835
xact_got_connection = false;
814836

contrib/postgres_fdw/option.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,9 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
133133
}
134134
else if (strcmp(def->defname, "fetch_size") == 0)
135135
{
136-
int fetch_size;
136+
int fetch_size;
137137

138-
fetch_size = strtol(defGetString(def), NULL,10);
138+
fetch_size = strtol(defGetString(def), NULL, 10);
139139
if (fetch_size <= 0)
140140
ereport(ERROR,
141141
(errcode(ERRCODE_SYNTAX_ERROR),

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,8 @@ typedef struct
193193
List *already_used; /* expressions already dealt with */
194194
} ec_member_foreign_arg;
195195

196-
bool UseTsDtmTransactions;
197-
void _PG_init(void);
196+
bool UseTsDtmTransactions;
197+
void _PG_init(void);
198198

199199
/*
200200
* SQL functions
@@ -3222,14 +3222,15 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel)
32223222
Datum
32233223
postgres_fdw_exec(PG_FUNCTION_ARGS)
32243224
{
3225-
Oid relid = PG_GETARG_OID(0);
3226-
char const* sql = PG_GETARG_CSTRING(1);
3225+
Oid relid = PG_GETARG_OID(0);
3226+
char const *sql = PG_GETARG_CSTRING(1);
32273227
Oid userid = GetUserId();
32283228
ForeignTable *table = GetForeignTable(relid);
32293229
ForeignServer *server = GetForeignServer(table->serverid);
32303230
UserMapping *user = GetUserMapping(userid, server->serverid);
3231-
PGconn* conn = GetConnection(server, user, false);
3232-
PGresult* res = PQexec(conn, sql);
3231+
PGconn *conn = GetConnection(server, user, false);
3232+
PGresult *res = PQexec(conn, sql);
3233+
32333234
PQclear(res);
32343235
ReleaseConnection(conn);
32353236
PG_RETURN_VOID();
@@ -3240,7 +3241,6 @@ _PG_init(void)
32403241
{
32413242
DefineCustomBoolVariable("postgres_fdw.use_tsdtm",
32423243
"Use timestamp base distributed transaction manager for FDW connections", NULL,
3243-
&UseTsDtmTransactions, false, PGC_USERSET, 0, NULL,
3244+
&UseTsDtmTransactions, false, PGC_USERSET, 0, NULL,
32443245
NULL, NULL);
32453246
}
3246-

0 commit comments

Comments
 (0)