diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index 77224e2394759..d7d5c4a0246b1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -33,29 +33,23 @@ import java.io.IOException; import java.time.Duration; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; public class StreamsBrokerDownResilienceTest { - private static final int KEY = 0; - private static final int VALUE = 1; - private static final String SOURCE_TOPIC_1 = "streamsResilienceSource"; private static final String SINK_TOPIC = "streamsResilienceSink"; public static void main(final String[] args) throws IOException { - if (args.length < 2) { - System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter"); + if (args.length < 1) { + System.err.println("StreamsBrokerDownResilienceTest is expecting one parameter: propFile; but only see " + args.length + " parameter"); Exit.exit(1); } System.out.println("StreamsTest instance started"); final String propFileName = args[0]; - final String additionalConfigs = args[1]; final Properties streamsProperties = Utils.loadProps(propFileName); final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); @@ -70,15 +64,6 @@ public static void main(final String[] args) throws IOException { streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); - - // it is expected that max.poll.interval, retries, request.timeout and max.block.ms set - // streams_broker_down_resilience_test and passed as 
args - if (additionalConfigs != null && !additionalConfigs.equalsIgnoreCase("none")) { - final Map updated = updatedConfigs(additionalConfigs); - System.out.println("Updating configs with " + updated); - streamsProperties.putAll(updated); - } - if (!confirmCorrectConfigs(streamsProperties)) { System.err.printf("ERROR: Did not have all required configs expected to contain %s %s %s %s%n", StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), @@ -130,21 +115,4 @@ private static boolean confirmCorrectConfigs(final Properties properties) { properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)); } - /** - * Takes a string with keys and values separated by '=' and each key value pair - * separated by ',' for example max.block.ms=5000,retries=6,request.timeout.ms=6000 - * - * @param formattedConfigs the formatted config string - * @return HashMap with keys and values inserted - */ - private static Map updatedConfigs(final String formattedConfigs) { - final String[] parts = formattedConfigs.split(","); - final Map updatedConfigs = new HashMap<>(); - for (final String part : parts) { - final String[] keyValue = part.split("="); - updatedConfigs.put(keyValue[KEY], keyValue[VALUE]); - } - return updatedConfigs; - } - } diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 4b49017fc42db..754bfff66ebfe 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -440,11 +440,33 @@ def prop_file(self): class StreamsBrokerDownResilienceService(StreamsTestBaseService): - def __init__(self, test_context, kafka, configs): + def __init__(self, test_context, kafka, group_protocol="classic", extra_configs=None): super(StreamsBrokerDownResilienceService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest", - configs) + "") + self.GROUP_PROTOCOL = group_protocol + self.EXTRA_CONFIGS = extra_configs or {} + + def 
prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      streams_property.GROUP_PROTOCOL: self.GROUP_PROTOCOL,
+                      # Required configs for broker down resilience
+                      # Consumer max.poll.interval > min(max.block.ms, (retries + 1) * request.timeout)
+                      "consumer.max.poll.interval.ms": 50000,
+                      "producer.retries": 2,
+                      "producer.request.timeout.ms": 15000,
+                      "producer.max.block.ms": 30000,
+                      "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
+                      "session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
+                      }
+
+        # Merge any extra configs
+        properties.update(self.EXTRA_CONFIGS)
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()

     def start_cmd(self, node):
         args = self.args.copy()
@@ -458,8 +480,7 @@ def start_cmd(self, node):

         cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-              " %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
-              " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+              " %(config_file)s & echo $! 
>&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args self.logger.info("Executing: " + cmd) diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py index 94df6e3747338..7a2a9adf998d0 100644 --- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py @@ -54,8 +54,7 @@ def test_streams_resilient_to_broker_down(self, metadata_quorum, group_protocol) # So with (2 * 15000) = 30 seconds, we'll set downtime to 70 seconds broker_down_time_in_seconds = 70 - processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs( - group_protocol=group_protocol)) + processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol) processor.start() self.assert_produce_consume(self.inputTopic, @@ -91,16 +90,16 @@ def test_streams_runs_with_broker_down_initially(self, metadata_quorum, group_pr node = self.kafka.leader(self.inputTopic) self.kafka.stop_node(node) - configs = self.get_configs(group_protocol=group_protocol, extra_configs=",application.id=starting_wo_broker_id") + extra_configs = {"application.id": "starting_wo_broker_id"} # start streams with broker down initially - processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) processor.start() - processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) processor_2.start() - processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_3 = StreamsBrokerDownResilienceService(self.test_context, 
self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) processor_3.start() broker_unavailable_message = "Node may not be available" @@ -159,24 +158,19 @@ def test_streams_runs_with_broker_down_initially(self, metadata_quorum, group_pr group_protocol=["classic", "streams"]) def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, group_protocol): self.kafka.start() - extra_configs = ",application.id=shutdown_with_broker_down" + extra_configs = {"application.id": "shutdown_with_broker_down"} # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor if group_protocol == "classic": - extra_configs += ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" + extra_configs["internal.task.assignor.class"] = "org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" - configs = self.get_configs( - group_protocol=group_protocol, - extra_configs=extra_configs - ) - - processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) processor.start() - processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) processor_2.start() - processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) # need to wait for rebalance once rebalance = "State transition from REBALANCING to RUNNING" @@ -245,24 +239,19 @@ def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, group group_protocol=["classic", "streams"]) def 
test_streams_should_failover_while_brokers_down(self, metadata_quorum, group_protocol): self.kafka.start() - extra_configs = ",application.id=shutdown_with_broker_down" + extra_configs = {"application.id": "shutdown_with_broker_down"} # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor if group_protocol == "classic": - extra_configs += ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" - - configs = self.get_configs( - group_protocol=group_protocol, - extra_configs=extra_configs - ) + extra_configs["internal.task.assignor.class"] = "org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" - processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) processor.start() - processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) processor_2.start() - processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs) # need to wait for rebalance once rebalance = "State transition from REBALANCING to RUNNING"