From 9216291a66228bb902ba65e0084ba6ab8b93a633 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 3 Sep 2025 22:54:03 +0800 Subject: [PATCH] chore: Drop the deprecated Cluster Client --- .sbt-java-formatter.conf | 1 - .../receptionist/LocalReceptionistSpec.scala | 4 - .../protobuf/msg/ClusterClientMessages.java | 649 ---------- .../msg/DistributedPubSubMessages.java | 12 +- .../protoc4-upgrade.excludes | 1 - .../remove-deprecated-methods.excludes | 62 + .../main/protobuf/ClusterClientMessages.proto | 22 - .../protobuf/DistributedPubSubMessages.proto | 6 +- .../src/main/resources/reference.conf | 100 -- .../pekko/cluster/client/ClusterClient.scala | 1126 ----------------- .../ClusterClientMessageSerializer.scala | 96 -- .../client/ClusterClientHandoverSpec.scala | 127 -- .../cluster/client/ClusterClientSpec.scala | 383 ------ .../client/ClusterClientStopSpec.scala | 128 -- .../cluster/client/ClusterClientTest.java | 161 --- .../ClusterClientMessageSerializerSpec.scala | 48 - .../ClusterReceptionistSpec.scala | 9 - docs/src/main/paradox/cluster-client.md | 303 ----- docs/src/main/paradox/cluster-usage.md | 8 - docs/src/main/paradox/index-cluster.md | 1 - 20 files changed, 71 insertions(+), 3176 deletions(-) delete mode 100644 cluster-tools/src/main/java/org/apache/pekko/cluster/client/protobuf/msg/ClusterClientMessages.java delete mode 100644 cluster-tools/src/main/protobuf/ClusterClientMessages.proto delete mode 100644 cluster-tools/src/main/scala/org/apache/pekko/cluster/client/ClusterClient.scala delete mode 100644 cluster-tools/src/main/scala/org/apache/pekko/cluster/client/protobuf/ClusterClientMessageSerializer.scala delete mode 100644 cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientHandoverSpec.scala delete mode 100644 cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala delete mode 100644 cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala delete mode 
100644 cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java delete mode 100644 cluster-tools/src/test/scala/org/apache/pekko/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala delete mode 100644 docs/src/main/paradox/cluster-client.md diff --git a/.sbt-java-formatter.conf b/.sbt-java-formatter.conf index c194cf733a4..4272a862000 100644 --- a/.sbt-java-formatter.conf +++ b/.sbt-java-formatter.conf @@ -25,7 +25,6 @@ ignored-files = [ //pekko-cluster-sharding "ClusterShardingMessages.java", //pekko-cluster-tools - "ClusterClientMessages.java", "DistributedPubSubMessages.java", //pekko-cluster-typed "ClusterMessages.java", diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/internal/receptionist/LocalReceptionistSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/internal/receptionist/LocalReceptionistSpec.scala index d463f23c44c..96db2483184 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/internal/receptionist/LocalReceptionistSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/internal/receptionist/LocalReceptionistSpec.scala @@ -198,10 +198,6 @@ class LocalReceptionistSpec extends ScalaTestWithActorTestKit with AnyWordSpecLi listing.isForKey(ServiceKeyA) should ===(true) listing.serviceInstances(ServiceKeyA) should be(Set()) } - - "not conflict with the ClusterClient receptionist default name" in { - system.systemActorOf(Behaviors.ignore, "receptionist") - } } } diff --git a/cluster-tools/src/main/java/org/apache/pekko/cluster/client/protobuf/msg/ClusterClientMessages.java b/cluster-tools/src/main/java/org/apache/pekko/cluster/client/protobuf/msg/ClusterClientMessages.java deleted file mode 100644 index a04f7a3aa58..00000000000 --- a/cluster-tools/src/main/java/org/apache/pekko/cluster/client/protobuf/msg/ClusterClientMessages.java +++ /dev/null @@ -1,649 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2019-2022 Lightbend Inc. - */ - -// Generated by the protocol buffer compiler. DO NOT EDIT! -// NO CHECKED-IN PROTOBUF GENCODE -// source: ClusterClientMessages.proto -// Protobuf Java Version: 4.32.0 - -package org.apache.pekko.cluster.client.protobuf.msg; - -@org.apache.pekko.protobufv3.internal.Generated -public final class ClusterClientMessages { - private ClusterClientMessages() {} - static { - org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion( - org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC, - /* major= */ 4, - /* minor= */ 32, - /* patch= */ 0, - /* suffix= */ "", - ClusterClientMessages.class.getName()); - } - public static void registerAllExtensions( - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite registry) { - } - - public static void registerAllExtensions( - org.apache.pekko.protobufv3.internal.ExtensionRegistry registry) { - registerAllExtensions( - (org.apache.pekko.protobufv3.internal.ExtensionRegistryLite) registry); - } - public interface ContactsOrBuilder extends - // @@protoc_insertion_point(interface_extends:Contacts) - org.apache.pekko.protobufv3.internal.MessageOrBuilder { - - /** - * repeated string contactPoints = 1; - * @return A list containing the contactPoints. - */ - java.util.List - getContactPointsList(); - /** - * repeated string contactPoints = 1; - * @return The count of contactPoints. - */ - int getContactPointsCount(); - /** - * repeated string contactPoints = 1; - * @param index The index of the element to return. - * @return The contactPoints at the given index. - */ - java.lang.String getContactPoints(int index); - /** - * repeated string contactPoints = 1; - * @param index The index of the value to return. 
- * @return The bytes of the contactPoints at the given index. - */ - org.apache.pekko.protobufv3.internal.ByteString - getContactPointsBytes(int index); - } - /** - * Protobuf type {@code Contacts} - */ - public static final class Contacts extends - org.apache.pekko.protobufv3.internal.GeneratedMessage implements - // @@protoc_insertion_point(message_implements:Contacts) - ContactsOrBuilder { - private static final long serialVersionUID = 0L; - static { - org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion( - org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC, - /* major= */ 4, - /* minor= */ 32, - /* patch= */ 0, - /* suffix= */ "", - Contacts.class.getName()); - } - // Use Contacts.newBuilder() to construct. - private Contacts(org.apache.pekko.protobufv3.internal.GeneratedMessage.Builder builder) { - super(builder); - } - private Contacts() { - contactPoints_ = - org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList(); - } - - public static final org.apache.pekko.protobufv3.internal.Descriptors.Descriptor - getDescriptor() { - return org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor; - } - - @java.lang.Override - protected org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_fieldAccessorTable - .ensureFieldAccessorsInitialized( - org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.class, org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.Builder.class); - } - - public static final int CONTACTPOINTS_FIELD_NUMBER = 1; - @SuppressWarnings("serial") - private org.apache.pekko.protobufv3.internal.LazyStringArrayList contactPoints_ = - org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList(); - /** - * repeated string contactPoints = 1; 
- * @return A list containing the contactPoints. - */ - public org.apache.pekko.protobufv3.internal.ProtocolStringList - getContactPointsList() { - return contactPoints_; - } - /** - * repeated string contactPoints = 1; - * @return The count of contactPoints. - */ - public int getContactPointsCount() { - return contactPoints_.size(); - } - /** - * repeated string contactPoints = 1; - * @param index The index of the element to return. - * @return The contactPoints at the given index. - */ - public java.lang.String getContactPoints(int index) { - return contactPoints_.get(index); - } - /** - * repeated string contactPoints = 1; - * @param index The index of the value to return. - * @return The bytes of the contactPoints at the given index. - */ - public org.apache.pekko.protobufv3.internal.ByteString - getContactPointsBytes(int index) { - return contactPoints_.getByteString(index); - } - - private byte memoizedIsInitialized = -1; - @java.lang.Override - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized == 1) return true; - if (isInitialized == 0) return false; - - memoizedIsInitialized = 1; - return true; - } - - @java.lang.Override - public void writeTo(org.apache.pekko.protobufv3.internal.CodedOutputStream output) - throws java.io.IOException { - for (int i = 0; i < contactPoints_.size(); i++) { - org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(output, 1, contactPoints_.getRaw(i)); - } - getUnknownFields().writeTo(output); - } - - @java.lang.Override - public int getSerializedSize() { - int size = memoizedSize; - if (size != -1) return size; - - size = 0; - { - int dataSize = 0; - for (int i = 0; i < contactPoints_.size(); i++) { - dataSize += computeStringSizeNoTag(contactPoints_.getRaw(i)); - } - size += dataSize; - size += 1 * getContactPointsList().size(); - } - size += getUnknownFields().getSerializedSize(); - memoizedSize = size; - return size; - } - - @java.lang.Override - public 
boolean equals(final java.lang.Object obj) { - if (obj == this) { - return true; - } - if (!(obj instanceof org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts)) { - return super.equals(obj); - } - org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts other = (org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts) obj; - - if (!getContactPointsList() - .equals(other.getContactPointsList())) return false; - if (!getUnknownFields().equals(other.getUnknownFields())) return false; - return true; - } - - @java.lang.Override - public int hashCode() { - if (memoizedHashCode != 0) { - return memoizedHashCode; - } - int hash = 41; - hash = (19 * hash) + getDescriptor().hashCode(); - if (getContactPointsCount() > 0) { - hash = (37 * hash) + CONTACTPOINTS_FIELD_NUMBER; - hash = (53 * hash) + getContactPointsList().hashCode(); - } - hash = (29 * hash) + getUnknownFields().hashCode(); - memoizedHashCode = hash; - return hash; - } - - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( - java.nio.ByteBuffer data) - throws org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( - java.nio.ByteBuffer data, - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite extensionRegistry) - throws org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( - org.apache.pekko.protobufv3.internal.ByteString data) - throws org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( - 
org.apache.pekko.protobufv3.internal.ByteString data, - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite extensionRegistry) - throws org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(byte[] data) - throws org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException { - return PARSER.parseFrom(data); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( - byte[] data, - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite extensionRegistry) - throws org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException { - return PARSER.parseFrom(data, extensionRegistry); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom(java.io.InputStream input) - throws java.io.IOException { - return org.apache.pekko.protobufv3.internal.GeneratedMessage - .parseWithIOException(PARSER, input); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( - java.io.InputStream input, - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return org.apache.pekko.protobufv3.internal.GeneratedMessage - .parseWithIOException(PARSER, input, extensionRegistry); - } - - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - return org.apache.pekko.protobufv3.internal.GeneratedMessage - .parseDelimitedWithIOException(PARSER, input); - } - - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseDelimitedFrom( - java.io.InputStream input, - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite extensionRegistry) - throws 
java.io.IOException { - return org.apache.pekko.protobufv3.internal.GeneratedMessage - .parseDelimitedWithIOException(PARSER, input, extensionRegistry); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( - org.apache.pekko.protobufv3.internal.CodedInputStream input) - throws java.io.IOException { - return org.apache.pekko.protobufv3.internal.GeneratedMessage - .parseWithIOException(PARSER, input); - } - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts parseFrom( - org.apache.pekko.protobufv3.internal.CodedInputStream input, - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return org.apache.pekko.protobufv3.internal.GeneratedMessage - .parseWithIOException(PARSER, input, extensionRegistry); - } - - @java.lang.Override - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder() { - return DEFAULT_INSTANCE.toBuilder(); - } - public static Builder newBuilder(org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts prototype) { - return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); - } - @java.lang.Override - public Builder toBuilder() { - return this == DEFAULT_INSTANCE - ? 
new Builder() : new Builder().mergeFrom(this); - } - - @java.lang.Override - protected Builder newBuilderForType( - org.apache.pekko.protobufv3.internal.GeneratedMessage.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - /** - * Protobuf type {@code Contacts} - */ - public static final class Builder extends - org.apache.pekko.protobufv3.internal.GeneratedMessage.Builder implements - // @@protoc_insertion_point(builder_implements:Contacts) - org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.ContactsOrBuilder { - public static final org.apache.pekko.protobufv3.internal.Descriptors.Descriptor - getDescriptor() { - return org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor; - } - - @java.lang.Override - protected org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_fieldAccessorTable - .ensureFieldAccessorsInitialized( - org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.class, org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.Builder.class); - } - - // Construct using org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.newBuilder() - private Builder() { - - } - - private Builder( - org.apache.pekko.protobufv3.internal.GeneratedMessage.BuilderParent parent) { - super(parent); - - } - @java.lang.Override - public Builder clear() { - super.clear(); - bitField0_ = 0; - contactPoints_ = - org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList(); - return this; - } - - @java.lang.Override - public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor - getDescriptorForType() { - return org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.internal_static_Contacts_descriptor; - } - - @java.lang.Override - public 
org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts getDefaultInstanceForType() { - return org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.getDefaultInstance(); - } - - @java.lang.Override - public org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts build() { - org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - @java.lang.Override - public org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts buildPartial() { - org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts result = new org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts(this); - if (bitField0_ != 0) { buildPartial0(result); } - onBuilt(); - return result; - } - - private void buildPartial0(org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts result) { - int from_bitField0_ = bitField0_; - if (((from_bitField0_ & 0x00000001) != 0)) { - contactPoints_.makeImmutable(); - result.contactPoints_ = contactPoints_; - } - } - - @java.lang.Override - public Builder mergeFrom(org.apache.pekko.protobufv3.internal.Message other) { - if (other instanceof org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts) { - return mergeFrom((org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts other) { - if (other == org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts.getDefaultInstance()) return this; - if (!other.contactPoints_.isEmpty()) { - if (contactPoints_.isEmpty()) { - contactPoints_ = other.contactPoints_; - bitField0_ |= 0x00000001; - } else { - 
ensureContactPointsIsMutable(); - contactPoints_.addAll(other.contactPoints_); - } - onChanged(); - } - this.mergeUnknownFields(other.getUnknownFields()); - onChanged(); - return this; - } - - @java.lang.Override - public final boolean isInitialized() { - return true; - } - - @java.lang.Override - public Builder mergeFrom( - org.apache.pekko.protobufv3.internal.CodedInputStream input, - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - if (extensionRegistry == null) { - throw new java.lang.NullPointerException(); - } - try { - boolean done = false; - while (!done) { - int tag = input.readTag(); - switch (tag) { - case 0: - done = true; - break; - case 10: { - org.apache.pekko.protobufv3.internal.ByteString bs = input.readBytes(); - ensureContactPointsIsMutable(); - contactPoints_.add(bs); - break; - } // case 10 - default: { - if (!super.parseUnknownField(input, extensionRegistry, tag)) { - done = true; // was an endgroup tag - } - break; - } // default: - } // switch (tag) - } // while (!done) - } catch (org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException e) { - throw e.unwrapIOException(); - } finally { - onChanged(); - } // finally - return this; - } - private int bitField0_; - - private org.apache.pekko.protobufv3.internal.LazyStringArrayList contactPoints_ = - org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList(); - private void ensureContactPointsIsMutable() { - if (!contactPoints_.isModifiable()) { - contactPoints_ = new org.apache.pekko.protobufv3.internal.LazyStringArrayList(contactPoints_); - } - bitField0_ |= 0x00000001; - } - /** - * repeated string contactPoints = 1; - * @return A list containing the contactPoints. - */ - public org.apache.pekko.protobufv3.internal.ProtocolStringList - getContactPointsList() { - contactPoints_.makeImmutable(); - return contactPoints_; - } - /** - * repeated string contactPoints = 1; - * @return The count of contactPoints. 
- */ - public int getContactPointsCount() { - return contactPoints_.size(); - } - /** - * repeated string contactPoints = 1; - * @param index The index of the element to return. - * @return The contactPoints at the given index. - */ - public java.lang.String getContactPoints(int index) { - return contactPoints_.get(index); - } - /** - * repeated string contactPoints = 1; - * @param index The index of the value to return. - * @return The bytes of the contactPoints at the given index. - */ - public org.apache.pekko.protobufv3.internal.ByteString - getContactPointsBytes(int index) { - return contactPoints_.getByteString(index); - } - /** - * repeated string contactPoints = 1; - * @param index The index to set the value at. - * @param value The contactPoints to set. - * @return This builder for chaining. - */ - public Builder setContactPoints( - int index, java.lang.String value) { - if (value == null) { throw new NullPointerException(); } - ensureContactPointsIsMutable(); - contactPoints_.set(index, value); - bitField0_ |= 0x00000001; - onChanged(); - return this; - } - /** - * repeated string contactPoints = 1; - * @param value The contactPoints to add. - * @return This builder for chaining. - */ - public Builder addContactPoints( - java.lang.String value) { - if (value == null) { throw new NullPointerException(); } - ensureContactPointsIsMutable(); - contactPoints_.add(value); - bitField0_ |= 0x00000001; - onChanged(); - return this; - } - /** - * repeated string contactPoints = 1; - * @param values The contactPoints to add. - * @return This builder for chaining. - */ - public Builder addAllContactPoints( - java.lang.Iterable values) { - ensureContactPointsIsMutable(); - org.apache.pekko.protobufv3.internal.AbstractMessageLite.Builder.addAll( - values, contactPoints_); - bitField0_ |= 0x00000001; - onChanged(); - return this; - } - /** - * repeated string contactPoints = 1; - * @return This builder for chaining. 
- */ - public Builder clearContactPoints() { - contactPoints_ = - org.apache.pekko.protobufv3.internal.LazyStringArrayList.emptyList(); - bitField0_ = (bitField0_ & ~0x00000001);; - onChanged(); - return this; - } - /** - * repeated string contactPoints = 1; - * @param value The bytes of the contactPoints to add. - * @return This builder for chaining. - */ - public Builder addContactPointsBytes( - org.apache.pekko.protobufv3.internal.ByteString value) { - if (value == null) { throw new NullPointerException(); } - ensureContactPointsIsMutable(); - contactPoints_.add(value); - bitField0_ |= 0x00000001; - onChanged(); - return this; - } - - // @@protoc_insertion_point(builder_scope:Contacts) - } - - // @@protoc_insertion_point(class_scope:Contacts) - private static final org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts DEFAULT_INSTANCE; - static { - DEFAULT_INSTANCE = new org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts(); - } - - public static org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts getDefaultInstance() { - return DEFAULT_INSTANCE; - } - - private static final org.apache.pekko.protobufv3.internal.Parser - PARSER = new org.apache.pekko.protobufv3.internal.AbstractParser() { - @java.lang.Override - public Contacts parsePartialFrom( - org.apache.pekko.protobufv3.internal.CodedInputStream input, - org.apache.pekko.protobufv3.internal.ExtensionRegistryLite extensionRegistry) - throws org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException { - Builder builder = newBuilder(); - try { - builder.mergeFrom(input, extensionRegistry); - } catch (org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException e) { - throw e.setUnfinishedMessage(builder.buildPartial()); - } catch (org.apache.pekko.protobufv3.internal.UninitializedMessageException e) { - throw e.asInvalidProtocolBufferException().setUnfinishedMessage(builder.buildPartial()); - } catch (java.io.IOException e) { 
- throw new org.apache.pekko.protobufv3.internal.InvalidProtocolBufferException(e) - .setUnfinishedMessage(builder.buildPartial()); - } - return builder.buildPartial(); - } - }; - - public static org.apache.pekko.protobufv3.internal.Parser parser() { - return PARSER; - } - - @java.lang.Override - public org.apache.pekko.protobufv3.internal.Parser getParserForType() { - return PARSER; - } - - @java.lang.Override - public org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages.Contacts getDefaultInstanceForType() { - return DEFAULT_INSTANCE; - } - - } - - private static final org.apache.pekko.protobufv3.internal.Descriptors.Descriptor - internal_static_Contacts_descriptor; - private static final - org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable - internal_static_Contacts_fieldAccessorTable; - - public static org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor - getDescriptor() { - return descriptor; - } - private static org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor - descriptor; - static { - java.lang.String[] descriptorData = { - "\n\033ClusterClientMessages.proto\"!\n\010Contact" + - "s\022\025\n\rcontactPoints\030\001 \003(\tB0\n,org.apache.p" + - "ekko.cluster.client.protobuf.msgH\001" - }; - descriptor = org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor - .internalBuildGeneratedFileFrom(descriptorData, - new org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor[] { - }); - internal_static_Contacts_descriptor = - getDescriptor().getMessageTypes().get(0); - internal_static_Contacts_fieldAccessorTable = new - org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable( - internal_static_Contacts_descriptor, - new java.lang.String[] { "ContactPoints", }); - descriptor.resolveAllFeaturesImmutable(); - } - - // @@protoc_insertion_point(outer_class_scope) -} diff --git 
a/cluster-tools/src/main/java/org/apache/pekko/cluster/pubsub/protobuf/msg/DistributedPubSubMessages.java b/cluster-tools/src/main/java/org/apache/pekko/cluster/pubsub/protobuf/msg/DistributedPubSubMessages.java index 39acf3fbf18..84e7e65b617 100644 --- a/cluster-tools/src/main/java/org/apache/pekko/cluster/pubsub/protobuf/msg/DistributedPubSubMessages.java +++ b/cluster-tools/src/main/java/org/apache/pekko/cluster/pubsub/protobuf/msg/DistributedPubSubMessages.java @@ -5325,7 +5325,7 @@ public interface SendOrBuilder extends /** *
    * *
-   * Send normally local, but it is also used by the ClusterClient.
+   * Send normally local
    * 
* * Protobuf type {@code Send} @@ -5673,7 +5673,7 @@ protected Builder newBuilderForType( /** *
      * *
-     * Send normally local, but it is also used by the ClusterClient.
+     * Send normally local
      * 
* * Protobuf type {@code Send} @@ -6210,7 +6210,7 @@ public interface SendToAllOrBuilder extends /** *
    * *
-   * SendToAll normally local, but it is also used by the ClusterClient.
+   * SendToAll normally local
    * 
* * Protobuf type {@code SendToAll} @@ -6558,7 +6558,7 @@ protected Builder newBuilderForType( /** *
      * *
-     * SendToAll normally local, but it is also used by the ClusterClient.
+     * SendToAll normally local
      * 
* * Protobuf type {@code SendToAll} @@ -7084,7 +7084,7 @@ public interface PublishOrBuilder extends /** *
    * *
-   * Publish normally local, but it is also used by the ClusterClient.
+   * Publish normally local
    * 
* * Protobuf type {@code Publish} @@ -7392,7 +7392,7 @@ protected Builder newBuilderForType( /** *
      * *
-     * Publish normally local, but it is also used by the ClusterClient.
+     * Publish normally local
      * 
* * Protobuf type {@code Publish} diff --git a/cluster-tools/src/main/mima-filters/1.1.x.backwards.excludes/protoc4-upgrade.excludes b/cluster-tools/src/main/mima-filters/1.1.x.backwards.excludes/protoc4-upgrade.excludes index 134f8f3e11b..c430c6261e7 100644 --- a/cluster-tools/src/main/mima-filters/1.1.x.backwards.excludes/protoc4-upgrade.excludes +++ b/cluster-tools/src/main/mima-filters/1.1.x.backwards.excludes/protoc4-upgrade.excludes @@ -16,5 +16,4 @@ # under the License. # Issues caused by upgrading protoc to 4.29.3 -ProblemFilters.exclude[Problem]("org.apache.pekko.cluster.client.protobuf.msg.*") ProblemFilters.exclude[Problem]("org.apache.pekko.cluster.pubsub.protobuf.msg.*") diff --git a/cluster-tools/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes b/cluster-tools/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes index 08061c51dca..c260752a79a 100644 --- a/cluster-tools/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes +++ b/cluster-tools/src/main/mima-filters/2.0.x.backwards.excludes/remove-deprecated-methods.excludes @@ -18,3 +18,65 @@ # Remove deprecated methods ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.cluster.singleton.ClusterSingletonManager.setTimer*") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.cluster.pubsub.DistributedPubSubSettings.this") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$Internal$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$Internal$HeartbeatTick$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$Internal$ReconnectTimeout$") 
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$Internal$RefreshContactsTick$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$Publish") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$Publish$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$Send") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$Send$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$SendToAll") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClient$SendToAll$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientInteraction") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientMessage") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientReceptionist") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientReceptionist$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientSettings") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientSettings$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientUnreachable") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientUnreachable$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientUp") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClientUp$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClients") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterClients$") 
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$CheckDeadlines$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$ClientResponseTunnel") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$Contacts") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$Contacts$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$GetContacts$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$Heartbeat$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$HeartbeatRsp$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$Ping$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionist$Internal$ReceptionistShutdown$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionistSettings") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ClusterReceptionistSettings$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ContactPointAdded") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ContactPointAdded$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ContactPointChange") 
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ContactPointRemoved") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ContactPointRemoved$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ContactPoints") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.ContactPoints$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.GetClusterClients") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.GetClusterClients$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.GetContactPoints") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.GetContactPoints$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.SubscribeClusterClients") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.SubscribeClusterClients$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.SubscribeContactPoints") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.SubscribeContactPoints$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.UnsubscribeClusterClients") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.UnsubscribeClusterClients$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.UnsubscribeContactPoints") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.UnsubscribeContactPoints$") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.protobuf.ClusterClientMessageSerializer") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages$Contacts") 
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages$Contacts$Builder") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.cluster.client.protobuf.msg.ClusterClientMessages$ContactsOrBuilder") diff --git a/cluster-tools/src/main/protobuf/ClusterClientMessages.proto b/cluster-tools/src/main/protobuf/ClusterClientMessages.proto deleted file mode 100644 index 995b318d414..00000000000 --- a/cluster-tools/src/main/protobuf/ClusterClientMessages.proto +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. - */ - -syntax = "proto2"; - -option java_package = "org.apache.pekko.cluster.client.protobuf.msg"; -option optimize_for = SPEED; - -message Contacts { - repeated string contactPoints = 1; -} - diff --git a/cluster-tools/src/main/protobuf/DistributedPubSubMessages.proto b/cluster-tools/src/main/protobuf/DistributedPubSubMessages.proto index 8cb6567819f..55519acbe59 100644 --- a/cluster-tools/src/main/protobuf/DistributedPubSubMessages.proto +++ b/cluster-tools/src/main/protobuf/DistributedPubSubMessages.proto @@ -50,7 +50,7 @@ message Address { } /** - * Send normally local, but it is also used by the ClusterClient. + * Send normally local */ message Send { required string path = 1; @@ -59,7 +59,7 @@ message Send { } /** - * SendToAll normally local, but it is also used by the ClusterClient. + * SendToAll normally local */ message SendToAll { required string path = 1; @@ -68,7 +68,7 @@ message SendToAll { } /** - * Publish normally local, but it is also used by the ClusterClient. 
+ * Publish normally local */ message Publish { required string topic = 1; diff --git a/cluster-tools/src/main/resources/reference.conf b/cluster-tools/src/main/resources/reference.conf index cc18387346b..ec5c8431683 100644 --- a/cluster-tools/src/main/resources/reference.conf +++ b/cluster-tools/src/main/resources/reference.conf @@ -54,106 +54,6 @@ pekko.actor { } } - -# //#receptionist-ext-config -# Settings for the ClusterClientReceptionist extension -pekko.cluster.client.receptionist { - # Actor name of the ClusterReceptionist actor, /system/receptionist - name = receptionist - - # Start the receptionist on members tagged with this role. - # All members are used if undefined or empty. - role = "" - - # The receptionist will send this number of contact points to the client - number-of-contacts = 3 - - # The actor that tunnel response messages to the client will be stopped - # after this time of inactivity. - response-tunnel-receive-timeout = 30s - - # The id of the dispatcher to use for ClusterReceptionist actors. - # If specified you need to define the settings of the actual dispatcher. - use-dispatcher = "pekko.actor.internal-dispatcher" - - # How often failure detection heartbeat messages should be received for - # each ClusterClient - heartbeat-interval = 2s - - # Number of potentially lost/delayed heartbeats that will be - # accepted before considering it to be an anomaly. - # The ClusterReceptionist is using the org.apache.pekko.remote.DeadlineFailureDetector, which - # will trigger if there are no heartbeats within the duration - # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with - # the default settings. 
- acceptable-heartbeat-pause = 13s - - # Failure detection checking interval for checking all ClusterClients - failure-detection-interval = 2s -} -# //#receptionist-ext-config - -# //#cluster-client-config -# Settings for the ClusterClient -pekko.cluster.client { - # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes) - # that the client will try to contact initially. It is mandatory to specify - # at least one initial contact. - # Comma separated full actor paths defined by a string on the form of - # "pekko://system@hostname:port/system/receptionist" - initial-contacts = [] - - # Interval at which the client retries to establish contact with one of - # ClusterReceptionist on the servers (cluster nodes) - establishing-get-contacts-interval = 3s - - # Interval at which the client will ask the ClusterReceptionist for - # new contact points to be used for next reconnect. - refresh-contacts-interval = 60s - - # How often failure detection heartbeat messages should be sent - heartbeat-interval = 2s - - # Number of potentially lost/delayed heartbeats that will be - # accepted before considering it to be an anomaly. - # The ClusterClient is using the org.apache.pekko.remote.DeadlineFailureDetector, which - # will trigger if there are no heartbeats within the duration - # heartbeat-interval + acceptable-heartbeat-pause, i.e. 15 seconds with - # the default settings. - acceptable-heartbeat-pause = 13s - - # If connection to the receptionist is not established the client will buffer - # this number of messages and deliver them the connection is established. - # When the buffer is full old messages will be dropped when new messages are sent - # via the client. Use 0 to disable buffering, i.e. messages will be dropped - # immediately if the location of the singleton is unknown. - # Maximum allowed buffer size is 10000. 
- buffer-size = 1000 - - # If connection to the receptionist is lost and the client has not been - # able to acquire a new connection for this long the client will stop itself. - # This duration makes it possible to watch the cluster client and react on a more permanent - # loss of connection with the cluster, for example by accessing some kind of - # service registry for an updated set of initial contacts to start a new cluster client with. - # If this is not wanted it can be set to "off" to disable the timeout and retry - # forever. - reconnect-timeout = off -} -# //#cluster-client-config - -# Protobuf serializer for ClusterClient messages -pekko.actor { - serializers { - pekko-cluster-client = "org.apache.pekko.cluster.client.protobuf.ClusterClientMessageSerializer" - } - serialization-bindings { - "org.apache.pekko.cluster.client.ClusterClientMessage" = pekko-cluster-client - } - serialization-identifiers { - "org.apache.pekko.cluster.client.protobuf.ClusterClientMessageSerializer" = 15 - } -} - # //#singleton-config pekko.cluster.singleton { # The actor name of the child singleton actor. diff --git a/cluster-tools/src/main/scala/org/apache/pekko/cluster/client/ClusterClient.scala b/cluster-tools/src/main/scala/org/apache/pekko/cluster/client/ClusterClient.scala deleted file mode 100644 index 54881a9fa2c..00000000000 --- a/cluster-tools/src/main/scala/org/apache/pekko/cluster/client/ClusterClient.scala +++ /dev/null @@ -1,1126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. 
- */ - -package org.apache.pekko.cluster.client - -import java.net.URLEncoder - -import scala.collection.immutable -import scala.collection.immutable.{ HashMap, HashSet } -import scala.concurrent.duration._ - -import com.typesafe.config.Config - -import org.apache.pekko -import pekko.actor.Actor -import pekko.actor.ActorIdentity -import pekko.actor.ActorLogging -import pekko.actor.ActorPath -import pekko.actor.ActorRef -import pekko.actor.ActorSystem -import pekko.actor.Address -import pekko.actor.Cancellable -import pekko.actor.ClassicActorSystemProvider -import pekko.actor.DeadLetterSuppression -import pekko.actor.Deploy -import pekko.actor.ExtendedActorSystem -import pekko.actor.Extension -import pekko.actor.ExtensionId -import pekko.actor.ExtensionIdProvider -import pekko.actor.Identify -import pekko.actor.NoSerializationVerificationNeeded -import pekko.actor.Props -import pekko.actor.ReceiveTimeout -import pekko.actor.Terminated -import pekko.cluster.Cluster -import pekko.cluster.ClusterEvent._ -import pekko.cluster.Member -import pekko.cluster.MemberStatus -import pekko.cluster.pubsub._ -import pekko.remote.DeadlineFailureDetector -import pekko.routing.ConsistentHash -import pekko.routing.MurmurHash -import pekko.util.MessageBuffer -import pekko.util.ccompat._ -import pekko.util.ccompat.JavaConverters._ - -@ccompatUsedUntil213 -@deprecated( - "Use Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -object ClusterClientSettings { - - /** - * Create settings from the default configuration - * `pekko.cluster.client`. - */ - def apply(system: ActorSystem): ClusterClientSettings = - apply(system.settings.config.getConfig("pekko.cluster.client")) - - /** - * Create settings from a configuration with the same layout as - * the default configuration `pekko.cluster.client`. 
- */ - def apply(config: Config): ClusterClientSettings = { - val initialContacts = config.getStringList("initial-contacts").asScala.map(ActorPath.fromString).toSet - new ClusterClientSettings( - initialContacts, - establishingGetContactsInterval = config.getDuration("establishing-get-contacts-interval", MILLISECONDS).millis, - refreshContactsInterval = config.getDuration("refresh-contacts-interval", MILLISECONDS).millis, - heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis, - acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis, - bufferSize = config.getInt("buffer-size"), - reconnectTimeout = config.getString("reconnect-timeout") match { - case "off" => None - case _ => Some(config.getDuration("reconnect-timeout", MILLISECONDS).millis) - }) - } - - /** - * Java API: Create settings from the default configuration - * `pekko.cluster.client`. - */ - def create(system: ActorSystem): ClusterClientSettings = apply(system) - - /** - * Java API: Create settings from a configuration with the same layout as - * the default configuration `pekko.cluster.client`. - */ - def create(config: Config): ClusterClientSettings = apply(config) - -} - -/** - * @param initialContacts Actor paths of the `ClusterReceptionist` actors on - * the servers (cluster nodes) that the client will try to contact initially. - * It is mandatory to specify at least one initial contact. The path of the - * default receptionist is - * "pekko://system@hostname:port/system/receptionist" - * @param establishingGetContactsInterval Interval at which the client retries - * to establish contact with one of ClusterReceptionist on the servers (cluster nodes) - * @param refreshContactsInterval Interval at which the client will ask the - * `ClusterReceptionist` for new contact points to be used for next reconnect. 
- * @param heartbeatInterval How often failure detection heartbeat messages for detection - * of failed connections should be sent. - * @param acceptableHeartbeatPause Number of potentially lost/delayed heartbeats that will - * be accepted before considering it to be an anomaly. The ClusterClient is using the - * [[pekko.remote.DeadlineFailureDetector]], which will trigger if there are no heartbeats - * within the duration `heartbeatInterval + acceptableHeartbeatPause`. - * @param bufferSize If connection to the receptionist is not established the client - * will buffer this number of messages and deliver them the connection is established. - * When the buffer is full old messages will be dropped when new messages are sent via the - * client. Use 0 to disable buffering, i.e. messages will be dropped immediately if the - * location of the receptionist is unavailable. - * @param reconnectTimeout If the connection to the receptionist is lost and cannot - * be re-established within this duration the cluster client will be stopped. 
This makes it possible - * to watch it from another actor and possibly acquire a new list of initialContacts from some - * external service registry - */ -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -final class ClusterClientSettings( - val initialContacts: Set[ActorPath], - val establishingGetContactsInterval: FiniteDuration, - val refreshContactsInterval: FiniteDuration, - val heartbeatInterval: FiniteDuration, - val acceptableHeartbeatPause: FiniteDuration, - val bufferSize: Int, - val reconnectTimeout: Option[FiniteDuration]) - extends NoSerializationVerificationNeeded { - - require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000") - - /** - * For binary/source compatibility - */ - def this( - initialContacts: Set[ActorPath], - establishingGetContactsInterval: FiniteDuration, - refreshContactsInterval: FiniteDuration, - heartbeatInterval: FiniteDuration, - acceptableHeartbeatPause: FiniteDuration, - bufferSize: Int) = - this( - initialContacts, - establishingGetContactsInterval, - refreshContactsInterval, - heartbeatInterval, - acceptableHeartbeatPause, - bufferSize, - None) - - /** - * Scala API - */ - def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = { - require(initialContacts.nonEmpty, "initialContacts must be defined") - copy(initialContacts = initialContacts) - } - - /** - * Java API - */ - def withInitialContacts(initialContacts: java.util.Set[ActorPath]): ClusterClientSettings = - withInitialContacts(initialContacts.asScala.toSet) - - def withEstablishingGetContactsInterval(establishingGetContactsInterval: FiniteDuration): ClusterClientSettings = - copy(establishingGetContactsInterval = establishingGetContactsInterval) - - def withRefreshContactsInterval(refreshContactsInterval: FiniteDuration): ClusterClientSettings = - copy(refreshContactsInterval = 
refreshContactsInterval) - - def withHeartbeat( - heartbeatInterval: FiniteDuration, - acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings = - copy(heartbeatInterval = heartbeatInterval, acceptableHeartbeatPause = acceptableHeartbeatPause) - - def withBufferSize(bufferSize: Int): ClusterClientSettings = - copy(bufferSize = bufferSize) - - def withReconnectTimeout(reconnectTimeout: Option[FiniteDuration]): ClusterClientSettings = - copy(reconnectTimeout = reconnectTimeout) - - private def copy( - initialContacts: Set[ActorPath] = initialContacts, - establishingGetContactsInterval: FiniteDuration = establishingGetContactsInterval, - refreshContactsInterval: FiniteDuration = refreshContactsInterval, - heartbeatInterval: FiniteDuration = heartbeatInterval, - acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause, - bufferSize: Int = bufferSize, - reconnectTimeout: Option[FiniteDuration] = reconnectTimeout): ClusterClientSettings = - new ClusterClientSettings( - initialContacts, - establishingGetContactsInterval, - refreshContactsInterval, - heartbeatInterval, - acceptableHeartbeatPause, - bufferSize, - reconnectTimeout) -} - -/** - * Declares a super type for all events emitted by the `ClusterClient` - * in relation to contact points being added or removed. - */ -sealed trait ContactPointChange { - val contactPoint: ActorPath -} - -/** - * Emitted to a subscriber when contact points have been - * received by the ClusterClient and a new one has been added. - */ -final case class ContactPointAdded(override val contactPoint: ActorPath) extends ContactPointChange - -/** - * Emitted to a subscriber when contact points have been - * received by the ClusterClient and a new one has been added. 
- */ -final case class ContactPointRemoved(override val contactPoint: ActorPath) extends ContactPointChange - -sealed abstract class SubscribeContactPoints - -/** - * Subscribe to a cluster client's contact point changes where - * it is guaranteed that a sender receives the initial state - * of contact points prior to any events in relation to them - * changing. - * The sender will automatically become unsubscribed when it - * terminates. - */ -case object SubscribeContactPoints extends SubscribeContactPoints { - - /** - * Java API: get the singleton instance - */ - def getInstance = this -} - -sealed abstract class UnsubscribeContactPoints - -/** - * Explicitly unsubscribe from contact point change events. - */ -case object UnsubscribeContactPoints extends UnsubscribeContactPoints { - - /** - * Java API: get the singleton instance - */ - def getInstance = this -} - -sealed abstract class GetContactPoints - -/** - * Get the contact points known to this client. A ``ContactPoints`` message - * will be replied. - */ -case object GetContactPoints extends GetContactPoints { - - /** - * Java API: get the singleton instance - */ - def getInstance = this -} - -/** - * The reply to ``GetContactPoints``. - * - * @param contactPoints The presently known list of contact points. - */ -final case class ContactPoints(contactPoints: Set[ActorPath]) { - - /** - * Java API - */ - def getContactPoints: java.util.Set[ActorPath] = - contactPoints.asJava -} - -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -object ClusterClient { - - /** - * Scala API: Factory method for `ClusterClient` [[pekko.actor.Props]]. 
- */ - @deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") - def props(settings: ClusterClientSettings): Props = - Props(new ClusterClient(settings)).withDeploy(Deploy.local) - - @SerialVersionUID(1L) - @deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") - final case class Send(path: String, msg: Any, localAffinity: Boolean) { - - /** - * Convenience constructor with `localAffinity` false - */ - def this(path: String, msg: Any) = this(path, msg, localAffinity = false) - } - @SerialVersionUID(1L) - @deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") - final case class SendToAll(path: String, msg: Any) - - @SerialVersionUID(1L) - @deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") - final case class Publish(topic: String, msg: Any) - - /** - * INTERNAL API - */ - private[pekko] object Internal { - case object RefreshContactsTick - case object HeartbeatTick - case object ReconnectTimeout - } -} - -/** - * This actor is intended to be used on an external node that is not member - * of the cluster. It acts like a gateway for sending messages to actors - * somewhere in the cluster. From the initial contact points it will establish - * a connection to a [[ClusterReceptionist]] somewhere in the cluster. It will - * monitor the connection to the receptionist and establish a new connection if - * the link goes down. When looking for a new receptionist it uses fresh contact - * points retrieved from previous establishment, or periodically refreshed - * contacts, i.e. not necessarily the initial contact points. 
- * - * You can send messages via the `ClusterClient` to any actor in the cluster - * that is registered in the [[ClusterReceptionist]]. - * Messages are wrapped in [[ClusterClient.Send]], [[ClusterClient.SendToAll]] - * or [[ClusterClient.Publish]]. - * - * 1. [[ClusterClient.Send]] - - * The message will be delivered to one recipient with a matching path, if any such - * exists. If several entries match the path the message will be delivered - * to one random destination. The sender of the message can specify that local - * affinity is preferred, i.e. the message is sent to an actor in the same local actor - * system as the used receptionist actor, if any such exists, otherwise random to any other - * matching entry. - * - * 2. [[ClusterClient.SendToAll]] - - * The message will be delivered to all recipients with a matching path. - * - * 3. [[ClusterClient.Publish]] - - * The message will be delivered to all recipients Actors that have been registered as subscribers to - * to the named topic. - * - * Use the factory method [[ClusterClient#props]]) to create the - * [[pekko.actor.Props]] for the actor. - * - * If the receptionist is not currently available, the client will buffer the messages - * and then deliver them when the connection to the receptionist has been established. - * The size of the buffer is configurable and it can be disabled by using a buffer size - * of 0. When the buffer is full old messages will be dropped when new messages are sent - * via the client. - * - * Note that this is a best effort implementation: messages can always be lost due to the distributed - * nature of the actors involved. 
- */ -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -final class ClusterClient(settings: ClusterClientSettings) extends Actor with ActorLogging { - - import ClusterClient._ - import ClusterClient.Internal._ - import ClusterReceptionist.Internal._ - import settings._ - - require(initialContacts.nonEmpty, "initialContacts must be defined") - - val failureDetector = new DeadlineFailureDetector(acceptableHeartbeatPause, heartbeatInterval) - - var contactPaths: HashSet[ActorPath] = - initialContacts.to(HashSet) - val initialContactsSel = - contactPaths.map(context.actorSelection) - var contacts = initialContactsSel - sendGetContacts() - - var contactPathsPublished = contactPaths - - var subscribers = Vector.empty[ActorRef] - - import context.dispatcher - val heartbeatTask = - context.system.scheduler.scheduleWithFixedDelay(heartbeatInterval, heartbeatInterval, self, HeartbeatTick) - var refreshContactsTask: Option[Cancellable] = None - scheduleRefreshContactsTick(establishingGetContactsInterval) - self ! 
RefreshContactsTick - - var buffer = MessageBuffer.empty - - def scheduleRefreshContactsTick(interval: FiniteDuration): Unit = { - refreshContactsTask.foreach { _.cancel() } - refreshContactsTask = Some( - context.system.scheduler.scheduleWithFixedDelay(interval, interval, self, RefreshContactsTick)) - } - - override def postStop(): Unit = { - super.postStop() - heartbeatTask.cancel() - refreshContactsTask.foreach { _.cancel() } - } - - def receive = establishing.orElse(contactPointMessages) - - def establishing: Actor.Receive = { - val connectTimerCancelable = settings.reconnectTimeout.map { timeout => - context.system.scheduler.scheduleOnce(timeout, self, ReconnectTimeout) - } - - { - case Contacts(contactPoints) => - if (contactPoints.nonEmpty) { - contactPaths = contactPoints.map(ActorPath.fromString).to(HashSet) - contacts = contactPaths.map(context.actorSelection) - contacts.foreach { _ ! Identify(Array.emptyByteArray) } - } - publishContactPoints() - case ActorIdentity(_, Some(receptionist)) => - log.info("Connected to [{}]", receptionist.path) - scheduleRefreshContactsTick(refreshContactsInterval) - sendBuffered(receptionist) - context.become(active(receptionist).orElse(contactPointMessages)) - connectTimerCancelable.foreach(_.cancel()) - failureDetector.heartbeat() - self ! 
HeartbeatTick // will register us as active client of the selected receptionist - case ActorIdentity(_, None) => // ok, use another instead - case HeartbeatTick => - failureDetector.heartbeat() - case RefreshContactsTick => sendGetContacts() - case Send(path, msg, localAffinity) => - buffer(DistributedPubSubMediator.Send(path, msg, localAffinity)) - case SendToAll(path, msg) => - buffer(DistributedPubSubMediator.SendToAll(path, msg)) - case Publish(topic, msg) => - buffer(DistributedPubSubMediator.Publish(topic, msg)) - case ReconnectTimeout => - log.warning( - "Receptionist reconnect not successful within {} stopping cluster client", - settings.reconnectTimeout) - context.stop(self) - case ReceptionistShutdown => // ok, haven't chosen a receptionist yet - } - } - - def active(receptionist: ActorRef): Actor.Receive = { - case Send(path, msg, localAffinity) => - receptionist.forward(DistributedPubSubMediator.Send(path, msg, localAffinity)) - case SendToAll(path, msg) => - receptionist.forward(DistributedPubSubMediator.SendToAll(path, msg)) - case Publish(topic, msg) => - receptionist.forward(DistributedPubSubMediator.Publish(topic, msg)) - case HeartbeatTick => - if (!failureDetector.isAvailable) { - log.info("Lost contact with [{}], reestablishing connection", receptionist) - reestablish() - } else - receptionist ! Heartbeat - case HeartbeatRsp => - failureDetector.heartbeat() - case RefreshContactsTick => - receptionist ! 
GetContacts - case Contacts(contactPoints) => - // refresh of contacts - if (contactPoints.nonEmpty) { - contactPaths = contactPoints.map(ActorPath.fromString).to(HashSet) - contacts = contactPaths.map(context.actorSelection) - } - publishContactPoints() - case _: ActorIdentity => // ok, from previous establish, already handled - case ReceptionistShutdown => - if (receptionist == sender()) { - log.info("Receptionist [{}] is shutting down, reestablishing connection", receptionist) - reestablish() - } - } - - def contactPointMessages: Actor.Receive = { - case SubscribeContactPoints => - val subscriber = sender() - subscriber ! ContactPoints(contactPaths) - subscribers :+= subscriber - context.watch(subscriber) - case UnsubscribeContactPoints => - val subscriber = sender() - subscribers = subscribers.filterNot(_ == subscriber) - case Terminated(subscriber) => - self.tell(UnsubscribeContactPoints, subscriber) - case GetContactPoints => - sender() ! ContactPoints(contactPaths) - } - - def sendGetContacts(): Unit = { - val sendTo = - if (contacts.isEmpty) initialContactsSel - else if (contacts.size == 1) initialContactsSel.union(contacts) - else contacts - if (log.isDebugEnabled) - log.debug(s"""Sending GetContacts to [${sendTo.mkString(",")}]""") - sendTo.foreach { _ ! 
GetContacts } - } - - def buffer(msg: Any): Unit = - if (settings.bufferSize == 0) - log.debug("Receptionist not available and buffering is disabled, dropping message [{}]", msg.getClass.getName) - else if (buffer.size == settings.bufferSize) { - val (m, _) = buffer.head() - buffer.dropHead() - log.debug("Receptionist not available, buffer is full, dropping first message [{}]", m.getClass.getName) - buffer.append(msg, sender()) - } else { - log.debug("Receptionist not available, buffering message type [{}]", msg.getClass.getName) - buffer.append(msg, sender()) - } - - def sendBuffered(receptionist: ActorRef): Unit = { - log.debug("Sending buffered messages to receptionist") - buffer.foreach((msg, snd) => receptionist.tell(msg, snd)) - buffer = MessageBuffer.empty - } - - def publishContactPoints(): Unit = { - for (cp <- contactPaths if !contactPathsPublished.contains(cp)) { - val contactPointAdded = ContactPointAdded(cp) - subscribers.foreach(_ ! contactPointAdded) - } - for (cp <- contactPathsPublished if !contactPaths.contains(cp)) { - val contactPointRemoved = ContactPointRemoved(cp) - subscribers.foreach(_ ! 
contactPointRemoved) - } - contactPathsPublished = contactPaths - } - - def reestablish(): Unit = { - sendGetContacts() - scheduleRefreshContactsTick(establishingGetContactsInterval) - context.become(establishing.orElse(contactPointMessages)) - failureDetector.heartbeat() - } -} - -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] with ExtensionIdProvider { - override def get(system: ActorSystem): ClusterClientReceptionist = super.get(system) - override def get(system: ClassicActorSystemProvider): ClusterClientReceptionist = super.get(system) - - override def lookup = ClusterClientReceptionist - - override def createExtension(system: ExtendedActorSystem): ClusterClientReceptionist = - new ClusterClientReceptionist(system) -} - -/** - * Extension that starts [[ClusterReceptionist]] and accompanying [[pekko.cluster.pubsub.DistributedPubSubMediator]] - * with settings defined in config section `pekko.cluster.client.receptionist`. - * The [[pekko.cluster.pubsub.DistributedPubSubMediator]] is started by the [[pekko.cluster.pubsub.DistributedPubSub]] extension. - */ -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension { - - private val config = system.settings.config.getConfig("pekko.cluster.client.receptionist") - private val role: Option[String] = config.getString("role") match { - case "" => None - case r => Some(r) - } - - /** - * Returns true if this member is not tagged with the role configured for the - * receptionist. 
- */ - def isTerminated: Boolean = Cluster(system).isTerminated || !role.forall(Cluster(system).selfRoles.contains) - - /** - * Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]]. - */ - private def pubSubMediator: ActorRef = DistributedPubSub(system).mediator - - /** - * Register an actor that should be reachable for the clients. - * The clients can send messages to this actor with `Send` or `SendToAll` using - * the path elements of the `ActorRef`, e.g. `"/user/myservice"`. - */ - def registerService(actor: ActorRef): Unit = - pubSubMediator ! DistributedPubSubMediator.Put(actor) - - /** - * A registered actor will be automatically unregistered when terminated, - * but it can also be explicitly unregistered before termination. - */ - def unregisterService(actor: ActorRef): Unit = - pubSubMediator ! DistributedPubSubMediator.Remove(actor.path.toStringWithoutAddress) - - /** - * Register an actor that should be reachable for the clients to a named topic. - * Several actors can be registered to the same topic name, and all will receive - * published messages. - * The client can publish messages to this topic with `Publish`. - */ - def registerSubscriber(topic: String, actor: ActorRef): Unit = - pubSubMediator ! DistributedPubSubMediator.Subscribe(topic, actor) - - /** - * A registered subscriber will be automatically unregistered when terminated, - * but it can also be explicitly unregistered before termination. - */ - def unregisterSubscriber(topic: String, actor: ActorRef): Unit = - pubSubMediator ! 
DistributedPubSubMediator.Unsubscribe(topic, actor) - - /** - * The [[ClusterReceptionist]] actor - */ - private val receptionist: ActorRef = { - if (isTerminated) - system.deadLetters - else { - val name = config.getString("name") - val dispatcher = config.getString("use-dispatcher") - // important to use val mediator here to activate it outside of ClusterReceptionist constructor - val mediator = pubSubMediator - system.systemActorOf( - ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config)).withDispatcher(dispatcher), - name) - } - } - - /** - * Returns the underlying receptionist actor, particularly so that its - * events can be observed via subscribe/unsubscribe. - */ - def underlying: ActorRef = - receptionist -} - -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -object ClusterReceptionistSettings { - - /** - * Create settings from the default configuration - * `pekko.cluster.client.receptionist`. - */ - def apply(system: ActorSystem): ClusterReceptionistSettings = - apply(system.settings.config.getConfig("pekko.cluster.client.receptionist")) - - /** - * Create settings from a configuration with the same layout as - * the default configuration `pekko.cluster.client.receptionist`. 
- */ - def apply(config: Config): ClusterReceptionistSettings = - new ClusterReceptionistSettings( - role = roleOption(config.getString("role")), - numberOfContacts = config.getInt("number-of-contacts"), - responseTunnelReceiveTimeout = config.getDuration("response-tunnel-receive-timeout", MILLISECONDS).millis, - heartbeatInterval = config.getDuration("heartbeat-interval", MILLISECONDS).millis, - acceptableHeartbeatPause = config.getDuration("acceptable-heartbeat-pause", MILLISECONDS).millis, - failureDetectionInterval = config.getDuration("failure-detection-interval", MILLISECONDS).millis) - - /** - * Java API: Create settings from the default configuration - * `pekko.cluster.client.receptionist`. - */ - def create(system: ActorSystem): ClusterReceptionistSettings = apply(system) - - /** - * Java API: Create settings from a configuration with the same layout as - * the default configuration `pekko.cluster.client.receptionist`. - */ - def create(config: Config): ClusterReceptionistSettings = apply(config) - - /** - * INTERNAL API - */ - private[pekko] def roleOption(role: String): Option[String] = - if (role == "") None else Option(role) - -} - -/** - * @param role Start the receptionist on members tagged with this role. - * All members are used if undefined. - * @param numberOfContacts The receptionist will send this number of contact points to the client - * @param responseTunnelReceiveTimeout The actor that tunnel response messages to the - * client will be stopped after this time of inactivity. 
- */ -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -final class ClusterReceptionistSettings( - val role: Option[String], - val numberOfContacts: Int, - val responseTunnelReceiveTimeout: FiniteDuration) - extends NoSerializationVerificationNeeded { - - def withRole(role: String): ClusterReceptionistSettings = copy(role = ClusterReceptionistSettings.roleOption(role)) - - def withRole(role: Option[String]): ClusterReceptionistSettings = copy(role = role) - - def withNumberOfContacts(numberOfContacts: Int): ClusterReceptionistSettings = - copy(numberOfContacts = numberOfContacts) - - def withResponseTunnelReceiveTimeout(responseTunnelReceiveTimeout: FiniteDuration): ClusterReceptionistSettings = - copy(responseTunnelReceiveTimeout = responseTunnelReceiveTimeout) - - def withHeartbeat( - heartbeatInterval: FiniteDuration, - acceptableHeartbeatPause: FiniteDuration, - failureDetectionInterval: FiniteDuration): ClusterReceptionistSettings = - copy( - heartbeatInterval = heartbeatInterval, - acceptableHeartbeatPause = acceptableHeartbeatPause, - failureDetectionInterval = failureDetectionInterval) - - // BEGIN BINARY COMPATIBILITY - // The following is required in order to maintain binary - // compatibility with 2.4. Post 2.4, the following 3 properties should - // be moved to the class's constructor, and the following section of code - // should be removed entirely. 
- // TODO: ADDRESS FOR v.2.5 - - def heartbeatInterval: FiniteDuration = - _heartbeatInterval - def acceptableHeartbeatPause: FiniteDuration = - _acceptableHeartbeatPause - def failureDetectionInterval: FiniteDuration = - _failureDetectionInterval - - private var _heartbeatInterval: FiniteDuration = 2.seconds - private var _acceptableHeartbeatPause: FiniteDuration = 13.seconds - private var _failureDetectionInterval: FiniteDuration = 2.second - - def this( - role: Option[String], - numberOfContacts: Int, - responseTunnelReceiveTimeout: FiniteDuration, - heartbeatInterval: FiniteDuration, - acceptableHeartbeatPause: FiniteDuration, - failureDetectionInterval: FiniteDuration) = { - this(role, numberOfContacts, responseTunnelReceiveTimeout) - this._heartbeatInterval = heartbeatInterval - this._acceptableHeartbeatPause = acceptableHeartbeatPause - this._failureDetectionInterval = failureDetectionInterval - } - - // END BINARY COMPATIBILITY - - private def copy( - role: Option[String] = role, - numberOfContacts: Int = numberOfContacts, - responseTunnelReceiveTimeout: FiniteDuration = responseTunnelReceiveTimeout, - heartbeatInterval: FiniteDuration = heartbeatInterval, - acceptableHeartbeatPause: FiniteDuration = acceptableHeartbeatPause, - failureDetectionInterval: FiniteDuration = failureDetectionInterval): ClusterReceptionistSettings = - new ClusterReceptionistSettings( - role, - numberOfContacts, - responseTunnelReceiveTimeout, - heartbeatInterval, - acceptableHeartbeatPause, - failureDetectionInterval) -} - -/** - * Marker trait for remote messages with special serializer. - */ -sealed trait ClusterClientMessage extends Serializable - -/** - * Declares a super type for all events emitted by the `ClusterReceptionist`. - * in relation to cluster clients being interacted with. - */ -sealed trait ClusterClientInteraction { - val clusterClient: ActorRef -} - -/** - * Emitted to the Pekko event stream when a cluster client has interacted with - * a receptionist. 
- */ -final case class ClusterClientUp(override val clusterClient: ActorRef) extends ClusterClientInteraction - -/** - * Emitted to the Pekko event stream when a cluster client was previously connected - * but then not seen for some time. - */ -final case class ClusterClientUnreachable(override val clusterClient: ActorRef) extends ClusterClientInteraction - -sealed abstract class SubscribeClusterClients - -/** - * Subscribe to a cluster receptionist's client interactions where - * it is guaranteed that a sender receives the initial state - * of contact points prior to any events in relation to them - * changing. - * The sender will automatically become unsubscribed when it - * terminates. - */ -case object SubscribeClusterClients extends SubscribeClusterClients { - - /** - * Java API: get the singleton instance - */ - def getInstance = this -} - -sealed abstract class UnsubscribeClusterClients - -/** - * Explicitly unsubscribe from client interaction events. - */ -case object UnsubscribeClusterClients extends UnsubscribeClusterClients { - - /** - * Java API: get the singleton instance - */ - def getInstance = this -} - -sealed abstract class GetClusterClients - -/** - * Get the cluster clients known to this receptionist. A ``ClusterClients`` message - * will be replied. - */ -case object GetClusterClients extends GetClusterClients { - - /** - * Java API: get the singleton instance - */ - def getInstance = this -} - -/** - * The reply to ``GetClusterClients``. - * - * @param clusterClients The presently known list of cluster clients. 
- */ -final case class ClusterClients(clusterClients: Set[ActorRef]) { - - /** - * Java API - */ - def getClusterClients: java.util.Set[ActorRef] = - clusterClients.asJava -} - -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -object ClusterReceptionist { - - /** - * Scala API: Factory method for `ClusterReceptionist` [[pekko.actor.Props]]. - */ - def props(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings): Props = - Props(new ClusterReceptionist(pubSubMediator, settings)).withDeploy(Deploy.local) - - /** - * INTERNAL API - */ - private[pekko] object Internal { - @SerialVersionUID(1L) - case object GetContacts extends ClusterClientMessage with DeadLetterSuppression - @SerialVersionUID(1L) - final case class Contacts(contactPoints: immutable.IndexedSeq[String]) extends ClusterClientMessage - @SerialVersionUID(1L) - case object Heartbeat extends ClusterClientMessage with DeadLetterSuppression - @SerialVersionUID(1L) - case object HeartbeatRsp extends ClusterClientMessage with DeadLetterSuppression - @SerialVersionUID(1L) - case object ReceptionistShutdown extends ClusterClientMessage with DeadLetterSuppression - @SerialVersionUID(1L) - case object Ping extends DeadLetterSuppression - case object CheckDeadlines - - /** - * Replies are tunneled via this actor, child of the receptionist, to avoid - * inbound connections from other cluster nodes to the client. 
- */ - class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor with ActorLogging { - context.setReceiveTimeout(timeout) - - private val isAsk = { - val pathElements = client.path.elements - pathElements.size == 2 && pathElements.head == "temp" && pathElements.tail.head.startsWith("$") - } - - def receive = { - case Ping => // keep alive from client - case ReceiveTimeout => - log.debug("ClientResponseTunnel for client [{}] stopped due to inactivity", client.path) - context.stop(self) - case msg => - client.tell(msg, Actor.noSender) - if (isAsk) - context.stop(self) - } - } - } - -} - -/** - * [[ClusterClient]] connects to this actor to retrieve. The `ClusterReceptionist` is - * supposed to be started on all nodes, or all nodes with specified role, in the cluster. - * The receptionist can be started with the [[ClusterClientReceptionist]] or as an - * ordinary actor (use the factory method [[ClusterReceptionist#props]]). - * - * The receptionist forwards messages from the client to the associated [[pekko.cluster.pubsub.DistributedPubSubMediator]], - * i.e. the client can send messages to any actor in the cluster that is registered in the - * `DistributedPubSubMediator`. Messages from the client are wrapped in - * [[pekko.cluster.pubsub.DistributedPubSubMediator.Send]], [[pekko.cluster.pubsub.DistributedPubSubMediator.SendToAll]] - * or [[pekko.cluster.pubsub.DistributedPubSubMediator.Publish]] with the semantics described in - * [[pekko.cluster.pubsub.DistributedPubSubMediator]]. - * - * Response messages from the destination actor are tunneled via the receptionist - * to avoid inbound connections from other cluster nodes to the client, i.e. - * the `sender()`, as seen by the destination actor, is not the client itself. - * The `sender()` of the response messages, as seen by the client, is `deadLetters` - * since the client should normally send subsequent messages via the `ClusterClient`. 
- * It is possible to pass the original sender inside the reply messages if - * the client is supposed to communicate directly to the actor in the cluster. - */ -@deprecated( - "Use Apache Pekko gRPC instead, see https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-pekko-grpc", - since = "Akka 2.6.0") -final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings) - extends Actor - with ActorLogging { - - import ClusterReceptionist.Internal._ - import DistributedPubSubMediator.{ Publish, Send, SendToAll } - import settings._ - - val cluster = Cluster(context.system) - val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging - import cluster.selfAddress - - require(role.forall(cluster.selfRoles.contains), s"This cluster member [$selfAddress] doesn't have the role [$role]") - - var nodes: immutable.SortedSet[Address] = { - def hashFor(node: Address): Int = node match { - // cluster node identifier is the host and port of the address; protocol and system is assumed to be the same - case Address(_, _, Some(host), Some(port)) => MurmurHash.stringHash(s"$host:$port") - case _ => - throw new IllegalStateException(s"Unexpected address without host/port: [$node]") - } - implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) => - val ha = hashFor(a) - val hb = hashFor(b) - ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0) - } - immutable.SortedSet() - } - val virtualNodesFactor = 10 - var consistentHash: ConsistentHash[Address] = ConsistentHash(nodes, virtualNodesFactor) - - var clientInteractions = HashMap.empty[ActorRef, DeadlineFailureDetector] - var clientsPublished = HashSet.empty[ActorRef] - - var subscribers = Vector.empty[ActorRef] - - val checkDeadlinesTask = - context.system.scheduler.scheduleWithFixedDelay( - failureDetectionInterval, - failureDetectionInterval, - self, - CheckDeadlines)(context.dispatcher) - - override def preStart(): Unit = { - 
super.preStart() - require(!cluster.isTerminated, "Cluster node must not be terminated") - cluster.subscribe(self, classOf[MemberEvent]) - } - - override def postStop(): Unit = { - super.postStop() - cluster.unsubscribe(self) - checkDeadlinesTask.cancel() - clientInteractions.keySet.foreach(_ ! ReceptionistShutdown) - } - - def matchingRole(m: Member): Boolean = role.forall(m.hasRole) - - def responseTunnel(client: ActorRef): ActorRef = { - val encName = URLEncoder.encode(client.path.toSerializationFormat, "utf-8") - context.child(encName) match { - case Some(tunnel) => tunnel - case None => - context.actorOf(Props(classOf[ClientResponseTunnel], client, responseTunnelReceiveTimeout), encName) - } - } - - def receive = { - case msg @ (_: Send | _: SendToAll | _: Publish) => - val tunnel = responseTunnel(sender()) - tunnel ! Ping // keep alive - pubSubMediator.tell(msg, tunnel) - - case Heartbeat => - if (verboseHeartbeat) log.debug("Heartbeat from client [{}]", sender().path) - sender() ! HeartbeatRsp - updateClientInteractions(sender()) - - case GetContacts => - // Consistent hashing is used to ensure that the reply to GetContacts - // is the same from all nodes (most of the time) and it also - // load balances the client connections among the nodes in the cluster. - if (numberOfContacts >= nodes.size) { - val contacts = Contacts(nodes.iterator.map(a => self.path.toStringWithAddress(a)).to(immutable.IndexedSeq)) - if (log.isDebugEnabled) - log.debug( - "Client [{}] gets contactPoints [{}] (all nodes)", - sender().path, - contacts.contactPoints.mkString(",")) - sender() ! 
contacts - } else { - // using toStringWithAddress in case the client is local, normally it is not, and - // toStringWithAddress will use the remote address of the client - val a = consistentHash.nodeFor(sender().path.toStringWithAddress(cluster.selfAddress)) - val slice = { - val first = nodes.rangeFrom(a).tail.take(numberOfContacts) - if (first.size == numberOfContacts) first - else first.union(nodes.take(numberOfContacts - first.size)) - } - val contacts = Contacts(slice.iterator.map(a => self.path.toStringWithAddress(a)).to(immutable.IndexedSeq)) - if (log.isDebugEnabled) - log.debug("Client [{}] gets contactPoints [{}]", sender().path, contacts.contactPoints.mkString(",")) - sender() ! contacts - } - - case state: CurrentClusterState => - nodes = nodes.empty.union(state.members.collect { - case m if m.status != MemberStatus.Joining && matchingRole(m) => m.address - }) - consistentHash = ConsistentHash(nodes, virtualNodesFactor) - - case MemberUp(m) => - if (matchingRole(m)) { - nodes += m.address - consistentHash = ConsistentHash(nodes, virtualNodesFactor) - } - - case MemberRemoved(m, _) => - if (m.address == selfAddress) - context.stop(self) - else if (matchingRole(m)) { - nodes -= m.address - consistentHash = ConsistentHash(nodes, virtualNodesFactor) - } - - case _: MemberEvent => // not of interest - case SubscribeClusterClients => - val subscriber = sender() - subscriber ! ClusterClients(clientInteractions.keySet.to(HashSet)) - subscribers :+= subscriber - context.watch(subscriber) - - case UnsubscribeClusterClients => - val subscriber = sender() - subscribers = subscribers.filterNot(_ == subscriber) - - case Terminated(subscriber) => - self.tell(UnsubscribeClusterClients, subscriber) - - case GetClusterClients => - sender() ! 
ClusterClients(clientInteractions.keySet.to(HashSet)) - - case CheckDeadlines => - clientInteractions = clientInteractions.filter { - case (_, failureDetector) => - failureDetector.isAvailable - } - publishClientsUnreachable() - } - - def updateClientInteractions(client: ActorRef): Unit = - clientInteractions.get(client) match { - case Some(failureDetector) => - failureDetector.heartbeat() - case None => - val failureDetector = new DeadlineFailureDetector(acceptableHeartbeatPause, heartbeatInterval) - failureDetector.heartbeat() - clientInteractions = clientInteractions + (client -> failureDetector) - log.debug("Received new contact from [{}]", client.path) - val clusterClientUp = ClusterClientUp(client) - subscribers.foreach(_ ! clusterClientUp) - clientsPublished = clientInteractions.keySet.to(HashSet) - } - - def publishClientsUnreachable(): Unit = { - val publishableClients = clientInteractions.keySet.to(HashSet) - for (c <- clientsPublished if !publishableClients.contains(c)) { - log.debug("Lost contact with [{}]", c.path) - val clusterClientUnreachable = ClusterClientUnreachable(c) - subscribers.foreach(_ ! clusterClientUnreachable) - } - clientsPublished = publishableClients - } -} diff --git a/cluster-tools/src/main/scala/org/apache/pekko/cluster/client/protobuf/ClusterClientMessageSerializer.scala b/cluster-tools/src/main/scala/org/apache/pekko/cluster/client/protobuf/ClusterClientMessageSerializer.scala deleted file mode 100644 index fed3a7124b6..00000000000 --- a/cluster-tools/src/main/scala/org/apache/pekko/cluster/client/protobuf/ClusterClientMessageSerializer.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2015-2022 Lightbend Inc. 
- */ - -package org.apache.pekko.cluster.client.protobuf - -import java.io.NotSerializableException - -import scala.annotation.nowarn - -import org.apache.pekko -import pekko.actor.ExtendedActorSystem -import pekko.cluster.client.ClusterReceptionist -import pekko.cluster.client.protobuf.msg.{ ClusterClientMessages => cm } -import pekko.serialization.BaseSerializer -import pekko.serialization.SerializerWithStringManifest -import pekko.util.ccompat.JavaConverters._ - -/** - * INTERNAL API: Serializer of ClusterClient messages. - */ -@nowarn("msg=deprecated") -private[pekko] class ClusterClientMessageSerializer(val system: ExtendedActorSystem) - extends SerializerWithStringManifest - with BaseSerializer { - import ClusterReceptionist.Internal._ - - private val ContactsManifest = "A" - private val GetContactsManifest = "B" - private val HeartbeatManifest = "C" - private val HeartbeatRspManifest = "D" - private val ReceptionistShutdownManifest = "E" - - private val emptyByteArray = Array.empty[Byte] - - private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] => AnyRef]( - ContactsManifest -> contactsFromBinary, - GetContactsManifest -> { _ => - GetContacts - }, - HeartbeatManifest -> { _ => - Heartbeat - }, - HeartbeatRspManifest -> { _ => - HeartbeatRsp - }, - ReceptionistShutdownManifest -> { _ => - ReceptionistShutdown - }) - - override def manifest(obj: AnyRef): String = obj match { - case _: Contacts => ContactsManifest - case GetContacts => GetContactsManifest - case Heartbeat => HeartbeatManifest - case HeartbeatRsp => HeartbeatRspManifest - case ReceptionistShutdown => ReceptionistShutdownManifest - case _ => - throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") - } - - override def toBinary(obj: AnyRef): Array[Byte] = obj match { - case m: Contacts => contactsToProto(m).toByteArray - case GetContacts => emptyByteArray - case Heartbeat => emptyByteArray - case HeartbeatRsp => 
emptyByteArray - case ReceptionistShutdown => emptyByteArray - case _ => - throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") - } - - override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = - fromBinaryMap.get(manifest) match { - case Some(f) => f(bytes) - case None => - throw new NotSerializableException( - s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") - } - - private def contactsToProto(m: Contacts): cm.Contacts = - cm.Contacts.newBuilder().addAllContactPoints(m.contactPoints.asJava).build() - - private def contactsFromBinary(bytes: Array[Byte]): Contacts = { - val m = cm.Contacts.parseFrom(bytes) - Contacts(m.getContactPointsList.asScala.toVector) - } - -} diff --git a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientHandoverSpec.scala b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientHandoverSpec.scala deleted file mode 100644 index 7057f7df9d1..00000000000 --- a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientHandoverSpec.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. 
- */ - -package org.apache.pekko.cluster.client - -import scala.concurrent.duration._ - -import scala.annotation.nowarn -import com.typesafe.config.ConfigFactory - -import org.apache.pekko -import pekko.actor.{ ActorPath, ActorRef } -import pekko.cluster.{ Cluster, MultiNodeClusterSpec } -import pekko.remote.testconductor.RoleName -import pekko.remote.testkit.{ MultiNodeConfig, STMultiNodeSpec } -import pekko.testkit.{ ImplicitSender, TestActors } - -object ClusterClientHandoverSpec extends MultiNodeConfig { - val client = role("client") - val first = role("first") - val second = role("second") - commonConfig(ConfigFactory.parseString(""" - pekko.loglevel = INFO - pekko.actor.provider = "cluster" - pekko.remote.log-remote-lifecycle-events = off - pekko.cluster.client { - heartbeat-interval = 1d - acceptable-heartbeat-pause = 1d - reconnect-timeout = 3s - refresh-contacts-interval = 1d - - } - pekko.test.filter-leeway = 10s - """).withFallback(MultiNodeClusterSpec.clusterConfig)) -} - -class ClusterClientHandoverSpecMultiJvmNode1 extends ClusterClientHandoverSpec -class ClusterClientHandoverSpecMultiJvmNode2 extends ClusterClientHandoverSpec -class ClusterClientHandoverSpecMultiJvmNode3 extends ClusterClientHandoverSpec - -@nowarn("msg=deprecated") -class ClusterClientHandoverSpec - extends MultiNodeClusterSpec(ClusterClientHandoverSpec) - with STMultiNodeSpec - with ImplicitSender { - - import ClusterClientHandoverSpec._ - - override def initialParticipants: Int = 3 - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - ClusterClientReceptionist(system) - } - enterBarrier(from.name + "-joined") - } - - def initialContacts: Set[ActorPath] = Set(first, second).map { r => - node(r) / "system" / "receptionist" - } - - "A Cluster Client" must { - - "startup cluster with a single node" in within(30.seconds) { - join(first, first) - runOn(first) { - val service = system.actorOf(TestActors.echoActorProps, 
"testService") - ClusterClientReceptionist(system).registerService(service) - awaitMembersUp(1) - } - enterBarrier("cluster-started") - } - - var clusterClient: ActorRef = null - - "establish connection to first node" in { - runOn(client) { - clusterClient = system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)), - "client1") - clusterClient ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) - expectMsgType[String](3.seconds) should be("hello") - } - enterBarrier("established") - } - - "bring the second node into the cluster" in { - join(second, first) - runOn(second) { - val service = system.actorOf(TestActors.echoActorProps, "testService") - ClusterClientReceptionist(system).registerService(service) - awaitMembersUp(2) - } - enterBarrier("second-up") - } - - "remove first node from the cluster" in { - runOn(first) { - Cluster(system).leave(node(first).address) - } - - runOn(second) { - awaitMembersUp(1) - } - enterBarrier("handover-done") - } - - "re-establish on receptionist shutdown" in { - runOn(client) { - clusterClient ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) - expectMsgType[String](3.seconds) should be("hello") - } - enterBarrier("handover-successful") - } - } -} diff --git a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala deleted file mode 100644 index d5b1be40d84..00000000000 --- a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala +++ /dev/null @@ -1,383 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. 
- */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. - */ - -package org.apache.pekko.cluster.client - -import scala.annotation.nowarn -import scala.concurrent.Await -import scala.concurrent.duration._ - -import com.typesafe.config.ConfigFactory - -import org.apache.pekko -import pekko.actor.Actor -import pekko.actor.ActorPath -import pekko.actor.ActorRef -import pekko.actor.Address -import pekko.actor.NoSerializationVerificationNeeded -import pekko.actor.Props -import pekko.cluster.Cluster -import pekko.cluster.client.ClusterClientSpec.TestClientListener.LatestContactPoints -import pekko.cluster.client.ClusterClientSpec.TestReceptionistListener.LatestClusterClients -import pekko.cluster.pubsub._ -import pekko.remote.testconductor.RoleName -import pekko.remote.testkit.MultiNodeConfig -import pekko.remote.testkit.MultiNodeSpec -import pekko.remote.testkit.STMultiNodeSpec -import pekko.remote.transport.ThrottlerTransportAdapter.Direction -import pekko.testkit._ -import pekko.util.Timeout -import pekko.util.unused - -object ClusterClientSpec extends MultiNodeConfig { - val client = role("client") - val first = role("first") - val second = role("second") - val third = role("third") - val fourth = role("fourth") - - commonConfig(ConfigFactory.parseString(""" - pekko.loglevel = INFO - pekko.actor.provider = "cluster" - pekko.remote.log-remote-lifecycle-events = off - pekko.cluster.downing-provider-class = org.apache.pekko.cluster.testkit.AutoDowning - pekko.cluster.testkit.auto-down-unreachable-after = 0s - pekko.cluster.client.heartbeat-interval = 1s - pekko.cluster.client.acceptable-heartbeat-pause = 3s - pekko.cluster.client.refresh-contacts-interval = 1s - # number-of-contacts must be >= 4 because we shutdown all but one in the end - pekko.cluster.client.receptionist.number-of-contacts = 4 - pekko.cluster.client.receptionist.heartbeat-interval = 10s - pekko.cluster.client.receptionist.acceptable-heartbeat-pause = 10s - 
pekko.cluster.client.receptionist.failure-detection-interval = 1s - pekko.test.filter-leeway = 10s - """)) - - testTransport(on = true) - - case class Reply(msg: Any, node: Address) extends JavaSerializable - - class TestService(testActor: ActorRef) extends Actor { - def receive = { - case "shutdown" => - context.system.terminate() - case msg => - testActor.forward(msg) - sender() ! Reply(s"$msg-ack", Cluster(context.system).selfAddress) - } - } - - class Service extends Actor { - def receive = { - case msg => sender() ! msg - } - } - - // #clientEventsListener - class ClientListener(targetClient: ActorRef) extends Actor { - override def preStart(): Unit = - targetClient ! SubscribeContactPoints - - def receive: Receive = - receiveWithContactPoints(Set.empty) - - def receiveWithContactPoints(contactPoints: Set[ActorPath]): Receive = { - case ContactPoints(cps) => - context.become(receiveWithContactPoints(cps)) - // Now do something with the up-to-date "cps" - case ContactPointAdded(cp) => - context.become(receiveWithContactPoints(contactPoints + cp)) - // Now do something with an up-to-date "contactPoints + cp" - case ContactPointRemoved(cp) => - context.become(receiveWithContactPoints(contactPoints - cp)) - // Now do something with an up-to-date "contactPoints - cp" - } - } - // #clientEventsListener - - object TestClientListener { - case object GetLatestContactPoints - case class LatestContactPoints(contactPoints: Set[ActorPath]) extends NoSerializationVerificationNeeded - } - - class TestClientListener(targetClient: ActorRef) extends ClientListener(targetClient) { - - import TestClientListener._ - - override def receiveWithContactPoints(contactPoints: Set[ActorPath]): Receive = { - case GetLatestContactPoints => - sender() ! 
LatestContactPoints(contactPoints) - case msg: Any => - super.receiveWithContactPoints(contactPoints)(msg) - } - } - - // #receptionistEventsListener - class ReceptionistListener(targetReceptionist: ActorRef) extends Actor { - override def preStart(): Unit = - targetReceptionist ! SubscribeClusterClients - - def receive: Receive = - receiveWithClusterClients(Set.empty) - - def receiveWithClusterClients(clusterClients: Set[ActorRef]): Receive = { - case ClusterClients(cs) => - context.become(receiveWithClusterClients(cs)) - // Now do something with the up-to-date "c" - case ClusterClientUp(c) => - context.become(receiveWithClusterClients(clusterClients + c)) - // Now do something with an up-to-date "clusterClients + c" - case ClusterClientUnreachable(c) => - context.become(receiveWithClusterClients(clusterClients - c)) - // Now do something with an up-to-date "clusterClients - c" - } - } - // #receptionistEventsListener - - object TestReceptionistListener { - case object GetLatestClusterClients - case class LatestClusterClients(clusterClients: Set[ActorRef]) extends NoSerializationVerificationNeeded - } - - class TestReceptionistListener(targetReceptionist: ActorRef) extends ReceptionistListener(targetReceptionist) { - - import TestReceptionistListener._ - - override def receiveWithClusterClients(clusterClients: Set[ActorRef]): Receive = { - case GetLatestClusterClients => - sender() ! 
LatestClusterClients(clusterClients) - case msg: Any => - super.receiveWithClusterClients(clusterClients)(msg) - } - } -} - -class ClusterClientMultiJvmNode1 extends ClusterClientSpec -class ClusterClientMultiJvmNode2 extends ClusterClientSpec -class ClusterClientMultiJvmNode3 extends ClusterClientSpec -class ClusterClientMultiJvmNode4 extends ClusterClientSpec -class ClusterClientMultiJvmNode5 extends ClusterClientSpec - -@nowarn("msg=deprecated") -class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNodeSpec with ImplicitSender { - import ClusterClientSpec._ - - override def initialParticipants = roles.size - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - createReceptionist() - } - enterBarrier(from.name + "-joined") - } - - def createReceptionist(): Unit = ClusterClientReceptionist(system) - - def awaitCount(expected: Int): Unit = { - awaitAssert { - DistributedPubSub(system).mediator ! DistributedPubSubMediator.Count - val actual = expectMsgType[Int] - actual should ===(expected) - } - } - - var remainingServerRoleNames = Set(first, second, third, fourth) - - def roleName(addr: Address): Option[RoleName] = remainingServerRoleNames.find(node(_).address == addr) - - def initialContacts = (remainingServerRoleNames - first - fourth).map { r => - node(r) / "system" / "receptionist" - } - - @unused - def docOnly = { // not used, only demo - // #initialContacts - val initialContacts = Set( - ActorPath.fromString("pekko://OtherSys@host1:7355/system/receptionist"), - ActorPath.fromString("pekko://OtherSys@host2:7355/system/receptionist")) - val settings = ClusterClientSettings(system).withInitialContacts(initialContacts) - // #initialContacts - - // make the compiler happy and thinking we use it - settings.acceptableHeartbeatPause - } - - "A ClusterClient" must { - - "startup cluster" in within(30.seconds) { - join(first, first) - join(second, first) - join(third, first) - 
join(fourth, first) - runOn(fourth) { - val service = system.actorOf(Props(classOf[TestService], testActor), "testService") - ClusterClientReceptionist(system).registerService(service) - } - runOn(first, second, third, fourth) { - awaitCount(1) - } - - enterBarrier("after-1") - } - - "communicate to actor on any node in cluster" in within(10.seconds) { - runOn(client) { - val c = system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)), - "client1") - c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) - expectMsgType[Reply].msg should be("hello-ack") - system.stop(c) - } - runOn(fourth) { - expectMsg("hello") - } - - enterBarrier("after-2") - } - - "work with ask" in within(10.seconds) { - runOn(client) { - import pekko.pattern.ask - val c = system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)), - "ask-client") - implicit val timeout: Timeout = Timeout(remaining) - val reply = c ? ClusterClient.Send("/user/testService", "hello-request", localAffinity = true) - Await.result(reply.mapTo[Reply], remaining).msg should be("hello-request-ack") - system.stop(c) - } - runOn(fourth) { - expectMsg("hello-request") - } - - enterBarrier("after-3") - } - - "demonstrate usage" in within(15.seconds) { - def host1 = first - def host2 = second - def host3 = third - - // #server - runOn(host1) { - val serviceA = system.actorOf(Props[Service](), "serviceA") - ClusterClientReceptionist(system).registerService(serviceA) - } - - runOn(host2, host3) { - val serviceB = system.actorOf(Props[Service](), "serviceB") - ClusterClientReceptionist(system).registerService(serviceB) - } - // #server - - runOn(host1, host2, host3, fourth) { - awaitCount(4) - } - enterBarrier("services-replicated") - - // #client - runOn(client) { - val c = system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)), - "client") - c ! 
ClusterClient.Send("/user/serviceA", "hello", localAffinity = true) - c ! ClusterClient.SendToAll("/user/serviceB", "hi") - } - // #client - - runOn(client) { - // note that "hi" was sent to 2 "serviceB" - receiveN(3).toSet should ===(Set("hello", "hi")) - } - - // strange, barriers fail without this sleep - Thread.sleep(1000) - enterBarrier("after-4") - } - - "report events" in within(15.seconds) { - runOn(client) { - implicit val timeout: Timeout = Timeout(1.second.dilated) - val client = Await.result(system.actorSelection("/user/client").resolveOne(), timeout.duration) - val listener = system.actorOf(Props(classOf[TestClientListener], client), "reporter-client-listener") - - val expectedContacts = Set(first, second, third, fourth).map(node(_) / "system" / "receptionist") - awaitAssert({ - listener ! TestClientListener.GetLatestContactPoints - expectMsgType[LatestContactPoints].contactPoints should ===(expectedContacts) - }, max = 10.seconds) - } - - enterBarrier("reporter-client-listener-tested") - - runOn(first, second, third) { - // Only run this test on a node that knows about our client. It could be that no node knows - // but there isn't a means of expressing that at least one of the nodes needs to pass the test. - implicit val timeout: Timeout = Timeout(2.seconds.dilated) - val r = ClusterClientReceptionist(system).underlying - r ! 
GetClusterClients - val cps = expectMsgType[ClusterClients] - if (cps.clusterClients.exists(_.path.name == "client")) { - log.info("Testing that the receptionist has just one client") - val l = system.actorOf(Props(classOf[TestReceptionistListener], r), "reporter-receptionist-listener") - - val expectedClient = - Await.result(system.actorSelection(node(client) / "user" / "client").resolveOne(), timeout.duration) - awaitAssert({ - val probe = TestProbe() - l.tell(TestReceptionistListener.GetLatestClusterClients, probe.ref) - // "ask-client" might still be around, filter - probe.expectMsgType[LatestClusterClients].clusterClients should contain(expectedClient) - }, max = 10.seconds) - } - } - - enterBarrier("after-5") - } - - "report a removal of a receptionist" in within(10.seconds) { - runOn(client) { - val unreachableContact = node(client) / "system" / "receptionist" - val expectedRoles = Set(first, second, third, fourth) - val expectedContacts = expectedRoles.map(node(_) / "system" / "receptionist") - - // We need to slow down things otherwise our receptionists can sometimes tell us - // that our unreachableContact is unreachable before we get a chance to - // subscribe to events. 
- expectedRoles.foreach { role => - testConductor.blackhole(client, role, Direction.Both).await - } - - val c = system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(expectedContacts + unreachableContact)), - "client5") - - val probe = TestProbe() - c.tell(SubscribeContactPoints, probe.ref) - - expectedRoles.foreach { role => - testConductor.passThrough(client, role, Direction.Both).await - } - - probe.fishForMessage(10.seconds, "removal") { - case ContactPointRemoved(`unreachableContact`) => true - case _ => false - } - } - enterBarrier("after-7") - } - - } -} diff --git a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala deleted file mode 100644 index 6af92d89a7e..00000000000 --- a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. 
- */ - -package org.apache.pekko.cluster.client - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import scala.annotation.nowarn -import com.typesafe.config.ConfigFactory - -import org.apache.pekko -import pekko.actor.{ Actor, Props } -import pekko.cluster.Cluster -import pekko.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator } -import pekko.remote.testconductor.RoleName -import pekko.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } -import pekko.testkit.{ EventFilter, ImplicitSender } - -object ClusterClientStopSpec extends MultiNodeConfig { - val client = role("client") - val first = role("first") - val second = role("second") - commonConfig(ConfigFactory.parseString(""" - pekko.loglevel = INFO - pekko.actor.provider = "cluster" - pekko.remote.log-remote-lifecycle-events = off - pekko.cluster.client { - heartbeat-interval = 1s - acceptable-heartbeat-pause = 1s - reconnect-timeout = 3s - receptionist.number-of-contacts = 1 - - } - pekko.test.filter-leeway = 10s - """)) - - class Service extends Actor { - def receive = { - case msg => sender() ! msg - } - } -} - -class ClusterClientStopMultiJvmNode1 extends ClusterClientStopSpec -class ClusterClientStopMultiJvmNode2 extends ClusterClientStopSpec -class ClusterClientStopMultiJvmNode3 extends ClusterClientStopSpec - -@nowarn("msg=deprecated") -class ClusterClientStopSpec extends MultiNodeSpec(ClusterClientStopSpec) with STMultiNodeSpec with ImplicitSender { - - import ClusterClientStopSpec._ - - override def initialParticipants: Int = 3 - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - Cluster(system).join(node(to).address) - ClusterClientReceptionist(system) - } - enterBarrier(from.name + "-joined") - } - - def awaitCount(expected: Int): Unit = { - awaitAssert { - DistributedPubSub(system).mediator ! 
DistributedPubSubMediator.Count - val actual = expectMsgType[Int] - actual should ===(expected) - } - } - - def initialContacts = Set(first, second).map { r => - node(r) / "system" / "receptionist" - } - - "A Cluster Client" should { - - "startup cluster" in within(30.seconds) { - join(first, first) - join(second, first) - runOn(first) { - val service = system.actorOf(Props(classOf[Service]), "testService") - ClusterClientReceptionist(system).registerService(service) - } - runOn(first, second) { - awaitCount(1) - } - - enterBarrier("cluster-started") - } - - "stop if re-establish fails for too long time" in within(20.seconds) { - runOn(client) { - val c = system.actorOf( - ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)), - "client1") - c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) - expectMsgType[String](3.seconds) should be("hello") - enterBarrier("was-in-contact") - - watch(c) - - expectTerminated(c, 10.seconds) - EventFilter.warning(start = "Receptionist reconnect not successful within", occurrences = 1) - - } - - runOn(first, second) { - enterBarrier("was-in-contact") - Await.ready(system.terminate(), 10.seconds) - - } - - } - - } - -} diff --git a/cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java b/cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java deleted file mode 100644 index ad513485891..00000000000 --- a/cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. 
- */ - -package org.apache.pekko.cluster.client; - -import com.typesafe.config.ConfigFactory; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; -import org.apache.pekko.actor.*; -import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; -import org.junit.ClassRule; -import org.junit.Test; -import org.scalatestplus.junit.JUnitSuite; - -public class ClusterClientTest extends JUnitSuite { - - @ClassRule - public static PekkoJUnitActorSystemResource actorSystemResource = - new PekkoJUnitActorSystemResource( - "DistributedPubSubMediatorTest", - ConfigFactory.parseString( - "pekko.actor.provider = \"cluster\"\n" - + "pekko.remote.classic.netty.tcp.port=0\n" - + "pekko.remote.artery.canonical.port=0")); - - private final ActorSystem system = actorSystemResource.getSystem(); - - // #initialContacts - Set initialContacts() { - return new HashSet( - Arrays.asList( - ActorPaths.fromString("pekko://OtherSys@host1:7355/system/receptionist"), - ActorPaths.fromString("pekko://OtherSys@host2:7355/system/receptionist"))); - } - - // #initialContacts - - @Test - @Deprecated - public void demonstrateUsage() { - // #server - ActorRef serviceA = system.actorOf(Props.create(Service.class), "serviceA"); - ClusterClientReceptionist.get(system).registerService(serviceA); - - ActorRef serviceB = system.actorOf(Props.create(Service.class), "serviceB"); - ClusterClientReceptionist.get(system).registerService(serviceB); - // #server - - // #client - final ActorRef c = - system.actorOf( - ClusterClient.props( - ClusterClientSettings.create(system).withInitialContacts(initialContacts())), - "client"); - c.tell(new ClusterClient.Send("/user/serviceA", "hello", true), ActorRef.noSender()); - c.tell(new ClusterClient.SendToAll("/user/serviceB", "hi"), ActorRef.noSender()); - // #client - - system.actorOf(Props.create(ClientListener.class, c)); - system.actorOf( - Props.create( - ReceptionistListener.class, ClusterClientReceptionist.get(system).underlying())); - } - - 
public static class Service extends UntypedAbstractActor { - public void onReceive(Object msg) {} - } - - // #clientEventsListener - public static class ClientListener extends AbstractActor { - private final ActorRef targetClient; - private final Set contactPoints = new HashSet<>(); - - public ClientListener(ActorRef targetClient) { - this.targetClient = targetClient; - } - - @Override - public void preStart() { - targetClient.tell(SubscribeContactPoints.getInstance(), sender()); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - ContactPoints.class, - msg -> { - contactPoints.addAll(msg.getContactPoints()); - // Now do something with an up-to-date "contactPoints" - }) - .match( - ContactPointAdded.class, - msg -> { - contactPoints.add(msg.contactPoint()); - // Now do something with an up-to-date "contactPoints" - }) - .match( - ContactPointRemoved.class, - msg -> { - contactPoints.remove(msg.contactPoint()); - // Now do something with an up-to-date "contactPoints" - }) - .build(); - } - } - - // #clientEventsListener - - // #receptionistEventsListener - public static class ReceptionistListener extends AbstractActor { - private final ActorRef targetReceptionist; - private final Set clusterClients = new HashSet<>(); - - public ReceptionistListener(ActorRef targetReceptionist) { - this.targetReceptionist = targetReceptionist; - } - - @Override - public void preStart() { - targetReceptionist.tell(SubscribeClusterClients.getInstance(), sender()); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - ClusterClients.class, - msg -> { - clusterClients.addAll(msg.getClusterClients()); - // Now do something with an up-to-date "clusterClients" - }) - .match( - ClusterClientUp.class, - msg -> { - clusterClients.add(msg.clusterClient()); - // Now do something with an up-to-date "clusterClients" - }) - .match( - ClusterClientUnreachable.class, - msg -> { - clusterClients.remove(msg.clusterClient()); - 
// Now do something with an up-to-date "clusterClients" - }) - .build(); - } - } - // #receptionistEventsListener - -} diff --git a/cluster-tools/src/test/scala/org/apache/pekko/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala b/cluster-tools/src/test/scala/org/apache/pekko/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala deleted file mode 100644 index 7f2e37cea5d..00000000000 --- a/cluster-tools/src/test/scala/org/apache/pekko/cluster/client/protobuf/ClusterClientMessageSerializerSpec.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2015-2022 Lightbend Inc. - */ - -package org.apache.pekko.cluster.client.protobuf - -import scala.annotation.nowarn - -import org.apache.pekko -import pekko.actor.ExtendedActorSystem -import pekko.cluster.client.ClusterReceptionist.Internal._ -import pekko.testkit.PekkoSpec - -@nowarn("msg=deprecated") -class ClusterClientMessageSerializerSpec extends PekkoSpec { - - val serializer = new ClusterClientMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) - - def checkSerialization(obj: AnyRef): Unit = { - val blob = serializer.toBinary(obj) - val ref = serializer.fromBinary(blob, serializer.manifest(obj)) - ref should ===(obj) - } - - "ClusterClientMessages" must { - - "be serializable" in { - val contactPoints = Vector( - "pekko://system@node-1:7355/system/receptionist", - "pekko://system@node-2:7355/system/receptionist", - "pekko://system@node-3:7355/system/receptionist") - checkSerialization(Contacts(contactPoints)) - checkSerialization(GetContacts) - checkSerialization(Heartbeat) - checkSerialization(HeartbeatRsp) - checkSerialization(ReceptionistShutdown) - } - } -} diff --git 
a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 03f1ab39523..110789c28d6 100644 --- a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -613,15 +613,6 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } } - "not conflict with the ClusterClient receptionist default name".taggedAs(LongRunningTest, GHExcludeAeronTest) in { - val testKit = ActorTestKit(s"ClusterReceptionistSpec-test-9", ClusterReceptionistSpec.config) - try { - testKit.system.systemActorOf(Behaviors.ignore, "receptionist") - } finally { - testKit.shutdownTestKit() - } - } - "handle unregistration and re-registration of services".taggedAs(LongRunningTest, GHExcludeAeronTest) in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-10", ClusterReceptionistSpec.config) val system1 = testKit1.system diff --git a/docs/src/main/paradox/cluster-client.md b/docs/src/main/paradox/cluster-client.md deleted file mode 100644 index 0a28f86b767..00000000000 --- a/docs/src/main/paradox/cluster-client.md +++ /dev/null @@ -1,303 +0,0 @@ -# Classic Cluster Client - -@@@ warning - -Cluster Client is deprecated in favor of using [Pekko gRPC]($pekko.doc.dns$/docs/pekko-grpc/current/index.html). -It is not advised to build new applications with Cluster Client. 
- -@@@ - -@@include[includes.md](includes.md) { #actor-api } - -## Module info - -To use Cluster Client, you must add the following dependency in your project: - -@@dependency[sbt,Maven,Gradle] { -bomGroup=org.apache.pekko bomArtifact=pekko-bom_$scala.binary.version$ bomVersionSymbols=PekkoVersion -symbol1=PekkoVersion -value1="$pekko.version$" -group=org.apache.pekko -artifact=pekko-cluster-tools_$scala.binary.version$ -version=PekkoVersion -} - -@@project-info{ projectId="cluster-tools" } - -## Introduction - -An actor system that is not part of the cluster can communicate with actors -somewhere in the cluster via the @apidoc[ClusterClient], the client can run in an `ActorSystem` that is part of -another cluster. It only needs to know the location of one (or more) nodes to use as initial -contact points. It will establish a connection to a @apidoc[cluster.client.ClusterReceptionist] somewhere in -the cluster. It will monitor the connection to the receptionist and establish a new -connection if the link goes down. When looking for a new receptionist it uses fresh -contact points retrieved from the previous establishment, or periodically refreshed contacts, -i.e. not necessarily the initial contact points. - -Using the @apidoc[ClusterClient] for communicating with a cluster from the outside requires that the system with the client -can both connect and be connected with Pekko Remoting from all the nodes in the cluster with a receptionist. -This creates a tight coupling in that the client and cluster systems may need to have the same version of both Pekko, libraries, message classes, serializers and potentially even the JVM. In many cases it is a better solution -to use a more explicit and decoupling protocol such as [HTTP]($pekko.doc.dns$/docs/pekko-http/current/index.html) or -[gRPC]($pekko.doc.dns$/docs/pekko-grpc/current/). 
- -Additionally, since Pekko Remoting is primarily designed as a protocol for Pekko Cluster there is no explicit resource -management, when a @apidoc[ClusterClient] has been used it will cause connections with the cluster until the ActorSystem is -stopped (unlike other kinds of network clients). - -@apidoc[ClusterClient] should not be used when sending messages to actors that run -within the same cluster. Similar functionality as the @apidoc[ClusterClient] is -provided more efficiently by @ref:[Distributed Publish Subscribe in Cluster](distributed-pub-sub.md) for actors that -belong to the same cluster. - -The connecting system must have its `org.apache.pekko.actor.provider` set to `remote` or `cluster` when using -the cluster client. - -The receptionist is supposed to be started on all nodes, or all nodes with a specified role, -in the cluster. The receptionist can be started with the @apidoc[cluster.client.ClusterReceptionist] extension -or as an ordinary actor. - -You can send messages via the @apidoc[ClusterClient] to any actor in the cluster that is registered -in the @apidoc[DistributedPubSubMediator] used by the @apidoc[cluster.client.ClusterReceptionist]. -The @apidoc[ClusterClientReceptionist] provides methods for registration of actors that -should be reachable from the client. Messages are wrapped in `ClusterClient.Send`, -@scala[@scaladoc[`ClusterClient.SendToAll`](pekko.cluster.client.ClusterClient$)]@java[`ClusterClient.SendToAll`] or @scala[@scaladoc[`ClusterClient.Publish`](org.apache.pekko.cluster.client.ClusterClient$)]@java[`ClusterClient.Publish`]. - -Both the @apidoc[ClusterClient] and the @apidoc[ClusterClientReceptionist] emit events that can be subscribed to. -The @apidoc[ClusterClient] sends out notifications about the list of contact points received -from the @apidoc[ClusterClientReceptionist]. One use of this list might be for the client to record its -contact points. 
A client that is restarted could then use this information to supersede any previously -configured contact points. - -The @apidoc[ClusterClientReceptionist] sends out notifications in relation to having received a contact -from a @apidoc[ClusterClient]. This notification enables the server containing the receptionist to become aware of -what clients are connected to. - -1. **ClusterClient.Send** - - The message will be delivered to one recipient with a matching path if any such exists. If several entries match the path the message will be delivered - to one random destination. The sender of the message can specify that local - affinity is preferred, i.e. the message is sent to an actor in the same local actor - system as the used receptionist actor, if any such exists, otherwise random to any other - matching entry. - -2. **ClusterClient.SendToAll** - - The message will be delivered to all recipients with a matching path. - -3. **ClusterClient.Publish** - - The message will be delivered to all recipients Actors that have been registered as subscribers - to the named topic. - -Response messages from the destination actor are tunneled via the receptionist -to avoid inbound connections from other cluster nodes to the client: - -* @scala[@scaladoc[`sender()`](pekko.actor.Actor)]@java[@javadoc[`getSender()`](pekko.actor.Actor)], as seen by the destination actor, is not the client itself, - but the receptionist -* @scala[@scaladoc[`sender()`](pekko.actor.Actor)] @java[@javadoc[`getSender()`](pekko.actor.Actor)] of the response messages, sent back from the destination and seen by the client, - is `deadLetters` - -since the client should normally send subsequent messages via the @apidoc[ClusterClient]. -It is possible to pass the original sender inside the reply messages if -the client is supposed to communicate directly to the actor in the cluster. 
- -While establishing a connection to a receptionist the @apidoc[ClusterClient] will buffer -messages and send them when the connection is established. If the buffer is full -the @apidoc[ClusterClient] will drop old messages when new messages are sent via the client. -The size of the buffer is configurable and it can be disabled by using a buffer size of 0. - -It's worth noting that messages can always be lost because of the distributed nature -of these actors. As always, additional logic should be implemented in the destination -(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery. - -## An Example - -On the cluster nodes, first start the receptionist. Note, it is recommended to load the extension -when the actor system is started by defining it in the `pekko.extensions` configuration property: - -``` -pekko.extensions = ["org.apache.pekko.cluster.client.ClusterClientReceptionist"] -``` - -Next, register the actors that should be available for the client. - -Scala -: @@snip [ClusterClientSpec.scala](/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala) { #server } - -Java -: @@snip [ClusterClientTest.java](/cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java) { #server } - -On the client, you create the @apidoc[ClusterClient] actor and use it as a gateway for sending -messages to the actors identified by their path (without address information) somewhere -in the cluster. 
- -Scala -: @@snip [ClusterClientSpec.scala](/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala) { #client } - -Java -: @@snip [ClusterClientTest.java](/cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java) { #client } - -The `initialContacts` parameter is a @scala[`Set[ActorPath]`]@java[`Set`], which can be created like this: - -Scala -: @@snip [ClusterClientSpec.scala](/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala) { #initialContacts } - -Java -: @@snip [ClusterClientTest.java](/cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java) { #initialContacts } - -You will probably define the address information of the initial contact points in configuration or system property. -See also @ref:[Configuration](#cluster-client-config). - -## ClusterClientReceptionist Extension - -In the example above the receptionist is started and accessed with the `org.apache.pekko.cluster.client.ClusterClientReceptionist` extension. -That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to -start the `org.apache.pekko.cluster.client.ClusterReceptionist` actor as an ordinary actor and you can have several -different receptionists at the same time, serving different types of clients. - -Note that the @apidoc[ClusterClientReceptionist] uses the @apidoc[DistributedPubSub] extension, which is described -in @ref:[Distributed Publish Subscribe in Cluster](distributed-pub-sub.md). - -It is recommended to load the extension when the actor system is started by defining it in the -`pekko.extensions` configuration property: - -``` -pekko.extensions = ["pekko.cluster.client.ClusterClientReceptionist"] -``` - -## Events - -As mentioned earlier, both the @apidoc[ClusterClient] and @apidoc[ClusterClientReceptionist] emit events that can be subscribed to. 
-The following code snippet declares an actor that will receive notifications on contact points (addresses to the available -receptionists), as they become available. The code illustrates subscribing to the events and receiving the @apidoc[ClusterClient] -initial state. - -Scala -: @@snip [ClusterClientSpec.scala](/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala) { #clientEventsListener } - -Java -: @@snip [ClusterClientTest.java](/cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java) { #clientEventsListener } - -Similarly we can have an actor that behaves in a similar fashion for learning what cluster clients are connected to a @apidoc[ClusterClientReceptionist]: - -Scala -: @@snip [ClusterClientSpec.scala](/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientSpec.scala) { #receptionistEventsListener } - -Java -: @@snip [ClusterClientTest.java](/cluster-tools/src/test/java/org/apache/pekko/cluster/client/ClusterClientTest.java) { #receptionistEventsListener } - - -## Configuration - -The @apidoc[ClusterClientReceptionist] extension (or @apidoc[cluster.client.ClusterReceptionistSettings]) can be configured -with the following properties: - -@@snip [reference.conf](/cluster-tools/src/main/resources/reference.conf) { #receptionist-ext-config } - -The following configuration properties are read by the @apidoc[ClusterClientSettings] -when created with a @scala[@scaladoc[`ActorSystem`](pekko.actor.ActorSystem)]@java[@javadoc[`ActorSystem`](pekko.actor.ActorSystem)] parameter. It is also possible to amend the @apidoc[ClusterClientSettings] -or create it from another config section with the same layout as below. @apidoc[ClusterClientSettings] is -a parameter to the @scala[@scaladoc[`ClusterClient.props`](pekko.cluster.client.ClusterClient$)]@java[@javadoc[`ClusterClient.props`](pekko.cluster.client.ClusterClient$)] factory method, i.e. 
each client can be configured -with different settings if needed. - -@@snip [reference.conf](/cluster-tools/src/main/resources/reference.conf) { #cluster-client-config } - -## Failure handling - -When the cluster client is started it must be provided with a list of initial contacts which are cluster -nodes where receptionists are running. It will then repeatedly (with an interval configurable -by `establishing-get-contacts-interval`) try to contact those until it gets in contact with one of them. -While running, the list of contacts is continuously updated with data from the receptionists (again, with an -interval configurable with `refresh-contacts-interval`), so that if there are more receptionists in the cluster -than the initial contacts provided to the client will learn about them. - -While the client is running it will detect failures in its connection to the receptionist by heartbeats -if more than a configurable amount of heartbeats are missed the client will try to reconnect to its known -set of contacts to find a receptionist it can access. - -## When the cluster cannot be reached at all - -It is possible to make the cluster client stop entirely if it cannot find a receptionist it can talk to -within a configurable interval. This is configured with the `reconnect-timeout`, which defaults to `off`. -This can be useful when initial contacts are provided from some kind of service registry, cluster node addresses -are entirely dynamic and the entire cluster might shut down or crash, be restarted on new addresses. Since the -client will be stopped in that case a monitoring actor can watch it and upon `Terminate` a new set of initial -contacts can be fetched and a new cluster client started. - -## Migration to Apache Pekko gRPC - -Cluster Client is deprecated and it is not advised to build new applications with it. -As a replacement, we recommend using [Pekko gRPC]($pekko.doc.dns$/docs/pekko-grpc/current/) -with an application-specific protocol. 
The benefits of this approach are:
-
-* Improved security by using TLS for gRPC (HTTP/2) versus exposing Pekko Remoting outside the Pekko Cluster
-* Easier to update clients and servers independently of each other
-* Improved protocol definition between client and server
-* Usage of [Pekko gRPC Service Discovery]($pekko.doc.dns$/docs/pekko-grpc/current/client/configuration.html#using-pekko-discovery-for-endpoint-discovery)
-* Clients do not need to use Pekko
-* See also [gRPC versus Pekko Remoting]($pekko.doc.dns$/docs/pekko-grpc/current/whygrpc.html#grpc-vs-pekko-remoting)
-
-### Migrating directly
-
-Existing users of Cluster Client may migrate directly to Pekko gRPC and use it
-as documented in [its documentation]($pekko.doc.dns$/docs/pekko-grpc/current/).
-
-### Migrating gradually
-
-If your application extensively uses Cluster Client, a more gradual migration
-might be desired that requires less rewriting of the application. That migration step is described in this section. We recommend migrating directly if feasible,
-though.
-
-An example is provided to illustrate an approach to migrate from the deprecated Cluster Client to Pekko gRPC,
-with minimal changes to your existing code. The example is intended to be copied and adjusted to your needs.
-It will not be provided as a published artifact.
-
-* [pekko-samples/pekko-sample-cluster-client-grpc-scala](https://github.com/apache/pekko-samples/tree/main/pekko-sample-cluster-client-grpc-scala) implemented in Scala
-* [pekko-samples/pekko-sample-cluster-client-grpc-java](https://github.com/apache/pekko-samples/tree/main/pekko-sample-cluster-client-grpc-java) implemented in Java
-
-The example is still using an actor on the client-side to have an API that is very close
-to the original Cluster Client. The messages this actor can handle correspond to the
-@ref:[Distributed Pub Sub](distributed-pub-sub.md) messages on the server-side, such as
-`ClusterClient.Send` and `ClusterClient.Publish`.
-
-The `ClusterClient` actor delegates those messages to the gRPC client, and on the
-server-side those are translated and delegated to the destination actors that
-are registered via the `ClusterClientReceptionist` in the same way as in the original.
-
-Pekko gRPC is used as the transport for the messages between client and server, instead of Pekko Remoting.
-
-The application-specific messages are wrapped and serialized with Pekko Serialization,
-which means that care must be taken to keep wire compatibility when changing any messages used
-between the client and server. The Pekko configuration of Pekko serializers must be the same (or
-be compatible) on the client and the server.
-
-#### Next steps
-
-After this first migration step from Cluster Client to Pekko gRPC, you can start
-replacing calls to `ClusterClientReceptionistService` with new,
-application-specific gRPC endpoints.
-
-#### Differences
-
-Aside from the underlying implementation using gRPC instead of Actor messages
-and Pekko Remoting, it's worth pointing out the following differences between
-the Cluster Client and the example emulating Cluster Client with Pekko gRPC as
-transport.
-
-##### Single request-reply
-
-For request-reply interactions when there is only one reply message for each request,
-it is more efficient to use the `ClusterClient.AskSend` message instead of
-`ClusterClient.Send` as illustrated in the example. Then it doesn't have to
-set up a full bidirectional gRPC stream for each request but can use the @scala[`Future`]@java[`CompletionStage`]
-based API.
-
-##### Initial contact points
-
-Instead of configured initial contact points, the [Pekko gRPC Service Discovery]($pekko.doc.dns$/docs/pekko-grpc/current/client/configuration.html#using-pekko-discovery-for-endpoint-discovery) can be used.
-
-##### Failure detection
-
-Heartbeat messages and failure detection of the connections have been removed
-since that should be handled by the gRPC connections.
diff --git a/docs/src/main/paradox/cluster-usage.md b/docs/src/main/paradox/cluster-usage.md index 0516e21973e..365ea5d83a5 100644 --- a/docs/src/main/paradox/cluster-usage.md +++ b/docs/src/main/paradox/cluster-usage.md @@ -284,15 +284,7 @@ See @ref:[Cluster Aware Routers](cluster-routing.md) and @ref:[Routers](routing. @@include[cluster.md](includes/cluster.md) { #cluster-multidc } See @ref:[Cluster Multi-DC](cluster-dc.md). - -### Cluster Client - -Communication from an actor system that is not part of the cluster to actors running -somewhere in the cluster. The client does not have to know on which node the destination -actor is running. -See @ref:[Cluster Client](cluster-client.md). - ### Cluster Metrics The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes diff --git a/docs/src/main/paradox/index-cluster.md b/docs/src/main/paradox/index-cluster.md index bdfc6a101a5..d4c4da8bac1 100644 --- a/docs/src/main/paradox/index-cluster.md +++ b/docs/src/main/paradox/index-cluster.md @@ -11,7 +11,6 @@ For the new API see @ref[Cluster](typed/index-cluster.md). * [cluster-routing](cluster-routing.md) * [cluster-singleton](cluster-singleton.md) * [distributed-pub-sub](distributed-pub-sub.md) -* [cluster-client](cluster-client.md) * [cluster-sharding](cluster-sharding.md) * [cluster-metrics](cluster-metrics.md) * [distributed-data](distributed-data.md)