Skip to content

Commit 32d7514

Browse files
committed
improve create_partitions_bg_worker(), introduce drop_triggers()
1 parent 97c31cd commit 32d7514

File tree

9 files changed

+95
-115
lines changed

9 files changed

+95
-115
lines changed

init.sql

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,18 +119,11 @@ LANGUAGE plpgsql;
119119
CREATE OR REPLACE FUNCTION @extschema@.disable_partitioning(IN relation TEXT)
120120
RETURNS VOID AS
121121
$$
122-
DECLARE
123-
parttype INTEGER;
124122
BEGIN
125123
relation := @extschema@.validate_relname(relation);
126-
parttype := parttype FROM pathman_config WHERE relname = relation;
127124

128125
DELETE FROM @extschema@.pathman_config WHERE relname = relation;
129-
IF parttype = 1 THEN
130-
PERFORM @extschema@.drop_triggers(relation);
131-
ELSIF parttype = 2 THEN
132-
PERFORM @extschema@.drop_triggers(relation);
133-
END IF;
126+
PERFORM @extschema@.drop_triggers(relation);
134127

135128
/* Notify backend about changes */
136129
PERFORM on_remove_partitions(relation::regclass::integer);

range.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1194,7 +1194,7 @@ BEGIN
11941194
RAISE NOTICE 'partition % created', v_part;
11951195
END LOOP;
11961196
ELSE
1197-
RAISE NOTICE 'Not implemented yet';
1197+
RAISE EXCEPTION 'Could not create partition';
11981198
END IF;
11991199

12001200
IF i > 0 THEN

src/init.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,11 @@ load_relations_hashtable(bool reinitialize)
198198
TupleDesc tupdesc = SPI_tuptable->tupdesc;
199199
SPITupleTable *tuptable = SPI_tuptable;
200200

201-
for (i=0; i<proc; i++)
201+
for (i = 0; i < proc; i++)
202202
{
203203
RelationKey key;
204204
HeapTuple tuple = tuptable->vals[i];
205-
int oid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 1, &isnull));
205+
Oid oid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 1, &isnull));
206206

207207
key.dbid = MyDatabaseId;
208208
key.relid = oid;

src/partition_filter.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,9 @@ partition_filter_exec(CustomScanState *node)
169169
elog(ERROR, "PartitionFilter selected more than one partition");
170170
else if (nparts == 0)
171171
{
172-
selected_partid = add_missing_partition(state->partitioned_table,
173-
&state->temp_const);
172+
selected_partid = create_partitions_bg_worker(state->partitioned_table,
173+
state->temp_const.constvalue,
174+
state->temp_const.consttype);
174175

175176
refresh_walker_context_ranges(&state->wcxt);
176177
}

src/pathman.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ search_rangerel_result search_range_partition_eq(const Datum value,
229229
const RangeRelation *rangerel,
230230
RangeEntry *out_rentry);
231231
char *get_extension_schema(void);
232-
Oid create_partitions_bg_worker(Oid relid, Datum value, Oid value_type, bool *crashed);
232+
Oid create_partitions_bg_worker(Oid relid, Datum value, Oid value_type);
233233
Oid create_partitions(Oid relid, Datum value, Oid value_type, bool *crashed);
234234

235235
void handle_modification_query(Query *parse);

src/pl_funcs.c

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,6 @@ find_or_create_range_partition(PG_FUNCTION_ARGS)
121121
else
122122
{
123123
Oid child_oid;
124-
bool crashed = false;
125-
126-
/* Lock config before appending new partitions */
127-
LWLockAcquire(pmstate->load_config_lock, LW_EXCLUSIVE);
128-
129-
/* Restrict concurrent partition creation */
130-
LWLockAcquire(pmstate->edit_partitions_lock, LW_EXCLUSIVE);
131124

132125
/*
133126
* Check if someone else has already created partition.
@@ -136,26 +129,12 @@ find_or_create_range_partition(PG_FUNCTION_ARGS)
136129
rangerel, &found_re);
137130
if (search_state == SEARCH_RANGEREL_FOUND)
138131
{
139-
LWLockRelease(pmstate->edit_partitions_lock);
140-
LWLockRelease(pmstate->load_config_lock);
141132
PG_RETURN_OID(found_re.child_oid);
142133
}
143-
else
144-
Assert(search_state != SEARCH_RANGEREL_GAP);
145134

146135
/* Start background worker to create new partitions */
147-
child_oid = create_partitions_bg_worker(relid, value, value_type, &crashed);
148-
149-
/* Release locks */
150-
if (!crashed)
151-
{
152-
LWLockRelease(pmstate->edit_partitions_lock);
153-
LWLockRelease(pmstate->load_config_lock);
154-
}
136+
child_oid = create_partitions_bg_worker(relid, value, value_type);
155137

156-
/* Repeat binary search */
157-
Assert(SEARCH_RANGEREL_FOUND == search_range_partition_eq(value, &cmp_func,
158-
rangerel, NULL));
159138
PG_RETURN_OID(child_oid);
160139
}
161140
}

