Skip to content

Commit be44ed2

Browse files
committed
Improve index AMs' opclass validation procedures.
The amvalidate functions added in commit 65c5fcd were on the crude side. Improve them in a few ways: * Perform signature checking for operators and support functions. * Apply more thorough checks for missing operators and functions, where possible. * Instead of reporting problems as ERRORs, report most problems as INFO messages and make the amvalidate function return FALSE. This allows more than one problem to be discovered per run. * Report object names rather than OIDs, and work a bit harder on making the messages understandable. Also, remove a few more opr_sanity regression test queries that are now superseded by the amvalidate checks.
1 parent b995518 commit be44ed2

File tree

13 files changed

+1417
-774
lines changed

13 files changed

+1417
-774
lines changed

src/backend/access/brin/brin_validate.c

Lines changed: 193 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,31 +13,46 @@
1313
*/
1414
#include "postgres.h"
1515

16+
#include "access/amvalidate.h"
1617
#include "access/brin_internal.h"
1718
#include "access/htup_details.h"
1819
#include "catalog/pg_amop.h"
1920
#include "catalog/pg_amproc.h"
2021
#include "catalog/pg_opclass.h"
21-
#include "utils/catcache.h"
22+
#include "catalog/pg_opfamily.h"
23+
#include "catalog/pg_type.h"
24+
#include "utils/builtins.h"
2225
#include "utils/syscache.h"
2326

2427

2528
/*
2629
* Validator for a BRIN opclass.
30+
*
31+
* Some of the checks done here cover the whole opfamily, and therefore are
32+
* redundant when checking each opclass in a family. But they don't run long
33+
* enough to be much of a problem, so we accept the duplication rather than
34+
* complicate the amvalidate API.
2735
*/
2836
bool
2937
brinvalidate(Oid opclassoid)
3038
{
39+
bool result = true;
3140
HeapTuple classtup;
3241
Form_pg_opclass classform;
3342
Oid opfamilyoid;
3443
Oid opcintype;
35-
int numclassops;
36-
int32 classfuncbits;
44+
char *opclassname;
45+
HeapTuple familytup;
46+
Form_pg_opfamily familyform;
47+
char *opfamilyname;
3748
CatCList *proclist,
3849
*oprlist;
39-
int i,
40-
j;
50+
uint64 allfuncs = 0;
51+
uint64 allops = 0;
52+
List *grouplist;
53+
OpFamilyOpFuncGroup *opclassgroup;
54+
int i;
55+
ListCell *lc;
4156

4257
/* Fetch opclass information */
4358
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
@@ -47,106 +62,217 @@ brinvalidate(Oid opclassoid)
4762

4863
opfamilyoid = classform->opcfamily;
4964
opcintype = classform->opcintype;
65+
opclassname = NameStr(classform->opcname);
5066

51-
ReleaseSysCache(classtup);
67+
/* Fetch opfamily information */
68+
familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid));
69+
if (!HeapTupleIsValid(familytup))
70+
elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid);
71+
familyform = (Form_pg_opfamily) GETSTRUCT(familytup);
72+
73+
opfamilyname = NameStr(familyform->opfname);
5274

5375
/* Fetch all operators and support functions of the opfamily */
5476
oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
5577
proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
5678

