@@ -840,6 +840,7 @@ FreeWaitEventSet(WaitEventSet *set)
840
840
* - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
841
841
* can be combined with other WL_SOCKET_* events (on non-Windows
842
842
* platforms, this is the same as WL_SOCKET_WRITEABLE)
843
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
843
844
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
844
845
*
845
846
* Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1042,12 +1043,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
1042
1043
else
1043
1044
{
1044
1045
Assert (event -> fd != PGINVALID_SOCKET );
1045
- Assert (event -> events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE ));
1046
+ Assert (event -> events & (WL_SOCKET_READABLE |
1047
+ WL_SOCKET_WRITEABLE |
1048
+ WL_SOCKET_CLOSED ));
1046
1049
1047
1050
if (event -> events & WL_SOCKET_READABLE )
1048
1051
epoll_ev .events |= EPOLLIN ;
1049
1052
if (event -> events & WL_SOCKET_WRITEABLE )
1050
1053
epoll_ev .events |= EPOLLOUT ;
1054
+ if (event -> events & WL_SOCKET_CLOSED )
1055
+ epoll_ev .events |= EPOLLRDHUP ;
1051
1056
}
1052
1057
1053
1058
/*
@@ -1086,12 +1091,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
1086
1091
}
1087
1092
else
1088
1093
{
1089
- Assert (event -> events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE ));
1094
+ Assert (event -> events & (WL_SOCKET_READABLE |
1095
+ WL_SOCKET_WRITEABLE |
1096
+ WL_SOCKET_CLOSED ));
1090
1097
pollfd -> events = 0 ;
1091
1098
if (event -> events & WL_SOCKET_READABLE )
1092
1099
pollfd -> events |= POLLIN ;
1093
1100
if (event -> events & WL_SOCKET_WRITEABLE )
1094
1101
pollfd -> events |= POLLOUT ;
1102
+ #ifdef POLLRDHUP
1103
+ if (event -> events & WL_SOCKET_CLOSED )
1104
+ pollfd -> events |= POLLRDHUP ;
1105
+ #endif
1095
1106
}
1096
1107
1097
1108
Assert (event -> fd != PGINVALID_SOCKET );
@@ -1164,7 +1175,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
1164
1175
Assert (event -> events != WL_LATCH_SET || set -> latch != NULL );
1165
1176
Assert (event -> events == WL_LATCH_SET ||
1166
1177
event -> events == WL_POSTMASTER_DEATH ||
1167
- (event -> events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE )));
1178
+ (event -> events & (WL_SOCKET_READABLE |
1179
+ WL_SOCKET_WRITEABLE |
1180
+ WL_SOCKET_CLOSED )));
1168
1181
1169
1182
if (event -> events == WL_POSTMASTER_DEATH )
1170
1183
{
@@ -1187,9 +1200,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
1187
1200
* old event mask to the new event mask, since kevent treats readable
1188
1201
* and writable as separate events.
1189
1202
*/
1190
- if (old_events & WL_SOCKET_READABLE )
1203
+ if (old_events & ( WL_SOCKET_READABLE | WL_SOCKET_CLOSED ) )
1191
1204
old_filt_read = true;
1192
- if (event -> events & WL_SOCKET_READABLE )
1205
+ if (event -> events & ( WL_SOCKET_READABLE | WL_SOCKET_CLOSED ) )
1193
1206
new_filt_read = true;
1194
1207
if (old_events & WL_SOCKET_WRITEABLE )
1195
1208
old_filt_write = true;
@@ -1209,7 +1222,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
1209
1222
event );
1210
1223
}
1211
1224
1212
- Assert (count > 0 );
1225
+ /* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */
1226
+ if (count == 0 )
1227
+ return ;
1228
+
1213
1229
Assert (count <= 2 );
1214
1230
1215
1231
rc = kevent (set -> kqueue_fd , & k_ev [0 ], count , NULL , 0 , NULL );
@@ -1524,7 +1540,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1524
1540
returned_events ++ ;
1525
1541
}
1526
1542
}
1527
- else if (cur_event -> events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE ))
1543
+ else if (cur_event -> events & (WL_SOCKET_READABLE |
1544
+ WL_SOCKET_WRITEABLE |
1545
+ WL_SOCKET_CLOSED ))
1528
1546
{
1529
1547
Assert (cur_event -> fd != PGINVALID_SOCKET );
1530
1548
@@ -1542,6 +1560,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1542
1560
occurred_events -> events |= WL_SOCKET_WRITEABLE ;
1543
1561
}
1544
1562
1563
+ if ((cur_event -> events & WL_SOCKET_CLOSED ) &&
1564
+ (cur_epoll_event -> events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP )))
1565
+ {
1566
+ /* remote peer shut down, or error */
1567
+ occurred_events -> events |= WL_SOCKET_CLOSED ;
1568
+ }
1569
+
1545
1570
if (occurred_events -> events != 0 )
1546
1571
{
1547
1572
occurred_events -> fd = cur_event -> fd ;
@@ -1667,7 +1692,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1667
1692
occurred_events ++ ;
1668
1693
returned_events ++ ;
1669
1694
}
1670
- else if (cur_event -> events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE ))
1695
+ else if (cur_event -> events & (WL_SOCKET_READABLE |
1696
+ WL_SOCKET_WRITEABLE |
1697
+ WL_SOCKET_CLOSED ))
1671
1698
{
1672
1699
Assert (cur_event -> fd >= 0 );
1673
1700
@@ -1678,6 +1705,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1678
1705
occurred_events -> events |= WL_SOCKET_READABLE ;
1679
1706
}
1680
1707
1708
+ if ((cur_event -> events & WL_SOCKET_CLOSED ) &&
1709
+ (cur_kqueue_event -> filter == EVFILT_READ ) &&
1710
+ (cur_kqueue_event -> flags & EV_EOF ))
1711
+ {
1712
+ /* the remote peer has shut down */
1713
+ occurred_events -> events |= WL_SOCKET_CLOSED ;
1714
+ }
1715
+
1681
1716
if ((cur_event -> events & WL_SOCKET_WRITEABLE ) &&
1682
1717
(cur_kqueue_event -> filter == EVFILT_WRITE ))
1683
1718
{
@@ -1788,7 +1823,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1788
1823
returned_events ++ ;
1789
1824
}
1790
1825
}
1791
- else if (cur_event -> events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE ))
1826
+ else if (cur_event -> events & (WL_SOCKET_READABLE |
1827
+ WL_SOCKET_WRITEABLE |
1828
+ WL_SOCKET_CLOSED ))
1792
1829
{
1793
1830
int errflags = POLLHUP | POLLERR | POLLNVAL ;
1794
1831
@@ -1808,6 +1845,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1808
1845
occurred_events -> events |= WL_SOCKET_WRITEABLE ;
1809
1846
}
1810
1847
1848
+ #ifdef POLLRDHUP
1849
+ if ((cur_event -> events & WL_SOCKET_CLOSED ) &&
1850
+ (cur_pollfd -> revents & (POLLRDHUP | errflags )))
1851
+ {
1852
+ /* remote peer closed, or error */
1853
+ occurred_events -> events |= WL_SOCKET_CLOSED ;
1854
+ }
1855
+ #endif
1856
+
1811
1857
if (occurred_events -> events != 0 )
1812
1858
{
1813
1859
occurred_events -> fd = cur_event -> fd ;
@@ -2014,6 +2060,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
2014
2060
}
2015
2061
#endif
2016
2062
2063
+ /*
2064
+ * Return whether the current build options can report WL_SOCKET_CLOSED.
2065
+ */
2066
+ bool
2067
+ WaitEventSetCanReportClosed (void )
2068
+ {
2069
+ #if (defined(WAIT_USE_POLL ) && defined(POLLRDHUP )) || \
2070
+ defined(WAIT_USE_EPOLL ) || \
2071
+ defined(WAIT_USE_KQUEUE )
2072
+ return true;
2073
+ #else
2074
+ return false;
2075
+ #endif
2076
+ }
2077
+
2017
2078
/*
2018
2079
* Get the number of wait events registered in a given WaitEventSet.
2019
2080
*/
0 commit comments