src/utils.c

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ check_rinfo_for_partitioned_attr(List *rinfo, Index varno, AttrNumber varattno)
153153
}
154154

155155
/*
156-
* Append trigger info contained in 'more' to 'src'.
156+
* Append trigger info contained in 'more' to 'src', both remain unmodified.
157157
*
158158
* This allows us to execute some of main table's triggers on children.
159159
* See ExecInsert() for more details.
@@ -227,28 +227,6 @@ append_trigger_descs(TriggerDesc *src, TriggerDesc *more, bool *grown_up)
227227
return new_desc;
228228
}
229229

230-
Oid
231-
add_missing_partition(Oid partitioned_table, Const *value)
232-
{
233-
bool crashed;
234-
Oid result = InvalidOid;
235-
236-
SPI_connect();
237-
PushActiveSnapshot(GetTransactionSnapshot());
238-
239-
/* Create partitions */
240-
result = create_partitions(partitioned_table,
241-
value->constvalue,
242-
value->consttype,
243-
&crashed);
244-
245-
/* Cleanup */
246-
SPI_finish();
247-
PopActiveSnapshot();
248-
249-
return result;
250-
}
251-
252230
/*
253231
* Get BTORDER_PROC for two types described by Oids
254232
*/

src/utils.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ TriggerDesc * append_trigger_descs(TriggerDesc *src,
3131
TriggerDesc *more,
3232
bool *grown_up);
3333

34-
Oid add_missing_partition(Oid partitioned_table, Const *value);
35-
3634
void fill_type_cmp_fmgr_info(FmgrInfo *finfo,
3735
Oid type1,
3836
Oid type2);

src/worker.c

