13
13
*/
14
14
#include "postgres.h"
15
15
16
+ #include "access/amvalidate.h"
16
17
#include "access/brin_internal.h"
17
18
#include "access/htup_details.h"
18
19
#include "catalog/pg_amop.h"
19
20
#include "catalog/pg_amproc.h"
20
21
#include "catalog/pg_opclass.h"
21
- #include "utils/catcache.h"
22
+ #include "catalog/pg_opfamily.h"
23
+ #include "catalog/pg_type.h"
24
+ #include "utils/builtins.h"
22
25
#include "utils/syscache.h"
23
26
24
27
25
28
/*
26
29
* Validator for a BRIN opclass.
30
+ *
31
+ * Some of the checks done here cover the whole opfamily, and therefore are
32
+ * redundant when checking each opclass in a family. But they don't run long
33
+ * enough to be much of a problem, so we accept the duplication rather than
34
+ * complicate the amvalidate API.
27
35
*/
28
36
bool
29
37
brinvalidate (Oid opclassoid )
30
38
{
39
+ bool result = true;
31
40
HeapTuple classtup ;
32
41
Form_pg_opclass classform ;
33
42
Oid opfamilyoid ;
34
43
Oid opcintype ;
35
- int numclassops ;
36
- int32 classfuncbits ;
44
+ char * opclassname ;
45
+ HeapTuple familytup ;
46
+ Form_pg_opfamily familyform ;
47
+ char * opfamilyname ;
37
48
CatCList * proclist ,
38
49
* oprlist ;
39
- int i ,
40
- j ;
50
+ uint64 allfuncs = 0 ;
51
+ uint64 allops = 0 ;
52
+ List * grouplist ;
53
+ OpFamilyOpFuncGroup * opclassgroup ;
54
+ int i ;
55
+ ListCell * lc ;
41
56
42
57
/* Fetch opclass information */
43
58
classtup = SearchSysCache1 (CLAOID , ObjectIdGetDatum (opclassoid ));
@@ -47,106 +62,217 @@ brinvalidate(Oid opclassoid)
47
62
48
63
opfamilyoid = classform -> opcfamily ;
49
64
opcintype = classform -> opcintype ;
65
+ opclassname = NameStr (classform -> opcname );
50
66
51
- ReleaseSysCache (classtup );
67
+ /* Fetch opfamily information */
68
+ familytup = SearchSysCache1 (OPFAMILYOID , ObjectIdGetDatum (opfamilyoid ));
69
+ if (!HeapTupleIsValid (familytup ))
70
+ elog (ERROR , "cache lookup failed for operator family %u" , opfamilyoid );
71
+ familyform = (Form_pg_opfamily ) GETSTRUCT (familytup );
72
+
73
+ opfamilyname = NameStr (familyform -> opfname );
52
74
53
75
/* Fetch all operators and support functions of the opfamily */
54
76
oprlist = SearchSysCacheList1 (AMOPSTRATEGY , ObjectIdGetDatum (opfamilyoid ));
55
77
proclist = SearchSysCacheList1 (AMPROCNUM , ObjectIdGetDatum (opfamilyoid ));
56
78
57
- /* We'll track the ops and functions belonging to the named opclass */
58
- numclassops = 0 ;
59
- classfuncbits = 0 ;
60
-
61
- /* Check support functions */
79
+ /* Check individual support functions */
62
80
for (i = 0 ; i < proclist -> n_members ; i ++ )
63
81
{
64
82
HeapTuple proctup = & proclist -> members [i ]-> tuple ;
65
83
Form_pg_amproc procform = (Form_pg_amproc ) GETSTRUCT (proctup );
84
+ bool ok ;
66
85
67
- /* Check that only allowed procedure numbers exist */
68
- if (procform -> amprocnum < 1 ||
69
- procform -> amprocnum > BRIN_LAST_OPTIONAL_PROCNUM )
70
- ereport (ERROR ,
86
+ /* Check procedure numbers and function signatures */
87
+ switch (procform -> amprocnum )
88
+ {
89
+ case BRIN_PROCNUM_OPCINFO :
90
+ ok = check_amproc_signature (procform -> amproc , INTERNALOID , true,
91
+ 1 , 1 , INTERNALOID );
92
+ break ;
93
+ case BRIN_PROCNUM_ADDVALUE :
94
+ ok = check_amproc_signature (procform -> amproc , BOOLOID , true,
95
+ 4 , 4 , INTERNALOID , INTERNALOID ,
96
+ INTERNALOID , INTERNALOID );
97
+ break ;
98
+ case BRIN_PROCNUM_CONSISTENT :
99
+ ok = check_amproc_signature (procform -> amproc , BOOLOID , true,
100
+ 3 , 3 , INTERNALOID , INTERNALOID ,
101
+ INTERNALOID );
102
+ break ;
103
+ case BRIN_PROCNUM_UNION :
104
+ ok = check_amproc_signature (procform -> amproc , BOOLOID , true,
105
+ 3 , 3 , INTERNALOID , INTERNALOID ,
106
+ INTERNALOID );
107
+ break ;
108
+ default :
109
+ /* Complain if it's not a valid optional proc number */
110
+ if (procform -> amprocnum < BRIN_FIRST_OPTIONAL_PROCNUM ||
111
+ procform -> amprocnum > BRIN_LAST_OPTIONAL_PROCNUM )
112
+ {
113
+ ereport (INFO ,
114
+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
115
+ errmsg ("brin opfamily %s contains function %s with invalid support number %d" ,
116
+ opfamilyname ,
117
+ format_procedure (procform -> amproc ),
118
+ procform -> amprocnum )));
119
+ result = false;
120
+ continue ; /* omit bad proc numbers from allfuncs */
121
+ }
122
+ /* Can't check signatures of optional procs, so assume OK */
123
+ ok = true;
124
+ break ;
125
+ }
126
+
127
+ if (!ok )
128
+ {
129
+ ereport (INFO ,
71
130
(errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
72
- errmsg ("brin opfamily %u contains invalid support number %d for procedure %u" ,
73
- opfamilyoid ,
74
- procform -> amprocnum , procform -> amproc )));
75
-
76
- /* Remember functions that are specifically for the named opclass */
77
- if (procform -> amproclefttype == opcintype &&
78
- procform -> amprocrighttype == opcintype )
79
- classfuncbits |= (1 << procform -> amprocnum );
131
+ errmsg ("brin opfamily %s contains function %s with wrong signature for support number %d" ,
132
+ opfamilyname ,
133
+ format_procedure (procform -> amproc ),
134
+ procform -> amprocnum )));
135
+ result = false;
136
+ }
137
+
138
+ /* Track all valid procedure numbers seen in opfamily */
139
+ allfuncs |= ((uint64 ) 1 ) << procform -> amprocnum ;
80
140
}
81
141
82
- /* Check operators */
142
+ /* Check individual operators */
83
143
for (i = 0 ; i < oprlist -> n_members ; i ++ )
84
144
{
85
145
HeapTuple oprtup = & oprlist -> members [i ]-> tuple ;
86
146
Form_pg_amop oprform = (Form_pg_amop ) GETSTRUCT (oprtup );
87
- bool found = false;
88
147
89
- /* TODO: Check that only allowed strategy numbers exist */
90
- if (oprform -> amopstrategy < 1 )
91
- ereport (ERROR ,
148
+ /* Check that only allowed strategy numbers exist */
149
+ if (oprform -> amopstrategy < 1 || oprform -> amopstrategy > 63 )
150
+ {
151
+ ereport (INFO ,
92
152
(errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
93
- errmsg ("brin opfamily %u contains invalid strategy number %d for operator %u" ,
94
- opfamilyoid ,
95
- oprform -> amopstrategy , oprform -> amopopr )));
96
-
97
- /* TODO: check more thoroughly for missing support functions */
98
- for (j = 0 ; j < proclist -> n_members ; j ++ )
153
+ errmsg ("brin opfamily %s contains operator %s with invalid strategy number %d" ,
154
+ opfamilyname ,
155
+ format_operator (oprform -> amopopr ),
156
+ oprform -> amopstrategy )));
157
+ result = false;
158
+ }
159
+ else
99
160
{
100
- HeapTuple proctup = & proclist -> members [j ]-> tuple ;
101
- Form_pg_amproc procform = (Form_pg_amproc ) GETSTRUCT (proctup );
102
-
103
- /* note only the operator's lefttype matters */
104
- if (procform -> amproclefttype == oprform -> amoplefttype &&
105
- procform -> amprocrighttype == oprform -> amoplefttype )
106
- {
107
- found = true;
108
- break ;
109
- }
161
+ /*
162
+ * The set of operators supplied varies across BRIN opfamilies.
163
+ * Our plan is to identify all operator strategy numbers used in
164
+ * the opfamily and then complain about datatype combinations that
165
+ * are missing any operator(s). However, consider only numbers
166
+ * that appear in some non-cross-type case, since cross-type
167
+ * operators may have unique strategies. (This is not a great
168
+ * heuristic, in particular an erroneous number used in a
169
+ * cross-type operator will not get noticed; but the core BRIN
170
+ * opfamilies are messy enough to make it necessary.)
171
+ */
172
+ if (oprform -> amoplefttype == oprform -> amoprighttype )
173
+ allops |= ((uint64 ) 1 ) << oprform -> amopstrategy ;
110
174
}
111
175
112
- if (!found )
113
- ereport (ERROR ,
114
- (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
115
- errmsg ("brin opfamily %u lacks support function for operator %u" ,
116
- opfamilyoid , oprform -> amopopr )));
117
-
118
176
/* brin doesn't support ORDER BY operators */
119
177
if (oprform -> amoppurpose != AMOP_SEARCH ||
120
178
OidIsValid (oprform -> amopsortfamily ))
121
- ereport (ERROR ,
179
+ {
180
+ ereport (INFO ,
181
+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
182
+ errmsg ("brin opfamily %s contains invalid ORDER BY specification for operator %s" ,
183
+ opfamilyname ,
184
+ format_operator (oprform -> amopopr ))));
185
+ result = false;
186
+ }
187
+
188
+ /* Check operator signature --- same for all brin strategies */
189
+ if (!check_amop_signature (oprform -> amopopr , BOOLOID ,
190
+ oprform -> amoplefttype ,
191
+ oprform -> amoprighttype ))
192
+ {
193
+ ereport (INFO ,
122
194
(errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
123
- errmsg ("brin opfamily %u contains invalid ORDER BY specification for operator %u" ,
124
- opfamilyoid , oprform -> amopopr )));
195
+ errmsg ("brin opfamily %s contains operator %s with wrong signature" ,
196
+ opfamilyname ,
197
+ format_operator (oprform -> amopopr ))));
198
+ result = false;
199
+ }
200
+ }
201
+
202
+ /* Now check for inconsistent groups of operators/functions */
203
+ grouplist = identify_opfamily_groups (oprlist , proclist );
204
+ opclassgroup = NULL ;
205
+ foreach (lc , grouplist )
206
+ {
207
+ OpFamilyOpFuncGroup * thisgroup = (OpFamilyOpFuncGroup * ) lfirst (lc );
208
+
209
+ /* Remember the group exactly matching the test opclass */
210
+ if (thisgroup -> lefttype == opcintype &&
211
+ thisgroup -> righttype == opcintype )
212
+ opclassgroup = thisgroup ;
125
213
126
- /* Count operators that are specifically for the named opclass */
127
- if (oprform -> amoplefttype == opcintype &&
128
- oprform -> amoprighttype == opcintype )
129
- numclassops ++ ;
214
+ /*
215
+ * Some BRIN opfamilies expect cross-type support functions to exist,
216
+ * and some don't. We don't know exactly which are which, so if we
217
+ * find a cross-type operator for which there are no support functions
218
+ * at all, let it pass. (Don't expect that all operators exist for
219
+ * such cross-type cases, either.)
220
+ */
221
+ if (thisgroup -> functionset == 0 &&
222
+ thisgroup -> lefttype != thisgroup -> righttype )
223
+ continue ;
224
+
225
+ /*
226
+ * Else complain if there seems to be an incomplete set of either
227
+ * operators or support functions for this datatype pair.
228
+ */
229
+ if (thisgroup -> operatorset != allops )
230
+ {
231
+ ereport (INFO ,
232
+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
233
+ errmsg ("brin opfamily %s is missing operator(s) for types %s and %s" ,
234
+ opfamilyname ,
235
+ format_type_be (thisgroup -> lefttype ),
236
+ format_type_be (thisgroup -> righttype ))));
237
+ result = false;
238
+ }
239
+ if (thisgroup -> functionset != allfuncs )
240
+ {
241
+ ereport (INFO ,
242
+ (errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
243
+ errmsg ("brin opfamily %s is missing support function(s) for types %s and %s" ,
244
+ opfamilyname ,
245
+ format_type_be (thisgroup -> lefttype ),
246
+ format_type_be (thisgroup -> righttype ))));
247
+ result = false;
248
+ }
130
249
}
131
250
132
- /* Check that the named opclass is complete */
133
- if (numclassops == 0 )
134
- ereport (ERROR ,
251
+ /* Check that the originally-named opclass is complete */
252
+ if (!opclassgroup || opclassgroup -> operatorset != allops )
253
+ {
254
+ ereport (INFO ,
135
255
(errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
136
- errmsg ("brin opclass %u is missing operator(s)" ,
137
- opclassoid )));
256
+ errmsg ("brin opclass %s is missing operator(s)" ,
257
+ opclassname )));
258
+ result = false;
259
+ }
138
260
for (i = 1 ; i <= BRIN_MANDATORY_NPROCS ; i ++ )
139
261
{
140
- if ((classfuncbits & (1 << i )) != 0 )
262
+ if (opclassgroup &&
263
+ (opclassgroup -> functionset & (((int64 ) 1 ) << i )) != 0 )
141
264
continue ; /* got it */
142
- ereport (ERROR ,
265
+ ereport (INFO ,
143
266
(errcode (ERRCODE_INVALID_OBJECT_DEFINITION ),
144
- errmsg ("brin opclass %u is missing required support function %d" ,
145
- opclassoid , i )));
267
+ errmsg ("brin opclass %s is missing support function %d" ,
268
+ opclassname , i )));
269
+ result = false;
146
270
}
147
271
148
272
ReleaseCatCacheList (proclist );
149
273
ReleaseCatCacheList (oprlist );
274
+ ReleaseSysCache (familytup );
275
+ ReleaseSysCache (classtup );
150
276
151
- return true ;
277
+ return result ;
152
278
}
0 commit comments