57-
/* We'll track the ops and functions belonging to the named opclass */
58-
numclassops = 0;
59-
classfuncbits = 0;
60-
61-
/* Check support functions */
79+
/* Check individual support functions */
6280
for (i = 0; i < proclist->n_members; i++)
6381
{
6482
HeapTuple proctup = &proclist->members[i]->tuple;
6583
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
84+
bool ok;
6685

67-
/* Check that only allowed procedure numbers exist */
68-
if (procform->amprocnum < 1 ||
69-
procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM)
70-
ereport(ERROR,
86+
/* Check procedure numbers and function signatures */
87+
switch (procform->amprocnum)
88+
{
89+
case BRIN_PROCNUM_OPCINFO:
90+
ok = check_amproc_signature(procform->amproc, INTERNALOID, true,
91+
1, 1, INTERNALOID);
92+
break;
93+
case BRIN_PROCNUM_ADDVALUE:
94+
ok = check_amproc_signature(procform->amproc, BOOLOID, true,
95+
4, 4, INTERNALOID, INTERNALOID,
96+
INTERNALOID, INTERNALOID);
97+
break;
98+
case BRIN_PROCNUM_CONSISTENT:
99+
ok = check_amproc_signature(procform->amproc, BOOLOID, true,
100+
3, 3, INTERNALOID, INTERNALOID,
101+
INTERNALOID);
102+
break;
103+
case BRIN_PROCNUM_UNION:
104+
ok = check_amproc_signature(procform->amproc, BOOLOID, true,
105+
3, 3, INTERNALOID, INTERNALOID,
106+
INTERNALOID);
107+
break;
108+
default:
109+
/* Complain if it's not a valid optional proc number */
110+
if (procform->amprocnum < BRIN_FIRST_OPTIONAL_PROCNUM ||
111+
procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM)
112+
{
113+
ereport(INFO,
114+
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
115+
errmsg("brin opfamily %s contains function %s with invalid support number %d",
116+
opfamilyname,
117+
format_procedure(procform->amproc),
118+
procform->amprocnum)));
119+
result = false;
120+
continue; /* omit bad proc numbers from allfuncs */
121+
}
122+
/* Can't check signatures of optional procs, so assume OK */
123+
ok = true;
124+
break;
125+
}
126+
127+
if (!ok)
128+
{
129+
ereport(INFO,
71130
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
72-
errmsg("brin opfamily %u contains invalid support number %d for procedure %u",
73-
opfamilyoid,
74-
procform->amprocnum, procform->amproc)));
75-
76-
/* Remember functions that are specifically for the named opclass */
77-
if (procform->amproclefttype == opcintype &&
78-
procform->amprocrighttype == opcintype)
79-
classfuncbits |= (1 << procform->amprocnum);
131+
errmsg("brin opfamily %s contains function %s with wrong signature for support number %d",
132+
opfamilyname,
133+
format_procedure(procform->amproc),
134+
procform->amprocnum)));
135+
result = false;
136+
}
137+
138+
/* Track all valid procedure numbers seen in opfamily */
139+
allfuncs |= ((uint64) 1) << procform->amprocnum;
80140
}
81141

82-
/* Check operators */
142+
/* Check individual operators */
83143
for (i = 0; i < oprlist->n_members; i++)
84144
{
85145
HeapTuple oprtup = &oprlist->members[i]->tuple;
86146
Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
87-
bool found = false;
88147

89-
/* TODO: Check that only allowed strategy numbers exist */
90-
if (oprform->amopstrategy < 1)
91-
ereport(ERROR,
148+
/* Check that only allowed strategy numbers exist */
149+
if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
150+
{
151+
ereport(INFO,
92152
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
93-
errmsg("brin opfamily %u contains invalid strategy number %d for operator %u",
94-
opfamilyoid,
95-
oprform->amopstrategy, oprform->amopopr)));
96-
97-
/* TODO: check more thoroughly for missing support functions */
98-
for (j = 0; j < proclist->n_members; j++)
153+
errmsg("brin opfamily %s contains operator %s with invalid strategy number %d",
154+
opfamilyname,
155+
format_operator(oprform->amopopr),
156+
oprform->amopstrategy)));
157+
result = false;
158+
}
159+
else
99160
{
100-
HeapTuple proctup = &proclist->members[j]->tuple;
101-
Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
102-
103-
/* note only the operator's lefttype matters */
104-
if (procform->amproclefttype == oprform->amoplefttype &&
105-
procform->amprocrighttype == oprform->amoplefttype)
106-
{
107-
found = true;
108-
break;
109-
}
161+
/*
162+
* The set of operators supplied varies across BRIN opfamilies.
163+
* Our plan is to identify all operator strategy numbers used in
164+
* the opfamily and then complain about datatype combinations that
165+
* are missing any operator(s). However, consider only numbers
166+
* that appear in some non-cross-type case, since cross-type
167+
* operators may have unique strategies. (This is not a great
168+
* heuristic, in particular an erroneous number used in a
169+
* cross-type operator will not get noticed; but the core BRIN
170+
* opfamilies are messy enough to make it necessary.)
171+
*/
172+
if (oprform->amoplefttype == oprform->amoprighttype)
173+
allops |= ((uint64) 1) << oprform->amopstrategy;
110174
}
111175

