Skip to content

Commit 3884219

Browse files
committed
Fix MtmAdjustOldestXid
1 parent caa8b65 commit 3884219

File tree

3 files changed

+52
-30
lines changed

3 files changed

+52
-30
lines changed

contrib/mmts/multimaster.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,16 @@ MtmAdjustOldestXid(TransactionId xid)
512512
MTM_LOG2("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d", MyProcPid, xid, ts != NULL ? ts->snapshot : 0, ts != NULL ? ts->csn : 0, ts != NULL ? ts->status : -1);
513513
Mtm->gcCount = 0;
514514

515+
//return FirstNormalTransactionId;
516+
515517
if (ts != NULL) {
516518
oldestSnapshot = ts->snapshot;
517-
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
519+
Assert(oldestSnapshot != INVALID_CSN);
520+
if (Mtm->nodes[MtmNodeId-1].oldestSnapshot < oldestSnapshot) {
521+
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
522+
} else {
523+
oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
524+
}
518525
for (i = 0; i < Mtm->nAllNodes; i++) {
519526
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
520527
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
@@ -527,9 +534,11 @@ MtmAdjustOldestXid(TransactionId xid)
527534
for (ts = Mtm->transListHead;
528535
ts != NULL
529536
&& ts->csn < oldestSnapshot
530-
&& TransactionIdPrecedes(ts->xid, xid)
537+
&& TransactionIdPrecedes(ts->xid, xid);
538+
/*
531539
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
532540
ts->status == TRANSACTION_STATUS_ABORTED);
541+
*/
533542
prev = ts, ts = ts->next)
534543
{
535544
if (prev != NULL) {
@@ -542,9 +551,10 @@ MtmAdjustOldestXid(TransactionId xid)
542551
if (MtmUseDtm)
543552
{
544553
if (prev != NULL) {
554+
MTM_LOG1("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%d, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
555+
MyProcPid, xid, prev->xid, prev->status, prev->snapshot, (ts ? ts->xid : 0), (ts ? ts->status : -1), (ts ? ts->snapshot : -1), oldestSnapshot);
545556
Mtm->transListHead = prev;
546557
Mtm->oldestXid = xid = prev->xid;
547-
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, oldestSnapshot=%ld", MyProcPid, xid, oldestSnapshot);
548558
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
549559
xid = Mtm->oldestXid;
550560
}

contrib/mmts/pglogical_output.c

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -377,30 +377,31 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
377377
send_replication_origin &= txn->origin_id != InvalidRepOriginId;
378378

379379
OutputPluginPrepareWrite(ctx, !send_replication_origin);
380-
data->api->write_begin(ctx->out, data, txn);
380+
if (data->api) {
381+
data->api->write_begin(ctx->out, data, txn);
381382

382-
if (send_replication_origin)
383-
{
384-
char *origin;
385-
386-
/* Message boundary */
387-
OutputPluginWrite(ctx, false);
388-
OutputPluginPrepareWrite(ctx, true);
389-
390-
/*
391-
* XXX: which behaviour we want here?
392-
*
393-
* Alternatives:
394-
* - don't send origin message if origin name not found
395-
* (that's what we do now)
396-
* - throw error - that will break replication, not good
397-
* - send some special "unknown" origin
398-
*/
399-
if (data->api->write_origin &&
400-
replorigin_by_oid(txn->origin_id, true, &origin))
383+
if (send_replication_origin)
384+
{
385+
char *origin;
386+
387+
/* Message boundary */
388+
OutputPluginWrite(ctx, false);
389+
OutputPluginPrepareWrite(ctx, true);
390+
391+
/*
392+
* XXX: which behaviour we want here?
393+
*
394+
* Alternatives:
395+
* - don't send origin message if origin name not found
396+
* (that's what we do now)
397+
* - throw error - that will break replication, not good
398+
* - send some special "unknown" origin
399+
*/
400+
if (data->api->write_origin &&
401+
replorigin_by_oid(txn->origin_id, true, &origin))
401402
data->api->write_origin(ctx->out, origin, txn->origin_lsn);
403+
}
402404
}
403-
404405
OutputPluginWrite(ctx, true);
405406
}
406407

@@ -414,7 +415,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
414415
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
415416

416417
OutputPluginPrepareWrite(ctx, true);
417-
data->api->write_commit(ctx->out, data, txn, commit_lsn);
418+
if (data->api) {
419+
data->api->write_commit(ctx->out, data, txn, commit_lsn);
420+
}
418421
OutputPluginWrite(ctx, true);
419422
}
420423

@@ -426,7 +429,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
426429
MemoryContext old;
427430

428431
/* First check the table filter */
429-
if (!call_row_filter_hook(data, txn, relation, change))
432+
if (!call_row_filter_hook(data, txn, relation, change) || data->api == NULL)
430433
return;
431434

432435
/* Avoid leaking memory by using and resetting our own context */
@@ -520,7 +523,9 @@ send_startup_message(LogicalDecodingContext *ctx,
520523
*/
521524

522525
OutputPluginPrepareWrite(ctx, last_message);
523-
data->api->write_startup_message(ctx->out, msg);
526+
if (data->api) {
527+
data->api->write_startup_message(ctx->out, msg);
528+
}
524529
OutputPluginWrite(ctx, last_message);
525530

526531
pfree(msg);

contrib/mmts/tests2/lib/bank_client.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,13 @@ def check_total(self):
153153
def tx(conn, cur):
154154
cur.execute('select sum(amount) from bank_test')
155155
res = cur.fetchone()
156+
total = res[0]
157+
if total != 0:
158+
cur.execute('select mtm.get_snapshot()')
159+
res = cur.fetchone()
160+
print("Isolation error, total = %d, node = %d, snapshot = %d" % (total,self.node_id,res[0]))
161+
#raise BaseException
156162
conn.commit()
157-
if res[0] != 0:
158-
print("Isolation error, total = %d, node = %d" % (res[0],self.node_id))
159-
raise BaseException
160163

161164
self.exec_tx('total', tx)
162165

@@ -175,10 +178,14 @@ def tx(conn, cur):
175178
set amount = amount - %s
176179
where uid = %s''',
177180
(amount, from_uid))
181+
if (cur.rowcount != 1):
182+
raise BaseException
178183
cur.execute('''update bank_test
179184
set amount = amount + %s
180185
where uid = %s''',
181186
(amount, to_uid))
187+
if (cur.rowcount != 1):
188+
raise BaseException
182189
conn.commit()
183190

184191
self.exec_tx('transfer', tx)

0 commit comments

Comments
 (0)