Skip to content

Commit 5fae3e9

Browse files
committed
Use multiple processors when PGUpgrade is given CPU resources
The --jobs flag allows for some aspects of pg_upgrade to operate in parallel. The documentation says: This option can dramatically reduce the time to upgrade a multi-database server running on a multiprocessor machine. Issue: PGO-1958 See: https://www.postgresql.org/docs/current/pgupgrade.html
1 parent 8fb1788 commit 5fae3e9

File tree

4 files changed

+131
-18
lines changed

4 files changed

+131
-18
lines changed

internal/controller/pgupgrade/jobs.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ import (
1212
appsv1 "k8s.io/api/apps/v1"
1313
batchv1 "k8s.io/api/batch/v1"
1414
corev1 "k8s.io/api/core/v1"
15+
"k8s.io/apimachinery/pkg/api/resource"
1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1617
"k8s.io/apimachinery/pkg/labels"
1718

19+
"github.com/crunchydata/postgres-operator/internal/feature"
1820
"github.com/crunchydata/postgres-operator/internal/initialize"
1921
"github.com/crunchydata/postgres-operator/internal/naming"
2022
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
@@ -33,17 +35,17 @@ func pgUpgradeJob(upgrade *v1beta1.PGUpgrade) metav1.ObjectMeta {
3335

3436
// upgradeCommand returns an entrypoint that prepares the filesystem for
3537
// and performs a PostgreSQL major version upgrade using pg_upgrade.
36-
func upgradeCommand(upgrade *v1beta1.PGUpgrade, fetchKeyCommand string) []string {
37-
oldVersion := fmt.Sprint(upgrade.Spec.FromPostgresVersion)
38-
newVersion := fmt.Sprint(upgrade.Spec.ToPostgresVersion)
38+
func upgradeCommand(oldVersion, newVersion int, fetchKeyCommand string, availableCPUs int) []string {
39+
// Use multiple CPUs when three or more are available.
40+
argJobs := fmt.Sprintf(` --jobs=%d`, max(1, availableCPUs-1))
3941

4042
// if the fetch key command is set for TDE, provide the value during initialization
4143
initdb := `/usr/pgsql-"${new_version}"/bin/initdb -k -D /pgdata/pg"${new_version}"`
4244
if fetchKeyCommand != "" {
4345
initdb += ` --encryption-key-command "` + fetchKeyCommand + `"`
4446
}
4547

46-
args := []string{oldVersion, newVersion}
48+
args := []string{fmt.Sprint(oldVersion), fmt.Sprint(newVersion)}
4749
script := strings.Join([]string{
4850
`declare -r data_volume='/pgdata' old_version="$1" new_version="$2"`,
4951
`printf 'Performing PostgreSQL upgrade from version "%s" to "%s" ...\n\n' "$@"`,
@@ -97,14 +99,14 @@ func upgradeCommand(upgrade *v1beta1.PGUpgrade, fetchKeyCommand string) []string
9799
`echo -e "Step 5: Running pg_upgrade check...\n"`,
98100
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \`,
99101
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\`,
100-
` --new-datadir /pgdata/pg"${new_version}" --link --check`,
102+
` --new-datadir /pgdata/pg"${new_version}" --link --check` + argJobs,
101103

102104
// Assuming the check completes successfully, the pg_upgrade command will
103105
// be run that actually prepares the upgraded pgdata directory.
104106
`echo -e "\nStep 6: Running pg_upgrade...\n"`,
105107
`time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \`,
106108
`--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \`,
107-
`--new-datadir /pgdata/pg"${new_version}" --link`,
109+
`--new-datadir /pgdata/pg"${new_version}" --link` + argJobs,
108110

109111
// Since we have cleared the Patroni cluster step by removing the EndPoints, we copy patroni.dynamic.json
110112
// from the old data dir to help retain PostgreSQL parameters you had set before.
@@ -118,10 +120,21 @@ func upgradeCommand(upgrade *v1beta1.PGUpgrade, fetchKeyCommand string) []string
118120
return append([]string{"bash", "-ceu", "--", script, "upgrade"}, args...)
119121
}
120122

123+
// largestWholeCPU returns the maximum CPU request or limit as a non-negative
124+
// integer of CPUs. When resources lacks any CPU, the result is zero.
125+
func largestWholeCPU(resources corev1.ResourceRequirements) int {
126+
// Read CPU quantities as millicores then divide to get the "floor."
127+
// NOTE: [resource.Quantity.Value] looks easier, but it rounds up.
128+
return max(
129+
int(resources.Limits.Cpu().ScaledValue(resource.Milli)/1000),
130+
int(resources.Requests.Cpu().ScaledValue(resource.Milli)/1000),
131+
0)
132+
}
133+
121134
// generateUpgradeJob returns a Job that can upgrade the PostgreSQL data
122135
// directory of the startup instance.
123136
func (r *PGUpgradeReconciler) generateUpgradeJob(
124-
_ context.Context, upgrade *v1beta1.PGUpgrade,
137+
ctx context.Context, upgrade *v1beta1.PGUpgrade,
125138
startup *appsv1.StatefulSet, fetchKeyCommand string,
126139
) *batchv1.Job {
127140
job := &batchv1.Job{}
@@ -167,6 +180,12 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
167180
job.Spec.BackoffLimit = initialize.Int32(0)
168181
job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
169182

183+
// When enabled, calculate the number of CPUs for pg_upgrade.
184+
wholeCPUs := 0
185+
if feature.Enabled(ctx, feature.PGUpgradeCPUConcurrency) {
186+
wholeCPUs = largestWholeCPU(upgrade.Spec.Resources)
187+
}
188+
170189
// Replace all containers with one that does the upgrade.
171190
job.Spec.Template.Spec.EphemeralContainers = nil
172191
job.Spec.Template.Spec.InitContainers = nil
@@ -179,7 +198,11 @@ func (r *PGUpgradeReconciler) generateUpgradeJob(
179198
VolumeMounts: database.VolumeMounts,
180199

181200
// Use our upgrade command and the specified image and resources.
182-
Command: upgradeCommand(upgrade, fetchKeyCommand),
201+
Command: upgradeCommand(
202+
upgrade.Spec.FromPostgresVersion,
203+
upgrade.Spec.ToPostgresVersion,
204+
fetchKeyCommand,
205+
wholeCPUs),
183206
Image: pgUpgradeContainerImage(upgrade),
184207
ImagePullPolicy: upgrade.Spec.ImagePullPolicy,
185208
Resources: upgrade.Spec.Resources,

internal/controller/pgupgrade/jobs_test.go

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,85 @@ import (
1616
"k8s.io/apimachinery/pkg/api/resource"
1717
"sigs.k8s.io/yaml"
1818

19+
"github.com/crunchydata/postgres-operator/internal/feature"
1920
"github.com/crunchydata/postgres-operator/internal/initialize"
2021
"github.com/crunchydata/postgres-operator/internal/testing/cmp"
2122
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
2223
)
2324

25+
func TestLargestWholeCPU(t *testing.T) {
26+
assert.Equal(t, 0,
27+
largestWholeCPU(corev1.ResourceRequirements{}),
28+
"expected the zero value to be zero")
29+
30+
for _, tt := range []struct {
31+
Name, ResourcesYAML string
32+
Result int
33+
}{
34+
{
35+
Name: "Negatives", ResourcesYAML: `{requests: {cpu: -3}, limits: {cpu: -5}}`,
36+
Result: 0,
37+
},
38+
{
39+
Name: "SmallPositive", ResourcesYAML: `limits: {cpu: 600m}`,
40+
Result: 0,
41+
},
42+
{
43+
Name: "FractionalPositive", ResourcesYAML: `requests: {cpu: 2200m}`,
44+
Result: 2,
45+
},
46+
{
47+
Name: "LargePositive", ResourcesYAML: `limits: {cpu: 10}`,
48+
Result: 10,
49+
},
50+
{
51+
Name: "RequestsAndLimits", ResourcesYAML: `{requests: {cpu: 2}, limits: {cpu: 4}}`,
52+
Result: 4,
53+
},
54+
} {
55+
t.Run(tt.Name, func(t *testing.T) {
56+
var resources corev1.ResourceRequirements
57+
assert.NilError(t, yaml.Unmarshal([]byte(tt.ResourcesYAML), &resources))
58+
assert.Equal(t, tt.Result, largestWholeCPU(resources))
59+
})
60+
}
61+
}
62+
63+
func TestUpgradeCommand(t *testing.T) {
64+
expectScript := func(t *testing.T, script string) {
65+
t.Helper()
66+
67+
t.Run("PrettyYAML", func(t *testing.T) {
68+
b, err := yaml.Marshal(script)
69+
assert.NilError(t, err)
70+
assert.Assert(t, strings.HasPrefix(string(b), `|`),
71+
"expected literal block scalar, got:\n%s", b)
72+
})
73+
}
74+
75+
t.Run("CPUs", func(t *testing.T) {
76+
for _, tt := range []struct {
77+
CPUs int
78+
Jobs string
79+
}{
80+
{CPUs: 0, Jobs: "--jobs=1"},
81+
{CPUs: 1, Jobs: "--jobs=1"},
82+
{CPUs: 2, Jobs: "--jobs=1"},
83+
{CPUs: 3, Jobs: "--jobs=2"},
84+
{CPUs: 10, Jobs: "--jobs=9"},
85+
} {
86+
command := upgradeCommand(10, 11, "", tt.CPUs)
87+
assert.Assert(t, len(command) > 3)
88+
assert.DeepEqual(t, []string{"bash", "-ceu", "--"}, command[:3])
89+
90+
script := command[3]
91+
assert.Assert(t, cmp.Contains(script, tt.Jobs))
92+
93+
expectScript(t, script)
94+
}
95+
})
96+
}
97+
2498
func TestGenerateUpgradeJob(t *testing.T) {
2599
ctx := context.Background()
26100
reconciler := &PGUpgradeReconciler{}
@@ -120,11 +194,11 @@ spec:
120194
echo -e "Step 5: Running pg_upgrade check...\n"
121195
time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \
122196
--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}"\
123-
--new-datadir /pgdata/pg"${new_version}" --link --check
197+
--new-datadir /pgdata/pg"${new_version}" --link --check --jobs=1
124198
echo -e "\nStep 6: Running pg_upgrade...\n"
125199
time /usr/pgsql-"${new_version}"/bin/pg_upgrade --old-bindir /usr/pgsql-"${old_version}"/bin \
126200
--new-bindir /usr/pgsql-"${new_version}"/bin --old-datadir /pgdata/pg"${old_version}" \
127-
--new-datadir /pgdata/pg"${new_version}" --link
201+
--new-datadir /pgdata/pg"${new_version}" --link --jobs=1
128202
echo -e "\nStep 7: Copying patroni.dynamic.json...\n"
129203
cp /pgdata/pg"${old_version}"/patroni.dynamic.json /pgdata/pg"${new_version}"
130204
echo -e "\npg_upgrade Job Complete!"
@@ -149,6 +223,17 @@ spec:
149223
status: {}
150224
`))
151225

226+
t.Run(feature.PGUpgradeCPUConcurrency+"Enabled", func(t *testing.T) {
227+
gate := feature.NewGate()
228+
assert.NilError(t, gate.SetFromMap(map[string]bool{
229+
feature.PGUpgradeCPUConcurrency: true,
230+
}))
231+
ctx := feature.NewContext(context.Background(), gate)
232+
233+
job := reconciler.generateUpgradeJob(ctx, upgrade, startup, "")
234+
assert.Assert(t, cmp.MarshalContains(job, `--jobs=2`))
235+
})
236+
152237
tdeJob := reconciler.generateUpgradeJob(ctx, upgrade, startup, "echo testKey")
153238
assert.Assert(t, cmp.MarshalContains(tdeJob,
154239
`/usr/pgsql-"${new_version}"/bin/initdb -k -D /pgdata/pg"${new_version}" --encryption-key-command "echo testKey"`))

internal/feature/features.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ const (
8383
// Support custom sidecars for pgBouncer Pods
8484
PGBouncerSidecars = "PGBouncerSidecars"
8585

86+
// Adjust PGUpgrade parallelism according to CPU resources
87+
PGUpgradeCPUConcurrency = "PGUpgradeCPUConcurrency"
88+
8689
// Support tablespace volumes
8790
TablespaceVolumes = "TablespaceVolumes"
8891

@@ -95,14 +98,15 @@ func NewGate() MutableGate {
9598
gate := featuregate.NewFeatureGate()
9699

97100
if err := gate.Add(map[Feature]featuregate.FeatureSpec{
98-
AppendCustomQueries: {Default: false, PreRelease: featuregate.Alpha},
99-
AutoCreateUserSchema: {Default: true, PreRelease: featuregate.Beta},
100-
AutoGrowVolumes: {Default: false, PreRelease: featuregate.Alpha},
101-
BridgeIdentifiers: {Default: false, PreRelease: featuregate.Alpha},
102-
InstanceSidecars: {Default: false, PreRelease: featuregate.Alpha},
103-
PGBouncerSidecars: {Default: false, PreRelease: featuregate.Alpha},
104-
TablespaceVolumes: {Default: false, PreRelease: featuregate.Alpha},
105-
VolumeSnapshots: {Default: false, PreRelease: featuregate.Alpha},
101+
AppendCustomQueries: {Default: false, PreRelease: featuregate.Alpha},
102+
AutoCreateUserSchema: {Default: true, PreRelease: featuregate.Beta},
103+
AutoGrowVolumes: {Default: false, PreRelease: featuregate.Alpha},
104+
BridgeIdentifiers: {Default: false, PreRelease: featuregate.Alpha},
105+
InstanceSidecars: {Default: false, PreRelease: featuregate.Alpha},
106+
PGBouncerSidecars: {Default: false, PreRelease: featuregate.Alpha},
107+
PGUpgradeCPUConcurrency: {Default: false, PreRelease: featuregate.Alpha},
108+
TablespaceVolumes: {Default: false, PreRelease: featuregate.Alpha},
109+
VolumeSnapshots: {Default: false, PreRelease: featuregate.Alpha},
106110
}); err != nil {
107111
panic(err)
108112
}

internal/feature/features_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ func TestDefaults(t *testing.T) {
2222
assert.Assert(t, false == gate.Enabled(BridgeIdentifiers))
2323
assert.Assert(t, false == gate.Enabled(InstanceSidecars))
2424
assert.Assert(t, false == gate.Enabled(PGBouncerSidecars))
25+
assert.Assert(t, false == gate.Enabled(PGUpgradeCPUConcurrency))
2526
assert.Assert(t, false == gate.Enabled(TablespaceVolumes))
2627
assert.Assert(t, false == gate.Enabled(VolumeSnapshots))
2728
}

0 commit comments

Comments
 (0)