Skip to content

Commit b8b3d0f

Browse files
committed
KAFKA-17259: Support to override serverProperties and restart cluster in ClusterTestExtensions
Signed-off-by: PoAn Yang <[email protected]>
1 parent 2e4a378 commit b8b3d0f

File tree

4 files changed

+167
-1
lines changed

4 files changed

+167
-1
lines changed

test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,13 @@
5858
import java.util.Collection;
5959
import java.util.Collections;
6060
import java.util.HashMap;
61+
import java.util.HashSet;
6162
import java.util.List;
6263
import java.util.Map;
6364
import java.util.Map.Entry;
6465
import java.util.Optional;
6566
import java.util.Properties;
67+
import java.util.Set;
6668
import java.util.concurrent.CompletableFuture;
6769
import java.util.concurrent.ExecutionException;
6870
import java.util.concurrent.ExecutorService;
@@ -314,8 +316,9 @@ private static void setupNodeDirectories(File baseDirectory,
314316
private final Map<Integer, BrokerServer> brokers;
315317
private final File baseDirectory;
316318
private final SimpleFaultHandlerFactory faultHandlerFactory;
317-
private final PreboundSocketFactoryManager socketFactoryManager;
319+
private PreboundSocketFactoryManager socketFactoryManager;
318320
private final String controllerListenerName;
321+
private Map<Integer, Set<String>> nodeIdToListeners = new HashMap<>();
319322

320323
private KafkaClusterTestKit(
321324
TestKitNodes nodes,
@@ -437,6 +440,130 @@ public void startup() throws ExecutionException, InterruptedException {
437440
}
438441
}
439442

443+
public void shutdown() throws Exception {
444+
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
445+
try {
446+
// Note the shutdown order here is chosen to be consistent with
447+
// `KafkaRaftServer`. See comments in that class for an explanation.
448+
for (Entry<Integer, BrokerServer> entry : brokers.entrySet()) {
449+
int brokerId = entry.getKey();
450+
BrokerServer broker = entry.getValue();
451+
nodeIdToListeners.computeIfAbsent(brokerId, __ -> new HashSet<>());
452+
Set<String> listeners = nodeIdToListeners.get(brokerId);
453+
broker.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
454+
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
455+
});
456+
if (!broker.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
457+
listeners.add(broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
458+
broker.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
459+
broker.socketServer().controlPlaneAcceptorOpt().get().localPort());
460+
}
461+
nodeIdToListeners.put(brokerId, listeners);
462+
futureEntries.add(new SimpleImmutableEntry<>("broker" + brokerId,
463+
executorService.submit((Runnable) broker::shutdown)));
464+
}
465+
waitForAllFutures(futureEntries);
466+
futureEntries.clear();
467+
for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
468+
int controllerId = entry.getKey();
469+
ControllerServer controller = entry.getValue();
470+
nodeIdToListeners.computeIfAbsent(controllerId, __ -> new HashSet<>());
471+
Set<String> listeners = nodeIdToListeners.get(controllerId);
472+
controller.socketServer().dataPlaneAcceptors().forEach((endpoint, acceptor) -> {
473+
listeners.add(endpoint.listenerName().value() + "://" + endpoint.host() + ":" + acceptor.localPort());
474+
});
475+
if (!controller.socketServer().controlPlaneAcceptorOpt().isEmpty()) {
476+
listeners.add(controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().listenerName().value() + "://" +
477+
controller.socketServer().controlPlaneAcceptorOpt().get().endPoint().host() + ":" +
478+
controller.socketServer().controlPlaneAcceptorOpt().get().localPort());
479+
}
480+
nodeIdToListeners.put(controllerId, listeners);
481+
futureEntries.add(new SimpleImmutableEntry<>("controller" + controllerId,
482+
executorService.submit(controller::shutdown)));
483+
}
484+
waitForAllFutures(futureEntries);
485+
futureEntries.clear();
486+
socketFactoryManager.close();
487+
} catch (Exception e) {
488+
for (Entry<String, Future<?>> entry : futureEntries) {
489+
entry.getValue().cancel(true);
490+
}
491+
throw e;
492+
}
493+
}
494+
495+
public void restart(Map<Integer, Map<String, Object>> perServerOverriddenConfig) throws Exception {
496+
shutdown();
497+
498+
Map<Integer, SharedServer> jointServers = new HashMap<>();
499+
500+
socketFactoryManager = new PreboundSocketFactoryManager();
501+
controllers.forEach((id, controller) -> {
502+
Map<String, Object> config = controller.config().originals();
503+
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
504+
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
505+
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));
506+
507+
TestKitNode node = nodes.controllerNodes().get(id);
508+
KafkaConfig nodeConfig = new KafkaConfig(config, false);
509+
SharedServer sharedServer = new SharedServer(
510+
nodeConfig,
511+
node.initialMetaPropertiesEnsemble(),
512+
Time.SYSTEM,
513+
new Metrics(),
514+
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())),
515+
Collections.emptyList(),
516+
faultHandlerFactory,
517+
socketFactoryManager.getOrCreateSocketFactory(node.id())
518+
);
519+
try {
520+
controller = new ControllerServer(
521+
sharedServer,
522+
KafkaRaftServer.configSchema(),
523+
nodes.bootstrapMetadata());
524+
} catch (Throwable e) {
525+
log.error("Error creating controller {}", node.id(), e);
526+
Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);
527+
throw e;
528+
}
529+
controllers.put(node.id(), controller);
530+
jointServers.put(node.id(), sharedServer);
531+
});
532+
533+
brokers.forEach((id, broker) -> {
534+
Map<String, Object> config = broker.config().originals();
535+
config.putAll(perServerOverriddenConfig.getOrDefault(-1, Collections.emptyMap()));
536+
config.putAll(perServerOverriddenConfig.getOrDefault(id, Collections.emptyMap()));
537+
config.put(SocketServerConfigs.LISTENERS_CONFIG, String.join(",", nodeIdToListeners.get(id)));
538+
539+
TestKitNode node = nodes.brokerNodes().get(id);
540+
KafkaConfig nodeConfig = new KafkaConfig(config);
541+
SharedServer sharedServer = jointServers.computeIfAbsent(
542+
node.id(),
543+
nodeId -> new SharedServer(
544+
nodeConfig,
545+
node.initialMetaPropertiesEnsemble(),
546+
Time.SYSTEM,
547+
new Metrics(),
548+
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(nodeConfig.quorumConfig().voters())),
549+
Collections.emptyList(),
550+
faultHandlerFactory,
551+
socketFactoryManager.getOrCreateSocketFactory(node.id())
552+
)
553+
);
554+
try {
555+
broker = new BrokerServer(sharedServer);
556+
} catch (Throwable e) {
557+
log.error("Error creating broker {}", node.id(), e);
558+
Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker);
559+
throw e;
560+
}
561+
brokers.put(node.id(), broker);
562+
});
563+
564+
startup();
565+
}
566+
440567
/**
441568
* Wait for a controller to mark all the brokers as ready (registered and unfenced).
442569
* And also wait for the metadata cache up-to-date in each broker server.

test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ default SocketServer anyControllerSocketServer() {
155155
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
156156
}
157157

158+
default void restart() throws Exception {
159+
restart(Map.of());
160+
}
161+
162+
void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception;
163+
158164
String clusterId();
159165

160166
//---------------------------[producer/consumer/admin]---------------------------//

test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ public void stop() {
193193
}
194194
}
195195

196+
@Override
197+
public void restart(Map<Integer, Map<String, Object>> perServerConfigOverrides) throws Exception {
198+
clusterTestKit.restart(perServerConfigOverrides);
199+
}
200+
196201
@Override
197202
public void shutdownBroker(int brokerId) {
198203
findBrokerOrThrow(brokerId).shutdown();

test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,4 +331,32 @@ public void testControllerListenerName(ClusterInstance cluster) throws Execution
331331
assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
332332
}
333333
}
334+
335+
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, serverProperties = {
336+
@ClusterConfigProperty(key = "offset.storage.replication.factor", value = "1"),
337+
})
338+
public void testRestartWithOverriddenConfig(ClusterInstance clusterInstance) throws Exception {
339+
clusterInstance.restart(Collections.singletonMap(-1, Collections.singletonMap("default.replication.factor", 2)));
340+
clusterInstance.waitForReadyBrokers();
341+
clusterInstance.brokers().values().forEach(broker -> {
342+
Assertions.assertEquals(2, broker.config().getInt("default.replication.factor"));
343+
});
344+
clusterInstance.controllers().values().forEach(controller -> {
345+
Assertions.assertEquals(2, controller.config().getInt("default.replication.factor"));
346+
});
347+
}
348+
349+
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, serverProperties = {
350+
@ClusterConfigProperty(key = "offset.storage.replication.factor", value = "1"),
351+
})
352+
public void testRestartWithoutOverriddenConfig(ClusterInstance clusterInstance) throws Exception {
353+
clusterInstance.restart();
354+
clusterInstance.waitForReadyBrokers();
355+
clusterInstance.brokers().values().forEach(broker -> {
356+
Assertions.assertEquals(1, broker.config().getInt("default.replication.factor"));
357+
});
358+
clusterInstance.controllers().values().forEach(controller -> {
359+
Assertions.assertEquals(1, controller.config().getInt("default.replication.factor"));
360+
});
361+
}
334362
}

0 commit comments

Comments
 (0)