Lines changed: 85 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,31 @@ typedef struct PartitionArgs
4545
* waits till it finishes the job and returns the result (new partition oid)
4646
*/
4747
Oid
48-
create_partitions_bg_worker(Oid relid, Datum value, Oid value_type, bool *crashed)
48+
create_partitions_bg_worker(Oid relid, Datum value, Oid value_type)
4949
{
50+
#define HandleError(condition, new_state) \
51+
if (condition) { exec_state = (new_state); goto handle_exec_state; }
52+
53+
/* Execution state to be checked */
54+
enum
55+
{
56+
BGW_OK = 0, /* everything is fine (default) */
57+
BGW_COULD_NOT_START, /* could not start worker */
58+
BGW_PM_DIED, /* postmaster died */
59+
BGW_CRASHED /* worker crashed */
60+
} exec_state = BGW_OK;
61+
5062
BackgroundWorker worker;
51-
BackgroundWorkerHandle *worker_handle;
52-
BgwHandleStatus status;
53-
dsm_segment *segment;
54-
dsm_handle segment_handle;
55-
pid_t pid;
56-
PartitionArgs *args;
57-
Oid child_oid;
58-
TypeCacheEntry *tce;
63+
BackgroundWorkerHandle *bgw_handle;
64+
BgwHandleStatus bgw_status;
65+
bool bgw_started;
66+
dsm_segment *segment;
67+
dsm_handle segment_handle;
68+
pid_t pid;
69+
PartitionArgs *args;
70+
TypeCacheEntry *tce;
71+
Oid child_oid = InvalidOid;
72+
5973

6074
/* Create a dsm segment for the worker to pass arguments */
6175
segment = dsm_create(sizeof(PartitionArgs), 0);
@@ -84,34 +98,53 @@ create_partitions_bg_worker(Oid relid, Datum value, Oid value_type, bool *crashe
8498
worker.bgw_main_arg = Int32GetDatum(segment_handle);
8599
worker.bgw_notify_pid = MyProcPid;
86100

101+
LWLockAcquire(pmstate->load_config_lock, LW_EXCLUSIVE);
102+
LWLockAcquire(pmstate->edit_partitions_lock, LW_EXCLUSIVE);
103+
87104
/* Start dynamic worker */
88-
if (!RegisterDynamicBackgroundWorker(&worker, &worker_handle))
89-
{
90-
elog(WARNING, "Unable to create background worker for pg_pathman");
91-
}
105+
bgw_started = RegisterDynamicBackgroundWorker(&worker, &bgw_handle);
106+
HandleError(bgw_started == false, BGW_COULD_NOT_START);
92107

93-
status = WaitForBackgroundWorkerStartup(worker_handle, &pid);
94-
if (status == BGWH_POSTMASTER_DIED)
95-
{
96-
ereport(WARNING,
97-
(errmsg("Postmaster died during the pg_pathman background worker process"),
98-
errhint("More details may be available in the server log.")));
99-
}
108+
/* Wait till the worker starts */
109+
bgw_status = WaitForBackgroundWorkerStartup(bgw_handle, &pid);
110+
HandleError(bgw_status == BGWH_POSTMASTER_DIED, BGW_PM_DIED);
100111

101-
/* Wait till the worker finishes its job */
102-
status = WaitForBackgroundWorkerShutdown(worker_handle);
103-
if (status == BGWH_POSTMASTER_DIED)
104-
{
105-
ereport(WARNING,
106-
(errmsg("Postmaster died during the pg_pathman background worker process"),
107-
errhint("More details may be available in the server log.")));
108-
}
109-
*crashed = args->crashed;
112+
/* Wait till the worker finishes job */
113+
bgw_status = WaitForBackgroundWorkerShutdown(bgw_handle);
114+
HandleError(bgw_status == BGWH_POSTMASTER_DIED, BGW_PM_DIED);
115+
116+
/* Save the result (partition Oid) */
110117
child_oid = args->result;
111118

119+
120+
/* end execution */
121+
handle_exec_state:
122+
LWLockRelease(pmstate->load_config_lock);
123+
LWLockRelease(pmstate->edit_partitions_lock);
124+
112125
/* Free dsm segment */
113126
dsm_detach(segment);
114127

128+
switch (exec_state)
129+
{
130+
case BGW_COULD_NOT_START:
131+
elog(ERROR, "Unable to create background worker for pg_pathman");
132+
break;
133+
134+
case BGW_PM_DIED:
135+
ereport(ERROR,
136+
(errmsg("Postmaster died during the pg_pathman background worker process"),
137+
errhint("More details may be available in the server log.")));
138+
break;
139+
140+
case BGW_CRASHED:
141+
elog(ERROR, "Could not create partition due to background worker crash");
142+
break;
143+
144+
default:
145+
break;
146+
}
147+
115148
return child_oid;
116149
}
117150

@@ -129,10 +162,8 @@ bg_worker_main(Datum main_arg)
129162

130163
/* Attach to dynamic shared memory */
131164
if (!handle)
132-
{
133-
ereport(WARNING,
134-
(errmsg("pg_pathman worker: invalid dsm_handle")));
135-
}
165+
ereport(WARNING, (errmsg("pg_pathman worker: invalid dsm_handle")));
166+
136167
segment = dsm_attach(handle);
137168
args = dsm_segment_address(segment);
138169

@@ -163,19 +194,16 @@ bg_worker_main(Datum main_arg)
163194
Oid
164195
create_partitions(Oid relid, Datum value, Oid value_type, bool *crashed)
165196
{
166-
int ret;
167-
Datum vals[2];
168-
Oid oids[] = {OIDOID, value_type};
169-
bool nulls[] = {false, false};
197+
Oid oids[] = { OIDOID, value_type };
198+
Datum vals[] = { ObjectIdGetDatum(relid), value };
199+
bool nulls[] = { false, false };
170200
char *sql;
171201
PartRelationInfo *prel;
172202
RangeRelation *rangerel;
173203
FmgrInfo cmp_func;
174204
char *schema;
175-
search_rangerel_result search_state;
176-
RangeEntry found_re;
177205

178-
*crashed = false;
206+
*crashed = true;
179207
schema = get_extension_schema();
180208

181209
prel = get_pathman_relation_info(relid, NULL);
@@ -184,36 +212,39 @@ create_partitions(Oid relid, Datum value, Oid value_type, bool *crashed)
184212
/* Comparison function */
185213
fill_type_cmp_fmgr_info(&cmp_func, value_type, prel->atttype);
186214

187-
vals[0] = ObjectIdGetDatum(relid);
188-
vals[1] = value;
189-
190215
/* Perform PL procedure */
191216
sql = psprintf("SELECT %s.append_partitions_on_demand_internal($1, $2)",
192217
schema);
193218
PG_TRY();
194219
{
220+
int ret;
221+
Oid partid = InvalidOid;
222+
bool isnull;
223+
195224
ret = SPI_execute_with_args(sql, 2, oids, vals, nulls, false, 0);
196225
if (ret > 0)
197226
{
227+
TupleDesc tupdesc = SPI_tuptable->tupdesc;
228+
HeapTuple tuple = SPI_tuptable->vals[0];
229+
230+
Assert(SPI_processed == 1);
231+
232+
partid = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 1, &isnull));
233+
198234
/* Update relation info */
199235
free_dsm_array(&rangerel->ranges);
200236
free_dsm_array(&prel->children);
201237
load_check_constraints(relid, GetCatalogSnapshot(relid));
202238
}
239+
240+
*crashed = false;
241+
return partid;
203242
}
204243
PG_CATCH();
205244
{
206-
elog(WARNING, "Attempt to create new partitions failed");
207-
if (crashed != NULL)
208-
*crashed = true;
209-
return 0;
245+
elog(ERROR, "Attempt to create new partitions failed");
246+
247+
return InvalidOid; /* compiler should be happy */
210248
}
211249
PG_END_TRY();
212-
213-
search_state = search_range_partition_eq(value, &cmp_func,
214-
rangerel, &found_re);
215-
if (search_state == SEARCH_RANGEREL_FOUND)
216-
return found_re.child_oid;
217-
218-
return 0;
219250
}

0 commit comments

Comments
 (0)