From 20e706c06fef93803ed4e621516ca5dd00bd6e1b Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 15 Jan 2026 18:04:06 +0300 Subject: [PATCH 1/2] Chery-pick TcpDiscoveryAbstractTraceableMessage fix --- .../discovery/DiscoveryMessageFactory.java | 2 ++ .../TcpDiscoveryAbstractTraceableMessage.java | 25 +++++++++++++++++-- .../messages/TcpDiscoveryNodeLeftMessage.java | 14 ++++++++++- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index a31dbf77d135e..f1821295dd0a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryLoopbackProblemMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryMetricsUpdateMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeFullMetricsMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryNodeLeftMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryNodeMetricsMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; @@ -59,6 +60,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFullMetricsMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeMetricsMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java index 433566d7e3021..c76fc8694d9b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractTraceableMessage.java @@ -17,10 +17,12 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.io.Externalizable; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.apache.ignite.internal.processors.tracing.messages.TraceableMessage; +import org.jetbrains.annotations.Nullable; /** * Abstract traceable message for TCP discovery. @@ -29,8 +31,12 @@ public abstract class TcpDiscoveryAbstractTraceableMessage extends TcpDiscoveryA /** Container. */ private SpanContainer spanContainer = new SpanContainer(); + /** Serialization holder of {@link #spanContainer}'s bytes. */ + @Order(value = 5, method = "spanBytes") + private @Nullable byte[] spanBytesHolder; + /** - * Default no-arg constructor for {@link Externalizable} interface. + * Default constructor for {@link DiscoveryMessageFactory}. */ protected TcpDiscoveryAbstractTraceableMessage() { // No-op. @@ -67,6 +73,21 @@ public Object readResolve() { return this; } + /** @return {@link #spanContainer}'s bytes. */ + public @Nullable byte[] spanBytes() { + return spanContainer == null ? null : spanContainer.serializedSpanBytes(); + } + + /** @param spanBytes {@link #spanContainer}'s bytes. */ + public void spanBytes(@Nullable byte[] spanBytes) { + if (spanBytes == null) + return; + + readResolve(); + + spanContainer.serializedSpanBytes(spanBytes); + } + /** {@inheritDoc} */ @Override public SpanContainer spanContainer() { return spanContainer; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 5d6df69b715b7..543a06fe4dc7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -18,7 +18,9 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; /** * Sent by node that is stopping to coordinator across the ring, @@ -26,10 +28,15 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryNodeLeftMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryNodeLeftMessage() { + // No-op. + } + /** * Constructor. * @@ -39,6 +46,11 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { super(creatorNodeId); } + /** {@inheritDoc} */ + @Override public short directType() { + return 14; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryNodeLeftMessage.class, this, "super", super.toString()); From 6ee242b923a606d3488b4896d8351bb27908dd92 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Thu, 22 Jan 2026 11:37:21 +0300 Subject: [PATCH 2/2] IGNITE-27626 Use MessageSerializer for TcpDiscoveryCustomEventMessage --- .../discovery/CustomMessageWrapper.java | 68 ---------- .../discovery/DiscoveryMessageFactory.java | 13 ++ .../discovery/GridDiscoveryManager.java | 12 +- .../SecurityAwareCustomMessageWrapper.java | 46 ++++++- .../CacheStatisticsModeChangeMessage.java | 97 +++++++++++--- .../processors/cache/GridCacheProcessor.java | 3 +- .../reader/StandaloneNoopDiscoverySpi.java | 4 +- .../spi/discovery/DiscoveryNotification.java | 7 +- .../ignite/spi/discovery/DiscoverySpi.java | 3 +- .../discovery/DiscoverySpiCustomMessage.java | 37 ++---- .../isolated/IsolatedDiscoverySpi.java | 6 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 28 ++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 46 ++++--- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 4 +- .../TcpDiscoveryCustomEventMessage.java | 124 ++++++++++++++---- .../messages/TcpDiscoveryNodeLeftMessage.java | 2 +- ...DiscoveryServerOnlyCustomEventMessage.java | 20 ++- .../resources/META-INF/classnames.properties | 1 - .../internal/DiscoverySpiTestListener.java | 11 +- .../binary/BinaryMarshallerSelfTest.java | 6 +- .../client/thin/ServiceAwarenessTest.java | 14 +- .../IgniteDiscoverySpiInternalListener.java | 3 +- .../ClientSlowDiscoveryAbstractTest.java | 14 +- ...eMarshallerCacheClassNameConflictTest.java | 3 +- ...shallerCacheClientRequestsMappingTest.java | 68 +++++----- .../IgniteMarshallerCacheFSRestoreTest.java | 3 +- .../binary/BinaryMetadataRemoveTest.java | 9 +- ...ridBinaryCacheEntryMemorySizeSelfTest.java | 4 +- ...IgniteSequentialNodeCrashRecoveryTest.java | 8 +- .../snapshot/AbstractSnapshotSelfTest.java | 24 ++-- .../IncrementalSnapshotJoiningClientTest.java | 12 +- .../IgniteNoCustomEventsOnNodeStart.java | 8 +- .../schema/IndexWithSameNameTestBase.java | 9 +- .../NodeSecurityContextPropagationTest.java | 4 +- .../ServiceConcurrentUndeployTest.java | 13 +- .../ServiceDeploymentProcessAbstractTest.java | 8 +- .../DiscoverySpiDataExchangeTest.java | 3 +- .../discovery/tcp/BlockTcpDiscoverySpi.java | 16 +-- ...cpDiscoveryPendingMessageDeliveryTest.java | 3 +- .../discovery/tcp/TcpDiscoverySelfTest.java | 7 +- .../discovery/tcp/TestTcpDiscoverySpi.java | 4 +- .../ignite/testframework/GridTestUtils.java | 35 +++-- .../junits/GridAbstractTest.java | 4 +- .../junits/GridTestBinaryMarshaller.java | 4 +- .../cache/BinaryTypeRegistrationTest.java | 13 +- .../CacheRegisterMetadataLocallyTest.java | 13 +- .../processors/query/KillQueryTest.java | 36 +++-- .../processors/query/RunningQueriesTest.java | 46 +++---- .../discovery/zk/ZookeeperDiscoverySpi.java | 4 +- ...ommunicationErrorResolveFinishMessage.java | 2 +- ...CommunicationErrorResolveStartMessage.java | 2 +- .../internal/ZkDiscoveryCustomEventData.java | 3 +- .../zk/internal/ZkForceNodeFailMessage.java | 2 +- .../zk/internal/ZkNoServersMessage.java | 2 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 15 ++- ...ZookeeperDiscoverySpiTestConfigurator.java | 4 +- 57 files changed, 527 insertions(+), 437 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java deleted file mode 100644 index 29aa940ecf24a..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.managers.discovery; - -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class CustomMessageWrapper implements DiscoverySpiCustomMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final DiscoveryCustomMessage delegate; - - /** - * @param delegate Delegate. - */ - public CustomMessageWrapper(DiscoveryCustomMessage delegate) { - this.delegate = delegate; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoverySpiCustomMessage ackMessage() { - DiscoveryCustomMessage res = delegate.ackMessage(); - - return res == null ? null : new CustomMessageWrapper(res); - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return delegate.isMutable(); - } - - /** {@inheritDoc} */ - @Override public boolean stopProcess() { - return delegate.stopProcess(); - } - - /** - * @return Delegate. - */ - public DiscoveryCustomMessage delegate() { - return delegate; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return delegate.toString(); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index f1821295dd0a1..846ebbc88a5a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.managers.discovery; +import org.apache.ignite.internal.codegen.CacheStatisticsModeChangeMessageSerializer; import org.apache.ignite.internal.codegen.InetAddressMessageSerializer; import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer; @@ -28,6 +29,7 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryConnectionCheckMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryCustomEventMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryDiscardMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryDuplicateIdMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryHandshakeRequestSerializer; @@ -40,6 +42,8 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; +import org.apache.ignite.internal.codegen.TcpDiscoveryServerOnlyCustomEventMessageSerializer; +import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -53,6 +57,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; @@ -65,6 +70,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessage; /** Message factory for discovery messages. */ public class DiscoveryMessageFactory implements MessageFactoryProvider { @@ -94,5 +100,12 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer()); factory.register((short)14, TcpDiscoveryMetricsUpdateMessage::new, new TcpDiscoveryMetricsUpdateMessageSerializer()); factory.register((short)15, TcpDiscoveryClientAckResponse::new, new TcpDiscoveryClientAckResponseSerializer()); + factory.register((short)16, TcpDiscoveryNodeLeftMessage::new, new TcpDiscoveryNodeLeftMessageSerializer()); + + //Custom messages + factory.register((short)500, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer()); + factory.register((short)501, TcpDiscoveryServerOnlyCustomEventMessage::new, + new TcpDiscoveryServerOnlyCustomEventMessageSerializer()); + factory.register((short)502, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 7334cfc16c9a9..688f470ca0442 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -131,7 +131,6 @@ import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.DiscoveryNotification; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; @@ -592,8 +591,11 @@ private void onDiscovery0(DiscoveryNotification notification) { ClusterNode node = notification.getNode(); long topVer = notification.getTopVer(); - DiscoveryCustomMessage customMsg = notification.getCustomMsgData() == null ? null - : ((CustomMessageWrapper)notification.getCustomMsgData()).delegate(); + DiscoveryCustomMessage customMsg0 = notification.getCustomMsgData() == null ? null : + notification.getCustomMsgData(); + + DiscoveryCustomMessage customMsg = customMsg0 instanceof SecurityAwareCustomMessageWrapper ? + ((SecurityAwareCustomMessageWrapper)customMsg0).delegate() : customMsg0; if (skipMessage(notification.type(), customMsg)) return; @@ -932,7 +934,7 @@ public SecurityAwareNotificationTask(DiscoveryNotification notification) { /** */ @Override public void run() { - DiscoverySpiCustomMessage customMsg = notification.getCustomMsgData(); + DiscoveryCustomMessage customMsg = notification.getCustomMsgData(); if (customMsg instanceof SecurityAwareCustomMessageWrapper) { UUID secSubjId = ((SecurityAwareCustomMessageWrapper)customMsg).securitySubjectId(); @@ -2301,7 +2303,7 @@ public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedExce getSpi().sendCustomEvent(security.enabled() ? new SecurityAwareCustomMessageWrapper(msg, security.securityContext().subject().id()) - : new CustomMessageWrapper(msg)); + : msg); } catch (IgniteClientDisconnectedException e) { IgniteFuture reconnectFut = ctx.cluster().clientReconnectFuture(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java index 49ecd2e518e34..714c93ae3d48d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java @@ -18,21 +18,31 @@ package org.apache.ignite.internal.managers.discovery; import java.util.UUID; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.jetbrains.annotations.Nullable; -/** Extends {@link CustomMessageWrapper} with ID of security subject that initiated the current message. */ -public class SecurityAwareCustomMessageWrapper extends CustomMessageWrapper { +/** Custom message wrapper with ID of security subject that initiated the current message. */ +public class SecurityAwareCustomMessageWrapper extends DiscoverySpiCustomMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** Security subject ID. */ - private final UUID secSubjId; + private UUID secSubjId; + + /** Original message. */ + private DiscoveryCustomMessage delegate; + + /** + * Default constructor. + */ + public SecurityAwareCustomMessageWrapper() { + // No-op. + } /** */ public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate, UUID secSubjId) { - super(delegate); - + this.delegate = delegate; this.secSubjId = secSubjId; } @@ -42,9 +52,31 @@ public UUID securitySubjectId() { } /** {@inheritDoc} */ - @Override public @Nullable DiscoverySpiCustomMessage ackMessage() { - DiscoveryCustomMessage ack = delegate().ackMessage(); + @Override public boolean isMutable() { + return delegate.isMutable(); + } + + /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return delegate.stopProcess(); + } + + /** + * @return Delegate. + */ + public DiscoveryCustomMessage delegate() { + return delegate; + } + + /** {@inheritDoc} */ + @Override public @Nullable DiscoveryCustomMessage ackMessage() { + DiscoveryCustomMessage ack = delegate.ackMessage(); return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack, secSubjId); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 500; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java index 40bcfaf12de16..55e687d7096c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsModeChangeMessage.java @@ -20,18 +20,20 @@ import java.util.Collection; import java.util.Collections; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Cache statistics mode change discovery message. */ -public class CacheStatisticsModeChangeMessage implements DiscoveryCustomMessage { +public class CacheStatisticsModeChangeMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; @@ -42,23 +44,35 @@ public class CacheStatisticsModeChangeMessage implements DiscoveryCustomMessage private static final byte ENABLED_MASK = 0x02; /** Custom message ID. */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + private IgniteUuid id; /** Request id. */ - private final UUID reqId; + @Order(value = 1, method = "requestId") + private UUID reqId; /** Cache names. */ - private final Collection caches; + @Order(2) + private Collection caches; /** Flags. */ - private final byte flags; + @Order(3) + private byte flags; + + /** + * Default constructor. + */ + public CacheStatisticsModeChangeMessage() { + // No-op. + } /** * Constructor for response. * * @param req Request message. */ - private CacheStatisticsModeChangeMessage(CacheStatisticsModeChangeMessage req) { + private CacheStatisticsModeChangeMessage(IgniteUuid id, CacheStatisticsModeChangeMessage req) { + this.id = id; reqId = req.reqId; caches = null; @@ -73,7 +87,8 @@ private CacheStatisticsModeChangeMessage(CacheStatisticsModeChangeMessage req) { * * @param caches Collection of cache names. */ - public CacheStatisticsModeChangeMessage(UUID reqId, Collection caches, boolean enabled) { + public CacheStatisticsModeChangeMessage(IgniteUuid id, UUID reqId, Collection caches, boolean enabled) { + this.id = id; this.reqId = reqId; this.caches = Collections.unmodifiableCollection(caches); @@ -85,14 +100,9 @@ public CacheStatisticsModeChangeMessage(UUID reqId, Collection caches, b this.flags = flags; } - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return id; - } - /** {@inheritDoc} */ @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return initial() ? new CacheStatisticsModeChangeMessage(this) : null; + return initial() ? new CacheStatisticsModeChangeMessage(IgniteUuid.randomUuid(), this) : null; } /** {@inheritDoc} */ @@ -106,11 +116,58 @@ public CacheStatisticsModeChangeMessage(UUID reqId, Collection caches, b throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** + * @param id Custom message ID. + */ + public void id(IgniteUuid id) { + this.id = id; + } + + /** + * @return Request id. + */ + public UUID requestId() { + return reqId; + } + + /** + * @param reqId Request id. + */ + public void requestId(UUID reqId) { + this.reqId = reqId; + } + /** * @return Cache names. */ public Collection caches() { - return Collections.unmodifiableCollection(caches); + return caches; + } + + /** + * @param caches Cache names. + */ + public void caches(Collection caches) { + this.caches = caches; + } + + /** + * @return Flags. + */ + public byte flags() { + return flags; + } + + /** + * @param flags Flags. + */ + public void flags(byte flags) { + this.flags = flags; } /** @@ -127,15 +184,13 @@ public boolean enabled() { return (flags & ENABLED_MASK) != 0; } - /** - * @return Request id. - */ - public UUID requestId() { - return reqId; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheStatisticsModeChangeMessage.class, this); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 502; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index d220c6b620748..0e4242e8bc60b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -5221,7 +5221,8 @@ public void enableStatistics(Collection cacheNames, boolean enabled) thr if (globalCaches.isEmpty()) return; - CacheStatisticsModeChangeMessage msg = new CacheStatisticsModeChangeMessage(UUID.randomUUID(), globalCaches, enabled); + CacheStatisticsModeChangeMessage msg = new CacheStatisticsModeChangeMessage(IgniteUuid.randomUuid(), + UUID.randomUUID(), globalCaches, enabled); EnableStatisticsFuture fut = new EnableStatisticsFuture(msg.requestId()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java index 3946c4fd2a8d2..3dd6d0b5f457d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java @@ -23,13 +23,13 @@ import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiNoop; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; @@ -101,7 +101,7 @@ public class StandaloneNoopDiscoverySpi extends IgniteSpiAdapter implements Disc } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java index 6dc38c1c60d85..d06564d1b8a92 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.NavigableMap; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; import org.jetbrains.annotations.Nullable; @@ -42,7 +43,7 @@ public class DiscoveryNotification { private @Nullable NavigableMap> topHist; /** Custom message data. */ - private @Nullable DiscoverySpiCustomMessage customMsgData; + private @Nullable DiscoveryCustomMessage customMsgData; /** Span container. */ private SpanContainer spanContainer; @@ -75,7 +76,7 @@ public DiscoveryNotification( ClusterNode node, Collection topSnapshot, @Nullable NavigableMap> topHist, - @Nullable DiscoverySpiCustomMessage customMsgData, + @Nullable DiscoveryCustomMessage customMsgData, SpanContainer spanContainer ) { this.eventType = eventType; @@ -125,7 +126,7 @@ public NavigableMap> getTopHist() { /** * @return Custom message data. */ - public DiscoverySpiCustomMessage getCustomMsgData() { + public DiscoveryCustomMessage getCustomMsgData() { return customMsgData; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 545e1a043e733..3d51a2fc3bf06 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -23,6 +23,7 @@ import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.IgniteSpiException; @@ -153,7 +154,7 @@ public interface DiscoverySpi extends IgniteSpi { * @param msg Custom message. * @throws IgniteException if failed to sent the event message. */ - public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException; + public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException; /** * Initiates failure of provided node. diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java index 6e11673d8e2c8..563771672c51d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -17,37 +17,26 @@ package org.apache.ignite.spi.discovery; -import java.io.Serializable; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.lang.IgniteUuid; /** * Message to send across ring. * * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage) */ -public interface DiscoverySpiCustomMessage extends Serializable { - /** - * Called when custom message has been handled by all nodes. - * - * @return Ack message or {@code null} if ack is not required. - */ - @Nullable public DiscoverySpiCustomMessage ackMessage(); +public abstract class DiscoverySpiCustomMessage implements DiscoveryCustomMessage { + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return null; + } - /** - * @return {@code True} if message can be modified during listener notification. Changes will be send to next nodes. - */ - public boolean isMutable(); - - /** - * Called on discovery coordinator node after listener is notified. If returns {@code true} - * then message is not passed to others nodes, if after this method {@link #ackMessage()} returns non-null ack - * message, it is sent to all nodes. - * - * Note: this method is used then and only then the zookeeper discovery is configured. - * - * @return {@code True} if message should not be sent to all nodes. - */ - public boolean stopProcess(); + /** {@inheritDoc} */ + @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, + DiscoCache discoCache) { + return null; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java index 1e0d02019fbb8..9e00dd311a9d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java @@ -30,6 +30,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteProductVersion; @@ -41,7 +42,6 @@ import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.DiscoveryNotification; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; @@ -159,7 +159,7 @@ public class IsolatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDisc } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { exec.execute(() -> { IgniteFuture fut = lsnr.onDiscovery(new DiscoveryNotification( EVT_DISCOVERY_CUSTOM_EVT, @@ -172,7 +172,7 @@ public class IsolatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDisc // Acknowledge message must be send after initial message processed. fut.listen((f) -> { - DiscoverySpiCustomMessage ack = msg.ackMessage(); + DiscoveryCustomMessage ack = msg.ackMessage(); if (ack != null) { exec.execute(() -> lsnr.onDiscovery(new DiscoveryNotification( diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 238fdd13eabd5..2dc3718269627 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -65,8 +65,9 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; +import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper; import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.processors.tracing.SpanTags; import org.apache.ignite.internal.processors.tracing.messages.SpanContainer; @@ -89,7 +90,6 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.discovery.DiscoveryNotification; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; @@ -492,7 +492,7 @@ else if (state == DISCONNECTED) { } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { + @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { State state = this.state; if (state == DISCONNECTED) @@ -504,23 +504,26 @@ else if (state == DISCONNECTED) { try { TcpDiscoveryCustomEventMessage msg; - if (((CustomMessageWrapper)evt).delegate() instanceof DiscoveryServerOnlyCustomMessage) - msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt, - U.marshal(spi.marshaller(), evt)); + DiscoveryCustomMessage customMsg = evt instanceof SecurityAwareCustomMessageWrapper ? + ((SecurityAwareCustomMessageWrapper)evt).delegate() : evt; + + if (customMsg instanceof DiscoveryServerOnlyCustomMessage) + msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt); else - msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, - U.marshal(spi.marshaller(), evt)); + msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt); Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), () -> locNode.consistentId().toString()) - .addTag(SpanTags.MESSAGE_CLASS, () -> ((CustomMessageWrapper)evt).delegate().getClass().getSimpleName()) + .addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName()) .addLog(() -> "Created"); // This root span will be parent both from local and remote nodes. msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); + msg.prepareMarshal(spi.marshaller()); + sockWriter.sendMessage(msg); rootSpan.addLog(() -> "Sent").end(); @@ -2591,8 +2594,9 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { if (node != null && node.visible()) { try { - DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(), - U.resolveClassLoader(spi.ignite().configuration())); + msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + + DiscoveryCustomMessage msgObj = msg.message(); notifyDiscovery( EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj, msg.spanContainer()); @@ -2687,7 +2691,7 @@ private void notifyDiscovery( long topVer, ClusterNode node, Collection top, - @Nullable DiscoverySpiCustomMessage data, + @Nullable DiscoveryCustomMessage data, SpanContainer spanContainer ) { DiscoverySpiListener lsnr = spi.lsnr; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index ea8dd5f5c2db4..754573f1d67c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -79,8 +79,9 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage; +import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper; import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; @@ -122,7 +123,6 @@ import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryNotification; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.IgniteDiscoveryThread; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; @@ -1019,22 +1019,25 @@ private void interruptPing(TcpDiscoveryNode node) { } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { + @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { try { TcpDiscoveryCustomEventMessage msg; - if (((CustomMessageWrapper)evt).delegate() instanceof DiscoveryServerOnlyCustomMessage) - msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt, - U.marshal(spi.marshaller(), evt)); + DiscoveryCustomMessage customMsg = evt instanceof SecurityAwareCustomMessageWrapper ? + ((SecurityAwareCustomMessageWrapper)evt).delegate() : evt; + + if (customMsg instanceof DiscoveryServerOnlyCustomMessage) + msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt); else - msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, - U.marshal(spi.marshaller(), evt)); + msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt); + + msg.prepareMarshal(spi.marshaller()); Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), () -> locNode.consistentId().toString()) - .addTag(SpanTags.MESSAGE_CLASS, () -> ((CustomMessageWrapper)evt).delegate().getClass().getSimpleName()) + .addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName()) .addLog(() -> "Created"); // This root span will be parent both from local and remote nodes. @@ -6217,31 +6220,33 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa processCustomMessage(msg, waitForNotification); } } - - msg.message(null, msg.messageBytes()); } else { addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); - DiscoverySpiCustomMessage msgObj = null; + DiscoveryCustomMessage msgObj = null; try { - msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + + msgObj = msg.message(); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); } if (msgObj != null) { - DiscoverySpiCustomMessage nextMsg = msgObj.ackMessage(); + DiscoveryCustomMessage nextMsg = msgObj.ackMessage(); if (nextMsg != null) { try { TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage( - getLocalNodeId(), nextMsg, U.marshal(spi.marshaller(), nextMsg)); + getLocalNodeId(), nextMsg); ackMsg.topologyVersion(msg.topologyVersion()); + ackMsg.prepareMarshal(spi.marshaller()); + processCustomMessage(ackMsg, waitForNotification); } catch (IgniteCheckedException e) { @@ -6272,9 +6277,6 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa notifyDiscoveryListener(msg, waitForNotification); } - // Clear msg field to prevent possible memory leak. - msg.message(null, msg.messageBytes()); - if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } @@ -6411,10 +6413,12 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean if (node == null) return; - DiscoverySpiCustomMessage msgObj; + DiscoveryCustomMessage msgObj; try { - msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + msg.finishUnmarhal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + + msgObj = msg.message(); } catch (Throwable t) { throw new IgniteException("Failed to unmarshal discovery custom message: " + msg, t); @@ -6446,7 +6450,7 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean if (msgObj.isMutable()) { try { - msg.message(msgObj, U.marshal(spi.marshaller(), msgObj)); + msg.prepareMarshal(spi.marshaller()); } catch (Throwable t) { throw new IgniteException("Failed to marshal mutable discovery message: " + msgObj, t); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index e115a3cca03b1..aa7dc4a63226e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -37,6 +37,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.CacheMetricsSnapshot; import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage; import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage; @@ -49,7 +50,6 @@ import org.apache.ignite.spi.IgniteSpiContext; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiThread; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientNodesMetricsMessage; @@ -281,7 +281,7 @@ protected void onMessageExchanged() { * @param msg Message. * @throws IgniteException If failed. */ - public abstract void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException; + public abstract void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException; /** * @param nodeId Node id. diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index b8cba5dd83f70..35b1b3b19fe16 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.failure.FailureProcessor; @@ -91,7 +92,6 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; @@ -537,7 +537,7 @@ public ClusterNode getNode0(UUID id) { } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { impl.sendCustomEvent(msg); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index cd1b90b348c94..7d195cc331e63 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -17,44 +17,61 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import java.util.Arrays; import java.util.Objects; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.IncompleteDeserializationException; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** - * Wrapped for custom message. + * Wrapper for custom message. */ @TcpDiscoveryRedirectToClient @TcpDiscoveryEnsureDelivery -public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractTraceableMessage { +public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { /** */ private static final long serialVersionUID = 0L; - /** */ - private transient volatile DiscoverySpiCustomMessage msg; + /** Original custom message. */ + private volatile DiscoveryCustomMessage msg; - /** */ - private byte[] msgBytes; + /** + * {@link Message} representation of original message. + */ + //TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 + @Order(value = 6, method = "serializedMessage") + private @Nullable Message serMsg; + + /** + * Serialized message bytes. + */ + //TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 + @Order(value = 7, method = "messageBytes") + private @Nullable byte[] msgBytes; + + /** + * Default constructor. + */ + public TcpDiscoveryCustomEventMessage() { + //No-op. + } /** * @param creatorNodeId Creator node id. * @param msg Message. - * @param msgBytes Serialized message. */ - public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpiCustomMessage msg, - @NotNull byte[] msgBytes) { + public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, DiscoveryCustomMessage msg) { super(creatorNodeId); this.msg = msg; - this.msgBytes = msgBytes; } /** @@ -64,7 +81,7 @@ public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, @Nullable DiscoverySpi public TcpDiscoveryCustomEventMessage(TcpDiscoveryCustomEventMessage msg) { super(msg); - this.msgBytes = msg.msgBytes; + msgBytes = msg.msgBytes; this.msg = msg.msg; } @@ -73,47 +90,97 @@ public TcpDiscoveryCustomEventMessage(TcpDiscoveryCustomEventMessage msg) { */ public void clearMessage() { msg = null; + serMsg = null; } /** - * @return Serialized message. + * @return Serialized message bytes. */ public byte[] messageBytes() { return msgBytes; } + /** + * @param msgBytes Serialized message bytes. + */ + public void messageBytes(@Nullable byte[] msgBytes) { + this.msgBytes = msgBytes; + } + + /** + * @return {@link Message} representation of original message. + */ + public @Nullable Message serializedMessage() { + return serMsg; + } + + /** + * @param serMsg {@link Message} representation of original message. + */ + public void serializedMessage(@Nullable Message serMsg) { + this.serMsg = serMsg; + } + /** * @param msg Message. - * @param msgBytes Serialized message. */ - public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msgBytes) { + public void message(DiscoverySpiCustomMessage msg) { this.msg = msg; - this.msgBytes = msgBytes; } /** + * @return Original message. + */ + public DiscoveryCustomMessage message() { + return msg; + } + + /** + * Prepare message for serialization. + * + * @param marsh Marshaller. + */ + //TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 + public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (msg instanceof Message) + serMsg = (Message)msg; + else { + assert msgBytes == null || msg.isMutable() : "Message bytes are not null for immutable message: bytes=" + + Arrays.toString(msgBytes) + "]"; + + msgBytes = U.marshal(marsh, msg); + } + + msg = null; + } + + /** + * Finish deserialization. + * * @param marsh Marshaller. - * @param ldr Classloader. - * @return Deserialized message, - * @throws java.lang.Throwable if unmarshal failed. + * @param ldr Class loader. */ - @Nullable public DiscoverySpiCustomMessage message(@NotNull Marshaller marsh, ClassLoader ldr) throws Throwable { - if (msg == null) { + //TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 + public void finishUnmarhal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { + if (msg != null) + return; + + if (serMsg != null) + msg = (DiscoveryCustomMessage)serMsg; + else { try { msg = U.unmarshal(marsh, msgBytes, ldr); } catch (IgniteCheckedException e) { // Try to resurrect a message in a case of deserialization failure if (e.getCause() instanceof IncompleteDeserializationException) - return new CustomMessageWrapper(((IncompleteDeserializationException)e.getCause()).message()); + msg = ((IncompleteDeserializationException)e.getCause()).message(); throw e; } - - assert msg != null; } - return msg; + assert msg != null; } /** {@inheritDoc} */ @@ -127,4 +194,9 @@ public void message(@Nullable DiscoverySpiCustomMessage msg, @NotNull byte[] msg @Override public String toString() { return S.toString(TcpDiscoveryCustomEventMessage.class, this, "super", super.toString()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 500; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java index 543a06fe4dc7f..8e70196ab051b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeLeftMessage.java @@ -48,7 +48,7 @@ public TcpDiscoveryNodeLeftMessage(UUID creatorNodeId) { /** {@inheritDoc} */ @Override public short directType() { - return 14; + return 16; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java index 97f701ed6e832..8d8b0eac774af 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryServerOnlyCustomEventMessage.java @@ -18,7 +18,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.util.UUID; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.jetbrains.annotations.NotNull; /** @@ -29,13 +29,23 @@ public class TcpDiscoveryServerOnlyCustomEventMessage extends TcpDiscoveryCustom /** */ private static final long serialVersionUID = 0L; + /** + * Default constructor. + */ + public TcpDiscoveryServerOnlyCustomEventMessage() { + // No-op. + } + /** * @param creatorNodeId Creator node id. * @param msg Message. - * @param msgBytes Serialized message. */ - public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @NotNull DiscoverySpiCustomMessage msg, - @NotNull byte[] msgBytes) { - super(creatorNodeId, msg, msgBytes); + public TcpDiscoveryServerOnlyCustomEventMessage(UUID creatorNodeId, @NotNull DiscoveryCustomMessage msg) { + super(creatorNodeId, msg); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 501; } } diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index e4f3b99b972de..96338a4405e1c 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -719,7 +719,6 @@ org.apache.ignite.internal.managers.deployment.GridDeploymentPerVersionStore$2 org.apache.ignite.internal.managers.deployment.GridDeploymentRequest org.apache.ignite.internal.managers.deployment.GridDeploymentResponse org.apache.ignite.internal.managers.deployment.P2PClassNotFoundException -org.apache.ignite.internal.managers.discovery.CustomMessageWrapper org.apache.ignite.internal.managers.discovery.DiscoCache$1 org.apache.ignite.internal.managers.discovery.DiscoCache$2 org.apache.ignite.internal.managers.discovery.DiscoCache$3 diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java index 01a3955c09aa7..fc9be4ef43a98 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.testframework.GridTestUtils; /** @@ -52,7 +51,7 @@ public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListe private final Object mux = new Object(); /** */ - private List blockedMsgs = new ArrayList<>(); + private List blockedMsgs = new ArrayList<>(); /** */ private volatile DiscoverySpi spi; @@ -121,13 +120,13 @@ public void stopBlockRestart() { } /** {@inheritDoc} */ - @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) { + @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoveryCustomMessage msg) { this.spi = spi; this.log = log; synchronized (mux) { if (blockCustomEvtCls != null) { - DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); + DiscoveryCustomMessage msg0 = GridTestUtils.unwrap(msg); if (blockCustomEvtCls.contains(msg0.getClass())) { log.info("Block custom message: " + msg0); @@ -176,7 +175,7 @@ public void stopBlockCustomEvents() { if (spi == null) return; - List msgs; + List msgs; synchronized (this) { msgs = new ArrayList<>(blockedMsgs); @@ -186,7 +185,7 @@ public void stopBlockCustomEvents() { blockedMsgs.clear(); } - for (DiscoverySpiCustomMessage msg : msgs) { + for (DiscoveryCustomMessage msg : msgs) { log.info("Resend blocked message: " + msg); spi.sendCustomEvent(msg); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java index dcc49c013e433..308b390cef171 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java @@ -85,6 +85,7 @@ import org.apache.ignite.internal.binary.streams.BinaryOutputStream; import org.apache.ignite.internal.binary.streams.BinaryStreams; import org.apache.ignite.internal.binary.streams.BinaryStreamsTestUtils; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.systemview.GridSystemViewManager; import org.apache.ignite.internal.managers.systemview.JmxSystemViewExporterSpi; @@ -100,7 +101,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.MarshallerContextTestImpl; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.GridTestKernalContext; @@ -4159,7 +4159,7 @@ protected BinaryMarshaller binaryMarshaller( iCfg.setBinaryConfiguration(bCfg); iCfg.setClientMode(false); iCfg.setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { //No-op. } }); @@ -4217,7 +4217,7 @@ protected BinaryObjectBuilder builder( iCfg.setBinaryConfiguration(bCfg); iCfg.setClientMode(false); iCfg.setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { //No-op. } }); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java index 5c87c1819db3d..fc8f20cfe9e5e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java @@ -49,7 +49,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.service.GridServiceProxy; @@ -60,8 +59,8 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.services.ServiceConfiguration; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger; import org.apache.logging.log4j.Level; @@ -646,13 +645,14 @@ private static final class TestBlockingDiscoverySpi extends TcpDiscoverySpi { private final Set> toBlock = new HashSet<>(); /** */ - private final List blocked = new CopyOnWriteArrayList<>(); + private final List blocked = new CopyOnWriteArrayList<>(); /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - if (msg instanceof CustomMessageWrapper - && toBlock.stream().anyMatch(mt -> mt.isAssignableFrom(((CustomMessageWrapper)msg).delegate().getClass()))) { - blocked.add((CustomMessageWrapper)msg); + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { + DiscoveryCustomMessage realMsg = GridTestUtils.unwrap(msg); + + if (toBlock.stream().anyMatch(mt -> mt.isAssignableFrom(realMsg.getClass()))) { + blocked.add(msg); return; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java index 6b25e9ff5d995..3d0a267a2ff79 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java @@ -20,7 +20,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; /** * For TESTING only. @@ -46,5 +45,5 @@ public default void beforeReconnect(ClusterNode locNode, IgniteLogger log) { * @param msg Custom message. * @return {@code False} to cancel event send. */ - public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg); + public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoveryCustomMessage msg); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java index a31feaffb66a3..aa9fb9441bf0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientSlowDiscoveryAbstractTest.java @@ -23,17 +23,16 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -96,22 +95,23 @@ static class CustomMessageInterceptingDiscoverySpi extends TcpDiscoverySpi { TcpDiscoveryCustomEventMessage cm = (TcpDiscoveryCustomEventMessage)msg; - DiscoveryCustomMessage delegate; + DiscoveryCustomMessage realMsg; try { - DiscoverySpiCustomMessage custMsg = cm.message(marshaller(), - U.resolveClassLoader(ignite().configuration())); + cm.finishUnmarhal(marshaller(), U.resolveClassLoader(ignite().configuration())); + + DiscoveryCustomMessage custMsg = cm.message(); assertNotNull(custMsg); - delegate = ((CustomMessageWrapper)custMsg).delegate(); + realMsg = GridTestUtils.unwrap(custMsg); } catch (Throwable throwable) { throw new RuntimeException(throwable); } if (interceptor != null) - interceptor.apply(delegate); + interceptor.apply(realMsg); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java index 67ff9fafa198b..7cb454e6d9ad0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.spi.discovery.DiscoveryNotification; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import org.junit.Test; @@ -199,7 +200,7 @@ private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate) { DiscoveryNotification notification ) { DiscoveryCustomMessage customMsg = notification.getCustomMsgData() == null ? null - : (DiscoveryCustomMessage)U.field(notification.getCustomMsgData(), "delegate"); + : GridTestUtils.unwrap(notification.getCustomMsgData()); if (customMsg != null) { //don't want to make this class public, using equality of class name instead of instanceof operator diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java index c485ac9441739..24854d9d22e8f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage; import org.apache.ignite.internal.processors.cache.persistence.filename.SharedFileTree; @@ -43,7 +42,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -163,25 +161,23 @@ private void doTestMarshallingBinaryMappingsLoadedFromClient(boolean receiveMeta @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { - DiscoverySpiCustomMessage custom = - ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()); - - if (custom instanceof CustomMessageWrapper) { - DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate(); - - if (delegate instanceof MappingAcceptedMessage) { - MarshallerMappingItem item = GridTestUtils.getFieldValue(delegate, "item"); - - if (item.className().equals(PERSON_CLASS_NAME) || - item.className().equals(ORGANIZATION_CLASS_NAME) || - item.className().equals(ADDRESS_CLASS_NAME) - ) { - try { - U.await(delayMappingLatch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } - catch (Exception e) { - fail("Mapping proposed message must be released."); - } + TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage)msg; + msg0.finishUnmarhal(marshaller(), U.gridClassLoader()); + + DiscoveryCustomMessage customMsg = GridTestUtils.unwrap(msg0.message()); + + if (customMsg instanceof MappingAcceptedMessage) { + MarshallerMappingItem item = GridTestUtils.getFieldValue(customMsg, "item"); + + if (item.className().equals(PERSON_CLASS_NAME) || + item.className().equals(ORGANIZATION_CLASS_NAME) || + item.className().equals(ADDRESS_CLASS_NAME) + ) { + try { + U.await(delayMappingLatch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + fail("Mapping proposed message must be released."); } } } @@ -240,22 +236,20 @@ public void testBinaryMetaDelayedForComputeJobResult() throws Exception { @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { - DiscoverySpiCustomMessage custom = - ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()); - - if (custom instanceof CustomMessageWrapper) { - DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate(); - - if (delegate instanceof MappingProposedMessage) { - MarshallerMappingItem item = GridTestUtils.getFieldValue(delegate, "mappingItem"); - - if (item.className().contains(JOB_RESULT_CLASS_NAME_PREFIX)) { - try { - U.await(latch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } - catch (Exception e) { - fail("Exception must never be thrown: " + e.getMessage()); - } + TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage)msg; + msg0.finishUnmarhal(marshaller(), U.gridClassLoader()); + + DiscoveryCustomMessage customMsg = GridTestUtils.unwrap(msg0.message()); + + if (customMsg instanceof MappingProposedMessage) { + MarshallerMappingItem item = GridTestUtils.getFieldValue(customMsg, "mappingItem"); + + if (item.className().contains(JOB_RESULT_CLASS_NAME_PREFIX)) { + try { + U.await(latch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + fail("Exception must never be thrown: " + e.getMessage()); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java index 8dc02770f9d6a..2ee750e97697e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java @@ -41,6 +41,7 @@ import org.apache.ignite.spi.discovery.DiscoveryNotification; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import org.junit.Test; @@ -247,7 +248,7 @@ private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate) { DiscoveryNotification notification ) { DiscoveryCustomMessage customMsg = notification.getCustomMsgData() == null ? null - : (DiscoveryCustomMessage)U.field(notification.getCustomMsgData(), "delegate"); + : GridTestUtils.unwrap(notification.getCustomMsgData()); if (customMsg != null) { //don't want to make this class public, using equality of class name instead of instanceof operator diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataRemoveTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataRemoveTest.java index 7a8193f3b1270..420cf01d0338c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataRemoveTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataRemoveTest.java @@ -31,10 +31,8 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -184,14 +182,11 @@ public void testChangeMetaWhenTypeRemoving() throws Exception { AtomicBoolean hookMsgs = new AtomicBoolean(true); discoveryHook = new GridTestUtils.DiscoveryHook() { - @Override public void beforeDiscovery(DiscoverySpiCustomMessage msg) { + @Override public void beforeDiscovery(DiscoveryCustomMessage msg) { if (!hookMsgs.get()) return; - DiscoveryCustomMessage customMsg = msg == null ? null - : (DiscoveryCustomMessage)IgniteUtils.field(msg, "delegate"); - - if (customMsg instanceof MetadataRemoveProposedMessage) { + if (msg instanceof MetadataRemoveProposedMessage) { try { barrier0.await(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java index 15910a33d19aa..f359db9fc7a04 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridBinaryCacheEntryMemorySizeSelfTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.systemview.GridSystemViewManager; import org.apache.ignite.internal.managers.systemview.JmxSystemViewExporterSpi; @@ -28,7 +29,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.MarshallerContextTestImpl; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.systemview.view.SystemView; import org.apache.ignite.testframework.junits.GridTestKernalContext; @@ -43,7 +43,7 @@ public class GridBinaryCacheEntryMemorySizeSelfTest extends GridCacheEntryMemory IgniteConfiguration iCfg = new IgniteConfiguration(); iCfg.setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { // No-op. } }); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java index dc4f15bd07b78..ce09fe1d6e190 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteSequentialNodeCrashRecoveryTest.java @@ -43,7 +43,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgnitionEx; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdAllocator; @@ -67,7 +66,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; @@ -357,16 +355,14 @@ public BlockingDiscoverySpi(IgnitePredicate blockPred) { /** */ private DiscoveryCustomMessage extractCustomMessage(TcpDiscoveryCustomEventMessage msg) { - DiscoverySpiCustomMessage msgObj = null; - try { - msgObj = msg.message(marshaller(), U.resolveClassLoader(ignite().configuration())); + msg.finishUnmarhal(marshaller(), U.gridClassLoader()); } catch (Throwable e) { U.error(log, "Failed to unmarshal discovery custom message.", e); } - return ((CustomMessageWrapper)msgObj).delegate(); + return GridTestUtils.unwrap(msg.message()); } /** Unblock discovery custom messages. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index 3a490545429c9..c8b4d55f81a54 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -68,7 +68,6 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.encryption.AbstractEncryptionTest; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord; @@ -97,7 +96,6 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteFutureCancelledException; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi; import org.apache.ignite.testframework.GridTestUtils; @@ -910,24 +908,22 @@ protected static BlockingCustomMessageDiscoverySpi discoSpi(IgniteEx ignite) { /** */ protected static class BlockingCustomMessageDiscoverySpi extends TcpDiscoverySpi { /** List of messages which have been blocked. */ - private final List blocked = new CopyOnWriteArrayList<>(); + private final List blocked = new CopyOnWriteArrayList<>(); /** Discovery custom message filter. */ private volatile IgnitePredicate blockPred; /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - if (msg instanceof CustomMessageWrapper) { - DiscoveryCustomMessage msg0 = ((CustomMessageWrapper)msg).delegate(); + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { + DiscoveryCustomMessage msg0 = GridTestUtils.unwrap(msg); - if (blockPred != null && blockPred.apply(msg0)) { - blocked.add(msg); + if (blockPred != null && blockPred.apply(msg0)) { + blocked.add(msg); - if (log.isInfoEnabled()) - log.info("Discovery message has been blocked: " + msg0); + if (log.isInfoEnabled()) + log.info("Discovery message has been blocked: " + msg0); - return; - } + return; } super.sendCustomEvent(msg); @@ -954,11 +950,11 @@ public synchronized void unblock() { /** Releases the blocked messages. */ private void releaseBlocked() { - List blocked = new CopyOnWriteArrayList<>(this.blocked); + List blocked = new CopyOnWriteArrayList<>(this.blocked); this.blocked.clear(); - for (DiscoverySpiCustomMessage msg : blocked) + for (DiscoveryCustomMessage msg : blocked) sendCustomEvent(msg); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java index aaae9d6b9e270..297897aaad8df 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/incremental/IncrementalSnapshotJoiningClientTest.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; @@ -43,6 +43,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; import org.junit.Test; @@ -247,10 +248,13 @@ private static class CoordinatorBlockingDiscoverySpi extends TcpDiscoverySpi { TcpDiscoveryCustomEventMessage m = (TcpDiscoveryCustomEventMessage)msg; try { - CustomMessageWrapper m0 = (CustomMessageWrapper)m.message( - marshaller(), U.resolveClassLoader(ignite().configuration())); + m.finishUnmarhal(marshaller(), U.resolveClassLoader(ignite().configuration())); - if (m0.delegate() instanceof InitMessage) + DiscoveryCustomMessage m0 = m.message(); + + DiscoveryCustomMessage realMsg = GridTestUtils.unwrap(m0); + + if (realMsg instanceof InitMessage) rcvStartSnpReq.countDown(); } catch (Throwable e) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java index 96e2b462fc289..43a43cea26d72 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/IgniteNoCustomEventsOnNodeStart.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.continuous; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -60,8 +60,10 @@ public void testNoCustomEventsOnStart() throws Exception { */ static class TestTcpDiscoverySpi extends TcpDiscoverySpi { /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) { - if (GridTestUtils.getFieldValue(msg, "delegate") instanceof CacheAffinityChangeMessage) + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) { + DiscoveryCustomMessage realMsg = GridTestUtils.unwrap(msg); + + if (realMsg instanceof CacheAffinityChangeMessage) return; failed = true; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java index fce48f3bbbcc7..40cb3dbbdc0dd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/schema/IndexWithSameNameTestBase.java @@ -40,7 +40,6 @@ import org.apache.ignite.configuration.SqlConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.TestRecordingCommunicationSpi; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.schema.management.IndexDescriptor; @@ -50,10 +49,10 @@ import org.apache.ignite.internal.util.lang.ConsumerX; import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -313,10 +312,10 @@ public static class SchemaFinishListeningTcpDiscoverySpi extends TcpDiscoverySpi @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { - DiscoverySpiCustomMessage spiCustomMsg = ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), - U.resolveClassLoader(ignite().configuration())); + TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage)msg; + msg0.finishUnmarhal(marshaller(), U.gridClassLoader()); - DiscoveryCustomMessage discoCustomMsg = ((CustomMessageWrapper)spiCustomMsg).delegate(); + DiscoveryCustomMessage discoCustomMsg = GridTestUtils.unwrap(msg0.message()); if (discoCustomMsg instanceof SchemaFinishDiscoveryMessage) { SchemaFinishDiscoveryMessage finishMsg = (SchemaFinishDiscoveryMessage)discoCustomMsg; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java index 3480bbe2cbb1e..9734ddd0b2f09 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java @@ -35,7 +35,6 @@ import org.apache.ignite.failure.StopNodeOrHaltFailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; @@ -51,6 +50,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -205,7 +205,7 @@ private boolean anyReceivedMessageMatch(IgniteEx ignite, Predicate predi assert customMsg instanceof SecurityAwareCustomMessageWrapper; - unwrappedMsg = ((CustomMessageWrapper)customMsg).delegate(); + unwrappedMsg = GridTestUtils.unwrap(customMsg); } if (predicate.test(unwrappedMsg)) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java index a040aca7cb1f3..bbf64cc3e4857 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceConcurrentUndeployTest.java @@ -26,11 +26,11 @@ import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.service.inner.LongInitializedTestService; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -50,13 +50,14 @@ public class ServiceConcurrentUndeployTest extends GridCommonAbstractTest { TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi(); disco.setInternalListener(new DiscoverySpiTestListener() { - @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) { + @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoveryCustomMessage msg) { if (spi.isClientMode()) { - boolean isUndeployMsg = msg instanceof CustomMessageWrapper - && ((CustomMessageWrapper)msg).delegate() instanceof ServiceChangeBatchRequest; + DiscoveryCustomMessage realMsg = GridTestUtils.unwrap(msg); + + boolean isUndeployMsg = realMsg instanceof ServiceChangeBatchRequest; if (isUndeployMsg) { - ServiceChangeBatchRequest batch = (ServiceChangeBatchRequest)((CustomMessageWrapper)msg).delegate(); + ServiceChangeBatchRequest batch = (ServiceChangeBatchRequest)realMsg; long undeployReqCnt = batch.requests().stream() .filter(r -> r instanceof ServiceUndeploymentRequest) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java index 5b4dc2ae74eaa..19b5e408c3c58 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentProcessAbstractTest.java @@ -22,10 +22,10 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; @@ -102,8 +102,10 @@ protected static class BlockingTcpDiscoverySpi extends TestTcpDiscoverySpi { private volatile boolean block; /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - if (block && GridTestUtils.getFieldValue(msg, "delegate") instanceof ServiceClusterDeploymentResultBatch) + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { + DiscoveryCustomMessage msg0 = GridTestUtils.unwrap(msg); + + if (block && msg0 instanceof ServiceClusterDeploymentResultBatch) return; super.sendCustomEvent(msg); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java index 30a7ec8029837..9cffa43b9220a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.lang.IgniteProductVersion; @@ -153,7 +154,7 @@ private class DelegatedDiscoverySpi extends IgniteSpiAdapter implements IgniteDi } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { delegate.sendCustomEvent(msg); } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java index f72eac9999460..a1b612ab63c40 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java @@ -21,13 +21,12 @@ import java.net.Socket; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.testframework.GridTestUtils; import static org.junit.Assert.assertNotNull; @@ -53,24 +52,19 @@ private synchronized void apply(ClusterNode addr, TcpDiscoveryAbstractMessage ms if (!(msg instanceof TcpDiscoveryCustomEventMessage)) return; - TcpDiscoveryCustomEventMessage cm = (TcpDiscoveryCustomEventMessage)msg; - - DiscoveryCustomMessage delegate; + TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage)msg; try { - DiscoverySpiCustomMessage custMsg = cm.message(marshaller(), U.resolveClassLoader(ignite().configuration())); - - assertNotNull(custMsg); - - delegate = ((CustomMessageWrapper)custMsg).delegate(); + msg0.finishUnmarhal(marshaller(), U.gridClassLoader()); + assertNotNull(msg0.message()); } catch (Throwable throwable) { throw new RuntimeException(throwable); } if (clo != null) - clo.apply(addr, delegate); + clo.apply(addr, GridTestUtils.unwrap(msg0.message())); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java index 15cb1f903827d..0d156464075ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java @@ -24,7 +24,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; @@ -238,7 +237,7 @@ public void testDeliveryAllFailedMessagesInCorrectOrder() throws Exception { * @param id Message id. */ private void sendDummyCustomMessage(TcpDiscoverySpi disco, IgniteUuid id) { - disco.sendCustomEvent(new CustomMessageWrapper(new DummyCustomDiscoveryMessage(id))); + disco.sendCustomEvent(new DummyCustomDiscoveryMessage(id)); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 3195284ba9262..a5ac0baf584ef 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -2606,8 +2606,11 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { if (stopBeforeSndAck) { if (msg instanceof TcpDiscoveryCustomEventMessage) { try { - DiscoveryCustomMessage custMsg = GridTestUtils.getFieldValue( - ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader()), "delegate"); + TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage)msg; + + msg0.finishUnmarhal(marshaller(), U.gridClassLoader()); + + DiscoveryCustomMessage custMsg = GridTestUtils.unwrap(msg0.message()); if (custMsg instanceof StartRoutineAckDiscoveryMessage) { log.info("Skip message send and stop node: " + msg); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java index e3038bf5dc6a7..336c853706021 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java @@ -20,8 +20,8 @@ import java.io.IOException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; @@ -73,7 +73,7 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi implements IgniteDiscov } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; if (internalLsnr != null) { diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index 62e450ee10c39..fba1bfd022a5f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -94,8 +94,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -126,7 +126,6 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper; import org.apache.ignite.spi.discovery.DiscoveryNotification; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.ssl.SslContextFactory; import org.apache.ignite.testframework.config.GridTestProperties; @@ -182,9 +181,8 @@ public static class DiscoveryHook { * * @param msg Intercepted discovery message. */ - public void beforeDiscovery(DiscoverySpiCustomMessage msg) { - if (msg instanceof CustomMessageWrapper) - beforeDiscovery(unwrap((CustomMessageWrapper)msg)); + public void beforeDiscovery0(DiscoveryCustomMessage msg) { + beforeDiscovery(unwrap(msg)); } /** @@ -201,9 +199,8 @@ public void beforeDiscovery(DiscoveryCustomMessage customMsg) { * * @param msg Intercepted discovery message. */ - public void afterDiscovery(DiscoverySpiCustomMessage msg) { - if (msg instanceof CustomMessageWrapper) - afterDiscovery(unwrap((CustomMessageWrapper)msg)); + public void afterDiscovery0(DiscoveryCustomMessage msg) { + afterDiscovery(unwrap(msg)); } /** @@ -221,16 +218,16 @@ public void afterDiscovery(DiscoveryCustomMessage customMsg) { public void ignite(IgniteEx ignite) { // No-op. } + } - /** - * Obtains {@link DiscoveryCustomMessage} from {@link CustomMessageWrapper}. - * - * @param wrapper Wrapper of {@link DiscoveryCustomMessage}. - * @return Unwrapped {@link DiscoveryCustomMessage}. - */ - private DiscoveryCustomMessage unwrap(CustomMessageWrapper wrapper) { - return U.field(wrapper, "delegate"); - } + /** + * Unwraps messsage if it is wrapped by {@link SecurityAwareCustomMessageWrapper}. + * + * @param msg Message. + */ + public static DiscoveryCustomMessage unwrap(DiscoveryCustomMessage msg) { + return msg instanceof SecurityAwareCustomMessageWrapper ? + ((SecurityAwareCustomMessageWrapper)msg).delegate() : msg; } /** @@ -254,11 +251,11 @@ private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate, DiscoveryHook /** {@inheritDoc} */ @Override public IgniteFuture onDiscovery(DiscoveryNotification notification) { - hook.beforeDiscovery(notification.getCustomMsgData()); + hook.beforeDiscovery0(notification.getCustomMsgData()); IgniteFuture fut = delegate.onDiscovery(notification); - fut.listen(f -> hook.afterDiscovery(notification.getCustomMsgData())); + fut.listen(f -> hook.afterDiscovery0(notification.getCustomMsgData())); return fut; } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 0a4bce9e44a89..410dc4259468a 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -84,6 +84,7 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryUtils; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.systemview.JmxSystemViewExporterSpi; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; @@ -113,7 +114,6 @@ import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -647,7 +647,7 @@ protected GridTestKernalContext newContext() throws IgniteCheckedException { cfg.setClientMode(false); cfg.setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { // No-op. } }); diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestBinaryMarshaller.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestBinaryMarshaller.java index ff9af51975fe1..524c2d6f257a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestBinaryMarshaller.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestBinaryMarshaller.java @@ -26,10 +26,10 @@ import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.binary.GridBinaryMarshaller; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.persistence.filename.SharedFileTree; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.MarshallerContextTestImpl; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; /** */ @@ -68,7 +68,7 @@ private BinaryMarshaller createBinaryMarshaller(IgniteLogger log) throws IgniteC ) .setClientMode(false) .setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { //No-op. } }); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java index 92b47624466f7..aaf23c74155a5 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java @@ -27,10 +27,10 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -48,10 +48,11 @@ public class BinaryTypeRegistrationTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - if (msg instanceof CustomMessageWrapper - && ((CustomMessageWrapper)msg).delegate() instanceof MetadataUpdateProposedMessage) - metadataUpdateProposedMessages.add(((CustomMessageWrapper)msg).delegate()); + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { + DiscoveryCustomMessage msg0 = GridTestUtils.unwrap(msg); + + if (msg0 instanceof MetadataUpdateProposedMessage) + metadataUpdateProposedMessages.add(msg0); super.sendCustomEvent(msg); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java index f7c079b48a5f5..4d9a353cc2ffa 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheRegisterMetadataLocallyTest.java @@ -29,7 +29,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage; @@ -39,10 +38,10 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -70,13 +69,11 @@ public class CacheRegisterMetadataLocallyTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - if (msg instanceof CustomMessageWrapper) { - DiscoveryCustomMessage realMsg = ((CustomMessageWrapper)msg).delegate(); + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { + DiscoveryCustomMessage realMsg = GridTestUtils.unwrap(msg); - if (realMsg instanceof MetadataUpdateProposedMessage || realMsg instanceof MetadataUpdateAcceptedMessage) - customMessages.add(realMsg); - } + if (realMsg instanceof MetadataUpdateProposedMessage || realMsg instanceof MetadataUpdateAcceptedMessage) + customMessages.add(realMsg); super.sendCustomEvent(msg); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java index 70847fabff366..8437b2609f4ec 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java @@ -65,7 +65,6 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; @@ -81,7 +80,6 @@ import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -195,26 +193,24 @@ public static Iterable valuesForAsync() { clientBlocker = commSpi; cfg.setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - if (msg instanceof CustomMessageWrapper) { - DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate(); - - if (delegate instanceof DynamicCacheChangeBatch) { - try { - awaitTimeout(); - } - catch (Exception e) { - log.error(e.getMessage(), e); - } + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { + DiscoveryCustomMessage customMsg = GridTestUtils.unwrap(msg); + if (customMsg instanceof DynamicCacheChangeBatch) { + try { + awaitTimeout(); } - else if (delegate instanceof SchemaProposeDiscoveryMessage) { - try { - awaitTimeout(); - } - catch (Exception e) { - log.error(e.getMessage(), e); - } + catch (Exception e) { + log.error(e.getMessage(), e); + } + + } + else if (customMsg instanceof SchemaProposeDiscoveryMessage) { + try { + awaitTimeout(); + } + catch (Exception e) { + log.error(e.getMessage(), e); } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java index 5bc65cbebc98e..e3efd31c48974 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; @@ -60,7 +59,6 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; @@ -168,30 +166,28 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest { cfg.setDiscoverySpi(new TcpDiscoverySpi() { - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { - if (CustomMessageWrapper.class.isAssignableFrom(msg.getClass())) { - DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate(); - - if (DynamicCacheChangeBatch.class.isAssignableFrom(delegate.getClass())) { - ((DynamicCacheChangeBatch)delegate).requests().stream() - .filter((c) -> !c.cacheName().equalsIgnoreCase("default")) - .findAny() - .ifPresent((c) -> { - try { - awaitTimeout(); - } - catch (Exception e) { - e.printStackTrace(); - } - }); + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException { + DiscoveryCustomMessage realMsg = GridTestUtils.unwrap(msg); + + if (DynamicCacheChangeBatch.class.isAssignableFrom(realMsg.getClass())) { + ((DynamicCacheChangeBatch)realMsg).requests().stream() + .filter((c) -> !c.cacheName().equalsIgnoreCase("default")) + .findAny() + .ifPresent((c) -> { + try { + awaitTimeout(); + } + catch (Exception e) { + e.printStackTrace(); + } + }); + } + else if (SchemaProposeDiscoveryMessage.class.isAssignableFrom(realMsg.getClass())) { + try { + awaitTimeout(); } - else if (SchemaProposeDiscoveryMessage.class.isAssignableFrom(delegate.getClass())) { - try { - awaitTimeout(); - } - catch (Exception e) { - e.printStackTrace(); - } + catch (Exception e) { + e.printStackTrace(); } } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java index c439dce3708fc..f7a9431405e6d 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java @@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -49,7 +50,6 @@ import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; @@ -404,7 +404,7 @@ public DiscoverySpiNodeAuthenticator getAuthenticator() { } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) { impl.sendCustomMessage(msg); } diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java index 9b7476c5355f5..8bf005248a293 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java @@ -25,7 +25,7 @@ /** * */ -class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { +class ZkCommunicationErrorResolveFinishMessage extends DiscoverySpiCustomMessage implements ZkInternalMessage { /** */ private static final long serialVersionUID = 0L; diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java index 0c79c36aee0a9..b4ab0d7040aab 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java @@ -25,7 +25,7 @@ /** * Zk Communication Error Resolve Start Message. */ -public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { +public class ZkCommunicationErrorResolveStartMessage extends DiscoverySpiCustomMessage implements ZkInternalMessage { /** */ private static final long serialVersionUID = 0L; diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java index 60a36c5adb952..a4db36079e8f0 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java @@ -18,6 +18,7 @@ package org.apache.ignite.spi.discovery.zk.internal; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; @@ -41,7 +42,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { DiscoverySpiCustomMessage msg; /** Unmarshalled message. */ - transient DiscoverySpiCustomMessage resolvedMsg; + transient DiscoveryCustomMessage resolvedMsg; /** * @param evtId Event ID. diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java index de7291c0d453c..22b07f66b4fd8 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java @@ -24,7 +24,7 @@ /** * Zk Force Node Fail Message. */ -public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { +public class ZkForceNodeFailMessage extends DiscoverySpiCustomMessage implements ZkInternalMessage { /** */ private static final long serialVersionUID = 0L; diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java index 626fe742d1c1b..c79058e17bf60 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java @@ -24,7 +24,7 @@ /** * */ -class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage { +class ZkNoServersMessage extends DiscoverySpiCustomMessage implements ZkInternalMessage { /** */ private static final long serialVersionUID = 0L; diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 37039dd028b32..9ae072b9102a2 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -649,7 +650,7 @@ public boolean knownNode(UUID nodeId) { /** * @param msg Message. */ - public void sendCustomMessage(DiscoverySpiCustomMessage msg) { + public void sendCustomMessage(DiscoveryCustomMessage msg) { assert msg != null; List nodes = rtState.top.topologySnapshot(); @@ -2585,7 +2586,7 @@ else if (msg instanceof ZkCommunicationErrorResolveStartMessage) { evtsData.evtIdGen--; - DiscoverySpiCustomMessage ack = msg.ackMessage(); + DiscoveryCustomMessage ack = msg.ackMessage(); if (ack != null) { evtData = createAckEvent(ack, evtData); @@ -2746,7 +2747,7 @@ private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Excep if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order()) break; - DiscoverySpiCustomMessage msg; + DiscoveryCustomMessage msg; if (rtState.crd) { assert evtData0.resolvedMsg != null : evtData0; @@ -3508,7 +3509,7 @@ public void simulateNodeFailure() { * @param evtData Event data. * @param msg Custom message. */ - private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoverySpiCustomMessage msg) { + private void notifyCustomEvent(final ZkDiscoveryCustomEventData evtData, final DiscoveryCustomMessage msg) { assert !(msg instanceof ZkInternalMessage) : msg; if (log.isDebugEnabled()) @@ -3714,7 +3715,7 @@ private void handleProcessedEvents(String ctx) throws Exception { } case ZkDiscoveryEventData.ZK_EVT_CUSTOM_EVT: { - DiscoverySpiCustomMessage ack = handleProcessedCustomEvent(ctx, + DiscoveryCustomMessage ack = handleProcessedCustomEvent(ctx, (ZkDiscoveryCustomEventData)evtData); if (ack != null) { @@ -3762,7 +3763,7 @@ private void handleProcessedEvents(String ctx) throws Exception { * @throws Exception If failed. */ private ZkDiscoveryCustomEventData createAckEvent( - DiscoverySpiCustomMessage ack, + DiscoveryCustomMessage ack, ZkDiscoveryCustomEventData origEvt) throws Exception { assert ack != null; @@ -3885,7 +3886,7 @@ private void deleteDataForJoinedAsync(ZkDiscoveryNodeJoinEventData evtData) { * @return Ack message. * @throws Exception If failed. */ - @Nullable private DiscoverySpiCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData) + @Nullable private DiscoveryCustomMessage handleProcessedCustomEvent(String ctx, ZkDiscoveryCustomEventData evtData) throws Exception { if (log.isDebugEnabled()) log.debug("All nodes processed custom event [ctx=" + ctx + ", evtData=" + evtData + ']'); diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestConfigurator.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestConfigurator.java index 8a2f4e6c84c24..259dd2f6c2a91 100644 --- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestConfigurator.java +++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestConfigurator.java @@ -22,9 +22,9 @@ import org.apache.curator.test.TestingCluster; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.spi.discovery.DiscoverySpi; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.config.GridTestProperties; @@ -96,7 +96,7 @@ private static class TestZookeeperDiscoverySpi extends ZookeeperDiscoverySpi imp private volatile IgniteDiscoverySpiInternalListener internalLsnr; /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) { + @Override public void sendCustomEvent(DiscoveryCustomMessage msg) { IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; if (internalLsnr != null && !internalLsnr.beforeSendCustomEvent(this, log, msg))