Skip to content

Commit ce960e8

Browse files
Create new databases and change owners of existing ones during sync. (zalando#153)
* Create new databases and change owners of existing ones during sync.
1 parent d3679bf commit ce960e8

File tree

4 files changed

+112
-47
lines changed

4 files changed

+112
-47
lines changed

pkg/cluster/cluster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,12 @@ func (c *Cluster) Create() error {
255255
if err = c.createRoles(); err != nil {
256256
return fmt.Errorf("could not create users: %v", err)
257257
}
258+
c.logger.Infof("users have been successfully created")
258259

259260
if err = c.createDatabases(); err != nil {
260261
return fmt.Errorf("could not create databases: %v", err)
261262
}
262-
263-
c.logger.Infof("users have been successfully created")
263+
c.logger.Infof("databases have been successfully created")
264264
} else {
265265
if c.masterLess {
266266
c.logger.Warnln("cluster is masterless")

pkg/cluster/pg.go

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ const (
2525
WHERE a.rolname = ANY($1)
2626
ORDER BY 1;`
2727

28-
getDatabasesSQL = `SELECT datname, a.rolname AS owner FROM pg_database d INNER JOIN pg_authid a ON a.oid = d.datdba;`
29-
createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";`
28+
getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;`
29+
createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";`
30+
alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";`
3031
)
3132

3233
func (c *Cluster) pgConnectionString() string {
@@ -137,22 +138,15 @@ func (c *Cluster) readPgUsersFromDatabase(userNames []string) (users spec.PgUser
137138
return users, nil
138139
}
139140

141+
// getDatabases returns the map of current databases with owners
142+
// The caller is responsible for opening and closing the database connection
140143
func (c *Cluster) getDatabases() (map[string]string, error) {
141144
var (
142145
rows *sql.Rows
143146
err error
144147
)
145148
dbs := make(map[string]string)
146149

147-
if err = c.initDbConn(); err != nil {
148-
return nil, fmt.Errorf("could not init db connection")
149-
}
150-
defer func() {
151-
if err = c.closeDbConn(); err != nil {
152-
c.logger.Errorf("could not close db connection: %v", err)
153-
}
154-
}()
155-
156150
if rows, err = c.pgDb.Query(getDatabasesSQL); err != nil {
157151
return nil, fmt.Errorf("could not query database: %v", err)
158152
}
@@ -176,49 +170,44 @@ func (c *Cluster) getDatabases() (map[string]string, error) {
176170
return dbs, nil
177171
}
178172

179-
func (c *Cluster) createDatabases() error {
180-
c.setProcessName("creating databases")
181-
182-
newDbs := c.Spec.Databases
183-
curDbs, err := c.getDatabases()
184-
if err != nil {
185-
return fmt.Errorf("could not get current databases: %v", err)
173+
// executeCreateDatabase creates new database with the given owner.
174+
// The caller is responsible for openinging and closing the database connection.
175+
func (c *Cluster) executeCreateDatabase(datname, owner string) error {
176+
if !c.databaseNameOwnerValid(datname, owner) {
177+
return nil
186178
}
187-
for datname := range curDbs {
188-
delete(newDbs, datname)
179+
c.logger.Infof("creating database %q with owner %q", datname, owner)
180+
181+
if _, err := c.pgDb.Query(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil {
182+
return fmt.Errorf("could not execute create database: %v", err)
189183
}
184+
return nil
185+
}
190186

191-
if len(newDbs) == 0 {
187+
// executeCreateDatabase changes the owner of the given database.
188+
// The caller is responsible for openinging and closing the database connection.
189+
func (c *Cluster) executeAlterDatabaseOwner(datname string, owner string) error {
190+
if !c.databaseNameOwnerValid(datname, owner) {
192191
return nil
193192
}
194-
195-
if err = c.initDbConn(); err != nil {
196-
return fmt.Errorf("could not init database connection")
193+
c.logger.Infof("changing database %q owner to %q", datname, owner)
194+
if _, err := c.pgDb.Query(fmt.Sprintf(alterDatabaseOwnerSQL, datname, owner)); err != nil {
195+
return fmt.Errorf("could not execute alter database owner: %v", err)
197196
}
198-
defer func() {
199-
if err = c.closeDbConn(); err != nil {
200-
c.logger.Errorf("could not close database connection: %v", err)
201-
}
202-
}()
203-
204-
for datname, owner := range newDbs {
205-
if _, ok := c.pgUsers[owner]; !ok {
206-
c.logger.Infof("skipping creation of the %q database, user %q does not exist", datname, owner)
207-
continue
208-
}
209-
210-
if !databaseNameRegexp.MatchString(datname) {
211-
c.logger.Infof("database %q has invalid name", datname)
212-
continue
213-
}
214-
c.logger.Infof("creating database %q with owner %q", datname, owner)
197+
return nil
198+
}
215199

216-
if _, err = c.pgDb.Query(fmt.Sprintf(createDatabaseSQL, datname, owner)); err != nil {
217-
return fmt.Errorf("could not query database: %v", err)
218-
}
200+
func (c *Cluster) databaseNameOwnerValid(datname, owner string) bool {
201+
if _, ok := c.pgUsers[owner]; !ok {
202+
c.logger.Infof("skipping creation of the %q database, user %q does not exist", datname, owner)
203+
return false
219204
}
220205

221-
return nil
206+
if !databaseNameRegexp.MatchString(datname) {
207+
c.logger.Infof("database %q has invalid name", datname)
208+
return false
209+
}
210+
return true
222211
}
223212

224213
func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin bool) (result []string) {

pkg/cluster/resources.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,3 +557,27 @@ func (c *Cluster) GetStatefulSet() *v1beta1.StatefulSet {
557557
func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget {
558558
return c.PodDisruptionBudget
559559
}
560+
561+
func (c *Cluster) createDatabases() error {
562+
c.setProcessName("creating databases")
563+
564+
if len(c.Spec.Databases) == 0 {
565+
return nil
566+
}
567+
568+
if err := c.initDbConn(); err != nil {
569+
return fmt.Errorf("could not init database connection")
570+
}
571+
defer func() {
572+
if err := c.closeDbConn(); err != nil {
573+
c.logger.Errorf("could not close database connection: %v", err)
574+
}
575+
}()
576+
577+
for datname, owner := range c.Spec.Databases {
578+
if err := c.executeCreateDatabase(datname, owner); err != nil {
579+
return err
580+
}
581+
}
582+
return nil
583+
}

pkg/cluster/sync.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ func (c *Cluster) Sync(newSpec *spec.Postgresql) (err error) {
9090
err = fmt.Errorf("could not sync roles: %v", err)
9191
return
9292
}
93+
c.logger.Debugf("syncing databases")
94+
if err = c.syncDatabases(); err != nil {
95+
err = fmt.Errorf("could not sync databases: %v", err)
96+
return
97+
}
9398
}
9499

95100
c.logger.Debugf("syncing persistent volumes")
@@ -292,3 +297,50 @@ func (c *Cluster) samePDBWith(pdb *policybeta1.PodDisruptionBudget) (match bool,
292297

293298
return
294299
}
300+
301+
func (c *Cluster) syncDatabases() error {
302+
c.setProcessName("syncing databases")
303+
304+
createDatabases := make(map[string]string)
305+
alterOwnerDatabases := make(map[string]string)
306+
307+
if err := c.initDbConn(); err != nil {
308+
return fmt.Errorf("could not init database connection")
309+
}
310+
defer func() {
311+
if err := c.closeDbConn(); err != nil {
312+
c.logger.Errorf("could not close database connection: %v", err)
313+
}
314+
}()
315+
316+
currentDatabases, err := c.getDatabases()
317+
if err != nil {
318+
return fmt.Errorf("could not get current databases: %v", err)
319+
}
320+
321+
for datname, newOwner := range c.Spec.Databases {
322+
currentOwner, exists := currentDatabases[datname]
323+
if !exists {
324+
createDatabases[datname] = newOwner
325+
} else if currentOwner != newOwner {
326+
alterOwnerDatabases[datname] = newOwner
327+
}
328+
}
329+
330+
if len(createDatabases)+len(alterOwnerDatabases) == 0 {
331+
return nil
332+
}
333+
334+
for datname, owner := range createDatabases {
335+
if err = c.executeCreateDatabase(datname, owner); err != nil {
336+
return err
337+
}
338+
}
339+
for datname, owner := range alterOwnerDatabases {
340+
if err = c.executeAlterDatabaseOwner(datname, owner); err != nil {
341+
return err
342+
}
343+
}
344+
345+
return nil
346+
}

0 commit comments

Comments
 (0)