diff --git a/build.gradle b/build.gradle
index 2e35057165c53..61b68edfe67de 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1420,6 +1420,7 @@ project(':group-coordinator') {
implementation libs.hdrHistogram
implementation libs.re2j
implementation libs.slf4jApi
+ implementation libs.guava
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml
index 8b6a8d99f5eaa..341ac8984ab93 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -77,6 +77,7 @@
+ <allow pkg="com.google.common.hash" />
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index f6f7bd68e8363..a4e94e44080a5 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -61,6 +61,7 @@ versions += [
classgraph: "4.8.173",
gradle: "8.10.2",
grgit: "4.1.1",
+ guava: "33.4.0-jre",
httpclient: "4.5.14",
jackson: "2.16.2",
jacoco: "0.8.10",
@@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator: "commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonDatabindYaml: "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$versions.jackson",
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 54d7e98d4b7be..34b17ff2533f4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -19,11 +19,21 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.metadata.BrokerRegistration;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -209,4 +219,59 @@ void validateOffsetFetch(
default boolean shouldExpire() {
return true;
}
+
+ /**
+ * Computes the hash of the topics in a group.
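+ * <p>
+ * Per-topic hashes are combined with Guava's {@code Hashing.combineOrdered} in
+ * ascending topic-name order, so the result is deterministic regardless of the
+ * map's iteration order.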
+ *
+ * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
+ * @return The hash of the group.
+ */
+ static long computeGroupHash(Map<String, Long> topicHashes) {
+ return Hashing.combineOrdered(
+ topicHashes.entrySet()
+ .stream()
+ .sorted(Map.Entry.comparingByKey())
+ .map(e -> HashCode.fromLong(e.getValue()))
+ .toList()
+ ).asLong();
+ }
+
+ /**
+ * Computes the hash of the topic id, name, number of partitions, and partition racks using Murmur3.
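+ * <p>
+ * A leading magic byte versions the hash layout so it can be evolved if the hashed
+ * fields ever change. Partitions are hashed in ascending partition-id order, and each
+ * partition's replica racks are sorted and joined with ";" so the result does not
+ * depend on replica ordering.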
+ *
+ * @param topicImage The topic image.
+ * @param clusterImage The cluster image.
+ * @return The hash of the topic.
+ */
+ static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
+ HashFunction hf = Hashing.murmur3_128();
+ Hasher topicHasher = hf.newHasher()
+ .putByte((byte) 0) // magic byte
+ .putLong(topicImage.id().hashCode()) // topic Id
+ .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+ .putInt(topicImage.partitions().size()); // number of partitions
+
+ topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
+ topicHasher.putInt(entry.getKey()); // partition id
+ String racks = Arrays.stream(entry.getValue().replicas)
+ .mapToObj(clusterImage::broker)
+ .filter(Objects::nonNull)
+ .map(BrokerRegistration::rack)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .sorted()
+ .collect(Collectors.joining(";"));
+ topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
+ });
+ return topicHasher.hash().asLong();
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
new file mode 100644
index 0000000000000..679bcfdf3e1e4
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.MetadataImage;
+
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class GroupTest {
+ private static final Uuid FOO_TOPIC_ID = Uuid.randomUuid();
+ private static final String FOO_TOPIC_NAME = "foo";
+ private static final String BAR_TOPIC_NAME = "bar";
+ private static final int FOO_NUM_PARTITIONS = 2;
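+ // MetadataImageBuilder.addRacks() places partition i's replicas on rack{i} and
+ // rack{i+1}; the expected "rackX;rackY" strings in the tests below rely on this.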
+ private static final MetadataImage FOO_METADATA_IMAGE = new MetadataImageBuilder()
+ .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+ .addRacks()
+ .build();
+
+ @Test
+ void testComputeTopicHash() {
+ long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+
+ HashFunction hf = Hashing.murmur3_128();
+ Hasher topicHasher = hf.newHasher()
+ .putByte((byte) 0) // magic byte
+ .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+ .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
+ .putInt(FOO_NUM_PARTITIONS) // number of partitions
+ .putInt(0) // partition 0
+ .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
+ .putInt(1) // partition 1
+ .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
+ assertEquals(topicHasher.hash().asLong(), result);
+ }
+
+ @Test
+ void testComputeTopicHashWithDifferentMagicByte() {
+ long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+
+ HashFunction hf = Hashing.murmur3_128();
+ Hasher topicHasher = hf.newHasher()
+ .putByte((byte) 1) // different magic byte
+ .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+ .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
+ .putInt(FOO_NUM_PARTITIONS) // number of partitions
+ .putInt(0) // partition 0
+ .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
+ .putInt(1) // partition 1
+ .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
+ assertNotEquals(topicHasher.hash().asLong(), result);
+ }
+
+ @Test
+ void testComputeTopicHashWithDifferentPartitionOrder() {
+ long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+
+ HashFunction hf = Hashing.murmur3_128();
+ Hasher topicHasher = hf.newHasher()
+ .putByte((byte) 0) // magic byte
+ .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+ .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
+ .putInt(FOO_NUM_PARTITIONS) // number of partitions
+ // different partition order
+ .putInt(1) // partition 1
+ .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
+ .putInt(0) // partition 0
+ .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
+ assertNotEquals(topicHasher.hash().asLong(), result);
+ }
+
+ @Test
+ void testComputeTopicHashWithDifferentRackOrder() {
+ long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+
+ HashFunction hf = Hashing.murmur3_128();
+ Hasher topicHasher = hf.newHasher()
+ .putByte((byte) 0) // magic byte
+ .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+ .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
+ .putInt(FOO_NUM_PARTITIONS) // number of partitions
+ .putInt(0) // partition 0
+ .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
+ .putInt(1) // partition 1
+ .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
+ assertNotEquals(topicHasher.hash().asLong(), result);
+ }
+
+ @ParameterizedTest
+ @MethodSource("differentFieldGenerator")
+ void testComputeTopicHashWithDifferentField(MetadataImage differentImage, Uuid topicId) {
+ long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());
+
+ assertNotEquals(
+ Group.computeTopicHash(
+ differentImage.topics().getTopic(topicId),
+ differentImage.cluster()
+ ),
+ result
+ );
+ }
+
+ private static Stream<Arguments> differentFieldGenerator() {
+ Uuid differentTopicId = Uuid.randomUuid();
+ return Stream.of(
+ Arguments.of(new MetadataImageBuilder() // different topic id
+ .addTopic(differentTopicId, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+ .addRacks()
+ .build(),
+ differentTopicId
+ ),
+ Arguments.of(new MetadataImageBuilder() // different topic name
+ .addTopic(FOO_TOPIC_ID, "bar", FOO_NUM_PARTITIONS)
+ .addRacks()
+ .build(),
+ FOO_TOPIC_ID
+ ),
+ Arguments.of(new MetadataImageBuilder() // different number of partitions
+ .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, 1)
+ .addRacks()
+ .build(),
+ FOO_TOPIC_ID
+ ),
+ Arguments.of(new MetadataImageBuilder() // different racks
+ .addTopic(FOO_TOPIC_ID, FOO_TOPIC_NAME, FOO_NUM_PARTITIONS)
+ .build(),
+ FOO_TOPIC_ID
+ )
+ );
+ }
+
+ @Test
+ void testComputeGroupHash() {
+ long result = Group.computeGroupHash(Map.of(
+ BAR_TOPIC_NAME, 123L,
+ FOO_TOPIC_NAME, 456L
+ ));
+
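+ // Hashes are combined in ascending topic-name order: bar (123L) before foo (456L).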
+ long expected = Hashing.combineOrdered(List.of(
+ HashCode.fromLong(123L),
+ HashCode.fromLong(456L)
+ )).asLong();
+ assertEquals(expected, result);
+ }
+
+ @Test
+ void testComputeGroupHashWithDifferentOrder() {
+ long result = Group.computeGroupHash(Map.of(
+ BAR_TOPIC_NAME, 123L,
+ FOO_TOPIC_NAME, 456L
+ ));
+
+ long unexpected = Hashing.combineOrdered(List.of(
+ HashCode.fromLong(456L),
+ HashCode.fromLong(123L)
+ )).asLong();
+ assertNotEquals(unexpected, result);
+ }
+}