From 889657d36154cc5c019a1c86843cbfd8acb6b6b1 Mon Sep 17 00:00:00 2001 From: k-apol Date: Sun, 29 Mar 2026 08:29:46 -0500 Subject: [PATCH] KAFKA-10357: Add explicit internal topic initialization for Kafka Streams (KIP-698) Implements KIP-698 to prevent silent data loss when internal topics (repartition/changelog) are accidentally deleted. New public API: - KafkaStreams#init() and init(InitParameters) to explicitly create and validate internal topics before starting the application - InitParameters with fluent builder (following CloseOptions pattern): timeout(), withTimeout(), enableSetupInternalTopicsIfIncomplete() - StreamsConfig 'internal.topics.setup' config: 'automatic' (default, backward compatible) or 'manual' (requires init(), throws on missing topics during rebalance) New exceptions (already merged via #19988): - MissingInternalTopicsException - MisconfiguredInternalTopicException - InternalTopicsAlreadySetupException Behavioral changes: - In manual mode, makeReady() throws MissingInternalTopicsException when topics need creating during rebalance (instead of silently recreating them) - init() validates state (CREATED only), source topics, internal topic existence and configuration - INTERNAL_TOPIC_SETUP_CONFIG forwarded to consumer configs for the partition assignor Reviewers: Matthias J. 
Sax --- .../integration/InitIntegrationTest.java | 183 ++++++++++++++++ .../apache/kafka/streams/KafkaStreams.java | 134 ++++++++++++ .../apache/kafka/streams/StreamsConfig.java | 15 ++ .../internals/InternalTopicManager.java | 23 ++ .../kafka/streams/KafkaStreamsTest.java | 198 ++++++++++++++++++ .../kafka/streams/StreamsConfigTest.java | 19 ++ .../internals/InternalTopicManagerTest.java | 47 +++++ 7 files changed, 619 insertions(+) create mode 100644 streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InitIntegrationTest.java diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InitIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InitIntegrationTest.java new file mode 100644 index 0000000000000..c8f5e13563162 --- /dev/null +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InitIntegrationTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("integration") +@Timeout(600) +public class InitIntegrationTest { + + private static final String INPUT_TOPIC = "input-topic"; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @BeforeAll + public static void startCluster() throws IOException, InterruptedException { + CLUSTER.start(); + CLUSTER.createTopics(INPUT_TOPIC); + } + + @AfterAll + public static void closeCluster() { + CLUSTER.stop(); + } + + private Properties streamsConfig; + + @BeforeEach + public void before(final TestInfo testInfo) { + streamsConfig = new Properties(); + 
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "init-test-" + safeUniqueTestName(testInfo)); + streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } + + @AfterEach + public void after() throws IOException { + IntegrationTestUtils.purgeLocalStreamsState(streamsConfig); + } + + @Test + public void shouldInitAndStartInManualMode() throws Exception { + streamsConfig.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream(INPUT_TOPIC).groupByKey().count(); + + try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig)) { + streams.init(); + + // Verify internal topics were created on the broker + try (final Admin admin = Admin.create(adminConfig())) { + final Set topics = admin.listTopics().names().get(); + final String appId = streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); + assertTrue(topics.stream().anyMatch(t -> t.contains(appId) && t.contains("changelog")), + "Expected changelog topic to exist after init(), found: " + topics); + } + + // App should reach RUNNING after init + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + } + } + + @Test + public void shouldThrowWhenInternalTopicDeletedBetweenRestartsInManualMode() throws Exception { + streamsConfig.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream(INPUT_TOPIC).groupByKey().count(); + + // First run: init + start + stop 
normally + try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig)) { + streams.init(); + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + } + + // Delete a changelog topic while the app is stopped + final String appId = streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); + try (final Admin admin = Admin.create(adminConfig())) { + final Set topics = admin.listTopics().names().get(); + final String changelogTopic = topics.stream() + .filter(t -> t.contains(appId) && t.contains("changelog")) + .findFirst() + .orElseThrow(() -> new AssertionError("No changelog topic found in: " + topics)); + + admin.deleteTopics(Set.of(changelogTopic)).all().get(); + } + + // Second run: start without init() — rebalance calls makeReady() for changelog + // topics, which tries to recreate the missing topic. Guard throws in manual mode. + final AtomicReference caughtException = new AtomicReference<>(); + + try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig)) { + streams.setUncaughtExceptionHandler(exception -> { + caughtException.set(exception); + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); + + streams.start(); + + TestUtils.waitForCondition( + () -> streams.state() == State.ERROR || streams.state() == State.NOT_RUNNING, + 60000, + () -> "Streams did not reach ERROR state. 
State: " + streams.state() + + ", exception: " + caughtException.get() + ); + + assertTrue(caughtException.get() instanceof MissingInternalTopicsException, + "Expected MissingInternalTopicsException but got: " + caughtException.get()); + } + } + + @Test + public void shouldAutoCreateInternalTopicsWithoutInit() throws Exception { + // Default config is automatic mode — no init() call needed + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream(INPUT_TOPIC).groupByKey().count(); + + try (final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig)) { + IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); + + // Verify internal topics were auto-created during rebalance + try (final Admin admin = Admin.create(adminConfig())) { + final Set topics = admin.listTopics().names().get(); + final String appId = streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); + assertTrue(topics.stream().anyMatch(t -> t.contains(appId) && t.contains("changelog")), + "Expected changelog topic to be auto-created, found: " + topics); + } + } + } + + private Properties adminConfig() { + final Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + return props; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 7adb5ac73c4c2..eaa2035e70c40 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -40,8 +40,12 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; +import 
org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; +import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; @@ -60,6 +64,10 @@ import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; +import org.apache.kafka.streams.processor.internals.InternalTopicConfig; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; +import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; @@ -297,6 +305,132 @@ public boolean isValidTransition(final State newState) { private final Object stateLock = new Object(); protected volatile State state = State.CREATED; + /** + * Initializes broker-side state for this Kafka Streams application. + *
<p>
+ * Kafka Streams creates internal topics on the broker for fault-tolerance and repartitioning. + * This method validates and optionally creates those internal topics before starting the application. + * + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing + * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + */ + public void init() { + this.doInit(new InitParameters()); + } + + /** + * Initializes broker-side state for this Kafka Streams application. + *
<p>
+ * Kafka Streams creates internal topics on the broker for fault-tolerance and repartitioning. + * This method validates and optionally creates those internal topics before starting the application. + * + * @param initParameters parameters controlling initialization behavior + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing + * and the parameters do not specify to set them up + * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws TimeoutException if initialization exceeds the configured timeout + */ + public void init(final InitParameters initParameters) { + this.doInit(initParameters); + } + + private void doInit(final InitParameters initParameters) { + synchronized (stateLock) { + if (state != State.CREATED) { + throw new IllegalStateException("Can only call init() in CREATED state. 
Current state is: " + state); + } + } + + final InternalTopicManager internalTopicManager = new InternalTopicManager(time, adminClient, applicationConfigs); + initParameters.timeout.ifPresent(internalTopicManager::setInitTimeout); + + final Map allInternalTopics = new HashMap<>(); + final Set allSourceTopics = new HashSet<>(); + + for (final Map subtopologyMap : topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) { + for (final InternalTopologyBuilder.TopicsInfo topicsInfo : subtopologyMap.values()) { + allInternalTopics.putAll(topicsInfo.stateChangelogTopics); + allInternalTopics.putAll(topicsInfo.repartitionSourceTopics); + allSourceTopics.addAll(topicsInfo.sourceTopics); + } + } + + final ValidationResult validationResult = internalTopicManager.validate(allInternalTopics); + + final boolean noInternalTopicsExist = allInternalTopics.keySet().equals(validationResult.missingTopics()); + final boolean internalTopicsMisconfigured = !validationResult.misconfigurationsForTopics().isEmpty(); + final boolean allInternalTopicsExist = validationResult.missingTopics().isEmpty(); + final boolean missingSourceTopics = !Collections.disjoint(validationResult.missingTopics(), allSourceTopics); + + if (internalTopicsMisconfigured) { + throw new MisconfiguredInternalTopicException("Misconfigured Internal Topics: " + validationResult.misconfigurationsForTopics()); + } + + if (missingSourceTopics) { + allSourceTopics.retainAll(validationResult.missingTopics()); + throw new MissingSourceTopicException("Missing source topics: " + allSourceTopics); + } + + if (noInternalTopicsExist) { + internalTopicManager.setup(allInternalTopics); + return; + } + + if (allInternalTopicsExist) { + throw new InternalTopicsAlreadySetupException("All internal topics have already been setup"); + } else { + if (initParameters.setupInternalTopicsIfIncompleteEnabled()) { + final Map topicsToCreate = new HashMap<>(); + for (final String missingTopic : validationResult.missingTopics()) { + 
topicsToCreate.put(missingTopic, allInternalTopics.get(missingTopic)); + } + internalTopicManager.makeReady(topicsToCreate, true); + } else { + throw new MissingInternalTopicsException("Missing Internal Topics: ", new ArrayList<>(validationResult.missingTopics())); + } + } + } + + public static class InitParameters { + private boolean setupInternalTopicsIfIncomplete; + protected Optional timeout = Optional.empty(); + + private InitParameters() { + this.setupInternalTopicsIfIncomplete = false; + } + + public static InitParameters initParameters() { + return new InitParameters(); + } + + public static InitParameters timeout(final Duration timeout) { + return new InitParameters().withTimeout(timeout); + } + + public InitParameters withTimeout(final Duration timeout) { + this.timeout = Optional.ofNullable(timeout); + return this; + } + + public InitParameters enableSetupInternalTopicsIfIncomplete() { + this.setupInternalTopicsIfIncomplete = true; + return this; + } + + public InitParameters disableSetupInternalTopicsIfIncomplete() { + this.setupInternalTopicsIfIncomplete = false; + return this; + } + + public boolean setupInternalTopicsIfIncompleteEnabled() { + return setupInternalTopicsIfIncomplete; + } + } + private boolean waitOnStates(final long waitMs, final State... targetStates) { final Set targetStateSet = Set.of(targetStates); final long begin = time.milliseconds(); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d28818e47e34c..0509001f62301 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -878,6 +878,14 @@ public class StreamsConfig extends AbstractConfig { "Whether to use the configured " + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " during global store/KTable processing. " + "Disabled by default. 
This config will be removed in Kafka Streams 5.0, where global exception handling will be enabled by default"; + public static final String INTERNAL_TOPIC_SETUP_CONFIG = "internal.topics.setup"; + public static final String INTERNAL_TOPIC_SETUP_AUTOMATIC = "automatic"; + public static final String INTERNAL_TOPIC_SETUP_MANUAL = "manual"; + private static final String INTERNAL_TOPIC_SETUP_DOC = + "Configures how internal topics (e.g., repartition or changelog topics) should be created. " + + "Set to 'automatic' to allow internal topics to be created during a rebalance (default). " + + "Set to 'manual' to disable automatic creation. Users must call KafkaStreams#init() instead."; + static { CONFIG = new ConfigDef() @@ -1017,6 +1025,12 @@ public class StreamsConfig extends AbstractConfig { LogAndFailProcessingExceptionHandler.class.getName(), Importance.MEDIUM, PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) + .define(INTERNAL_TOPIC_SETUP_CONFIG, + ConfigDef.Type.STRING, + INTERNAL_TOPIC_SETUP_AUTOMATIC, + ConfigDef.ValidString.in(INTERNAL_TOPIC_SETUP_AUTOMATIC, INTERNAL_TOPIC_SETUP_MANUAL), + Importance.MEDIUM, + INTERNAL_TOPIC_SETUP_DOC) .define(PROCESSING_GUARANTEE_CONFIG, Type.STRING, AT_LEAST_ONCE, @@ -1887,6 +1901,7 @@ public Map getMainConsumerConfigs(final String groupId, final St consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)); consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG)); + consumerProps.put(INTERNAL_TOPIC_SETUP_CONFIG, getString(INTERNAL_TOPIC_SETUP_CONFIG)); // verify that producer batch config is no larger than segment size, then add topic configs required for creating topics final Map topicProps = originalsWithPrefix(TOPIC_PREFIX, false); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 554a646b48461..60976af65a82c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -42,11 +42,13 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.ClientUtils.QuietConsumerConfig; import org.slf4j.Logger; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -74,11 +76,13 @@ public class InternalTopicManager { private final Time time; private final Admin adminClient; + private final boolean isManualInternalTopicConfig; private final short replicationFactor; private final long windowChangeLogAdditionalRetention; private final long retryBackOffMs; private final long retryTimeoutMs; + private Duration initTimeout; private final Map defaultTopicConfigs = new HashMap<>(); @@ -87,6 +91,8 @@ public InternalTopicManager(final Time time, final StreamsConfig streamsConfig) { this.time = time; this.adminClient = adminClient; + this.isManualInternalTopicConfig = StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL + .equals(streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)); final LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())); log = logContext.logger(getClass()); @@ -464,12 +470,17 @@ public Map> getTopicPartitionInfo(final Set makeReady(final Map topics) { + return makeReady(topics, false); + } + + public Set makeReady(final Map topics, final boolean isInitializing) { // we will do the validation / topic-creation in a loop, until we 
have confirmed all topics // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); final long currentWallClockMs = time.milliseconds(); final long deadlineMs = currentWallClockMs + retryTimeoutMs; + final long initDeadlineMs = initTimeout != null ? currentWallClockMs + initTimeout.toMillis() : Long.MAX_VALUE; final Set topicsNotReady = new HashSet<>(topics.keySet()); final Set newlyCreatedTopics = new HashSet<>(); @@ -492,6 +503,7 @@ public Set makeReady(final Map topics) { break; } if (!topicsToCreate.isEmpty()) { + throwIfManualSetupEnabledAndCalledWithoutInit(topicsNotReady, isInitializing); final Set createdTopics = createTopics(topicsToCreate, topicsNotReady, deadlineMs); topicsNotReady.removeAll(createdTopics); newlyCreatedTopics.addAll(createdTopics); @@ -620,6 +632,13 @@ private Set createTopics(final Set topicsToCreate, } + private void throwIfManualSetupEnabledAndCalledWithoutInit(final Set topicsNotReady, final boolean isInitializing) { + if (isManualInternalTopicConfig && !isInitializing) { + throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" + + "You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady)); + } + } + /** * Try to get the partition information for the given topics; return the partition info for topics that already exists. * @@ -671,6 +690,10 @@ protected Map> getTopicPartitionInfo(final Set< return topicPartitionInfo; } + public void setInitTimeout(final Duration timeoutMs) { + initTimeout = timeoutMs; + } + /** * Try to get the number of partitions for the given topics; return the number of partitions for topics that already exists. 
* diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 857b53910f274..a48d46c2e90a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -35,7 +35,12 @@ import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KafkaStreams.InitParameters; import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; +import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; +import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; import org.apache.kafka.streams.errors.TopologyException; @@ -49,6 +54,8 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; +import org.apache.kafka.streams.processor.internals.InternalTopicConfig; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; @@ -83,6 +90,7 @@ import java.time.Duration; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -424,6 +432,196 @@ public void shouldInitializeTasksForLocalStateOnStart() { } } + @Test + public void 
initShouldThrowMisconfiguredExceptionWhenInternalTopicsAreMisconfigured() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final InitParameters initParams = InitParameters.initParameters(); + + try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + + when(result.misconfigurationsForTopics()).thenReturn(Map.of("topicA", List.of("bad config"))); + when(result.missingTopics()).thenReturn(Set.of("topicA")); + + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); + + final MisconfiguredInternalTopicException exception = assertThrows( + MisconfiguredInternalTopicException.class, + () -> streams.init(initParams) + ); + + assertTrue(exception.getMessage().contains("topicA")); + } + } + + @Test + public void initShouldThrowMissingSourceTopicExceptionWhenSourceTopicMissing() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final InitParameters initParams = InitParameters.initParameters(); + + try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(result.missingTopics()).thenReturn(Set.of("source-topic")); + + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); + + final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, + () -> streams.init(initParams) + ); + + assertTrue(exception.getMessage().contains("source-topic")); + } + } + + @Test + public void 
initShouldThrowInternalTopicsAlreadySetupExceptionIfAllExist() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source-topic").groupByKey().count(); + final Topology topology = builder.build(); + + final InitParameters initParams = InitParameters.initParameters(); + + try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + when(result.missingTopics()).thenReturn(Collections.emptySet()); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); + + final StreamsException exception = assertThrows(InternalTopicsAlreadySetupException.class, + () -> streams.init(initParams)); + + assertTrue(exception.getMessage().contains("All internal topics have already been setup")); + } + } + + @Test + public void initShouldThrowMissingInternalTopicsExceptionWhenDisabled() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source-topic").groupByKey().count(); + final Topology topology = builder.build(); + + final InitParameters initParams = InitParameters.initParameters(); + + try (MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + when(result.missingTopics()).thenReturn(Set.of("some-missing-topic")); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(topology, props, supplier, 
time); + + assertThrows(MissingInternalTopicsException.class, () -> streams.init(initParams)); + + final InternalTopicManager internalTopicManager = mocked.constructed().get(0); + verify(internalTopicManager, never()).setup(any()); + verify(internalTopicManager, never()).makeReady(any()); + } + } + + @Test + public void initShouldMakeReadyInternalTopicsWhenAutoSetupEnabled() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source-topic").groupByKey().count(); + final Topology topology = builder.build(); + + final InitParameters initParams = InitParameters.initParameters().enableSetupInternalTopicsIfIncomplete(); + + try (MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + + when(result.missingTopics()).thenReturn(Set.of("topicA")); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); + + streams.init(initParams); + + final InternalTopicManager internalTopicManager = mocked.constructed().get(0); + verify(internalTopicManager).makeReady(any(), eq(true)); + verify(internalTopicManager, never()).setup(any()); + } + } + + @Test + public void initShouldThrowIfNotInCreatedState() throws Exception { + prepareStreams(); + final AtomicReference state1 = prepareStreamThread(streamThreadOne, 1); + final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); + prepareThreadState(streamThreadOne, state1); + prepareThreadState(streamThreadTwo, state2); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + streams.start(); + waitForCondition( + () -> streams.state() == 
KafkaStreams.State.RUNNING, + "Streams never started."); + + assertThrows(IllegalStateException.class, () -> streams.init()); + } + } + + @Test + public void initShouldCreateAllTopicsWhenNoneExist() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source-topic").groupByKey().count(); + final Topology topology = builder.build(); + + try (MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(mock.validate(any())).thenAnswer(invocation -> { + final Map topics = invocation.getArgument(0); + when(result.missingTopics()).thenReturn(new HashSet<>(topics.keySet())); + return result; + }); + })) { + + final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); + + streams.init(); + + final InternalTopicManager internalTopicManager = mocked.constructed().get(0); + verify(internalTopicManager).setup(any()); + } + } + @Test public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 34db1ccc25302..0bbfe44bc6786 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -557,6 +557,25 @@ public void shouldOverrideStreamsDefaultProducerConfigs() { assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)); } + @Test + public void shouldParseInternalTopicSetupConfig() { + props.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); + final StreamsConfig config = new 
StreamsConfig(props); + assertEquals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL, config.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)); + } + + @Test + public void shouldThrowConfigExceptionForInvalidInternalTopicSetupValue() { + props.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, "invalid_value"); + assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + } + + @Test + public void shouldDefaultInternalTopicSetupToAutomatic() { + final StreamsConfig config = new StreamsConfig(props); + assertEquals(StreamsConfig.INTERNAL_TOPIC_SETUP_AUTOMATIC, config.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)); + } + @Test public void shouldNotThrowIfTransactionTimeoutSmallerThanCommitIntervalForAtLeastOnce() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 4cece3fd7c7c9..09c6a81ac10ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult; @@ -1860,6 +1861,52 @@ private InternalTopicConfig setupVersionedChangelogTopicConfig(final String topi return internalTopicConfig; } + @Test + public void makeReadyShouldThrowWhenManualModeAndNotInitializing() { + final Map manualConfig = new HashMap<>(config); + manualConfig.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, 
StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); + final InternalTopicManager manualTopicManager = new InternalTopicManager( + time, + mockAdminClient, + new StreamsConfig(manualConfig) + ); + + final InternalTopicConfig topicConfig = setupRepartitionTopicConfig(topic1, 1); + + assertThrows(MissingInternalTopicsException.class, + () -> manualTopicManager.makeReady(mkMap(mkEntry(topic1, topicConfig)))); + } + + @Test + public void makeReadyShouldCreateWhenManualModeAndInitializing() throws Exception { + final Map manualConfig = new HashMap<>(config); + manualConfig.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); + final InternalTopicManager manualTopicManager = new InternalTopicManager( + time, + mockAdminClient, + new StreamsConfig(manualConfig) + ); + + final InternalTopicConfig topicConfig = setupRepartitionTopicConfig(topic1, 1); + + // isInitializing=true should allow topic creation even in manual mode + manualTopicManager.makeReady(mkMap(mkEntry(topic1, topicConfig)), true); + + final Set createdTopics = mockAdminClient.listTopics().names().get(); + assertThat(createdTopics, hasItem(topic1)); + } + + @Test + public void makeReadyShouldCreateWhenAutomaticMode() throws Exception { + // Default config is automatic — makeReady should create topics without isInitializing + final InternalTopicConfig topicConfig = setupRepartitionTopicConfig(topic1, 1); + + internalTopicManager.makeReady(mkMap(mkEntry(topic1, topicConfig))); + + final Set createdTopics = mockAdminClient.listTopics().names().get(); + assertThat(createdTopics, hasItem(topic1)); + } + private InternalTopicConfig setupRepartitionTopicConfig(final String topicName, final int partitionCount) { final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topicName, Collections.emptyMap());