
Commit 7863b35

KAFKA-14485: Move LogCleaner to storage module (apache#19387)
Move LogCleaner and related classes to the storage module and rewrite them in Java.

Reviewers: Mickael Maison <[email protected]>, Jun Rao <[email protected]>
1 parent 23cfb98 commit 7863b35

Showing 32 changed files with 1,995 additions and 1,526 deletions.

build.gradle

Lines changed: 1 addition & 1 deletion
@@ -3737,7 +3737,7 @@ project(':connect:mirror') {
   testImplementation project(':core')
   testImplementation project(':test-common:test-common-runtime')
   testImplementation project(':server')
-  testImplementation project(':server-common').sourceSets.test.output
+  testImplementation project(':server-common')


   testRuntimeOnly project(':connect:runtime')

checkstyle/import-control-server-common.xml

Lines changed: 1 addition & 0 deletions
@@ -130,6 +130,7 @@
     </subpackage>
     <subpackage name="config">
       <allow pkg="org.apache.kafka.server"/>
+      <allow pkg="org.apache.kafka.clients"/>
     </subpackage>
   </subpackage>

checkstyle/import-control-storage.xml

Lines changed: 1 addition & 0 deletions
@@ -88,6 +88,7 @@
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="com.yammer.metrics.core" />
     <allow pkg="org.apache.kafka.common" />
+    <allow pkg="org.apache.kafka.config" />
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage.internals"/>
     <allow pkg="org.apache.kafka.storage.log.metrics"/>

config/log4j2.yaml

Lines changed: 11 additions & 1 deletion
@@ -133,7 +133,17 @@ Configuration:
       AppenderRef:
         ref: ControllerAppender
     # LogCleaner logger
-    - name: kafka.log.LogCleaner
+    - name: org.apache.kafka.storage.internals.log.LogCleaner
+      level: INFO
+      additivity: false
+      AppenderRef:
+        ref: CleanerAppender
+    - name: org.apache.kafka.storage.internals.log.LogCleaner$CleanerThread
+      level: INFO
+      additivity: false
+      AppenderRef:
+        ref: CleanerAppender
+    - name: org.apache.kafka.storage.internals.log.Cleaner
       level: INFO
       additivity: false
       AppenderRef:
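Log4j resolves loggers by fully qualified name, so relocating the classes changes the logger names the YAML must match. A minimal Java sketch of that assumed relationship; the demo class and main method are hypothetical, and only the logger name comes from this commit:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical demo; illustrates why config/log4j2.yaml changes above.
public class CleanerLoggerNameDemo {
    public static void main(String[] args) {
        // After the move, the cleaner's logger is named after the new
        // package, so the old kafka.log.LogCleaner entry would no longer
        // route its output to CleanerAppender.
        Logger cleanerLog =
            LoggerFactory.getLogger("org.apache.kafka.storage.internals.log.LogCleaner");
        cleanerLog.info("routed to CleanerAppender by the updated logger entry");
    }
}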

core/src/main/scala/kafka/log/LogCleaner.scala

Lines changed: 0 additions & 1307 deletions
This file was deleted.

core/src/main/scala/kafka/log/LogManager.scala

Lines changed: 16 additions & 12 deletions
@@ -42,11 +42,12 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{FileLock, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogCleaner, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats

 import java.util
+import java.util.stream.Collectors

 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -629,7 +630,7 @@ class LogManager(logDirs: Seq[File],
         initialTaskDelayMs)
     }
     if (cleanerConfig.enableCleaner) {
-      _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
+      _cleaner = new LogCleaner(cleanerConfig, liveLogDirs.asJava, currentLogs, logDirFailureChannel, time)
       _cleaner.startup()
     }
   }
@@ -894,7 +895,7 @@
    */
  private def resumeCleaning(topicPartition: TopicPartition): Unit = {
    if (cleaner != null) {
-      cleaner.resumeCleaning(Seq(topicPartition))
+      cleaner.resumeCleaning(util.Set.of(topicPartition))
      info(s"Cleaning for partition $topicPartition is resumed")
    }
  }
@@ -1286,7 +1287,7 @@
     if (cleaner != null && !isFuture) {
       cleaner.abortCleaning(topicPartition)
       if (checkpoint) {
-        cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
+        cleaner.updateCheckpoints(removedLog.parentDirFile, Optional.of(topicPartition))
       }
     }
     if (isStray) {
@@ -1344,7 +1345,7 @@

     val logsByDirCached = logsByDir
     logDirs.foreach { logDir =>
-      if (cleaner != null) cleaner.updateCheckpoints(logDir)
+      if (cleaner != null) cleaner.updateCheckpoints(logDir, Optional.empty())
       val logsToCheckpoint = logsInDir(logsByDirCached, logDir)
       checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
       checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
@@ -1382,19 +1383,22 @@
     val startMs = time.milliseconds

     // clean current logs.
-    val deletableLogs = {
+    val deletableLogs: util.Map[TopicPartition, UnifiedLog] = {
       if (cleaner != null) {
         // prevent cleaner from working on same partitions when changing cleanup policy
         cleaner.pauseCleaningForNonCompactedPartitions()
       } else {
-        currentLogs.asScala.filter {
-          case (_, log) => !log.config.compact
-        }
+        currentLogs.entrySet().stream()
+          .filter(e => !e.getValue.config.compact)
+          .collect(Collectors.toMap(
+            (e: util.Map.Entry[TopicPartition, UnifiedLog]) => e.getKey,
+            (e: util.Map.Entry[TopicPartition, UnifiedLog]) => e.getValue
+          ))
       }
     }

     try {
-      deletableLogs.foreach {
+      deletableLogs.forEach {
         case (topicPartition, log) =>
           debug(s"Garbage collecting '${log.name}'")
           total += log.deleteOldSegments()
@@ -1408,7 +1412,7 @@
       }
     } finally {
       if (cleaner != null) {
-        cleaner.resumeCleaning(deletableLogs.map(_._1))
+        cleaner.resumeCleaning(deletableLogs.keySet())
       }
     }

@@ -1548,7 +1552,7 @@ object LogManager {
     LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
     val defaultLogConfig = new LogConfig(defaultProps)

-    val cleanerConfig = LogCleaner.cleanerConfig(config)
+    val cleanerConfig = new CleanerConfig(config)
     val transactionLogConfig = new TransactionLogConfig(config)

     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
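The call sites above imply the Java-facing surface of the relocated LogCleaner: Scala Seq and Option parameters become java.util.Set and java.util.Optional, the constructor takes a java.util.List of directories (hence liveLogDirs.asJava), and pauseCleaningForNonCompactedPartitions returns a java.util.Map. A sketch of that surface as inferred from this diff alone; the interface name and type parameter are illustrative, not the real declaration:

import java.io.File;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Inferred from the LogManager.scala call sites; not the actual class in
// the storage module. UnifiedLog is left as a type parameter so this
// sketch compiles without the real class on the classpath.
interface LogCleanerSurface<UnifiedLog> {
    void startup();
    // Seq[TopicPartition] became java.util.Set:
    void resumeCleaning(Set<TopicPartition> partitions);
    // Option[TopicPartition] became java.util.Optional:
    void updateCheckpoints(File dataDir, Optional<TopicPartition> partitionToRemove);
    // Returns the logs whose cleaning was paused, as a java.util.Map:
    Map<TopicPartition, UnifiedLog> pauseCleaningForNonCompactedPartitions();
    void abortCleaning(TopicPartition partition);
}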

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 3 additions & 3 deletions
@@ -21,7 +21,7 @@ import java.util
 import java.util.{Collections, Properties}
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.log.{LogCleaner, LogManager}
+import kafka.log.LogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.server.DynamicBrokerConfig._
@@ -46,7 +46,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
 import org.apache.kafka.server.telemetry.ClientTelemetry
 import org.apache.kafka.snapshot.RecordsSnapshotReader
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig}

 import scala.collection._
 import scala.jdk.CollectionConverters._
@@ -89,7 +89,7 @@ object DynamicBrokerConfig {
   private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)

   val AllDynamicConfigs = DynamicSecurityConfigs ++
-    LogCleaner.ReconfigurableConfigs ++
+    LogCleaner.RECONFIGURABLE_CONFIGS.asScala ++
     DynamicLogConfig.ReconfigurableConfigs ++
     DynamicThreadPool.RECONFIGURABLE_CONFIGS.asScala ++
     Set(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG) ++
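RECONFIGURABLE_CONFIGS on the Java LogCleaner is a java.util.Set of config names, which has no ++ operator, hence the .asScala conversion in AllDynamicConfigs. A sketch of what such a constant plausibly looks like on the Java side; the exact membership is not visible in this diff, and the log.cleaner.* property names below are illustrative, taken from Kafka's documented cleaner settings:

import java.util.Set;

// Illustrative stand-in for LogCleaner.RECONFIGURABLE_CONFIGS; the real
// set lives in org.apache.kafka.storage.internals.log.LogCleaner and may
// differ in membership.
final class ReconfigurableConfigsSketch {
    static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
        "log.cleaner.threads",
        "log.cleaner.dedupe.buffer.size",
        "log.cleaner.io.buffer.size",
        "log.cleaner.io.max.bytes.per.second",
        "log.cleaner.backoff.ms"
    );

    private ReconfigurableConfigsSketch() { }
}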

core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala

Lines changed: 1 addition & 1 deletion
@@ -283,7 +283,7 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
     val broker = cluster.brokers.asScala.head._2
     val log = broker.logManager.getLog(tp).get
     log.roll()
-    assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0))
+    assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0, 60000L))
   }

   private def withAdmin(f: Admin => Unit): Unit = {
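The extra 60000L argument suggests the Scala awaitCleaned carried a default timeout that Java cannot express, so callers now pass it explicitly. A minimal sketch under that assumption; the helper interface and wrapper method are hypothetical, and only the three-argument call shape comes from the diff:

import org.apache.kafka.common.TopicPartition;

final class AwaitCleanedSketch {
    // Hypothetical view of the cleaner API used by the test.
    interface CleanerView {
        boolean awaitCleaned(TopicPartition tp, long offset, long maxWaitMs);
    }

    // Java has no default parameters, so the former Scala default
    // (maxWaitMs) must now be spelled out at every call site.
    static boolean waitForCompaction(CleanerView cleaner, TopicPartition tp) {
        return cleaner.awaitCleaned(tp, 0L, 60000L); // 60 s, as in the test
    }

    private AwaitCleanedSketch() { }
}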

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 6 additions & 6 deletions
@@ -1041,7 +1041,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       configs.get(topicResource1).get(TopicConfig.RETENTION_MS_CONFIG).value)

     val maxMessageBytes2 = configs.get(topicResource2).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG)
-    assertEquals(LogConfig.DEFAULT_MAX_MESSAGE_BYTES.toString, maxMessageBytes2.value)
+    assertEquals(ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT.toString, maxMessageBytes2.value)
     assertEquals(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageBytes2.name)
     assertTrue(maxMessageBytes2.isDefault)
     assertFalse(maxMessageBytes2.isSensitive)
@@ -3467,7 +3467,7 @@
     assertEquals(2, configs.size)

     assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString, configs.get(topic1Resource).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-    assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, configs.get(topic1Resource).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)
+    assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(topic1Resource).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)
     assertEquals("0.9", configs.get(topic2Resource).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)

     // Check invalid use of append/subtract operation types
@@ -4120,12 +4120,12 @@ object PlaintextAdminIntegrationTest {

     assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString,
       configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-    assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE,
+    assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT,
       configs.get(topicResource1).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)

     assertEquals("snappy", configs.get(topicResource2).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)

-    assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
+    assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)

     // Alter configs with validateOnly = true: first and third are invalid, second is valid
     alterConfigs.put(topicResource1, util.Arrays.asList(
@@ -4149,11 +4149,11 @@

     assertEquals(LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO.toString,
       configs.get(topicResource1).get(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG).value)
-    assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE,
+    assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT,
       configs.get(topicResource1).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)

     assertEquals("snappy", configs.get(topicResource2).get(TopicConfig.COMPRESSION_TYPE_CONFIG).value)

-    assertEquals(LogConfig.DEFAULT_COMPRESSION_TYPE, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
+    assertEquals(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, configs.get(brokerResource).get(ServerConfigs.COMPRESSION_TYPE_CONFIG).value)
   }
 }
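With LogCleaner gone from core, the broker-level defaults these tests compare against move from LogConfig.DEFAULT_* in the storage internals to ServerLogConfigs.*_DEFAULT in org.apache.kafka.server.config. A sketch of the relocated constants as used by the assertions above; only the constant names are taken from the diff, and printing them is illustrative:

import org.apache.kafka.server.config.ServerLogConfigs;

// Prints the relocated defaults referenced by the updated assertions.
final class RelocatedDefaultsSketch {
    public static void main(String[] args) {
        // Replaces LogConfig.DEFAULT_MAX_MESSAGE_BYTES:
        System.out.println(ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT);
        // Replaces LogConfig.DEFAULT_COMPRESSION_TYPE:
        System.out.println(ServerLogConfigs.COMPRESSION_TYPE_DEFAULT);
    }
}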

core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.kafka.common.config.TopicConfig
2626
import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
2727
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
2828
import org.apache.kafka.common.serialization.ByteArraySerializer
29-
import org.apache.kafka.storage.internals.log.LogConfig
29+
import org.apache.kafka.server.config.ServerLogConfigs
3030
import org.junit.jupiter.api.Assertions._
3131
import org.junit.jupiter.api.Timeout
3232
import org.junit.jupiter.params.ParameterizedTest
@@ -262,7 +262,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
262262
val valueLengthSize = 3
263263
val overhead = Records.LOG_OVERHEAD + DefaultRecordBatch.RECORD_BATCH_OVERHEAD + DefaultRecord.MAX_RECORD_OVERHEAD +
264264
keyLengthSize + headerLengthSize + valueLengthSize
265-
val valueSize = LogConfig.DEFAULT_MAX_MESSAGE_BYTES - overhead
265+
val valueSize = ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT - overhead
266266

267267
val record0 = new ProducerRecord(topic, new Array[Byte](0), new Array[Byte](valueSize))
268268
assertEquals(record0.value.length, producer.send(record0).get.serializedValueSize)
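The overhead arithmetic above determines the largest value payload that still fits in a default-sized message. The same computation in Java, using only identifiers that appear in the diff; the wrapper class and method are illustrative:

import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.config.ServerLogConfigs;

final class MaxValueSizeSketch {
    // Mirrors the Scala test: fixed framing overhead plus the varint
    // length fields for key, headers, and value, subtracted from the
    // default max message size.
    static int maxValueSize(int keyLengthSize, int headerLengthSize, int valueLengthSize) {
        int overhead = Records.LOG_OVERHEAD
            + DefaultRecordBatch.RECORD_BATCH_OVERHEAD
            + DefaultRecord.MAX_RECORD_OVERHEAD
            + keyLengthSize + headerLengthSize + valueLengthSize;
        return ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT - overhead;
    }

    private MaxValueSizeSketch() { }
}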
