Skip to content

Commit 6b787d9

Browse files
committed
Improve SQLSTATE reporting in some replication-related code.
I started out with the goal of reporting ERRCODE_CONNECTION_FAILURE when walrcv_connect() fails, but as I looked around I realized that whoever wrote this code was of the opinion that errcodes are purely optional. That's not my understanding of our project policy. Hence, make sure that an errcode is provided in each ereport that (a) is ERROR or higher level and (b) isn't arguably an internal logic error. Also fix some very dubious existing errcode assignments.

While the previous coding is not per policy, the problem is also largely cosmetic, since few of these cases could get reported to applications. So I don't feel a need to back-patch.

Discussion: https://postgr.es/m/2189704.1623512522@sss.pgh.pa.us
1 parent d0303bc commit 6b787d9

File tree

5 files changed

+88
-47
lines changed

5 files changed

+88
-47
lines changed

src/backend/commands/subscriptioncmds.c

Lines changed: 17 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -468,7 +468,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
468468
wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
469469
if (!wrconn)
470470
ereport(ERROR,
471-
(errmsg("could not connect to the publisher: %s", err)));
471+
(errcode(ERRCODE_CONNECTION_FAILURE),
472+
errmsg("could not connect to the publisher: %s", err)));
472473

473474
PG_TRY();
474475
{
@@ -565,7 +566,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
565566
wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
566567
if (!wrconn)
567568
ereport(ERROR,
568-
(errmsg("could not connect to the publisher: %s", err)));
569+
(errcode(ERRCODE_CONNECTION_FAILURE),
570+
errmsg("could not connect to the publisher: %s", err)));
569571

