-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19407 Fix potential IllegalStateException when appending to timeIndex #19972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,6 +93,9 @@ public class LogSegment implements Closeable { | |
// NOTED: the offset is the last offset of batch having the max timestamp. | ||
private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN; | ||
|
||
// Lock for maxTimestampAndOffsetSoFar to ensure that it will be initialized only once | ||
private final Object maxTimestampAndOffsetLock = new Object(); | ||
|
||
private long created; | ||
|
||
/* the number of bytes since we last added an entry in the offset index */ | ||
|
@@ -177,7 +180,7 @@ public void resizeIndexes(int size) throws IOException { | |
public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException { | ||
if (offsetIndexFile().exists()) { | ||
// Resize the time index file to 0 if it is newly created. | ||
if (timeIndexFileNewlyCreated) | ||
if (timeIndexFileNewlyCreated) | ||
timeIndex().resize(0); | ||
// Sanity checks for time index and offset index are skipped because | ||
// we will recover the segments above the recovery point in recoverLog() | ||
|
@@ -192,8 +195,13 @@ public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException { | |
* the time index). | ||
*/ | ||
public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException { | ||
if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) | ||
maxTimestampAndOffsetSoFar = timeIndex().lastEntry(); | ||
if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this volatile field become an Atomic instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using Atomic doesn't make the thing simple, because we need double-check locking anyways to ensure:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought maxTimestampAndOffsetSoFar.updateAndGet(t -> if (t == TimestampOffset.UNKNOWN) timeIndex().lastEntry() else t) But I have not fully considered whether it would end up being slower on any microbenchmark. It just looked simpler. |
||
synchronized (maxTimestampAndOffsetLock) { | ||
if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN) { | ||
maxTimestampAndOffsetSoFar = timeIndex().lastEntry(); | ||
} | ||
} | ||
} | ||
return maxTimestampAndOffsetSoFar; | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.