Skip to content

Commit 2eaa019

Browse files
committed
pathman: background worker for partition creation in insert trigger
1 parent ad580df commit 2eaa019

File tree

8 files changed

+281
-69
lines changed

8 files changed

+281
-69
lines changed

contrib/pg_pathman/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# contrib/pg_pathman/Makefile
22

33
MODULE_big = pg_pathman
4-
OBJS = init.o pg_pathman.o dsm_array.o rangeset.o pl_funcs.o $(WIN32RES)
4+
OBJS = init.o pg_pathman.o dsm_array.o rangeset.o pl_funcs.o worker.o $(WIN32RES)
55

66
EXTENSION = pg_pathman
77
EXTVERSION = 0.1

contrib/pg_pathman/dsm_array.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@ typedef BlockHeader* BlockHeaderPtr;
3232
#define set_length(header, length) \
3333
((length) | ((*header) & FREE_BIT))
3434

35+
/*
36+
* Initialize dsm config for arrays
37+
*/
3538
void
36-
alloc_dsm_table()
39+
init_dsm_config()
3740
{
3841
bool found;
39-
dsm_cfg = ShmemInitStruct("dsm config", sizeof(DsmConfig), &found);
42+
dsm_cfg = ShmemInitStruct("pathman dsm_array config", sizeof(DsmConfig), &found);
4043
if (!found)
4144
{
4245
dsm_cfg->segment_handle = 0;
@@ -46,6 +49,15 @@ alloc_dsm_table()
4649
}
4750
}
4851

52+
/*
53+
* Attach process to dsm_array segment. This function is used for
54+
* background workers only. Use init_dsm_segment() in backend processes.
55+
*/
56+
void
57+
attach_dsm_array_segment()
58+
{
59+
segment = dsm_attach(dsm_cfg->segment_handle);
60+
}
4961

5062
/*
5163
* Initialize dsm segment. Returns true if new segment was created and

contrib/pg_pathman/expected/pg_pathman.out

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -502,23 +502,8 @@ NOTICE: Copying data to partitions...
502502

503503
INSERT INTO test.range_rel (dt)
504504
SELECT generate_series('2015-01-01', '2015-04-30', '1 day'::interval);
505-
NOTICE: partition test.range_rel_2 created
506-
NOTICE: partition test.range_rel_3 created
507-
NOTICE: partition test.range_rel_4 created
508-
NOTICE: partition test.range_rel_5 created
509-
NOTICE: partition test.range_rel_6 created
510-
NOTICE: partition test.range_rel_7 created
511-
NOTICE: partition test.range_rel_8 created
512-
NOTICE: partition test.range_rel_9 created
513-
NOTICE: partition test.range_rel_10 created
514-
NOTICE: partition test.range_rel_11 created
515-
NOTICE: partition test.range_rel_12 created
516505
INSERT INTO test.range_rel (dt)
517506
SELECT generate_series('2014-12-31', '2014-12-01', '-1 day'::interval);
518-
NOTICE: partition test.range_rel_13 created
519-
NOTICE: partition test.range_rel_14 created
520-
NOTICE: partition test.range_rel_15 created
521-
NOTICE: partition test.range_rel_16 created
522507
EXPLAIN (COSTS OFF) SELECT * FROM test.range_rel WHERE dt = '2014-12-15';
523508
QUERY PLAN
524509
--------------------------------------------------------------------------------

contrib/pg_pathman/init.c

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,33 @@
1616

1717
HTAB *relations = NULL;
1818
HTAB *range_restrictions = NULL;
19+
// bool *config_loaded = NULL;
1920
bool initialization_needed = true;
2021

22+
typedef struct ShmemConfig
23+
{
24+
bool config_loaded;
25+
} ShmemConfig;
26+
ShmemConfig *shmem_cfg;
27+
2128
static FmgrInfo *qsort_type_cmp_func;
2229

2330
static bool validate_range_constraint(Expr *, PartRelationInfo *, Datum *, Datum *);
2431
static bool validate_hash_constraint(Expr *expr, PartRelationInfo *prel, int *hash);
2532
static int cmp_range_entries(const void *p1, const void *p2);
2633

34+
void
35+
init_shmem_config()
36+
{
37+
bool found;
38+
create_relations_hashtable();
39+
create_range_restrictions_hashtable();
40+
shmem_cfg = (ShmemConfig *)
41+
ShmemInitStruct("pathman shmem config", sizeof(ShmemConfig), &found);
42+
shmem_cfg->config_loaded = false;
43+
// *config_loaded = false;
44+
}
45+
2746
/*
2847
* Initialize hashtables
2948
*/
@@ -35,9 +54,15 @@ load_config(void)
3554
initialization_needed = false;
3655
new_segment_created = init_dsm_segment(INITIAL_BLOCKS_COUNT, 32);
3756

