Skip to content

LogManager

LogManager and KafkaServer

Creating Instance

LogManager takes the following to be created:

LogManager is created using apply factory method.

Creating LogManager

apply(
  config: KafkaConfig,
  initialOfflineDirs: Seq[String],
  configRepository: ConfigRepository,
  kafkaScheduler: KafkaScheduler,
  time: Time,
  brokerTopicStats: BrokerTopicStats,
  logDirFailureChannel: LogDirFailureChannel,
  keepPartitionMetadataFile: Boolean): LogManager

apply extracts log-related configuration properties (from the given KafkaConfig) and creates a LogConfig.

apply creates a LogCleaner.

In the end, apply creates a LogManager based on some configuration properties.


apply is used when:

KafkaMetricsGroup

LogManager is a KafkaMetricsGroup.

Current Logs

currentLogs: Pool[TopicPartition, UnifiedLog]

LogManager defines currentLogs internal registry of UnifiedLogs per TopicPartition.

LogManager uses the currentLogs registry when:

Looking Up Log

getLog(
  topicPartition: TopicPartition,
  isFuture: Boolean = false): Option[UnifiedLog]

With the input isFuture enabled, getLog uses the futureLogs registry to look up the UnifiedLog for the input TopicPartition (if available). Otherwise, getLog uses the currentLogs registry.


getLog is used when:

getOrCreateLog

getOrCreateLog(
  topicPartition: TopicPartition,
  isNew: Boolean = false,
  isFuture: Boolean = false,
  topicId: Option[Uuid]): UnifiedLog

getOrCreateLog...FIXME


getOrCreateLog is used when:

startup

startup(
  topicNames: Set[String]): Unit

startup startupWithConfigOverrides with the currentDefaultConfig and fetchTopicConfigOverrides.


startup is used when:

startupWithConfigOverrides

startupWithConfigOverrides(
  defaultConfig: LogConfig,
  topicConfigOverrides: Map[String, LogConfig]): Unit

startupWithConfigOverrides...FIXME

loadLogs

loadLogs(
  defaultConfig: LogConfig,
  topicConfigOverrides: Map[String, LogConfig]): Unit

loadLogs...FIXME

loadLog

loadLog(
  logDir: File,
  hadCleanShutdown: Boolean,
  recoveryPoints: Map[TopicPartition, Long],
  logStartOffsets: Map[TopicPartition, Long],
  defaultConfig: LogConfig,
  topicConfigOverrides: Map[String, LogConfig],
  numRemainingSegments: ConcurrentMap[String, Int]): UnifiedLog

loadLog...FIXME

Logging

Enable ALL logging level for kafka.log.LogManager logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.LogManager=ALL

Refer to Logging.