Skip to content

Commit 9ea3b6f

Browse files
Expand usage of macros for protocol characters.
This commit makes use of the existing PqMsg_* macros in more places and adds new PqReplMsg_* and PqBackupMsg_* macros for use in special replication and backup messages, respectively. Author: Dave Cramer <davecramer@gmail.com> Co-authored-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> Reviewed-by: Jacob Champion <jacob.champion@enterprisedb.com> Reviewed-by: Álvaro Herrera <alvherre@kurilemu.de> Reviewed-by: Euler Taveira <euler@eulerto.com> Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan Discussion: https://postgr.es/m/CAFcNs%2Br73NOUb7%2BqKrV4HHEki02CS96Z%2Bx19WaFgE087BWwEng%40mail.gmail.com
1 parent 35baa60 commit 9ea3b6f

File tree

9 files changed

+70
-43
lines changed

9 files changed

+70
-43
lines changed

src/backend/backup/basebackup_copy.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
143143
buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
144144
mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
145145
mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
146-
mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
146+
mysink->msgbuffer[0] = PqMsg_CopyData; /* archive or manifest data */
147147

148148
/* Tell client the backup start location. */
149149
SendXlogRecPtrResult(state->startptr, state->starttli);
@@ -170,7 +170,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
170170

171171
ti = list_nth(state->tablespaces, state->tablespace_num);
172172
pq_beginmessage(&buf, PqMsg_CopyData);
173-
pq_sendbyte(&buf, 'n'); /* New archive */
173+
pq_sendbyte(&buf, PqBackupMsg_NewArchive);
174174
pq_sendstring(&buf, archive_name);
175175
pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
176176
pq_endmessage(&buf);
@@ -191,7 +191,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
191191
if (mysink->send_to_client)
192192
{
193193
/* Add one because we're also sending a leading type byte. */
194-
pq_putmessage('d', mysink->msgbuffer, len + 1);
194+
pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
195195
}
196196

197197
/* Consider whether to send a progress report to the client. */
@@ -221,7 +221,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
221221
mysink->last_progress_report_time = now;
222222

223223
pq_beginmessage(&buf, PqMsg_CopyData);
224-
pq_sendbyte(&buf, 'p'); /* Progress report */
224+
pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
225225
pq_sendint64(&buf, state->bytes_done);
226226
pq_endmessage(&buf);
227227
pq_flush_if_writable();
@@ -247,7 +247,7 @@ bbsink_copystream_end_archive(bbsink *sink)
247247
mysink->bytes_done_at_last_time_check = state->bytes_done;
248248
mysink->last_progress_report_time = GetCurrentTimestamp();
249249
pq_beginmessage(&buf, PqMsg_CopyData);
250-
pq_sendbyte(&buf, 'p'); /* Progress report */
250+
pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
251251
pq_sendint64(&buf, state->bytes_done);
252252
pq_endmessage(&buf);
253253
pq_flush_if_writable();
@@ -262,7 +262,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
262262
StringInfoData buf;
263263

264264
pq_beginmessage(&buf, PqMsg_CopyData);
265-
pq_sendbyte(&buf, 'm'); /* Manifest */
265+
pq_sendbyte(&buf, PqBackupMsg_Manifest);
266266
pq_endmessage(&buf);
267267
}
268268

@@ -277,7 +277,7 @@ bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
277277
if (mysink->send_to_client)
278278
{
279279
/* Add one because we're also sending a leading type byte. */
280-
pq_putmessage('d', mysink->msgbuffer, len + 1);
280+
pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
281281
}
282282
}
283283

src/backend/replication/logical/applyparallelworker.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -778,10 +778,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
778778

779779
/*
780780
* The first byte of messages sent from leader apply worker to
781-
* parallel apply workers can only be 'w'.
781+
* parallel apply workers can only be PqReplMsg_WALData.
782782
*/
783783
c = pq_getmsgbyte(&s);
784-
if (c != 'w')
784+
if (c != PqReplMsg_WALData)
785785
elog(ERROR, "unexpected message \"%c\"", c);
786786

