Skip to content

Commit 2b87f9d

Browse files
committed
tablesample-contrib: add system_time sampling method
1 parent 45d33c8 commit 2b87f9d

File tree

6 files changed

+394
-0
lines changed

6 files changed

+394
-0
lines changed

contrib/tsm_system_time/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Generated subdirectories
2+
/log/
3+
/results/
4+
/tmp_check/

contrib/tsm_system_time/Makefile

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# src/test/modules/tsm_system_time/Makefile
2+
3+
MODULE_big = tsm_system_time
4+
OBJS = tsm_system_time.o $(WIN32RES)
5+
PGFILEDESC = "tsm_system_time - SYSTEM TABLESAMPLE method which accepts number rows of as a limit"
6+
7+
EXTENSION = tsm_system_time
8+
DATA = tsm_system_time--1.0.sql
9+
10+
REGRESS = tsm_system_time
11+
12+
ifdef USE_PGXS
13+
PG_CONFIG = pg_config
14+
PGXS := $(shell $(PG_CONFIG) --pgxs)
15+
include $(PGXS)
16+
else
17+
subdir = contrib/tsm_system_time
18+
top_builddir = ../..
19+
include $(top_builddir)/src/Makefile.global
20+
include $(top_srcdir)/contrib/contrib-global.mk
21+
endif
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
CREATE EXTENSION tsm_system_time;
2+
3+
CREATE TABLE test_tablesample (id int, name text) WITH (fillfactor=10); -- force smaller pages so we don't have to load too much data to get multiple pages
4+
5+
INSERT INTO test_tablesample SELECT i, repeat(i::text, 1000) FROM generate_series(0, 30) s(i) ORDER BY i;
6+
ANALYZE test_tablesample;
7+
8+
\timing
9+
SELECT count(*) FROM test_tablesample TABLESAMPLE system_time (1000);
10+
SELECT id FROM test_tablesample TABLESAMPLE system_time (1000) REPEATABLE (5432);
11+
12+
SELECT id FROM test_tablesample TABLESAMPLE system_time (1000) REPEATABLE (10);
13+
14+
EXPLAIN SELECT id FROM test_tablesample TABLESAMPLE system_time (1000) REPEATABLE (10);
15+
16+
-- done
17+
DROP TABLE test_tablesample CASCADE;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/* src/test/modules/tablesample/tsm_system_time--1.0.sql */
2+
3+
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
4+
\echo Use "CREATE EXTENSION tsm_system_time" to load this file. \quit
5+
6+
CREATE FUNCTION tsm_system_time_init(internal, int4, int4)
7+
RETURNS void
8+
AS 'MODULE_PATHNAME'
9+
LANGUAGE C STRICT;
10+
11+
CREATE FUNCTION tsm_system_time_nextblock(internal)
12+
RETURNS int4
13+
AS 'MODULE_PATHNAME'
14+
LANGUAGE C STRICT;
15+
16+
CREATE FUNCTION tsm_system_time_nexttuple(internal, int4, int2)
17+
RETURNS int2
18+
AS 'MODULE_PATHNAME'
19+
LANGUAGE C STRICT;
20+
21+
CREATE FUNCTION tsm_system_time_end(internal)
22+
RETURNS void
23+
AS 'MODULE_PATHNAME'
24+
LANGUAGE C STRICT;
25+
26+
CREATE FUNCTION tsm_system_time_reset(internal)
27+
RETURNS void
28+
AS 'MODULE_PATHNAME'
29+
LANGUAGE C STRICT;
30+
31+
CREATE FUNCTION tsm_system_time_cost(internal, internal, internal, internal, internal, internal, internal)
32+
RETURNS void
33+
AS 'MODULE_PATHNAME'
34+
LANGUAGE C STRICT;
35+
36+
INSERT INTO pg_tablesample_method VALUES('system_time', false, true,
37+
'tsm_system_time_init', 'tsm_system_time_nextblock',
38+
'tsm_system_time_nexttuple', '-', 'tsm_system_time_end',
39+
'tsm_system_time_reset', 'tsm_system_time_cost');
40+
Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* tsm_system_time.c
4+
* interface routines for system_time tablesample method
5+
*
6+
*
7+
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
8+
*
9+
* IDENTIFICATION
10+
* contrib/tsm_system_time_rowlimit/tsm_system_time.c
11+
*
12+
*-------------------------------------------------------------------------
13+
*/
14+
15+
#include "postgres.h"
16+
17+
#include "fmgr.h"
18+
19+
#include "access/tablesample.h"
20+
#include "access/relscan.h"
21+
#include "miscadmin.h"
22+
#include "nodes/execnodes.h"
23+
#include "nodes/relation.h"
24+
#include "optimizer/clauses.h"
25+
#include "storage/bufmgr.h"
26+
#include "utils/sampling.h"
27+
#include "utils/spccache.h"
28+
#include "utils/timestamp.h"
29+
30+
PG_MODULE_MAGIC;
31+
32+
/*
33+
* State
34+
*/
35+
typedef struct
36+
{
37+
SamplerRandomState randstate;
38+
uint32 seed; /* random seed */
39+
BlockNumber nblocks; /* number of block in relation */
40+
int32 time; /* time limit for sampling */
41+
TimestampTz start_time; /* start time of sampling */
42+
TimestampTz end_time; /* end time of sampling */
43+
OffsetNumber lt; /* last tuple returned from current block */
44+
BlockNumber step; /* step size */
45+
BlockNumber lb; /* last block visited */
46+
BlockNumber estblocks; /* estimated number of returned blocks (moving) */
47+
BlockNumber retblocks; /* number of already returned blocks */
48+
} SystemSamplerData;
49+
50+
51+
PG_FUNCTION_INFO_V1(tsm_system_time_init);
52+
PG_FUNCTION_INFO_V1(tsm_system_time_nextblock);
53+
PG_FUNCTION_INFO_V1(tsm_system_time_nexttuple);
54+
PG_FUNCTION_INFO_V1(tsm_system_time_end);
55+
PG_FUNCTION_INFO_V1(tsm_system_time_reset);
56+
PG_FUNCTION_INFO_V1(tsm_system_time_cost);
57+
58+
static uint32 random_relative_prime(uint32 n, SamplerRandomState randstate);
59+
60+
/*
61+
* Initializes the state.
62+
*/
63+
Datum
64+
tsm_system_time_init(PG_FUNCTION_ARGS)
65+
{
66+
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
67+
uint32 seed = PG_GETARG_UINT32(1);
68+
int32 time = PG_ARGISNULL(2) ? -1 : PG_GETARG_INT32(2);
69+
HeapScanDesc scan = tsdesc->heapScan;
70+
SystemSamplerData *sampler;
71+
72+
if (time < 1)
73+
ereport(ERROR,
74+
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
75+
errmsg("invalid time limit"),
76+
errhint("Time limit must be positive integer value.")));
77+
78+
sampler = palloc0(sizeof(SystemSamplerData));
79+
80+
/* Remember initial values for reinit */
81+
sampler->seed = seed;
82+
sampler->nblocks = scan->rs_nblocks;
83+
sampler->time = time;
84+
sampler->estblocks = 2;
85+
sampler->retblocks = 0;
86+
sampler->start_time = GetCurrentTimestamp();
87+
sampler->end_time = TimestampTzPlusMilliseconds(sampler->start_time,
88+
sampler->time);
89+
sampler->lt = InvalidOffsetNumber;
90+
91+
sampler_random_init_state(sampler->seed, sampler->randstate);
92+
93+
/* Find relative prime as step size for linear probing. */
94+
sampler->step = random_relative_prime(sampler->nblocks, sampler->randstate);
95+
/*
96+
* Randomize start position so that blocks close to step size don't have
97+
* higher probability of being chosen on very short scan.
98+
*/
99+
sampler->lb = sampler_random_fract(sampler->randstate) * (sampler->nblocks / sampler->step);
100+
101+
tsdesc->tsmdata = (void *) sampler;
102+
103+
PG_RETURN_VOID();
104+
}
105+
106+
/*
107+
* Get next block number or InvalidBlockNumber when we're done.
108+
*
109+
* Uses the same logic as VACUUM for picking the random blocks.
110+
*/
111+
Datum
112+
tsm_system_time_nextblock(PG_FUNCTION_ARGS)
113+
{
114+
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
115+
SystemSamplerData *sampler = (SystemSamplerData *) tsdesc->tsmdata;
116+
117+
sampler->lb = (sampler->lb + sampler->step) % sampler->nblocks;
118+
sampler->retblocks++;
119+
120+
/* All blocks have been read, we're done */
121+
if (sampler->retblocks > sampler->nblocks)
122+
PG_RETURN_UINT32(InvalidBlockNumber);
123+
124+
/*
125+
* Update the estimations for time limit at least 10 times per estimated
126+
* number of returned blocks to handle variations in block read speed.
127+
*/
128+
if (sampler->retblocks % Max(sampler->estblocks/10, 1) == 0)
129+
{
130+
TimestampTz now = GetCurrentTimestamp();
131+
long secs;
132+
int usecs;
133+
int usecs_remaining;
134+
int time_per_block;
135+
136+
TimestampDifference(sampler->start_time, now, &secs, &usecs);
137+
usecs += (int) secs * 1000000;
138+
139+
time_per_block = usecs / sampler->retblocks;
140+
141+
/* No time left, end. */
142+
TimestampDifference(now, sampler->end_time, &secs, &usecs);
143+
if (secs <= 0 && usecs <= 0)
144+
PG_RETURN_UINT32(InvalidBlockNumber);
145+
146+
/* Remaining microseconds */
147+
usecs_remaining = usecs + (int) secs * 1000000;
148+
149+
/* Recalculate estimated returned number of blocks */
150+
if (time_per_block < usecs_remaining && time_per_block > 0)
151+
sampler->estblocks = sampler->time * time_per_block;
152+
}
153+
154+
PG_RETURN_UINT32(sampler->lb);
155+
}
156+
157+
/*
158+
* Get next tuple offset in current block or InvalidOffsetNumber if we are done
159+
* with this block.
160+
*/
161+
Datum
162+
tsm_system_time_nexttuple(PG_FUNCTION_ARGS)
163+
{
164+
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
165+
OffsetNumber maxoffset = PG_GETARG_UINT16(2);
166+
SystemSamplerData *sampler = (SystemSamplerData *) tsdesc->tsmdata;
167+
OffsetNumber tupoffset = sampler->lt;
168+
169+
if (tupoffset == InvalidOffsetNumber)
170+
tupoffset = FirstOffsetNumber;
171+
else
172+
tupoffset++;
173+
174+
if (tupoffset > maxoffset)
175+
tupoffset = InvalidOffsetNumber;
176+
177+
sampler->lt = tupoffset;
178+
179+
PG_RETURN_UINT16(tupoffset);
180+
}
181+
182+
/*
183+
* Cleanup method.
184+
*/
185+
Datum
186+
tsm_system_time_end(PG_FUNCTION_ARGS)
187+
{
188+
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
189+
190+
pfree(tsdesc->tsmdata);
191+
192+
PG_RETURN_VOID();
193+
}
194+
195+
/*
196+
* Reset state (called by ReScan).
197+
*/
198+
Datum
199+
tsm_system_time_reset(PG_FUNCTION_ARGS)
200+
{
201+
TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
202+
SystemSamplerData *sampler = (SystemSamplerData *) tsdesc->tsmdata;
203+
204+
sampler->lt = InvalidOffsetNumber;
205+
sampler->start_time = GetCurrentTimestamp();
206+
sampler->end_time = TimestampTzPlusMilliseconds(sampler->start_time,
207+
sampler->time);
208+
sampler->estblocks = 2;
209+
sampler->retblocks = 0;
210+
sampler->step = random_relative_prime(sampler->nblocks, sampler->randstate);
211+
sampler->lb = sampler_random_fract(sampler->randstate) * (sampler->nblocks / sampler->step);
212+
213+
PG_RETURN_VOID();
214+
}
215+
216+
/*
217+
* Costing function.
218+
*/
219+
Datum
220+
tsm_system_time_cost(PG_FUNCTION_ARGS)
221+
{
222+
PlannerInfo *root = (PlannerInfo *) PG_GETARG_POINTER(0);
223+
Path *path = (Path *) PG_GETARG_POINTER(1);
224+
RelOptInfo *baserel = (RelOptInfo *) PG_GETARG_POINTER(2);
225+
List *args = (List *) PG_GETARG_POINTER(3);
226+
BlockNumber *pages = (BlockNumber *) PG_GETARG_POINTER(4);
227+
double *tuples = (double *) PG_GETARG_POINTER(5);
228+
Node *limitnode;
229+
int32 time;
230+
BlockNumber relpages;
231+
double reltuples;
232+
double density;
233+
double spc_random_page_cost;
234+
235+
limitnode = linitial(args);
236+
limitnode = estimate_expression_value(root, limitnode);
237+
238+
if (IsA(limitnode, RelabelType))
239+
limitnode = (Node *) ((RelabelType *) limitnode)->arg;
240+
241+
if (IsA(limitnode, Const))
242+
time = DatumGetInt32(((Const *) limitnode)->constvalue);
243+
else
244+
{
245+
/* Default time (1s) if the estimation didn't return Const. */
246+
time = 1000;
247+
}
248+
249+
relpages = baserel->pages;
250+
reltuples = baserel->tuples;
251+
252+
/* estimate the tuple density */
253+
if (relpages > 0)
254+
density = reltuples / (double) relpages;
255+
else
256+
density = (BLCKSZ - SizeOfPageHeaderData) / baserel->width;
257+
258+
/*
259+
* We equal random page cost value to number of ms it takes to read the
260+
* random page here which is far from accurate but we don't have anything
261+
* better to base our predicted page reads.
262+
*/
263+
get_tablespace_page_costs(baserel->reltablespace,
264+
&spc_random_page_cost,
265+
NULL);
266+
267+
*pages = Min(baserel->pages, time/spc_random_page_cost);
268+
*tuples = rint(density * (double) *pages * path->rows / baserel->tuples);
269+
path->rows = *tuples;
270+
271+
PG_RETURN_VOID();
272+
}
273+
274+
static uint32
275+
gcd (uint32 a, uint32 b)
276+
{
277+
uint32 c;
278+
279+
while (a != 0)
280+
{
281+
c = a;
282+
a = b % a;
283+
b = c;
284+
}
285+
286+
return b;
287+
}
288+
289+
static uint32
290+
random_relative_prime(uint32 n, SamplerRandomState randstate)
291+
{
292+
/* Pick random starting number, with some limits on what it can be. */
293+
uint32 r = (uint32) sampler_random_fract(randstate) * n/2 + n/4,
294+
t;
295+
296+
/*
297+
* This should only take 2 or 3 iterations as the probability of 2 numbers
298+
* being relatively prime is ~61%.
299+
*/
300+
while ((t = gcd(r, n)) > 1)
301+
{
302+
CHECK_FOR_INTERRUPTS();
303+
r /= t;
304+
}
305+
306+
return r;
307+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# tsm_system_time extension
2+
comment = 'SYSTEM TABLESAMPLE method which accepts time in milliseconds as a limit'
3+
default_version = '1.0'
4+
module_pathname = '$libdir/tsm_system_time'
5+
relocatable = true

0 commit comments

Comments
 (0)