@@ -7,42 +7,61 @@ package snapshot
7
7
8
8
import (
9
9
"context"
10
+ "fmt"
10
11
"path"
11
12
13
+ "github.com/docker/docker/api/types"
14
+ "github.com/docker/docker/api/types/container"
15
+ "github.com/docker/docker/api/types/network"
16
+ "github.com/docker/docker/client"
12
17
"github.com/pkg/errors"
13
18
14
19
dblabCfg "gitlab.com/postgres-ai/database-lab/pkg/config"
20
+ "gitlab.com/postgres-ai/database-lab/pkg/log"
15
21
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/config"
16
22
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/dbmarker"
17
23
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools"
24
+ "gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools/cont"
25
+ "gitlab.com/postgres-ai/database-lab/pkg/retrieval/engine/postgres/tools/health"
18
26
"gitlab.com/postgres-ai/database-lab/pkg/retrieval/options"
19
27
"gitlab.com/postgres-ai/database-lab/pkg/services/provision/databases/postgres/configuration"
20
28
"gitlab.com/postgres-ai/database-lab/pkg/services/provision/thinclones"
21
29
)
22
30
23
31
// LogicalInitial describes a job for preparing a logical initial snapshot.
24
32
type LogicalInitial struct {
25
- name string
26
- cloneManager thinclones.Manager
27
- options LogicalOptions
28
- globalCfg * dblabCfg.Global
29
- dbMarker * dbmarker.Marker
33
+ name string
34
+ cloneManager thinclones.Manager
35
+ dockerClient * client.Client
36
+ options LogicalOptions
37
+ globalCfg * dblabCfg.Global
38
+ dbMarker * dbmarker.Marker
39
+ queryProcessor * queryProcessor
30
40
}
31
41
32
42
// LogicalOptions describes options for a logical initialization job.
33
43
type LogicalOptions struct {
44
+ DataPatching DataPatching `yaml:"dataPatching"`
34
45
PreprocessingScript string `yaml:"preprocessingScript"`
35
46
Configs map [string ]string `yaml:"configs"`
36
47
Schedule Scheduler `yaml:"schedule"`
37
48
}
38
49
50
+ // DataPatching allows executing queries to transform data before snapshot taking.
51
+ type DataPatching struct {
52
+ DockerImage string `yaml:"dockerImage"`
53
+ QueryPreprocessing QueryPreprocessing `yaml:"queryPreprocessing"`
54
+ }
55
+
39
56
const (
40
57
// LogicalInitialType declares a job type for preparing a logical initial snapshot.
41
58
LogicalInitialType = "logicalSnapshot"
59
+
60
+ patchContainerPrefix = "dblab_patch_"
42
61
)
43
62
44
63
// NewLogicalInitialJob creates a new logical initial job.
45
- func NewLogicalInitialJob (cfg config.JobConfig , cloneManager thinclones.Manager ,
64
+ func NewLogicalInitialJob (cfg config.JobConfig , dockerClient * client. Client , cloneManager thinclones.Manager ,
46
65
global * dblabCfg.Global , marker * dbmarker.Marker ) (* LogicalInitial , error ) {
47
66
li := & LogicalInitial {
48
67
name : cfg .Name ,
@@ -55,6 +74,12 @@ func NewLogicalInitialJob(cfg config.JobConfig, cloneManager thinclones.Manager,
55
74
return nil , errors .Wrap (err , "failed to unmarshal configuration options" )
56
75
}
57
76
77
+ if li .options .DataPatching .QueryPreprocessing .QueryPath != "" {
78
+ li .queryProcessor = newQueryProcessor (dockerClient , global .Database .Name (), global .Database .User (),
79
+ li .options .DataPatching .QueryPreprocessing .QueryPath ,
80
+ li .options .DataPatching .QueryPreprocessing .MaxParallelWorkers )
81
+ }
82
+
58
83
return li , nil
59
84
}
60
85
@@ -63,13 +88,18 @@ func (s *LogicalInitial) Name() string {
63
88
return s .name
64
89
}
65
90
91
+ // patchContainerName returns container name.
92
+ func (s * LogicalInitial ) patchContainerName () string {
93
+ return patchContainerPrefix + s .globalCfg .InstanceID
94
+ }
95
+
66
96
// Reload reloads job configuration.
67
97
func (s * LogicalInitial ) Reload (cfg map [string ]interface {}) (err error ) {
68
98
return options .Unmarshal (cfg , & s .options )
69
99
}
70
100
71
101
// Run starts the job.
72
- func (s * LogicalInitial ) Run (_ context.Context ) error {
102
+ func (s * LogicalInitial ) Run (ctx context.Context ) error {
73
103
if s .options .PreprocessingScript != "" {
74
104
if err := runPreprocessingScript (s .options .PreprocessingScript ); err != nil {
75
105
return err
@@ -80,16 +110,24 @@ func (s *LogicalInitial) Run(_ context.Context) error {
80
110
return errors .Wrap (err , "failed to create PostgreSQL configuration files" )
81
111
}
82
112
113
+ dataDir := s .globalCfg .DataDir ()
114
+
83
115
// Run basic PostgreSQL configuration.
84
- if err := configuration .NewCorrector ().Run (s . globalCfg . DataDir () ); err != nil {
116
+ if err := configuration .NewCorrector ().Run (dataDir ); err != nil {
85
117
return errors .Wrap (err , "failed to adjust PostgreSQL configs" )
86
118
}
87
119
88
120
// Apply user defined configs.
89
- if err := applyUsersConfigs (s .options .Configs , path .Join (s . globalCfg . DataDir () , "postgresql.conf" )); err != nil {
121
+ if err := applyUsersConfigs (s .options .Configs , path .Join (dataDir , "postgresql.conf" )); err != nil {
90
122
return errors .Wrap (err , "failed to apply user-defined configs" )
91
123
}
92
124
125
+ if s .queryProcessor != nil {
126
+ if err := s .runPreprocessingQueries (ctx , dataDir ); err != nil {
127
+ return errors .Wrap (err , "failed to run preprocessing queries" )
128
+ }
129
+ }
130
+
93
131
dataStateAt := extractDataStateAt (s .dbMarker )
94
132
95
133
if _ , err := s .cloneManager .CreateSnapshot ("" , dataStateAt ); err != nil {
@@ -100,9 +138,115 @@ func (s *LogicalInitial) Run(_ context.Context) error {
100
138
}
101
139
102
140
func (s * LogicalInitial ) touchConfigFiles () error {
103
- if err := tools .TouchFile (path .Join (s .globalCfg .DataDir (), "postgresql.conf" )); err != nil {
141
+ dataDir := s .globalCfg .DataDir ()
142
+
143
+ if err := tools .TouchFile (path .Join (dataDir , "postgresql.conf" )); err != nil {
104
144
return err
105
145
}
106
146
107
- return tools .TouchFile (path .Join (s .globalCfg .DataDir (), "pg_hba.conf" ))
147
+ return tools .TouchFile (path .Join (dataDir , "pg_hba.conf" ))
148
+ }
149
+
150
+ func (s * LogicalInitial ) runPreprocessingQueries (ctx context.Context , dataDir string ) error {
151
+ pgVersion , err := tools .DetectPGVersion (dataDir )
152
+ if err != nil {
153
+ return errors .Wrap (err , "failed to detect the Postgres version" )
154
+ }
155
+
156
+ patchImage := s .options .DataPatching .DockerImage
157
+ if patchImage == "" {
158
+ patchImage = fmt .Sprintf ("postgresai/sync-instance:%s" , pgVersion )
159
+ }
160
+
161
+ if err := tools .PullImage (ctx , s .dockerClient , patchImage ); err != nil {
162
+ return errors .Wrap (err , "failed to scan image pulling response" )
163
+ }
164
+
165
+ pwd , err := tools .GeneratePassword ()
166
+ if err != nil {
167
+ return errors .Wrap (err , "failed to generate PostgreSQL password" )
168
+ }
169
+
170
+ hostConfig , err := s .buildHostConfig (ctx )
171
+ if err != nil {
172
+ return errors .Wrap (err , "failed to build container host config" )
173
+ }
174
+
175
+ // Run patch container.
176
+ patchCont , err := s .dockerClient .ContainerCreate (ctx ,
177
+ s .buildContainerConfig (dataDir , patchImage , pwd ),
178
+ hostConfig ,
179
+ & network.NetworkingConfig {},
180
+ s .patchContainerName (),
181
+ )
182
+ if err != nil {
183
+ return errors .Wrap (err , "failed to create container" )
184
+ }
185
+
186
+ defer tools .RemoveContainer (ctx , s .dockerClient , patchCont .ID , cont .StopPhysicalTimeout )
187
+
188
+ defer func () {
189
+ if err != nil {
190
+ tools .PrintContainerLogs (ctx , s .dockerClient , s .patchContainerName ())
191
+ }
192
+ }()
193
+
194
+ log .Msg (fmt .Sprintf ("Running container: %s. ID: %v" , s .patchContainerName (), patchCont .ID ))
195
+
196
+ if err := s .dockerClient .ContainerStart (ctx , patchCont .ID , types.ContainerStartOptions {}); err != nil {
197
+ return errors .Wrap (err , "failed to start container" )
198
+ }
199
+
200
+ log .Msg ("Starting PostgreSQL" )
201
+ log .Msg (fmt .Sprintf ("View logs using the command: %s %s" , tools .ViewLogsCmd , s .patchContainerName ()))
202
+
203
+ // Start PostgreSQL instance.
204
+ if err := tools .RunPostgres (ctx , s .dockerClient , patchCont .ID , dataDir ); err != nil {
205
+ return errors .Wrap (err , "failed to start PostgreSQL instance" )
206
+ }
207
+
208
+ log .Msg ("Waiting for PostgreSQL is ready" )
209
+
210
+ if err := tools .CheckContainerReadiness (ctx , s .dockerClient , patchCont .ID ); err != nil {
211
+ return errors .Wrap (err , "failed to readiness check" )
212
+ }
213
+
214
+ if err := s .queryProcessor .applyPreprocessingQueries (ctx , patchCont .ID ); err != nil {
215
+ return errors .Wrap (err , "failed to run preprocessing queries" )
216
+ }
217
+
218
+ return nil
219
+ }
220
+
221
+ func (s * LogicalInitial ) buildHostConfig (ctx context.Context ) (* container.HostConfig , error ) {
222
+ hostConfig := & container.HostConfig {}
223
+
224
+ if err := tools .AddVolumesToHostConfig (ctx , s .dockerClient , hostConfig , s .globalCfg .DataDir ()); err != nil {
225
+ return nil , err
226
+ }
227
+
228
+ return hostConfig , nil
229
+ }
230
+
231
+ func (s * LogicalInitial ) buildContainerConfig (clonePath , patchImage , password string ) * container.Config {
232
+ hcInterval := health .DefaultRestoreInterval
233
+ hcRetries := health .DefaultRestoreRetries
234
+
235
+ return & container.Config {
236
+ Labels : map [string ]string {
237
+ cont .DBLabControlLabel : cont .DBLabPatchLabel ,
238
+ cont .DBLabInstanceIDLabel : s .globalCfg .InstanceID ,
239
+ },
240
+ Env : []string {
241
+ "PGDATA=" + clonePath ,
242
+ "POSTGRES_PASSWORD=" + password ,
243
+ },
244
+ Image : patchImage ,
245
+ Healthcheck : health .GetConfig (
246
+ s .globalCfg .Database .User (),
247
+ s .globalCfg .Database .Name (),
248
+ health .OptionInterval (hcInterval ),
249
+ health .OptionRetries (hcRetries ),
250
+ ),
251
+ }
108
252
}
0 commit comments