Skip to content

Commit 44a13de

Browse files
knizhnikkelvich
authored andcommitted
Fix raftable_wrapper
1 parent b2225c9 commit 44a13de

File tree

4 files changed

+24
-38
lines changed

4 files changed

+24
-38
lines changed

multimaster.c

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2352,7 +2352,7 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23522352
ByteBufferFree(&buf);
23532353
for (i = 0; i < MtmNodes; i++) {
23542354
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
2355-
int size;
2355+
size_t size;
23562356
void* data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, true);
23572357
if (data == NULL) {
23582358
return true; /* If using Raftable is disabled */
@@ -2367,31 +2367,3 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
23672367
}
23682368
return hasDeadlock;
23692369
}
2370-
2371-
void* RaftableGet(char const* key, int* size, RaftableTimestamp* ts, bool nowait)
2372-
{
2373-
size_t s;
2374-
char *value;
2375-
2376-
if (!MtmUseRaftable) return NULL;
2377-
2378-
Assert(ts == NULL); /* not implemented */
2379-
value = raftable_get(key, &s);
2380-
*size = s;
2381-
return value;
2382-
}
2383-
2384-
void RaftableSet(char const* key, void const* value, int size, bool nowait)
2385-
{
2386-
if (!MtmUseRaftable) return;
2387-
2388-
raftable_set(key, value, size, nowait ? 0 : -1);
2389-
}
2390-
2391-
bool RaftableCAS(char const* key, char const* value, bool nowait)
2392-
{
2393-
if (!MtmUseRaftable) return false;
2394-
2395-
Assert(false); /* not implemented */
2396-
return false;
2397-
}

pglogical_output.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,17 +480,23 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
480480
/*
481481
* Decide if the whole transaction with specific origin should be filtered out.
482482
*/
483+
extern int MtmReplicationNodeId;
484+
483485
static bool
484486
pg_decode_origin_filter(LogicalDecodingContext *ctx,
485487
RepOriginId origin_id)
486488
{
487489
PGLogicalOutputData *data = ctx->output_plugin_private;
488490

489-
if (!call_txn_filter_hook(data, origin_id))
491+
if (!call_txn_filter_hook(data, origin_id)) {
492+
elog(WARNING, "Record with origin %d is not sent to node %d", origin_id, MtmReplicationNodeId);
490493
return true;
494+
}
491495

492-
if (!data->forward_changesets && origin_id != InvalidRepOriginId)
496+
if (!data->forward_changesets && origin_id != InvalidRepOriginId) {
497+
*(int*)0 = 0;
493498
return true;
499+
}
494500

495501
return false;
496502
}

raftable.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,28 @@ static void RaftableResolve()
1919
/*
2020
* Raftable function proxies
2121
*/
22-
void* RaftableGet(char const* key, int* size, RaftableTimestamp* ts, bool nowait)
22+
void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool nowait)
2323
{
2424
if (!MtmUseRaftable) {
2525
return NULL;
2626
}
2727
RaftableResolve();
28-
return (*raftable_get_impl)(key, size, nowait ? 0 : -1);
28+
return (*raftable_get_impl)(key, size);
2929
}
3030

3131

32-
void RaftableSet(char const* key, void const* value, int size, bool nowait)
32+
void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
3333
{
3434
if (MtmUseRaftable) {
3535
RaftableResolve();
3636
(*raftable_set_impl)(key, value, size, nowait ? 0 : -1);
3737
}
3838
}
39+
40+
bool RaftableCAS(char const* key, char const* value, bool nowait)
41+
{
42+
if (!MtmUseRaftable) return false;
43+
44+
Assert(false); /* not implemented */
45+
return false;
46+
}

raftable_wrapper.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ typedef struct RaftableTimestamp {
1515
* If "ts" is not NULL, then it is assigned timestamp of last update of this value
1616
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
1717
*/
18-
extern void* RaftableGet(char const* key, int* size, RaftableTimestamp* ts, bool nowait);
18+
extern void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool nowait);
1919

2020
/*
2121
* Set new value for the specified key. IF value is NULL, then key should be deleted.
2222
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
2323
*/
24-
extern void RaftableSet(char const* key, void const* value, int size, bool nowait);
24+
extern void RaftableSet(char const* key, void const* value, size_t size, bool nowait);
2525

2626
/*
2727
* If key doesn't exists or its value is not equal to the specified value then store this value and return true.
@@ -30,8 +30,8 @@ extern void RaftableSet(char const* key, void const* value, int size, bool nowa
3030
*/
3131
extern bool RaftableCAS(char const* key, char const* value, bool nowait);
3232

33-
typedef void* (*raftable_get_t)(char const* key, int* size, int timeout);
34-
typedef void (*raftable_set_t)(char const* key, void const* value, int size, int timeout);
33+
typedef void* (*raftable_get_t)(char const* key, size_t* size);
34+
typedef void (*raftable_set_t)(char const* key, void const* value, size_t size, int timeout_ms);
3535

3636
extern bool MtmUseRaftable;
3737

0 commit comments

Comments
 (0)