Skip to content

Commit e23172a

Browse files
authored
MINOR: Move OffsetCheckpointFile to storage module (apache#16917)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent c207438 commit e23172a

27 files changed

+533
-419
lines changed

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import kafka.controller.{KafkaController, StateChangeLogger}
2525
import kafka.log._
2626
import kafka.log.remote.RemoteLogManager
2727
import kafka.server._
28-
import kafka.server.checkpoints.OffsetCheckpoints
2928
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
3029
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
3130
import kafka.utils._
@@ -46,6 +45,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
4645
import org.apache.kafka.server.common.MetadataVersion
4746
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard}
4847
import org.apache.kafka.server.metrics.KafkaMetricsGroup
48+
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
4949
import org.slf4j.event.Level
5050

5151
import scala.collection.{Map, Seq}
@@ -479,10 +479,10 @@ class Partition(val topicPartition: TopicPartition,
479479
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints,
480480
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): UnifiedLog = {
481481
def updateHighWatermark(log: UnifiedLog): Unit = {
482-
val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
482+
val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).orElseGet(() => {
483483
info(s"No checkpointed highwatermark is found for partition $topicPartition")
484484
0L
485-
}
485+
})
486486
val initialHighWatermark = log.updateHighWatermark(checkpointHighWatermark)
487487
info(s"Log loaded for partition $topicPartition with initial high watermark $initialHighWatermark")
488488
}

core/src/main/scala/kafka/log/LogCleanerManager.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717

1818
package kafka.log
1919

20+
import java.lang.{Long => JLong}
2021
import java.io.File
2122
import java.util.concurrent.TimeUnit
2223
import java.util.concurrent.locks.ReentrantLock
2324
import kafka.common.LogCleaningAbortedException
24-
import kafka.server.checkpoints.OffsetCheckpointFile
2525
import kafka.utils.CoreUtils._
2626
import kafka.utils.{Logging, Pool}
2727
import org.apache.kafka.common.{KafkaException, TopicPartition}
2828
import org.apache.kafka.common.errors.KafkaStorageException
2929
import org.apache.kafka.common.utils.Time
30+
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
3031
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
3132
import org.apache.kafka.server.metrics.KafkaMetricsGroup
3233

