@@ -12,9 +12,11 @@ import (
12
12
appsv1 "k8s.io/api/apps/v1"
13
13
batchv1 "k8s.io/api/batch/v1"
14
14
corev1 "k8s.io/api/core/v1"
15
+ "k8s.io/apimachinery/pkg/api/resource"
15
16
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
17
"k8s.io/apimachinery/pkg/labels"
17
18
19
+ "github.com/crunchydata/postgres-operator/internal/feature"
18
20
"github.com/crunchydata/postgres-operator/internal/initialize"
19
21
"github.com/crunchydata/postgres-operator/internal/naming"
20
22
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
@@ -33,17 +35,17 @@ func pgUpgradeJob(upgrade *v1beta1.PGUpgrade) metav1.ObjectMeta {
33
35
34
36
// upgradeCommand returns an entrypoint that prepares the filesystem for
35
37
// and performs a PostgreSQL major version upgrade using pg_upgrade.
36
- func upgradeCommand (upgrade * v1beta1. PGUpgrade , fetchKeyCommand string ) []string {
37
- oldVersion := fmt . Sprint ( upgrade . Spec . FromPostgresVersion )
38
- newVersion := fmt .Sprint ( upgrade . Spec . ToPostgresVersion )
38
+ func upgradeCommand (oldVersion , newVersion int , fetchKeyCommand string , availableCPUs int ) []string {
39
+ // Use multiple CPUs when three or more are available.
40
+ argJobs := fmt .Sprintf ( ` --jobs=%d` , max ( 1 , availableCPUs - 1 ) )
39
41
40
42
// if the fetch key command is set for TDE, provide the value during initialization
41
43
initdb := `/usr/pgsql-"${new_version}"/bin/initdb -k -D /pgdata/pg"${new_version}"`
42
44
if fetchKeyCommand != "" {
43
45
initdb += ` --encryption-key-command "` + fetchKeyCommand + `"`
44
46
}
45
47
46
- args := []string {oldVersion , newVersion }
48
+ args := []string {fmt . Sprint ( oldVersion ), fmt . Sprint ( newVersion ) }
47
49
script := strings .Join ([]string {
48
50
`declare -r data_volume='/pgdata' old_version="$1" new_version="$2"` ,
49
51
`printf 'Performing PostgreSQL upgrade from version "%s" to "%s" ...\n\n' "$@"` ,
@@ -97,14 +99,14 @@ func upgradeCommand(upgrade *v1beta1.PGUpgrade, fetchKeyCommand string) []string
97
99
`echo -e "Step 5: Running pg_upgrade check...\n"` ,
98
100
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \` ,
99
101
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\` ,
100
- ` --new-datadir /pgdata/pg"${new_version}" --link --check` ,
102
+ ` --new-datadir /pgdata/pg"${new_version}" --link --check` + argJobs ,
101
103
102
104
// Assuming the check completes successfully, the pg_upgrade command will
103
105
// be run that actually prepares the upgraded pgdata directory.
104
106
`echo -e "\nStep 6: Running pg_upgrade...\n"` ,
105
107
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \` ,
106
108
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \` ,
107
- `--new-datadir /pgdata/pg"${new_version}" --link` ,
109
+ `--new-datadir /pgdata/pg"${new_version}" --link` + argJobs ,
108
110
109
111
// Since we have cleared the Patroni cluster step by removing the EndPoints, we copy patroni.dynamic.json
110
112
// from the old data dir to help retain PostgreSQL parameters you had set before.
@@ -118,10 +120,21 @@ func upgradeCommand(upgrade *v1beta1.PGUpgrade, fetchKeyCommand string) []string
118
120
return append ([]string {"bash" , "-ceu" , "--" , script , "upgrade" }, args ... )
119
121
}
120
122
123
+ // largestWholeCPU returns the maximum CPU request or limit as a non-negative
124
+ // integer of CPUs. When resources lacks any CPU, the result is zero.
125
+ func largestWholeCPU (resources corev1.ResourceRequirements ) int {
126
+ // Read CPU quantities as millicores then divide to get the "floor."
127
+ // NOTE: [resource.Quantity.Value] looks easier, but it rounds up.
128
+ return max (
129
+ int (resources .Limits .Cpu ().ScaledValue (resource .Milli )/ 1000 ),
130
+ int (resources .Requests .Cpu ().ScaledValue (resource .Milli )/ 1000 ),
131
+ 0 )
132
+ }
133
+
121
134
// generateUpgradeJob returns a Job that can upgrade the PostgreSQL data
122
135
// directory of the startup instance.
123
136
func (r * PGUpgradeReconciler ) generateUpgradeJob (
124
- _ context.Context , upgrade * v1beta1.PGUpgrade ,
137
+ ctx context.Context , upgrade * v1beta1.PGUpgrade ,
125
138
startup * appsv1.StatefulSet , fetchKeyCommand string ,
126
139
) * batchv1.Job {
127
140
job := & batchv1.Job {}
@@ -167,6 +180,12 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
167
180
job .Spec .BackoffLimit = initialize .Int32 (0 )
168
181
job .Spec .Template .Spec .RestartPolicy = corev1 .RestartPolicyNever
169
182
183
+ // When enabled, calculate the number of CPUs for pg_upgrade.
184
+ wholeCPUs := 0
185
+ if feature .Enabled (ctx , feature .PGUpgradeCPUConcurrency ) {
186
+ wholeCPUs = largestWholeCPU (upgrade .Spec .Resources )
187
+ }
188
+
170
189
// Replace all containers with one that does the upgrade.
171
190
job .Spec .Template .Spec .EphemeralContainers = nil
172
191
job .Spec .Template .Spec .InitContainers = nil
@@ -179,7 +198,11 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
179
198
VolumeMounts : database .VolumeMounts ,
180
199
181
200
// Use our upgrade command and the specified image and resources.
182
- Command : upgradeCommand (upgrade , fetchKeyCommand ),
201
+ Command : upgradeCommand (
202
+ upgrade .Spec .FromPostgresVersion ,
203
+ upgrade .Spec .ToPostgresVersion ,
204
+ fetchKeyCommand ,
205
+ wholeCPUs ),
183
206
Image : pgUpgradeContainerImage (upgrade ),
184
207
ImagePullPolicy : upgrade .Spec .ImagePullPolicy ,
185
208
Resources : upgrade .Spec .Resources ,
0 commit comments