62
62
#include "replication/origin.h"
63
63
#include "replication/syncrep.h"
64
64
#include "replication/walsender.h"
65
+ #include "replication/logicalfuncs.h"
65
66
#include "storage/fd.h"
66
67
#include "storage/ipc.h"
67
68
#include "storage/predicate.h"
@@ -174,6 +175,8 @@ static void RemoveGXact(GlobalTransaction gxact);
174
175
static char twophase_buf [10 * 1024 ];
175
176
static int twophase_pos = 0 ;
176
177
size_t bogus_write (int fd , char * buf , size_t nbytes );
178
+
179
+ static char * XlogReadTwoPhaseData (XLogRecPtr lsn );
177
180
// LWLock *xlogreclock;
178
181
179
182
/*
@@ -1156,8 +1159,10 @@ EndPrepare(GlobalTransaction gxact)
1156
1159
XLogFlush (gxact -> prepare_lsn );
1157
1160
1158
1161
1159
- fprintf (stderr , "WAL %s->prepare_xlogptr = %X/%X \n" , gxact -> gid , (uint32 ) (gxact -> prepare_xlogptr >> 32 ), (uint32 ) (gxact -> prepare_xlogptr ));
1160
- fprintf (stderr , "WAL %s->prepare_lsn = %X/%X \n" , gxact -> gid , (uint32 ) (gxact -> prepare_lsn >> 32 ), (uint32 ) (gxact -> prepare_lsn ));
1162
+ // fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n",
1163
+ // gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1164
+ // fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n",
1165
+ // gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
1161
1166
1162
1167
1163
1168
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1404,7 +1409,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1404
1409
/*
1405
1410
* Read and validate the state file
1406
1411
*/
1407
- buf = ReadTwoPhaseFile (xid , true);
1412
+ // buf = ReadTwoPhaseFile(xid, true);
1413
+ buf = XlogReadTwoPhaseData (gxact -> prepare_xlogptr );
1408
1414
if (buf == NULL )
1409
1415
ereport (ERROR ,
1410
1416
(errcode (ERRCODE_DATA_CORRUPTED ),
@@ -2251,15 +2257,16 @@ RecordTransactionAbortPrepared(TransactionId xid,
2251
2257
/**********************************************************************************/
2252
2258
2253
2259
2254
- static int xlogreadfd = -1 ;
2255
- static XLogSegNo xlogreadsegno = -1 ;
2256
- static char xlogfpath [MAXPGPATH ];
2260
+ // static int xlogreadfd = -1;
2261
+ // static XLogSegNo xlogreadsegno = -1;
2262
+ // static char xlogfpath[MAXPGPATH];
2263
+
2264
+ // typedef struct XLogPageReadPrivate
2265
+ // {
2266
+ // const char *datadir;
2267
+ // TimeLineID tli;
2268
+ // } XLogPageReadPrivate;
2257
2269
2258
- typedef struct XLogPageReadPrivate
2259
- {
2260
- const char * datadir ;
2261
- TimeLineID tli ;
2262
- } XLogPageReadPrivate ;
2263
2270
2264
2271
size_t
2265
2272
bogus_write (int fd , char * buf , size_t nbytes )
@@ -2270,165 +2277,39 @@ bogus_write(int fd, char *buf, size_t nbytes)
2270
2277
}
2271
2278
2272
2279
2273
- static int SimpleXLogPageRead (XLogReaderState * xlogreader ,
2274
- XLogRecPtr targetPagePtr ,
2275
- int reqLen , XLogRecPtr targetRecPtr , char * readBuf ,
2276
- TimeLineID * pageTLI );
2277
-
2278
-
2279
- /* XLogreader callback function, to read a WAL page */
2280
- static int
2281
- SimpleXLogPageRead (XLogReaderState * xlogreader , XLogRecPtr targetPagePtr ,
2282
- int reqLen , XLogRecPtr targetRecPtr , char * readBuf ,
2283
- TimeLineID * pageTLI )
2280
+ static char *
2281
+ XlogReadTwoPhaseData (XLogRecPtr lsn )
2284
2282
{
2285
- XLogPageReadPrivate * private = ( XLogPageReadPrivate * ) xlogreader -> private_data ;
2286
- uint32 targetPageOff ;
2287
- XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY ;
2283
+ XLogRecord * record ;
2284
+ XLogReaderState * xlogreader ;
2285
+ char * errormsg ;
2288
2286
2289
- XLByteToSeg (targetPagePtr , targetSegNo );
2290
- targetPageOff = targetPagePtr % XLogSegSize ;
2287
+ fprintf (stderr , "XlogReadTwoPhaseData called\n" );
2291
2288
2292
- /*
2293
- * See if we need to switch to a new segment because the requested record
2294
- * is not in the currently open one.
2295
- */
2296
- if (xlogreadfd >= 0 && !XLByteInSeg (targetPagePtr , xlogreadsegno ))
2297
- {
2298
- close (xlogreadfd );
2299
- xlogreadfd = -1 ;
2300
- }
2289
+ xlogreader = XLogReaderAllocate (& logical_read_local_xlog_page , NULL );
2290
+ if (xlogreader == NULL )
2291
+ fprintf (stderr , "xlogreader == NULL\n" );
2301
2292
2302
- XLByteToSeg (targetPagePtr , xlogreadsegno );
2303
-
2304
- if (xlogreadfd < 0 )
2293
+ record = XLogReadRecord (xlogreader , lsn , & errormsg );
2294
+ if (record == NULL )
2305
2295
{
2306
- char xlogfname [MAXFNAMELEN ];
2307
-
2308
- XLogFileName (xlogfname , private -> tli , xlogreadsegno );
2309
-
2310
- snprintf (xlogfpath , MAXPGPATH , "%s/" XLOGDIR "/%s" , private -> datadir , xlogfname );
2311
-
2312
- xlogreadfd = open (xlogfpath , O_RDONLY | PG_BINARY , 0 );
2313
-
2314
- if (xlogreadfd < 0 )
2315
- {
2316
- printf (_ ("could not open file \"%s\": %s\n" ), xlogfpath ,
2317
- strerror (errno ));
2318
- return -1 ;
2319
- }
2296
+ fprintf (stderr , "XLogReadRecord error\n" );
2320
2297
}
2321
2298
2322
- /*
2323
- * At this point, we have the right segment open.
2324
- */
2325
- Assert (xlogreadfd != -1 );
2299
+ // memcpy(twophase_buf + twophase_pos, buf, nbytes);
2300
+ // twophase_pos += nbytes;
2301
+ // return nbytes;
2326
2302
2327
- /* Read the requested page */
2328
- if (lseek (xlogreadfd , (off_t ) targetPageOff , SEEK_SET ) < 0 )
2329
- {
2330
- printf (_ ("could not seek in file \"%s\": %s\n" ), xlogfpath ,
2331
- strerror (errno ));
2332
- return -1 ;
2333
- }
2334
-
2335
- if (read (xlogreadfd , readBuf , XLOG_BLCKSZ ) != XLOG_BLCKSZ )
2336
- {
2337
- printf (_ ("could not read from file \"%s\": %s\n" ), xlogfpath ,
2338
- strerror (errno ));
2339
- return -1 ;
2340
- }
2341
-
2342
- Assert (targetSegNo == xlogreadsegno );
2303
+ // XLogReaderFree(xlogreader);
2304
+ // if (xlogreadfd != -1)
2305
+ // {
2306
+ // close(xlogreadfd);
2307
+ // xlogreadfd = -1;
2308
+ // }
2343
2309
2344
- * pageTLI = private -> tli ;
2345
- return XLOG_BLCKSZ ;
2310
+ return XLogRecGetData (xlogreader );
2346
2311
}
2347
2312
2348
- // XLogRecPtr
2349
- // readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli);
2350
-
2351
- // XLogRecPtr
2352
- // readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli)
2353
- // {
2354
- // XLogRecord *record;
2355
- // XLogReaderState *xlogreader;
2356
- // char *errormsg;
2357
- // XLogPageReadPrivate private;
2358
- // XLogRecPtr endptr;
2359
-
2360
- // private.datadir = datadir;
2361
- // private.tli = tli;
2362
- // xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
2363
- // if (xlogreader == NULL)
2364
- // pg_fatal("out of memory\n");
2365
-
2366
- // record = XLogReadRecord(xlogreader, ptr, &errormsg);
2367
- // if (record == NULL)
2368
- // {
2369
- // if (errormsg)
2370
- // pg_fatal("could not read WAL record at %X/%X: %s\n",
2371
- // (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
2372
- // else
2373
- // pg_fatal("could not read WAL record at %X/%X\n",
2374
- // (uint32) (ptr >> 32), (uint32) (ptr));
2375
- // }
2376
- // endptr = xlogreader->EndRecPtr;
2377
-
2378
- // XLogReaderFree(xlogreader);
2379
- // if (xlogreadfd != -1)
2380
- // {
2381
- // close(xlogreadfd);
2382
- // xlogreadfd = -1;
2383
- // }
2384
-
2385
- // return endptr;
2386
- // }
2387
-
2388
-
2389
- // static char *
2390
- // XlogReadTwoPhaseData(XLogRecPtr lsn, bool give_warnings, TimeLineID tli)
2391
- // {
2392
- // XLogRecord *record;
2393
- // XLogReaderState *xlogreader;
2394
- // XLogPageReadPrivate private;
2395
-
2396
- // private.datadir = datadir;
2397
- // private.tli = tli;
2398
- // xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
2399
- // if (xlogreader == NULL)
2400
- // pg_fatal("out of memory\n");
2401
-
2402
- // record = XLogReadRecord(xlogreader, ptr, &errormsg);
2403
- // if (record == NULL)
2404
- // {
2405
- // if (errormsg)
2406
- // pg_fatal("could not read WAL record at %X/%X: %s\n",
2407
- // (uint32) (ptr >> 32), (uint32) (ptr), errormsg);
2408
- // else
2409
- // pg_fatal("could not read WAL record at %X/%X\n",
2410
- // (uint32) (ptr >> 32), (uint32) (ptr));
2411
- // }
2412
- // endptr = xlogreader->EndRecPtr;
2413
-
2414
- // XLogReaderFree(xlogreader);
2415
- // if (xlogreadfd != -1)
2416
- // {
2417
- // close(xlogreadfd);
2418
- // xlogreadfd = -1;
2419
- // }
2420
-
2421
- // return XLogRecGetData(record)
2422
- // }
2423
-
2424
-
2425
-
2426
-
2427
-
2428
-
2429
-
2430
-
2431
-
2432
2313
2433
2314
2434
2315
0 commit comments