
KAFKA-19407 Fix potential IllegalStateException when appending to timeIndex #19972

Open. Wants to merge 2 commits into base: trunk.

LogSegment.java:
@@ -93,6 +93,9 @@ public class LogSegment implements Closeable {
// NOTE: the offset is the last offset of the batch having the max timestamp.
private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;

// Lock for maxTimestampAndOffsetSoFar to ensure that it will be initialized only once
private final Object maxTimestampAndOffsetLock = new Object();

private long created;

/* the number of bytes since we last added an entry in the offset index */
@@ -177,7 +180,7 @@ public void resizeIndexes(int size) throws IOException {
public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
if (offsetIndexFile().exists()) {
// Resize the time index file to 0 if it is newly created.
if (timeIndexFileNewlyCreated)
timeIndex().resize(0);
// Sanity checks for time index and offset index are skipped because
// we will recover the segments above the recovery point in recoverLog()
@@ -192,8 +195,13 @@ public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
* the time index).
*/
public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException {
if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {

A reviewer commented:

Should this volatile field become an Atomic instead?

@ocadaruma (Contributor, Author) replied on Jun 17, 2025:

Using Atomic doesn't make this simpler, because we need double-checked locking anyway to ensure that we:

  • initialize the value only once, on the first read
  • always return the initialized value

A reviewer replied:

I thought updateAndGet could work to this effect:

    maxTimestampAndOffsetSoFar.updateAndGet(t -> t == TimestampOffset.UNKNOWN ? timeIndex().lastEntry() : t)

But I have not fully considered whether it would end up being slower on any microbenchmark. It just looked simpler.

synchronized (maxTimestampAndOffsetLock) {
if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) {
maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
}
}
}
return maxTimestampAndOffsetSoFar;
}
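
For readers following the thread above: a minimal sketch of what the reviewer's AtomicReference/updateAndGet alternative could look like in Java. This is not what the PR implements, and the names here (MaxTimestampCacheSketch, TimestampOffsetStub, lastTimeIndexEntry) are hypothetical stand-ins, used only to keep the example self-contained.

    import java.util.concurrent.atomic.AtomicReference;

    public class MaxTimestampCacheSketch {
        // Hypothetical stand-in for Kafka's TimestampOffset value type.
        record TimestampOffsetStub(long timestamp, long offset) {
            static final TimestampOffsetStub UNKNOWN = new TimestampOffsetStub(-1L, -1L);
        }

        private final AtomicReference<TimestampOffsetStub> maxTimestampAndOffsetSoFar =
                new AtomicReference<>(TimestampOffsetStub.UNKNOWN);

        // Hypothetical stand-in for timeIndex().lastEntry() in the real LogSegment.
        private TimestampOffsetStub lastTimeIndexEntry() {
            return new TimestampOffsetStub(1_700_000_000_000L, 42L);
        }

        public TimestampOffsetStub readMaxTimestampAndOffsetSoFar() {
            // updateAndGet may re-apply the function when a compare-and-set attempt
            // fails under contention, so lastTimeIndexEntry() could be evaluated more
            // than once. The synchronized double-check in the PR evaluates
            // timeIndex().lastEntry() at most once, which is what the new test's
            // verify(mockTimeIndex, times(1)).lastEntry() asserts.
            return maxTimestampAndOffsetSoFar.updateAndGet(
                    t -> t == TimestampOffsetStub.UNKNOWN ? lastTimeIndexEntry() : t);
        }
    }

Either approach returns a stable value after initialization; the difference is how many times the time index may be consulted while threads race on the first read.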


LogSegmentTest.java:
@@ -41,6 +41,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -57,6 +58,10 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -65,7 +70,11 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class LogSegmentTest {
@@ -856,6 +865,42 @@ public void testNonMonotonicTimestampForMultipleBatchesInMemoryRecords() throws
assertEquals(new TimestampOffset(2, 2), segment.timeIndex().entry(1));
}

@Test
@Timeout(30)
public void testConcurrentAccessToMaxTimestampSoFar() throws Exception {
int numThreads = 16;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
TimeIndex mockTimeIndex = mock(TimeIndex.class);
when(mockTimeIndex.lastEntry()).thenReturn(new TimestampOffset(RecordBatch.NO_TIMESTAMP, 0L));

try {
// To reproduce the race, repeat the test for a fixed duration.
long remainingDurationNanos = Duration.ofSeconds(1).toNanos();
while (remainingDurationNanos > 0) {
long t0 = System.nanoTime();
clearInvocations(mockTimeIndex);
try (LogSegment seg = spy(LogTestUtils.createSegment(0, logDir, 10, Time.SYSTEM))) {
when(seg.timeIndex()).thenReturn(mockTimeIndex);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
futures.add(executor.submit(() -> assertDoesNotThrow(seg::maxTimestampSoFar)));
}
for (Future<?> future : futures) {
future.get();
}
// timeIndex.lastEntry should be called exactly once when there is no race
verify(mockTimeIndex, times(1)).lastEntry();

long elapsedNanos = System.nanoTime() - t0;
remainingDurationNanos -= elapsedNanos;
}
}
} finally {
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}

private ProducerStateManager newProducerStateManager() throws IOException {
return new ProducerStateManager(
topicPartition,