Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.MissingInternalTopicsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag("integration")
@Timeout(600)
public class InitIntegrationTest {

private static final String INPUT_TOPIC = "input-topic";

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopics(INPUT_TOPIC);
}

@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}

private Properties streamsConfig;

@BeforeEach
public void before(final TestInfo testInfo) {
streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "init-test-" + safeUniqueTestName(testInfo));
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

@AfterEach
public void after() throws IOException {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
}

@Test
public void shouldInitAndStartInManualMode() throws Exception {
streamsConfig.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL);

final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_TOPIC).groupByKey().count();

try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig)) {
streams.init();

// Verify internal topics were created on the broker
try (final Admin admin = Admin.create(adminConfig())) {
final Set<String> topics = admin.listTopics().names().get();
final String appId = streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
assertTrue(topics.stream().anyMatch(t -> t.contains(appId) && t.contains("changelog")),
"Expected changelog topic to exist after init(), found: " + topics);
}

// App should reach RUNNING after init
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
}
}

@Test
public void shouldThrowWhenInternalTopicDeletedBetweenRestartsInManualMode() throws Exception {
streamsConfig.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL);

final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_TOPIC).groupByKey().count();

// First run: init + start + stop normally
try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig)) {
streams.init();
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
}

// Delete a changelog topic while the app is stopped
final String appId = streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
try (final Admin admin = Admin.create(adminConfig())) {
final Set<String> topics = admin.listTopics().names().get();
final String changelogTopic = topics.stream()
.filter(t -> t.contains(appId) && t.contains("changelog"))
.findFirst()
.orElseThrow(() -> new AssertionError("No changelog topic found in: " + topics));

admin.deleteTopics(Set.of(changelogTopic)).all().get();
}

// Second run: start without init() — rebalance calls makeReady() for changelog
// topics, which tries to recreate the missing topic. Guard throws in manual mode.
final AtomicReference<Throwable> caughtException = new AtomicReference<>();

try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig)) {
streams.setUncaughtExceptionHandler(exception -> {
caughtException.set(exception);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});

streams.start();

TestUtils.waitForCondition(
() -> streams.state() == State.ERROR || streams.state() == State.NOT_RUNNING,
60000,
() -> "Streams did not reach ERROR state. State: " + streams.state()
+ ", exception: " + caughtException.get()
);

assertTrue(caughtException.get() instanceof MissingInternalTopicsException,
"Expected MissingInternalTopicsException but got: " + caughtException.get());
}
}

@Test
public void shouldAutoCreateInternalTopicsWithoutInit() throws Exception {
// Default config is automatic mode — no init() call needed
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_TOPIC).groupByKey().count();

try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

// Verify internal topics were auto-created during rebalance
try (final Admin admin = Admin.create(adminConfig())) {
final Set<String> topics = admin.listTopics().names().get();
final String appId = streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
assertTrue(topics.stream().anyMatch(t -> t.contains(appId) && t.contains("changelog")),
"Expected changelog topic to be auto-created, found: " + topics);
}
}
}

private Properties adminConfig() {
final Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
return props;
}
}
134 changes: 134 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException;
import org.apache.kafka.streams.errors.MissingInternalTopicsException;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
Expand All @@ -60,6 +64,10 @@
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
Expand Down Expand Up @@ -297,6 +305,132 @@ public boolean isValidTransition(final State newState) {
private final Object stateLock = new Object();
protected volatile State state = State.CREATED;

/**
* Initializes broker-side state for this Kafka Streams application.
* <p>
* Kafka Streams creates internal topics on the broker for fault-tolerance and repartitioning.
* This method validates and optionally creates those internal topics before starting the application.
*
* @throws MissingSourceTopicException if a source topic is missing
* @throws MissingInternalTopicsException if some but not all of the internal topics are missing
* @throws MisconfiguredInternalTopicException if an internal topic is misconfigured
* @throws InternalTopicsAlreadySetupException if all internal topics are already setup
*/
public void init() {
this.doInit(new InitParameters());
}

/**
* Initializes broker-side state for this Kafka Streams application.
* <p>
* Kafka Streams creates internal topics on the broker for fault-tolerance and repartitioning.
* This method validates and optionally creates those internal topics before starting the application.
*
* @param initParameters parameters controlling initialization behavior
* @throws MissingSourceTopicException if a source topic is missing
* @throws MissingInternalTopicsException if some but not all of the internal topics are missing
* and the parameters do not specify to set them up
* @throws MisconfiguredInternalTopicException if an internal topic is misconfigured
* @throws InternalTopicsAlreadySetupException if all internal topics are already setup
* @throws TimeoutException if initialization exceeds the configured timeout
*/
public void init(final InitParameters initParameters) {
this.doInit(initParameters);
}

private void doInit(final InitParameters initParameters) {
synchronized (stateLock) {
if (state != State.CREATED) {
throw new IllegalStateException("Can only call init() in CREATED state. Current state is: " + state);
}
}

final InternalTopicManager internalTopicManager = new InternalTopicManager(time, adminClient, applicationConfigs);
initParameters.timeout.ifPresent(internalTopicManager::setInitTimeout);

final Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>();
final Set<String> allSourceTopics = new HashSet<>();

for (final Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyMap : topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : subtopologyMap.values()) {
allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
allSourceTopics.addAll(topicsInfo.sourceTopics);
}
}

final ValidationResult validationResult = internalTopicManager.validate(allInternalTopics);

final boolean noInternalTopicsExist = allInternalTopics.keySet().equals(validationResult.missingTopics());
final boolean internalTopicsMisconfigured = !validationResult.misconfigurationsForTopics().isEmpty();
final boolean allInternalTopicsExist = validationResult.missingTopics().isEmpty();
final boolean missingSourceTopics = !Collections.disjoint(validationResult.missingTopics(), allSourceTopics);

if (internalTopicsMisconfigured) {
throw new MisconfiguredInternalTopicException("Misconfigured Internal Topics: " + validationResult.misconfigurationsForTopics());
}

if (missingSourceTopics) {
allSourceTopics.retainAll(validationResult.missingTopics());
throw new MissingSourceTopicException("Missing source topics: " + allSourceTopics);
}

if (noInternalTopicsExist) {
internalTopicManager.setup(allInternalTopics);
return;
}

if (allInternalTopicsExist) {
throw new InternalTopicsAlreadySetupException("All internal topics have already been setup");
} else {
if (initParameters.setupInternalTopicsIfIncompleteEnabled()) {
final Map<String, InternalTopicConfig> topicsToCreate = new HashMap<>();
for (final String missingTopic : validationResult.missingTopics()) {
topicsToCreate.put(missingTopic, allInternalTopics.get(missingTopic));
}
internalTopicManager.makeReady(topicsToCreate, true);
} else {
throw new MissingInternalTopicsException("Missing Internal Topics: ", new ArrayList<>(validationResult.missingTopics()));
}
}
}