787787
/*

src/backend/replication/logical/worker.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
39943994

39953995
c = pq_getmsgbyte(&s);
39963996

3997-
if (c == 'w')
3997+
if (c == PqReplMsg_WALData)
39983998
{
39993999
XLogRecPtr start_lsn;
40004000
XLogRecPtr end_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
40164016

40174017
maybe_advance_nonremovable_xid(&rdt_data, false);
40184018
}
4019-
else if (c == 'k')
4019+
else if (c == PqReplMsg_Keepalive)
40204020
{
40214021
XLogRecPtr end_lsn;
40224022
TimestampTz timestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
40354035

40364036
UpdateWorkerStats(last_received, timestamp, true);
40374037
}
4038-
else if (c == 's') /* Primary status update */
4038+
else if (c == PqReplMsg_PrimaryStatusUpdate)
40394039
{
40404040
rdt_data.remote_lsn = pq_getmsgint64(&s);
40414041
rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
42674267
else
42684268
resetStringInfo(reply_message);
42694269

4270-
pq_sendbyte(reply_message, 'r');
4270+
pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
42714271
pq_sendint64(reply_message, recvpos); /* write */
42724272
pq_sendint64(reply_message, flushpos); /* flush */
42734273
pq_sendint64(reply_message, writepos); /* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
44384438
* Send the current time to update the remote walsender's latest reply
44394439
* message received time.
44404440
*/
4441-
pq_sendbyte(request_message, 'p');
4441+
pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
44424442
pq_sendint64(request_message, GetCurrentTimestamp());
44434443

44444444
elog(DEBUG2, "sending publisher status request message");

src/backend/replication/walreceiver.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
826826

827827
switch (type)
828828
{
829-
case 'w': /* WAL records */
829+
case PqReplMsg_WALData:
830830
{
831831
StringInfoData incoming_message;
832832

@@ -850,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
850850
XLogWalRcvWrite(buf, len, dataStart, tli);
851851
break;
852852
}
853-
case 'k': /* Keepalive */
853+
case PqReplMsg_Keepalive:
854854
{
855855
StringInfoData incoming_message;
856856

@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
11301130
applyPtr = GetXLogReplayRecPtr(NULL);
11311131

11321132
resetStringInfo(&reply_message);
1133-
pq_sendbyte(&reply_message, 'r');
1133+
pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
11341134
pq_sendint64(&reply_message, writePtr);
11351135
pq_sendint64(&reply_message, flushPtr);
11361136
pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
12341234

12351235
/* Construct the message and send it. */
12361236
resetStringInfo(&reply_message);
1237-
pq_sendbyte(&reply_message, 'h');
1237+
pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
12381238
pq_sendint64(&reply_message, GetCurrentTimestamp());
12391239
pq_sendint32(&reply_message, xmin);
12401240
pq_sendint32(&reply_message, xmin_epoch);

src/backend/replication/walsender.c

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
15341534

15351535
resetStringInfo(ctx->out);
15361536

1537-
pq_sendbyte(ctx->out, 'w');
1537+
pq_sendbyte(ctx->out, PqReplMsg_WALData);
15381538
pq_sendint64(ctx->out, lsn); /* dataStart */
15391539
pq_sendint64(ctx->out, lsn); /* walEnd */
15401540

@@ -2292,16 +2292,18 @@ ProcessRepliesIfAny(void)
22922292
switch (firstchar)
22932293
{
22942294
/*
2295-
* 'd' means a standby reply wrapped in a CopyData packet.
2295+
* PqMsg_CopyData means a standby reply wrapped in a CopyData
2296+
* packet.
22962297
*/
22972298
case PqMsg_CopyData:
22982299
ProcessStandbyMessage();
22992300
received = true;
23002301
break;
23012302

23022303
/*
2303-
* CopyDone means the standby requested to finish streaming.
2304-
* Reply with CopyDone, if we had not sent that already.
2304+
* PqMsg_CopyDone means the standby requested to finish
2305+
* streaming. Reply with CopyDone, if we had not sent that
2306+
* already.
23052307
*/
23062308
case PqMsg_CopyDone:
23072309
if (!streamingDoneSending)
@@ -2315,7 +2317,8 @@ ProcessRepliesIfAny(void)
23152317
break;
23162318

23172319
/*
2318-
* 'X' means that the standby is closing down the socket.
2320+
* PqMsg_Terminate means that the standby is closing down the
2321+
* socket.
23192322
*/
23202323
case PqMsg_Terminate:
23212324
proc_exit(0);
@@ -2350,15 +2353,15 @@ ProcessStandbyMessage(void)
23502353

23512354
switch (msgtype)
23522355
{
2353-
case 'r':
2356+
case PqReplMsg_StandbyStatusUpdate:
23542357
ProcessStandbyReplyMessage();
23552358
break;
23562359

2357-
case 'h':
2360+
case PqReplMsg_HotStandbyFeedback:
23582361
ProcessStandbyHSFeedbackMessage();
23592362
break;
23602363

2361-
case 'p':
2364+
case PqReplMsg_PrimaryStatusRequest:
23622365
ProcessStandbyPSRequestMessage();
23632366
break;
23642367

@@ -2752,7 +2755,7 @@ ProcessStandbyPSRequestMessage(void)
27522755

27532756
/* construct the message... */
27542757
resetStringInfo(&output_message);
2755-
pq_sendbyte(&output_message, 's');
2758+
pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
27562759
pq_sendint64(&output_message, lsn);
27572760
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
27582761
pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3367,7 @@ XLogSendPhysical(void)
33643367
* OK to read and send the slice.
33653368
*/
33663369
resetStringInfo(&output_message);
3367-
pq_sendbyte(&output_message, 'w');
3370+
pq_sendbyte(&output_message, PqReplMsg_WALData);
33683371

33693372
pq_sendint64(&output_message, startptr); /* dataStart */
33703373
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4138,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
41354138

41364139
/* construct the message... */
41374140
resetStringInfo(&output_message);
4138-
pq_sendbyte(&output_message, 'k');
4141+
pq_sendbyte(&output_message, PqReplMsg_Keepalive);
41394142
pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
41404143
pq_sendint64(&output_message, GetCurrentTimestamp());
41414144
pq_sendbyte(&output_message, requestReply ? 1 : 0);

src/bin/pg_basebackup/pg_basebackup.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "fe_utils/option_utils.h"
3636
#include "fe_utils/recovery_gen.h"
3737
#include "getopt_long.h"
38+
#include "libpq/protocol.h"
3839
#include "receivelog.h"
3940
#include "streamutil.h"
4041

@@ -1338,7 +1339,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
13381339
/* Each CopyData message begins with a type byte. */
13391340
switch (GetCopyDataByte(r, copybuf, &cursor))
13401341
{
1341-
case 'n':
1342+
case PqBackupMsg_NewArchive:
13421343
{
13431344
/* New archive. */
13441345
char *archive_name;
@@ -1410,7 +1411,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
14101411
break;
14111412
}
14121413

1413-
case 'd':
1414+
case PqMsg_CopyData:
14141415
{
14151416
/* Archive or manifest data. */
14161417
if (state->manifest_buffer != NULL)
@@ -1446,7 +1447,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
14461447
break;
14471448
}
14481449

1449-
case 'p':
1450+
case PqBackupMsg_ProgressReport:
14501451
{
14511452
/*
14521453
* Progress report.
@@ -1465,7 +1466,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
14651466
break;
14661467
}
14671468

1468-
case 'm':
1469+
case PqBackupMsg_Manifest:
14691470
{
14701471
/*
14711472
* Manifest data will be sent next. This message is not

src/bin/pg_basebackup/pg_recvlogical.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "getopt_long.h"
2525
#include "libpq-fe.h"
2626
#include "libpq/pqsignal.h"
27+
#include "libpq/protocol.h"
2728
#include "pqexpbuffer.h"
2829
#include "streamutil.h"
2930

@@ -149,7 +150,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
149150
LSN_FORMAT_ARGS(output_fsync_lsn),
150151
replication_slot);
151152

152-
replybuf[len] = 'r';
153+
replybuf[len] = PqReplMsg_StandbyStatusUpdate;
153154
len += 1;
154155
fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
155156
len += 8;
@@ -454,7 +455,7 @@ StreamLogicalLog(void)
454455
}
455456

456457
/* Check the message type. */
457-
if (copybuf[0] == 'k')
458+
if (copybuf[0] == PqReplMsg_Keepalive)
458459
{
459460
int pos;
460461
bool replyRequested;
@@ -466,7 +467,7 @@ StreamLogicalLog(void)
466467
* We just check if the server requested a reply, and ignore the
467468
* rest.
468469
*/
469-
pos = 1; /* skip msgtype 'k' */
470+
pos = 1; /* skip msgtype PqReplMsg_Keepalive */
470471
walEnd = fe_recvint64(&copybuf[pos]);
471472
output_written_lsn = Max(walEnd, output_written_lsn);
472473

@@ -509,7 +510,7 @@ StreamLogicalLog(void)
509510

510511
continue;
511512
}
512-
else if (copybuf[0] != 'w')
513+
else if (copybuf[0] != PqReplMsg_WALData)
513514
{
514515
pg_log_error("unrecognized streaming header: \"%c\"",
515516
copybuf[0]);
@@ -521,7 +522,7 @@ StreamLogicalLog(void)
521522
* message. We only need the WAL location field (dataStart), the rest
522523
* of the header is ignored.
523524
*/
524-
hdr_len = 1; /* msgtype 'w' */
525+
hdr_len = 1; /* msgtype PqReplMsg_WALData */
525526
hdr_len += 8; /* dataStart */
526527
hdr_len += 8; /* walEnd */
527528
hdr_len += 8; /* sendTime */

src/bin/pg_basebackup/receivelog.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "access/xlog_internal.h"
2222
#include "common/logging.h"
2323
#include "libpq-fe.h"
24+
#include "libpq/protocol.h"
2425
#include "receivelog.h"
2526
#include "streamutil.h"
2627

@@ -338,7 +339,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque
338339
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
339340
int len = 0;
340341

341-
replybuf[len] = 'r';
342+
replybuf[len] = PqReplMsg_StandbyStatusUpdate;
342343
len += 1;
343344
fe_sendint64(blockpos, &replybuf[len]); /* write */
344345
len += 8;
@@ -823,13 +824,13 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
823824
}
824825

825826
/* Check the message type. */
826-
if (copybuf[0] == 'k')
827+
if (copybuf[0] == PqReplMsg_Keepalive)
827828
{
828829
if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
829830
&last_status))
830831
goto error;
831832
}
832-
else if (copybuf[0] == 'w')
833+
else if (copybuf[0] == PqReplMsg_WALData)
833834
{
834835
if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
835836
goto error;
@@ -1001,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
10011002
* Parse the keepalive message, enclosed in the CopyData message. We just
10021003
* check if the server requested a reply, and ignore the rest.
10031004
*/
1004-
pos = 1; /* skip msgtype 'k' */
1005+
pos = 1; /* skip msgtype PqReplMsg_Keepalive */
10051006
pos += 8; /* skip walEnd */
10061007
pos += 8; /* skip sendTime */
10071008

@@ -1064,7 +1065,7 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
10641065
* message. We only need the WAL location field (dataStart), the rest of
10651066
* the header is ignored.
10661067
*/
1067-
hdr_len = 1; /* msgtype 'w' */
1068+
hdr_len = 1; /* msgtype PqReplMsg_WALData */
10681069
hdr_len += 8; /* dataStart */
10691070
hdr_len += 8; /* walEnd */
10701071
hdr_len += 8; /* sendTime */

0 commit comments

Comments
 (0)