Skip to content

Commit 50e570a

Browse files
committed
Add WL_SOCKET_CLOSED for socket shutdown events.
Provide a way for WaitEventSet to report that the remote peer has shut down its socket, independently of whether there is any buffered data remaining to be read. This works only on systems where the kernel exposes that information, namely: * WAIT_USE_POLL builds using POLLRDHUP, if available * WAIT_USE_EPOLL builds using EPOLLRDHUP * WAIT_USE_KQUEUE builds using EV_EOF Reviewed-by: Zhihong Yu <zyu@yugabyte.com> Reviewed-by: Maksim Milyutin <milyutinma@gmail.com> Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
1 parent 5e01001 commit 50e570a

File tree

2 files changed

+74
-11
lines changed

2 files changed

+74
-11
lines changed

src/backend/storage/ipc/latch.c

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ FreeWaitEventSet(WaitEventSet *set)
840840
* - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
841841
* can be combined with other WL_SOCKET_* events (on non-Windows
842842
* platforms, this is the same as WL_SOCKET_WRITEABLE)
843+
* - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
843844
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
844845
*
845846
* Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1042,12 +1043,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
10421043
else
10431044
{
10441045
Assert(event->fd != PGINVALID_SOCKET);
1045-
Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
1046+
Assert(event->events & (WL_SOCKET_READABLE |
1047+
WL_SOCKET_WRITEABLE |
1048+
WL_SOCKET_CLOSED));
10461049

10471050
if (event->events & WL_SOCKET_READABLE)
10481051
epoll_ev.events |= EPOLLIN;
10491052
if (event->events & WL_SOCKET_WRITEABLE)
10501053
epoll_ev.events |= EPOLLOUT;
1054+
if (event->events & WL_SOCKET_CLOSED)
1055+
epoll_ev.events |= EPOLLRDHUP;
10511056
}
10521057

10531058
/*
@@ -1086,12 +1091,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
10861091
}
10871092
else
10881093
{
1089-
Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
1094+
Assert(event->events & (WL_SOCKET_READABLE |
1095+
WL_SOCKET_WRITEABLE |
1096+
WL_SOCKET_CLOSED));
10901097
pollfd->events = 0;
10911098
if (event->events & WL_SOCKET_READABLE)
10921099
pollfd->events |= POLLIN;
10931100
if (event->events & WL_SOCKET_WRITEABLE)
10941101
pollfd->events |= POLLOUT;
1102+
#ifdef POLLRDHUP
1103+
if (event->events & WL_SOCKET_CLOSED)
1104+
pollfd->events |= POLLRDHUP;
1105+
#endif
10951106
}
10961107

10971108
Assert(event->fd != PGINVALID_SOCKET);
@@ -1164,7 +1175,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
11641175
Assert(event->events != WL_LATCH_SET || set->latch != NULL);
11651176
Assert(event->events == WL_LATCH_SET ||
11661177
event->events == WL_POSTMASTER_DEATH ||
1167-
(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
1178+
(event->events & (WL_SOCKET_READABLE |
1179+
WL_SOCKET_WRITEABLE |
1180+
WL_SOCKET_CLOSED)));
11681181

11691182
if (event->events == WL_POSTMASTER_DEATH)
11701183
{
@@ -1187,9 +1200,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
11871200
* old event mask to the new event mask, since kevent treats readable
11881201
* and writable as separate events.
11891202
*/
1190-
if (old_events & WL_SOCKET_READABLE)
1203+
if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
11911204
old_filt_read = true;
1192-
if (event->events & WL_SOCKET_READABLE)
1205+
if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
11931206
new_filt_read = true;
11941207
if (old_events & WL_SOCKET_WRITEABLE)
11951208
old_filt_write = true;
@@ -1209,7 +1222,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
12091222
event);
12101223
}
12111224

1212-
Assert(count > 0);
1225+
/* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */
1226+
if (count == 0)
1227+
return;
1228+
12131229
Assert(count <= 2);
12141230

