Skip to content

KAFKA-18053 Clean up topology config #19995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
Expand All @@ -42,6 +43,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -83,17 +85,33 @@ public StreamsBuilder() {
*
* @param topologyConfigs the streams configs that apply at the topology level. Please refer to {@link TopologyConfig} for more detail
*/
@Deprecated
@SuppressWarnings("this-escape")
public StreamsBuilder(final TopologyConfig topologyConfigs) {
topology = newTopology(topologyConfigs);
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
}

public StreamsBuilder(final Map<String, Object> configs) {
this(Utils.mkObjectProperties(configs));
}

public StreamsBuilder(final Properties properties) {
topology = newTopology(properties);
internalTopologyBuilder = topology.internalTopologyBuilder;
internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
}

@Deprecated
protected Topology newTopology(final TopologyConfig topologyConfigs) {
return new Topology(topologyConfigs);
}

protected Topology newTopology(final Properties properties) {
return new Topology(properties);
}

/**
* Create a {@link KStream} from the specified topic.
* The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
Expand Down Expand Up @@ -574,7 +592,8 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
* @return the {@link Topology} that represents the specified processing logic
*/
public synchronized Topology build() {
return build(null);
internalStreamsBuilder.buildAndOptimizeTopology();
return topology;
}

/**
Expand All @@ -584,6 +603,7 @@ public synchronized Topology build() {
* @param props the {@link Properties} used for building possibly optimized topology
* @return the {@link Topology} that represents the specified processing logic
*/
@Deprecated
public synchronized Topology build(final Properties props) {
internalStreamsBuilder.buildAndOptimizeTopology(props);
return topology;
Expand Down
62 changes: 62 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.kafka.streams.kstream.SessionWindowedSerializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.TaskConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
Expand All @@ -57,6 +59,7 @@
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslStoreSuppliers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -71,8 +74,10 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -81,6 +86,7 @@
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.parseType;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize;

/**
* Configuration for a {@link KafkaStreams} instance.
Expand Down Expand Up @@ -1515,6 +1521,18 @@ public StreamsConfig(final Map<?, ?> props) {
this(props, true);
}


public final int maxBufferedSize;
public final long cacheSize;
public final long maxTaskIdleMs;
public final long taskTimeoutMs;
public final String storeType;
public final Class<?> dslStoreSuppliers;
public final Supplier<TimestampExtractor> timestampExtractorSupplier;
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier;
public final boolean ensureExplicitInternalResourceNaming;

@SuppressWarnings("this-escape")
protected StreamsConfig(final Map<?, ?> props,
final boolean doLog) {
Expand All @@ -1523,11 +1541,41 @@ protected StreamsConfig(final Map<?, ?> props,
if (eosEnabled) {
verifyEOSTransactionTimeoutCompatibility();
}

this.maxBufferedSize = this.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
this.cacheSize = totalCacheSize(this);
this.maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
this.taskTimeoutMs = getLong(TASK_TIMEOUT_MS_CONFIG);
this.timestampExtractorSupplier = () -> getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);

final String deserializationExceptionHandlerKey = (originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
|| originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)) ?
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG :
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
this.deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(deserializationExceptionHandlerKey, DeserializationExceptionHandler.class);

this.processingExceptionHandlerSupplier = () -> getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
this.storeType = getString(DEFAULT_DSL_STORE_CONFIG);
this.dslStoreSuppliers = getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
this.ensureExplicitInternalResourceNaming = getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG);

verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
verifyClientTelemetryConfigs();
verifyStreamsProtocolCompatibility(doLog);
}

public TaskConfig getTaskConfig() {
return new TaskConfig(
maxTaskIdleMs,
taskTimeoutMs,
maxBufferedSize,
timestampExtractorSupplier.get(),
deserializationExceptionHandlerSupplier.get(),
processingExceptionHandlerSupplier.get(),
eosEnabled
);
}

private void verifyStreamsProtocolCompatibility(final boolean doLog) {
if (doLog && isStreamsProtocolEnabled()) {
log.warn("The streams rebalance protocol is still in development and should not be used in production. "
Expand Down Expand Up @@ -2146,6 +2194,20 @@ protected boolean isStreamsProtocolEnabled() {
return getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name());
}

/**
* @return the DslStoreSuppliers if the value was explicitly configured (either by
* {@link StreamsConfig#DEFAULT_DSL_STORE} or {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG})
*/
public Optional<DslStoreSuppliers> resolveDslStoreSuppliers() {
if (this.originals().containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) {
return Optional.of(Utils.newInstance(dslStoreSuppliers, DslStoreSuppliers.class));
} else if (this.originals().containsKey(DEFAULT_DSL_STORE_CONFIG)) {
return Optional.of(MaterializedInternal.parse(storeType));
} else {
return Optional.empty();
}
}

/**
* Override any client properties in the original configs with overrides
*
Expand Down
12 changes: 12 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.KStream;
Expand All @@ -38,7 +39,9 @@
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

Expand All @@ -65,10 +68,19 @@ public Topology() {
this(new InternalTopologyBuilder());
}

@Deprecated
public Topology(final TopologyConfig topologyConfigs) {
this(new InternalTopologyBuilder(topologyConfigs));
}

public Topology(final Map<String, Object> configs) {
this(Utils.mkObjectProperties(configs));
}

public Topology(final Properties properties) {
this(new InternalTopologyBuilder(new StreamsConfig(properties)));
}

protected Topology(final InternalTopologyBuilder internalTopologyBuilder) {
this.internalTopologyBuilder = internalTopologyBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
* If they are only set in the configs passed in to the KafkaStreams constructor, it will be too late for them
* to be applied and the config will be ignored.
*/
@SuppressWarnings("deprecation")
@Deprecated
public final class TopologyConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
static {
Expand Down Expand Up @@ -294,6 +294,7 @@ public Materialized.StoreType parseStoreType() {
* @return the DslStoreSuppliers if the value was explicitly configured (either by
* {@link StreamsConfig#DEFAULT_DSL_STORE} or {@link StreamsConfig#DSL_STORE_SUPPLIERS_CLASS_CONFIG})
*/
@Deprecated
public Optional<DslStoreSuppliers> resolveDslStoreSuppliers() {
if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides) || globalAppConfigs.originals().containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) {
return Optional.of(Utils.newInstance(dslStoreSuppliers, DslStoreSuppliers.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,22 +297,30 @@ private void maybeAddNodeForVersionedSemanticsMetadata(final GraphNode node) {
}
}

// use this method for testing only
public void buildAndOptimizeTopology() {
buildAndOptimizeTopology(null);
buildAndOptimizeTopologyInternal(null);
}

@Deprecated
public void buildAndOptimizeTopology(final Properties props) {
buildAndOptimizeTopologyInternal(props);
}

private void buildAndOptimizeTopologyInternal(final Properties props) {
mergeDuplicateSourceNodes();
optimizeTopology(props);
if (props != null) {
optimizeTopology(props);
} else {
optimizeTopology();
}
enableVersionedSemantics();

final PriorityQueue<GraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority));

final PriorityQueue<GraphNode> graphNodePriorityQueue =
new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority));
graphNodePriorityQueue.offer(root);

while (!graphNodePriorityQueue.isEmpty()) {
final GraphNode streamGraphNode = graphNodePriorityQueue.remove();
final GraphNode streamGraphNode = graphNodePriorityQueue.poll();

if (LOG.isDebugEnabled()) {
LOG.debug("Adding nodes to topology {} child nodes {}", streamGraphNode, streamGraphNode.children());
Expand All @@ -323,14 +331,13 @@ public void buildAndOptimizeTopology(final Properties props) {
streamGraphNode.setHasWrittenToTopology(true);
}

for (final GraphNode graphNode : streamGraphNode.children()) {
graphNodePriorityQueue.offer(graphNode);
for (final GraphNode child : streamGraphNode.children()) {
graphNodePriorityQueue.offer(child);
}
}
internalTopologyBuilder.validateCopartition();

internalTopologyBuilder.validateCopartition();
internalTopologyBuilder.checkUnprovidedNames();

}

/**
Expand All @@ -339,12 +346,36 @@ public void buildAndOptimizeTopology(final Properties props) {
*/
private void optimizeTopology(final Properties props) {
final Set<String> optimizationConfigs;

if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
optimizationConfigs = Collections.emptySet();
} else {
optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs(
(String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
(String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)
);
}

applyOptimizations(optimizationConfigs);
}

/**
* This method is called after the topology has been built, and it applies the optimizations
* based on the configuration set in the topology configs.
*/
private void optimizeTopology() {
final StreamsConfig topicSpecificConfigs = internalTopologyBuilder.topologySpecificConfigs();
final String configValue = topicSpecificConfigs != null
? topicSpecificConfigs.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)
: null;

final Set<String> optimizationConfigs = configValue != null
? StreamsConfig.verifyTopologyOptimizationConfigs(configValue)
: Collections.emptySet();

applyOptimizations(optimizationConfigs);
}

private void applyOptimizations(final Set<String> optimizationConfigs) {
if (optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes");
reuseKTableSourceTopics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public MaterializedInternal(final Materialized<K, V, S> materialized,
this(materialized, nameProvider, generatedStorePrefix, false);
}

@SuppressWarnings("deprecation")
public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix,
Expand All @@ -68,9 +69,14 @@ public MaterializedInternal(final Materialized<K, V, S> materialized,
// is configured with the main StreamsConfig
if (dslStoreSuppliers == null) {
if (nameProvider instanceof InternalStreamsBuilder) {
final TopologyConfig topologyConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologyConfigs();
if (topologyConfig != null) {
dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null);
final StreamsConfig streamsConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologySpecificConfigs();
if (streamsConfig != null) {
dslStoreSuppliers = streamsConfig.resolveDslStoreSuppliers().orElse(null);
} else {
final TopologyConfig topologyConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologyConfigs();
if (topologyConfig != null) {
dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.DslStoreSuppliers;
Expand All @@ -32,6 +33,7 @@ public class StreamJoinedInternal<K, V1, V2> extends StreamJoined<K, V1, V2> {
// store in the desired order (see comments in OuterStreamJoinFactory)
private final DslStoreSuppliers passedInDslStoreSuppliers;

@SuppressWarnings("deprecation")
//Needs to be public for testing
public StreamJoinedInternal(
final StreamJoined<K, V1, V2> streamJoined,
Expand All @@ -40,9 +42,14 @@ public StreamJoinedInternal(
super(streamJoined);
passedInDslStoreSuppliers = dslStoreSuppliers;
if (dslStoreSuppliers == null) {
final TopologyConfig topologyConfig = builder.internalTopologyBuilder().topologyConfigs();
if (topologyConfig != null) {
dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null);
final StreamsConfig streamsConfig = builder.internalTopologyBuilder.topologySpecificConfigs();
if (streamsConfig != null) {
dslStoreSuppliers = streamsConfig.resolveDslStoreSuppliers().orElse(null);
} else {
final TopologyConfig topologyConfig = builder.internalTopologyBuilder().topologyConfigs();
if (topologyConfig != null) {
dslStoreSuppliers = topologyConfig.resolveDslStoreSuppliers().orElse(null);
}
}
}
}
Expand Down
Loading