Skip to content

Commit 6c26595

Browse files
committed
Update Transactional producer to translate retriable into abortable exceptions
1 parent e3430fe commit 6c26595

File tree

3 files changed

+63
-2
lines changed

3 files changed

+63
-2
lines changed

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.kafka.common.errors.ProducerFencedException;
3838
import org.apache.kafka.common.errors.RetriableException;
3939
import org.apache.kafka.common.errors.TopicAuthorizationException;
40+
import org.apache.kafka.common.errors.TransactionAbortableException;
4041
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
4142
import org.apache.kafka.common.errors.UnknownProducerIdException;
4243
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -1073,6 +1074,11 @@ private void transitionTo(State target, RuntimeException error) {
10731074
} else if (target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR) {
10741075
if (error == null)
10751076
throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
1077+
1078+
if (error instanceof RetriableException) {
1079+
error = new TransactionAbortableException("Transaction Request was aborted after exhausting retries.", error);
1080+
}
1081+
10761082
lastError = error;
10771083
} else {
10781084
lastError = null;

clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@
1717
package org.apache.kafka.common.errors;
1818

1919
public class TransactionAbortableException extends ApiException {
20+
21+
private static final long serialVersionUID = 1L;
22+
23+
public TransactionAbortableException(String message, Throwable cause) {
24+
super(message, cause);
25+
}
26+
2027
public TransactionAbortableException(String message) {
2128
super(message);
2229
}

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ private static Map<TopicPartition, MemoryRecords> partitionRecords(ProduceReques
186186
}));
187187
return Collections.unmodifiableMap(partitionRecords);
188188
}
189-
189+
190190
@Test
191191
public void testSimple() throws Exception {
192192
long offset = 0;
@@ -3001,7 +3001,51 @@ public void testCustomErrorMessage() throws Exception {
30013001
}
30023002

30033003
@Test
3004-
public void testSenderShouldRetryWithBackoffOnRetriableError() {
3004+
public void testSenderShouldTransitionToAbortableAfterRetriesExhausted() throws InterruptedException {
3005+
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
3006+
TransactionManager txnManager = new TransactionManager(
3007+
logContext,
3008+
"testRetriableException",
3009+
60000,
3010+
RETRY_BACKOFF_MS,
3011+
apiVersions
3012+
);
3013+
3014+
// Setup with transaction state and initialize transactions with single retry
3015+
setupWithTransactionState(txnManager, false, null, 1);
3016+
doInitTransactions(txnManager, producerIdAndEpoch);
3017+
3018+
// Begin transaction and add partition
3019+
txnManager.beginTransaction();
3020+
txnManager.maybeAddPartition(tp0);
3021+
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
3022+
sender.runOnce();
3023+
3024+
// First produce request
3025+
appendToAccumulator(tp0);
3026+
client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
3027+
sender.runOnce();
3028+
3029+
// Sleep for retry backoff
3030+
time.sleep(RETRY_BACKOFF_MS);
3031+
3032+
// Second attempt to process record - PREPARE the response before sending
3033+
client.prepareResponse(produceResponse(tp0, -1, Errors.COORDINATOR_LOAD_IN_PROGRESS, -1));
3034+
sender.runOnce();
3035+
3036+
// Now transaction should be in abortable state after retry is exhausted
3037+
assertTrue(txnManager.hasAbortableError());
3038+
3039+
// Second produce request - should fail with TransactionAbortableException
3040+
Future<RecordMetadata> future2 = appendToAccumulator(tp0);
3041+
client.prepareResponse(produceResponse(tp0, -1, Errors.NONE, -1));
3042+
// Sender will try to send and fail with TransactionAbortableException instead of COORDINATOR_LOAD_IN_PROGRESS, because we're in abortable state
3043+
sender.runOnce();
3044+
assertFutureFailure(future2, TransactionAbortableException.class);
3045+
}
3046+
3047+
@Test
3048+
public void testSenderShouldRetryWithBackoffOnRetriableError() throws InterruptedException {
30053049
final long producerId = 343434L;
30063050
TransactionManager transactionManager = createTransactionManager();
30073051
setupWithTransactionState(transactionManager);
@@ -3635,6 +3679,10 @@ private void setupWithTransactionState(TransactionManager transactionManager, bo
36353679
setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, Integer.MAX_VALUE, 0);
36363680
}
36373681

3682+
private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, int retries) {
3683+
setupWithTransactionState(transactionManager, guaranteeOrder, customPool, true, retries, 0);
3684+
}
3685+
36383686
private void setupWithTransactionState(
36393687
TransactionManager transactionManager,
36403688
boolean guaranteeOrder,

0 commit comments

Comments
 (0)