diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java index 35302031940a5..27d4ba0a919af 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDataSource.java @@ -59,7 +59,11 @@ public class IgniteJdbcThinDataSource implements DataSource, Serializable { if (!F.isEmpty(pwd)) props.put("password", pwd); - return IgniteJdbcThinDriver.register().connect(getUrl(), props); + String url = getUrl(); + + System.err.println("TEST | JDBC connection URL: " + url); + + return IgniteJdbcThinDriver.register().connect(url, props); } /** {@inheritDoc} */ @@ -194,6 +198,9 @@ public String getUrl() { public void setUrl(String url) throws SQLException { props = new ConnectionPropertiesImpl(); + // TODO: for test purposes + assert !props.isPartitionAwareness(); + props.setUrl(url); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java index a0f84fa74c8dc..226327685f13e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java @@ -21,6 +21,8 @@ import java.sql.DriverPropertyInfo; import java.sql.SQLException; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.StringTokenizer; import org.apache.ignite.IgniteCheckedException; @@ -316,6 +318,10 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa HostAndPortRange[] addrs = getAddresses(); + List ranges = Arrays.asList(addrs); + Collections.shuffle(ranges); + addrs = ranges.toArray(new HostAndPortRange[ranges.size()]); + for (int i = 0; i < addrs.length; i++) { if (i > 0) sbUrl.append(','); @@ -599,11 +605,17 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa /** {@inheritDoc} */ @Override public boolean isPartitionAwareness() { + // TODO: for testing purposes. + assert !partitionAwareness.value(); + return partitionAwareness.value(); } /** {@inheritDoc} */ @Override public void setPartitionAwareness(boolean partitionAwareness) { + // TODO: for testing purposes. + assert !partitionAwareness; + this.partitionAwareness.setValue(partitionAwareness); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java similarity index 98% rename from modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java index 07a471278492c..568df78735500 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/MapCacheStoreStrategy.java @@ -110,6 +110,9 @@ public class MapCacheStoreStrategy implements TestCacheStoreStrategy { /** Serializable {@link #map} backed cache store factory */ public static class MapStoreFactory implements Factory> { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public CacheStore create() { return new MapCacheStore(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java similarity index 100% rename from modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/TestCacheStoreStrategy.java diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexCntApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexCntApplication.java new file mode 100644 index 0000000000000..935498de3994f --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexCntApplication.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests.mex; + +import java.sql.Connection; +import java.sql.ResultSet; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** */ +public class MexCntApplication extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jsonNode) throws Exception { + final String tableName = jsonNode.get("tableName").asText(); + markInitialized(); + + recordResult("tableRowsCnt", printCount(tableName)); + + markFinished(); + } + + /** */ + protected int printCount(String tableName) throws Exception { + try (Connection conn = thinJdbcDataSource.getConnection()) { + try { + ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(1) FROM " + tableName); + + rs.next(); + + int cnt = rs.getInt(1); + + log.info("TEST | Table '" + tableName + "' contains " + cnt + " records."); + + return cnt; + } + catch (Exception t) { + log.error("Failed to get records count from table '" + tableName + "'. Error: " + t.getMessage(), t); + + markBroken(t); + + throw t; + } + } + } +} diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java new file mode 100644 index 0000000000000..b54f7d39fefd6 --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/mex/MexLoadApplication.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests.mex; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import com.fasterxml.jackson.databind.JsonNode; + +/** */ +public class MexLoadApplication extends MexCntApplication { + /** */ + private static final int WAIT_START_SECS = 20; + + /** {@inheritDoc} */ + @Override public void run(JsonNode jsonNode) throws Exception { + final int preloadDurSec = jsonNode.get("preloadDurSec").asInt(); + final int threads = jsonNode.get("threads").asInt(); + final String tableName = jsonNode.get("tableName").asText(); + final String cacheName = jsonNode.get("cacheName").asText(); + final boolean transaction = jsonNode.get("transaction").asBoolean(); + + createTable(tableName, cacheName); + + final ForkJoinPool executor = new ForkJoinPool(threads); + final CountDownLatch initLatch = new CountDownLatch(threads); + final AtomicLong counter = new AtomicLong(); + final AtomicBoolean preloaded = new AtomicBoolean(); + + log.info("TEST | Load pool parallelism=" + executor.getParallelism() + ", transaction=" + transaction); + + for (int i = 0; i < threads; ++i) { + executor.submit(() -> { + final Random rnd = new Random(); + boolean init = false; + + while (active()) { + try (Connection conn = thinJdbcDataSource.getConnection()) { + if (transaction) { + conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + conn.setAutoCommit(false); + } + + PreparedStatement ps = conn.prepareStatement("INSERT INTO " + tableName + " values(?,?,?)"); + + while (active()) { + for (int t = 0; t < (transaction ? 3 + rnd.nextInt(8) : 1); ++t) { + long id = counter.incrementAndGet(); + + ps.setLong(1, id); + + byte[] data = new byte[rnd.nextInt(2048)]; + rnd.nextBytes(data); + ps.setString(2, new String(data)); + + ps.setBigDecimal(3, BigDecimal.valueOf(rnd.nextDouble())); + + int res = ps.executeUpdate(); + + if (res != 1) + throw new IllegalStateException("Failed to insert a row. The result is not 1."); + } + + if (transaction) + conn.commit(); + + if (!init) { + init = true; + + initLatch.countDown(); + } + } + } + catch (Throwable th) { + if (!preloaded.get()) { + log.error("TEST | Failed to preload. Marking as broken.", th); + + markBroken(th); + + synchronized (this) { + notifyAll(); + } + } + else + log.info("TEST | Failed to load. Recreating connection. Err: " + th.getMessage()); + } + } + }); + } + + if (!active()) + return; + + if (!initLatch.await(WAIT_START_SECS, TimeUnit.SECONDS)) { + markBroken(new IllegalStateException("Failed to start loading.")); + + return; + } + + log.info("TEST | Started " + threads + " loading threads. Preloading for " + preloadDurSec + " seconds..."); + + synchronized (this) { + wait(preloadDurSec * 1000); + } + + preloaded.set(true); + + markInitialized(); + + while (active()) { + synchronized (this) { + wait(100); + } + } + + //printCount(tableName); + + markFinished(); + } + + /** */ + private void createTable(String tableName, String cacheName) throws Exception { + try (Connection conn = thinJdbcDataSource.getConnection()) { + conn.createStatement().execute("CREATE TABLE " + tableName + "(" + + "id INT, strVal VARCHAR, decVal DECIMAL, PRIMARY KEY(id)" + + ") WITH \"cache_name=" + cacheName + "\"" + ); + + try { + ResultSet rs = conn.prepareStatement("SELECT count(1) FROM " + tableName).executeQuery(); + + rs.next(); + + int cnt = rs.getInt(1); + + if (cnt == 0) + log.info("TEST | Created table '" + tableName + "' over cache '" + cacheName + "'."); + else + throw new IllegalStateException("Unexpected empty table rows number: " + cnt); + } + catch (Exception t) { + log.error("Failed to create table '" + tableName + "'.", t); + + markBroken(t); + + throw t; + } + } + } +} diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java new file mode 100644 index 0000000000000..09e71909800a3 --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/TestCacheStore.java @@ -0,0 +1,192 @@ +package org.apache.ignite.internal.ducktest.utils; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryObjectImpl; +import org.apache.ignite.internal.binary.builder.BinaryObjectBuilders; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.jetbrains.annotations.Nullable; + +public class TestCacheStore implements CacheStore { + private final Ignite ignite; + + private final IgniteCache cache; + + private final AtomicBoolean registerScheme = new AtomicBoolean(); + + private final AtomicReference binObjRef = new AtomicReference<>(); + + private final ThreadLocal> curUpdates = new ThreadLocal<>(); + + public TestCacheStore() { + System.err.println("TEST | Starting ExtCacheStore"); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder() + .setAddresses(F.asList("ducker02:45000")); + + cfg.setDiscoverySpi( + new TcpDiscoverySpi() + .setIpFinder(ipFinder) + .setLocalPort(45000) + ).setCommunicationSpi( + new TcpCommunicationSpi().setLocalPort(46000) + ).setClientMode(true) + .setPeerClassLoadingEnabled(true); + + ignite = Ignition.start(cfg); + +// System.err.println("TEST | Started ExtCacheStore"); + + cache = ignite.cache("EXT_STORAGE_CACHE"); + + assert cache != null; + +// System.err.println("TEST | cache: " + cache); + } + + @Override public void loadCache(IgniteBiInClosure clo, @Nullable Object... args) throws CacheLoaderException { + throw new UnsupportedOperationException(); + } + + @Override public void sessionEnd(boolean commit) throws CacheWriterException { + Map updateMap = curUpdates.get(); + + if (updateMap == null) + return; + + if (commit) + cache.putAll(updateMap); + + curUpdates.set(null); + } + + @Override public Object load(Object key) throws CacheLoaderException { + // System.err.println("TEST | ExternalStorage: load, key=" + key); + + Object val = cache.get(key); + + if (val == null) + return null; + + if (val instanceof DTO) { + DTO dto = (DTO)cache.get(key); + + // System.err.println("TEST | ExternalStorage: load, value=" + val); + + BinaryObjectImpl bo = binObjRef.get(); + + assert bo != null; + +// BinaryObjectBuilder bob = BinaryObjectBuilders.builder(bo); + + BinaryObjectBuilder bob = BinaryObjectBuilders.builder(bo.context(), + bo.context().metadata0(bo.typeId()).typeName()); + + bob.setField("STRVAL", dto.strVal); + bob.setField("DECVAL", dto.decVal); + + val = bob.build(); + } + + return val; + } + + @Override public Map loadAll(Iterable keys) throws CacheLoaderException { + System.err.println("TEST | ExternalStorage: loadAll"); + + Map res = new HashMap<>(); + + keys.forEach(key -> res.put(key, load(key))); + + return res; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + Object val = entry.getValue(); + + if (val instanceof BinaryObjectImpl) { + BinaryObjectImpl bo = (BinaryObjectImpl)val; + + if (!registerScheme.get()) { + synchronized (registerScheme) { + if (!registerScheme.get()) { + binObjRef.set(bo); + + registerScheme.set(true); + } + } + } + + DTO dto = new DTO(); + + dto.id = (Integer)entry.getKey(); + dto.strVal = bo.field("STRVAL"); + dto.decVal = bo.field("DECVAL"); + + val = dto; + } + + Map updateMap = curUpdates.get(); + + if (updateMap == null) + curUpdates.set(updateMap = new HashMap<>()); + + updateMap.put(entry.getKey(), val); + } + + @Override public void writeAll(Collection> entries) throws CacheWriterException { + entries.forEach(e -> write(e)); + } + + @Override public void delete(Object key) throws CacheWriterException { + throw new UnsupportedOperationException(); + } + + @Override public void deleteAll(Collection keys) throws CacheWriterException { + throw new UnsupportedOperationException(); + } + + public static final class DTO { + int id; + String strVal; + BigDecimal decVal; + + @Override public String toString() { + return "DTO{" + + "id=" + id + + ", decVal=" + decVal + + ", strVal='" + strVal + '\'' + + '}'; + } + } + + public static final class IgniteCacheStoreFactory implements Factory> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestCacheStore(); + } + } +} diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py index 0dd0d39f8afbf..33e9ec08d1cc7 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py +++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/cache.py @@ -35,3 +35,5 @@ class CacheConfiguration(NamedTuple): statistics_enabled: bool = True affinity: Bean = None affinity_mapper: Bean = None + external_storage: bool = False + keep_binary: bool = False diff --git a/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py b/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py index b67c0a471bc80..d8271df7ed13c 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py +++ b/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py @@ -19,7 +19,7 @@ from ignitetest.services.utils.decorators import memoize -DEFAULT_HEAP = "768M" +DEFAULT_HEAP = "4G" JVM_PARAMS_GC_G1 = "-XX:+UseG1GC -XX:MaxGCPauseMillis=100 " \ "-XX:ConcGCThreads=$(((`nproc`/3)>1?(`nproc`/3):1)) " \ diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 index 5eba40344fb3a..287f6b2e8fa56 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/cache_macro.j2 @@ -24,9 +24,14 @@ {% for cache in caches %} + + {% if cache.cache_mode == 'PARTITIONED' %} {% endif %} + {% if cache.cache_mode == 'REPLICATED' %} + + {% endif %} {% if cache.affinity %} @@ -39,6 +44,42 @@ {{ misc_utils.bean(cache.affinity_mapper) }} {% endif %} + + {% if cache.keep_binary %} + + {% endif %} + + + + + + {% if cache.external_storage %} + + + + + + + {% endif %} + + + +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} +{# #} + + {% endfor %} diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2 index 1b7630caeba1e..46084007654ac 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2 @@ -40,8 +40,9 @@ {% endif %} - {% if config.partition_awareness_enabled != None %} - - {% endif %} + +{# {% if config.partition_awareness_enabled != None %}#} +{# #} +{# {% endif %}#} {% endmacro %} diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 index fd1cbef5b60ec..48945c50188ba 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2 @@ -34,6 +34,24 @@ + + + + + + + + + + + + + + + + + + @@ -136,11 +154,7 @@ {% endif %} - {% if config.sql_configuration %} - - {{ misc_utils.bean(config.sql_configuration) }} - - {% endif %} + {{ misc_utils.plugins(config) }} diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 index a97631cfa03b1..708011dc07e15 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 +++ b/modules/ducktests/tests/ignitetest/services/utils/templates/log4j2.xml.j2 @@ -67,6 +67,9 @@ +{# #} +{# #} + diff --git a/modules/ducktests/tests/ignitetest/tests/mex/__init__.py b/modules/ducktests/tests/ignitetest/tests/mex/__init__.py new file mode 100644 index 0000000000000..dda120fcacc42 --- /dev/null +++ b/modules/ducktests/tests/ignitetest/tests/mex/__init__.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import hashlib +import os + +from ducktape.tests.test import TestContext + + +def decorate_args(args, with_args=False): + """ + Decorate args with sha1 hash. + """ + prefix = '' + if args: + sha_1 = hashlib.sha1() + sha_1.update(args.encode('utf-8')) + + digest = sha_1.hexdigest()[0:6] + prefix = digest + '@' + + return prefix + args if with_args else prefix + + +def patched_test_name(self): + """ + Monkey patched test_name property function. + """ + name_components = [self.module_name, + self.cls_name, + self.function_name, + self.injected_args_name] + + name = ".".join(filter(lambda x: x is not None and len(x) > 0, name_components)) + return decorate_args(self.injected_args_name) + name + + +def patched_results_dir(test_context, test_index): + """ + Monkey patch results_dir. + """ + results_dir = test_context.session_context.results_dir + + if test_context.cls is not None: + results_dir = os.path.join(results_dir, test_context.cls.__name__) + if test_context.function is not None: + results_dir = os.path.join(results_dir, test_context.function.__name__) + if test_context.injected_args is not None: + results_dir = os.path.join(results_dir, decorate_args(test_context.injected_args_name, True)) + if test_index is not None: + results_dir = os.path.join(results_dir, str(test_index)) + + return results_dir + + +TestContext.test_name = property(patched_test_name) +TestContext.results_dir = staticmethod(patched_results_dir) diff --git a/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py new file mode 100644 index 0000000000000..b3a071e8708f7 --- /dev/null +++ b/modules/ducktests/tests/ignitetest/tests/mex/mex_test.py @@ -0,0 +1,282 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Module contains Flex tests. +""" + +import os +import random +import time +from ducktape.mark import matrix +from enum import IntEnum +from typing import NamedTuple + +from ignitetest.services.ignite import IgniteService +from ignitetest.services.ignite_app import IgniteApplicationService +from ignitetest.services.utils.control_utility import ControlUtility +from ignitetest.services.utils.ignite_aware import node_failed_event_pattern +from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration, \ + IgniteThinJdbcConfiguration, TransactionConfiguration, DiscoverySpi, CommunicationSpi +from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration +from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration +from ignitetest.services.utils.ssl.client_connector_configuration import ClientConnectorConfiguration +from ignitetest.utils import cluster +from ignitetest.utils.bean import Bean +from ignitetest.utils.ignite_test import IgniteTest +from ignitetest.services.utils.ignite_configuration.discovery import TcpDiscoverySpi, TcpDiscoveryVmIpFinder +from ignitetest.services.utils.ignite_configuration.communication import TcpCommunicationSpi +from ignitetest.utils.version import LATEST_2_17, DEV_BRANCH + +# Run: clear; ./docker/clean_up.sh; rm -drf ../../../results/*; ./docker/run_tests.sh -t ./ignitetest/tests/mex +class MexTest(IgniteTest): + FORCE_STOP = True + TRANSACTION = True + WAIT_AFTER_LOAD_SEC = 0 + PRELOAD_SECONDS = 25 + LOAD_SECONDS = PRELOAD_SECONDS / 3 + LOAD_THREADS = 20 + SERVERS = 3 + SERVER_IDX_TO_DROP = 1 + IGNITE_VERSION = DEV_BRANCH + CACHE_NAME = "TEST_CACHE" + TABLE_NAME = "TEST_TABLE" + + @cluster(num_nodes=SERVERS * 2 + 1) + def mex_test(self): + # Start the external storage + es = self.start_ext_storage() + + # Start the servers. + servers, control_utility, ignite_config = self.launch_cluster() + + # Start the loading app. and wait for some records preloaded. + app = self.start_load_app(servers, ignite_config.client_connector_configuration.port) + + # The loading is on. Now, kill a server node. + self.kill_node(servers) + + # Keep the loading a bit longer. + self.logger.info(f"TEST | Loading the cluster for additional {self.LOAD_SECONDS} seconds...") + time.sleep(self.LOAD_SECONDS) + + # Stop the loading. + self.logger.debug("TEST | Stopping the load application ...") + app.stop() + app.await_stopped() + self.logger.info("TEST | The load application has stopped.") + + if self.WAIT_AFTER_LOAD_SEC > 0: + self.logger.info(f"TEST | waiting after load for {self.WAIT_AFTER_LOAD_SEC} seconds...") + time.sleep(self.WAIT_AFTER_LOAD_SEC) + + # Count and compare table size from dirrefent nodes. + records_cnt = set() + + for node in self.alive_servers(servers.nodes): + app = self.start_cnt_app(node, ignite_config.client_connector_configuration.port) + app.await_started() + + cnt = app.extract_result("tableRowsCnt") + records_cnt.add(cnt) + self.logger.info(f"TEST | Partitions cnt on node {node.account.hostname}: {cnt}") + + app.stop() + app.await_stopped() + + assert len(records_cnt) == 1; + + # Run the idle verify. + output = control_utility.idle_verify(self.CACHE_NAME) + self.logger.info(f"TEST | Idle verify finished: {output}") + + # Finish the test. + servers.stop() + es.stop() + + def kill_node(self, servers): + failedNode = servers.nodes[self.SERVER_IDX_TO_DROP] + failedNodeId = servers.node_id(failedNode) + alive_servers = self.alive_servers(servers.nodes) + + if self.FORCE_STOP: + self.logger.info(f"TEST | 'kill -9' node {self.SERVER_IDX_TO_DROP} with id {failedNodeId}...") + + servers.stop_node(failedNode, force_stop=True) + + self.logger.debug("TEST | Awaiting for the node-failed-event...") + servers.await_event(node_failed_event_pattern(failedNodeId), 30, from_the_beginning=True, nodes=alive_servers) + + self.logger.info("TEST | The cluster has detected the node failure.") + else: + self.logger.info(f"TEST | gracefully stopping node {self.SERVER_IDX_TO_DROP} with id {failedNodeId}...") + servers.stop_node(failedNode, force_stop=False) + + self.logger.debug("TEST | Awaiting for the new cluster state...") + servers.await_event(f"servers={self.SERVERS - 1}, clients=0, state=ACTIVE, CPUs=", 30, from_the_beginning=True, + nodes=alive_servers) + + def alive_servers(self, nodes): + return [item for index, item in enumerate(nodes) if index != self.SERVER_IDX_TO_DROP] + + def alive_servers_addrs(self, nodes, jdbcPort): + nodes = self.alive_servers(nodes) + + jdbcPort = str(jdbcPort) + + return [n.account.hostname + ":" + jdbcPort for n in nodes] + + def start_load_app(self, servers, jdbcPort): + addrs = self.alive_servers_addrs(servers.nodes, jdbcPort) + + app = IgniteApplicationService( + self.test_context, + IgniteThinJdbcConfiguration(version=self.IGNITE_VERSION, addresses=addrs), + java_class_name="org.apache.ignite.internal.ducktest.tests.mex.MexLoadApplication", + num_nodes=1, + params={"preloadDurSec": self.PRELOAD_SECONDS, "threads": self.LOAD_THREADS, "cacheName": self.CACHE_NAME, + "tableName" : self.TABLE_NAME, "transaction" : self.TRANSACTION}, + startup_timeout_sec=self.PRELOAD_SECONDS + 10, + jvm_opts="-Xms4G -Xmx4G" + ) + + self.logger.debug("TEST | Starting the loading application...") + app.start() + + self.logger.debug("TEST | Waiting for the load application initialization...") + app.await_started() + + self.logger.info("TEST | The load application has initialized.") + + return app + + def start_cnt_app(self, node, jdbcPort): + jdbcPort = str(jdbcPort) + + addrs = [node.account.hostname + ":" + jdbcPort] + + app = IgniteApplicationService( + self.test_context, + IgniteThinJdbcConfiguration(version=self.IGNITE_VERSION, addresses=addrs), + java_class_name="org.apache.ignite.internal.ducktest.tests.mex.MexCntApplication", + num_nodes=1, + params={"tableName" : self.TABLE_NAME}, + startup_timeout_sec=10, + jvm_opts="-Xms4G -Xmx4G" + ) + + self.logger.debug(f"TEST | Starting the counter application to server node {node}...") + app.start() + + self.logger.debug(f"TEST | Waiting for the counter application initialization to server node {node}...") + app.await_started() + + self.logger.info(f"TEST | The counter application has initialized to server node {node}.") + + return app + + def start_ext_storage(self): + ignite_config = IgniteConfiguration( + version=self.IGNITE_VERSION, + metrics_log_frequency = 0, + caches=[CacheConfiguration( + name='EXT_STORAGE_CACHE', + keep_binary = True + )], + data_storage=DataStorageConfiguration( + default=DataRegionConfiguration( + persistence_enabled = True, + initial_size = 2048 * 1024 * 1024, + max_size = 4096 * 1024 * 1024, + ) + ), + cluster_state = 'ACTIVE', + discovery_spi = TcpDiscoverySpi(port=45000), + communication_spi = TcpCommunicationSpi(local_port=46000), + client_mode = False + ) + + ext_store, _ = start_servers(self.test_context, 1, ignite_config) + + ext_store.await_event("Topology snapshot \\[ver=1", 10, from_the_beginning=True, nodes=ext_store.nodes) + + control_utility = ControlUtility(ext_store) + control_utility.activate() + + return ext_store + + def launch_cluster(self): + cacheAffinity = Bean("org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction", partitions=512) + + ip_finder = TcpDiscoveryVmIpFinder() + ip_finder.addresses = ["ducker03","ducker04","ducker05","ducker06","ducker07"] + + ignite_config = IgniteConfiguration( + version=self.IGNITE_VERSION, + metrics_log_frequency = 0, + caches=[CacheConfiguration( + name=self.CACHE_NAME, + atomicity_mode='TRANSACTIONAL', + affinity = cacheAffinity, + cache_mode = 'REPLICATED', + external_storage = True, + keep_binary = True + )], + data_storage=DataStorageConfiguration( + default=DataRegionConfiguration( + persistence_enabled = False, + + # initial_size = 128 * 1024 * 1024, + # max_size = 256 * 1024 * 1024, + + initial_size = 4096 * 1024 * 1024, + max_size = 4096 * 1024 * 1024, + + # initial_size = 2147483648, + # max_size = 17179869184, + ) + ), + cluster_state = 'ACTIVE', + client_connector_configuration=ClientConnectorConfiguration(), + transaction_configuration=TransactionConfiguration( + default_tx_timeout=300000, + default_tx_isolation="READ_COMMITTED", + tx_timeout_on_partition_map_exchange=120000), + discovery_spi = TcpDiscoverySpi( + ip_finder=ip_finder + ), + client_mode = False + ) + + servers, start_servers_sec = start_servers(self.test_context, self.SERVERS, ignite_config) + + servers.await_event(f"Topology snapshot \\[ver={self.SERVERS}", 15, from_the_beginning=True, nodes=servers.nodes) + + control_utility = ControlUtility(servers) + control_utility.activate() + + return servers, control_utility, ignite_config + +def start_servers(test_context, num_nodes, ignite_config, modules=None): + """ + Start ignite servers. + """ + servers = IgniteService(test_context, config=ignite_config, num_nodes=num_nodes, modules=modules, + # mute spam in log. + jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"]) + + start = time.monotonic() + servers.start_async() + return servers, round(time.monotonic() - start, 1) \ No newline at end of file