public static class InitParameters {
private boolean setupInternalTopicsIfIncomplete;
protected Optional<Duration> timeout = Optional.empty();

private InitParameters() {
this.setupInternalTopicsIfIncomplete = false;
}

public static InitParameters initParameters() {
return new InitParameters();
}

public static InitParameters timeout(final Duration timeout) {
return new InitParameters().withTimeout(timeout);
}

public InitParameters withTimeout(final Duration timeout) {
this.timeout = Optional.ofNullable(timeout);
return this;
}

public InitParameters enableSetupInternalTopicsIfIncomplete() {
this.setupInternalTopicsIfIncomplete = true;
return this;
}

public InitParameters disableSetupInternalTopicsIfIncomplete() {
this.setupInternalTopicsIfIncomplete = false;
return this;
}

public boolean setupInternalTopicsIfIncompleteEnabled() {
return setupInternalTopicsIfIncomplete;
}
}

private boolean waitOnStates(final long waitMs, final State... targetStates) {
final Set<State> targetStateSet = Set.of(targetStates);
final long begin = time.milliseconds();
Expand Down
15 changes: 15 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,14 @@ public class StreamsConfig extends AbstractConfig {
"Whether to use the configured <code>" + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + "</code> during global store/KTable processing. " +
"Disabled by default. This config will be removed in Kafka Streams 5.0, where global exception handling will be enabled by default";

public static final String INTERNAL_TOPIC_SETUP_CONFIG = "internal.topics.setup";
public static final String INTERNAL_TOPIC_SETUP_AUTOMATIC = "automatic";
public static final String INTERNAL_TOPIC_SETUP_MANUAL = "manual";
private static final String INTERNAL_TOPIC_SETUP_DOC =
"Configures how internal topics (e.g., repartition or changelog topics) should be created. " +
"Set to 'automatic' to allow internal topics to be created during a rebalance (default). " +
"Set to 'manual' to disable automatic creation. Users must call KafkaStreams#init() instead.";

static {
CONFIG = new ConfigDef()

Expand Down Expand Up @@ -1017,6 +1025,12 @@ public class StreamsConfig extends AbstractConfig {
LogAndFailProcessingExceptionHandler.class.getName(),
Importance.MEDIUM,
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
.define(INTERNAL_TOPIC_SETUP_CONFIG,
ConfigDef.Type.STRING,
INTERNAL_TOPIC_SETUP_AUTOMATIC,
ConfigDef.ValidString.in(INTERNAL_TOPIC_SETUP_AUTOMATIC, INTERNAL_TOPIC_SETUP_MANUAL),
Importance.MEDIUM,
INTERNAL_TOPIC_SETUP_DOC)
.define(PROCESSING_GUARANTEE_CONFIG,
Type.STRING,
AT_LEAST_ONCE,
Expand Down Expand Up @@ -1887,6 +1901,7 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG));
consumerProps.put(INTERNAL_TOPIC_SETUP_CONFIG, getString(INTERNAL_TOPIC_SETUP_CONFIG));

// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
Expand Down
Loading