112-
if (!found)
113-
ereport(ERROR,
114-
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
115-
errmsg("brin opfamily %u lacks support function for operator %u",
116-
opfamilyoid, oprform->amopopr)));
117-
118176
/* brin doesn't support ORDER BY operators */
119177
if (oprform->amoppurpose != AMOP_SEARCH ||
120178
OidIsValid(oprform->amopsortfamily))
121-
ereport(ERROR,
179+
{
180+
ereport(INFO,
181+
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
182+
errmsg("brin opfamily %s contains invalid ORDER BY specification for operator %s",
183+
opfamilyname,
184+
format_operator(oprform->amopopr))));
185+
result = false;
186+
}
187+
188+
/* Check operator signature --- same for all brin strategies */
189+
if (!check_amop_signature(oprform->amopopr, BOOLOID,
190+
oprform->amoplefttype,
191+
oprform->amoprighttype))
192+
{
193+
ereport(INFO,
122194
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
123-
errmsg("brin opfamily %u contains invalid ORDER BY specification for operator %u",
124-
opfamilyoid, oprform->amopopr)));
195+
errmsg("brin opfamily %s contains operator %s with wrong signature",
196+
opfamilyname,
197+
format_operator(oprform->amopopr))));
198+
result = false;
199+
}
200+
}
201+
202+
/* Now check for inconsistent groups of operators/functions */
203+
grouplist = identify_opfamily_groups(oprlist, proclist);
204+
opclassgroup = NULL;
205+
foreach(lc, grouplist)
206+
{
207+
OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
208+
209+
/* Remember the group exactly matching the test opclass */
210+
if (thisgroup->lefttype == opcintype &&
211+
thisgroup->righttype == opcintype)
212+
opclassgroup = thisgroup;
125213

126-
/* Count operators that are specifically for the named opclass */
127-
if (oprform->amoplefttype == opcintype &&
128-
oprform->amoprighttype == opcintype)
129-
numclassops++;
214+
/*
215+
* Some BRIN opfamilies expect cross-type support functions to exist,
216+
* and some don't. We don't know exactly which are which, so if we
217+
* find a cross-type operator for which there are no support functions
218+
* at all, let it pass. (Don't expect that all operators exist for
219+
* such cross-type cases, either.)
220+
*/
221+
if (thisgroup->functionset == 0 &&
222+
thisgroup->lefttype != thisgroup->righttype)
223+
continue;
224+
225+
/*
226+
* Else complain if there seems to be an incomplete set of either
227+
* operators or support functions for this datatype pair.
228+
*/
229+
if (thisgroup->operatorset != allops)
230+
{
231+
ereport(INFO,
232+
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
233+
errmsg("brin opfamily %s is missing operator(s) for types %s and %s",
234+
opfamilyname,
235+
format_type_be(thisgroup->lefttype),
236+
format_type_be(thisgroup->righttype))));
237+
result = false;
238+
}
239+
if (thisgroup->functionset != allfuncs)
240+
{
241+
ereport(INFO,
242+
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
243+
errmsg("brin opfamily %s is missing support function(s) for types %s and %s",
244+
opfamilyname,
245+
format_type_be(thisgroup->lefttype),
246+
format_type_be(thisgroup->righttype))));
247+
result = false;
248+
}
130249
}
131250

132-
/* Check that the named opclass is complete */
133-
if (numclassops == 0)
134-
ereport(ERROR,
251+
/* Check that the originally-named opclass is complete */
252+
if (!opclassgroup || opclassgroup->operatorset != allops)
253+
{
254+
ereport(INFO,
135255
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
136-
errmsg("brin opclass %u is missing operator(s)",
137-
opclassoid)));
256+
errmsg("brin opclass %s is missing operator(s)",
257+
opclassname)));
258+
result = false;
259+
}
138260
for (i = 1; i <= BRIN_MANDATORY_NPROCS; i++)
139261
{
140-
if ((classfuncbits & (1 << i)) != 0)
262+
if (opclassgroup &&
263+
(opclassgroup->functionset & (((int64) 1) << i)) != 0)
141264
continue; /* got it */
142-
ereport(ERROR,
265+
ereport(INFO,
143266
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
144-
errmsg("brin opclass %u is missing required support function %d",
145-
opclassoid, i)));
267+
errmsg("brin opclass %s is missing support function %d",
268+
opclassname, i)));
269+
result = false;
146270
}
147271

148272
ReleaseCatCacheList(proclist);
149273
ReleaseCatCacheList(oprlist);
274+
ReleaseSysCache(familytup);
275+
ReleaseSysCache(classtup);
150276

151-
return true;
277+
return result;
152278
}

0 commit comments

Comments
 (0)