12151231
rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
@@ -1524,7 +1540,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
15241540
returned_events++;
15251541
}
15261542
}
1527-
else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
1543+
else if (cur_event->events & (WL_SOCKET_READABLE |
1544+
WL_SOCKET_WRITEABLE |
1545+
WL_SOCKET_CLOSED))
15281546
{
15291547
Assert(cur_event->fd != PGINVALID_SOCKET);
15301548

@@ -1542,6 +1560,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
15421560
occurred_events->events |= WL_SOCKET_WRITEABLE;
15431561
}
15441562

1563+
if ((cur_event->events & WL_SOCKET_CLOSED) &&
1564+
(cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
1565+
{
1566+
/* remote peer shut down, or error */
1567+
occurred_events->events |= WL_SOCKET_CLOSED;
1568+
}
1569+
15451570
if (occurred_events->events != 0)
15461571
{
15471572
occurred_events->fd = cur_event->fd;
@@ -1667,7 +1692,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
16671692
occurred_events++;
16681693
returned_events++;
16691694
}
1670-
else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
1695+
else if (cur_event->events & (WL_SOCKET_READABLE |
1696+
WL_SOCKET_WRITEABLE |
1697+
WL_SOCKET_CLOSED))
16711698
{
16721699
Assert(cur_event->fd >= 0);
16731700

@@ -1678,6 +1705,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
16781705
occurred_events->events |= WL_SOCKET_READABLE;
16791706
}
16801707

1708+
if ((cur_event->events & WL_SOCKET_CLOSED) &&
1709+
(cur_kqueue_event->filter == EVFILT_READ) &&
1710+
(cur_kqueue_event->flags & EV_EOF))
1711+
{
1712+
/* the remote peer has shut down */
1713+
occurred_events->events |= WL_SOCKET_CLOSED;
1714+
}
1715+
16811716
if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
16821717
(cur_kqueue_event->filter == EVFILT_WRITE))
16831718
{
@@ -1788,7 +1823,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
17881823
returned_events++;
17891824
}
17901825
}
1791-
else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
1826+
else if (cur_event->events & (WL_SOCKET_READABLE |
1827+
WL_SOCKET_WRITEABLE |
1828+
WL_SOCKET_CLOSED))
17921829
{
17931830
int errflags = POLLHUP | POLLERR | POLLNVAL;
17941831

@@ -1808,6 +1845,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
18081845
occurred_events->events |= WL_SOCKET_WRITEABLE;
18091846
}
18101847

1848+
#ifdef POLLRDHUP
1849+
if ((cur_event->events & WL_SOCKET_CLOSED) &&
1850+
(cur_pollfd->revents & (POLLRDHUP | errflags)))
1851+
{
1852+
/* remote peer closed, or error */
1853+
occurred_events->events |= WL_SOCKET_CLOSED;
1854+
}
1855+
#endif
1856+
18111857
if (occurred_events->events != 0)
18121858
{
18131859
occurred_events->fd = cur_event->fd;
@@ -2014,6 +2060,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
20142060
}
20152061
#endif
20162062

2063+
/*
2064+
* Return whether the current build options can report WL_SOCKET_CLOSED.
2065+
*/
2066+
bool
2067+
WaitEventSetCanReportClosed(void)
2068+
{
2069+
#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
2070+
defined(WAIT_USE_EPOLL) || \
2071+
defined(WAIT_USE_KQUEUE)
2072+
return true;
2073+
#else
2074+
return false;
2075+
#endif
2076+
}
2077+
20172078
/*
20182079
* Get the number of wait events registered in a given WaitEventSet.
20192080
*/

src/include/storage/latch.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,11 @@ typedef struct Latch
134134
/* avoid having to deal with case on platforms not requiring it */
135135
#define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE
136136
#endif
137-
137+
#define WL_SOCKET_CLOSED (1 << 7)
138138
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
139139
WL_SOCKET_WRITEABLE | \
140-
WL_SOCKET_CONNECTED)
140+
WL_SOCKET_CONNECTED | \
141+
WL_SOCKET_CLOSED)
141142

142143
typedef struct WaitEvent
143144
{
@@ -180,5 +181,6 @@ extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
180181
pgsocket sock, long timeout, uint32 wait_event_info);
181182
extern void InitializeLatchWaitSet(void);
182183
extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
184+
extern bool WaitEventSetCanReportClosed(void);
183185

184186
#endif /* LATCH_H */

0 commit comments

Comments
 (0)