38-
LWLockAcquire(load_config_lock, LW_EXCLUSIVE);
39-
load_relations_hashtable(new_segment_created);
40-
LWLockRelease(load_config_lock);
57+
/* if config is not loaded */
58+
if (shmem_cfg && shmem_cfg->config_loaded)
59+
{
60+
LWLockAcquire(load_config_lock, LW_EXCLUSIVE);
61+
load_relations_hashtable(new_segment_created);
62+
LWLockRelease(load_config_lock);
63+
// *config_loaded = true;
64+
shmem_cfg->config_loaded = true;
65+
}
4166
}
4267

4368
/*

contrib/pg_pathman/pathman.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,19 +131,21 @@ LWLock *dsm_init_lock;
131131

132132

133133
/* Dynamic shared memory functions */
134-
void alloc_dsm_table(void);
134+
void init_dsm_config(void);
135135
bool init_dsm_segment(size_t blocks_count, size_t block_size);
136136
void init_dsm_table(size_t block_size, size_t start, size_t end);
137137
void alloc_dsm_array(DsmArray *arr, size_t entry_size, size_t length);
138138
void free_dsm_array(DsmArray *arr);
139139
void *dsm_array_get_pointer(const DsmArray* arr);
140-
140+
dsm_handle get_dsm_array_segment(void);
141+
void attach_dsm_array_segment(void);
141142

142143
HTAB *relations;
143144
HTAB *range_restrictions;
144145
bool initialization_needed;
145146

146147
/* initialization functions */
148+
void init_shmem_config(void);
147149
void load_config(void);
148150
void create_relations_hashtable(void);
149151
void create_hash_restrictions_hashtable(void);
@@ -157,5 +159,7 @@ PartRelationInfo *get_pathman_relation_info(Oid relid, bool *found);
157159
RangeRelation *get_pathman_range_relation(Oid relid, bool *found);
158160
int range_binary_search(const RangeRelation *rangerel, FmgrInfo *cmp_func, Datum value, bool *fountPtr);
159161
char *get_extension_schema(void);
162+
FmgrInfo *get_cmp_func(Oid type1, Oid type2);
163+
Oid create_partitions_bg_worker(Oid relid, Datum value, Oid value_type);
160164

161165
#endif /* PATHMAN_H */

contrib/pg_pathman/pg_pathman.c

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,25 @@ get_pathman_range_relation(Oid relid, bool *found)
147147
return hash_search(range_restrictions, (const void *) &key, HASH_FIND, found);
148148
}
149149

150+
FmgrInfo *
151+
get_cmp_func(Oid type1, Oid type2)
152+
{
153+
FmgrInfo *cmp_func;
154+
Oid cmp_proc_oid;
155+
TypeCacheEntry *tce;
156+
157+
cmp_func = palloc(sizeof(FmgrInfo));
158+
tce = lookup_type_cache(type1,
159+
TYPECACHE_EQ_OPR | TYPECACHE_LT_OPR | TYPECACHE_GT_OPR |
160+
TYPECACHE_CMP_PROC | TYPECACHE_CMP_PROC_FINFO);
161+
cmp_proc_oid = get_opfamily_proc(tce->btree_opf,
162+
type1,
163+
type2,
164+
BTORDER_PROC);
165+
fmgr_info(cmp_proc_oid, cmp_func);
166+
return cmp_func;
167+
}
168+
150169
/*
151170
* Planner hook. It disables inheritance for tables that have been partitioned
152171
* by pathman to prevent standart PostgreSQL partitioning mechanism from
@@ -233,9 +252,8 @@ pathman_shmem_startup(void)
233252
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
234253

235254
/* Allocate shared memory objects */
236-
alloc_dsm_table();
237-
create_relations_hashtable();
238-
create_range_restrictions_hashtable();
255+
init_dsm_config();
256+
init_shmem_config();
239257

