@@ -9978,7 +9978,9 @@ Backup::execFSREMOVECONF(Signal* signal)
9978
9978
* This is recorded using the following variables in the LCP control file.
9979
9979
*
9980
9980
* MaxPartPairs:
9981
- * This is the number of parts used in LCPs.
9981
+ * This is the maximum number of LCPs that can constitute a recoverable
9982
+ * checkpoints. Thus an LCP control file can write at most this many
9983
+ * parts. Currently this number is set to 2048.
9982
9984
*
9983
9985
* NumPartPairs:
9984
9986
* This is the number of files used in the restore of this LCP, there is
@@ -10653,7 +10655,7 @@ Backup::execLCP_PREPARE_REQ(Signal* signal)
10653
10655
ptr.p ->m_first_fragment = true ;
10654
10656
ptr.p ->m_is_lcp_scan_active = false ;
10655
10657
ptr.p ->m_current_lcp_lsn = Uint64 (0 );
10656
- DEB_LCP ((" (%u)TAGS Start new LCP, id: %u" , instance (), req.backupId ));
10658
+ DEB_LCP_STAT ((" (%u)TAGS Start new LCP, id: %u" , instance (), req.backupId ));
10657
10659
LocalDeleteLcpFile_list queue (c_deleteLcpFilePool,
10658
10660
m_delete_lcp_file_head);
10659
10661
ndbrequire (queue.isEmpty ())
@@ -10784,7 +10786,7 @@ Backup::lcp_open_ctl_file_done(Signal* signal,
10784
10786
Uint32 mem_offset = Uint32 ((char *)pagePtr.p - (char *)c_startOfPages);
10785
10787
req->data .memoryAddress .memoryOffset = mem_offset;
10786
10788
req->data .memoryAddress .fileOffset = 0 ;
10787
- req->data .memoryAddress .size = BackupFormat::NDB_LCP_CTL_FILE_SIZE ;
10789
+ req->data .memoryAddress .size = BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG ;
10788
10790
10789
10791
sendSignal (NDBFS_REF, GSN_FSREADREQ, signal,
10790
10792
FsReadWriteReq::FixedLength + 3 , JBA);
@@ -10831,7 +10833,10 @@ Backup::execFSREADCONF(Signal *signal)
10831
10833
if (ptr.p ->deleteFilePtr == filePtr.i )
10832
10834
{
10833
10835
jam ();
10834
- ndbrequire (filePtr.p ->bytesRead == BackupFormat::NDB_LCP_CTL_FILE_SIZE);
10836
+ ndbrequire (filePtr.p ->bytesRead ==
10837
+ BackupFormat::NDB_LCP_CTL_FILE_SIZE_SMALL ||
10838
+ filePtr.p ->bytesRead ==
10839
+ BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG);
10835
10840
lcp_read_ctl_file_for_rewrite_done (signal, filePtr);
10836
10841
return ;
10837
10842
}
@@ -11163,7 +11168,8 @@ Backup::lcp_read_ctl_file(Page32Ptr pagePtr,
11163
11168
* This information is used to decide which header file to close (the most
11164
11169
* recent one) and which header file to use for the next LCP.
11165
11170
*/
11166
- ndbrequire (BackupFormat::NDB_LCP_CTL_FILE_SIZE == bytesRead);
11171
+ ndbrequire (BackupFormat::NDB_LCP_CTL_FILE_SIZE_SMALL == bytesRead ||
11172
+ BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG == bytesRead);
11167
11173
if (!convert_ctl_page_to_host (lcpCtlFilePtr))
11168
11174
{
11169
11175
jam ();
@@ -11179,24 +11185,41 @@ Backup::lcp_read_ctl_file(Page32Ptr pagePtr,
11179
11185
}
11180
11186
}
11181
11187
11188
+ /* *
11189
+ * We compress before writing LCP control and after reading it we will
11190
+ * decompress the part information. In compressed format we use 3 bytes
11191
+ * to store two numbers that can at most be 2048. In uncompressed
11192
+ * format each part is a 16-bit unsigned integer.
11193
+ */
11194
+ #define BYTES_PER_PART 3
11195
+ /* *
11196
+ * Define the LCP Control file header size, remove the one part pair
11197
+ * defined in the common header.
11198
+ */
11199
+ #define LCP_CTL_FILE_HEADER_SIZE (sizeof (BackupFormat::LCPCtlFile) - \
11200
+ sizeof (BackupFormat::PartPair))
11201
+
11182
11202
bool
11183
11203
Backup::convert_ctl_page_to_host (
11184
11204
struct BackupFormat ::LCPCtlFile *lcpCtlFilePtr)
11185
11205
{
11186
11206
Uint32 *pageData = (Uint32*)lcpCtlFilePtr;
11187
11207
Uint32 numPartPairs = ntohl (lcpCtlFilePtr->NumPartPairs );
11188
- Uint32 real_bytes_read = (sizeof (*lcpCtlFilePtr) +
11189
- (sizeof (struct BackupFormat ::PartPair) *
11190
- (numPartPairs - 1 )));
11208
+ Uint32 real_bytes_read = LCP_CTL_FILE_HEADER_SIZE +
11209
+ (BYTES_PER_PART * numPartPairs);
11191
11210
11192
- /* Checksum is calculated on network byte order */
11211
+ /* Checksum is calculated on compressed network byte order */
11193
11212
if (numPartPairs > BackupFormat::NDB_MAX_LCP_PARTS)
11194
11213
{
11195
11214
DEB_LCP ((" (%u)numPartPairs: %x" , instance (), numPartPairs));
11196
11215
ndbassert (false );
11197
11216
return false ;
11198
11217
}
11199
- Uint32 words = real_bytes_read / sizeof (Uint32);
11218
+ /* *
11219
+ * Add 3 to ensure that we get also the last word with anything not
11220
+ * equal to 0 when changing to word count.
11221
+ */
11222
+ Uint32 words = (real_bytes_read + 3 ) / sizeof (Uint32);
11200
11223
Uint32 chksum = 0 ;
11201
11224
for (Uint32 i = 0 ; i < words; i++)
11202
11225
{
@@ -11244,48 +11267,37 @@ Backup::convert_ctl_page_to_host(
11244
11267
lcpCtlFilePtr->MaxPartPairs = ntohl (lcpCtlFilePtr->MaxPartPairs );
11245
11268
lcpCtlFilePtr->NumPartPairs = ntohl (lcpCtlFilePtr->NumPartPairs );
11246
11269
11247
- ndbrequire (( 2 * BackupFormat::NDB_LCP_CTL_FILE_SIZE) >= real_bytes_read);
11270
+ ndbrequire (BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG >= real_bytes_read);
11248
11271
ndbrequire (lcpCtlFilePtr->fileHeader .FileType ==
11249
11272
BackupFormat::LCP_CTL_FILE);
11250
11273
ndbrequire (memcmp (BACKUP_MAGIC, lcpCtlFilePtr->fileHeader .Magic , 8 ) == 0 );
11251
11274
ndbrequire (lcpCtlFilePtr->NumPartPairs <= lcpCtlFilePtr->MaxPartPairs );
11252
11275
ndbrequire (lcpCtlFilePtr->NumPartPairs > 0 );
11253
11276
Uint32 total_parts;
11254
- if (lcpCtlFilePtr->fileHeader .BackupVersion == NDBD_USE_PARTIAL_LCP_v2)
11255
- {
11256
- lcpCtlFilePtr->RowCountLow = ntohl (lcpCtlFilePtr->RowCountLow );
11257
- lcpCtlFilePtr->RowCountHigh = ntohl (lcpCtlFilePtr->RowCountHigh );
11258
- total_parts = decompress_part_pairs (lcpCtlFilePtr,
11259
- lcpCtlFilePtr->NumPartPairs ,
11260
- &lcpCtlFilePtr->partPairs [0 ]);
11261
- }
11262
- else
11263
- {
11264
- struct BackupFormat ::LCPCtlFile *oldLcpCtlFilePtr =
11265
- (struct BackupFormat ::LCPCtlFile*)lcpCtlFilePtr;
11266
- lcpCtlFilePtr->RowCountLow = 0 ;
11267
- lcpCtlFilePtr->RowCountHigh = 0 ;
11268
- ndbrequire (lcpCtlFilePtr->fileHeader .BackupVersion ==
11269
- NDBD_USE_PARTIAL_LCP_v1);
11270
- total_parts = decompress_part_pairs (lcpCtlFilePtr,
11271
- lcpCtlFilePtr->NumPartPairs ,
11272
- &oldLcpCtlFilePtr->partPairs [0 ]);
11273
- }
11277
+ ndbrequire (lcpCtlFilePtr->fileHeader .BackupVersion >= NDBD_USE_PARTIAL_LCP_v2)
11278
+ lcpCtlFilePtr->RowCountLow = ntohl (lcpCtlFilePtr->RowCountLow );
11279
+ lcpCtlFilePtr->RowCountHigh = ntohl (lcpCtlFilePtr->RowCountHigh );
11280
+ total_parts = decompress_part_pairs (lcpCtlFilePtr,
11281
+ lcpCtlFilePtr->NumPartPairs ,
11282
+ &lcpCtlFilePtr->partPairs [0 ]);
11274
11283
ndbrequire (total_parts <= lcpCtlFilePtr->MaxPartPairs );
11275
11284
return true ;
11276
11285
}
11277
11286
11278
11287
void
11279
- Backup::convert_ctl_page_to_network (Uint32 *page)
11288
+ Backup::convert_ctl_page_to_network (Uint32 *page, Uint32 file_size )
11280
11289
{
11281
11290
struct BackupFormat ::LCPCtlFile *lcpCtlFilePtr =
11282
11291
(struct BackupFormat ::LCPCtlFile*)page;
11283
11292
Uint32 numPartPairs = lcpCtlFilePtr->NumPartPairs ;
11284
- Uint32 real_bytes_read = (sizeof (*lcpCtlFilePtr) +
11285
- (sizeof (struct BackupFormat ::PartPair) *
11286
- (numPartPairs - 1 )));
11293
+ Uint32 compressed_bytes_written = LCP_CTL_FILE_HEADER_SIZE +
11294
+ (BYTES_PER_PART * numPartPairs);
11287
11295
11288
- ndbrequire (BackupFormat::NDB_LCP_CTL_FILE_SIZE >= real_bytes_read);
11296
+ /* *
11297
+ * Add 3 to ensure that we take into account the last word that might
11298
+ * filled with only 1 byte of information.
11299
+ */
11300
+ ndbrequire (file_size >= (compressed_bytes_written + 3 ));
11289
11301
11290
11302
ndbrequire (memcmp (BACKUP_MAGIC, lcpCtlFilePtr->fileHeader .Magic , 8 ) == 0 );
11291
11303
ndbrequire (lcpCtlFilePtr->fileHeader .FileType ==
@@ -11334,17 +11346,21 @@ Backup::convert_ctl_page_to_network(Uint32 *page)
11334
11346
lcpCtlFilePtr->RowCountLow = htonl (lcpCtlFilePtr->RowCountLow );
11335
11347
lcpCtlFilePtr->RowCountHigh = htonl (lcpCtlFilePtr->RowCountHigh );
11336
11348
11337
- Uint32 total_parts = compress_part_pairs (lcpCtlFilePtr, numPartPairs);
11349
+ Uint32 total_parts = compress_part_pairs (lcpCtlFilePtr,
11350
+ numPartPairs,
11351
+ file_size);
11338
11352
ndbrequire (total_parts <= maxPartPairs);
11339
11353
11340
11354
/* *
11341
- * Checksum is calculated on network byte order.
11355
+ * Checksum is calculated on compressed network byte order.
11342
11356
* The checksum is calculated without regard to size decreasing due to
11343
11357
* compression. This is not a problem since we fill the remainder with
11344
11358
* zeroes and XOR doesn't change the checksum with extra zeroes.
11359
+ *
11360
+ * Add 3 to ensure that we move to word count in a correct manner.
11345
11361
*/
11346
11362
lcpCtlFilePtr->Checksum = 0 ;
11347
- Uint32 words = real_bytes_read / sizeof (Uint32);
11363
+ Uint32 words = (compressed_bytes_written + 3 ) / sizeof (Uint32);
11348
11364
Uint32 chksum = 0 ;
11349
11365
for (Uint32 i = 0 ; i < words; i++)
11350
11366
{
@@ -11355,7 +11371,8 @@ Backup::convert_ctl_page_to_network(Uint32 *page)
11355
11371
11356
11372
Uint32
11357
11373
Backup::compress_part_pairs (struct BackupFormat ::LCPCtlFile *lcpCtlFilePtr,
11358
- Uint32 num_parts)
11374
+ Uint32 num_parts,
11375
+ Uint32 file_size)
11359
11376
{
11360
11377
Uint32 total_parts = 0 ;
11361
11378
unsigned char *part_array =
@@ -11390,10 +11407,10 @@ Backup::compress_part_pairs(struct BackupFormat::LCPCtlFile *lcpCtlFilePtr,
11390
11407
numParts));
11391
11408
}
11392
11409
ndbrequire (total_parts == BackupFormat::NDB_MAX_LCP_PARTS);
11393
- unsigned char *start_page = (unsigned char *)lcpCtlFilePtr;
11394
- unsigned char *end_page = start_page + BackupFormat::NDB_LCP_CTL_FILE_SIZE ;
11395
- Uint64 remaining_size_64 = end_page - part_array;
11396
- ndbrequire (remaining_size_64 < BackupFormat::NDB_LCP_CTL_FILE_SIZE );
11410
+ unsigned char *start_pos = (unsigned char *)lcpCtlFilePtr;
11411
+ unsigned char *end_pos = start_pos + file_size ;
11412
+ Uint64 remaining_size_64 = end_pos - part_array;
11413
+ ndbrequire (remaining_size_64 < file_size );
11397
11414
Uint32 remaining_size = Uint32 (remaining_size_64);
11398
11415
memset (part_array, 0 , remaining_size);
11399
11416
return total_parts;
@@ -11646,9 +11663,38 @@ Backup::lcp_copy_ctl_page(BackupRecordPtr ptr)
11646
11663
c_backupFilePool.getPtr (recent_file_ptr, ptr.p ->prepareCtlFilePtr [recent]);
11647
11664
file_ptr.p ->pages .getPtr (page_ptr, 0 );
11648
11665
recent_file_ptr.p ->pages .getPtr (recent_page_ptr, 0 );
11649
- memcpy (page_ptr.p ,
11650
- recent_page_ptr.p ,
11651
- BackupFormat::NDB_LCP_CTL_FILE_SIZE);
11666
+ /* *
11667
+ * Important to consider here that the page is currently in expanded
11668
+ * format. So before we copy it we calculate how much to copy.
11669
+ */
11670
+ {
11671
+ struct BackupFormat ::LCPCtlFile *lcpCtlFilePtr =
11672
+ (struct BackupFormat ::LCPCtlFile*)recent_page_ptr.p ;
11673
+ Uint32 num_parts = lcpCtlFilePtr->NumPartPairs ;
11674
+ Uint32 size_to_copy = LCP_CTL_FILE_HEADER_SIZE;
11675
+ size_to_copy += (num_parts * sizeof (struct BackupFormat ::PartPair));
11676
+ memcpy (page_ptr.p ,
11677
+ recent_page_ptr.p ,
11678
+ size_to_copy);
11679
+ }
11680
+ #ifdef VM_TRACE
11681
+ {
11682
+ struct BackupFormat ::LCPCtlFile *lcpCtlFilePtr =
11683
+ (struct BackupFormat ::LCPCtlFile*)page_ptr.p ;
11684
+ jam ();
11685
+ Uint32 total_parts = 0 ;
11686
+ Uint32 num_parts = lcpCtlFilePtr->NumPartPairs ;
11687
+ jamLine (num_parts);
11688
+ for (Uint32 i = 0 ; i < num_parts; i++)
11689
+ {
11690
+ Uint32 parts = lcpCtlFilePtr->partPairs [i].numParts ;
11691
+ total_parts += parts;
11692
+ jamLine (parts);
11693
+ }
11694
+ jam ();
11695
+ ndbassert (total_parts == BackupFormat::NDB_MAX_LCP_PARTS);
11696
+ }
11697
+ #endif
11652
11698
}
11653
11699
11654
11700
void
@@ -13059,21 +13105,20 @@ Backup::lcp_write_ctl_file_to_disk(Signal *signal,
13059
13105
struct BackupFormat ::LCPCtlFile *lcpCtlFilePtr =
13060
13106
(struct BackupFormat ::LCPCtlFile*)pagePtr.p ;
13061
13107
Uint32 num_parts = lcpCtlFilePtr->NumPartPairs ;
13062
- Uint32 size_written = (sizeof (BackupFormat::LCPCtlFile) -
13063
- sizeof (BackupFormat::PartPair)) +
13064
- (3 * num_parts + 3 );
13065
- if (size_written > BackupFormat::NDB_LCP_CTL_FILE_SIZE)
13108
+ Uint32 file_size = LCP_CTL_FILE_HEADER_SIZE +
13109
+ (3 * num_parts + 3 );
13110
+ if (file_size > BackupFormat::NDB_LCP_CTL_FILE_SIZE_SMALL)
13066
13111
{
13067
13112
jam ();
13068
13113
DEB_LCP ((" (%u)Writing 8192 byte control file" , instance ()));
13069
- size_written = BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG;
13114
+ file_size = BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG;
13070
13115
}
13071
13116
else
13072
13117
{
13073
13118
jam ();
13074
- size_written = BackupFormat::NDB_LCP_CTL_FILE_SIZE ;
13119
+ file_size = BackupFormat::NDB_LCP_CTL_FILE_SIZE_SMALL ;
13075
13120
}
13076
- convert_ctl_page_to_network ((Uint32*)pagePtr.p );
13121
+ convert_ctl_page_to_network ((Uint32*)pagePtr.p , file_size );
13077
13122
filePtr.p ->m_flags |= BackupFile::BF_WRITING;
13078
13123
FsReadWriteReq* req = (FsReadWriteReq*)signal->getDataPtrSend ();
13079
13124
req->userPointer = filePtr.i ;
@@ -13089,7 +13134,7 @@ Backup::lcp_write_ctl_file_to_disk(Signal *signal,
13089
13134
Uint32 mem_offset = Uint32 ((char *)pagePtr.p - (char *)c_startOfPages);
13090
13135
req->data .memoryAddress .memoryOffset = mem_offset;
13091
13136
req->data .memoryAddress .fileOffset = 0 ;
13092
- req->data .memoryAddress .size = size_written ;
13137
+ req->data .memoryAddress .size = file_size ;
13093
13138
13094
13139
sendSignal (NDBFS_REF, GSN_FSWRITEREQ, signal,
13095
13140
FsReadWriteReq::FixedLength + 3 , JBA);
@@ -13591,7 +13636,7 @@ Backup::lcp_read_ctl_file_for_rewrite(Signal *signal,
13591
13636
Uint32 mem_offset = Uint32 (((char *)pagePtr.p ) - ((char *)c_startOfPages));
13592
13637
req->data .memoryAddress .memoryOffset = mem_offset;
13593
13638
req->data .memoryAddress .fileOffset = 0 ;
13594
- req->data .memoryAddress .size = BackupFormat::NDB_LCP_CTL_FILE_SIZE ;
13639
+ req->data .memoryAddress .size = BackupFormat::NDB_LCP_CTL_FILE_SIZE_BIG ;
13595
13640
13596
13641
sendSignal (NDBFS_REF, GSN_FSREADREQ, signal,
13597
13642
FsReadWriteReq::FixedLength + 3 , JBA);
0 commit comments