@@ -285,32 +285,37 @@ object ProducerStateManager {
285
285
new Field (ProducerEntriesField , new ArrayOf (ProducerSnapshotEntrySchema ), " The entries in the producer table" ))
286
286
287
287
def readSnapshot (file : File ): Iterable [ProducerIdEntry ] = {
288
- val buffer = Files .readAllBytes(file.toPath)
289
- val struct = PidSnapshotMapSchema .read(ByteBuffer .wrap(buffer))
290
-
291
- val version = struct.getShort(VersionField )
292
- if (version != ProducerSnapshotVersion )
293
- throw new IllegalArgumentException (s " Unhandled snapshot file version $version" )
294
-
295
- val crc = struct.getUnsignedInt(CrcField )
296
- val computedCrc = Crc32C .compute(buffer, ProducerEntriesOffset , buffer.length - ProducerEntriesOffset )
297
- if (crc != computedCrc)
298
- throw new CorruptSnapshotException (s " Snapshot file ' $file' is corrupted (CRC is no longer valid). " +
299
- s " Stored crc: $crc. Computed crc: $computedCrc" )
300
-
301
- struct.getArray(ProducerEntriesField ).map { producerEntryObj =>
302
- val producerEntryStruct = producerEntryObj.asInstanceOf [Struct ]
303
- val producerId : Long = producerEntryStruct.getLong(ProducerIdField )
304
- val producerEpoch = producerEntryStruct.getShort(ProducerEpochField )
305
- val seq = producerEntryStruct.getInt(LastSequenceField )
306
- val offset = producerEntryStruct.getLong(LastOffsetField )
307
- val timestamp = producerEntryStruct.getLong(TimestampField )
308
- val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField )
309
- val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField )
310
- val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField )
311
- val newEntry = ProducerIdEntry (producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
312
- coordinatorEpoch, if (currentTxnFirstOffset >= 0 ) Some (currentTxnFirstOffset) else None )
313
- newEntry
288
+ try {
289
+ val buffer = Files .readAllBytes(file.toPath)
290
+ val struct = PidSnapshotMapSchema .read(ByteBuffer .wrap(buffer))
291
+
292
+ val version = struct.getShort(VersionField )
293
+ if (version != ProducerSnapshotVersion )
294
+ throw new CorruptSnapshotException (s " Snapshot contained an unknown file version $version" )
295
+
296
+ val crc = struct.getUnsignedInt(CrcField )
297
+ val computedCrc = Crc32C .compute(buffer, ProducerEntriesOffset , buffer.length - ProducerEntriesOffset )
298
+ if (crc != computedCrc)
299
+ throw new CorruptSnapshotException (s " Snapshot is corrupt (CRC is no longer valid). " +
300
+ s " Stored crc: $crc. Computed crc: $computedCrc" )
301
+
302
+ struct.getArray(ProducerEntriesField ).map { producerEntryObj =>
303
+ val producerEntryStruct = producerEntryObj.asInstanceOf [Struct ]
304
+ val producerId : Long = producerEntryStruct.getLong(ProducerIdField )
305
+ val producerEpoch = producerEntryStruct.getShort(ProducerEpochField )
306
+ val seq = producerEntryStruct.getInt(LastSequenceField )
307
+ val offset = producerEntryStruct.getLong(LastOffsetField )
308
+ val timestamp = producerEntryStruct.getLong(TimestampField )
309
+ val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField )
310
+ val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField )
311
+ val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField )
312
+ val newEntry = ProducerIdEntry (producerId, producerEpoch, seq, offset, offsetDelta, timestamp,
313
+ coordinatorEpoch, if (currentTxnFirstOffset >= 0 ) Some (currentTxnFirstOffset) else None )
314
+ newEntry
315
+ }
316
+ } catch {
317
+ case e : SchemaException =>
318
+ throw new CorruptSnapshotException (s " Snapshot failed schema validation: ${e.getMessage}" )
314
319
}
315
320
}
316
321
@@ -436,7 +441,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
436
441
latestSnapshotFile match {
437
442
case Some (file) =>
438
443
try {
439
- info(s " Loading producer state from snapshot file ${ file.getName} for partition $topicPartition" )
444
+ info(s " Loading producer state from snapshot file ' $ file' for partition $topicPartition" )
440
445
val loadedProducers = readSnapshot(file).filter { producerEntry =>
441
446
isProducerRetained(producerEntry, logStartOffset) && ! isProducerExpired(currentTime, producerEntry)
442
447
}
@@ -446,7 +451,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
446
451
return
447
452
} catch {
448
453
case e : CorruptSnapshotException =>
449
- error (s " Snapshot file at ${file.getPath} is corrupt : ${e.getMessage}" )
454
+ warn (s " Failed to load producer snapshot from ' $file ' : ${e.getMessage}" )
450
455
Files .deleteIfExists(file.toPath)
451
456
}
452
457
case None =>
0 commit comments