3
3
* latch.c
4
4
* Routines for inter-process latches
5
5
*
6
- * The Unix implementation uses the so-called self-pipe trick to overcome the
7
- * race condition involved with poll() (or epoll_wait() on linux) and setting
8
- * a global flag in the signal handler. When a latch is set and the current
9
- * process is waiting for it, the signal handler wakes up the poll() in
10
- * WaitLatch by writing a byte to a pipe. A signal by itself doesn't interrupt
11
- * poll() on all platforms, and even on platforms where it does, a signal that
12
- * arrives just before the poll() call does not prevent poll() from entering
13
- * sleep. An incoming byte on a pipe however reliably interrupts the sleep,
14
- * and causes poll() to return immediately even if the signal arrives before
15
- * poll() begins.
6
+ * The poll() implementation uses the so-called self-pipe trick to overcome the
7
+ * race condition involved with poll() and setting a global flag in the signal
8
+ * handler. When a latch is set and the current process is waiting for it, the
9
+ * signal handler wakes up the poll() in WaitLatch by writing a byte to a pipe.
10
+ * A signal by itself doesn't interrupt poll() on all platforms, and even on
11
+ * platforms where it does, a signal that arrives just before the poll() call
12
+ * does not prevent poll() from entering sleep. An incoming byte on a pipe
13
+ * however reliably interrupts the sleep, and causes poll() to return
14
+ * immediately even if the signal arrives before poll() begins.
16
15
*
17
- * When SetLatch is called from the same process that owns the latch,
18
- * SetLatch writes the byte directly to the pipe. If it's owned by another
19
- * process, SIGURG is sent and the signal handler in the waiting process
20
- * writes the byte to the pipe on behalf of the signaling process .
16
+ * The epoll() implementation overcomes the race with a different technique: it
17
+ * keeps SIGURG blocked and consumes from a signalfd() descriptor instead. We
18
+ * don't need to register a signal handler or create our own self-pipe. We
19
+ * assume that any system that has Linux epoll() also has Linux signalfd() .
21
20
*
22
21
* The Windows implementation uses Windows events that are inherited by all
23
22
* postmaster child processes. There's no need for the self-pipe trick there.
46
45
#include <poll.h>
47
46
#endif
48
47
48
+ #include "libpq/pqsignal.h"
49
49
#include "miscadmin.h"
50
50
#include "pgstat.h"
51
51
#include "port/atomics.h"
79
79
#error "no wait set implementation available"
80
80
#endif
81
81
82
+ #ifdef WAIT_USE_EPOLL
83
+ #include <sys/signalfd.h>
84
+ #endif
85
+
82
86
/* typedef in latch.h */
83
87
struct WaitEventSet
84
88
{
@@ -139,7 +143,14 @@ static WaitEventSet *LatchWaitSet;
139
143
#ifndef WIN32
140
144
/* Are we currently in WaitLatch? The signal handler would like to know. */
141
145
static volatile sig_atomic_t waiting = false;
146
+ #endif
142
147
148
+ #ifdef WAIT_USE_EPOLL
149
+ /* On Linux, we'll receive SIGURG via a signalfd file descriptor. */
150
+ static int signal_fd = -1 ;
151
+ #endif
152
+
153
+ #if defined(WAIT_USE_POLL ) || defined(WAIT_USE_KQUEUE )
143
154
/* Read and write ends of the self-pipe */
144
155
static int selfpipe_readfd = -1 ;
145
156
static int selfpipe_writefd = -1 ;
@@ -150,8 +161,11 @@ static int selfpipe_owner_pid = 0;
150
161
/* Private function prototypes */
151
162
static void latch_sigurg_handler (SIGNAL_ARGS );
152
163
static void sendSelfPipeByte (void );
153
- static void drainSelfPipe (void );
154
- #endif /* WIN32 */
164
+ #endif
165
+
166
+ #if defined(WAIT_USE_POLL ) || defined(WAIT_USE_EPOLL )
167
+ static void drain (void );
168
+ #endif
155
169
156
170
#if defined(WAIT_USE_EPOLL )
157
171
static void WaitEventAdjustEpoll (WaitEventSet * set , WaitEvent * event , int action );
@@ -175,7 +189,7 @@ static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
175
189
void
176
190
InitializeLatchSupport (void )
177
191
{
178
- #ifndef WIN32
192
+ #if defined( WAIT_USE_POLL ) || defined( WAIT_USE_KQUEUE )
179
193
int pipefd [2 ];
180
194
181
195
if (IsUnderPostmaster )
@@ -247,8 +261,21 @@ InitializeLatchSupport(void)
247
261
ReserveExternalFD ();
248
262
249
263
pqsignal (SIGURG , latch_sigurg_handler );
250
- #else
251
- /* currently, nothing to do here for Windows */
264
+ #endif
265
+
266
+ #ifdef WAIT_USE_EPOLL
267
+ sigset_t signalfd_mask ;
268
+
269
+ /* Block SIGURG, because we'll receive it through a signalfd. */
270
+ sigaddset (& UnBlockSig , SIGURG );
271
+
272
+ /* Set up the signalfd to receive SIGURG notifications. */
273
+ sigemptyset (& signalfd_mask );
274
+ sigaddset (& signalfd_mask , SIGURG );
275
+ signal_fd = signalfd (-1 , & signalfd_mask , SFD_NONBLOCK | SFD_CLOEXEC );
276
+ if (signal_fd < 0 )
277
+ elog (FATAL , "signalfd() failed" );
278
+ ReserveExternalFD ();
252
279
#endif
253
280
}
254
281
@@ -273,19 +300,28 @@ InitializeLatchWaitSet(void)
273
300
void
274
301
ShutdownLatchSupport (void )
275
302
{
303
+ #if defined(WAIT_USE_POLL ) || defined(WAIT_USE_KQUEUE )
276
304
pqsignal (SIGURG , SIG_IGN );
305
+ #endif
277
306
278
307
if (LatchWaitSet )
279
308
{
280
309
FreeWaitEventSet (LatchWaitSet );
281
310
LatchWaitSet = NULL ;
282
311
}
283
312
313
+ #if defined(WAIT_USE_POLL ) || defined(WAIT_USE_KQUEUE )
284
314
close (selfpipe_readfd );
285
315
close (selfpipe_writefd );
286
316
selfpipe_readfd = -1 ;
287
317
selfpipe_writefd = -1 ;
288
318
selfpipe_owner_pid = InvalidPid ;
319
+ #endif
320
+
321
+ #if defined(WAIT_USE_EPOLL )
322
+ close (signal_fd );
323
+ signal_fd = -1 ;
324
+ #endif
289
325
}
290
326
291
327
/*
@@ -299,10 +335,10 @@ InitLatch(Latch *latch)
299
335
latch -> owner_pid = MyProcPid ;
300
336
latch -> is_shared = false;
301
337
302
- #ifndef WIN32
338
+ #if defined( WAIT_USE_POLL ) || defined( WAIT_USE_KQUEUE )
303
339
/* Assert InitializeLatchSupport has been called in this process */
304
340
Assert (selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid );
305
- #else
341
+ #elif defined( WAIT_USE_WIN32 )
306
342
latch -> event = CreateEvent (NULL , TRUE, FALSE, NULL );
307
343
if (latch -> event == NULL )
308
344
elog (ERROR , "CreateEvent failed: error code %lu" , GetLastError ());
@@ -363,7 +399,7 @@ OwnLatch(Latch *latch)
363
399
/* Sanity checks */
364
400
Assert (latch -> is_shared );
365
401
366
- #ifndef WIN32
402
+ #if defined( WAIT_USE_POLL ) || defined( WAIT_USE_KQUEUE )
367
403
/* Assert InitializeLatchSupport has been called in this process */
368
404
Assert (selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid );
369
405
#endif
@@ -550,9 +586,9 @@ SetLatch(Latch *latch)
550
586
551
587
/*
552
588
* See if anyone's waiting for the latch. It can be the current process if
553
- * we're in a signal handler. We use the self-pipe to wake up the
554
- * poll()/epoll_wait() in that case. If it's another process, send a
555
- * signal.
589
+ * we're in a signal handler. We use the self-pipe or SIGURG to ourselves
590
+ * to wake up WaitEventSetWaitBlock() without races in that case. If it's
591
+ * another process, send a signal.
556
592
*
557
593
* Fetch owner_pid only once, in case the latch is concurrently getting
558
594
* owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
@@ -575,11 +611,17 @@ SetLatch(Latch *latch)
575
611
return ;
576
612
else if (owner_pid == MyProcPid )
577
613
{
614
+ #if defined(WAIT_USE_POLL ) || defined(WAIT_USE_KQUEUE )
578
615
if (waiting )
579
616
sendSelfPipeByte ();
617
+ #else
618
+ if (waiting )
619
+ kill (MyProcPid , SIGURG );
620
+ #endif
580
621
}
581
622
else
582
623
kill (owner_pid , SIGURG );
624
+
583
625
#else
584
626
585
627
/*
@@ -856,8 +898,13 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
856
898
{
857
899
set -> latch = latch ;
858
900
set -> latch_pos = event -> pos ;
859
- #ifndef WIN32
901
+ #if defined( WAIT_USE_POLL ) || defined( WAIT_USE_KQUEUE )
860
902
event -> fd = selfpipe_readfd ;
903
+ #elif defined(WAIT_USE_EPOLL )
904
+ event -> fd = signal_fd ;
905
+ #else
906
+ event -> fd = PGINVALID_SOCKET ;
907
+ return event -> pos ;
861
908
#endif
862
909
}
863
910
else if (events == WL_POSTMASTER_DEATH )
@@ -932,12 +979,13 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
932
979
if (latch && latch -> owner_pid != MyProcPid )
933
980
elog (ERROR , "cannot wait on a latch owned by another process" );
934
981
set -> latch = latch ;
982
+
935
983
/*
936
984
* On Unix, we don't need to modify the kernel object because the
937
- * underlying pipe is the same for all latches so we can return
938
- * immediately. On Windows, we need to update our array of handles,
939
- * but we leave the old one in place and tolerate spurious wakeups if
940
- * the latch is disabled.
985
+ * underlying pipe (if there is one) is the same for all latches so we
986
+ * can return immediately. On Windows, we need to update our array of
987
+ * handles, but we leave the old one in place and tolerate spurious
988
+ * wakeups if the latch is disabled.
941
989
*/
942
990
#if defined(WAIT_USE_WIN32 )
943
991
if (!latch )
@@ -1421,8 +1469,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1421
1469
if (cur_event -> events == WL_LATCH_SET &&
1422
1470
cur_epoll_event -> events & (EPOLLIN | EPOLLERR | EPOLLHUP ))
1423
1471
{
1424
- /* There's data in the self-pipe, clear it . */
1425
- drainSelfPipe ();
1472
+ /* Drain the signalfd . */
1473
+ drain ();
1426
1474
1427
1475
if (set -> latch && set -> latch -> is_set )
1428
1476
{
@@ -1575,7 +1623,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1575
1623
cur_kqueue_event -> filter == EVFILT_READ )
1576
1624
{
1577
1625
/* There's data in the self-pipe, clear it. */
1578
- drainSelfPipe ();
1626
+ drain ();
1579
1627
1580
1628
if (set -> latch && set -> latch -> is_set )
1581
1629
{
@@ -1691,7 +1739,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1691
1739
(cur_pollfd -> revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL )))
1692
1740
{
1693
1741
/* There's data in the self-pipe, clear it. */
1694
- drainSelfPipe ();
1742
+ drain ();
1695
1743
1696
1744
if (set -> latch && set -> latch -> is_set )
1697
1745
{
@@ -1951,7 +1999,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
1951
1999
}
1952
2000
#endif
1953
2001
1954
- #ifndef WIN32
2002
+ #if defined(WAIT_USE_POLL ) || defined(WAIT_USE_KQUEUE )
2003
+
1955
2004
/*
1956
2005
* SetLatch uses SIGURG to wake up the process waiting on the latch.
1957
2006
*
@@ -1967,10 +2016,8 @@ latch_sigurg_handler(SIGNAL_ARGS)
1967
2016
1968
2017
errno = save_errno ;
1969
2018
}
1970
- #endif /* !WIN32 */
1971
2019
1972
2020
/* Send one byte to the self-pipe, to wake up WaitLatch */
1973
- #ifndef WIN32
1974
2021
static void
1975
2022
sendSelfPipeByte (void )
1976
2023
{
@@ -2000,45 +2047,58 @@ sendSelfPipeByte(void)
2000
2047
return ;
2001
2048
}
2002
2049
}
2003
- #endif /* !WIN32 */
2050
+
2051
+ #endif
2052
+
2053
+ #if defined(WAIT_USE_POLL ) || defined(WAIT_USE_EPOLL )
2004
2054
2005
2055
/*
2006
- * Read all available data from the self-pipe
2056
+ * Read all available data from self-pipe or signalfd.
2007
2057
*
2008
2058
* Note: this is only called when waiting = true. If it fails and doesn't
2009
2059
* return, it must reset that flag first (though ideally, this will never
2010
2060
* happen).
2011
2061
*/
2012
- #ifndef WIN32
2013
2062
static void
2014
- drainSelfPipe (void )
2063
+ drain (void )
2015
2064
{
2016
- /*
2017
- * There shouldn't normally be more than one byte in the pipe, or maybe a
2018
- * few bytes if multiple processes run SetLatch at the same instant.
2019
- */
2020
- char buf [16 ];
2065
+ char buf [1024 ];
2021
2066
int rc ;
2067
+ int fd ;
2068
+
2069
+ #ifdef WAIT_USE_POLL
2070
+ fd = selfpipe_readfd ;
2071
+ #else
2072
+ fd = signal_fd ;
2073
+ #endif
2022
2074
2023
2075
for (;;)
2024
2076
{
2025
- rc = read (selfpipe_readfd , buf , sizeof (buf ));
2077
+ rc = read (fd , buf , sizeof (buf ));
2026
2078
if (rc < 0 )
2027
2079
{
2028
2080
if (errno == EAGAIN || errno == EWOULDBLOCK )
2029
- break ; /* the pipe is empty */
2081
+ break ; /* the descriptor is empty */
2030
2082
else if (errno == EINTR )
2031
2083
continue ; /* retry */
2032
2084
else
2033
2085
{
2034
2086
waiting = false;
2087
+ #ifdef WAIT_USE_POLL
2035
2088
elog (ERROR , "read() on self-pipe failed: %m" );
2089
+ #else
2090
+ elog (ERROR , "read() on signalfd failed: %m" );
2091
+ #endif
2036
2092
}
2037
2093
}
2038
2094
else if (rc == 0 )
2039
2095
{
2040
2096
waiting = false;
2097
+ #ifdef WAIT_USE_POLL
2041
2098
elog (ERROR , "unexpected EOF on self-pipe" );
2099
+ #else
2100
+ elog (ERROR , "unexpected EOF on signalfd" );
2101
+ #endif
2042
2102
}
2043
2103
else if (rc < sizeof (buf ))
2044
2104
{
@@ -2048,4 +2108,5 @@ drainSelfPipe(void)
2048
2108
/* else buffer wasn't big enough, so read again */
2049
2109
}
2050
2110
}
2051
- #endif /* !WIN32 */
2111
+
2112
+ #endif
0 commit comments