@@ -174,10 +174,8 @@ static void RemoveGXact(GlobalTransaction gxact);
174
174
175
175
static char twophase_buf [10 * 1024 ];
176
176
static int twophase_pos = 0 ;
177
- size_t bogus_write (int fd , char * buf , size_t nbytes );
178
-
177
+ size_t bogus_write (int fd , const void * buf , size_t nbytes );
179
178
static char * XlogReadTwoPhaseData (XLogRecPtr lsn );
180
- // LWLock *xlogreclock;
181
179
182
180
/*
183
181
* Initialization of shared memory
@@ -997,6 +995,8 @@ StartPrepare(GlobalTransaction gxact)
997
995
998
996
save_state_data (& hdr , sizeof (TwoPhaseFileHeader ));
999
997
998
+ // fprintf(stderr, "StartPrepare: %s=(%d,%d,%d,%d)\n", hdr.gid, hdr.nsubxacts, hdr.ncommitrels, hdr.nabortrels, hdr.ninvalmsgs);
999
+
1000
1000
/*
1001
1001
* Add the additional info about subxacts, deletable files and cache
1002
1002
* invalidation messages.
@@ -1033,13 +1033,13 @@ StartPrepare(GlobalTransaction gxact)
1033
1033
void
1034
1034
EndPrepare (GlobalTransaction gxact )
1035
1035
{
1036
- PGXACT * pgxact = & ProcGlobal -> allPgXact [gxact -> pgprocno ];
1037
- TransactionId xid = pgxact -> xid ;
1036
+ // PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
1037
+ // TransactionId xid = pgxact->xid;
1038
1038
TwoPhaseFileHeader * hdr ;
1039
1039
char path [MAXPGPATH ];
1040
1040
StateFileChunk * record ;
1041
1041
pg_crc32c statefile_crc ;
1042
- pg_crc32c bogus_crc ;
1042
+ // pg_crc32c bogus_crc;
1043
1043
int fd ;
1044
1044
1045
1045
/* Add the end sentinel to the list of 2PC records */
@@ -1144,26 +1144,13 @@ EndPrepare(GlobalTransaction gxact)
1144
1144
MyPgXact -> delayChkpt = true;
1145
1145
1146
1146
XLogBeginInsert ();
1147
-
1148
1147
for (record = records .head ; record != NULL ; record = record -> next )
1149
1148
XLogRegisterData (record -> data , record -> len );
1150
-
1151
- // LWLockAcquire(xlogreclock, LW_EXCLUSIVE);
1152
- LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
1153
- gxact -> prepare_xlogptr = GetXLogInsertRecPtr ();
1154
1149
gxact -> prepare_lsn = XLogInsert (RM_XACT_ID , XLOG_XACT_PREPARE );
1155
- LWLockRelease (TwoPhaseStateLock );
1156
- // LWLockRelease(xlogreclock);
1157
-
1158
-
1159
1150
XLogFlush (gxact -> prepare_lsn );
1151
+ gxact -> prepare_xlogptr = ProcLastRecPtr ;
1160
1152
1161
-
1162
- // fprintf(stderr, "WAL %s->prepare_xlogptr = %X/%X \n",
1163
- // gxact->gid, (uint32) (gxact->prepare_xlogptr >> 32), (uint32) (gxact->prepare_xlogptr));
1164
- // fprintf(stderr, "WAL %s->prepare_lsn = %X/%X \n",
1165
- // gxact->gid, (uint32) (gxact->prepare_lsn >> 32), (uint32) (gxact->prepare_lsn));
1166
-
1153
+ // fprintf(stderr, "EndPrepare: %s={xlogptr:%X,lsn:%X, delta: %X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1167
1154
1168
1155
/* If we crash now, we have prepared: WAL replay will fix things */
1169
1156
@@ -1250,101 +1237,100 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1250
1237
static char *
1251
1238
ReadTwoPhaseFile (TransactionId xid , bool give_warnings )
1252
1239
{
1253
- // char path[MAXPGPATH];
1254
- // char *buf;
1255
- // TwoPhaseFileHeader *hdr;
1256
- // int fd;
1257
- // struct stat stat;
1258
- // uint32 crc_offset;
1259
- // pg_crc32c calc_crc,
1260
- // file_crc;
1261
-
1262
- // TwoPhaseFilePath(path, xid);
1240
+ char path [MAXPGPATH ];
1241
+ char * buf ;
1242
+ TwoPhaseFileHeader * hdr ;
1243
+ int fd ;
1244
+ struct stat stat ;
1245
+ uint32 crc_offset ;
1246
+ pg_crc32c calc_crc ,
1247
+ file_crc ;
1263
1248
1264
- // fd = OpenTransientFile (path, O_RDONLY | PG_BINARY, 0 );
1249
+ TwoPhaseFilePath (path , xid );
1265
1250
1266
- // if (fd < 0)
1267
- // {
1268
- // if (give_warnings)
1269
- // ereport(WARNING,
1270
- // (errcode_for_file_access(),
1271
- // errmsg("could not open two-phase state file \"%s\": %m",
1272
- // path)));
1273
- // return NULL;
1274
- // }
1251
+ fd = OpenTransientFile (path , O_RDONLY | PG_BINARY , 0 );
1252
+ if (fd < 0 )
1253
+ {
1254
+ if (give_warnings )
1255
+ ereport (WARNING ,
1256
+ (errcode_for_file_access (),
1257
+ errmsg ("could not open two-phase state file \"%s\": %m" ,
1258
+ path )));
1259
+ return NULL ;
1260
+ }
1275
1261
1276
1262
/*
1277
1263
* Check file length. We can determine a lower bound pretty easily. We
1278
1264
* set an upper bound to avoid palloc() failure on a corrupt file, though
1279
1265
* we can't guarantee that we won't get an out of memory error anyway,
1280
1266
* even on a valid file.
1281
1267
*/
1282
- // if (fstat(fd, &stat))
1283
- // {
1284
- // CloseTransientFile(fd);
1285
- // if (give_warnings)
1286
- // ereport(WARNING,
1287
- // (errcode_for_file_access(),
1288
- // errmsg("could not stat two-phase state file \"%s\": %m",
1289
- // path)));
1290
- // return NULL;
1291
- // }
1268
+ if (fstat (fd , & stat ))
1269
+ {
1270
+ CloseTransientFile (fd );
1271
+ if (give_warnings )
1272
+ ereport (WARNING ,
1273
+ (errcode_for_file_access (),
1274
+ errmsg ("could not stat two-phase state file \"%s\": %m" ,
1275
+ path )));
1276
+ return NULL ;
1277
+ }
1292
1278
1293
- // if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1294
- // MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1295
- // sizeof(pg_crc32c)) ||
1296
- // stat.st_size > MaxAllocSize)
1297
- // {
1298
- // CloseTransientFile(fd);
1299
- // return NULL;
1300
- // }
1279
+ if (stat .st_size < (MAXALIGN (sizeof (TwoPhaseFileHeader )) +
1280
+ MAXALIGN (sizeof (TwoPhaseRecordOnDisk )) +
1281
+ sizeof (pg_crc32c )) ||
1282
+ stat .st_size > MaxAllocSize )
1283
+ {
1284
+ CloseTransientFile (fd );
1285
+ return NULL ;
1286
+ }
1301
1287
1302
- // crc_offset = stat.st_size - sizeof(pg_crc32c);
1303
- // if (crc_offset != MAXALIGN(crc_offset))
1304
- // {
1305
- // CloseTransientFile(fd);
1306
- // return NULL;
1307
- // }
1288
+ crc_offset = stat .st_size - sizeof (pg_crc32c );
1289
+ if (crc_offset != MAXALIGN (crc_offset ))
1290
+ {
1291
+ CloseTransientFile (fd );
1292
+ return NULL ;
1293
+ }
1308
1294
1309
- // / *
1310
- // * OK, slurp in the file.
1311
- // */
1312
- // buf = (char *) palloc(stat.st_size);
1295
+ /*
1296
+ * OK, slurp in the file.
1297
+ */
1298
+ buf = (char * ) palloc (stat .st_size );
1313
1299
1314
- // if (read(fd, buf, stat.st_size) != stat.st_size)
1315
- // {
1316
- // CloseTransientFile(fd);
1317
- // if (give_warnings)
1318
- // ereport(WARNING,
1319
- // (errcode_for_file_access(),
1320
- // errmsg("could not read two-phase state file \"%s\": %m",
1321
- // path)));
1322
- // pfree(buf);
1323
- // return NULL;
1324
- // }
1300
+ if (read (fd , buf , stat .st_size ) != stat .st_size )
1301
+ {
1302
+ CloseTransientFile (fd );
1303
+ if (give_warnings )
1304
+ ereport (WARNING ,
1305
+ (errcode_for_file_access (),
1306
+ errmsg ("could not read two-phase state file \"%s\": %m" ,
1307
+ path )));
1308
+ pfree (buf );
1309
+ return NULL ;
1310
+ }
1325
1311
1326
- // CloseTransientFile(fd);
1312
+ CloseTransientFile (fd );
1327
1313
1328
- // hdr = (TwoPhaseFileHeader *) buf;
1329
- // if (hdr->magic != TWOPHASE_MAGIC || hdr->total_len != stat.st_size)
1330
- // {
1331
- // pfree(buf);
1332
- // return NULL;
1333
- // }
1314
+ hdr = (TwoPhaseFileHeader * ) buf ;
1315
+ if (hdr -> magic != TWOPHASE_MAGIC || hdr -> total_len != stat .st_size )
1316
+ {
1317
+ pfree (buf );
1318
+ return NULL ;
1319
+ }
1334
1320
1335
- // INIT_CRC32C(calc_crc);
1336
- // COMP_CRC32C(calc_crc, buf, crc_offset);
1337
- // FIN_CRC32C(calc_crc);
1321
+ INIT_CRC32C (calc_crc );
1322
+ COMP_CRC32C (calc_crc , buf , crc_offset );
1323
+ FIN_CRC32C (calc_crc );
1338
1324
1339
- // file_crc = *((pg_crc32c *) (buf + crc_offset));
1325
+ file_crc = * ((pg_crc32c * ) (buf + crc_offset ));
1340
1326
1341
- // if (!EQ_CRC32C(calc_crc, file_crc))
1342
- // {
1343
- // pfree(buf);
1344
- // return NULL;
1345
- // }
1327
+ if (!EQ_CRC32C (calc_crc , file_crc ))
1328
+ {
1329
+ pfree (buf );
1330
+ return NULL ;
1331
+ }
1346
1332
1347
- return twophase_buf ;
1333
+ return buf ;
1348
1334
}
1349
1335
1350
1336
/*
@@ -1410,12 +1396,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1410
1396
* Read and validate the state file
1411
1397
*/
1412
1398
// buf = ReadTwoPhaseFile(xid, true);
1399
+ // buf = twophase_buf;
1413
1400
buf = XlogReadTwoPhaseData (gxact -> prepare_xlogptr );
1414
- if (buf == NULL )
1415
- ereport (ERROR ,
1416
- (errcode (ERRCODE_DATA_CORRUPTED ),
1417
- errmsg ("two-phase state file for transaction %u is corrupt" ,
1418
- xid )));
1419
1401
1420
1402
/*
1421
1403
* Disassemble the header area
@@ -1435,6 +1417,15 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1435
1417
/* compute latestXid among all children */
1436
1418
latestXid = TransactionIdLatest (xid , hdr -> nsubxacts , children );
1437
1419
1420
+
1421
+ // fprintf(stderr, "FinishPrepared: %s=(%d,%d,%d,%d)\n", gxact->gid, hdr->nsubxacts, hdr->ncommitrels, hdr->nabortrels, hdr->ninvalmsgs);
1422
+ // fprintf(stderr, "FinishPrepared: %s={xlogptr:%X,lsn:%X,delta:%X}\n", gxact->gid, gxact->prepare_xlogptr, gxact->prepare_lsn, gxact->prepare_lsn - gxact->prepare_xlogptr);
1423
+
1424
+ Assert (hdr -> nsubxacts == 0 );
1425
+ Assert (hdr -> ncommitrels == 0 );
1426
+ Assert (hdr -> nabortrels == 0 );
1427
+ Assert (hdr -> ninvalmsgs == 0 );
1428
+
1438
1429
/*
1439
1430
* The order of operations here is critical: make the XLOG entry for
1440
1431
* commit or abort, then mark the transaction committed or aborted in
@@ -2246,30 +2237,11 @@ RecordTransactionAbortPrepared(TransactionId xid,
2246
2237
SyncRepWaitForLSN (recptr );
2247
2238
}
2248
2239
2249
-
2250
-
2251
-
2252
-
2253
-
2254
-
2255
-
2256
-
2257
2240
/**********************************************************************************/
2258
2241
2259
2242
2260
- // static int xlogreadfd = -1;
2261
- // static XLogSegNo xlogreadsegno = -1;
2262
- // static char xlogfpath[MAXPGPATH];
2263
-
2264
- // typedef struct XLogPageReadPrivate
2265
- // {
2266
- // const char *datadir;
2267
- // TimeLineID tli;
2268
- // } XLogPageReadPrivate;
2269
-
2270
-
2271
- size_t
2272
- bogus_write (int fd , char * buf , size_t nbytes )
2243
+ size_t
2244
+ bogus_write (int fd , const void * buf , size_t nbytes )
2273
2245
{
2274
2246
memcpy (twophase_buf + twophase_pos , buf , nbytes );
2275
2247
twophase_pos += nbytes ;
@@ -2284,8 +2256,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
2284
2256
XLogReaderState * xlogreader ;
2285
2257
char * errormsg ;
2286
2258
2287
- fprintf (stderr , "XlogReadTwoPhaseData called\n" );
2288
-
2289
2259
xlogreader = XLogReaderAllocate (& logical_read_local_xlog_page , NULL );
2290
2260
if (xlogreader == NULL )
2291
2261
fprintf (stderr , "xlogreader == NULL\n" );
@@ -2296,20 +2266,5 @@ XlogReadTwoPhaseData(XLogRecPtr lsn)
2296
2266
fprintf (stderr , "XLogReadRecord error\n" );
2297
2267
}
2298
2268
2299
- // memcpy(twophase_buf + twophase_pos, buf, nbytes);
2300
- // twophase_pos += nbytes;
2301
- // return nbytes;
2302
-
2303
- // XLogReaderFree(xlogreader);
2304
- // if (xlogreadfd != -1)
2305
- // {
2306
- // close(xlogreadfd);
2307
- // xlogreadfd = -1;
2308
- // }
2309
-
2310
2269
return XLogRecGetData (xlogreader );
2311
2270
}
2312
-
2313
-
2314
-
2315
-
0 commit comments