@@ -33,29 +33,23 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StreamsBrokerDownResilienceTest {

private static final int KEY = 0;
private static final int VALUE = 1;

private static final String SOURCE_TOPIC_1 = "streamsResilienceSource";

private static final String SINK_TOPIC = "streamsResilienceSink";

public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
if (args.length < 1) {
System.err.println("StreamsBrokerDownResilienceTest is expecting one parameter: propFile; but only see " + args.length + " parameter");
Exit.exit(1);
}

System.out.println("StreamsTest instance started");

final String propFileName = args[0];
final String additionalConfigs = args[1];

final Properties streamsProperties = Utils.loadProps(propFileName);
final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
@@ -70,15 +64,6 @@ public static void main(final String[] args) throws IOException {
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);


// it is expected that max.poll.interval, retries, request.timeout and max.block.ms are set
// by streams_broker_down_resilience_test and passed as args
if (additionalConfigs != null && !additionalConfigs.equalsIgnoreCase("none")) {
final Map<String, String> updated = updatedConfigs(additionalConfigs);
System.out.println("Updating configs with " + updated);
streamsProperties.putAll(updated);
}

if (!confirmCorrectConfigs(streamsProperties)) {
System.err.printf("ERROR: Did not have all required configs expected to contain %s %s %s %s%n",
StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
@@ -130,21 +115,4 @@ private static boolean confirmCorrectConfigs(final Properties properties) {
properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG));
}

/**
* Takes a string with keys and values separated by '=' and each key-value pair
* separated by ',' for example max.block.ms=5000,retries=6,request.timeout.ms=6000
*
* @param formattedConfigs the formatted config string
* @return HashMap with keys and values inserted
*/
private static Map<String, String> updatedConfigs(final String formattedConfigs) {
final String[] parts = formattedConfigs.split(",");
final Map<String, String> updatedConfigs = new HashMap<>();
for (final String part : parts) {
final String[] keyValue = part.split("=");
updatedConfigs.put(keyValue[KEY], keyValue[VALUE]);
}
return updatedConfigs;
}

}
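
Since the additionalConfigs argument and the updatedConfigs() helper are gone, the four required configs must already be present in the properties file itself; confirmCorrectConfigs() only checks that the prefixed keys exist. A minimal sketch of that check in Python (the function and key list below are illustrative, not part of the test code):

REQUIRED_KEYS = [
    "consumer.max.poll.interval.ms",  # StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
    "producer.retries",               # StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)
    "producer.request.timeout.ms",    # StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)
    "producer.max.block.ms",          # StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG)
]

def confirm_correct_configs(props):
    # Mirrors confirmCorrectConfigs(): only the presence of the keys is
    # verified; the values themselves are not validated here.
    return all(key in props for key in REQUIRED_KEYS)
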
29 changes: 25 additions & 4 deletions tests/kafkatest/services/streams.py
@@ -440,11 +440,33 @@ def prop_file(self):


class StreamsBrokerDownResilienceService(StreamsTestBaseService):
def __init__(self, test_context, kafka, configs):
def __init__(self, test_context, kafka, group_protocol="classic", extra_configs=None):
super(StreamsBrokerDownResilienceService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest",
configs)
"")
self.GROUP_PROTOCOL = group_protocol
self.EXTRA_CONFIGS = extra_configs or {}

def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.GROUP_PROTOCOL: self.GROUP_PROTOCOL,
# Required configs for broker-down resilience:
# consumer max.poll.interval.ms > min(max.block.ms, (retries + 1) * request.timeout)
"consumer.max.poll.interval.ms": 50000,
"producer.retries": 2,
"producer.request.timeout.ms": 15000,
"producer.max.block.ms": 30000,
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
}

# Merge any extra configs
properties.update(self.EXTRA_CONFIGS)

cfg = KafkaConfig(**properties)
return cfg.render()

def start_cmd(self, node):
args = self.args.copy()
@@ -458,8 +480,7 @@ def start_cmd(self, node):

cmd = "( export KAFKA_LOG4J_OPTS=\"%(log4j_param)s%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
" %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
" %(config_file)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args

self.logger.info("Executing: " + cmd)

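To make the inequality in the prop_file comment concrete: with the hard-coded values above, the producer gives up after 30 seconds, well inside the consumer's 50-second poll interval, and the tests keep the broker down for 70 seconds, comfortably past that window. A quick standalone check (plain Python, not part of the service code):

max_poll_interval_ms = 50000   # consumer.max.poll.interval.ms
retries = 2                    # producer.retries
request_timeout_ms = 15000     # producer.request.timeout.ms
max_block_ms = 30000           # producer.max.block.ms

# The producer's send window closes after min(max.block.ms, (retries + 1) * request.timeout).
producer_window_ms = min(max_block_ms, (retries + 1) * request_timeout_ms)
assert producer_window_ms == 30000                  # min(30000, 45000)
assert max_poll_interval_ms > producer_window_ms    # 50000 > 30000, as required
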
@@ -54,8 +54,7 @@ def test_streams_resilient_to_broker_down(self, metadata_quorum, group_protocol)
# So with (2 * 15000) = 30 seconds, we'll set downtime to 70 seconds
broker_down_time_in_seconds = 70

processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs(
group_protocol=group_protocol))
processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol)
processor.start()

self.assert_produce_consume(self.inputTopic,
@@ -91,16 +90,16 @@ def test_streams_runs_with_broker_down_initially(self, metadata_quorum, group_protocol)
node = self.kafka.leader(self.inputTopic)
self.kafka.stop_node(node)

configs = self.get_configs(group_protocol=group_protocol, extra_configs=",application.id=starting_wo_broker_id")
extra_configs = {"application.id": "starting_wo_broker_id"}

# start streams with broker down initially
processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)
processor.start()

processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)
processor_2.start()

processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)
processor_3.start()

broker_unavailable_message = "Node may not be available"
@@ -159,24 +158,19 @@ def test_streams_runs_with_broker_down_initially(self, metadata_quorum, group_protocol)
group_protocol=["classic", "streams"])
def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, group_protocol):
self.kafka.start()
extra_configs = ",application.id=shutdown_with_broker_down"
extra_configs = {"application.id": "shutdown_with_broker_down"}

# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
if group_protocol == "classic":
extra_configs += ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
extra_configs["internal.task.assignor.class"] = "org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"

configs = self.get_configs(
group_protocol=group_protocol,
extra_configs=extra_configs
)

processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)
processor.start()

processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)
processor_2.start()

processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)

# need to wait for rebalance once
rebalance = "State transition from REBALANCING to RUNNING"
@@ -245,24 +239,19 @@ def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, group_protocol)
group_protocol=["classic", "streams"])
def test_streams_should_failover_while_brokers_down(self, metadata_quorum, group_protocol):
self.kafka.start()
extra_configs = ",application.id=shutdown_with_broker_down"
extra_configs = {"application.id": "shutdown_with_broker_down"}

# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
if group_protocol == "classic":
extra_configs += ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"

configs = self.get_configs(
group_protocol=group_protocol,
extra_configs=extra_configs
)
extra_configs["internal.task.assignor.class"] = "org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"

processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)
processor.start()

processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)
processor_2.start()

processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, group_protocol=group_protocol, extra_configs=extra_configs)

# need to wait for rebalance once
rebalance = "State transition from REBALANCING to RUNNING"
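A closing note on the extra_configs change: replacing the comma-separated override string with a dict means per-test overrides are merged into the service defaults via properties.update(self.EXTRA_CONFIGS), so a test-supplied key either adds a new property or overrides a default. A standalone illustration of the merge semantics (the producer.retries override is hypothetical, for illustration only):

defaults = {
    "consumer.max.poll.interval.ms": 50000,
    "producer.retries": 2,
}
extra_configs = {
    "application.id": "shutdown_with_broker_down",  # as in the scale-in/failover tests
    "producer.retries": 5,                          # hypothetical override
}
merged = dict(defaults)
merged.update(extra_configs)  # same pattern as properties.update(self.EXTRA_CONFIGS)
assert merged["application.id"] == "shutdown_with_broker_down"  # added by the test
assert merged["producer.retries"] == 5                          # extras win on collisions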