Skip to content

Commit bf3bfd6

Browse files
committed
KAFKA-5747; Producer snapshot loading should cover schema errors
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Closes apache#3688 from hachikuji/KAFKA-5747
1 parent 8a5a84d commit bf3bfd6

File tree

2 files changed

+91
-28
lines changed

2 files changed

+91
-28
lines changed

core/src/main/scala/kafka/log/ProducerStateManager.scala

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -285,32 +285,37 @@ object ProducerStateManager {
285285
new Field(ProducerEntriesField, new ArrayOf(ProducerSnapshotEntrySchema), "The entries in the producer table"))
286286

287287
/**
 * Reads a producer snapshot file and decodes its entries into [[ProducerIdEntry]] values.
 *
 * This is a diff hunk: rows marked `-` are the pre-change body and rows marked `+` are the
 * post-change body. Both read the whole file into memory, parse it with `PidSnapshotMapSchema`,
 * check the version field, check the stored CRC, and then map every element of the entries
 * array to a `ProducerIdEntry`. A stored `CurrentTxnFirstOffsetField` value below zero is
 * decoded as `None`.
 *
 * Post-change, an unexpected version raises `CorruptSnapshotException` (pre-change it raised
 * `IllegalArgumentException`), and any `SchemaException` thrown inside the `try` is rethrown as
 * `CorruptSnapshotException` carrying the original message.
 *
 * @param file the snapshot file to read
 * @return the producer entries decoded from the snapshot
 * @throws CorruptSnapshotException on a CRC mismatch; post-change also on an unknown version
 *                                  or a schema validation failure
 */
def readSnapshot(file: File): Iterable[ProducerIdEntry] = {
288-
val buffer = Files.readAllBytes(file.toPath)
289-
val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
290-
291-
val version = struct.getShort(VersionField)
292-
if (version != ProducerSnapshotVersion)
293-
throw new IllegalArgumentException(s"Unhandled snapshot file version $version")
294-
295-
val crc = struct.getUnsignedInt(CrcField)
296-
val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
297-
if (crc != computedCrc)
298-
throw new CorruptSnapshotException(s"Snapshot file '$file' is corrupted (CRC is no longer valid). " +
299-
s"Stored crc: $crc. Computed crc: $computedCrc")
300-
301-
struct.getArray(ProducerEntriesField).map { producerEntryObj =>
302-
val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
303-
val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
304-
val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
305-
val seq = producerEntryStruct.getInt(LastSequenceField)
306-
val offset = producerEntryStruct.getLong(LastOffsetField)
307-
val timestamp = producerEntryStruct.getLong(TimestampField)
308-
val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
309-
val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
310-
val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
311-
val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
312-
coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
313-
newEntry
288+
try {
289+
val buffer = Files.readAllBytes(file.toPath)
290+
val struct = PidSnapshotMapSchema.read(ByteBuffer.wrap(buffer))
291+
// Any version other than ProducerSnapshotVersion is reported as a corrupt snapshot.
292+
val version = struct.getShort(VersionField)
293+
if (version != ProducerSnapshotVersion)
294+
throw new CorruptSnapshotException(s"Snapshot contained an unknown file version $version")
295+
// The stored CRC must equal a CRC32C over the bytes from ProducerEntriesOffset to the end of the buffer.
296+
val crc = struct.getUnsignedInt(CrcField)
297+
val computedCrc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.length - ProducerEntriesOffset)
298+
if (crc != computedCrc)
299+
throw new CorruptSnapshotException(s"Snapshot is corrupt (CRC is no longer valid). " +
300+
s"Stored crc: $crc. Computed crc: $computedCrc")
301+
// Decode each serialized entry struct into a ProducerIdEntry.
302+
struct.getArray(ProducerEntriesField).map { producerEntryObj =>
303+
val producerEntryStruct = producerEntryObj.asInstanceOf[Struct]
304+
val producerId: Long = producerEntryStruct.getLong(ProducerIdField)
305+
val producerEpoch = producerEntryStruct.getShort(ProducerEpochField)
306+
val seq = producerEntryStruct.getInt(LastSequenceField)
307+
val offset = producerEntryStruct.getLong(LastOffsetField)
308+
val timestamp = producerEntryStruct.getLong(TimestampField)
309+
val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField)
310+
val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField)
311+
val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField)
312+
val newEntry = ProducerIdEntry(producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
313+
coordinatorEpoch, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None)
314+
newEntry
315+
}
316+
} catch {
317+
case e: SchemaException =>
318+
throw new CorruptSnapshotException(s"Snapshot failed schema validation: ${e.getMessage}")
314319
}
315320
}
316321

