Skip to content

Commit 27d9a55

Browse files
committed
Merge branch 'trunk' into KAFKA-18894
2 parents e26a06f + 7863b35 commit 27d9a55

File tree

283 files changed

+10175
-4757
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

283 files changed

+10175
-4757
lines changed

.github/scripts/pr-format.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,10 @@ def check(positive_assertion, ok_msg, err_msg):
162162
new_lines.append(textwrap.fill(line, width=72, break_long_words=False, break_on_hyphens=False, replace_whitespace=False))
163163
rewrapped_p = "\n".join(new_lines)
164164
else:
165-
rewrapped_p = textwrap.fill("".join(p), width=72, break_long_words=False, break_on_hyphens=False, replace_whitespace=True)
165+
indent = ""
166+
if len(p) > 0 and p[0].startswith("Reviewers:"):
167+
indent = " "
168+
rewrapped_p = textwrap.fill("".join(p), subsequent_indent=indent, width=72, break_long_words=False, break_on_hyphens=False, replace_whitespace=True)
166169
new_paragraphs.append(rewrapped_p + "\n")
167170
body = "\n".join(new_paragraphs)
168171

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ Using compiled files:
109109

110110
Using docker image:
111111

112-
docker run -p 9092:9092 apache/kafka:3.7.0
112+
docker run -p 9092:9092 apache/kafka:latest
113113

114114
### Cleaning the build ###
115115
./gradlew clean

bin/kafka-streams-groups.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.streams.StreamsGroupCommand "$@"

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3737,7 +3737,7 @@ project(':connect:mirror') {
37373737
testImplementation project(':core')
37383738
testImplementation project(':test-common:test-common-runtime')
37393739
testImplementation project(':server')
3740-
testImplementation project(':server-common').sourceSets.test.output
3740+
testImplementation project(':server-common')
37413741

37423742

37433743
testRuntimeOnly project(':connect:runtime')

checkstyle/import-control-server-common.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
</subpackage>
131131
<subpackage name="config">
132132
<allow pkg="org.apache.kafka.server"/>
133+
<allow pkg="org.apache.kafka.clients"/>
133134
</subpackage>
134135
</subpackage>
135136

checkstyle/import-control-storage.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
<allow pkg="com.fasterxml.jackson" />
8989
<allow pkg="com.yammer.metrics.core" />
9090
<allow pkg="org.apache.kafka.common" />
91+
<allow pkg="org.apache.kafka.config" />
9192
<allow pkg="org.apache.kafka.server"/>
9293
<allow pkg="org.apache.kafka.storage.internals"/>
9394
<allow pkg="org.apache.kafka.storage.log.metrics"/>

checkstyle/suppressions.xml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
106106

107107
<suppress checks="NPathComplexity"
108-
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
108+
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell|MockConsumer).java"/>
109109

110110
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
111111
files="CoordinatorClient.java"/>
@@ -168,6 +168,9 @@
168168
<suppress checks="NPathComplexity"
169169
files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
170170

171+
<suppress checks="ClassFanOutComplexity"
172+
files="ShareConsumerTest.java"/>
173+
171174
<!-- connect tests-->
172175
<suppress checks="ClassDataAbstractionCoupling"
173176
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask)Test.java"/>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
import org.apache.kafka.clients.consumer.GroupProtocol;
24+
import org.apache.kafka.clients.producer.Producer;
25+
import org.apache.kafka.clients.producer.ProducerConfig;
26+
import org.apache.kafka.clients.producer.ProducerRecord;
27+
import org.apache.kafka.common.header.Header;
28+
import org.apache.kafka.common.header.internals.RecordHeader;
29+
import org.apache.kafka.common.test.ClusterInstance;
30+
import org.apache.kafka.common.test.TestUtils;
31+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
32+
import org.apache.kafka.common.test.api.ClusterTest;
33+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
34+
import org.apache.kafka.common.test.api.Type;
35+
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
36+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
37+
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
38+
import org.apache.kafka.server.config.ReplicationConfigs;
39+
import org.apache.kafka.server.config.ServerConfigs;
40+
import org.apache.kafka.server.config.ServerLogConfigs;
41+
42+
import java.time.Duration;
43+
import java.util.ArrayList;
44+
import java.util.Collections;
45+
import java.util.Iterator;
46+
import java.util.List;
47+
import java.util.Map;
48+
49+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
50+
import static org.junit.jupiter.api.Assertions.assertEquals;
51+
import static org.junit.jupiter.api.Assertions.assertTrue;
52+
53+
@ClusterTestDefaults(
54+
types = {Type.CO_KRAFT},
55+
serverProperties = {
56+
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
57+
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
58+
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
59+
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
60+
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
61+
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
62+
@ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
63+
@ClusterConfigProperty(key = "log.unclean.leader.election.enable", value = "false"),
64+
@ClusterConfigProperty(key = ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
65+
@ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
66+
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200")
67+
}
68+
)
69+
public class TransactionsWithMaxInFlightOneTest {
70+
private static final String TOPIC1 = "topic1";
71+
private static final String TOPIC2 = "topic2";
72+
private static final String HEADER_KEY = "transactionStatus";
73+
private static final byte[] ABORTED_VALUE = "aborted".getBytes();
74+
private static final byte[] COMMITTED_VALUE = "committed".getBytes();
75+
76+
@ClusterTest
77+
public void testTransactionalProducerSingleBrokerMaxInFlightOne(ClusterInstance clusterInstance) throws InterruptedException {
78+
// We want to test with one broker to verify multiple requests queued on a connection
79+
assertEquals(1, clusterInstance.brokers().size());
80+
81+
clusterInstance.createTopic(TOPIC1, 4, (short) 1);
82+
clusterInstance.createTopic(TOPIC2, 4, (short) 1);
83+
84+
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
85+
ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer",
86+
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1
87+
))
88+
) {
89+
producer.initTransactions();
90+
91+
producer.beginTransaction();
92+
producer.send(new ProducerRecord<>(TOPIC2, null, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
93+
producer.send(new ProducerRecord<>(TOPIC1, null, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
94+
producer.flush();
95+
producer.abortTransaction();
96+
97+
producer.beginTransaction();
98+
producer.send(new ProducerRecord<>(TOPIC1, null, "1".getBytes(), "1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
99+
producer.send(new ProducerRecord<>(TOPIC2, null, "3".getBytes(), "3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
100+
producer.commitTransaction();
101+
102+
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
103+
ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
104+
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
105+
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name(),
106+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
107+
ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"
108+
)
109+
)) {
110+
consumer.subscribe(List.of(TOPIC1, TOPIC2));
111+
TestUtils.waitForCondition(() -> {
112+
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
113+
records.forEach(consumerRecords::add);
114+
return consumerRecords.size() == 2;
115+
}, 15_000, () -> "Consumer with protocol " + groupProtocol.name + " should consume 2 records, but get " + consumerRecords.size());
116+
}
117+
consumerRecords.forEach(record -> {
118+
Iterator<Header> headers = record.headers().headers(HEADER_KEY).iterator();
119+
assertTrue(headers.hasNext());
120+
Header header = headers.next();
121+
assertArrayEquals(COMMITTED_VALUE, header.value(), "Record does not have the expected header value");
122+
});
123+
}
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)