12
12
*/
13
13
#include "postgres.h"
14
14
15
+ #include "pglogical_output/compat.h"
15
16
#include "pglogical_config.h"
16
17
#include "pglogical_output.h"
17
18
#include "pglogical_proto.h"
18
19
#include "pglogical_hooks.h"
20
+ #include "pglogical_relmetacache.h"
19
21
20
22
#include "access/hash.h"
21
23
#include "access/sysattr.h"
33
35
34
36
#include "replication/output_plugin.h"
35
37
#include "replication/logical.h"
38
+ #ifdef HAVE_REPLICATION_ORIGINS
36
39
#include "replication/origin.h"
40
+ #endif
37
41
38
42
#include "utils/builtins.h"
39
43
#include "utils/catcache.h"
47
51
#include "utils/syscache.h"
48
52
#include "utils/typcache.h"
49
53
54
+ PG_MODULE_MAGIC ;
55
+
50
56
extern void _PG_output_plugin_init (OutputPluginCallbacks * cb );
51
57
52
58
/* These must be available to pg_dlsym() */
@@ -61,8 +67,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
61
67
ReorderBufferTXN * txn , Relation rel ,
62
68
ReorderBufferChange * change );
63
69
70
+ #ifdef HAVE_REPLICATION_ORIGINS
64
71
static bool pg_decode_origin_filter (LogicalDecodingContext * ctx ,
65
72
RepOriginId origin_id );
73
+ #endif
66
74
67
75
static void send_startup_message (LogicalDecodingContext * ctx ,
68
76
PGLogicalOutputData * data , bool last_message );
@@ -79,7 +87,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
79
87
cb -> begin_cb = pg_decode_begin_txn ;
80
88
cb -> change_cb = pg_decode_change ;
81
89
cb -> commit_cb = pg_decode_commit_txn ;
90
+ #ifdef HAVE_REPLICATION_ORIGINS
82
91
cb -> filter_by_origin_cb = pg_decode_origin_filter ;
92
+ #endif
83
93
cb -> shutdown_cb = pg_decode_shutdown ;
84
94
}
85
95
@@ -99,42 +109,42 @@ check_binary_compatibility(PGLogicalOutputData *data)
99
109
if (data -> client_binary_sizeofdatum != 0
100
110
&& data -> client_binary_sizeofdatum != sizeof (Datum ))
101
111
{
102
- elog (DEBUG1 , "Binary mode rejected: Server and client endian sizeof(Datum) mismatch" );
112
+ elog (DEBUG1 , "Binary mode rejected: Server and client sizeof(Datum) mismatch" );
103
113
return false;
104
114
}
105
115
106
116
if (data -> client_binary_sizeofint != 0
107
117
&& data -> client_binary_sizeofint != sizeof (int ))
108
118
{
109
- elog (DEBUG1 , "Binary mode rejected: Server and client endian sizeof(int) mismatch" );
119
+ elog (DEBUG1 , "Binary mode rejected: Server and client sizeof(int) mismatch" );
110
120
return false;
111
121
}
112
122
113
123
if (data -> client_binary_sizeoflong != 0
114
124
&& data -> client_binary_sizeoflong != sizeof (long ))
115
125
{
116
- elog (DEBUG1 , "Binary mode rejected: Server and client endian sizeof(long) mismatch" );
126
+ elog (DEBUG1 , "Binary mode rejected: Server and client sizeof(long) mismatch" );
117
127
return false;
118
128
}
119
129
120
130
if (data -> client_binary_float4byval_set
121
131
&& data -> client_binary_float4byval != server_float4_byval ())
122
132
{
123
- elog (DEBUG1 , "Binary mode rejected: Server and client endian float4byval mismatch" );
133
+ elog (DEBUG1 , "Binary mode rejected: Server and client float4byval mismatch" );
124
134
return false;
125
135
}
126
136
127
137
if (data -> client_binary_float8byval_set
128
138
&& data -> client_binary_float8byval != server_float8_byval ())
129
139
{
130
- elog (DEBUG1 , "Binary mode rejected: Server and client endian float8byval mismatch" );
140
+ elog (DEBUG1 , "Binary mode rejected: Server and client float8byval mismatch" );
131
141
return false;
132
142
}
133
143
134
144
if (data -> client_binary_intdatetimes_set
135
145
&& data -> client_binary_intdatetimes != server_integer_datetimes ())
136
146
{
137
- elog (DEBUG1 , "Binary mode rejected: Server and client endian integer datetimes mismatch" );
147
+ elog (DEBUG1 , "Binary mode rejected: Server and client integer datetimes mismatch" );
138
148
return false;
139
149
}
140
150
@@ -148,7 +158,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
148
158
{
149
159
PGLogicalOutputData * data = palloc0 (sizeof (PGLogicalOutputData ));
150
160
151
- data -> context = AllocSetContextCreate (TopMemoryContext ,
161
+ data -> context = AllocSetContextCreate (ctx -> context ,
152
162
"pglogical conversion context" ,
153
163
ALLOCSET_DEFAULT_MINSIZE ,
154
164
ALLOCSET_DEFAULT_INITSIZE ,
@@ -202,17 +212,17 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
202
212
errmsg ("client sent startup parameters in format %d but we only support format 1" ,
203
213
params_format )));
204
214
205
- if (data -> client_min_proto_version > PG_LOGICAL_PROTO_VERSION_NUM )
215
+ if (data -> client_min_proto_version > PGLOGICAL_PROTO_VERSION_NUM )
206
216
ereport (ERROR ,
207
217
(errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
208
218
errmsg ("client sent min_proto_version=%d but we only support protocol %d or lower" ,
209
- data -> client_min_proto_version , PG_LOGICAL_PROTO_VERSION_NUM )));
219
+ data -> client_min_proto_version , PGLOGICAL_PROTO_VERSION_NUM )));
210
220
211
- if (data -> client_max_proto_version < PG_LOGICAL_PROTO_MIN_VERSION_NUM )
221
+ if (data -> client_max_proto_version < PGLOGICAL_PROTO_MIN_VERSION_NUM )
212
222
ereport (ERROR ,
213
223
(errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
214
224
errmsg ("client sent max_proto_version=%d but we only support protocol %d or higher" ,
215
- data -> client_max_proto_version , PG_LOGICAL_PROTO_MIN_VERSION_NUM )));
225
+ data -> client_max_proto_version , PGLOGICAL_PROTO_MIN_VERSION_NUM )));
216
226
217
227
/*
218
228
* Set correct protocol format.
@@ -308,42 +318,17 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
308
318
}
309
319
310
320
/*
311
- * Will we forward changesets? We have to if we're on 9.4;
312
- * otherwise honour the client's request.
321
+ * 9.4 lacks origins info so don't forward it.
322
+ *
323
+ * There's currently no knob for clients to use to suppress
324
+ * this info and it's sent if it's supported and available.
313
325
*/
314
326
if (PG_VERSION_NUM /100 == 904 )
315
- {
316
- /*
317
- * 9.4 unconditionally forwards changesets due to lack of
318
- * replication origins, and it can't ever send origin info
319
- * for the same reason.
320
- */
321
- data -> forward_changesets = true;
322
327
data -> forward_changeset_origins = false;
323
-
324
- if (data -> client_forward_changesets_set
325
- && !data -> client_forward_changesets )
326
- {
327
- ereport (DEBUG1 ,
328
- (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
329
- errmsg ("Cannot disable changeset forwarding on PostgreSQL 9.4" )));
330
- }
331
- }
332
- else if (data -> client_forward_changesets_set
333
- && data -> client_forward_changesets )
334
- {
335
- /* Client explicitly asked for forwarding; forward csets and origins */
336
- data -> forward_changesets = true;
337
- data -> forward_changeset_origins = true;
338
- }
339
328
else
340
- {
341
- /* Default to not forwarding or honour client's request not to fwd */
342
- data -> forward_changesets = false;
343
- data -> forward_changeset_origins = false;
344
- }
329
+ data -> forward_changeset_origins = true;
345
330
346
- if (data -> hooks_setup_funcname != NIL || data -> api -> setup_hooks )
331
+ if (data -> hooks_setup_funcname != NIL )
347
332
{
348
333
349
334
data -> hooks_mctxt = AllocSetContextCreate (ctx -> context ,
@@ -355,6 +340,43 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
355
340
load_hooks (data );
356
341
call_startup_hook (data , ctx -> output_plugin_options );
357
342
}
343
+
344
+ if (data -> client_relmeta_cache_size < -1 )
345
+ {
346
+ ereport (ERROR ,
347
+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
348
+ errmsg ("relmeta_cache_size must be -1, 0, or positive" )));
349
+ }
350
+
351
+ /*
352
+ * Relation metadata cache configuration.
353
+ *
354
+ * TODO: support fixed size cache
355
+ *
356
+ * Need a LRU for eviction, and need to implement a new message type for
357
+ * cache purge notifications for clients. In the mean time force it to 0
358
+ * (off). The client will be told via a startup param and must respect
359
+ * that.
360
+ */
361
+ if (data -> client_relmeta_cache_size != 0
362
+ && data -> client_relmeta_cache_size != -1 )
363
+ {
364
+ ereport (INFO ,
365
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
366
+ errmsg ("fixed size cache not supported, forced to off" ),
367
+ errdetail ("only relmeta_cache_size=0 (off) or relmeta_cache_size=-1 (unlimited) supported" )));
368
+
369
+ data -> relmeta_cache_size = 0 ;
370
+ }
371
+ else
372
+ {
373
+ /* ack client request */
374
+ data -> relmeta_cache_size = data -> client_relmeta_cache_size ;
375
+ }
376
+
377
+ /* if cache enabled, init it */
378
+ if (data -> relmeta_cache_size != 0 )
379
+ pglogical_init_relmetacache (ctx -> context );
358
380
}
359
381
}
360
382
@@ -370,12 +392,15 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
370
392
if (!startup_message_sent )
371
393
send_startup_message (ctx , data , false /* can't be last message */ );
372
394
395
+ #ifdef HAVE_REPLICATION_ORIGINS
373
396
/* If the record didn't originate locally, send origin info */
374
397
send_replication_origin &= txn -> origin_id != InvalidRepOriginId ;
398
+ #endif
375
399
376
400
OutputPluginPrepareWrite (ctx , !send_replication_origin );
377
401
data -> api -> write_begin (ctx -> out , data , txn );
378
402
403
+ #ifdef HAVE_REPLICATION_ORIGINS
379
404
if (send_replication_origin )
380
405
{
381
406
char * origin ;
@@ -397,6 +422,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
397
422
replorigin_by_oid (txn -> origin_id , true, & origin ))
398
423
data -> api -> write_origin (ctx -> out , origin , txn -> origin_lsn );
399
424
}
425
+ #endif
400
426
401
427
OutputPluginWrite (ctx , true);
402
428
}
@@ -421,6 +447,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
421
447
{
422
448
PGLogicalOutputData * data = ctx -> output_plugin_private ;
423
449
MemoryContext old ;
450
+ struct PGLRelMetaCacheEntry * cached_relmeta = NULL ;
451
+
424
452
425
453
/* First check the table filter */
426
454
if (!call_row_filter_hook (data , txn , relation , change ))
@@ -429,11 +457,18 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
429
457
/* Avoid leaking memory by using and resetting our own context */
430
458
old = MemoryContextSwitchTo (data -> context );
431
459
432
- /* TODO: add caching (send only if changed) */
433
- if (data -> api -> write_rel )
460
+ /*
461
+ * If the protocol wants to write relation information and the client
462
+ * isn't known to have metadata cached for this relation already,
463
+ * send relation metadata.
464
+ *
465
+ * TODO: track hit/miss stats
466
+ */
467
+ if (data -> api -> write_rel != NULL &&
468
+ !pglogical_cache_relmeta (data , relation , & cached_relmeta ))
434
469
{
435
470
OutputPluginPrepareWrite (ctx , false);
436
- data -> api -> write_rel (ctx -> out , data , relation );
471
+ data -> api -> write_rel (ctx -> out , data , relation , cached_relmeta );
437
472
OutputPluginWrite (ctx , false);
438
473
}
439
474
@@ -477,28 +512,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
477
512
MemoryContextReset (data -> context );
478
513
}
479
514
515
+ #ifdef HAVE_REPLICATION_ORIGINS
480
516
/*
481
517
* Decide if the whole transaction with specific origin should be filtered out.
482
518
*/
483
- extern int MtmReplicationNodeId ;
484
-
485
519
static bool
486
520
pg_decode_origin_filter (LogicalDecodingContext * ctx ,
487
521
RepOriginId origin_id )
488
522
{
489
523
PGLogicalOutputData * data = ctx -> output_plugin_private ;
490
524
491
- if (!call_txn_filter_hook (data , origin_id )) {
492
- return true;
493
- }
494
-
495
- if (!data -> forward_changesets && origin_id != InvalidRepOriginId ) {
496
- * (int * )0 = 0 ;
525
+ if (!call_txn_filter_hook (data , origin_id ))
497
526
return true;
498
- }
499
527
500
528
return false;
501
529
}
530
+ #endif
502
531
503
532
static void
504
533
send_startup_message (LogicalDecodingContext * ctx ,
@@ -532,9 +561,10 @@ static void pg_decode_shutdown(LogicalDecodingContext * ctx)
532
561
533
562
call_shutdown_hook (data );
534
563
535
- if (data -> hooks_mctxt != NULL )
536
- {
537
- MemoryContextDelete (data -> hooks_mctxt );
538
- data -> hooks_mctxt = NULL ;
539
- }
564
+ pglogical_destroy_relmetacache ();
565
+
566
+ /*
567
+ * no need to delete data->context or data->hooks_mctxt as they're children
568
+ * of ctx->context which will expire on return.
569
+ */
540
570
}
0 commit comments