Skip to content

Commit cd0a6ee

Browse files
committed
Fix raftable_wrapper
1 parent 2e2c464 commit cd0a6ee

File tree

4 files changed

+24
-38
lines changed

4 files changed

+24
-38
lines changed

contrib/mmts/multimaster.c

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2354,7 +2354,7 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23542354
ByteBufferFree(&buf);
23552355
for (i = 0; i < MtmNodes; i++) {
23562356
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
2357-
int size;
2357+
size_t size;
23582358
void* data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, true);
23592359
if (data == NULL) {
23602360
return true; /* If using Raftable is disabled */
@@ -2369,31 +2369,3 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23692369
}
23702370
return hasDeadlock;
23712371
}
2372-
2373-
void* RaftableGet(char const* key, int* size, RaftableTimestamp* ts, bool nowait)
2374-
{
2375-
size_t s;
2376-
char *value;
2377-
2378-
if (!MtmUseRaftable) return NULL;
2379-
2380-
Assert(ts == NULL); /* not implemented */
2381-
value = raftable_get(key, &s);
2382-
*size = s;
2383-
return value;
2384-
}
2385-
2386-
void RaftableSet(char const* key, void const* value, int size, bool nowait)
2387-
{
2388-
if (!MtmUseRaftable) return;
2389-
2390-
raftable_set(key, value, size, nowait ? 0 : -1);
2391-
}
2392-
2393-
bool RaftableCAS(char const* key, char const* value, bool nowait)
2394-
{
2395-
if (!MtmUseRaftable) return false;
2396-
2397-
Assert(false); /* not implemented */
2398-
return false;
2399-
}

contrib/mmts/pglogical_output.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,17 +480,23 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
480480
/*
481481
* Decide if the whole transaction with specific origin should be filtered out.
482482
*/
483+
extern int MtmReplicationNodeId;
484+
483485
static bool
484486
pg_decode_origin_filter(LogicalDecodingContext *ctx,
485487
RepOriginId origin_id)
486488
{
487489
PGLogicalOutputData *data = ctx->output_plugin_private;
488490

489-
if (!call_txn_filter_hook(data, origin_id))
491+
if (!call_txn_filter_hook(data, origin_id)) {
492+
elog(WARNING, "Record with origin %d is not sent to node %d", origin_id, MtmReplicationNodeId);
490493
return true;
494+
}
491495

492-
if (!data->forward_changesets && origin_id != InvalidRepOriginId)
496+
if (!data->forward_changesets && origin_id != InvalidRepOriginId) {
497+
*(int*)0 = 0;
493498
return true;
499+
}
494500

495501
return false;
496502
}

contrib/mmts/raftable.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,28 @@ static void RaftableResolve()
1919
/*
2020
* Raftable function proxies
2121
*/
22-
void* RaftableGet(char const* key, int* size, RaftableTimestamp* ts, bool nowait)
22+
void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool nowait)
2323
{
2424
if (!MtmUseRaftable) {
2525
return NULL;
2626
}
2727
RaftableResolve();
28-
return (*raftable_get_impl)(key, size, nowait ? 0 : -1);
28+
return (*raftable_get_impl)(key, size);
2929
}
3030

3131

32-
void RaftableSet(char const* key, void const* value, int size, bool nowait)
32+
void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
3333
{
3434
if (MtmUseRaftable) {
3535
RaftableResolve();
3636
(*raftable_set_impl)(key, value, size, nowait ? 0 : -1);
3737
}
3838
}
39+
40+
bool RaftableCAS(char const* key, char const* value, bool nowait)
41+
{
42+
if (!MtmUseRaftable) return false;
43+
44+
Assert(false); /* not implemented */
45+
return false;
46+
}

contrib/mmts/raftable_wrapper.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ typedef struct RaftableTimestamp {
1515
* If "ts" is not NULL, then it is assigned timestamp of last update of this value
1616
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
1717
*/
18-
extern void* RaftableGet(char const* key, int* size, RaftableTimestamp* ts, bool nowait);
18+
extern void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool nowait);
1919

2020
/*
2121
* Set new value for the specified key. IF value is NULL, then key should be deleted.
2222
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
2323
*/
24-
extern void RaftableSet(char const* key, void const* value, int size, bool nowait);
24+
extern void RaftableSet(char const* key, void const* value, size_t size, bool nowait);
2525

2626
/*
2727
* If key doesn't exists or its value is not equal to the specified value then store this value and return true.
@@ -30,8 +30,8 @@ extern void RaftableSet(char const* key, void const* value, int size, bool nowa
3030
*/
3131
extern bool RaftableCAS(char const* key, char const* value, bool nowait);
3232

33-
typedef void* (*raftable_get_t)(char const* key, int* size, int timeout);
34-
typedef void (*raftable_set_t)(char const* key, void const* value, int size, int timeout);
33+
typedef void* (*raftable_get_t)(char const* key, size_t* size);
34+
typedef void (*raftable_set_t)(char const* key, void const* value, size_t size, int timeout_ms);
3535

3636
extern bool MtmUseRaftable;
3737

0 commit comments

Comments
 (0)