570572
PG_TRY();
571573
{
@@ -820,7 +822,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
820822
{
821823
if (sub->enabled && !slotname)
822824
ereport(ERROR,
823-
(errcode(ERRCODE_SYNTAX_ERROR),
825+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
824826
errmsg("cannot set %s for enabled subscription",
825827
"slot_name = NONE")));
826828

@@ -876,7 +878,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
876878

877879
if (!sub->slotname && enabled)
878880
ereport(ERROR,
879-
(errcode(ERRCODE_SYNTAX_ERROR),
881+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
880882
errmsg("cannot enable subscription that does not have a slot name")));
881883

882884
values[Anum_pg_subscription_subenabled - 1] =
@@ -928,7 +930,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
928930
{
929931
if (!sub->enabled)
930932
ereport(ERROR,
931-
(errcode(ERRCODE_SYNTAX_ERROR),
933+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
932934
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
933935
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
934936

@@ -976,7 +978,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
976978
{
977979
if (!sub->enabled)
978980
ereport(ERROR,
979-
(errcode(ERRCODE_SYNTAX_ERROR),
981+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
980982
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
981983
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
982984

@@ -997,7 +999,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
997999

9981000
if (!sub->enabled)
9991001
ereport(ERROR,
1000-
(errcode(ERRCODE_SYNTAX_ERROR),
1002+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
10011003
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
10021004

10031005
parse_subscription_options(stmt->options,
@@ -1354,7 +1356,8 @@ ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missi
13541356
{
13551357
/* ERROR. */
13561358
ereport(ERROR,
1357-
(errmsg("could not drop replication slot \"%s\" on publisher: %s",
1359+
(errcode(ERRCODE_CONNECTION_FAILURE),
1360+
errmsg("could not drop replication slot \"%s\" on publisher: %s",
13581361
slotname, res->err)));
13591362
}
13601363

@@ -1505,7 +1508,8 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
15051508

15061509
if (res->status != WALRCV_OK_TUPLES)
15071510
ereport(ERROR,
1508-
(errmsg("could not receive list of replicated tables from the publisher: %s",
1511+
(errcode(ERRCODE_CONNECTION_FAILURE),
1512+
errmsg("could not receive list of replicated tables from the publisher: %s",
15091513
res->err)));
15101514

15111515
/* Process tables. */
@@ -1569,7 +1573,8 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
15691573
}
15701574

15711575
ereport(ERROR,
1572-
(errmsg("could not connect to publisher when attempting to "
1576+
(errcode(ERRCODE_CONNECTION_FAILURE),
1577+
errmsg("could not connect to publisher when attempting to "
15731578
"drop replication slot \"%s\": %s", slotname, err),
15741579
/* translator: %s is an SQL ALTER command */
15751580
errhint("Use %s to disassociate the subscription from the slot.",
@@ -1601,7 +1606,7 @@ check_duplicates_in_publist(List *publist, Datum *datums)
16011606

16021607
if (strcmp(name, pname) == 0)
16031608
ereport(ERROR,
1604-
(errcode(ERRCODE_SYNTAX_ERROR),
1609+
(errcode(ERRCODE_DUPLICATE_OBJECT),
16051610
errmsg("publication name \"%s\" used more than once",
16061611
pname)));
16071612
}
@@ -1659,7 +1664,7 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
16591664
oldpublist = lappend(oldpublist, makeString(name));
16601665
else if (!addpub && !found)
16611666
ereport(ERROR,
1662-
(errcode(ERRCODE_SYNTAX_ERROR),
1667+
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
16631668
errmsg("publication \"%s\" is not in subscription \"%s\"",
16641669
name, subname)));
16651670
}

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 40 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -278,7 +278,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)
278278

279279
if (conn_opts == NULL)
280280
ereport(ERROR,
281-
(errmsg("could not parse connection string: %s",
281+
(errcode(ERRCODE_OUT_OF_MEMORY),
282+
errmsg("could not parse connection string: %s",
282283
_("out of memory"))));
283284

284285
/* build a clean connection string from pieces */
@@ -350,7 +351,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
350351
{
351352
PQclear(res);
352353
ereport(ERROR,
353-
(errmsg("could not receive database system identifier and timeline ID from "
354+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
355+
errmsg("could not receive database system identifier and timeline ID from "
354356
"the primary server: %s",
355357
pchomp(PQerrorMessage(conn->streamConn)))));
356358
}
@@ -361,7 +363,8 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
361363

362364
PQclear(res);
363365
ereport(ERROR,
364-
(errmsg("invalid response from primary server"),
366+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
367+
errmsg("invalid response from primary server"),
365368
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
366369
ntuples, nfields, 3, 1)));
367370
}
@@ -437,13 +440,15 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
437440
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
438441
if (!pubnames_str)
439442
ereport(ERROR,
440-
(errmsg("could not start WAL streaming: %s",
443+
(errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
444+
errmsg("could not start WAL streaming: %s",
441445
pchomp(PQerrorMessage(conn->streamConn)))));
442446
pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
443447
strlen(pubnames_str));
444448
if (!pubnames_literal)
445449
ereport(ERROR,
446-
(errmsg("could not start WAL streaming: %s",
450+
(errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
451+
errmsg("could not start WAL streaming: %s",
447452
pchomp(PQerrorMessage(conn->streamConn)))));
448453
appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
449454
PQfreemem(pubnames_literal);
@@ -472,7 +477,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
472477
{
473478
PQclear(res);
474479
ereport(ERROR,
475-
(errmsg("could not start WAL streaming: %s",
480+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
481+
errmsg("could not start WAL streaming: %s",
476482
pchomp(PQerrorMessage(conn->streamConn)))));
477483
}
478484
PQclear(res);
@@ -495,7 +501,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
495501
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
496502
PQflush(conn->streamConn))
497503
ereport(ERROR,
498-
(errmsg("could not send end-of-streaming message to primary: %s",
504+
(errcode(ERRCODE_CONNECTION_FAILURE),
505+
errmsg("could not send end-of-streaming message to primary: %s",
499506
pchomp(PQerrorMessage(conn->streamConn)))));
500507

501508
*next_tli = 0;
@@ -517,7 +524,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
517524
*/
518525
if (PQnfields(res) < 2 || PQntuples(res) != 1)
519526
ereport(ERROR,
520-
(errmsg("unexpected result set after end-of-streaming")));
527+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
528+
errmsg("unexpected result set after end-of-streaming")));
521529
*next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
522530
PQclear(res);
523531

@@ -531,7 +539,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
531539
/* End the copy */
532540
if (PQendcopy(conn->streamConn))
533541
ereport(ERROR,
534-
(errmsg("error while shutting down streaming COPY: %s",
542+
(errcode(ERRCODE_CONNECTION_FAILURE),
543+
errmsg("error while shutting down streaming COPY: %s",
535544
pchomp(PQerrorMessage(conn->streamConn)))));
536545

537546
/* CommandComplete should follow */
@@ -540,15 +549,17 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
540549

541550
if (PQresultStatus(res) != PGRES_COMMAND_OK)
542551
ereport(ERROR,
543-
(errmsg("error reading result of streaming command: %s",
552+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
553+
errmsg("error reading result of streaming command: %s",
544554
pchomp(PQerrorMessage(conn->streamConn)))));
545555
PQclear(res);
546556

547557
/* Verify that there are no more results */
548558
res = libpqrcv_PQgetResult(conn->streamConn);
549559
if (res != NULL)
550560
ereport(ERROR,
551-
(errmsg("unexpected result after CommandComplete: %s",
561+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
562+
errmsg("unexpected result after CommandComplete: %s",
552563
pchomp(PQerrorMessage(conn->streamConn)))));
553564
}
554565

@@ -574,7 +585,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
574585
{
575586
PQclear(res);
576587
ereport(ERROR,
577-
(errmsg("could not receive timeline history file from "
588+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
589+
errmsg("could not receive timeline history file from "
578590
"the primary server: %s",
579591
pchomp(PQerrorMessage(conn->streamConn)))));
580592
}
@@ -585,7 +597,8 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
585597

586598
PQclear(res);
587599
ereport(ERROR,
588-
(errmsg("invalid response from primary server"),
600+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
601+
errmsg("invalid response from primary server"),
589602
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
590603
ntuples, nfields)));
591604
}
@@ -746,7 +759,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
746759
/* Try consuming some data. */
747760
if (PQconsumeInput(conn->streamConn) == 0)
748761
ereport(ERROR,
749-
(errmsg("could not receive data from WAL stream: %s",
762+
(errcode(ERRCODE_CONNECTION_FAILURE),
763+
errmsg("could not receive data from WAL stream: %s",
750764
pchomp(PQerrorMessage(conn->streamConn)))));
751765

752766
/* Now that we've consumed some input, try again */
@@ -782,7 +796,8 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
782796
return -1;
783797

784798
ereport(ERROR,
785-
(errmsg("unexpected result after CommandComplete: %s",
799+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
800+
errmsg("unexpected result after CommandComplete: %s",
786801
PQerrorMessage(conn->streamConn))));
787802
}
788803

@@ -797,13 +812,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
797812
{
798813
PQclear(res);
799814
ereport(ERROR,
800-
(errmsg("could not receive data from WAL stream: %s",
815+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
816+
errmsg("could not receive data from WAL stream: %s",
801817
pchomp(PQerrorMessage(conn->streamConn)))));
802818
}
803819
}
804820
if (rawlen < -1)
805821
ereport(ERROR,
806-
(errmsg("could not receive data from WAL stream: %s",
822+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
823+
errmsg("could not receive data from WAL stream: %s",
807824
pchomp(PQerrorMessage(conn->streamConn)))));
808825

809826
/* Return received messages to caller */
@@ -822,7 +839,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
822839
if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
823840
PQflush(conn->streamConn))
824841
ereport(ERROR,
825-
(errmsg("could not send data to WAL stream: %s",
842+
(errcode(ERRCODE_CONNECTION_FAILURE),
843+
errmsg("could not send data to WAL stream: %s",
826844
pchomp(PQerrorMessage(conn->streamConn)))));
827845
}
828846

@@ -875,7 +893,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
875893
{
876894
PQclear(res);
877895
ereport(ERROR,
878-
(errmsg("could not create replication slot \"%s\": %s",
896+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
897+
errmsg("could not create replication slot \"%s\": %s",
879898
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
880899
}
881900

@@ -920,7 +939,8 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
920939
/* Make sure we got expected number of fields. */
921940
if (nfields != nRetTypes)
922941
ereport(ERROR,
923-
(errmsg("invalid query response"),
942+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
943+
errmsg("invalid query response"),
924944
errdetail("Expected %d fields, got %d fields.",
925945
nRetTypes, nfields)));
926946

src/backend/replication/logical/tablesync.c

Lines changed: 14 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -723,13 +723,15 @@ fetch_remote_table_info(char *nspname, char *relname,
723723

724724
if (res->status != WALRCV_OK_TUPLES)
725725
ereport(ERROR,
726-
(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
726+
(errcode(ERRCODE_CONNECTION_FAILURE),
727+
errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
727728
nspname, relname, res->err)));
728729

729730
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
730731
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
731732
ereport(ERROR,
732-
(errmsg("table \"%s.%s\" not found on publisher",
733+
(errcode(ERRCODE_UNDEFINED_OBJECT),
734+
errmsg("table \"%s.%s\" not found on publisher",
733735
nspname, relname)));
734736

735737
lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
@@ -764,7 +766,8 @@ fetch_remote_table_info(char *nspname, char *relname,
764766

765767
if (res->status != WALRCV_OK_TUPLES)
766768
ereport(ERROR,
767-
(errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
769+
(errcode(ERRCODE_CONNECTION_FAILURE),
770+
errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
768771
nspname, relname, res->err)));
769772

770773
/* We don't know the number of rows coming, so allocate enough space. */
@@ -851,7 +854,8 @@ copy_table(Relation rel)
851854
pfree(cmd.data);
852855
if (res->status != WALRCV_OK_COPY_OUT)
853856
ereport(ERROR,
854-
(errmsg("could not start initial contents copy for table \"%s.%s\": %s",
857+
(errcode(ERRCODE_CONNECTION_FAILURE),
858+
errmsg("could not start initial contents copy for table \"%s.%s\": %s",
855859
lrel.nspname, lrel.relname, res->err)));
856860
walrcv_clear_result(res);
857861

@@ -967,7 +971,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
967971
walrcv_connect(MySubscription->conninfo, true, slotname, &err);
968972
if (LogRepWorkerWalRcvConn == NULL)
969973
ereport(ERROR,
970-
(errmsg("could not connect to the publisher: %s", err)));
974+
(errcode(ERRCODE_CONNECTION_FAILURE),
975+
errmsg("could not connect to the publisher: %s", err)));
971976

972977
Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
973978
MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
@@ -1050,7 +1055,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
10501055
0, NULL);
10511056
if (res->status != WALRCV_OK_COMMAND)
10521057
ereport(ERROR,
1053-
(errmsg("table copy could not start transaction on publisher: %s",
1058+
(errcode(ERRCODE_CONNECTION_FAILURE),
1059+
errmsg("table copy could not start transaction on publisher: %s",
10541060
res->err)));
10551061
walrcv_clear_result(res);
10561062

@@ -1110,7 +1116,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
11101116
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
11111117
if (res->status != WALRCV_OK_COMMAND)
11121118
ereport(ERROR,
1113-
(errmsg("table copy could not finish transaction on publisher: %s",
1119+
(errcode(ERRCODE_CONNECTION_FAILURE),
1120+
errmsg("table copy could not finish transaction on publisher: %s",
11141121
res->err)));
11151122
walrcv_clear_result(res);
11161123

src/backend/replication/logical/worker.c

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2388,7 +2388,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
23882388

23892389
if (now >= timeout)
23902390
ereport(ERROR,
2391-
(errmsg("terminating logical replication worker due to timeout")));
2391+
(errcode(ERRCODE_CONNECTION_FAILURE),
2392+
errmsg("terminating logical replication worker due to timeout")));
23922393

23932394
/* Check to see if it's time for a ping. */
23942395
if (!ping_sent)
@@ -3207,7 +3208,8 @@ ApplyWorkerMain(Datum main_arg)
32073208
MySubscription->name, &err);
32083209
if (LogRepWorkerWalRcvConn == NULL)
32093210
ereport(ERROR,
3210-
(errmsg("could not connect to the publisher: %s", err)));
3211+
(errcode(ERRCODE_CONNECTION_FAILURE),
3212+
errmsg("could not connect to the publisher: %s", err)));
32113213

32123214
/*
32133215
* We don't really use the output identify_system for anything but it

0 commit comments

Comments
 (0)