@@ -436,7 +441,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
436441
latestSnapshotFile match {
437442
case Some(file) =>
438443
try {
439-
info(s"Loading producer state from snapshot file ${file.getName} for partition $topicPartition")
444+
info(s"Loading producer state from snapshot file '$file' for partition $topicPartition")
440445
val loadedProducers = readSnapshot(file).filter { producerEntry =>
441446
isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry)
442447
}
@@ -446,7 +451,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
446451
return
447452
} catch {
448453
case e: CorruptSnapshotException =>
449-
error(s"Snapshot file at ${file.getPath} is corrupt: ${e.getMessage}")
454+
warn(s"Failed to load producer snapshot from '$file': ${e.getMessage}")
450455
Files.deleteIfExists(file.toPath)
451456
}
452457
case None =>

core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package kafka.log
1919

2020
import java.io.File
21+
import java.nio.ByteBuffer
22+
import java.nio.channels.FileChannel
23+
import java.nio.file.{OpenOption, StandardOpenOption}
2124

2225
import kafka.server.LogOffsetMetadata
2326
import kafka.utils.TestUtils
@@ -608,6 +611,61 @@ class ProducerStateManagerTest extends JUnitSuite {
608611
appendEndTxnMarker(stateManager, producerId, producerEpoch, ControlRecordType.COMMIT, offset = 100, coordinatorEpoch = 0)
609612
}
610613

614+
// Truncates the latest snapshot file to zero bytes, then runs the shared
// testLoadFromCorruptSnapshot checks against it.
@Test
615+
def testLoadFromEmptySnapshotFile(): Unit = {
616+
testLoadFromCorruptSnapshot { file =>
617+
file.truncate(0L)
618+
}
619+
}
620+
621+
// Cuts the latest snapshot file to half its size, then runs the shared
// testLoadFromCorruptSnapshot checks against it.
@Test
622+
def testLoadFromTruncatedSnapshotFile(): Unit = {
623+
testLoadFromCorruptSnapshot { file =>
624+
// truncate to some arbitrary point in the middle of the snapshot
625+
assertTrue(file.size > 2)
626+
file.truncate(file.size / 2)
627+
}
628+
}
629+
630+
// Overwrites one byte at the midpoint of the latest snapshot file with the value 37,
// then runs the shared testLoadFromCorruptSnapshot checks against it.
// NOTE(review): if the byte already stored at that position is 37, the file is left
// unchanged and nothing is corrupted; confirm the snapshot contents rule this out.
@Test
631+
def testLoadFromCorruptSnapshotFile(): Unit = {
632+
testLoadFromCorruptSnapshot { file =>
633+
// write some garbage somewhere in the file
634+
assertTrue(file.size > 2)
635+
file.write(ByteBuffer.wrap(Array[Byte](37)), file.size / 2)
636+
}
637+
}
638+
639+
// Shared driver for the corrupt-snapshot tests. It takes two snapshots (after appends at
// offsets 0 and 1), lets `makeFileCorrupt` damage the latest snapshot file through a
// write-mode FileChannel, and then reloads producer state with a fresh ProducerStateManager.
// It asserts that the damaged file no longer exists and that the reloaded last offset for
// the producer is 0, the offset appended before the first snapshot was taken.
private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit = {
640+
val epoch = 0.toShort
641+
val producerId = 1L
642+
643+
append(stateManager, producerId, epoch, seq = 0, offset = 0L)
644+
stateManager.takeSnapshot()
645+
646+
append(stateManager, producerId, epoch, seq = 1, offset = 1L)
647+
stateManager.takeSnapshot()
648+
649+
// Corrupt the latest snapshot using the supplied callback
650+
val latestSnapshotOffset = stateManager.latestSnapshotOffset
651+
assertEquals(Some(2L), latestSnapshotOffset)
652+
val snapshotToTruncate = Log.producerSnapshotFile(logDir, latestSnapshotOffset.get)
653+
val channel = FileChannel.open(snapshotToTruncate.toPath, StandardOpenOption.WRITE)
654+
try {
655+
makeFileCorrupt(channel)
656+
} finally {
657+
channel.close()
658+
}
659+
660+
// Ensure that the corrupted snapshot is deleted and producer state is loaded from the previous snapshot
661+
val reloadedStateManager = new ProducerStateManager(partition, logDir, maxPidExpirationMs)
662+
reloadedStateManager.truncateAndReload(0L, 20L, time.milliseconds())
663+
assertFalse(snapshotToTruncate.exists())
664+
665+
val loadedProducerState = reloadedStateManager.activeProducers(producerId)
666+
assertEquals(0L, loadedProducerState.lastOffset)
667+
}
668+
611669
private def appendEndTxnMarker(mapping: ProducerStateManager,
612670
producerId: Long,
613671
producerEpoch: Short,

0 commit comments

Comments
 (0)