Skip to content

Commit 1dfc7da

Browse files
committed
feat: allow users to run preprocessing SQL queries during the promotion stage, when PostgreSQL is running and ready to receive commands (#181)
1 parent 14bd989 commit 1dfc7da

File tree

9 files changed

+367
-16
lines changed

9 files changed

+367
-16
lines changed

configs/config.example.logical_generic.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,18 @@ retrieval:
215215

216216
logicalSnapshot:
217217
options:
218+
# Define pre-processing SQL queries for data patching. For example, "/tmp/scripts/sql".
219+
dataPatching:
220+
# The Docker image to run data patching container.
221+
dockerImage: "sync-instance:12"
222+
223+
queryPreprocessing:
224+
# Path to SQL pre-processing queries. Default: empty string (no pre-processing defined).
225+
queryPath: ""
226+
227+
# Worker limit for parallel queries.
228+
maxParallelWorkers: 2
229+
218230
# It is possible to define a pre-processing script. For example, "/tmp/scripts/custom.sh".
219231
# Default: empty string (no pre-processing defined).
220232
# This can be used for scrubbing or eliminating PII data, defining data masking, etc.

configs/config.example.logical_rds_iam.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,18 @@ retrieval:
216216

217217
logicalSnapshot:
218218
options:
219+
# Define pre-processing SQL queries for data patching. For example, "/tmp/scripts/sql".
220+
dataPatching:
221+
# The Docker image to run data patching container.
222+
dockerImage: "sync-instance:12"
223+
224+
queryPreprocessing:
225+
# Path to SQL pre-processing queries. Default: empty string (no pre-processing defined).
226+
queryPath: ""
227+
228+
# Worker limit for parallel queries.
229+
maxParallelWorkers: 2
230+
219231
# It is possible to define a pre-processing script. For example, "/tmp/scripts/custom.sh".
220232
# Default: empty string (no pre-processing defined).
221233
# This can be used for scrubbing or eliminating PII data, defining data masking, etc.

configs/config.example.physical_generic.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,15 @@ retrieval:
201201
# Maximum number of health check retries.
202202
maxRetries: 200
203203

204+
# It is possible to define pre-processing SQL queries. For example, "/tmp/scripts/sql".
205+
# Default: empty string (no pre-processing defined).
206+
queryPreprocessing:
207+
# Path to SQL pre-processing queries.
208+
queryPath: ""
209+
210+
# Worker limit for parallel queries.
211+
maxParallelWorkers: 2
212+
204213
# It is possible to define a pre-processing script. For example, "/tmp/scripts/custom.sh".
205214
# Default: empty string (no pre-processing defined).
206215
# This can be used for scrubbing or eliminating PII data, defining data masking, etc.

configs/config.example.physical_walg.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,15 @@ retrieval:
191191
# Maximum number of health check retries.
192192
maxRetries: 200
193193

194+
# It is possible to define pre-processing SQL queries. For example, "/tmp/scripts/sql".
195+
# Default: empty string (no pre-processing defined).
196+
queryPreprocessing:
197+
# Path to SQL pre-processing queries.
198+
queryPath: ""
199+
200+
# Worker limit for parallel queries.
201+
maxParallelWorkers: 2
202+
194203
# It is possible to define a pre-processing script. For example, "/tmp/scripts/custom.sh".
195204
# Default: empty string (no pre-processing defined).
196205
# This can be used for scrubbing or eliminating PII data, defining data masking, etc.

pkg/retrieval/engine/postgres/job_builder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (s *JobBuilder) BuildJob(jobCfg config.JobConfig) (components.JobRunner, er
5555
return physical.NewJob(jobCfg, s.dockerClient, s.globalCfg, s.dbMarker)
5656

5757
case snapshot.LogicalInitialType:
58-
return snapshot.NewLogicalInitialJob(jobCfg, s.cloneManager, s.globalCfg, s.dbMarker)
58+
return snapshot.NewLogicalInitialJob(jobCfg, s.dockerClient, s.cloneManager, s.globalCfg, s.dbMarker)
5959

6060
case snapshot.PhysicalInitialType:
6161
return snapshot.NewPhysicalInitialJob(jobCfg, s.dockerClient, s.cloneManager, s.globalCfg, s.dbMarker)

pkg/retrieval/engine/postgres/snapshot/logical.go

Lines changed: 155 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,42 +7,61 @@ package snapshot
77

88
import (
99
"context"
10+
"fmt"
1011
"path"
1112

13+
"github.com/docker/docker/api/types"
14+
"github.com/docker/docker/api/types/container"
15+
"github.com/docker/docker/api/types/network"
16+
"github.com/docker/docker/client"
1217
"github.com/pkg/errors"
1318

1419
dblabCfg "gitlab.com/postgres-ai/database-lab/pkg/config"
20+
"gitlab.com/postgres-ai/database-lab/pkg/log"
1521
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/config"
1622
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/dbmarker"
1723
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools"
24+
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools/cont"
25+
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools/health"
1826
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/options"
1927
"gitlab.com/postgres-ai/database-lab/pkg/services/provision/databases/postgres/configuration"
2028
"gitlab.com/postgres-ai/database-lab/pkg/services/provision/thinclones"
2129
)
2230

2331
// LogicalInitial describes a job for preparing a logical initial snapshot.
2432
type LogicalInitial struct {
25-
name string
26-
cloneManager thinclones.Manager
27-
options LogicalOptions
28-
globalCfg *dblabCfg.Global
29-
dbMarker *dbmarker.Marker
33+
name string
34+
cloneManager thinclones.Manager
35+
dockerClient *client.Client
36+
options LogicalOptions
37+
globalCfg *dblabCfg.Global
38+
dbMarker *dbmarker.Marker
39+
queryProcessor *queryProcessor
3040
}
3141

3242
// LogicalOptions describes options for a logical initialization job.
3343
type LogicalOptions struct {
44+
DataPatching DataPatching `yaml:"dataPatching"`
3445
PreprocessingScript string `yaml:"preprocessingScript"`
3546
Configs map[string]string `yaml:"configs"`
3647
Schedule Scheduler `yaml:"schedule"`
3748
}
3849

50+
// DataPatching allows executing queries to transform data before taking a snapshot.
51+
type DataPatching struct {
52+
DockerImage string `yaml:"dockerImage"`
53+
QueryPreprocessing QueryPreprocessing `yaml:"queryPreprocessing"`
54+
}
55+
3956
const (
4057
// LogicalInitialType declares a job type for preparing a logical initial snapshot.
4158
LogicalInitialType = "logicalSnapshot"
59+
60+
patchContainerPrefix = "dblab_patch_"
4261
)
4362

4463
// NewLogicalInitialJob creates a new logical initial job.
45-
func NewLogicalInitialJob(cfg config.JobConfig, cloneManager thinclones.Manager,
64+
func NewLogicalInitialJob(cfg config.JobConfig, dockerClient *client.Client, cloneManager thinclones.Manager,
4665
global *dblabCfg.Global, marker *dbmarker.Marker) (*LogicalInitial, error) {
4766
li := &LogicalInitial{
4867
name: cfg.Name,
@@ -55,6 +74,12 @@ func NewLogicalInitialJob(cfg config.JobConfig, cloneManager thinclones.Manager,
5574
return nil, errors.Wrap(err, "failed to unmarshal configuration options")
5675
}
5776

77+
if li.options.DataPatching.QueryPreprocessing.QueryPath != "" {
78+
li.queryProcessor = newQueryProcessor(dockerClient, global.Database.Name(), global.Database.User(),
79+
li.options.DataPatching.QueryPreprocessing.QueryPath,
80+
li.options.DataPatching.QueryPreprocessing.MaxParallelWorkers)
81+
}
82+
5883
return li, nil
5984
}
6085

@@ -63,13 +88,18 @@ func (s *LogicalInitial) Name() string {
6388
return s.name
6489
}
6590

91+
// patchContainerName returns container name.
92+
func (s *LogicalInitial) patchContainerName() string {
93+
return patchContainerPrefix + s.globalCfg.InstanceID
94+
}
95+
6696
// Reload reloads job configuration.
6797
func (s *LogicalInitial) Reload(cfg map[string]interface{}) (err error) {
6898
return options.Unmarshal(cfg, &s.options)
6999
}
70100

71101
// Run starts the job.
72-
func (s *LogicalInitial) Run(_ context.Context) error {
102+
func (s *LogicalInitial) Run(ctx context.Context) error {
73103
if s.options.PreprocessingScript != "" {
74104
if err := runPreprocessingScript(s.options.PreprocessingScript); err != nil {
75105
return err
@@ -80,16 +110,24 @@ func (s *LogicalInitial) Run(_ context.Context) error {
80110
return errors.Wrap(err, "failed to create PostgreSQL configuration files")
81111
}
82112

113+
dataDir := s.globalCfg.DataDir()
114+
83115
// Run basic PostgreSQL configuration.
84-
if err := configuration.NewCorrector().Run(s.globalCfg.DataDir()); err != nil {
116+
if err := configuration.NewCorrector().Run(dataDir); err != nil {
85117
return errors.Wrap(err, "failed to adjust PostgreSQL configs")
86118
}
87119

88120
// Apply user defined configs.
89-
if err := applyUsersConfigs(s.options.Configs, path.Join(s.globalCfg.DataDir(), "postgresql.conf")); err != nil {
121+
if err := applyUsersConfigs(s.options.Configs, path.Join(dataDir, "postgresql.conf")); err != nil {
90122
return errors.Wrap(err, "failed to apply user-defined configs")
91123
}
92124

125+
if s.queryProcessor != nil {
126+
if err := s.runPreprocessingQueries(ctx, dataDir); err != nil {
127+
return errors.Wrap(err, "failed to run preprocessing queries")
128+
}
129+
}
130+
93131
dataStateAt := extractDataStateAt(s.dbMarker)
94132

95133
if _, err := s.cloneManager.CreateSnapshot("", dataStateAt); err != nil {
@@ -100,9 +138,115 @@ func (s *LogicalInitial) Run(_ context.Context) error {
100138
}
101139

102140
func (s *LogicalInitial) touchConfigFiles() error {
103-
if err := tools.TouchFile(path.Join(s.globalCfg.DataDir(), "postgresql.conf")); err != nil {
141+
dataDir := s.globalCfg.DataDir()
142+
143+
if err := tools.TouchFile(path.Join(dataDir, "postgresql.conf")); err != nil {
104144
return err
105145
}
106146

107-
return tools.TouchFile(path.Join(s.globalCfg.DataDir(), "pg_hba.conf"))
147+
return tools.TouchFile(path.Join(dataDir, "pg_hba.conf"))
148+
}
149+
150+
func (s *LogicalInitial) runPreprocessingQueries(ctx context.Context, dataDir string) error {
151+
pgVersion, err := tools.DetectPGVersion(dataDir)
152+
if err != nil {
153+
return errors.Wrap(err, "failed to detect the Postgres version")
154+
}
155+
156+
patchImage := s.options.DataPatching.DockerImage
157+
if patchImage == "" {
158+
patchImage = fmt.Sprintf("postgresai/sync-instance:%s", pgVersion)
159+
}
160+
161+
if err := tools.PullImage(ctx, s.dockerClient, patchImage); err != nil {
162+
return errors.Wrap(err, "failed to scan image pulling response")
163+
}
164+
165+
pwd, err := tools.GeneratePassword()
166+
if err != nil {
167+
return errors.Wrap(err, "failed to generate PostgreSQL password")
168+
}
169+
170+
hostConfig, err := s.buildHostConfig(ctx)
171+
if err != nil {
172+
return errors.Wrap(err, "failed to build container host config")
173+
}
174+
175+
// Run patch container.
176+
patchCont, err := s.dockerClient.ContainerCreate(ctx,
177+
s.buildContainerConfig(dataDir, patchImage, pwd),
178+
hostConfig,
179+
&network.NetworkingConfig{},
180+
s.patchContainerName(),
181+
)
182+
if err != nil {
183+
return errors.Wrap(err, "failed to create container")
184+
}
185+
186+
defer tools.RemoveContainer(ctx, s.dockerClient, patchCont.ID, cont.StopPhysicalTimeout)
187+
188+
defer func() {
189+
if err != nil {
190+
tools.PrintContainerLogs(ctx, s.dockerClient, s.patchContainerName())
191+
}
192+
}()
193+
194+
log.Msg(fmt.Sprintf("Running container: %s. ID: %v", s.patchContainerName(), patchCont.ID))
195+
196+
if err := s.dockerClient.ContainerStart(ctx, patchCont.ID, types.ContainerStartOptions{}); err != nil {
197+
return errors.Wrap(err, "failed to start container")
198+
}
199+
200+
log.Msg("Starting PostgreSQL")
201+
log.Msg(fmt.Sprintf("View logs using the command: %s %s", tools.ViewLogsCmd, s.patchContainerName()))
202+
203+
// Start PostgreSQL instance.
204+
if err := tools.RunPostgres(ctx, s.dockerClient, patchCont.ID, dataDir); err != nil {
205+
return errors.Wrap(err, "failed to start PostgreSQL instance")
206+
}
207+
208+
log.Msg("Waiting for PostgreSQL is ready")
209+
210+
if err := tools.CheckContainerReadiness(ctx, s.dockerClient, patchCont.ID); err != nil {
211+
return errors.Wrap(err, "failed to readiness check")
212+
}
213+
214+
if err := s.queryProcessor.applyPreprocessingQueries(ctx, patchCont.ID); err != nil {
215+
return errors.Wrap(err, "failed to run preprocessing queries")
216+
}
217+
218+
return nil
219+
}
220+
221+
func (s *LogicalInitial) buildHostConfig(ctx context.Context) (*container.HostConfig, error) {
222+
hostConfig := &container.HostConfig{}
223+
224+
if err := tools.AddVolumesToHostConfig(ctx, s.dockerClient, hostConfig, s.globalCfg.DataDir()); err != nil {
225+
return nil, err
226+
}
227+
228+
return hostConfig, nil
229+
}
230+
231+
func (s *LogicalInitial) buildContainerConfig(clonePath, patchImage, password string) *container.Config {
232+
hcInterval := health.DefaultRestoreInterval
233+
hcRetries := health.DefaultRestoreRetries
234+
235+
return &container.Config{
236+
Labels: map[string]string{
237+
cont.DBLabControlLabel: cont.DBLabPatchLabel,
238+
cont.DBLabInstanceIDLabel: s.globalCfg.InstanceID,
239+
},
240+
Env: []string{
241+
"PGDATA=" + clonePath,
242+
"POSTGRES_PASSWORD=" + password,
243+
},
244+
Image: patchImage,
245+
Healthcheck: health.GetConfig(
246+
s.globalCfg.Database.User(),
247+
s.globalCfg.Database.Name(),
248+
health.OptionInterval(hcInterval),
249+
health.OptionRetries(hcRetries),
250+
),
251+
}
108252
}

pkg/retrieval/engine/postgres/snapshot/physical.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type PhysicalInitial struct {
7171
scheduler *cron.Cron
7272
schedulerCtx context.Context
7373
promotionMutex sync.Mutex
74+
queryProcessor *queryProcessor
7475
}
7576

7677
// PhysicalOptions describes options for a physical initialization job.
@@ -86,9 +87,10 @@ type PhysicalOptions struct {
8687

8788
// Promotion describes promotion options.
8889
type Promotion struct {
89-
Enabled bool `yaml:"enabled"`
90-
DockerImage string `yaml:"dockerImage"`
91-
HealthCheck HealthCheck `yaml:"healthCheck"`
90+
Enabled bool `yaml:"enabled"`
91+
DockerImage string `yaml:"dockerImage"`
92+
HealthCheck HealthCheck `yaml:"healthCheck"`
93+
QueryPreprocessing QueryPreprocessing `yaml:"queryPreprocessing"`
9294
}
9395

9496
// HealthCheck describes health check options of a promotion.
@@ -109,6 +111,12 @@ type ScheduleSpec struct {
109111
Limit int `yaml:"limit"`
110112
}
111113

114+
// QueryPreprocessing defines query preprocessing options.
115+
type QueryPreprocessing struct {
116+
QueryPath string `yaml:"queryPath"`
117+
MaxParallelWorkers int `yaml:"maxParallelWorkers"`
118+
}
119+
112120
// NewPhysicalInitialJob creates a new physical initial job.
113121
func NewPhysicalInitialJob(cfg config.JobConfig, docker *client.Client, cloneManager thinclones.Manager,
114122
global *dblabCfg.Global, marker *dbmarker.Marker) (*PhysicalInitial, error) {
@@ -129,6 +137,12 @@ func NewPhysicalInitialJob(cfg config.JobConfig, docker *client.Client, cloneMan
129137
return nil, errors.Wrap(err, "invalid physicalSnapshot configuration")
130138
}
131139

140+
if p.options.Promotion.QueryPreprocessing.QueryPath != "" {
141+
p.queryProcessor = newQueryProcessor(docker, global.Database.Name(), global.Database.User(),
142+
p.options.Promotion.QueryPreprocessing.QueryPath,
143+
p.options.Promotion.QueryPreprocessing.MaxParallelWorkers)
144+
}
145+
132146
p.setupScheduler()
133147

134148
return p, nil
@@ -261,7 +275,7 @@ func (p *PhysicalInitial) run(ctx context.Context) (err error) {
261275
cloneName := fmt.Sprintf("clone%s_%s", pre, preDataStateAt)
262276

263277
defer func() {
264-
if _, ok := err.(*skipSnapshotErr); ok {
278+
if _, ok := errors.Cause(err).(*skipSnapshotErr); ok {
265279
log.Msg(err.Error())
266280
err = nil
267281
}
@@ -492,6 +506,12 @@ func (p *PhysicalInitial) promoteInstance(ctx context.Context, clonePath string)
492506
}
493507
}
494508

509+
if p.queryProcessor != nil {
510+
if err := p.queryProcessor.applyPreprocessingQueries(ctx, promoteCont.ID); err != nil {
511+
return errors.Wrap(err, "failed to run preprocessing queries")
512+
}
513+
}
514+
495515
// Checkpoint.
496516
if err := p.checkpoint(ctx, promoteCont.ID); err != nil {
497517
return err

0 commit comments

Comments
 (0)