LogManager¶
Creating Instance¶
LogManager takes the following to be created:
- Log directories
- Initial offline directories
- ConfigRepository
- LogConfig
- CleanerConfig
- num.recovery.threads.per.data.dir
- log.flush.scheduler.interval.ms
- log.flush.offset.checkpoint.interval.ms
- log.flush.start.offset.checkpoint.interval.ms
- log.retention.check.interval.ms
- transaction.max.timeout.ms
- transactional.id.expiration.ms
- inter.broker.protocol.version
- Scheduler
- BrokerTopicStats
- LogDirFailureChannel
- Time
- keepPartitionMetadataFile
LogManager is created using the apply factory method.
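The interval and timeout properties in the list above are regular broker settings. As a minimal sketch, they could be collected in a java.util.Properties object before being handed to a broker configuration. The object name LogManagerPropsSketch, the helper logManagerProps and all the values are illustrative assumptions, not recommendations, and inter.broker.protocol.version is left out because its value depends on the Kafka release.

```scala
import java.util.Properties

object LogManagerPropsSketch {

  // Hypothetical helper: gathers the numeric properties listed above.
  // All values are illustrative assumptions; tune them for your own brokers.
  def logManagerProps(): Properties = {
    val props = new Properties()
    props.setProperty("num.recovery.threads.per.data.dir", "1")
    props.setProperty("log.flush.scheduler.interval.ms", "60000")
    props.setProperty("log.flush.offset.checkpoint.interval.ms", "60000")
    props.setProperty("log.flush.start.offset.checkpoint.interval.ms", "60000")
    props.setProperty("log.retention.check.interval.ms", "300000")
    props.setProperty("transaction.max.timeout.ms", "900000")
    props.setProperty("transactional.id.expiration.ms", "604800000")
    props
  }
}
```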
Creating LogManager¶
apply(
  config: KafkaConfig,
  initialOfflineDirs: Seq[String],
  configRepository: ConfigRepository,
  kafkaScheduler: KafkaScheduler,
  time: Time,
  brokerTopicStats: BrokerTopicStats,
  logDirFailureChannel: LogDirFailureChannel,
  keepPartitionMetadataFile: Boolean): LogManager
apply extracts log-related configuration properties (from the given KafkaConfig) and creates a LogConfig.
apply creates a CleanerConfig.
In the end, apply creates a LogManager with the LogConfig, the CleanerConfig and the remaining configuration properties (from the given KafkaConfig).
apply is used when:
KafkaMetricsGroup¶
LogManager is a KafkaMetricsGroup.
Current Logs¶
currentLogs: Pool[TopicPartition, UnifiedLog]
LogManager defines the currentLogs internal registry of UnifiedLogs per TopicPartition.
LogManager uses the currentLogs registry when:
- startupWithConfigOverrides (to create a LogCleaner when enabled)
- handleLogDirFailure
- loadLog
- truncateTo
- truncateFullyAndStartAt
- getLog
- getOrCreateLog
- and many more
Looking Up Log¶
getLog(
  topicPartition: TopicPartition,
  isFuture: Boolean = false): Option[UnifiedLog]
With the input isFuture enabled, getLog uses the futureLogs registry to look up the UnifiedLog for the input TopicPartition (if available). Otherwise, getLog uses the currentLogs registry.
getLog is used when:
- Partition is requested to topicId and getOffsetByTimestamp
- LogManager is requested to maybeUpdatePreferredLogDir, getOrCreateLog, asyncDelete
- ReplicaManager is requested to getLog, maybeAddLogDirFetchers
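The lookup described above (pick a registry based on isFuture, then look up the partition) can be sketched as follows. This is a simplified model, not Kafka's code: TopicPartition and UnifiedLog are hypothetical stand-ins for the real classes, and a plain ConcurrentHashMap is assumed in place of Kafka's Pool.

```scala
import java.util.concurrent.ConcurrentHashMap

object GetLogSketch {

  // Hypothetical stand-ins for Kafka's TopicPartition and UnifiedLog
  final case class TopicPartition(topic: String, partition: Int)
  final case class UnifiedLog(dir: String)

  // Two registries that mirror currentLogs and futureLogs
  // (a ConcurrentHashMap is assumed here instead of Kafka's Pool)
  val currentLogs = new ConcurrentHashMap[TopicPartition, UnifiedLog]()
  val futureLogs = new ConcurrentHashMap[TopicPartition, UnifiedLog]()

  // Chooses the registry based on isFuture and wraps a missing entry in None
  def getLog(
      topicPartition: TopicPartition,
      isFuture: Boolean = false): Option[UnifiedLog] = {
    val registry = if (isFuture) futureLogs else currentLogs
    Option(registry.get(topicPartition))
  }
}
```

With a log registered in currentLogs only, getLog(tp) returns it wrapped in Some, while getLog(tp, isFuture = true) returns None.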
getOrCreateLog¶
getOrCreateLog(
  topicPartition: TopicPartition,
  isNew: Boolean = false,
  isFuture: Boolean = false,
  topicId: Option[Uuid]): UnifiedLog
getOrCreateLog
...FIXME
getOrCreateLog is used when:
- Partition is requested to createLog
startup¶
startup(
  topicNames: Set[String]): Unit
startup calls startupWithConfigOverrides with the currentDefaultConfig and fetchTopicConfigOverrides.
startup is used when:
- KafkaServer is requested to startup
- BrokerMetadataPublisher is requested to initializeManagers
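The delegation that startup performs can be sketched with a small self-contained model. Everything here is an assumption made for illustration: LogConfig is a hypothetical stand-in that only holds a map of settings, lookupOverrides is an invented hook for per-topic settings, the fetchTopicConfigOverrides parameters are guessed, and startupWithConfigOverrides merely records its arguments instead of loading logs.

```scala
object StartupSketch {

  // Hypothetical stand-in for Kafka's LogConfig
  final case class LogConfig(settings: Map[String, String] = Map.empty)

  class LogManagerSketch(
      currentDefaultConfig: LogConfig,
      // Invented hook: returns the per-topic settings (empty when there are none)
      lookupOverrides: String => Map[String, String]) {

    // Records what startupWithConfigOverrides was called with
    var started: Option[(LogConfig, Map[String, LogConfig])] = None

    // Builds a LogConfig only for topics that actually override something
    def fetchTopicConfigOverrides(
        defaultConfig: LogConfig,
        topicNames: Set[String]): Map[String, LogConfig] =
      topicNames.flatMap { topic =>
        val overrides = lookupOverrides(topic)
        if (overrides.isEmpty) None
        else Some(topic -> LogConfig(defaultConfig.settings ++ overrides))
      }.toMap

    // The real method loads logs and schedules tasks; this one only records
    def startupWithConfigOverrides(
        defaultConfig: LogConfig,
        topicConfigOverrides: Map[String, LogConfig]): Unit =
      started = Some((defaultConfig, topicConfigOverrides))

    // startup resolves the overrides and delegates
    def startup(topicNames: Set[String]): Unit =
      startupWithConfigOverrides(
        currentDefaultConfig,
        fetchTopicConfigOverrides(currentDefaultConfig, topicNames))
  }
}
```

Keeping the override-taking variant separate means it can be called directly with hand-built configs, which is convenient in tests.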
startupWithConfigOverrides¶
startupWithConfigOverrides(
  defaultConfig: LogConfig,
  topicConfigOverrides: Map[String, LogConfig]): Unit
startupWithConfigOverrides
...FIXME
loadLogs¶
loadLogs(
  defaultConfig: LogConfig,
  topicConfigOverrides: Map[String, LogConfig]): Unit
loadLogs
...FIXME
loadLog¶
loadLog(
  logDir: File,
  hadCleanShutdown: Boolean,
  recoveryPoints: Map[TopicPartition, Long],
  logStartOffsets: Map[TopicPartition, Long],
  defaultConfig: LogConfig,
  topicConfigOverrides: Map[String, LogConfig],
  numRemainingSegments: ConcurrentMap[String, Int]): UnifiedLog
loadLog
...FIXME
Logging¶
Enable the ALL logging level for the kafka.log.LogManager logger to see what happens inside.
Add the following line to config/log4j.properties:
log4j.logger.kafka.log.LogManager=ALL
Refer to Logging.