From 9c6017a39fc3fc4cd203ca01ee3a0468a4bb6f01 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 15:18:14 +0300 Subject: [PATCH 01/26] raw --- modules/ducktests/tests/docker/run_tests.sh | 2 +- .../tests/ignitetest/tests/flex/__init__.py | 70 +++++++++++++++ .../tests/ignitetest/tests/flex/flex_test.py | 87 +++++++++++++++++++ 3 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 modules/ducktests/tests/ignitetest/tests/flex/__init__.py create mode 100644 modules/ducktests/tests/ignitetest/tests/flex/flex_test.py diff --git a/modules/ducktests/tests/docker/run_tests.sh b/modules/ducktests/tests/docker/run_tests.sh index 697d1e8d33c95..a73a11b5068c7 100755 --- a/modules/ducktests/tests/docker/run_tests.sh +++ b/modules/ducktests/tests/docker/run_tests.sh @@ -31,7 +31,7 @@ IMAGE_PREFIX="ducker-ignite-eclipse-temurin" # DuckerTest parameters are specified with options to the script # Path to ducktests -TC_PATHS="./ignitetest/" +TC_PATHS="./ignitetest/tests/flex" # Global parameters to pass to ducktape util with --global param GLOBALS="{}" # Ducktests parameters to pass to ducktape util with --parameters param diff --git a/modules/ducktests/tests/ignitetest/tests/flex/__init__.py b/modules/ducktests/tests/ignitetest/tests/flex/__init__.py new file mode 100644 index 0000000000000..dda120fcacc42 --- /dev/null +++ b/modules/ducktests/tests/ignitetest/tests/flex/__init__.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import hashlib +import os + +from ducktape.tests.test import TestContext + + +def decorate_args(args, with_args=False): + """ + Decorate args with sha1 hash. + """ + prefix = '' + if args: + sha_1 = hashlib.sha1() + sha_1.update(args.encode('utf-8')) + + digest = sha_1.hexdigest()[0:6] + prefix = digest + '@' + + return prefix + args if with_args else prefix + + +def patched_test_name(self): + """ + Monkey patched test_name property function. + """ + name_components = [self.module_name, + self.cls_name, + self.function_name, + self.injected_args_name] + + name = ".".join(filter(lambda x: x is not None and len(x) > 0, name_components)) + return decorate_args(self.injected_args_name) + name + + +def patched_results_dir(test_context, test_index): + """ + Monkey patch results_dir. + """ + results_dir = test_context.session_context.results_dir + + if test_context.cls is not None: + results_dir = os.path.join(results_dir, test_context.cls.__name__) + if test_context.function is not None: + results_dir = os.path.join(results_dir, test_context.function.__name__) + if test_context.injected_args is not None: + results_dir = os.path.join(results_dir, decorate_args(test_context.injected_args_name, True)) + if test_index is not None: + results_dir = os.path.join(results_dir, str(test_index)) + + return results_dir + + +TestContext.test_name = property(patched_test_name) +TestContext.results_dir = staticmethod(patched_results_dir) diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py new file mode 100644 index 0000000000000..5d323ed1337f5 --- /dev/null +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Module contains discovery tests. +""" + +import os +import random +from ducktape.mark import matrix +from enum import IntEnum +from time import monotonic +from typing import NamedTuple + +from ignitetest.services.ignite import IgniteService +from ignitetest.services.utils.ignite_aware import node_failed_event_pattern +from ignitetest.services.utils.ignite_configuration import IgniteConfiguration +from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration +from ignitetest.utils import cluster +from ignitetest.utils.ignite_test import IgniteTest +from ignitetest.utils.version import LATEST_2_17, IgniteVersion + + +class FlexTest(IgniteTest): + SERVERS = 3 + SERVER_IDX_TO_DROP = 1 + IGNITE_VERSION = LATEST_2_17 + FAILURE_DETECTION_TIMEOUT_SEC = 7 + + @cluster(num_nodes=SERVERS + 1) + def flex_test(self): + ignite_config = IgniteConfiguration( + version=self.IGNITE_VERSION, + metrics_log_frequency = 0, + failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT_SEC * 1000, + caches=[CacheConfiguration( + name='test-cache', + atomicity_mode='ATOMIC' + )] + ) + + servers, start_servers_sec = start_servers(self.test_context, self.SERVERS, ignite_config) + + servers.await_event(f"servers={self.SERVERS}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, nodes=servers.nodes) + + failedNode = servers.nodes[self.SERVER_IDX_TO_DROP] + failedNodeId = servers.node_id(failedNode) + + self.logger.info(f"'kill -9' node {self.SERVER_IDX_TO_DROP} with id {failedNodeId} ...") + + servers.stop_node(failedNode, force_stop=True) + + self.logger.info("Awaiting for the node-failed-event ...") + servers.await_event(node_failed_event_pattern(failedNodeId), self.FAILURE_DETECTION_TIMEOUT_SEC * 3, + from_the_beginning=True, nodes=[servers.nodes[0], servers.nodes[2]]) + + self.logger.info("Awaiting for the new cluster state ...") + servers.await_event(f"servers={self.SERVERS - 1}, clients=0, state=ACTIVE, CPUs=", self.FAILURE_DETECTION_TIMEOUT_SEC * 3, + from_the_beginning=True, nodes=[servers.nodes[0], servers.nodes[2]]) + + self.logger.info("The cluster has detected the node failure.") + + servers.stop() + +def start_servers(test_context, num_nodes, ignite_config, modules=None): + """ + Start ignite servers. + """ + servers = IgniteService(test_context, config=ignite_config, num_nodes=num_nodes, modules=modules, + # mute spam in log. + jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"]) + + start = monotonic() + servers.start_async() + return servers, round(monotonic() - start, 1) \ No newline at end of file From 8bbbed3b2a2d6bf6e58720377cff42f641ab5517 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 17:01:40 +0300 Subject: [PATCH 02/26] + cacheCfg --- .../utils/ignite_configuration/cache.py | 2 +- .../services/utils/templates/cache_macro.j2 | 22 +++++++++++++++++++ .../tests/ignitetest/tests/flex/flex_test.py | 12 +++++++--- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py index 0dd0d39f8afbf..5645793458877 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py +++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py @@ -21,7 +21,7 @@ from ignitetest.utils.bean import Bean -DFLT_PARTS_CNT: int = 1024 +DFLT_PARTS_CNT: int = 512 class CacheConfiguration(NamedTuple): diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index 5eba40344fb3a..d88c9af0f67b6 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -24,9 +24,14 @@ {% for cache in caches %} + + {% if cache.cache_mode == 'PARTITIONED' %} {% endif %} + {% if cache.cache_mode == 'REPLICATED' %} + + {% endif %} {% if cache.affinity %} @@ -39,6 +44,23 @@ {{ misc_utils.bean(cache.affinity_mapper) }} {% endif %} + + + + + + + + + + + + + + + + + {% endfor %} diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index 5d323ed1337f5..b9ba94a94d663 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -29,6 +29,7 @@ from ignitetest.services.utils.ignite_configuration import IgniteConfiguration from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration from ignitetest.utils import cluster +from ignitetest.utils.bean import Bean from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import LATEST_2_17, IgniteVersion @@ -41,19 +42,24 @@ class FlexTest(IgniteTest): @cluster(num_nodes=SERVERS + 1) def flex_test(self): + cacheAffinity = Bean("org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction", partitions=512) + ignite_config = IgniteConfiguration( version=self.IGNITE_VERSION, metrics_log_frequency = 0, failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT_SEC * 1000, caches=[CacheConfiguration( - name='test-cache', - atomicity_mode='ATOMIC' + name='TBG_SCS_DM_DOCUMENTS', + atomicity_mode='TRANSACTIONAL', + affinity = cacheAffinity, + cache_mode = 'REPLICATED' )] ) servers, start_servers_sec = start_servers(self.test_context, self.SERVERS, ignite_config) - servers.await_event(f"servers={self.SERVERS}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, nodes=servers.nodes) + servers.await_event(f"servers={self.SERVERS}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, + nodes=servers.nodes) failedNode = servers.nodes[self.SERVER_IDX_TO_DROP] failedNodeId = servers.node_id(failedNode) From ca073e6d76a6ebfedddbf7bd1a1c213a449b74ca Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 17:11:17 +0300 Subject: [PATCH 03/26] + dataRegionCfg --- .../templates/ignite_configuration_macro.j2 | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 index fd1cbef5b60ec..bf3d05087654c 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 @@ -34,6 +34,27 @@ + + + + + + + + + + + + + + + + + + + + + From c956627e28827eb166882e0ea06f138bc56cf99b Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 21:00:03 +0300 Subject: [PATCH 04/26] + loadApp --- .../tests/flex/FlexLoadApplication.java | 165 ++++++++++++++++++ .../services/utils/templates/cache_macro.j2 | 2 +- .../templates/ignite_configuration_macro.j2 | 21 --- .../tests/ignitetest/tests/flex/flex_test.py | 102 ++++++++--- 4 files changed, 244 insertions(+), 46 deletions(-) create mode 100644 modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java new file mode 100644 index 0000000000000..6cdc88fff4fd9 --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests.flex; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** */ +public class FlexLoadApplication extends IgniteAwareApplication { + /** */ + private static final int THREADS = 10; + + /** */ + private static final int START_TIME_WAIT_SEC = 20; + + /** */ + private static final int PRELOAD_TIME_SEC = 10; + + /** {@inheritDoc} */ + @Override public void run(JsonNode jsonNode) throws Exception { + createTable(); + + final ForkJoinPool executor = new ForkJoinPool(THREADS); + final CountDownLatch initLatch = new CountDownLatch(THREADS); + final AtomicLong counter = new AtomicLong(); + final AtomicBoolean preloaded = new AtomicBoolean(); + + log.info("TEST | Load pool parallelism=" + executor.getParallelism()); + + for (int i = 0; i < THREADS; ++i) { + executor.submit(() -> { + final Random rnd = new Random(); + boolean init = false; + + while (active()) { + try (Connection conn = thinJdbcDataSource.getConnection()) { + PreparedStatement ps = conn.prepareStatement("INSERT INTO SCS_DM_DOCUMENTS values(?,?,?)"); + + while (active()) { + long id = counter.incrementAndGet(); + + ps.setLong(1, id); + ps.setString(2, UUID.randomUUID().toString()); + ps.setBigDecimal(3, BigDecimal.valueOf(rnd.nextDouble())); + + int res = ps.executeUpdate(); + + if (res != 1) + throw new IllegalStateException("Failed to insert a row. The results is not 1."); + + if (!init) { + init = true; + + initLatch.countDown(); + } + } + } + catch (Throwable th) { + if (!preloaded.get()) { + log.error("TEST | Failed to preload. Marking as broken.", th); + + markBroken(th); + + synchronized (this) { + notifyAll(); + } + } + else + log.info("TEST | Failed to load. Recreating connection. Err: " + th.getMessage()); + } + } + }); + } + + if (!active()) + return; + + if (!initLatch.await(START_TIME_WAIT_SEC, TimeUnit.SECONDS)) { + Exception th = new IllegalStateException("Failed to start loading."); + + markBroken(th); + + throw th; + } + + log.info("TEST | Started " + THREADS + " loading threads. Preloading..."); + + synchronized (this) { + wait(PRELOAD_TIME_SEC * 1000); + } + + preloaded.set(true); + + log.info("TEST | Preloaded. Inserted about " + counter.get() + " records. Continue loading..."); + + markInitialized(); + + while (active()) { + synchronized (this) { + wait(150); + } + } + + log.info("TEST | Finished. Stopping..."); + + markFinished(); + } + + /** */ + private void createTable() throws Exception { + try (Connection conn = thinJdbcDataSource.getConnection()) { + conn.createStatement().execute("CREATE TABLE SCS_DM_DOCUMENTS(" + + "id INT, strVal VARCHAR, decVal DECIMAL, PRIMARY KEY(id)" + + ") WITH \"cache_name=TBG_SCS_DM_DOCUMENTS\"" + ); + + try { + ResultSet rs = conn.prepareStatement("SELECT count(1) FROM SCS_DM_DOCUMENTS").executeQuery(); + + rs.next(); + + int cnt = rs.getInt(1); + + if (cnt == 0) + log.info("TEST | Created table 'SCS_DM_DOCUMENTS'."); + else + throw new IllegalStateException("Unexpected empty table count: " + cnt); + } + catch (Exception t) { + t = new IllegalStateException("Failed to create table SCS_DM_DOCUMENTS.", t); + + markBroken(t); + + throw t; + } + } + + } +} diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index d88c9af0f67b6..1a5002c333ab5 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -25,7 +25,7 @@ - + {% if cache.cache_mode == 'PARTITIONED' %} {% endif %} diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 index bf3d05087654c..fd1cbef5b60ec 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 @@ -34,27 +34,6 @@ - - - - - - - - - - - - - - - - - - - - - diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index b9ba94a94d663..ecd0d878d7921 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -14,7 +14,7 @@ # limitations under the License. """ -Module contains discovery tests. +Module contains Flex tests. """ import os @@ -25,35 +25,106 @@ from typing import NamedTuple from ignitetest.services.ignite import IgniteService +from ignitetest.services.ignite_app import IgniteApplicationService from ignitetest.services.utils.ignite_aware import node_failed_event_pattern -from ignitetest.services.utils.ignite_configuration import IgniteConfiguration +from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration, \ + IgniteThinJdbcConfiguration from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration +from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration +from ignitetest.services.utils.ssl.client_connector_configuration import ClientConnectorConfiguration from ignitetest.utils import cluster from ignitetest.utils.bean import Bean from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import LATEST_2_17, IgniteVersion - class FlexTest(IgniteTest): SERVERS = 3 SERVER_IDX_TO_DROP = 1 IGNITE_VERSION = LATEST_2_17 - FAILURE_DETECTION_TIMEOUT_SEC = 7 @cluster(num_nodes=SERVERS + 1) def flex_test(self): + servers, ignite_config = self.launch_cluster() + + load_app = self.start_load_app(servers, ignite_config.client_connector_configuration.port) + + self.kill_node(servers) + + self.logger.info("TEST | Stopping the load application ...") + + load_app.stop() + load_app.await_stopped() + + self.logger.info("TEST | The load application has stopped.") + + servers.stop() + + def kill_node(self, servers): + failedNode = servers.nodes[self.SERVER_IDX_TO_DROP] + failedNodeId = servers.node_id(failedNode) + + self.logger.info(f"TEST | 'kill -9' node {self.SERVER_IDX_TO_DROP} with id {failedNodeId} ...") + + servers.stop_node(failedNode, force_stop=True) + + self.logger.info("TEST | Awaiting for the node-failed-event...") + servers.await_event(node_failed_event_pattern(failedNodeId), 30, from_the_beginning=True, + nodes=[servers.nodes[0], servers.nodes[2]]) + + self.logger.info("TEST | Awaiting for the new cluster state...") + servers.await_event(f"servers={self.SERVERS - 1}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, + nodes=[servers.nodes[0], servers.nodes[2]]) + + self.logger.info("TEST | The cluster has detected the node failure.") + + def start_load_app(self, servers, jdbcPort): + jdbcPort = str(jdbcPort) + + addrs = [servers.nodes[0].account.hostname + ":" + jdbcPort, + servers.nodes[1].account.hostname + ":" + jdbcPort, + servers.nodes[2].account.hostname + ":" + jdbcPort] + + app = IgniteApplicationService( + self.test_context, + IgniteThinJdbcConfiguration( + version=self.IGNITE_VERSION, + addresses=addrs + ), + java_class_name="org.apache.ignite.internal.ducktest.tests.flex.FlexLoadApplication", + num_nodes=1 + ) + + self.logger.info("TEST | Starting the loading application...") + + app.start() + + self.logger.info("TEST | Waiting for the load application initialization...") + + app.await_started() + + self.logger.info("TEST | The load application has initialized.") + + return app + + def launch_cluster(self): cacheAffinity = Bean("org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction", partitions=512) ignite_config = IgniteConfiguration( version=self.IGNITE_VERSION, metrics_log_frequency = 0, - failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT_SEC * 1000, caches=[CacheConfiguration( name='TBG_SCS_DM_DOCUMENTS', atomicity_mode='TRANSACTIONAL', affinity = cacheAffinity, cache_mode = 'REPLICATED' - )] + )], + data_storage=DataStorageConfiguration( + default=DataRegionConfiguration( + initial_size = 2048 * 1024 * 1024, + max_size = 8192 * 1024 * 1024 + ) + ), + client_connector_configuration=ClientConnectorConfiguration() ) servers, start_servers_sec = start_servers(self.test_context, self.SERVERS, ignite_config) @@ -61,24 +132,7 @@ def flex_test(self): servers.await_event(f"servers={self.SERVERS}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, nodes=servers.nodes) - failedNode = servers.nodes[self.SERVER_IDX_TO_DROP] - failedNodeId = servers.node_id(failedNode) - - self.logger.info(f"'kill -9' node {self.SERVER_IDX_TO_DROP} with id {failedNodeId} ...") - - servers.stop_node(failedNode, force_stop=True) - - self.logger.info("Awaiting for the node-failed-event ...") - servers.await_event(node_failed_event_pattern(failedNodeId), self.FAILURE_DETECTION_TIMEOUT_SEC * 3, - from_the_beginning=True, nodes=[servers.nodes[0], servers.nodes[2]]) - - self.logger.info("Awaiting for the new cluster state ...") - servers.await_event(f"servers={self.SERVERS - 1}, clients=0, state=ACTIVE, CPUs=", self.FAILURE_DETECTION_TIMEOUT_SEC * 3, - from_the_beginning=True, nodes=[servers.nodes[0], servers.nodes[2]]) - - self.logger.info("The cluster has detected the node failure.") - - servers.stop() + return servers, ignite_config def start_servers(test_context, num_nodes, ignite_config, modules=None): """ From eb904ed0e096181fc3351b68b15e281ce72e79c9 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 21:17:46 +0300 Subject: [PATCH 05/26] + wait load time --- .../ducktest/tests/flex/FlexLoadApplication.java | 6 +++--- .../tests/ignitetest/tests/flex/flex_test.py | 14 ++++++++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java index 6cdc88fff4fd9..f9bb0af08f20a 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java @@ -40,7 +40,7 @@ public class FlexLoadApplication extends IgniteAwareApplication { private static final int START_TIME_WAIT_SEC = 20; /** */ - private static final int PRELOAD_TIME_SEC = 10; + private static final int PRELOAD_TIME_SEC = 20; /** {@inheritDoc} */ @Override public void run(JsonNode jsonNode) throws Exception { @@ -117,7 +117,7 @@ public class FlexLoadApplication extends IgniteAwareApplication { preloaded.set(true); - log.info("TEST | Preloaded. Inserted about " + counter.get() + " records. Continue loading..."); + log.info("TEST | Preloaded. Loaded about " + counter.get() + " records. Continue loading..."); markInitialized(); @@ -127,7 +127,7 @@ public class FlexLoadApplication extends IgniteAwareApplication { } } - log.info("TEST | Finished. Stopping..."); + log.info("TEST | Stopping. Loaded about " + counter.get() + " records."); markFinished(); } diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index ecd0d878d7921..51f2ea56f28d0 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -19,9 +19,9 @@ import os import random +import time from ducktape.mark import matrix from enum import IntEnum -from time import monotonic from typing import NamedTuple from ignitetest.services.ignite import IgniteService @@ -35,10 +35,12 @@ from ignitetest.utils import cluster from ignitetest.utils.bean import Bean from ignitetest.utils.ignite_test import IgniteTest -from ignitetest.utils.version import LATEST_2_17, IgniteVersion +from ignitetest.utils.version import LATEST_2_17 + class FlexTest(IgniteTest): SERVERS = 3 + LOAD_SECONDS = 30 SERVER_IDX_TO_DROP = 1 IGNITE_VERSION = LATEST_2_17 @@ -48,6 +50,10 @@ def flex_test(self): load_app = self.start_load_app(servers, ignite_config.client_connector_configuration.port) + self.logger.info(f"TEST | Loading the cluster for {self.LOAD_SECONDS} seconds...") + + time.sleep(self.LOAD_SECONDS) + self.kill_node(servers) self.logger.info("TEST | Stopping the load application ...") @@ -142,6 +148,6 @@ def start_servers(test_context, num_nodes, ignite_config, modules=None): # mute spam in log. jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"]) - start = monotonic() + start = time.monotonic() servers.start_async() - return servers, round(monotonic() - start, 1) \ No newline at end of file + return servers, round(time.monotonic() - start, 1) \ No newline at end of file From 887d848c508bb5098d85a4feedf2888b4f68c89b Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 22:11:03 +0300 Subject: [PATCH 06/26] + idle verify --- .../ducktest/tests/flex/FlexLoadApplication.java | 4 ++-- .../ducktests/tests/ignitetest/tests/flex/flex_test.py | 10 +++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java index f9bb0af08f20a..8d66cfb0ba5cd 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java @@ -34,13 +34,13 @@ /** */ public class FlexLoadApplication extends IgniteAwareApplication { /** */ - private static final int THREADS = 10; + private static final int THREADS = 16; /** */ private static final int START_TIME_WAIT_SEC = 20; /** */ - private static final int PRELOAD_TIME_SEC = 20; + private static final int PRELOAD_TIME_SEC = 5; /** {@inheritDoc} */ @Override public void run(JsonNode jsonNode) throws Exception { diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index 51f2ea56f28d0..85b71cceef890 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -26,6 +26,7 @@ from ignitetest.services.ignite import IgniteService from ignitetest.services.ignite_app import IgniteApplicationService +from ignitetest.services.utils.control_utility import ControlUtility from ignitetest.services.utils.ignite_aware import node_failed_event_pattern from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration, \ IgniteThinJdbcConfiguration @@ -40,7 +41,7 @@ class FlexTest(IgniteTest): SERVERS = 3 - LOAD_SECONDS = 30 + LOAD_SECONDS = 5 SERVER_IDX_TO_DROP = 1 IGNITE_VERSION = LATEST_2_17 @@ -48,6 +49,9 @@ class FlexTest(IgniteTest): def flex_test(self): servers, ignite_config = self.launch_cluster() + control_utility = ControlUtility(servers) + control_utility.activate() + load_app = self.start_load_app(servers, ignite_config.client_connector_configuration.port) self.logger.info(f"TEST | Loading the cluster for {self.LOAD_SECONDS} seconds...") @@ -63,6 +67,10 @@ def flex_test(self): self.logger.info("TEST | The load application has stopped.") + output = control_utility.idle_verify("TBG_SCS_DM_DOCUMENTS") + + self.logger.info(f"TEST | Idle verify finished: {output}") + servers.stop() def kill_node(self, servers): From 7d9c929b21b5444b5db6eea202673967088eb7a1 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 22:47:12 +0300 Subject: [PATCH 07/26] + load app params --- .../tests/flex/FlexLoadApplication.java | 23 ++++++++----------- .../tests/ignitetest/tests/flex/flex_test.py | 16 ++++++------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java index 8d66cfb0ba5cd..9544eb48552e4 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java @@ -34,26 +34,23 @@ /** */ public class FlexLoadApplication extends IgniteAwareApplication { /** */ - private static final int THREADS = 16; - - /** */ - private static final int START_TIME_WAIT_SEC = 20; - - /** */ - private static final int PRELOAD_TIME_SEC = 5; + private final int WAIT_START_SECS = 20; /** {@inheritDoc} */ @Override public void run(JsonNode jsonNode) throws Exception { + final int preloadDurSec = jsonNode.get("preloadDurSec").asInt(); + final int threads = jsonNode.get("threads").asInt(); + createTable(); - final ForkJoinPool executor = new ForkJoinPool(THREADS); - final CountDownLatch initLatch = new CountDownLatch(THREADS); + final ForkJoinPool executor = new ForkJoinPool(threads); + final CountDownLatch initLatch = new CountDownLatch(threads); final AtomicLong counter = new AtomicLong(); final AtomicBoolean preloaded = new AtomicBoolean(); log.info("TEST | Load pool parallelism=" + executor.getParallelism()); - for (int i = 0; i < THREADS; ++i) { + for (int i = 0; i < threads; ++i) { executor.submit(() -> { final Random rnd = new Random(); boolean init = false; @@ -101,7 +98,7 @@ public class FlexLoadApplication extends IgniteAwareApplication { if (!active()) return; - if (!initLatch.await(START_TIME_WAIT_SEC, TimeUnit.SECONDS)) { + if (!initLatch.await(WAIT_START_SECS, TimeUnit.SECONDS)) { Exception th = new IllegalStateException("Failed to start loading."); markBroken(th); @@ -109,10 +106,10 @@ public class FlexLoadApplication extends IgniteAwareApplication { throw th; } - log.info("TEST | Started " + THREADS + " loading threads. Preloading..."); + log.info("TEST | Started " + threads + " loading threads. Preloading..."); synchronized (this) { - wait(PRELOAD_TIME_SEC * 1000); + wait(preloadDurSec * 1000); } preloaded.set(true); diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index 85b71cceef890..f62400a5398ac 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -41,8 +41,10 @@ class FlexTest(IgniteTest): SERVERS = 3 - LOAD_SECONDS = 5 SERVER_IDX_TO_DROP = 1 + PRELOAD_SECONDS = 30 + LOAD_SECONDS = PRELOAD_SECONDS + LOAD_THREADS = 10 IGNITE_VERSION = LATEST_2_17 @cluster(num_nodes=SERVERS + 1) @@ -94,18 +96,16 @@ def kill_node(self, servers): def start_load_app(self, servers, jdbcPort): jdbcPort = str(jdbcPort) - addrs = [servers.nodes[0].account.hostname + ":" + jdbcPort, - servers.nodes[1].account.hostname + ":" + jdbcPort, + addrs = [servers.nodes[0].account.hostname + ":" + jdbcPort, servers.nodes[1].account.hostname + ":" + jdbcPort, servers.nodes[2].account.hostname + ":" + jdbcPort] app = IgniteApplicationService( self.test_context, - IgniteThinJdbcConfiguration( - version=self.IGNITE_VERSION, - addresses=addrs - ), + IgniteThinJdbcConfiguration(version=self.IGNITE_VERSION, addresses=addrs), java_class_name="org.apache.ignite.internal.ducktest.tests.flex.FlexLoadApplication", - num_nodes=1 + num_nodes=1, + params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS}, + startup_timeout_sec=self.PRELOAD_SECONDS + 10 ) self.logger.info("TEST | Starting the loading application...") From cc661f95a902149a0340d4ef9894b9d17c755658 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 22:52:14 +0300 Subject: [PATCH 08/26] + larger ti ings --- modules/ducktests/tests/ignitetest/tests/flex/flex_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index f62400a5398ac..44275aa5c2efa 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -38,13 +38,12 @@ from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import LATEST_2_17 - class FlexTest(IgniteTest): SERVERS = 3 SERVER_IDX_TO_DROP = 1 - PRELOAD_SECONDS = 30 - LOAD_SECONDS = PRELOAD_SECONDS - LOAD_THREADS = 10 + PRELOAD_SECONDS = 120 + LOAD_SECONDS = 40 + LOAD_THREADS = 8 IGNITE_VERSION = LATEST_2_17 @cluster(num_nodes=SERVERS + 1) From f5f9fc53b2345f941530ea70ed8244967cca2127 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 23:41:10 +0300 Subject: [PATCH 09/26] + cacheParam --- .../ducktest/tests/flex/FlexLoadApplication.java | 11 ++++++----- .../tests/ignitetest/tests/flex/flex_test.py | 11 ++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java index 9544eb48552e4..ef4e82e0da162 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java @@ -34,14 +34,15 @@ /** */ public class FlexLoadApplication extends IgniteAwareApplication { /** */ - private final int WAIT_START_SECS = 20; + private static final int WAIT_START_SECS = 20; /** {@inheritDoc} */ @Override public void run(JsonNode jsonNode) throws Exception { final int preloadDurSec = jsonNode.get("preloadDurSec").asInt(); final int threads = jsonNode.get("threads").asInt(); + final String cacheName = jsonNode.get("CACHE_NAME").asText(); - createTable(); + createTable(cacheName); final ForkJoinPool executor = new ForkJoinPool(threads); final CountDownLatch initLatch = new CountDownLatch(threads); @@ -57,7 +58,7 @@ public class FlexLoadApplication extends IgniteAwareApplication { while (active()) { try (Connection conn = thinJdbcDataSource.getConnection()) { - PreparedStatement ps = conn.prepareStatement("INSERT INTO SCS_DM_DOCUMENTS values(?,?,?)"); + PreparedStatement ps = conn.prepareStatement("INSERT INTO " + cacheName + " values(?,?,?)"); while (active()) { long id = counter.incrementAndGet(); @@ -130,11 +131,11 @@ public class FlexLoadApplication extends IgniteAwareApplication { } /** */ - private void createTable() throws Exception { + private void createTable(String cacheName) throws Exception { try (Connection conn = thinJdbcDataSource.getConnection()) { conn.createStatement().execute("CREATE TABLE SCS_DM_DOCUMENTS(" + "id INT, strVal VARCHAR, decVal DECIMAL, PRIMARY KEY(id)" + - ") WITH \"cache_name=TBG_SCS_DM_DOCUMENTS\"" + ") WITH \"cache_name=" + cacheName + "\"" ); try { diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index 44275aa5c2efa..6882e5a8aeb31 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -41,10 +41,11 @@ class FlexTest(IgniteTest): SERVERS = 3 SERVER_IDX_TO_DROP = 1 - PRELOAD_SECONDS = 120 - LOAD_SECONDS = 40 + PRELOAD_SECONDS = 10 + LOAD_SECONDS = 10 LOAD_THREADS = 8 IGNITE_VERSION = LATEST_2_17 + CACHE_NAME = "TBG_SCS_DM_DOCUMENTS" @cluster(num_nodes=SERVERS + 1) def flex_test(self): @@ -68,7 +69,7 @@ def flex_test(self): self.logger.info("TEST | The load application has stopped.") - output = control_utility.idle_verify("TBG_SCS_DM_DOCUMENTS") + output = control_utility.idle_verify(self.CACHE_NAME) self.logger.info(f"TEST | Idle verify finished: {output}") @@ -103,7 +104,7 @@ def start_load_app(self, servers, jdbcPort): IgniteThinJdbcConfiguration(version=self.IGNITE_VERSION, addresses=addrs), java_class_name="org.apache.ignite.internal.ducktest.tests.flex.FlexLoadApplication", num_nodes=1, - params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS}, + params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS, "cacheName": self.CACHE_NAME}, startup_timeout_sec=self.PRELOAD_SECONDS + 10 ) @@ -126,7 +127,7 @@ def launch_cluster(self): version=self.IGNITE_VERSION, metrics_log_frequency = 0, caches=[CacheConfiguration( - name='TBG_SCS_DM_DOCUMENTS', + name=self.CACHE_NAME, atomicity_mode='TRANSACTIONAL', affinity = cacheAffinity, cache_mode = 'REPLICATED' From 05ab69d8c2a2051035ef13ab5ee78c23cfabfba5 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Wed, 14 Jan 2026 23:50:26 +0300 Subject: [PATCH 10/26] + params --- .../ducktest/tests/flex/FlexLoadApplication.java | 16 ++++++++-------- .../tests/ignitetest/tests/flex/flex_test.py | 8 +++++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java index ef4e82e0da162..bcbcf7bcc3caa 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java @@ -40,9 +40,10 @@ public class FlexLoadApplication extends IgniteAwareApplication { @Override public void run(JsonNode jsonNode) throws Exception { final int preloadDurSec = jsonNode.get("preloadDurSec").asInt(); final int threads = jsonNode.get("threads").asInt(); - final String cacheName = jsonNode.get("CACHE_NAME").asText(); + final String cacheName = jsonNode.get("cacheName").asText(); + final String tableName = jsonNode.get("tableName").asText(); - createTable(cacheName); + createTable(cacheName, tableName); final ForkJoinPool executor = new ForkJoinPool(threads); final CountDownLatch initLatch = new CountDownLatch(threads); @@ -131,33 +132,32 @@ public class FlexLoadApplication extends IgniteAwareApplication { } /** */ - private void createTable(String cacheName) throws Exception { + private void createTable(String tableName, String cacheName) throws Exception { try (Connection conn = thinJdbcDataSource.getConnection()) { - conn.createStatement().execute("CREATE TABLE SCS_DM_DOCUMENTS(" + + conn.createStatement().execute("CREATE TABLE " + tableName + "(" + "id INT, strVal VARCHAR, decVal DECIMAL, PRIMARY KEY(id)" + ") WITH \"cache_name=" + cacheName + "\"" ); try { - ResultSet rs = conn.prepareStatement("SELECT count(1) FROM SCS_DM_DOCUMENTS").executeQuery(); + ResultSet rs = conn.prepareStatement("SELECT count(1) FROM " + tableName).executeQuery(); rs.next(); int cnt = rs.getInt(1); if (cnt == 0) - log.info("TEST | Created table 'SCS_DM_DOCUMENTS'."); + log.info("TEST | Created table '" + tableName + "'."); else throw new IllegalStateException("Unexpected empty table count: " + cnt); } catch (Exception t) { - t = new IllegalStateException("Failed to create table SCS_DM_DOCUMENTS.", t); + t = new IllegalStateException("Failed to create table " + tableName + ".", t); markBroken(t); throw t; } } - } } diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index 6882e5a8aeb31..2050f7a0d91d0 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -41,11 +41,12 @@ class FlexTest(IgniteTest): SERVERS = 3 SERVER_IDX_TO_DROP = 1 - PRELOAD_SECONDS = 10 - LOAD_SECONDS = 10 + PRELOAD_SECONDS = 120 + LOAD_SECONDS = 40 LOAD_THREADS = 8 IGNITE_VERSION = LATEST_2_17 CACHE_NAME = "TBG_SCS_DM_DOCUMENTS" + TABLE_NAME = "SCS_DM_DOCUMENTS" @cluster(num_nodes=SERVERS + 1) def flex_test(self): @@ -104,7 +105,8 @@ def start_load_app(self, servers, jdbcPort): IgniteThinJdbcConfiguration(version=self.IGNITE_VERSION, addresses=addrs), java_class_name="org.apache.ignite.internal.ducktest.tests.flex.FlexLoadApplication", num_nodes=1, - params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS, "cacheName": self.CACHE_NAME}, + params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS, "cacheName": self.CACHE_NAME, + "tableName" : self.TABLE_NAME}, startup_timeout_sec=self.PRELOAD_SECONDS + 10 ) From 4b4f3e09a810ca02948a25e0a396456a75082328 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 15 Jan 2026 11:19:17 +0300 Subject: [PATCH 11/26] + bigger string on insert. + pool sizes --- .../ducktest/tests/flex/FlexLoadApplication.java | 9 ++++++--- .../utils/templates/ignite_configuration_macro.j2 | 4 ++++ .../ducktests/tests/ignitetest/tests/flex/flex_test.py | 5 +++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java index bcbcf7bcc3caa..44243e162ad8f 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java @@ -22,7 +22,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Random; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @@ -65,7 +64,11 @@ public class FlexLoadApplication extends IgniteAwareApplication { long id = counter.incrementAndGet(); ps.setLong(1, id); - ps.setString(2, UUID.randomUUID().toString()); + + byte[] data = new byte[rnd.nextInt(2048)]; + rnd.nextBytes(data); + ps.setString(2, new String(data)); + ps.setBigDecimal(3, BigDecimal.valueOf(rnd.nextDouble())); int res = ps.executeUpdate(); @@ -136,7 +139,7 @@ private void createTable(String tableName, String cacheName) throws Exception { try (Connection conn = thinJdbcDataSource.getConnection()) { conn.createStatement().execute("CREATE TABLE " + tableName + "(" + "id INT, strVal VARCHAR, decVal DECIMAL, PRIMARY KEY(id)" + - ") WITH \"cache_name=" + cacheName + "\"" + ") WITH \"cache_name=" + cacheName + ",atomicity=TRANSACTIONAL\"" ); try { diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 index fd1cbef5b60ec..9d4585b27cedc 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 @@ -34,6 +34,10 @@ + + + + diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py index 2050f7a0d91d0..2c2ff99b3f158 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py @@ -136,8 +136,9 @@ def launch_cluster(self): )], data_storage=DataStorageConfiguration( default=DataRegionConfiguration( - initial_size = 2048 * 1024 * 1024, - max_size = 8192 * 1024 * 1024 + initial_size = 256 * 1024 * 1024, + max_size = 1024 * 1024 * 1024, + #persistence_enabled = True ) ), client_connector_configuration=ClientConnectorConfiguration() From 7cef4595b189ce0279e7c3f25d4f79d20ceb6370 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 15 Jan 2026 12:18:59 +0300 Subject: [PATCH 12/26] fixes, renamings --- .../MexLoadApplication.java} | 73 +++++++++++++------ modules/ducktests/tests/docker/run_tests.sh | 2 +- .../templates/ignite_configuration_macro.j2 | 20 +++-- .../services/utils/templates/log4j2.xml.j2 | 5 +- .../tests/{flex => mex}/__init__.py | 0 .../{flex/flex_test.py => mex/mex_test.py} | 45 +++++++----- 6 files changed, 95 insertions(+), 50 deletions(-) rename modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/{flex/FlexLoadApplication.java => mex/MexLoadApplication.java} (69%) rename modules/ducktests/tests/ignitetest/tests/{flex => mex}/__init__.py (100%) rename modules/ducktests/tests/ignitetest/tests/{flex/flex_test.py => mex/mex_test.py} (87%) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java similarity index 69% rename from modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java rename to modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java index 44243e162ad8f..109e592a91611 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/flex/FlexLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.ducktest.tests.flex; +package org.apache.ignite.internal.ducktest.tests.mex; import java.math.BigDecimal; import java.sql.Connection; @@ -31,7 +31,7 @@ import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; /** */ -public class FlexLoadApplication extends IgniteAwareApplication { +public class MexLoadApplication extends IgniteAwareApplication { /** */ private static final int WAIT_START_SECS = 20; @@ -39,17 +39,17 @@ public class FlexLoadApplication extends IgniteAwareApplication { @Override public void run(JsonNode jsonNode) throws Exception { final int preloadDurSec = jsonNode.get("preloadDurSec").asInt(); final int threads = jsonNode.get("threads").asInt(); - final String cacheName = jsonNode.get("cacheName").asText(); final String tableName = jsonNode.get("tableName").asText(); + final String cacheName = jsonNode.get("cacheName").asText(); - createTable(cacheName, tableName); + createTable(tableName, cacheName); final ForkJoinPool executor = new ForkJoinPool(threads); final CountDownLatch initLatch = new CountDownLatch(threads); final AtomicLong counter = new AtomicLong(); final AtomicBoolean preloaded = new AtomicBoolean(); - log.info("TEST | Load pool parallelism=" + executor.getParallelism()); + log.info("TEST | Load pool parallelism=" + executor.getParallelism() + ", ig=" + ignite + ", client=" + client); for (int i = 0; i < threads; ++i) { executor.submit(() -> { @@ -58,23 +58,30 @@ public class FlexLoadApplication extends IgniteAwareApplication { while (active()) { try (Connection conn = thinJdbcDataSource.getConnection()) { - PreparedStatement ps = conn.prepareStatement("INSERT INTO " + cacheName + " values(?,?,?)"); + conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + conn.setAutoCommit(false); + + PreparedStatement ps = conn.prepareStatement("INSERT INTO " + tableName + " values(?,?,?)"); while (active()) { - long id = counter.incrementAndGet(); + for (int t = 0; t < 1 + rnd.nextInt(10); ++t) { + long id = counter.incrementAndGet(); - ps.setLong(1, id); + ps.setLong(1, id); - byte[] data = new byte[rnd.nextInt(2048)]; - rnd.nextBytes(data); - ps.setString(2, new String(data)); + byte[] data = new byte[rnd.nextInt(2048)]; + rnd.nextBytes(data); + ps.setString(2, new String(data)); - ps.setBigDecimal(3, BigDecimal.valueOf(rnd.nextDouble())); + ps.setBigDecimal(3, BigDecimal.valueOf(rnd.nextDouble())); - int res = ps.executeUpdate(); + int res = ps.executeUpdate(); - if (res != 1) - throw new IllegalStateException("Failed to insert a row. The results is not 1."); + if (res != 1) + throw new IllegalStateException("Failed to insert a row. The results is not 1."); + } + + conn.commit(); if (!init) { init = true; @@ -104,11 +111,9 @@ public class FlexLoadApplication extends IgniteAwareApplication { return; if (!initLatch.await(WAIT_START_SECS, TimeUnit.SECONDS)) { - Exception th = new IllegalStateException("Failed to start loading."); - - markBroken(th); + markBroken(new IllegalStateException("Failed to start loading.")); - throw th; + return; } log.info("TEST | Started " + threads + " loading threads. Preloading..."); @@ -119,8 +124,6 @@ public class FlexLoadApplication extends IgniteAwareApplication { preloaded.set(true); - log.info("TEST | Preloaded. Loaded about " + counter.get() + " records. Continue loading..."); - markInitialized(); while (active()) { @@ -131,9 +134,33 @@ public class FlexLoadApplication extends IgniteAwareApplication { log.info("TEST | Stopping. Loaded about " + counter.get() + " records."); + printCount(tableName); + markFinished(); } + /** */ + private void printCount(String tableName) throws Exception { + try (Connection conn = thinJdbcDataSource.getConnection()) { + try { + ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(1) FROM " + tableName); + + rs.next(); + + int cnt = rs.getInt(1); + + log.info("TEST | Table '" + tableName + "' contains " + cnt + " records."); + } + catch (Exception t) { + t = new IllegalStateException("Failed to get records count from table '" + tableName + "'.", t); + + markBroken(t); + + throw t; + } + } + } + /** */ private void createTable(String tableName, String cacheName) throws Exception { try (Connection conn = thinJdbcDataSource.getConnection()) { @@ -150,12 +177,12 @@ private void createTable(String tableName, String cacheName) throws Exception { int cnt = rs.getInt(1); if (cnt == 0) - log.info("TEST | Created table '" + tableName + "'."); + log.info("TEST | Created table '" + tableName + "' over cache '" + cacheName + "'."); else throw new IllegalStateException("Unexpected empty table count: " + cnt); } catch (Exception t) { - t = new IllegalStateException("Failed to create table " + tableName + ".", t); + t = new IllegalStateException("Failed to create table '" + tableName + "'.", t); markBroken(t); diff --git a/modules/ducktests/tests/docker/run_tests.sh b/modules/ducktests/tests/docker/run_tests.sh index a73a11b5068c7..99bee14d8b77f 100755 --- a/modules/ducktests/tests/docker/run_tests.sh +++ b/modules/ducktests/tests/docker/run_tests.sh @@ -31,7 +31,7 @@ IMAGE_PREFIX="ducker-ignite-eclipse-temurin" # DuckerTest parameters are specified with options to the script # Path to ducktests -TC_PATHS="./ignitetest/tests/flex" +TC_PATHS="./ignitetest/tests/mex" # Global parameters to pass to ducktape util with --global param GLOBALS="{}" # Ducktests parameters to pass to ducktape util with --parameters param diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 index 9d4585b27cedc..48945c50188ba 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 @@ -38,6 +38,20 @@ + + + + + + + + + + + + + + @@ -140,11 +154,7 @@ {% endif %} - {% if config.sql_configuration %} - - {{ misc_utils.bean(config.sql_configuration) }} - - {% endif %} + {{ misc_utils.plugins(config) }} diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 index a97631cfa03b1..30dfec548fa01 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 @@ -67,11 +67,14 @@ + + + - + diff --git a/modules/ducktests/tests/ignitetest/tests/flex/__init__.py b/modules/ducktests/tests/ignitetest/tests/mex/__init__.py similarity index 100% rename from modules/ducktests/tests/ignitetest/tests/flex/__init__.py rename to modules/ducktests/tests/ignitetest/tests/mex/__init__.py diff --git a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py similarity index 87% rename from modules/ducktests/tests/ignitetest/tests/flex/flex_test.py rename to modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index 2c2ff99b3f158..d74e1a6d6e2cc 100644 --- a/modules/ducktests/tests/ignitetest/tests/flex/flex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -29,7 +29,7 @@ from ignitetest.services.utils.control_utility import ControlUtility from ignitetest.services.utils.ignite_aware import node_failed_event_pattern from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration, \ - IgniteThinJdbcConfiguration + IgniteThinJdbcConfiguration, TransactionConfiguration from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration from ignitetest.services.utils.ssl.client_connector_configuration import ClientConnectorConfiguration @@ -38,31 +38,29 @@ from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import LATEST_2_17 -class FlexTest(IgniteTest): + +class MexTest(IgniteTest): SERVERS = 3 SERVER_IDX_TO_DROP = 1 - PRELOAD_SECONDS = 120 - LOAD_SECONDS = 40 - LOAD_THREADS = 8 + PRELOAD_SECONDS = 90 + LOAD_SECONDS = 25 + LOAD_THREADS = 12 IGNITE_VERSION = LATEST_2_17 - CACHE_NAME = "TBG_SCS_DM_DOCUMENTS" - TABLE_NAME = "SCS_DM_DOCUMENTS" + CACHE_NAME = "TEST_CACHE" + TABLE_NAME = "TEST_TABLE" @cluster(num_nodes=SERVERS + 1) - def flex_test(self): - servers, ignite_config = self.launch_cluster() - - control_utility = ControlUtility(servers) - control_utility.activate() + def mex_test(self): + servers, control_utility, ignite_config = self.launch_cluster() load_app = self.start_load_app(servers, ignite_config.client_connector_configuration.port) + self.kill_node(servers) + self.logger.info(f"TEST | Loading the cluster for {self.LOAD_SECONDS} seconds...") time.sleep(self.LOAD_SECONDS) - self.kill_node(servers) - self.logger.info("TEST | Stopping the load application ...") load_app.stop() @@ -103,7 +101,7 @@ def start_load_app(self, servers, jdbcPort): app = IgniteApplicationService( self.test_context, IgniteThinJdbcConfiguration(version=self.IGNITE_VERSION, addresses=addrs), - java_class_name="org.apache.ignite.internal.ducktest.tests.flex.FlexLoadApplication", + java_class_name="org.apache.ignite.internal.ducktest.tests.mex.MexLoadApplication", num_nodes=1, params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS, "cacheName": self.CACHE_NAME, "tableName" : self.TABLE_NAME}, @@ -138,18 +136,25 @@ def launch_cluster(self): default=DataRegionConfiguration( initial_size = 256 * 1024 * 1024, max_size = 1024 * 1024 * 1024, - #persistence_enabled = True + persistence_enabled = True ) ), - client_connector_configuration=ClientConnectorConfiguration() + cluster_state = 'ACTIVE', + client_connector_configuration=ClientConnectorConfiguration(), + transaction_configuration=TransactionConfiguration( + default_tx_timeout=300000, + default_tx_isolation="READ_COMMITTED", + tx_timeout_on_partition_map_exchange=120000), ) servers, start_servers_sec = start_servers(self.test_context, self.SERVERS, ignite_config) - servers.await_event(f"servers={self.SERVERS}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, - nodes=servers.nodes) + servers.await_event(f"Topology snapshot \[ver={self.SERVERS}", 15, from_the_beginning=True, nodes=servers.nodes) + + control_utility = ControlUtility(servers) + control_utility.activate() - return servers, ignite_config + return servers, control_utility, ignite_config def start_servers(test_context, num_nodes, ignite_config, modules=None): """ From b8a673258601ba89ac9db67e93c18cf6b3dabc6b Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 15 Jan 2026 13:21:04 +0300 Subject: [PATCH 13/26] + counter app --- .../ducktest/tests/mex/MexCntApplication.java | 60 +++++++++++++++ .../tests/mex/MexLoadApplication.java | 31 +------- .../services/utils/templates/log4j2.xml.j2 | 6 +- .../tests/ignitetest/tests/mex/mex_test.py | 77 ++++++++++++++----- 4 files changed, 124 insertions(+), 50 deletions(-) create mode 100644 modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexCntApplication.java diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexCntApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexCntApplication.java new file mode 100644 index 0000000000000..935498de3994f --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexCntApplication.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests.mex; + +import java.sql.Connection; +import java.sql.ResultSet; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** */ +public class MexCntApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jsonNode) throws Exception { + final String tableName = jsonNode.get("tableName").asText(); + markInitialized(); + + recordResult("tableRowsCnt", printCount(tableName)); + + markFinished(); + } + + /** */ + protected int printCount(String tableName) throws Exception { + try (Connection conn = thinJdbcDataSource.getConnection()) { + try { + ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(1) FROM " + tableName); + + rs.next(); + + int cnt = rs.getInt(1); + + log.info("TEST | Table '" + tableName + "' contains " + cnt + " records."); + + return cnt; + } + catch (Exception t) { + log.error("Failed to get records count from table '" + tableName + "'. Error: " + t.getMessage(), t); + + markBroken(t); + + throw t; + } + } + } +} diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java index 109e592a91611..119c5c8b00efe 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java @@ -28,10 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import com.fasterxml.jackson.databind.JsonNode; -import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; /** */ -public class MexLoadApplication extends IgniteAwareApplication { +public class MexLoadApplication extends MexCntApplication { /** */ private static final int WAIT_START_SECS = 20; @@ -116,7 +115,7 @@ public class MexLoadApplication extends IgniteAwareApplication { return; } - log.info("TEST | Started " + threads + " loading threads. Preloading..."); + log.info("TEST | Started " + threads + " loading threads. Preloading for " + preloadDurSec + " seconds..."); synchronized (this) { wait(preloadDurSec * 1000); @@ -128,39 +127,15 @@ public class MexLoadApplication extends IgniteAwareApplication { while (active()) { synchronized (this) { - wait(150); + wait(100); } } - log.info("TEST | Stopping. Loaded about " + counter.get() + " records."); - printCount(tableName); markFinished(); } - /** */ - private void printCount(String tableName) throws Exception { - try (Connection conn = thinJdbcDataSource.getConnection()) { - try { - ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(1) FROM " + tableName); - - rs.next(); - - int cnt = rs.getInt(1); - - log.info("TEST | Table '" + tableName + "' contains " + cnt + " records."); - } - catch (Exception t) { - t = new IllegalStateException("Failed to get records count from table '" + tableName + "'.", t); - - markBroken(t); - - throw t; - } - } - } - /** */ private void createTable(String tableName, String cacheName) throws Exception { try (Connection conn = thinJdbcDataSource.getConnection()) { diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 index 30dfec548fa01..708011dc07e15 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 @@ -67,14 +67,14 @@ - - +{# #} +{# #} - + diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index d74e1a6d6e2cc..7e838c95a8a5b 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -40,38 +40,57 @@ class MexTest(IgniteTest): + PRELOAD_SECONDS = 5 + LOAD_SECONDS = 90 + LOAD_THREADS = 30 SERVERS = 3 SERVER_IDX_TO_DROP = 1 - PRELOAD_SECONDS = 90 - LOAD_SECONDS = 25 - LOAD_THREADS = 12 IGNITE_VERSION = LATEST_2_17 CACHE_NAME = "TEST_CACHE" TABLE_NAME = "TEST_TABLE" - @cluster(num_nodes=SERVERS + 1) + @cluster(num_nodes=SERVERS + 3) def mex_test(self): + # Start the servers. servers, control_utility, ignite_config = self.launch_cluster() - load_app = self.start_load_app(servers, ignite_config.client_connector_configuration.port) + # Start the loading app. ant wait for some records preloaded. + app = self.start_load_app(servers, ignite_config.client_connector_configuration.port) + # The loading is on. Now, kill a server node. self.kill_node(servers) - self.logger.info(f"TEST | Loading the cluster for {self.LOAD_SECONDS} seconds...") - + # Continue the loading a bit longer. + self.logger.info(f"TEST | Loading the cluster for additional {self.LOAD_SECONDS} seconds...") time.sleep(self.LOAD_SECONDS) + # Stop the loading. self.logger.info("TEST | Stopping the load application ...") + app.stop() + app.await_stopped() - load_app.stop() - load_app.await_stopped() - + # Check idle verify. self.logger.info("TEST | The load application has stopped.") - output = control_utility.idle_verify(self.CACHE_NAME) - self.logger.info(f"TEST | Idle verify finished: {output}") + # Table rows cnt on srvr0. + app = self.start_cnt_app(servers, 0, ignite_config.client_connector_configuration.port) + rowCntOnNode0 = app.extract_result("tableRowsCnt") + app.stop() + app.await_stopped() + + # Table rows cnt on srvr2. + app = self.start_cnt_app(servers, 2, ignite_config.client_connector_configuration.port) + rowCntOnNode2 = app.extract_result("tableRowsCnt") + app.stop() + app.await_stopped() + + # Compare the rows cnt results. + assert rowCntOnNode0 == rowCntOnNode2 + self.logger.info(f"TEST | Detected {rowCntOnNode0} table rows on the alive servers.") + + # Finish the test. servers.stop() def kill_node(self, servers): @@ -105,21 +124,41 @@ def start_load_app(self, servers, jdbcPort): num_nodes=1, params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS, "cacheName": self.CACHE_NAME, "tableName" : self.TABLE_NAME}, - startup_timeout_sec=self.PRELOAD_SECONDS + 10 - ) + startup_timeout_sec=self.PRELOAD_SECONDS + 10) self.logger.info("TEST | Starting the loading application...") - app.start() self.logger.info("TEST | Waiting for the load application initialization...") - app.await_started() self.logger.info("TEST | The load application has initialized.") return app + def start_cnt_app(self, servers, nodeIdx, jdbcPort): + jdbcPort = str(jdbcPort) + + addrs = [servers.nodes[nodeIdx].account.hostname + ":" + jdbcPort] + + app = IgniteApplicationService( + self.test_context, + IgniteThinJdbcConfiguration(version=self.IGNITE_VERSION, addresses=addrs), + java_class_name="org.apache.ignite.internal.ducktest.tests.mex.MexCntApplication", + num_nodes=1, + params={"tableName" : self.TABLE_NAME}, + startup_timeout_sec=20) + + self.logger.info(f"TEST | Starting the counter application to server node {nodeIdx}...") + app.start() + + self.logger.info(f"TEST | Waiting for the counter application initialization to server node {nodeIdx}...") + app.await_started() + + self.logger.info(f"TEST | The counter application has initialized to server node {nodeIdx}.") + + return app + def launch_cluster(self): cacheAffinity = Bean("org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction", partitions=512) @@ -134,8 +173,8 @@ def launch_cluster(self): )], data_storage=DataStorageConfiguration( default=DataRegionConfiguration( - initial_size = 256 * 1024 * 1024, - max_size = 1024 * 1024 * 1024, + initial_size = 2147483648, + max_size = 8589934592, persistence_enabled = True ) ), @@ -149,7 +188,7 @@ def launch_cluster(self): servers, start_servers_sec = start_servers(self.test_context, self.SERVERS, ignite_config) - servers.await_event(f"Topology snapshot \[ver={self.SERVERS}", 15, from_the_beginning=True, nodes=servers.nodes) + servers.await_event(f"Topology snapshot \\[ver={self.SERVERS}", 15, from_the_beginning=True, nodes=servers.nodes) control_utility = ControlUtility(servers) control_utility.activate() From 03c0c32200856f4874d33418d16b3ad8973502d4 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 15 Jan 2026 16:18:45 +0300 Subject: [PATCH 14/26] detected --- .../cache/MapCacheStoreStrategy.java | 3 ++ .../cache/TestCacheStoreStrategy.java | 0 .../ignitetest/services/utils/jvm_utils.py | 2 +- .../services/utils/templates/cache_macro.j2 | 19 +++++++- .../tests/ignitetest/tests/mex/mex_test.py | 44 ++++++++++++------- 5 files changed, 49 insertions(+), 19 deletions(-) rename modules/core/src/{test => main}/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java (98%) rename modules/core/src/{test => main}/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java (100%) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java similarity index 98% rename from modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java index 07a471278492c..568df78735500 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java @@ -110,6 +110,9 @@ public class MapCacheStoreStrategy implements TestCacheStoreStrategy { /** Serializable {@link #map} backed cache store factory */ public static class MapStoreFactory implements Factory> { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public CacheStore create() { return new MapCacheStore(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java similarity index 100% rename from modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java diff --git a/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py b/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py index b67c0a471bc80..d8271df7ed13c 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py +++ b/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py @@ -19,7 +19,7 @@ from ignitetest.services.utils.decorators import memoize -DEFAULT_HEAP = "768M" +DEFAULT_HEAP = "4G" JVM_PARAMS_GC_G1 = "-XX:+UseG1GC -XX:MaxGCPauseMillis=100 " \ "-XX:ConcGCThreads=$(((`nproc`/3)>1?(`nproc`/3):1)) " \ diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index 1a5002c333ab5..498dd7eb2a8fc 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -47,8 +47,7 @@ - - + @@ -61,6 +60,22 @@ + + + + + + + + + + + + {% endfor %} diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index 7e838c95a8a5b..cb80c7765d06f 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -36,16 +36,16 @@ from ignitetest.utils import cluster from ignitetest.utils.bean import Bean from ignitetest.utils.ignite_test import IgniteTest -from ignitetest.utils.version import LATEST_2_17 +from ignitetest.utils.version import LATEST_2_17, DEV_BRANCH class MexTest(IgniteTest): - PRELOAD_SECONDS = 5 - LOAD_SECONDS = 90 - LOAD_THREADS = 30 + PRELOAD_SECONDS = 50 + LOAD_SECONDS = 15 + LOAD_THREADS = 8 SERVERS = 3 SERVER_IDX_TO_DROP = 1 - IGNITE_VERSION = LATEST_2_17 + IGNITE_VERSION = DEV_BRANCH CACHE_NAME = "TEST_CACHE" TABLE_NAME = "TEST_TABLE" @@ -69,26 +69,27 @@ def mex_test(self): app.stop() app.await_stopped() - # Check idle verify. - self.logger.info("TEST | The load application has stopped.") - output = control_utility.idle_verify(self.CACHE_NAME) - self.logger.info(f"TEST | Idle verify finished: {output}") - # Table rows cnt on srvr0. app = self.start_cnt_app(servers, 0, ignite_config.client_connector_configuration.port) rowCntOnNode0 = app.extract_result("tableRowsCnt") + self.logger.info(f"TEST | Partitions cnt on node0: {rowCntOnNode0}") app.stop() app.await_stopped() # Table rows cnt on srvr2. app = self.start_cnt_app(servers, 2, ignite_config.client_connector_configuration.port) rowCntOnNode2 = app.extract_result("tableRowsCnt") + self.logger.info(f"TEST | Partitions cnt on node2: {rowCntOnNode2}") app.stop() app.await_stopped() + # Check idle verify. + self.logger.info("TEST | The load application has stopped.") + output = control_utility.idle_verify(self.CACHE_NAME) + self.logger.info(f"TEST | Idle verify finished: {output}") + # Compare the rows cnt results. assert rowCntOnNode0 == rowCntOnNode2 - self.logger.info(f"TEST | Detected {rowCntOnNode0} table rows on the alive servers.") # Finish the test. servers.stop() @@ -124,7 +125,9 @@ def start_load_app(self, servers, jdbcPort): num_nodes=1, params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS, "cacheName": self.CACHE_NAME, "tableName" : self.TABLE_NAME}, - startup_timeout_sec=self.PRELOAD_SECONDS + 10) + startup_timeout_sec=self.PRELOAD_SECONDS + 10, + jvm_opts="-Xms4G -Xmx4G" + ) self.logger.info("TEST | Starting the loading application...") app.start() @@ -147,7 +150,9 @@ def start_cnt_app(self, servers, nodeIdx, jdbcPort): java_class_name="org.apache.ignite.internal.ducktest.tests.mex.MexCntApplication", num_nodes=1, params={"tableName" : self.TABLE_NAME}, - startup_timeout_sec=20) + startup_timeout_sec=10, + jvm_opts="-Xms4G -Xmx4G" + ) self.logger.info(f"TEST | Starting the counter application to server node {nodeIdx}...") app.start() @@ -173,9 +178,16 @@ def launch_cluster(self): )], data_storage=DataStorageConfiguration( default=DataRegionConfiguration( - initial_size = 2147483648, - max_size = 8589934592, - persistence_enabled = True + persistence_enabled = False, + + # initial_size = 128 * 1024 * 1024, + # max_size = 256 * 1024 * 1024, + + initial_size = 1024 * 1024 * 1024, + max_size = 2048 * 1024 * 1024, + + # initial_size = 2147483648, + # max_size = 17179869184, ) ), cluster_state = 'ACTIVE', From 13efca9d5fe25a6185623d1a2d8b94605173a618 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 15 Jan 2026 18:06:23 +0300 Subject: [PATCH 15/26] + 4th node --- .../services/utils/templates/cache_macro.j2 | 4 +- .../tests/ignitetest/tests/mex/mex_test.py | 80 ++++++++++--------- 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index 498dd7eb2a8fc..8c23a1ac5b654 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -61,11 +61,11 @@ - + diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index cb80c7765d06f..072177e77c986 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -40,16 +40,16 @@ class MexTest(IgniteTest): - PRELOAD_SECONDS = 50 - LOAD_SECONDS = 15 + PRELOAD_SECONDS = 60 + LOAD_SECONDS = 20 LOAD_THREADS = 8 - SERVERS = 3 + SERVERS = 4 SERVER_IDX_TO_DROP = 1 IGNITE_VERSION = DEV_BRANCH CACHE_NAME = "TEST_CACHE" TABLE_NAME = "TEST_TABLE" - @cluster(num_nodes=SERVERS + 3) + @cluster(num_nodes=SERVERS * 2) def mex_test(self): # Start the servers. servers, control_utility, ignite_config = self.launch_cluster() @@ -60,36 +60,35 @@ def mex_test(self): # The loading is on. Now, kill a server node. self.kill_node(servers) - # Continue the loading a bit longer. + # Keep the loading a bit longer. self.logger.info(f"TEST | Loading the cluster for additional {self.LOAD_SECONDS} seconds...") time.sleep(self.LOAD_SECONDS) # Stop the loading. - self.logger.info("TEST | Stopping the load application ...") + self.logger.debug("TEST | Stopping the load application ...") app.stop() app.await_stopped() - # Table rows cnt on srvr0. - app = self.start_cnt_app(servers, 0, ignite_config.client_connector_configuration.port) - rowCntOnNode0 = app.extract_result("tableRowsCnt") - self.logger.info(f"TEST | Partitions cnt on node0: {rowCntOnNode0}") - app.stop() - app.await_stopped() + records_cnt = set() - # Table rows cnt on srvr2. - app = self.start_cnt_app(servers, 2, ignite_config.client_connector_configuration.port) - rowCntOnNode2 = app.extract_result("tableRowsCnt") - self.logger.info(f"TEST | Partitions cnt on node2: {rowCntOnNode2}") - app.stop() - app.await_stopped() + for node in self.alive_servers(servers.nodes): + app = self.start_cnt_app(node, ignite_config.client_connector_configuration.port) + app.await_started() + + cnt = app.extract_result("tableRowsCnt") + records_cnt.add(cnt) + self.logger.info(f"TEST | Partitions cnt on node {node}: {cnt}") + + app.stop() + app.await_stopped() - # Check idle verify. + # Run the idle verify. self.logger.info("TEST | The load application has stopped.") output = control_utility.idle_verify(self.CACHE_NAME) self.logger.info(f"TEST | Idle verify finished: {output}") # Compare the rows cnt results. - assert rowCntOnNode0 == rowCntOnNode2 + assert len(records_cnt) == 1; # Finish the test. servers.stop() @@ -97,26 +96,33 @@ def mex_test(self): def kill_node(self, servers): failedNode = servers.nodes[self.SERVER_IDX_TO_DROP] failedNodeId = servers.node_id(failedNode) + alive_servers = self.alive_servers(servers.nodes) self.logger.info(f"TEST | 'kill -9' node {self.SERVER_IDX_TO_DROP} with id {failedNodeId} ...") servers.stop_node(failedNode, force_stop=True) - self.logger.info("TEST | Awaiting for the node-failed-event...") - servers.await_event(node_failed_event_pattern(failedNodeId), 30, from_the_beginning=True, - nodes=[servers.nodes[0], servers.nodes[2]]) + self.logger.debug("TEST | Awaiting for the node-failed-event...") + servers.await_event(node_failed_event_pattern(failedNodeId), 30, from_the_beginning=True, nodes = alive_servers) - self.logger.info("TEST | Awaiting for the new cluster state...") + self.logger.debug("TEST | Awaiting for the new cluster state...") servers.await_event(f"servers={self.SERVERS - 1}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, - nodes=[servers.nodes[0], servers.nodes[2]]) + nodes = alive_servers) self.logger.info("TEST | The cluster has detected the node failure.") - def start_load_app(self, servers, jdbcPort): + def alive_servers(self, nodes): + return [item for index, item in enumerate(nodes) if index != self.SERVER_IDX_TO_DROP] + + def alive_servers_addrs(self, nodes, jdbcPort): + nodes = self.alive_servers(nodes) + jdbcPort = str(jdbcPort) - addrs = [servers.nodes[0].account.hostname + ":" + jdbcPort, servers.nodes[1].account.hostname + ":" + jdbcPort, - servers.nodes[2].account.hostname + ":" + jdbcPort] + return [n.account.hostname + ":" + jdbcPort for n in nodes] + + def start_load_app(self, servers, jdbcPort): + addrs = self.alive_servers_addrs(servers.nodes, jdbcPort) app = IgniteApplicationService( self.test_context, @@ -129,20 +135,20 @@ def start_load_app(self, servers, jdbcPort): jvm_opts="-Xms4G -Xmx4G" ) - self.logger.info("TEST | Starting the loading application...") + self.logger.debug("TEST | Starting the loading application...") app.start() - self.logger.info("TEST | Waiting for the load application initialization...") + self.logger.debug("TEST | Waiting for the load application initialization...") app.await_started() self.logger.info("TEST | The load application has initialized.") return app - def start_cnt_app(self, servers, nodeIdx, jdbcPort): + def start_cnt_app(self, node, jdbcPort): jdbcPort = str(jdbcPort) - addrs = [servers.nodes[nodeIdx].account.hostname + ":" + jdbcPort] + addrs = [node.account.hostname + ":" + jdbcPort] app = IgniteApplicationService( self.test_context, @@ -154,13 +160,13 @@ def start_cnt_app(self, servers, nodeIdx, jdbcPort): jvm_opts="-Xms4G -Xmx4G" ) - self.logger.info(f"TEST | Starting the counter application to server node {nodeIdx}...") + self.logger.debug(f"TEST | Starting the counter application to server node {node}...") app.start() - self.logger.info(f"TEST | Waiting for the counter application initialization to server node {nodeIdx}...") + self.logger.debug(f"TEST | Waiting for the counter application initialization to server node {node}...") app.await_started() - self.logger.info(f"TEST | The counter application has initialized to server node {nodeIdx}.") + self.logger.info(f"TEST | The counter application has initialized to server node {node}.") return app @@ -183,8 +189,8 @@ def launch_cluster(self): # initial_size = 128 * 1024 * 1024, # max_size = 256 * 1024 * 1024, - initial_size = 1024 * 1024 * 1024, - max_size = 2048 * 1024 * 1024, + initial_size = 4096 * 1024 * 1024, + max_size = 4096 * 1024 * 1024, # initial_size = 2147483648, # max_size = 17179869184, From 81331ebef4196008c60ae847edc56584bc6ca00c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 15 Jan 2026 19:44:09 +0300 Subject: [PATCH 16/26] well reproducing --- .../services/utils/templates/cache_macro.j2 | 26 ++++++++++++------- .../tests/ignitetest/tests/mex/mex_test.py | 6 ++--- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index 8c23a1ac5b654..616e4a6c01629 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -55,21 +55,27 @@ - - - - - - + - - - +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index ce0e9733c2b94..ab959b06530ba 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -38,11 +38,11 @@ from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import LATEST_2_17, DEV_BRANCH - +# Run: clear; ./docker/clean_up.sh; rm -drf ../../../results/*; ./docker/run_tests.sh -t ./ignitetest/tests/mex class MexTest(IgniteTest): PRELOAD_SECONDS = 60 LOAD_SECONDS = PRELOAD_SECONDS / 3 - FORCE_STOP = False + FORCE_STOP = True TRANSACTION = True LOAD_THREADS = 8 SERVERS = 4 @@ -101,23 +101,22 @@ def kill_node(self, servers): alive_servers = self.alive_servers(servers.nodes) if self.FORCE_STOP: - self.logger.info(f"TEST | 'kill -9' node {self.SERVER_IDX_TO_DROP} with id {failedNodeId} ...") + self.logger.info(f"TEST | 'kill -9' node {self.SERVER_IDX_TO_DROP} with id {failedNodeId}...") servers.stop_node(failedNode, force_stop=True) self.logger.debug("TEST | Awaiting for the node-failed-event...") servers.await_event(node_failed_event_pattern(failedNodeId), 30, from_the_beginning=True, nodes=alive_servers) - else: - self.logger.info(f"TEST | stopping node {self.SERVER_IDX_TO_DROP} with id {failedNodeId} ...") + self.logger.info("TEST | The cluster has detected the node failure.") + else: + self.logger.info(f"TEST | stopping node {self.SERVER_IDX_TO_DROP} with id {failedNodeId}...") servers.stop_node(failedNode, force_stop=False) self.logger.debug("TEST | Awaiting for the new cluster state...") servers.await_event(f"servers={self.SERVERS - 1}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, nodes=alive_servers) - self.logger.info("TEST | The cluster has detected the node failure.") - def alive_servers(self, nodes): return [item for index, item in enumerate(nodes) if index != self.SERVER_IDX_TO_DROP] From 8725fc03338e641a5cf721c728fec5bd27643bb6 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 16 Jan 2026 12:51:11 +0300 Subject: [PATCH 20/26] failed with forceStop == false --- .../utils/templates/ignite_configuration_macro.j2 | 4 ++-- .../ducktests/tests/ignitetest/tests/mex/mex_test.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 index d02611d243d38..48945c50188ba 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 @@ -34,11 +34,11 @@ - + diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index ab959b06530ba..e1af85a8c8d14 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -40,12 +40,12 @@ # Run: clear; ./docker/clean_up.sh; rm -drf ../../../results/*; ./docker/run_tests.sh -t ./ignitetest/tests/mex class MexTest(IgniteTest): - PRELOAD_SECONDS = 60 + PRELOAD_SECONDS = 40 LOAD_SECONDS = PRELOAD_SECONDS / 3 - FORCE_STOP = True + FORCE_STOP = False TRANSACTION = True - LOAD_THREADS = 8 - SERVERS = 4 + LOAD_THREADS = 12 + SERVERS = 3 SERVER_IDX_TO_DROP = 1 IGNITE_VERSION = DEV_BRANCH CACHE_NAME = "TEST_CACHE" @@ -110,7 +110,7 @@ def kill_node(self, servers): self.logger.info("TEST | The cluster has detected the node failure.") else: - self.logger.info(f"TEST | stopping node {self.SERVER_IDX_TO_DROP} with id {failedNodeId}...") + self.logger.info(f"TEST | gracefully stopping node {self.SERVER_IDX_TO_DROP} with id {failedNodeId}...") servers.stop_node(failedNode, force_stop=False) self.logger.debug("TEST | Awaiting for the new cluster state...") From 7d1c166d9e6f047c473323f6ca19e7aa86b3436c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 16 Jan 2026 13:04:39 +0300 Subject: [PATCH 21/26] failed with forceStop == false, transaction -- False --- modules/ducktests/tests/ignitetest/tests/mex/mex_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index e1af85a8c8d14..677b091f8c703 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -40,10 +40,10 @@ # Run: clear; ./docker/clean_up.sh; rm -drf ../../../results/*; ./docker/run_tests.sh -t ./ignitetest/tests/mex class MexTest(IgniteTest): + FORCE_STOP = False + TRANSACTION = False PRELOAD_SECONDS = 40 LOAD_SECONDS = PRELOAD_SECONDS / 3 - FORCE_STOP = False - TRANSACTION = True LOAD_THREADS = 12 SERVERS = 3 SERVER_IDX_TO_DROP = 1 @@ -79,7 +79,7 @@ def mex_test(self): cnt = app.extract_result("tableRowsCnt") records_cnt.add(cnt) - self.logger.info(f"TEST | Partitions cnt on node {node}: {cnt}") + self.logger.info(f"TEST | Partitions cnt on node {node.account.hostname}: {cnt}") app.stop() app.await_stopped() From e37f2d3ae09cc227f64c8ca898ea84c0a21e61c6 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 16 Jan 2026 13:18:00 +0300 Subject: [PATCH 22/26] + waiting after load --- modules/ducktests/tests/ignitetest/tests/mex/mex_test.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index 677b091f8c703..4fc1f605fcb6c 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -42,6 +42,7 @@ class MexTest(IgniteTest): FORCE_STOP = False TRANSACTION = False + WAIT_AFTER_LOAD_SEC = 5 PRELOAD_SECONDS = 40 LOAD_SECONDS = PRELOAD_SECONDS / 3 LOAD_THREADS = 12 @@ -71,6 +72,10 @@ def mex_test(self): app.stop() app.await_stopped() + if self.WAIT_AFTER_LOAD_SEC > 0: + self.logger.info(f"TEST | waiting after load for {self.WAIT_AFTER_LOAD_SEC} seconds...") + time.sleep(self.WAIT_AFTER_LOAD_SEC) + records_cnt = set() for node in self.alive_servers(servers.nodes): From 6c6fb8512f15216e6bc2fb6fef6b1f11c53e6d77 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 22 Jan 2026 18:45:57 +0300 Subject: [PATCH 23/26] fix for ext. storage --- .../ducktest/utils/TestCacheStore.java | 133 ++++++++++++++++++ .../utils/ignite_configuration/cache.py | 2 + .../services/utils/templates/cache_macro.j2 | 28 ++-- .../tests/ignitetest/tests/mex/mex_test.py | 69 +++++++-- 4 files changed, 205 insertions(+), 27 deletions(-) create mode 100644 modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java new file mode 100644 index 0000000000000..4aa6b0d78001e --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java @@ -0,0 +1,133 @@ +package org.apache.ignite.internal.ducktest.utils; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryObjectImpl; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.jetbrains.annotations.Nullable; + +public class TestCacheStore implements CacheStore { + private final Ignite ignite; + + private final IgniteCache cache; + + private final AtomicBoolean registerScheme = new AtomicBoolean(); + + public TestCacheStore() { + System.err.println("TEST | Starting ExtCacheStore"); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder() + .setAddresses(F.asList("ducker02:45000")); + + cfg.setDiscoverySpi( + new TcpDiscoverySpi() + .setIpFinder(ipFinder) + .setLocalPort(45000) + ).setCommunicationSpi( + new TcpCommunicationSpi().setLocalPort(46000) + ).setClientMode(true) + .setPeerClassLoadingEnabled(true); + + ignite = Ignition.start(cfg); + + System.err.println("TEST | Started ExtCacheStore"); + + cache = ignite.cache("EXT_STORAGE_CACHE"); + + assert cache != null; + + System.err.println("TEST | cache: " + cache); + } + + @Override public void loadCache(IgniteBiInClosure clo, @Nullable Object... args) throws CacheLoaderException { + throw new UnsupportedOperationException(); + } + + @Override public void sessionEnd(boolean commit) throws CacheWriterException { + // No-op. + } + + @Override public Object load(Object key) throws CacheLoaderException { + System.err.println("TEST | ExternalStorage: load, keyClass=" + key.getClass().getSimpleName()); + + return cache.get(key); + } + + @Override public Map loadAll(Iterable keys) throws CacheLoaderException { + System.err.println("TEST | ExternalStorage: loadAll"); + + Map res = new HashMap<>(); + + keys.forEach(key -> res.put(key, load(key))); + + return res; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // System.err.println("TEST | ExternalStorage: write, keyClass=" + entry.getValue().getClass().getSimpleName()); + Object val = entry.getValue(); + +// if (val instanceof BinaryObjectImpl) { +//// if (val instanceof BinaryObjectImpl && !registerScheme.get()) { +// //synchronized (registerScheme) { +// //if (!registerScheme.get()) { +// BinaryObjectImpl boj = (BinaryObjectImpl)val; +// +// //System.err.println("TEST | ExternalStorage: write, src biaryObject: " + boj); +// +// val = ignite.binary().builder(boj).build(); +// +// //System.err.println("TEST | ExternalStorage: write, new biaryObject: " + newBoj); +// +// //registerScheme.set(true); +// // } +// //} +// } + + cache.put(entry.getKey(), val); + } + + @Override public void writeAll(Collection> entries) throws CacheWriterException { + entries.forEach(e -> cache.put(e.getKey(), e.getValue())); + } + + @Override public void delete(Object key) throws CacheWriterException { + cache.remove(key); + } + + @Override public void deleteAll(Collection keys) throws CacheWriterException { + Set keys0 = keys instanceof Set ? (Set)keys : new HashSet<>(keys); + + cache.removeAll(keys0); + } + + public static final class IgniteCacheStoreFactory implements Factory> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestCacheStore(); + } + } +} diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py index 0dd0d39f8afbf..33e9ec08d1cc7 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py +++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py @@ -35,3 +35,5 @@ class CacheConfiguration(NamedTuple): statistics_enabled: bool = True affinity: Bean = None affinity_mapper: Bean = None + external_storage: bool = False + keep_binary: bool = False diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index 616e4a6c01629..287f6b2e8fa56 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -44,27 +44,25 @@ {{ misc_utils.bean(cache.affinity_mapper) }} {% endif %} - - - - - - - - - - + {% if cache.keep_binary %} + + {% endif %} - - - - - + {% if cache.external_storage %} + + + + + + + {% endif %} + + {# #} {# #} diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index 4fc1f605fcb6c..efbdfa092b121 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -29,21 +29,23 @@ from ignitetest.services.utils.control_utility import ControlUtility from ignitetest.services.utils.ignite_aware import node_failed_event_pattern from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration, \ - IgniteThinJdbcConfiguration, TransactionConfiguration + IgniteThinJdbcConfiguration, TransactionConfiguration, DiscoverySpi, CommunicationSpi from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration from ignitetest.services.utils.ssl.client_connector_configuration import ClientConnectorConfiguration from ignitetest.utils import cluster from ignitetest.utils.bean import Bean from ignitetest.utils.ignite_test import IgniteTest +from ignitetest.services.utils.ignite_configuration.discovery import TcpDiscoverySpi, TcpDiscoveryVmIpFinder +from ignitetest.services.utils.ignite_configuration.communication import TcpCommunicationSpi from ignitetest.utils.version import LATEST_2_17, DEV_BRANCH # Run: clear; ./docker/clean_up.sh; rm -drf ../../../results/*; ./docker/run_tests.sh -t ./ignitetest/tests/mex class MexTest(IgniteTest): - FORCE_STOP = False + FORCE_STOP = True TRANSACTION = False - WAIT_AFTER_LOAD_SEC = 5 - PRELOAD_SECONDS = 40 + WAIT_AFTER_LOAD_SEC = 0 + PRELOAD_SECONDS = 15 LOAD_SECONDS = PRELOAD_SECONDS / 3 LOAD_THREADS = 12 SERVERS = 3 @@ -52,8 +54,11 @@ class MexTest(IgniteTest): CACHE_NAME = "TEST_CACHE" TABLE_NAME = "TEST_TABLE" - @cluster(num_nodes=SERVERS * 2) + @cluster(num_nodes=SERVERS * 2 + 1) def mex_test(self): + # Start the external storage + es = self.start_ext_storage() + # Start the servers. servers, control_utility, ignite_config = self.launch_cluster() @@ -71,11 +76,17 @@ def mex_test(self): self.logger.debug("TEST | Stopping the load application ...") app.stop() app.await_stopped() + self.logger.info("TEST | The load application has stopped.") if self.WAIT_AFTER_LOAD_SEC > 0: self.logger.info(f"TEST | waiting after load for {self.WAIT_AFTER_LOAD_SEC} seconds...") time.sleep(self.WAIT_AFTER_LOAD_SEC) + # Run the idle verify. + output = control_utility.idle_verify(self.CACHE_NAME) + self.logger.info(f"TEST | Idle verify finished: {output}") + + # Count and compare table size from dirrefent nodes. records_cnt = set() for node in self.alive_servers(servers.nodes): @@ -89,16 +100,11 @@ def mex_test(self): app.stop() app.await_stopped() - # Run the idle verify. - self.logger.info("TEST | The load application has stopped.") - output = control_utility.idle_verify(self.CACHE_NAME) - self.logger.info(f"TEST | Idle verify finished: {output}") - - # Compare the rows cnt results. assert len(records_cnt) == 1; # Finish the test. servers.stop() + es.stop() def kill_node(self, servers): failedNode = servers.nodes[self.SERVER_IDX_TO_DROP] @@ -181,9 +187,42 @@ def start_cnt_app(self, node, jdbcPort): return app + def start_ext_storage(self): + ignite_config = IgniteConfiguration( + version=self.IGNITE_VERSION, + metrics_log_frequency = 0, + caches=[CacheConfiguration( + name='EXT_STORAGE_CACHE', + keep_binary = True + )], + data_storage=DataStorageConfiguration( + default=DataRegionConfiguration( + persistence_enabled = True, + initial_size = 2048 * 1024 * 1024, + max_size = 4096 * 1024 * 1024, + ) + ), + cluster_state = 'ACTIVE', + discovery_spi = TcpDiscoverySpi(port=45000), + communication_spi = TcpCommunicationSpi(local_port=46000), + client_mode = False + ) + + ext_store, _ = start_servers(self.test_context, 1, ignite_config) + + ext_store.await_event("Topology snapshot \\[ver=1", 10, from_the_beginning=True, nodes=ext_store.nodes) + + control_utility = ControlUtility(ext_store) + control_utility.activate() + + return ext_store + def launch_cluster(self): cacheAffinity = Bean("org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction", partitions=512) + ip_finder = TcpDiscoveryVmIpFinder() + ip_finder.addresses = ["ducker03","ducker04","ducker05","ducker06","ducker07"] + ignite_config = IgniteConfiguration( version=self.IGNITE_VERSION, metrics_log_frequency = 0, @@ -191,7 +230,9 @@ def launch_cluster(self): name=self.CACHE_NAME, atomicity_mode='TRANSACTIONAL', affinity = cacheAffinity, - cache_mode = 'REPLICATED' + cache_mode = 'REPLICATED', + external_storage = True, + keep_binary = True )], data_storage=DataStorageConfiguration( default=DataRegionConfiguration( @@ -213,6 +254,10 @@ def launch_cluster(self): default_tx_timeout=300000, default_tx_isolation="READ_COMMITTED", tx_timeout_on_partition_map_exchange=120000), + discovery_spi = TcpDiscoverySpi( + ip_finder=ip_finder + ), + client_mode = False ) servers, start_servers_sec = start_servers(self.test_context, self.SERVERS, ignite_config) From 681e76e0628b1b26366dc237701175c453f4a534 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Thu, 22 Jan 2026 18:45:57 +0300 Subject: [PATCH 24/26] fix for ext. storage --- .../ignite/IgniteJdbcThinDataSource.java | 6 +- .../jdbc/thin/ConnectionPropertiesImpl.java | 6 + .../ducktest/utils/TestCacheStore.java | 183 ++++++++++++++++++ .../utils/ignite_configuration/cache.py | 2 + .../services/utils/templates/cache_macro.j2 | 28 ++- .../tests/ignitetest/tests/mex/mex_test.py | 65 ++++++- 6 files changed, 264 insertions(+), 26 deletions(-) create mode 100644 modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java index 510c7a56854b3..27d4ba0a919af 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java @@ -59,7 +59,11 @@ public class IgniteJdbcThinDataSource implements DataSource, Serializable { if (!F.isEmpty(pwd)) props.put("password", pwd); - return IgniteJdbcThinDriver.register().connect(getUrl(), props); + String url = getUrl(); + + System.err.println("TEST | JDBC connection URL: " + url); + + return IgniteJdbcThinDriver.register().connect(url, props); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java index c1d62ea7daac6..226327685f13e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java @@ -21,6 +21,8 @@ import java.sql.DriverPropertyInfo; import java.sql.SQLException; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.StringTokenizer; import org.apache.ignite.IgniteCheckedException; @@ -316,6 +318,10 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa HostAndPortRange[] addrs = getAddresses(); + List ranges = Arrays.asList(addrs); + Collections.shuffle(ranges); + addrs = ranges.toArray(new HostAndPortRange[ranges.size()]); + for (int i = 0; i < addrs.length; i++) { if (i > 0) sbUrl.append(','); diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java new file mode 100644 index 0000000000000..f70096a7b079c --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java @@ -0,0 +1,183 @@ +package org.apache.ignite.internal.ducktest.utils; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryObjectImpl; +import org.apache.ignite.internal.binary.builder.BinaryObjectBuilders; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.jetbrains.annotations.Nullable; + +public class TestCacheStore implements CacheStore { + private final Ignite ignite; + + private final IgniteCache cache; + + private final AtomicBoolean registerScheme = new AtomicBoolean(); + + private final AtomicReference binObjRef = new AtomicReference<>(); + + public TestCacheStore() { + System.err.println("TEST | Starting ExtCacheStore"); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder() + .setAddresses(F.asList("ducker02:45000")); + + cfg.setDiscoverySpi( + new TcpDiscoverySpi() + .setIpFinder(ipFinder) + .setLocalPort(45000) + ).setCommunicationSpi( + new TcpCommunicationSpi().setLocalPort(46000) + ).setClientMode(true) + .setPeerClassLoadingEnabled(true); + + ignite = Ignition.start(cfg); + + System.err.println("TEST | Started ExtCacheStore"); + + cache = ignite.cache("EXT_STORAGE_CACHE"); + + assert cache != null; + + System.err.println("TEST | cache: " + cache); + } + + @Override public void loadCache(IgniteBiInClosure clo, @Nullable Object... args) throws CacheLoaderException { + throw new UnsupportedOperationException(); + } + + @Override public void sessionEnd(boolean commit) throws CacheWriterException { + // No-op. + } + + @Override public Object load(Object key) throws CacheLoaderException { + System.err.println("TEST | ExternalStorage: load, key=" + key); + + DTO val = (DTO)cache.get(key); + + System.err.println("TEST | ExternalStorage: load, value=" + val); + + if (val == null) + return null; + + BinaryObjectImpl bo = binObjRef.get(); + +// BinaryObjectBuilder bob = BinaryObjectBuilders.builder(bo); + + BinaryObjectBuilder bob = BinaryObjectBuilders.builder(bo.context(), + bo.context().metadata0(bo.typeId()).typeName()); + + assert bo != null; + + bob.setField("STRVAL", val.strVal); + bob.setField("DECVAL", val.decVal); + + BinaryObject newBo = bob.build(); + + System.err.println("TEST | ExternalStorage: load, key=" + key + ", bo=" + bo + ", newBo=" + newBo); + + return newBo; + } + + @Override public Map loadAll(Iterable keys) throws CacheLoaderException { + System.err.println("TEST | ExternalStorage: loadAll"); + + Map res = new HashMap<>(); + + keys.forEach(key -> res.put(key, load(key))); + + return res; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // System.err.println("TEST | ExternalStorage: write, keyClass=" + entry.getValue().getClass().getSimpleName()); + Object val = entry.getValue(); + + if (val instanceof BinaryObjectImpl && !registerScheme.get()) { + synchronized (registerScheme) { + if (!registerScheme.get()) { + BinaryObjectImpl bo = (BinaryObjectImpl)val; + + binObjRef.set(bo); + + registerScheme.set(true); + } + } + } + + if (val instanceof BinaryObjectImpl) { + BinaryObjectImpl bo = (BinaryObjectImpl)val; + + DTO dto = new DTO(); + + dto.id = (Integer)entry.getKey(); + dto.strVal = bo.field("STRVAL"); + dto.decVal = bo.field("DECVAL"); + + val = dto; + } + + cache.put(entry.getKey(), val); + } + + @Override public void writeAll(Collection> entries) throws CacheWriterException { + entries.forEach(e -> cache.put(e.getKey(), e.getValue())); + } + + @Override public void delete(Object key) throws CacheWriterException { + cache.remove(key); + } + + @Override public void deleteAll(Collection keys) throws CacheWriterException { + Set keys0 = keys instanceof Set ? (Set)keys : new HashSet<>(keys); + + cache.removeAll(keys0); + } + + public static final class DTO { + int id; + String strVal; + BigDecimal decVal; + + @Override public String toString() { + return "DTO{" + + "id=" + id + + ", decVal=" + decVal + + ", strVal='" + strVal + '\'' + + '}'; + } + } + + public static final class IgniteCacheStoreFactory implements Factory> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestCacheStore(); + } + } +} diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py index 0dd0d39f8afbf..33e9ec08d1cc7 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py +++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py @@ -35,3 +35,5 @@ class CacheConfiguration(NamedTuple): statistics_enabled: bool = True affinity: Bean = None affinity_mapper: Bean = None + external_storage: bool = False + keep_binary: bool = False diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index 616e4a6c01629..287f6b2e8fa56 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -44,27 +44,25 @@ {{ misc_utils.bean(cache.affinity_mapper) }} {% endif %} - - - - - - - - - - + {% if cache.keep_binary %} + + {% endif %} - - - - - + {% if cache.external_storage %} + + + + + + + {% endif %} + + {# #} {# #} diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index 4fc1f605fcb6c..502c6908e5f39 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -29,21 +29,23 @@ from ignitetest.services.utils.control_utility import ControlUtility from ignitetest.services.utils.ignite_aware import node_failed_event_pattern from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration, \ - IgniteThinJdbcConfiguration, TransactionConfiguration + IgniteThinJdbcConfiguration, TransactionConfiguration, DiscoverySpi, CommunicationSpi from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration from ignitetest.services.utils.ssl.client_connector_configuration import ClientConnectorConfiguration from ignitetest.utils import cluster from ignitetest.utils.bean import Bean from ignitetest.utils.ignite_test import IgniteTest +from ignitetest.services.utils.ignite_configuration.discovery import TcpDiscoverySpi, TcpDiscoveryVmIpFinder +from ignitetest.services.utils.ignite_configuration.communication import TcpCommunicationSpi from ignitetest.utils.version import LATEST_2_17, DEV_BRANCH # Run: clear; ./docker/clean_up.sh; rm -drf ../../../results/*; ./docker/run_tests.sh -t ./ignitetest/tests/mex class MexTest(IgniteTest): - FORCE_STOP = False + FORCE_STOP = True TRANSACTION = False - WAIT_AFTER_LOAD_SEC = 5 - PRELOAD_SECONDS = 40 + WAIT_AFTER_LOAD_SEC = 0 + PRELOAD_SECONDS = 45 LOAD_SECONDS = PRELOAD_SECONDS / 3 LOAD_THREADS = 12 SERVERS = 3 @@ -52,8 +54,11 @@ class MexTest(IgniteTest): CACHE_NAME = "TEST_CACHE" TABLE_NAME = "TEST_TABLE" - @cluster(num_nodes=SERVERS * 2) + @cluster(num_nodes=SERVERS * 2 + 1) def mex_test(self): + # Start the external storage + es = self.start_ext_storage() + # Start the servers. servers, control_utility, ignite_config = self.launch_cluster() @@ -71,11 +76,13 @@ def mex_test(self): self.logger.debug("TEST | Stopping the load application ...") app.stop() app.await_stopped() + self.logger.info("TEST | The load application has stopped.") if self.WAIT_AFTER_LOAD_SEC > 0: self.logger.info(f"TEST | waiting after load for {self.WAIT_AFTER_LOAD_SEC} seconds...") time.sleep(self.WAIT_AFTER_LOAD_SEC) + # Count and compare table size from dirrefent nodes. records_cnt = set() for node in self.alive_servers(servers.nodes): @@ -89,16 +96,15 @@ def mex_test(self): app.stop() app.await_stopped() + assert len(records_cnt) == 1; + # Run the idle verify. - self.logger.info("TEST | The load application has stopped.") output = control_utility.idle_verify(self.CACHE_NAME) self.logger.info(f"TEST | Idle verify finished: {output}") - # Compare the rows cnt results. - assert len(records_cnt) == 1; - # Finish the test. servers.stop() + es.stop() def kill_node(self, servers): failedNode = servers.nodes[self.SERVER_IDX_TO_DROP] @@ -181,9 +187,42 @@ def start_cnt_app(self, node, jdbcPort): return app + def start_ext_storage(self): + ignite_config = IgniteConfiguration( + version=self.IGNITE_VERSION, + metrics_log_frequency = 0, + caches=[CacheConfiguration( + name='EXT_STORAGE_CACHE', + keep_binary = True + )], + data_storage=DataStorageConfiguration( + default=DataRegionConfiguration( + persistence_enabled = True, + initial_size = 2048 * 1024 * 1024, + max_size = 4096 * 1024 * 1024, + ) + ), + cluster_state = 'ACTIVE', + discovery_spi = TcpDiscoverySpi(port=45000), + communication_spi = TcpCommunicationSpi(local_port=46000), + client_mode = False + ) + + ext_store, _ = start_servers(self.test_context, 1, ignite_config) + + ext_store.await_event("Topology snapshot \\[ver=1", 10, from_the_beginning=True, nodes=ext_store.nodes) + + control_utility = ControlUtility(ext_store) + control_utility.activate() + + return ext_store + def launch_cluster(self): cacheAffinity = Bean("org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction", partitions=512) + ip_finder = TcpDiscoveryVmIpFinder() + ip_finder.addresses = ["ducker03","ducker04","ducker05","ducker06","ducker07"] + ignite_config = IgniteConfiguration( version=self.IGNITE_VERSION, metrics_log_frequency = 0, @@ -191,7 +230,9 @@ def launch_cluster(self): name=self.CACHE_NAME, atomicity_mode='TRANSACTIONAL', affinity = cacheAffinity, - cache_mode = 'REPLICATED' + cache_mode = 'REPLICATED', + external_storage = True, + keep_binary = True )], data_storage=DataStorageConfiguration( default=DataRegionConfiguration( @@ -213,6 +254,10 @@ def launch_cluster(self): default_tx_timeout=300000, default_tx_isolation="READ_COMMITTED", tx_timeout_on_partition_map_exchange=120000), + discovery_spi = TcpDiscoverySpi( + ip_finder=ip_finder + ), + client_mode = False ) servers, start_servers_sec = start_servers(self.test_context, self.SERVERS, ignite_config) From 2f024408c5256d819a70ade80edda62f1796f0f1 Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 23 Jan 2026 01:00:14 +0300 Subject: [PATCH 25/26] minor params --- modules/ducktests/tests/ignitetest/tests/mex/mex_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index 502c6908e5f39..7b0324ed8fb09 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -45,9 +45,9 @@ class MexTest(IgniteTest): FORCE_STOP = True TRANSACTION = False WAIT_AFTER_LOAD_SEC = 0 - PRELOAD_SECONDS = 45 + PRELOAD_SECONDS = 50 LOAD_SECONDS = PRELOAD_SECONDS / 3 - LOAD_THREADS = 12 + LOAD_THREADS = 20 SERVERS = 3 SERVER_IDX_TO_DROP = 1 IGNITE_VERSION = DEV_BRANCH From d6528d2a1fcb3ec4c08a881130a37738f102091c Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 23 Jan 2026 10:34:08 +0300 Subject: [PATCH 26/26] use `onSessionEnd` --- .../ducktest/utils/TestCacheStore.java | 83 ++++++++++--------- .../tests/ignitetest/tests/mex/mex_test.py | 4 +- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java index f70096a7b079c..09e71909800a3 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java @@ -3,9 +3,7 @@ import java.math.BigDecimal; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; @@ -15,7 +13,6 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; -import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.IgniteConfiguration; @@ -37,6 +34,8 @@ public class TestCacheStore implements CacheStore { private final AtomicReference binObjRef = new AtomicReference<>(); + private final ThreadLocal> curUpdates = new ThreadLocal<>(); + public TestCacheStore() { System.err.println("TEST | Starting ExtCacheStore"); @@ -56,13 +55,13 @@ public TestCacheStore() { ignite = Ignition.start(cfg); - System.err.println("TEST | Started ExtCacheStore"); +// System.err.println("TEST | Started ExtCacheStore"); cache = ignite.cache("EXT_STORAGE_CACHE"); assert cache != null; - System.err.println("TEST | cache: " + cache); +// System.err.println("TEST | cache: " + cache); } @Override public void loadCache(IgniteBiInClosure clo, @Nullable Object... args) throws CacheLoaderException { @@ -70,36 +69,46 @@ public TestCacheStore() { } @Override public void sessionEnd(boolean commit) throws CacheWriterException { - // No-op. + Map updateMap = curUpdates.get(); + + if (updateMap == null) + return; + + if (commit) + cache.putAll(updateMap); + + curUpdates.set(null); } @Override public Object load(Object key) throws CacheLoaderException { - System.err.println("TEST | ExternalStorage: load, key=" + key); - - DTO val = (DTO)cache.get(key); + // System.err.println("TEST | ExternalStorage: load, key=" + key); - System.err.println("TEST | ExternalStorage: load, value=" + val); + Object val = cache.get(key); if (val == null) return null; - BinaryObjectImpl bo = binObjRef.get(); + if (val instanceof DTO) { + DTO dto = (DTO)cache.get(key); -// BinaryObjectBuilder bob = BinaryObjectBuilders.builder(bo); + // System.err.println("TEST | ExternalStorage: load, value=" + val); - BinaryObjectBuilder bob = BinaryObjectBuilders.builder(bo.context(), - bo.context().metadata0(bo.typeId()).typeName()); + BinaryObjectImpl bo = binObjRef.get(); - assert bo != null; + assert bo != null; - bob.setField("STRVAL", val.strVal); - bob.setField("DECVAL", val.decVal); +// BinaryObjectBuilder bob = BinaryObjectBuilders.builder(bo); - BinaryObject newBo = bob.build(); + BinaryObjectBuilder bob = BinaryObjectBuilders.builder(bo.context(), + bo.context().metadata0(bo.typeId()).typeName()); - System.err.println("TEST | ExternalStorage: load, key=" + key + ", bo=" + bo + ", newBo=" + newBo); + bob.setField("STRVAL", dto.strVal); + bob.setField("DECVAL", dto.decVal); - return newBo; + val = bob.build(); + } + + return val; } @Override public Map loadAll(Iterable keys) throws CacheLoaderException { @@ -113,23 +122,20 @@ public TestCacheStore() { } @Override public void write(Cache.Entry entry) throws CacheWriterException { - // System.err.println("TEST | ExternalStorage: write, keyClass=" + entry.getValue().getClass().getSimpleName()); Object val = entry.getValue(); - if (val instanceof BinaryObjectImpl && !registerScheme.get()) { - synchronized (registerScheme) { - if (!registerScheme.get()) { - BinaryObjectImpl bo = (BinaryObjectImpl)val; + if (val instanceof BinaryObjectImpl) { + BinaryObjectImpl bo = (BinaryObjectImpl)val; - binObjRef.set(bo); + if (!registerScheme.get()) { + synchronized (registerScheme) { + if (!registerScheme.get()) { + binObjRef.set(bo); - registerScheme.set(true); + registerScheme.set(true); + } } } - } - - if (val instanceof BinaryObjectImpl) { - BinaryObjectImpl bo = (BinaryObjectImpl)val; DTO dto = new DTO(); @@ -140,21 +146,24 @@ public TestCacheStore() { val = dto; } - cache.put(entry.getKey(), val); + Map updateMap = curUpdates.get(); + + if (updateMap == null) + curUpdates.set(updateMap = new HashMap<>()); + + updateMap.put(entry.getKey(), val); } @Override public void writeAll(Collection> entries) throws CacheWriterException { - entries.forEach(e -> cache.put(e.getKey(), e.getValue())); + entries.forEach(e -> write(e)); } @Override public void delete(Object key) throws CacheWriterException { - cache.remove(key); + throw new UnsupportedOperationException(); } @Override public void deleteAll(Collection keys) throws CacheWriterException { - Set keys0 = keys instanceof Set ? (Set)keys : new HashSet<>(keys); - - cache.removeAll(keys0); + throw new UnsupportedOperationException(); } public static final class DTO { diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py index 7b0324ed8fb09..b3a071e8708f7 100644 --- a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -43,9 +43,9 @@ # Run: clear; ./docker/clean_up.sh; rm -drf ../../../results/*; ./docker/run_tests.sh -t ./ignitetest/tests/mex class MexTest(IgniteTest): FORCE_STOP = True - TRANSACTION = False + TRANSACTION = True WAIT_AFTER_LOAD_SEC = 0 - PRELOAD_SECONDS = 50 + PRELOAD_SECONDS = 25 LOAD_SECONDS = PRELOAD_SECONDS / 3 LOAD_THREADS = 20 SERVERS = 3