@@ -114,7 +115,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
114115
partitions.iterator.map { tp =>
115116
Option(logs.get(tp)).map {
116117
log =>
117-
val lastCleanOffset = lastClean.get(tp)
118+
val lastCleanOffset: Option[Long] = lastClean.get(tp)
118119
val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
119120
val (_, uncleanableBytes) = calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset)
120121
uncleanableBytes
@@ -144,7 +145,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
144145
inLock(lock) {
145146
checkpoints.values.flatMap(checkpoint => {
146147
try {
147-
checkpoint.read()
148+
checkpoint.read().asScala.map{ case (tp, offset) => tp -> Long2long(offset) }
148149
} catch {
149150
case e: KafkaStorageException =>
150151
error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
@@ -376,13 +377,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
376377
* @param partitionToRemove The TopicPartition to be removed
377378
*/
378379
def updateCheckpoints(dataDir: File,
379-
partitionToUpdateOrAdd: Option[(TopicPartition, Long)] = None,
380+
partitionToUpdateOrAdd: Option[(TopicPartition, JLong)] = None,
380381
partitionToRemove: Option[TopicPartition] = None): Unit = {
381382
inLock(lock) {
382383
val checkpoint = checkpoints(dataDir)
383384
if (checkpoint != null) {
384385
try {
385-
val currentCheckpoint = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap
386+
val currentCheckpoint = checkpoint.read().asScala.filter { case (tp, _) => logs.keys.contains(tp) }.toMap
386387
// remove the partition offset if any
387388
var updatedCheckpoint = partitionToRemove match {
388389
case Some(topicPartition) => currentCheckpoint - topicPartition
@@ -394,7 +395,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
394395
case None => updatedCheckpoint
395396
}
396397

397-
checkpoint.write(updatedCheckpoint)
398+
checkpoint.write(updatedCheckpoint.asJava)
398399
} catch {
399400
case e: KafkaStorageException =>
400401
error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
@@ -409,7 +410,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
409410
def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
410411
inLock(lock) {
411412
try {
412-
checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
413+
checkpoints.get(sourceLogDir).flatMap(_.read().asScala.get(topicPartition)) match {
413414
case Some(offset) =>
414415
debug(s"Removing the partition offset data in checkpoint file for '$topicPartition' " +
415416
s"from ${sourceLogDir.getAbsoluteFile} directory.")
@@ -448,14 +449,16 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
448449
/**
449450
* Truncate the checkpointed offset for the given partition if its checkpointed offset is larger than the given offset
450451
*/
451-
def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long): Unit = {
452+
def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: JLong): Unit = {
452453
inLock(lock) {
453454
if (logs.get(topicPartition).config.compact) {
454455
val checkpoint = checkpoints(dataDir)
455456
if (checkpoint != null) {
456457
val existing = checkpoint.read()
457-
if (existing.getOrElse(topicPartition, 0L) > offset)
458-
checkpoint.write(mutable.Map() ++= existing += topicPartition -> offset)
458+
if (existing.getOrDefault(topicPartition, 0L) > offset) {
459+
existing.put(topicPartition, offset)
460+
checkpoint.write(existing)
461+
}
459462
}
460463
}
461464
}

core/src/main/scala/kafka/log/LogManager.scala

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717

1818
package kafka.log
1919

20-
import java.io._
20+
import java.lang.{Long => JLong}
21+
import java.io.{File, IOException}
2122
import java.nio.file.{Files, NoSuchFileException}
2223
import java.util.concurrent._
2324
import java.util.concurrent.atomic.AtomicInteger
24-
import kafka.server.checkpoints.OffsetCheckpointFile
2525
import kafka.server.metadata.ConfigRepository
26-
import kafka.server._
26+
import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaRaftServer}
2727
import kafka.server.metadata.BrokerMetadataPublisher.info
28-
import kafka.utils._
28+
import kafka.utils.threadsafe
29+
import kafka.utils.{CoreUtils, Logging, Pool}
2930
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
3031
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
3132
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
@@ -40,13 +41,13 @@ import org.apache.kafka.common.requests.{AbstractControlRequest, LeaderAndIsrReq
4041
import org.apache.kafka.image.TopicsImage
4142
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, PropertiesUtils}
4243

43-
import java.util.{OptionalLong, Properties}
44+
import java.util.{Collections, OptionalLong, Properties}
4445
import org.apache.kafka.server.common.MetadataVersion
4546
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
4647
import org.apache.kafka.server.metrics.KafkaMetricsGroup
4748
import org.apache.kafka.server.util.{FileLock, Scheduler}
4849
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
49-
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler
50+
import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
5051

5152
import java.util
5253
import scala.annotation.nowarn
@@ -322,16 +323,16 @@ class LogManager(logDirs: Seq[File],
322323

323324
private[log] def loadLog(logDir: File,
324325
hadCleanShutdown: Boolean,
325-
recoveryPoints: Map[TopicPartition, Long],
326-
logStartOffsets: Map[TopicPartition, Long],
326+
recoveryPoints: util.Map[TopicPartition, JLong],
327+
logStartOffsets: util.Map[TopicPartition, JLong],
327328
defaultConfig: LogConfig,
328329
topicConfigOverrides: Map[String, LogConfig],
329330
numRemainingSegments: ConcurrentMap[String, Int],
330331
isStray: UnifiedLog => Boolean): UnifiedLog = {
331332
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
332333
val config = topicConfigOverrides.getOrElse(topicPartition.topic, defaultConfig)
333-
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
334-
val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
334+
val logRecoveryPoint = recoveryPoints.getOrDefault(topicPartition, 0L)
335+
val logStartOffset = logStartOffsets.getOrDefault(topicPartition, 0L)
335336

336337
val log = UnifiedLog(
337338
dir = logDir,
@@ -444,22 +445,22 @@ class LogManager(logDirs: Seq[File],
444445
}
445446
hadCleanShutdownFlags.put(logDirAbsolutePath, hadCleanShutdown)
446447

447-
var recoveryPoints = Map[TopicPartition, Long]()
448-
try {
449-
recoveryPoints = this.recoveryPointCheckpoints(dir).read()
448+
val recoveryPoints: util.Map[TopicPartition, JLong] = try {
449+
this.recoveryPointCheckpoints(dir).read()
450450
} catch {
451451
case e: Exception =>
452452
warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
453453
s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
454+
Collections.emptyMap[TopicPartition, JLong]
454455
}
455456

456-
var logStartOffsets = Map[TopicPartition, Long]()
457-
try {
458-
logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
457+
val logStartOffsets: util.Map[TopicPartition, JLong] = try {
458+
this.logStartOffsetCheckpoints(dir).read()
459459
} catch {
460460
case e: Exception =>
461461
warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
462462
s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
463+
Collections.emptyMap[TopicPartition, JLong]
463464
}
464465

465466
val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
@@ -844,10 +845,10 @@ class LogManager(logDirs: Seq[File],
844845
private def checkpointRecoveryOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, UnifiedLog]): Unit = {
845846
try {
846847
recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
847-
val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
848+
val recoveryOffsets: Map[TopicPartition, JLong] = logsToCheckpoint.map { case (tp, log) => tp -> long2Long(log.recoveryPoint) }
848849
// checkpoint.write calls Utils.atomicMoveWithFallback, which flushes the parent
849850
// directory and guarantees crash consistency.
850-
checkpoint.write(recoveryOffsets)
851+
checkpoint.write(recoveryOffsets.asJava)
851852
}
852853
} catch {
853854
case e: KafkaStorageException =>
@@ -867,11 +868,11 @@ class LogManager(logDirs: Seq[File],
867868
private def checkpointLogStartOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, UnifiedLog]): Unit = {
868869
try {
869870
logStartOffsetCheckpoints.get(logDir).foreach { checkpoint =>
870-
val logStartOffsets = logsToCheckpoint.collect {
871+
val logStartOffsets: Map[TopicPartition, JLong] = logsToCheckpoint.collect {
871872
case (tp, log) if log.remoteLogEnabled() || log.logStartOffset > log.logSegments.asScala.head.baseOffset =>
872-
tp -> log.logStartOffset
873+
tp -> long2Long(log.logStartOffset)
873874
}
874-
checkpoint.write(logStartOffsets)
875+
checkpoint.write(logStartOffsets.asJava)
875876
}
876877
} catch {
877878
case e: KafkaStorageException =>

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import kafka.log.{LogManager, UnifiedLog}
2525
import kafka.server.HostedPartition.Online
2626
import kafka.server.QuotaFactory.QuotaManagers
2727
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult}
28-
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
2928
import kafka.server.metadata.ZkMetadataCache
3029
import kafka.utils.Implicits._
3130
import kafka.utils._
@@ -60,9 +59,11 @@ import org.apache.kafka.server.common.DirectoryEventHandler
6059
import org.apache.kafka.server.common.MetadataVersion._
6160
import org.apache.kafka.server.metrics.KafkaMetricsGroup
6261
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
62+
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpoints, OffsetCheckpointFile}
6363
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}
6464

6565
import java.io.File
66+
import java.lang.{Long => JLong}
6667
import java.nio.file.{Files, Paths}
6768
import java.util
6869
import java.util.concurrent.atomic.AtomicBoolean
@@ -1202,7 +1203,7 @@ class ReplicaManager(val config: KafkaConfig,
12021203
// start ReplicaAlterDirThread to move data of this partition from the current log to the future log
12031204
// - Otherwise, return KafkaStorageException. We do not create the future log while there is offline log directory
12041205
// so that we can avoid creating future log for the same partition in multiple log directories.
1205-
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
1206+
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints.asJava)
12061207
if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {
12071208
val futureLog = futureLocalLogOrException(topicPartition)
12081209
logManager.abortAndPauseCleaning(topicPartition)
@@ -2039,7 +2040,7 @@ class ReplicaManager(val config: KafkaConfig,
20392040
}
20402041
}
20412042

2042-
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
2043+
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints.asJava)
20432044
val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
20442045
makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
20452046
highWatermarkCheckpoints, topicIdFromRequest)
@@ -2137,8 +2138,8 @@ class ReplicaManager(val config: KafkaConfig,
21372138
}
21382139

21392140
protected[server] def maybeAddLogDirFetchers(partitions: Set[Partition],
2140-
offsetCheckpoints: OffsetCheckpoints,
2141-
topicIds: String => Option[Uuid]): Unit = {
2141+
offsetCheckpoints: OffsetCheckpoints,
2142+
topicIds: String => Option[Uuid]): Unit = {
21422143
val futureReplicasAndInitialOffset = new mutable.HashMap[TopicPartition, InitialFetchState]
21432144
for (partition <- partitions) {
21442145
val topicPartition = partition.topicPartition
@@ -2429,22 +2430,22 @@ class ReplicaManager(val config: KafkaConfig,
24292430

24302431
// Flushes the highwatermark value for all partitions to the highwatermark file
24312432
def checkpointHighWatermarks(): Unit = {
2432-
def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
2433+
def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, JLong]],
24332434
log: UnifiedLog): Unit = {
24342435
val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
2435-
new mutable.AnyRefMap[TopicPartition, Long]())
2436+
new mutable.AnyRefMap[TopicPartition, JLong]())
24362437
checkpoints.put(log.topicPartition, log.highWatermark)
24372438
}
24382439

2439-
val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]](
2440+
val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, JLong]](
24402441
allPartitions.size)
24412442
onlinePartitionsIterator.foreach { partition =>
24422443
partition.log.foreach(putHw(logDirToHws, _))
24432444
partition.futureLog.foreach(putHw(logDirToHws, _))
24442445
}
24452446

24462447
for ((logDir, hws) <- logDirToHws) {
2447-
try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws))
2448+
try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws.asJava))
24482449
catch {
24492450
case e: KafkaStorageException =>
24502451
error(s"Error while writing to highwatermark file in directory $logDir", e)
@@ -2749,7 +2750,7 @@ class ReplicaManager(val config: KafkaConfig,
27492750

27502751
// Handle partitions which we are now the leader or follower for.
27512752
if (!localChanges.leaders.isEmpty || !localChanges.followers.isEmpty) {
2752-
val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
2753+
val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints.asJava)
27532754
val leaderChangedPartitions = new mutable.HashSet[Partition]
27542755
val followerChangedPartitions = new mutable.HashSet[Partition]
27552756
if (!localChanges.leaders.isEmpty) {

0 commit comments

Comments
 (0)