240258
LWLockRelease(AddinShmemInitLock);
241259

contrib/pg_pathman/pl_funcs.c

Lines changed: 11 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66
#include "access/nbtree.h"
77
#include "access/xact.h"
88
#include "catalog/pg_type.h"
9-
#include "executor/spi.h"
9+
// #include "executor/spi.h"
1010
#include "storage/lmgr.h"
1111

1212

13+
#include "miscadmin.h"
14+
15+
1316
/* declarations */
1417
PG_FUNCTION_INFO_V1( on_partitions_created );
1518
PG_FUNCTION_INFO_V1( on_partitions_updated );
@@ -116,16 +119,9 @@ find_or_create_range_partition(PG_FUNCTION_ARGS)
116119
PG_RETURN_OID(ranges[pos].child_oid);
117120
else
118121
{
119-
int ret;
120-
Datum vals[4];
121-
Oid oids[] = {OIDOID, value_type, value_type, value_type};
122-
bool nulls[] = {false, false, false, false};
123-
RangeEntry *re = &ranges[rangerel->ranges.length-1];
124-
int cmp_upper = FunctionCall2(&cmp_func, value, ranges[rangerel->ranges.length-1].max);
125-
int cmp_lower = FunctionCall2(&cmp_func, value, ranges[0].min);
126-
char *sql;
127-
128-
/* Lock relation before appending new partitions */
122+
Oid child_oid;
123+
124+
/* Lock config before appending new partitions */
129125
LWLockAcquire(load_config_lock, LW_EXCLUSIVE);
130126

131127
/*
@@ -139,45 +135,17 @@ find_or_create_range_partition(PG_FUNCTION_ARGS)
139135
PG_RETURN_OID(ranges[pos].child_oid);
140136
}
141137

142-
/* Determine nearest range partition */
143-
if (cmp_upper > 0)
144-
re = &ranges[rangerel->ranges.length-1];
145-
else if (cmp_lower < 0)
146-
re = &ranges[0];
147-
148-
vals[0] = ObjectIdGetDatum(relid);
149-
vals[1] = re->min;
150-
vals[2] = re->max;
151-
vals[3] = value;
152-
153-
/* Create new partitions */
154-
SPI_connect();
155-
sql = psprintf("SELECT %s.append_partitions_on_demand_internal($1, $2, $3, $4)",
156-
get_extension_schema());
157-
ret = SPI_execute_with_args(sql, 4, oids, vals, nulls, false, 0);
158-
// ret = SPI_execute_with_args("SELECT append_partitions_on_demand_internal($1, $2, $3, $4)",
159-
// 4, oids, vals, nulls, false, 0);
160-
if (ret > 0)
161-
{
162-
/* Update relation info */
163-
free_dsm_array(&rangerel->ranges);
164-
free_dsm_array(&prel->children);
165-
load_check_constraints(relid, GetCatalogSnapshot(relid));
166-
}
167-
else
168-
elog(WARNING, "Attempt to create new partitions failed");
169-
170-
SPI_finish();
138+
/* Start background worker to create new partitions */
139+
child_oid = create_partitions_bg_worker(relid, value, value_type);
171140

172-
/* Release locks */
141+
/* Release lock */
173142
LWLockRelease(load_config_lock);
174-
// pfree(sql);
175143

176144
/* Repeat binary search */
177145
ranges = dsm_array_get_pointer(&rangerel->ranges);
178146
pos = range_binary_search(rangerel, &cmp_func, value, &found);
179147
if (found)
180-
PG_RETURN_OID(ranges[pos].child_oid);
148+
PG_RETURN_OID(child_oid);
181149
}
182150

183151
PG_RETURN_NULL();

0 commit comments

Comments
 (0)