108
108
#include "replication/logicallauncher.h"
109
109
#include "replication/slotsync.h"
110
110
#include "replication/walsender.h"
111
+ #include "storage/aio_subsys.h"
111
112
#include "storage/fd.h"
113
+ #include "storage/io_worker.h"
112
114
#include "storage/ipc.h"
113
115
#include "storage/pmsignal.h"
116
+ #include "storage/proc.h"
114
117
#include "tcop/backend_startup.h"
115
118
#include "tcop/tcopprot.h"
116
119
#include "utils/datetime.h"
@@ -340,6 +343,7 @@ typedef enum
340
343
* ckpt */
341
344
PM_WAIT_XLOG_ARCHIVAL , /* waiting for archiver and walsenders to
342
345
* finish */
346
+ PM_WAIT_IO_WORKERS , /* waiting for io workers to exit */
343
347
PM_WAIT_CHECKPOINTER , /* waiting for checkpointer to shut down */
344
348
PM_WAIT_DEAD_END , /* waiting for dead-end children to exit */
345
349
PM_NO_CHILDREN , /* all important children have exited */
@@ -402,6 +406,10 @@ bool LoadedSSL = false;
402
406
static DNSServiceRef bonjour_sdref = NULL ;
403
407
#endif
404
408
409
+ /* State for IO worker management. */
410
+ static int io_worker_count = 0 ;
411
+ static PMChild * io_worker_children [MAX_IO_WORKERS ];
412
+
405
413
/*
406
414
* postmaster.c - function prototypes
407
415
*/
@@ -436,6 +444,8 @@ static void TerminateChildren(int signal);
436
444
static int CountChildren (BackendTypeMask targetMask );
437
445
static void LaunchMissingBackgroundProcesses (void );
438
446
static void maybe_start_bgworkers (void );
447
+ static bool maybe_reap_io_worker (int pid );
448
+ static void maybe_adjust_io_workers (void );
439
449
static bool CreateOptsFile (int argc , char * argv [], char * fullprogname );
440
450
static PMChild * StartChildProcess (BackendType type );
441
451
static void StartSysLogger (void );
@@ -1365,6 +1375,11 @@ PostmasterMain(int argc, char *argv[])
1365
1375
*/
1366
1376
AddToDataDirLockFile (LOCK_FILE_LINE_PM_STATUS , PM_STATUS_STARTING );
1367
1377
1378
+ UpdatePMState (PM_STARTUP );
1379
+
1380
+ /* Make sure we can perform I/O while starting up. */
1381
+ maybe_adjust_io_workers ();
1382
+
1368
1383
/* Start bgwriter and checkpointer so they can help with recovery */
1369
1384
if (CheckpointerPMChild == NULL )
1370
1385
CheckpointerPMChild = StartChildProcess (B_CHECKPOINTER );
@@ -1377,7 +1392,6 @@ PostmasterMain(int argc, char *argv[])
1377
1392
StartupPMChild = StartChildProcess (B_STARTUP );
1378
1393
Assert (StartupPMChild != NULL );
1379
1394
StartupStatus = STARTUP_RUNNING ;
1380
- UpdatePMState (PM_STARTUP );
1381
1395
1382
1396
/* Some workers may be scheduled to start now */
1383
1397
maybe_start_bgworkers ();
@@ -2502,6 +2516,16 @@ process_pm_child_exit(void)
2502
2516
continue ;
2503
2517
}
2504
2518
2519
+ /* Was it an IO worker? */
2520
+ if (maybe_reap_io_worker (pid ))
2521
+ {
2522
+ if (!EXIT_STATUS_0 (exitstatus ) && !EXIT_STATUS_1 (exitstatus ))
2523
+ HandleChildCrash (pid , exitstatus , _ ("io worker" ));
2524
+
2525
+ maybe_adjust_io_workers ();
2526
+ continue ;
2527
+ }
2528
+
2505
2529
/*
2506
2530
* Was it a backend or a background worker?
2507
2531
*/
@@ -2723,6 +2747,7 @@ HandleFatalError(QuitSignalReason reason, bool consider_sigabrt)
2723
2747
case PM_WAIT_XLOG_SHUTDOWN :
2724
2748
case PM_WAIT_XLOG_ARCHIVAL :
2725
2749
case PM_WAIT_CHECKPOINTER :
2750
+ case PM_WAIT_IO_WORKERS :
2726
2751
2727
2752
/*
2728
2753
* NB: Similar code exists in PostmasterStateMachine()'s handling
@@ -2905,20 +2930,21 @@ PostmasterStateMachine(void)
2905
2930
2906
2931
/*
2907
2932
* If we are doing crash recovery or an immediate shutdown then we
2908
- * expect archiver, checkpointer and walsender to exit as well,
2909
- * otherwise not.
2933
+ * expect archiver, checkpointer, io workers and walsender to exit as
2934
+ * well, otherwise not.
2910
2935
*/
2911
2936
if (FatalError || Shutdown >= ImmediateShutdown )
2912
2937
targetMask = btmask_add (targetMask ,
2913
2938
B_CHECKPOINTER ,
2914
2939
B_ARCHIVER ,
2940
+ B_IO_WORKER ,
2915
2941
B_WAL_SENDER );
2916
2942
2917
2943
/*
2918
- * Normally walsenders and archiver will continue running; they will
2919
- * be terminated later after writing the checkpoint record. We also
2920
- * let dead-end children to keep running for now. The syslogger
2921
- * process exits last.
2944
+ * Normally archiver, checkpointer, IO workers and walsenders will
2945
+ * continue running; they will be terminated later after writing the
2946
+ * checkpoint record. We also let dead-end children keep running
2947
+ * for now. The syslogger process exits last.
2922
2948
*
2923
2949
* This assertion checks that we have covered all backend types,
2924
2950
* either by including them in targetMask, or by noting here that they
@@ -2933,12 +2959,13 @@ PostmasterStateMachine(void)
2933
2959
B_LOGGER );
2934
2960
2935
2961
/*
2936
- * Archiver, checkpointer and walsender may or may not be in
2937
- * targetMask already.
2962
+ * Archiver, checkpointer, IO workers, and walsender may or may
2963
+ * not be in targetMask already.
2938
2964
*/
2939
2965
remainMask = btmask_add (remainMask ,
2940
2966
B_ARCHIVER ,
2941
2967
B_CHECKPOINTER ,
2968
+ B_IO_WORKER ,
2942
2969
B_WAL_SENDER );
2943
2970
2944
2971
/* these are not real postmaster children */
@@ -3039,11 +3066,25 @@ PostmasterStateMachine(void)
3039
3066
{
3040
3067
/*
3041
3068
* PM_WAIT_XLOG_ARCHIVAL state ends when there are no children other
3042
- * than checkpointer, dead-end children and logger left. There
3069
+ * than checkpointer, io workers and dead-end children left. There
3043
3070
* shouldn't be any regular backends left by now anyway; what we're
3044
3071
* really waiting for is for walsenders and archiver to exit.
3045
3072
*/
3046
- if (CountChildren (btmask_all_except (B_CHECKPOINTER , B_LOGGER , B_DEAD_END_BACKEND )) == 0 )
3073
+ if (CountChildren (btmask_all_except (B_CHECKPOINTER , B_IO_WORKER ,
3074
+ B_LOGGER , B_DEAD_END_BACKEND )) == 0 )
3075
+ {
3076
+ UpdatePMState (PM_WAIT_IO_WORKERS );
3077
+ SignalChildren (SIGUSR2 , btmask (B_IO_WORKER ));
3078
+ }
3079
+ }
3080
+
3081
+ if (pmState == PM_WAIT_IO_WORKERS )
3082
+ {
3083
+ /*
3084
+ * PM_WAIT_IO_WORKERS state ends when there's only checkpointer and
3085
+ * dead_end children left.
3086
+ */
3087
+ if (io_worker_count == 0 )
3047
3088
{
3048
3089
UpdatePMState (PM_WAIT_CHECKPOINTER );
3049
3090
@@ -3171,10 +3212,14 @@ PostmasterStateMachine(void)
3171
3212
/* re-create shared memory and semaphores */
3172
3213
CreateSharedMemoryAndSemaphores ();
3173
3214
3215
+ UpdatePMState (PM_STARTUP );
3216
+
3217
+ /* Make sure we can perform I/O while starting up. */
3218
+ maybe_adjust_io_workers ();
3219
+
3174
3220
StartupPMChild = StartChildProcess (B_STARTUP );
3175
3221
Assert (StartupPMChild != NULL );
3176
3222
StartupStatus = STARTUP_RUNNING ;
3177
- UpdatePMState (PM_STARTUP );
3178
3223
/* crash recovery started, reset SIGKILL flag */
3179
3224
AbortStartTime = 0 ;
3180
3225
@@ -3198,6 +3243,7 @@ pmstate_name(PMState state)
3198
3243
PM_TOSTR_CASE (PM_WAIT_BACKENDS );
3199
3244
PM_TOSTR_CASE (PM_WAIT_XLOG_SHUTDOWN );
3200
3245
PM_TOSTR_CASE (PM_WAIT_XLOG_ARCHIVAL );
3246
+ PM_TOSTR_CASE (PM_WAIT_IO_WORKERS );
3201
3247
PM_TOSTR_CASE (PM_WAIT_DEAD_END );
3202
3248
PM_TOSTR_CASE (PM_WAIT_CHECKPOINTER );
3203
3249
PM_TOSTR_CASE (PM_NO_CHILDREN );
@@ -3235,6 +3281,16 @@ LaunchMissingBackgroundProcesses(void)
3235
3281
if (SysLoggerPMChild == NULL && Logging_collector )
3236
3282
StartSysLogger ();
3237
3283
3284
+ /*
3285
+ * The number of configured workers might have changed, or a prior start
3286
+ * of a worker might have failed. Check if we need to start/stop any
3287
+ * workers.
3288
+ *
3289
+ * A config file change will always lead to this function being called, so
3290
+ * we always will process the config change in a timely manner.
3291
+ */
3292
+ maybe_adjust_io_workers ();
3293
+
3238
3294
/*
3239
3295
* The checkpointer and the background writer are active from the start,
3240
3296
* until shutdown is initiated.
@@ -4120,6 +4176,7 @@ bgworker_should_start_now(BgWorkerStartTime start_time)
4120
4176
case PM_WAIT_DEAD_END :
4121
4177
case PM_WAIT_XLOG_ARCHIVAL :
4122
4178
case PM_WAIT_XLOG_SHUTDOWN :
4179
+ case PM_WAIT_IO_WORKERS :
4123
4180
case PM_WAIT_BACKENDS :
4124
4181
case PM_STOP_BACKENDS :
4125
4182
break ;
@@ -4270,6 +4327,99 @@ maybe_start_bgworkers(void)
4270
4327
}
4271
4328
}
4272
4329
4330
+ static bool
4331
+ maybe_reap_io_worker (int pid )
4332
+ {
4333
+ for (int id = 0 ; id < MAX_IO_WORKERS ; ++ id )
4334
+ {
4335
+ if (io_worker_children [id ] &&
4336
+ io_worker_children [id ]-> pid == pid )
4337
+ {
4338
+ ReleasePostmasterChildSlot (io_worker_children [id ]);
4339
+
4340
+ -- io_worker_count ;
4341
+ io_worker_children [id ] = NULL ;
4342
+ return true;
4343
+ }
4344
+ }
4345
+ return false;
4346
+ }
4347
+
4348
+ /*
4349
+ * Start or stop IO workers, to close the gap between the number of running
4350
+ * workers and the number of configured workers. Used to respond to change of
4351
+ * the io_workers GUC (by increasing and decreasing the number of workers), as
4352
+ * well as workers terminating in response to errors (by starting
4353
+ * "replacement" workers).
4354
+ */
4355
+ static void
4356
+ maybe_adjust_io_workers (void )
4357
+ {
4358
+ if (!pgaio_workers_enabled ())
4359
+ return ;
4360
+
4361
+ /*
4362
+ * If we're in final shutting down state, then we're just waiting for all
4363
+ * processes to exit.
4364
+ */
4365
+ if (pmState >= PM_WAIT_IO_WORKERS )
4366
+ return ;
4367
+
4368
+ /* Don't start new workers during an immediate shutdown either. */
4369
+ if (Shutdown >= ImmediateShutdown )
4370
+ return ;
4371
+
4372
+ /*
4373
+ * Don't start new workers if we're in the shutdown phase of a crash
4374
+ * restart. But we *do* need to start if we're already starting up again.
4375
+ */
4376
+ if (FatalError && pmState >= PM_STOP_BACKENDS )
4377
+ return ;
4378
+
4379
+ Assert (pmState < PM_WAIT_IO_WORKERS );
4380
+
4381
+ /* Not enough running? */
4382
+ while (io_worker_count < io_workers )
4383
+ {
4384
+ PMChild * child ;
4385
+ int id ;
4386
+
4387
+ /* find unused entry in io_worker_children array */
4388
+ for (id = 0 ; id < MAX_IO_WORKERS ; ++ id )
4389
+ {
4390
+ if (io_worker_children [id ] == NULL )
4391
+ break ;
4392
+ }
4393
+ if (id == MAX_IO_WORKERS )
4394
+ elog (ERROR , "could not find a free IO worker ID" );
4395
+
4396
+ /* Try to launch one. */
4397
+ child = StartChildProcess (B_IO_WORKER );
4398
+ if (child != NULL )
4399
+ {
4400
+ io_worker_children [id ] = child ;
4401
+ ++ io_worker_count ;
4402
+ }
4403
+ else
4404
+ break ; /* XXX try again soon? */
4405
+ }
4406
+
4407
+ /* Too many running? */
4408
+ if (io_worker_count > io_workers )
4409
+ {
4410
+ /* ask the IO worker in the highest slot to exit */
4411
+ for (int id = MAX_IO_WORKERS - 1 ; id >= 0 ; -- id )
4412
+ {
4413
+ if (io_worker_children [id ] != NULL )
4414
+ {
4415
+ kill (io_worker_children [id ]-> pid , SIGUSR2 );
4416
+ break ;
4417
+ }
4418
+ }
4419
+ }
4420
+ }
4421
+
4422
+
4273
4423
/*
4274
4424
* When a backend asks to be notified about worker state changes, we
4275
4425
* set a flag in its backend entry. The background worker machinery needs
0 commit comments