From f8a0b1ed97421c56f3f43b6a64b547ae95724370 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 11 Jul 2025 19:32:53 +0200 Subject: [PATCH 1/7] feat: add replication APIs to sync client --- .../weaviate/client/v1/cluster/Cluster.java | 10 +++ .../v1/cluster/api/NodesStatusGetter.java | 33 +++++++-- .../client/v1/cluster/api/Replicator.java | 73 +++++++++++++++++++ .../v1/cluster/api/ShardingStateQuerier.java | 47 ++++++++++++ .../cluster/api/replication/Replication.java | 45 ++++++++++++ .../api/ReplicationAllDeleter.java | 20 +++++ .../replication/api/ReplicationAllGetter.java | 27 +++++++ .../replication/api/ReplicationCanceler.java | 26 +++++++ .../replication/api/ReplicationDeleter.java | 26 +++++++ .../replication/api/ReplicationGetter.java | 37 ++++++++++ .../replication/api/ReplicationQuerier.java | 63 ++++++++++++++++ .../replication/model/ReplicateOperation.java | 26 +++++++ .../model/ReplicateOperationState.java | 16 ++++ .../model/ReplicateOperationStatus.java | 12 +++ .../v1/cluster/model/ReplicationType.java | 10 +++ .../v1/cluster/model/ShardReplicas.java | 15 ++++ .../v1/cluster/model/ShardingState.java | 15 ++++ 17 files changed, 494 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/Replicator.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/ShardingStateQuerier.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/Replication.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllDeleter.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationCanceler.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationDeleter.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationGetter.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationState.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/model/ReplicationType.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/model/ShardReplicas.java create mode 100644 src/main/java/io/weaviate/client/v1/cluster/model/ShardingState.java diff --git a/src/main/java/io/weaviate/client/v1/cluster/Cluster.java b/src/main/java/io/weaviate/client/v1/cluster/Cluster.java index 5f2727994..ce8506364 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/Cluster.java +++ b/src/main/java/io/weaviate/client/v1/cluster/Cluster.java @@ -2,6 +2,8 @@ import io.weaviate.client.base.http.HttpClient; import io.weaviate.client.v1.cluster.api.NodesStatusGetter; +import io.weaviate.client.v1.cluster.api.Replicator; +import io.weaviate.client.v1.cluster.api.ShardingStateQuerier; import io.weaviate.client.Config; public class Cluster { @@ -17,4 +19,12 @@ public Cluster(HttpClient httpClient, Config config) { public NodesStatusGetter nodesStatusGetter() { return new NodesStatusGetter(httpClient, config); } + + public ShardingStateQuerier shardingStateQuerier() { + return new ShardingStateQuerier(httpClient, config); + } + + public Replicator replicator() { + return new Replicator(httpClient, config); + } } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/NodesStatusGetter.java b/src/main/java/io/weaviate/client/v1/cluster/api/NodesStatusGetter.java index 4707c0c96..fb7e7cdc7 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/NodesStatusGetter.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/NodesStatusGetter.java @@ -1,5 +1,13 @@ package io.weaviate.client.v1.cluster.api; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.lang3.StringUtils; + import io.weaviate.client.Config; import io.weaviate.client.base.BaseClient; import io.weaviate.client.base.ClientResult; @@ -8,12 +16,10 @@ import io.weaviate.client.base.http.HttpClient; import io.weaviate.client.base.util.UrlEncoder; import io.weaviate.client.v1.cluster.model.NodesStatusResponse; -import org.apache.commons.lang3.StringUtils; public class NodesStatusGetter extends BaseClient implements ClientResult { - private String className; - private String output; + private Map queryParams = new HashMap<>(); public NodesStatusGetter(HttpClient httpClient, Config config) { super(httpClient, config); @@ -24,8 +30,13 @@ public NodesStatusGetter withClassName(String className) { return this; } + public NodesStatusGetter withShard(String shard) { + this.queryParams.put("shard", shard); + return this; + } + public NodesStatusGetter withOutput(String output) { - this.output = output; + this.queryParams.put("output", output); return this; } @@ -37,12 +48,20 @@ public Result run() { private String path() { String path = "/nodes"; + if (StringUtils.isNotBlank(className)) { - path = String.format("%s/%s", path, UrlEncoder.encodePathParam(className)); + path += "/" + UrlEncoder.encodePathParam(className); + } + + List query = new ArrayList<>(); + for (Entry qp : queryParams.entrySet()) { + query.add(UrlEncoder.encodeQueryParam(qp.getKey(), qp.getValue().toString())); } - if (StringUtils.isNotBlank(output)) { - path = String.format("%s?%s", path, UrlEncoder.encodeQueryParam("output", output)); + + if (!query.isEmpty()) { + path += "?" + String.join("&", query); } + return path; } } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/Replicator.java b/src/main/java/io/weaviate/client/v1/cluster/api/Replicator.java new file mode 100644 index 000000000..3c29ea85f --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/Replicator.java @@ -0,0 +1,73 @@ +package io.weaviate.client.v1.cluster.api; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client.Config; +import io.weaviate.client.base.BaseClient; +import io.weaviate.client.base.ClientResult; +import io.weaviate.client.base.Response; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.http.HttpClient; +import io.weaviate.client.v1.cluster.api.Replicator.ResponseBody; +import io.weaviate.client.v1.cluster.model.ReplicationType; + +public class Replicator extends BaseClient implements ClientResult { + private String className; + private String shard; + private String sourceNode; + private String targetNode; + private ReplicationType replicationType; + + public Replicator(HttpClient httpClient, Config config) { + super(httpClient, config); + } + + public Replicator withClassName(String className) { + this.className = className; + return this; + } + + public Replicator withShard(String shard) { + this.shard = shard; + return this; + } + + public Replicator withSourceNode(String sourceNode) { + this.sourceNode = sourceNode; + return this; + } + + public Replicator withTargetNode(String targetNode) { + this.targetNode = targetNode; + return this; + } + + public Replicator withReplicationType(ReplicationType replicationType) { + this.replicationType = replicationType; + return this; + } + + class RequestBody { + @SerializedName("collection") + String className = Replicator.this.className; + @SerializedName("shard") + String shard = Replicator.this.shard; + @SerializedName("sourceNode") + String sourceNode = Replicator.this.sourceNode; + @SerializedName("targetNode") + String targetNode = Replicator.this.targetNode; + @SerializedName("type") + ReplicationType replicationType = Replicator.this.replicationType; + } + + static class ResponseBody { + @SerializedName("id") + String replicationId; + } + + @Override + public Result run() { + Response resp = sendPostRequest("/replication/replicate", new RequestBody(), ResponseBody.class); + return new Result<>(resp, resp.getBody().replicationId); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/ShardingStateQuerier.java b/src/main/java/io/weaviate/client/v1/cluster/api/ShardingStateQuerier.java new file mode 100644 index 000000000..887f08f95 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/ShardingStateQuerier.java @@ -0,0 +1,47 @@ +package io.weaviate.client.v1.cluster.api; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client.Config; +import io.weaviate.client.base.BaseClient; +import io.weaviate.client.base.ClientResult; +import io.weaviate.client.base.Response; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.http.HttpClient; +import io.weaviate.client.base.util.UrlEncoder; +import io.weaviate.client.v1.cluster.api.ShardingStateQuerier.ResponseBody; +import io.weaviate.client.v1.cluster.model.ShardingState; + +public class ShardingStateQuerier extends BaseClient implements ClientResult { + private String className; + private String shard; + + public ShardingStateQuerier(HttpClient httpClient, Config config) { + super(httpClient, config); + } + + public ShardingStateQuerier withClassName(String className) { + this.className = className; + return this; + } + + public ShardingStateQuerier withShard(String shard) { + this.shard = shard; + return this; + } + + static class ResponseBody { + @SerializedName("shardingState") + ShardingState state; + } + + @Override + public Result run() { + String path = "/replication/sharding-state?" + UrlEncoder.encodeQueryParam("collection", className); + if (shard != null) { + path += "&" + UrlEncoder.encodeQueryParam("shard", shard); + } + Response resp = sendGetRequest(path, ResponseBody.class); + return new Result<>(resp, resp.getBody().state); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/Replication.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/Replication.java new file mode 100644 index 000000000..40c8a6664 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/Replication.java @@ -0,0 +1,45 @@ +package io.weaviate.client.v1.cluster.api.replication; + +import io.weaviate.client.Config; +import io.weaviate.client.base.http.HttpClient; +import io.weaviate.client.v1.cluster.api.replication.api.ReplicationAllDeleter; +import io.weaviate.client.v1.cluster.api.replication.api.ReplicationAllGetter; +import io.weaviate.client.v1.cluster.api.replication.api.ReplicationCanceler; +import io.weaviate.client.v1.cluster.api.replication.api.ReplicationDeleter; +import io.weaviate.client.v1.cluster.api.replication.api.ReplicationGetter; +import io.weaviate.client.v1.cluster.api.replication.api.ReplicationQuerier; + +public class Replication { + + private final Config config; + private final HttpClient httpClient; + + public Replication(HttpClient httpClient, Config config) { + this.config = config; + this.httpClient = httpClient; + } + + public ReplicationGetter getter() { + return new ReplicationGetter(httpClient, config); + } + + public ReplicationAllGetter allGetter() { + return new ReplicationAllGetter(httpClient, config); + } + + public ReplicationQuerier querier() { + return new ReplicationQuerier(httpClient, config); + } + + public ReplicationCanceler canceler() { + return new ReplicationCanceler(httpClient, config); + } + + public ReplicationDeleter deleter() { + return new ReplicationDeleter(httpClient, config); + } + + public ReplicationAllDeleter allDeleter() { + return new ReplicationAllDeleter(httpClient, config); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllDeleter.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllDeleter.java new file mode 100644 index 000000000..c8bb30ccd --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllDeleter.java @@ -0,0 +1,20 @@ +package io.weaviate.client.v1.cluster.api.replication.api; + +import io.weaviate.client.Config; +import io.weaviate.client.base.BaseClient; +import io.weaviate.client.base.ClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.http.HttpClient; + +public class ReplicationAllDeleter extends BaseClient implements ClientResult { + + public ReplicationAllDeleter(HttpClient httpClient, Config config) { + super(httpClient, config); + } + + @Override + public Result run() { + return Result + .voidToBoolean(sendDeleteRequest("/replication/replicate", null, Void.class)); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java new file mode 100644 index 000000000..2b9b0e2dc --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java @@ -0,0 +1,27 @@ +package io.weaviate.client.v1.cluster.api.replication.api; + +import java.util.Arrays; +import java.util.List; + +import io.weaviate.client.Config; +import io.weaviate.client.base.BaseClient; +import io.weaviate.client.base.ClientResult; +import io.weaviate.client.base.Response; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.http.HttpClient; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; + +public class ReplicationAllGetter extends BaseClient + implements ClientResult> { + + public ReplicationAllGetter(HttpClient httpClient, Config config) { + super(httpClient, config); + } + + @Override + public Result> run() { + String path = "/replication/replicate/list?includeHistory=true"; + Response resp = sendGetRequest(path, ReplicateOperation[].class); + return new Result<>(resp, Arrays.asList(resp.getBody())); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationCanceler.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationCanceler.java new file mode 100644 index 000000000..f79d793a4 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationCanceler.java @@ -0,0 +1,26 @@ +package io.weaviate.client.v1.cluster.api.replication.api; + +import io.weaviate.client.Config; +import io.weaviate.client.base.BaseClient; +import io.weaviate.client.base.ClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.http.HttpClient; + +public class ReplicationCanceler extends BaseClient implements ClientResult { + private String uuid; + + public ReplicationCanceler(HttpClient httpClient, Config config) { + super(httpClient, config); + } + + public ReplicationCanceler withUuid(String uuid) { + this.uuid = uuid; + return this; + } + + @Override + public Result run() { + return Result + .voidToBoolean(sendPostRequest("/replication/replicate/" + uuid + "/cancel", null, Void.class)); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationDeleter.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationDeleter.java new file mode 100644 index 000000000..3f0261d65 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationDeleter.java @@ -0,0 +1,26 @@ +package io.weaviate.client.v1.cluster.api.replication.api; + +import io.weaviate.client.Config; +import io.weaviate.client.base.BaseClient; +import io.weaviate.client.base.ClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.http.HttpClient; + +public class ReplicationDeleter extends BaseClient implements ClientResult { + private String uuid; + + public ReplicationDeleter(HttpClient httpClient, Config config) { + super(httpClient, config); + } + + public ReplicationDeleter withUuid(String uuid) { + this.uuid = uuid; + return this; + } + + @Override + public Result run() { + return Result + .voidToBoolean(sendDeleteRequest("/replication/replicate/" + uuid, null, Void.class)); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationGetter.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationGetter.java new file mode 100644 index 000000000..4c54fbbec --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationGetter.java @@ -0,0 +1,37 @@ +package io.weaviate.client.v1.cluster.api.replication.api; + +import io.weaviate.client.Config; +import io.weaviate.client.base.BaseClient; +import io.weaviate.client.base.ClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.http.HttpClient; +import io.weaviate.client.base.util.UrlEncoder; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; + +public class ReplicationGetter extends BaseClient implements ClientResult { + private String uuid; + private Boolean includeHistory; + + public ReplicationGetter(HttpClient httpClient, Config config) { + super(httpClient, config); + } + + public ReplicationGetter withUuid(String uuid) { + this.uuid = uuid; + return this; + } + + public ReplicationGetter includeHistory(boolean includeHistory) { + this.includeHistory = includeHistory; + return this; + } + + @Override + public Result run() { + String path = "/replication/replicate/" + uuid; + if (includeHistory != null) { + path += "?" + UrlEncoder.encodeQueryParam("includeHistory", includeHistory.toString()); + } + return new Result<>(sendGetRequest(path, ReplicateOperation.class)); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java new file mode 100644 index 000000000..172d320f3 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java @@ -0,0 +1,63 @@ +package io.weaviate.client.v1.cluster.api.replication.api; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import io.weaviate.client.Config; +import io.weaviate.client.base.BaseClient; +import io.weaviate.client.base.ClientResult; +import io.weaviate.client.base.Response; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.http.HttpClient; +import io.weaviate.client.base.util.UrlEncoder; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; + +public class ReplicationQuerier extends BaseClient + implements ClientResult> { + private Map queryParams = new HashMap<>(); + + public ReplicationQuerier(HttpClient httpClient, Config config) { + super(httpClient, config); + } + + public ReplicationQuerier withClassName(String className) { + this.queryParams.put("collection", className); + return this; + } + + public ReplicationQuerier withShard(String shard) { + this.queryParams.put("shard", shard); + return this; + } + + public ReplicationQuerier withTargetNode(String targetNode) { + this.queryParams.put("targetNode", targetNode); + return this; + } + + public ReplicationQuerier includeHistory(boolean includeHistory) { + this.queryParams.put("includeHistory", includeHistory); + return this; + } + + @Override + public Result> run() { + String path = "/replication/replicate/list"; + + List query = new ArrayList<>(); + for (Entry qp : queryParams.entrySet()) { + query.add(UrlEncoder.encodeQueryParam(qp.getKey(), qp.getValue().toString())); + } + + if (!query.isEmpty()) { + path += "?" + String.join("&", query); + } + + Response resp = sendGetRequest(path, ReplicateOperation[].class); + return new Result<>(resp, Arrays.asList(resp.getBody())); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java new file mode 100644 index 000000000..558ff60b5 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java @@ -0,0 +1,26 @@ +package io.weaviate.client.v1.cluster.api.replication.model; + +import java.util.List; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client.v1.cluster.model.ReplicationType; + +public class ReplicateOperation { + @SerializedName("id") + String uuid; + @SerializedName("collection") + String className; + @SerializedName("shard") + String shard; + @SerializedName("sourceNode") + String sourceNode; + @SerializedName("targetNode") + String targetNode; + @SerializedName("status") + ReplicateOperationStatus status; + @SerializedName("statusHistory") + List statusHistory; + @SerializedName("type") + ReplicationType transferType; +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationState.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationState.java new file mode 100644 index 000000000..e9a463d1e --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationState.java @@ -0,0 +1,16 @@ +package io.weaviate.client.v1.cluster.api.replication.model; + +public enum ReplicateOperationState { + @SerializedName("REGISTERED") + REGISTERED, + @SerializedName("HYDRATING") + HYDRATING, + @SerializedName("FINALIZING") + FINALIZING, + @SerializedName("DEHYDRATING") + DEHYDRATING, + @SerializedName("READY") + READY, + @SerializedName("CANCELLED") + CANCELLED; +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java new file mode 100644 index 000000000..814a2046b --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java @@ -0,0 +1,12 @@ +package io.weaviate.client.v1.cluster.api.replication.model; + +import java.util.List; + +import com.google.gson.annotations.SerializedName; + +public class ReplicateOperationStatus { + @SerializedName("state") + ReplicateOperationState state; + @SerializedName("errors") + List errors; +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/model/ReplicationType.java b/src/main/java/io/weaviate/client/v1/cluster/model/ReplicationType.java new file mode 100644 index 000000000..0d7b1f597 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/model/ReplicationType.java @@ -0,0 +1,10 @@ +package io.weaviate.client.v1.cluster.api; + +import com.google.gson.annotations.SerializedName; + +public enum ReplicationType { + @SerializedName("COPY") + COPY, + @SerializedName("MOVE") + MOVE; +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/model/ShardReplicas.java b/src/main/java/io/weaviate/client/v1/cluster/model/ShardReplicas.java new file mode 100644 index 000000000..e4500c15d --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/model/ShardReplicas.java @@ -0,0 +1,15 @@ +package io.weaviate.client.v1.cluster.api; + +import java.util.List; + +import com.google.gson.annotations.SerializedName; + +import lombok.Getter; + +@Getter +public class ShardReplicas { + @SerializedName("shard") + String name; + @SerializedName("replicas") + List replicas; +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/model/ShardingState.java b/src/main/java/io/weaviate/client/v1/cluster/model/ShardingState.java new file mode 100644 index 000000000..5171611c8 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/cluster/model/ShardingState.java @@ -0,0 +1,15 @@ +package io.weaviate.client.v1.cluster.api; + +import java.util.List; + +import com.google.gson.annotations.SerializedName; + +import lombok.Getter; + +@Getter +public class ShardingState { + @SerializedName("collection") + String className; + @SerializedName("shards") + List shards; +} From 3aee3b988f2ec2584f5e8eecd7eabd9d0b16c76f Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 11 Jul 2025 19:57:37 +0200 Subject: [PATCH 2/7] refactor: add Result static factories for common response transformations --- .../java/io/weaviate/client/base/Result.java | 43 +++++++++++++++++++ .../weaviate/client/v1/cluster/Cluster.java | 2 +- .../v1/cluster/api/NodesStatusGetter.java | 4 +- .../client/v1/cluster/api/Replicator.java | 8 ++-- .../v1/cluster/api/ShardingStateQuerier.java | 6 +-- .../replication/api/ReplicationAllGetter.java | 5 +-- .../replication/api/ReplicationQuerier.java | 5 +-- .../model/ReplicateOperationState.java | 2 + .../v1/cluster/model/ReplicationType.java | 2 +- .../v1/cluster/model/ShardReplicas.java | 2 +- .../v1/cluster/model/ShardingState.java | 2 +- 11 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/main/java/io/weaviate/client/base/Result.java b/src/main/java/io/weaviate/client/base/Result.java index 66cda18e0..df29e31bb 100644 --- a/src/main/java/io/weaviate/client/base/Result.java +++ b/src/main/java/io/weaviate/client/base/Result.java @@ -66,6 +66,49 @@ public Result toErrorResult() { WeaviateErrorResponse.builder().error(this.error.getMessages()).build()); } + /** + * Apply {@code map} function to {@code Response::getBody} and return + * {@link Result} with the transformed body. + * + *

+ * A {@code null}-body is passed as-is. + * + *

+ * Usage: + * + *

{@code @Override
+   * public Result run() {
+   *   // Deserializes into Person.class but returns Person's firstName or null.
+   *   return Result.map(sendGetRequest("/person", Person.class), Person::getFirstName);
+   * }
+   * }
+ */ + public static Result map(Response response, Function map) { + R body = response.getBody() != null + ? map.apply(response.getBody()) + : null; + return new Result<>(response, body); + } + + /** + * Convert {@code T[]} response to a {@code List} response. + * This is handy for all request handlers which returns lists, + * as the current client does not support deserializing into a parametrized + * {@code List.class}. + * + *

+ * Usage: + * + *

{@code @Override
+   * public Result> run() {
+   *   return Result.toList(sendGetRequest("/names", String[].class));
+   * }
+   * }
+ */ + public static Result> toList(Response response) { + return new Result<>(response, Arrays.asList(response.getBody())); + } + /** * Convert {@code Result} response to a {@code Result}. * The result contains true if status code is in 100-299 range. diff --git a/src/main/java/io/weaviate/client/v1/cluster/Cluster.java b/src/main/java/io/weaviate/client/v1/cluster/Cluster.java index ce8506364..8c5e09fce 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/Cluster.java +++ b/src/main/java/io/weaviate/client/v1/cluster/Cluster.java @@ -1,10 +1,10 @@ package io.weaviate.client.v1.cluster; +import io.weaviate.client.Config; import io.weaviate.client.base.http.HttpClient; import io.weaviate.client.v1.cluster.api.NodesStatusGetter; import io.weaviate.client.v1.cluster.api.Replicator; import io.weaviate.client.v1.cluster.api.ShardingStateQuerier; -import io.weaviate.client.Config; public class Cluster { diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/NodesStatusGetter.java b/src/main/java/io/weaviate/client/v1/cluster/api/NodesStatusGetter.java index fb7e7cdc7..c62f20842 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/NodesStatusGetter.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/NodesStatusGetter.java @@ -11,7 +11,6 @@ import io.weaviate.client.Config; import io.weaviate.client.base.BaseClient; import io.weaviate.client.base.ClientResult; -import io.weaviate.client.base.Response; import io.weaviate.client.base.Result; import io.weaviate.client.base.http.HttpClient; import io.weaviate.client.base.util.UrlEncoder; @@ -42,8 +41,7 @@ public NodesStatusGetter withOutput(String output) { @Override public Result run() { - Response resp = sendGetRequest(path(), NodesStatusResponse.class); - return new Result<>(resp); + return new Result<>(sendGetRequest(path(), NodesStatusResponse.class)); } private String path() { diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/Replicator.java b/src/main/java/io/weaviate/client/v1/cluster/api/Replicator.java index 3c29ea85f..178c49b34 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/Replicator.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/Replicator.java @@ -5,11 +5,11 @@ import io.weaviate.client.Config; import io.weaviate.client.base.BaseClient; import io.weaviate.client.base.ClientResult; -import io.weaviate.client.base.Response; import io.weaviate.client.base.Result; import io.weaviate.client.base.http.HttpClient; import io.weaviate.client.v1.cluster.api.Replicator.ResponseBody; import io.weaviate.client.v1.cluster.model.ReplicationType; +import lombok.Getter; public class Replicator extends BaseClient implements ClientResult { private String className; @@ -60,6 +60,7 @@ class RequestBody { ReplicationType replicationType = Replicator.this.replicationType; } + @Getter static class ResponseBody { @SerializedName("id") String replicationId; @@ -67,7 +68,8 @@ static class ResponseBody { @Override public Result run() { - Response resp = sendPostRequest("/replication/replicate", new RequestBody(), ResponseBody.class); - return new Result<>(resp, resp.getBody().replicationId); + return Result.map( + sendPostRequest("/replication/replicate", new RequestBody(), ResponseBody.class), + ResponseBody::getReplicationId); } } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/ShardingStateQuerier.java b/src/main/java/io/weaviate/client/v1/cluster/api/ShardingStateQuerier.java index 887f08f95..2c6b3768b 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/ShardingStateQuerier.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/ShardingStateQuerier.java @@ -5,12 +5,12 @@ import io.weaviate.client.Config; import io.weaviate.client.base.BaseClient; import io.weaviate.client.base.ClientResult; -import io.weaviate.client.base.Response; import io.weaviate.client.base.Result; import io.weaviate.client.base.http.HttpClient; import io.weaviate.client.base.util.UrlEncoder; import io.weaviate.client.v1.cluster.api.ShardingStateQuerier.ResponseBody; import io.weaviate.client.v1.cluster.model.ShardingState; +import lombok.Getter; public class ShardingStateQuerier extends BaseClient implements ClientResult { private String className; @@ -30,6 +30,7 @@ public ShardingStateQuerier withShard(String shard) { return this; } + @Getter static class ResponseBody { @SerializedName("shardingState") ShardingState state; @@ -41,7 +42,6 @@ public Result run() { if (shard != null) { path += "&" + UrlEncoder.encodeQueryParam("shard", shard); } - Response resp = sendGetRequest(path, ResponseBody.class); - return new Result<>(resp, resp.getBody().state); + return Result.map(sendGetRequest(path, ResponseBody.class), ResponseBody::getState); } } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java index 2b9b0e2dc..8e36528c9 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java @@ -1,12 +1,10 @@ package io.weaviate.client.v1.cluster.api.replication.api; -import java.util.Arrays; import java.util.List; import io.weaviate.client.Config; import io.weaviate.client.base.BaseClient; import io.weaviate.client.base.ClientResult; -import io.weaviate.client.base.Response; import io.weaviate.client.base.Result; import io.weaviate.client.base.http.HttpClient; import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; @@ -21,7 +19,6 @@ public ReplicationAllGetter(HttpClient httpClient, Config config) { @Override public Result> run() { String path = "/replication/replicate/list?includeHistory=true"; - Response resp = sendGetRequest(path, ReplicateOperation[].class); - return new Result<>(resp, Arrays.asList(resp.getBody())); + return Result.toList(sendGetRequest(path, ReplicateOperation[].class)); } } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java index 172d320f3..6f796bbcd 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java @@ -1,7 +1,6 @@ package io.weaviate.client.v1.cluster.api.replication.api; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -10,7 +9,6 @@ import io.weaviate.client.Config; import io.weaviate.client.base.BaseClient; import io.weaviate.client.base.ClientResult; -import io.weaviate.client.base.Response; import io.weaviate.client.base.Result; import io.weaviate.client.base.http.HttpClient; import io.weaviate.client.base.util.UrlEncoder; @@ -57,7 +55,6 @@ public Result> run() { path += "?" + String.join("&", query); } - Response resp = sendGetRequest(path, ReplicateOperation[].class); - return new Result<>(resp, Arrays.asList(resp.getBody())); + return Result.toList(sendGetRequest(path, ReplicateOperation[].class)); } } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationState.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationState.java index e9a463d1e..547b6aaeb 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationState.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationState.java @@ -1,5 +1,7 @@ package io.weaviate.client.v1.cluster.api.replication.model; +import com.google.gson.annotations.SerializedName; + public enum ReplicateOperationState { @SerializedName("REGISTERED") REGISTERED, diff --git a/src/main/java/io/weaviate/client/v1/cluster/model/ReplicationType.java b/src/main/java/io/weaviate/client/v1/cluster/model/ReplicationType.java index 0d7b1f597..effe190af 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/model/ReplicationType.java +++ b/src/main/java/io/weaviate/client/v1/cluster/model/ReplicationType.java @@ -1,4 +1,4 @@ -package io.weaviate.client.v1.cluster.api; +package io.weaviate.client.v1.cluster.model; import com.google.gson.annotations.SerializedName; diff --git a/src/main/java/io/weaviate/client/v1/cluster/model/ShardReplicas.java b/src/main/java/io/weaviate/client/v1/cluster/model/ShardReplicas.java index e4500c15d..b14c0a2e5 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/model/ShardReplicas.java +++ b/src/main/java/io/weaviate/client/v1/cluster/model/ShardReplicas.java @@ -1,4 +1,4 @@ -package io.weaviate.client.v1.cluster.api; +package io.weaviate.client.v1.cluster.model; import java.util.List; diff --git a/src/main/java/io/weaviate/client/v1/cluster/model/ShardingState.java b/src/main/java/io/weaviate/client/v1/cluster/model/ShardingState.java index 5171611c8..50914899d 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/model/ShardingState.java +++ b/src/main/java/io/weaviate/client/v1/cluster/model/ShardingState.java @@ -1,4 +1,4 @@ -package io.weaviate.client.v1.cluster.api; +package io.weaviate.client.v1.cluster.model; import java.util.List; From ae4d0a356ba3c724e2abc40231783a85c837d5b6 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 14 Jul 2025 11:16:16 +0200 Subject: [PATCH 3/7] feat: add replicate permission --- .../v1/rbac/api/WeaviatePermission.java | 4 ++ .../client/v1/rbac/model/Permission.java | 18 +++++++++ .../v1/rbac/model/ReplicatePermission.java | 37 ++++++++++++++++++ .../v1/rbac/api/WeaviatePermissionTest.java | 7 ++++ .../client/v1/rbac/model/PermissionTest.java | 39 +++++++++++++++++++ .../tests/rbac/ClientRbacTestSuite.java | 5 +++ 6 files changed, 110 insertions(+) create mode 100644 src/main/java/io/weaviate/client/v1/rbac/model/ReplicatePermission.java diff --git a/src/main/java/io/weaviate/client/v1/rbac/api/WeaviatePermission.java b/src/main/java/io/weaviate/client/v1/rbac/api/WeaviatePermission.java index b4cf28317..264e19ffa 100644 --- a/src/main/java/io/weaviate/client/v1/rbac/api/WeaviatePermission.java +++ b/src/main/java/io/weaviate/client/v1/rbac/api/WeaviatePermission.java @@ -9,6 +9,7 @@ import io.weaviate.client.v1.rbac.model.DataPermission; import io.weaviate.client.v1.rbac.model.NodesPermission; import io.weaviate.client.v1.rbac.model.Permission; +import io.weaviate.client.v1.rbac.model.ReplicatePermission; import io.weaviate.client.v1.rbac.model.RolesPermission; import io.weaviate.client.v1.rbac.model.TenantsPermission; import io.weaviate.client.v1.rbac.model.UsersPermission; @@ -34,6 +35,7 @@ public class WeaviatePermission { RolesPermission roles; TenantsPermission tenants; UsersPermission users; + ReplicatePermission replicate; public WeaviatePermission(String action) { this.action = action; @@ -57,6 +59,8 @@ public

> WeaviatePermission(String action, Permission

this.tenants = (TenantsPermission) perm; } else if (perm instanceof UsersPermission) { this.users = (UsersPermission) perm; + } else if (perm instanceof ReplicatePermission) { + this.replicate = (ReplicatePermission) perm; } } diff --git a/src/main/java/io/weaviate/client/v1/rbac/model/Permission.java b/src/main/java/io/weaviate/client/v1/rbac/model/Permission.java index e1d971a11..67a4f114f 100644 --- a/src/main/java/io/weaviate/client/v1/rbac/model/Permission.java +++ b/src/main/java/io/weaviate/client/v1/rbac/model/Permission.java @@ -86,6 +86,12 @@ public static Permission fromWeaviate(WeaviatePermission perm) { return new RolesPermission(roles.getRole(), roles.getScope(), action); } else if (perm.getTenants() != null) { return new TenantsPermission(action); + } else if (perm.getReplicate() != null) { + ReplicatePermission replicate = perm.getReplicate(); + return new ReplicatePermission(replicate.getCollection(), replicate.getShard(), action); + + // Cluster-/UserPermission do not have any additional data, so we can only + // identify them based on the action. } else if (RbacAction.isValid(ClusterPermission.Action.class, action)) { return new ClusterPermission(action); } else if (RbacAction.isValid(UsersPermission.Action.class, action)) { @@ -240,6 +246,18 @@ public static UsersPermission users(UsersPermission.Action... actions) { return new UsersPermission(actions); } + /** + * Create {@link ReplicatePermission}. + * + *

+ * Example: + * {@code Permissions.replicate("Pizza", "shard-123", ReplicatePermission.Action.CREATE)} + */ + public static ReplicatePermission replicate(String collection, String shard, ReplicatePermission.Action... actions) { + checkDeprecation(actions); + return new ReplicatePermission(collection, shard, actions); + } + private static void checkDeprecation(RbacAction... actions) throws IllegalArgumentException { for (RbacAction action : actions) { if (action.isDeprecated()) { diff --git a/src/main/java/io/weaviate/client/v1/rbac/model/ReplicatePermission.java b/src/main/java/io/weaviate/client/v1/rbac/model/ReplicatePermission.java new file mode 100644 index 000000000..7395d3c30 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/rbac/model/ReplicatePermission.java @@ -0,0 +1,37 @@ +package io.weaviate.client.v1.rbac.model; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; + +@Getter +@EqualsAndHashCode(callSuper = true) +public class ReplicatePermission extends Permission { + final String collection; + final String shard; + + public final String getShard() { + return shard != null ? shard : "*"; + } + + public ReplicatePermission(String collection, String shard, Action... actions) { + super(actions); + this.collection = collection; + this.shard = shard; + } + + ReplicatePermission(String collection, String shard, String action) { + this(collection, shard, RbacAction.fromString(Action.class, action)); + } + + @AllArgsConstructor + public enum Action implements RbacAction { + CREATE("create_replicate"), + READ("read_replicate"), + UPDATE("update_replicate"), + DELETE("delete_replicate"); + + @Getter + private final String value; + } +} diff --git a/src/test/java/io/weaviate/client/v1/rbac/api/WeaviatePermissionTest.java b/src/test/java/io/weaviate/client/v1/rbac/api/WeaviatePermissionTest.java index 6438599bb..de6e06305 100644 --- a/src/test/java/io/weaviate/client/v1/rbac/api/WeaviatePermissionTest.java +++ b/src/test/java/io/weaviate/client/v1/rbac/api/WeaviatePermissionTest.java @@ -13,6 +13,7 @@ import io.weaviate.client.v1.rbac.model.DataPermission; import io.weaviate.client.v1.rbac.model.NodesPermission; import io.weaviate.client.v1.rbac.model.Permission; +import io.weaviate.client.v1.rbac.model.ReplicatePermission; import io.weaviate.client.v1.rbac.model.Role; import io.weaviate.client.v1.rbac.model.RolesPermission; import io.weaviate.client.v1.rbac.model.TenantsPermission; @@ -72,6 +73,10 @@ public void testMergedPermissions() { // Read and delete users new WeaviatePermission("read_users", new UsersPermission()), new WeaviatePermission("assign_and_revoke_users", new UsersPermission()), + + // Create and update replications + new WeaviatePermission("create_replicate", new ReplicatePermission("Pizza", "shard-123")), + new WeaviatePermission("update_replicate", new ReplicatePermission("Pizza", "shard-123")), }; Permission[] libraryPermissions = { @@ -89,6 +94,8 @@ public void testMergedPermissions() { new ClusterPermission(ClusterPermission.Action.READ), new TenantsPermission(TenantsPermission.Action.CREATE, TenantsPermission.Action.UPDATE), new UsersPermission(UsersPermission.Action.READ, UsersPermission.Action.ASSIGN_AND_REVOKE), + new ReplicatePermission("Pizza", "shard-123", ReplicatePermission.Action.CREATE, + ReplicatePermission.Action.UPDATE), }; { diff --git a/src/test/java/io/weaviate/client/v1/rbac/model/PermissionTest.java b/src/test/java/io/weaviate/client/v1/rbac/model/PermissionTest.java index 569c44886..6e0574088 100644 --- a/src/test/java/io/weaviate/client/v1/rbac/model/PermissionTest.java +++ b/src/test/java/io/weaviate/client/v1/rbac/model/PermissionTest.java @@ -32,6 +32,7 @@ public static Object[][] serializationTestCases() { ClusterPermission cluster = new ClusterPermission(ClusterPermission.Action.READ); TenantsPermission tenants = new TenantsPermission(TenantsPermission.Action.READ); UsersPermission users = new UsersPermission(UsersPermission.Action.READ); + ReplicatePermission replicate = new ReplicatePermission("Pizza", "shard-123", ReplicatePermission.Action.READ); return new Object[][] { { @@ -79,6 +80,11 @@ public static Object[][] serializationTestCases() { (Supplier>) () -> users, new WeaviatePermission("read_users", users), }, + { + "replicate permission", + (Supplier>) () -> replicate, + new WeaviatePermission("read_replicate", replicate), + }, }; } @@ -109,6 +115,13 @@ public void testDefaultRolesPermission() { .returns(null, RolesPermission::getScope); } + @Test + public void testDefaultReplicatePermission() { + ReplicatePermission perm = new ReplicatePermission("Pizza", null); + assertThat(perm).as("replicate permission returns shard=* on read if one is not specified") + .returns("*", ReplicatePermission::getShard); + } + @DataMethod(source = PermissionTest.class, method = "serializationTestCases") @Name("{0}") @Test @@ -160,6 +173,32 @@ public static Object[][] groupedConstructors() { "update_roles", }, }, + { + Permission.alias("PizzaAlias", + AliasPermission.Action.CREATE, + AliasPermission.Action.READ, + AliasPermission.Action.UPDATE, + AliasPermission.Action.DELETE), + new String[] { + "create_aliases", + "read_aliases", + "update_aliases", + "delete_aliases", + }, + }, + { + Permission.replicate("Pizza", "shard-123", + ReplicatePermission.Action.CREATE, + ReplicatePermission.Action.READ, + ReplicatePermission.Action.UPDATE, + ReplicatePermission.Action.DELETE), + new String[] { + "create_replicate", + "read_replicate", + "update_replicate", + "delete_replicate", + }, + }, }; } diff --git a/src/test/java/io/weaviate/integration/tests/rbac/ClientRbacTestSuite.java b/src/test/java/io/weaviate/integration/tests/rbac/ClientRbacTestSuite.java index 71391e073..f1f597dbf 100644 --- a/src/test/java/io/weaviate/integration/tests/rbac/ClientRbacTestSuite.java +++ b/src/test/java/io/weaviate/integration/tests/rbac/ClientRbacTestSuite.java @@ -31,10 +31,12 @@ import io.weaviate.client.v1.rbac.model.DataPermission; import io.weaviate.client.v1.rbac.model.NodesPermission; import io.weaviate.client.v1.rbac.model.Permission; +import io.weaviate.client.v1.rbac.model.ReplicatePermission; import io.weaviate.client.v1.rbac.model.Role; import io.weaviate.client.v1.rbac.model.RolesPermission; import io.weaviate.client.v1.rbac.model.TenantsPermission; import io.weaviate.client.v1.rbac.model.UserAssignment; +import io.weaviate.client.v1.rbac.model.UsersPermission; import io.weaviate.integration.client.WeaviateDockerImage; import io.weaviate.integration.client.WeaviateWithRbacContainer; @@ -135,6 +137,7 @@ public void testCreate(String _name, Supplier rbac) { String myRole = roleName("VectorOwner"); String myCollection = "Pizza"; String myCollectionAlias = "PizzaAlias"; + String myShard = "shard-123"; Permission[] wantPermissions = new Permission[] { Permission.alias(myCollectionAlias, myCollection, AliasPermission.Action.CREATE), @@ -145,6 +148,8 @@ public void testCreate(String _name, Supplier rbac) { Permission.collections(myCollection, CollectionsPermission.Action.CREATE), Permission.data(myCollection, DataPermission.Action.UPDATE), Permission.tenants(TenantsPermission.Action.DELETE), + Permission.users(UsersPermission.Action.READ), + Permission.replicate(myCollection, myShard, ReplicatePermission.Action.READ), }; try { From 0979369b501bd5aeab032dfbd336a806dc3fa401 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 14 Jul 2025 13:35:16 +0200 Subject: [PATCH 4/7] feat: add integration tests for replication --- .../weaviate/client/v1/cluster/Cluster.java | 6 + .../replication/model/ReplicateOperation.java | 4 + .../model/ReplicateOperationStatus.java | 5 + .../client/WeaviateDockerComposeCluster.java | 6 +- .../client/cluster/ClientClusterTest.java | 41 ++-- .../client/cluster/ClientReplicateTest.java | 211 ++++++++++++++++++ 6 files changed, 252 insertions(+), 21 deletions(-) create mode 100644 src/test/java/io/weaviate/integration/client/cluster/ClientReplicateTest.java diff --git a/src/main/java/io/weaviate/client/v1/cluster/Cluster.java b/src/main/java/io/weaviate/client/v1/cluster/Cluster.java index 8c5e09fce..51b3b057d 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/Cluster.java +++ b/src/main/java/io/weaviate/client/v1/cluster/Cluster.java @@ -5,6 +5,7 @@ import io.weaviate.client.v1.cluster.api.NodesStatusGetter; import io.weaviate.client.v1.cluster.api.Replicator; import io.weaviate.client.v1.cluster.api.ShardingStateQuerier; +import io.weaviate.client.v1.cluster.api.replication.Replication; public class Cluster { @@ -16,6 +17,10 @@ public Cluster(HttpClient httpClient, Config config) { this.httpClient = httpClient; } + public Replication replication() { + return new Replication(httpClient, config); + } + public NodesStatusGetter nodesStatusGetter() { return new NodesStatusGetter(httpClient, config); } @@ -27,4 +32,5 @@ public ShardingStateQuerier shardingStateQuerier() { public Replicator replicator() { return new Replicator(httpClient, config); } + } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java index 558ff60b5..4967fc5b6 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperation.java @@ -5,7 +5,11 @@ import com.google.gson.annotations.SerializedName; import io.weaviate.client.v1.cluster.model.ReplicationType; +import lombok.Getter; +import lombok.ToString; +@Getter +@ToString public class ReplicateOperation { @SerializedName("id") String uuid; diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java index 814a2046b..8a5806989 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/model/ReplicateOperationStatus.java @@ -4,6 +4,11 @@ import com.google.gson.annotations.SerializedName; +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString public class ReplicateOperationStatus { @SerializedName("state") ReplicateOperationState state; diff --git a/src/test/java/io/weaviate/integration/client/WeaviateDockerComposeCluster.java b/src/test/java/io/weaviate/integration/client/WeaviateDockerComposeCluster.java index 0d0c63f63..4da5cc665 100644 --- a/src/test/java/io/weaviate/integration/client/WeaviateDockerComposeCluster.java +++ b/src/test/java/io/weaviate/integration/client/WeaviateDockerComposeCluster.java @@ -1,6 +1,7 @@ package io.weaviate.integration.client; import java.time.Duration; + import org.junit.rules.TestRule; import org.junit.runner.Description; import org.junit.runners.model.Statement; @@ -35,8 +36,11 @@ public Weaviate(String dockerImageName, String hostname, Boolean isJoining) { withEnv("RAFT_JOIN", "weaviate-0"); if (isJoining) { withEnv("CLUSTER_JOIN", "weaviate-0:7110"); - waitingFor(Wait.forHttp("/v1/.well-known/ready").forPort(8080).forStatusCode(200).withStartupTimeout(Duration.ofSeconds(10))); + waitingFor(Wait.forHttp("/v1/.well-known/ready").forPort(8080).forStatusCode(200) + .withStartupTimeout(Duration.ofSeconds(10))); } + + withEnv("REPLICA_MOVEMENT_ENABLED", "true"); } } diff --git a/src/test/java/io/weaviate/integration/client/cluster/ClientClusterTest.java b/src/test/java/io/weaviate/integration/client/cluster/ClientClusterTest.java index b925fee3a..77600f321 100644 --- a/src/test/java/io/weaviate/integration/client/cluster/ClientClusterTest.java +++ b/src/test/java/io/weaviate/integration/client/cluster/ClientClusterTest.java @@ -1,5 +1,12 @@ package io.weaviate.integration.client.cluster; +import java.util.function.Supplier; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + import io.weaviate.client.Config; import io.weaviate.client.WeaviateClient; import io.weaviate.client.base.Result; @@ -8,12 +15,6 @@ import io.weaviate.integration.client.WeaviateDockerCompose; import io.weaviate.integration.client.WeaviateTestGenerics; import io.weaviate.integration.tests.cluster.ClusterTestSuite; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; - -import java.util.function.Supplier; public class ClientClusterTest { @@ -37,8 +38,8 @@ public void after() { @Test public void testClusterNodesEndpointWithoutDataWithOutputVerbose() { Supplier> resultSupplier = () -> client.cluster().nodesStatusGetter() - .withOutput(NodeStatusOutput.VERBOSE) - .run(); + .withOutput(NodeStatusOutput.VERBOSE) + .run(); ClusterTestSuite.testNoDataOutputVerbose(resultSupplier); } @@ -46,8 +47,8 @@ public void testClusterNodesEndpointWithoutDataWithOutputVerbose() { @Test public void testClusterNodesEndpointWithDataWithOutputVerbose() throws InterruptedException { Supplier> resultSupplier = () -> client.cluster().nodesStatusGetter() - .withOutput(NodeStatusOutput.VERBOSE) - .run(); + .withOutput(NodeStatusOutput.VERBOSE) + .run(); ClusterTestSuite.testDataOutputVerbose(resultSupplier, testGenerics, client); } @@ -55,25 +56,25 @@ public void testClusterNodesEndpointWithDataWithOutputVerbose() throws Interrupt @Test public void shouldGetNodeStatusPerClassWithOutputVerbose() throws InterruptedException { Supplier> resultSupplierAll = () -> client.cluster().nodesStatusGetter() - .withOutput(NodeStatusOutput.VERBOSE) - .run(); + .withOutput(NodeStatusOutput.VERBOSE) + .run(); Supplier> resultSupplierPizza = () -> client.cluster().nodesStatusGetter() - .withOutput(NodeStatusOutput.VERBOSE) - .withClassName("Pizza") - .run(); + .withOutput(NodeStatusOutput.VERBOSE) + .withClassName("Pizza") + .run(); Supplier> resultSupplierSoup = () -> client.cluster().nodesStatusGetter() - .withOutput(NodeStatusOutput.VERBOSE) - .withClassName("Soup") - .run(); + .withOutput(NodeStatusOutput.VERBOSE) + .withClassName("Soup") + .run(); ClusterTestSuite.testDataPerClassOutputVerbose(resultSupplierAll, resultSupplierPizza, resultSupplierSoup, - testGenerics, client); + testGenerics, client); } @Test public void testClusterNodesEndpointWithOutputMinimalImplicit() { Supplier> resultSupplier = () -> client.cluster().nodesStatusGetter() - .run(); + .run(); ClusterTestSuite.testNoDataOutputMinimalImplicit(resultSupplier); } diff --git a/src/test/java/io/weaviate/integration/client/cluster/ClientReplicateTest.java b/src/test/java/io/weaviate/integration/client/cluster/ClientReplicateTest.java new file mode 100644 index 000000000..98e893844 --- /dev/null +++ b/src/test/java/io/weaviate/integration/client/cluster/ClientReplicateTest.java @@ -0,0 +1,211 @@ +package io.weaviate.integration.client.cluster; + +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.assertj.core.api.Assertions; +import org.assertj.core.util.Arrays; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import io.weaviate.client.Config; +import io.weaviate.client.WeaviateClient; +import io.weaviate.client.base.Result; +import io.weaviate.client.v1.cluster.api.replication.Replication; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperationState; +import io.weaviate.client.v1.cluster.model.NodeStatusOutput; +import io.weaviate.client.v1.cluster.model.NodesStatusResponse; +import io.weaviate.client.v1.cluster.model.ReplicationType; +import io.weaviate.client.v1.cluster.model.ShardReplicas; +import io.weaviate.client.v1.cluster.model.ShardingState; +import io.weaviate.client.v1.schema.model.WeaviateClass; +import io.weaviate.integration.client.WeaviateDockerComposeCluster; + +public class ClientReplicateTest { + @ClassRule + public static WeaviateDockerComposeCluster cluster = new WeaviateDockerComposeCluster(); + + private static WeaviateClient client; + + @Before + public void before() { + Config config = new Config("http", cluster.getHttpHost0Address()); + client = new WeaviateClient(config); + } + + private static final String CLASSNAME = "ShardDweller"; + + @After + public void afterEach() { + client.schema().classDeleter().withClassName(CLASSNAME).run(); + } + + @Test + public void testQueryShardingState() { + // Arrange + Boolean created = client.schema().classCreator() + .withClass(WeaviateClass.builder().className(CLASSNAME).build()) + .run().getResult(); + assumeTrue(created, "created test collection"); + + NodesStatusResponse nodes = client.cluster().nodesStatusGetter() + .withClassName(CLASSNAME) + .withOutput(NodeStatusOutput.VERBOSE) + .run().getResult(); + + assumeTrue(nodes != null, "nodes status result is not null"); + assumeTrue(!Arrays.isArrayEmpty(nodes.getNodes()), "there're 1+ nodes in the cluster"); + String wantShard = nodes.getNodes()[0].getShards()[0].getName(); + + ShardingState shardingState; + + // Act: query by collection name + shardingState = client.cluster().shardingStateQuerier() + .withClassName(CLASSNAME) + .run().getResult(); + Assertions.assertThat(shardingState.getShards()) + .as("shard present in the sharding state output (by collection)") + .extracting(ShardReplicas::getName).contains(wantShard); + + // Act: query by collection + shard name + shardingState = client.cluster().shardingStateQuerier() + .withClassName(CLASSNAME) + .withShard(wantShard) + .run().getResult(); + Assertions.assertThat(shardingState.getShards()) + .as("shard present in the sharding state output (by collection+shard)") + .extracting(ShardReplicas::getName).contains(wantShard); + + ShardingState inexistent; + // Act: query inexistent + inexistent = client.cluster().shardingStateQuerier() + .withClassName("Unknown") + .run().getResult(); + Assertions.assertThat(inexistent).isNull(); + } + + @Test + /** + * This test starts a replication operation between two nodes, + * queries for its status, then cancels the replication and eventually deletes + * it. + * + * Note that assertions that use {@link #eventually} helper may be flaky. + */ + public void testReplicateLifecycle() { + // Arrange + Boolean created = client.schema().classCreator() + .withClass(WeaviateClass.builder().className(CLASSNAME).build()) + .run().getResult(); + assumeTrue(created, "created test collection"); + + NodesStatusResponse nodes = client.cluster().nodesStatusGetter() + .withClassName(CLASSNAME) + .withOutput(NodeStatusOutput.VERBOSE) + .run().getResult(); + + assumeTrue(nodes != null, "nodes status result is not null"); + assumeTrue(nodes.getNodes().length >= 2, "there're 2+ nodes in the cluster"); + + String srcNode = nodes.getNodes()[0].getName(); + String tgtNode = nodes.getNodes()[1].getName(); + String wantShard = nodes.getNodes()[0].getShards()[0].getName(); + + deleteAllReplications(5); + + // Act: kick-off replication + String uuid = client.cluster().replicator() + .withClassName(CLASSNAME) + .withShard(wantShard) + .withSourceNode(srcNode) + .withTargetNode(tgtNode) + .run().getResult(); + assumeTrue(uuid != null, "replication started with valid uuid"); + + // Act: get status + ReplicateOperation status_1 = client.cluster().replication().getter() + .withUuid(uuid).run().getResult(); + + Assertions.assertThat(status_1).isNotNull() + .as("expected replication status") + .returns(CLASSNAME, ReplicateOperation::getClassName) + .returns(wantShard, ReplicateOperation::getShard) + .returns(srcNode, ReplicateOperation::getSourceNode) + .returns(tgtNode, ReplicateOperation::getTargetNode) + .returns(ReplicationType.COPY, ReplicateOperation::getTransferType) + .returns(null, ReplicateOperation::getStatusHistory) + .extracting(ReplicateOperation::getStatus).isNotNull(); + + // Act: get status with history + ReplicateOperation status_2 = client.cluster().replication().getter() + .withUuid(uuid).includeHistory(true) + .run().getResult(); + + Assertions.assertThat(status_2).isNotNull() + .as("includes replication status history") + .extracting(ReplicateOperation::getStatusHistory).isNotNull(); + + // Act: query status + List operations = client.cluster().replication().querier() + .withClassName(CLASSNAME).withShard(wantShard).withTargetNode(tgtNode) + .run().getResult(); + + Assertions.assertThat(operations).as("no. replications").hasSize(1); + + // Act: cancel + Result cancel = client.cluster().replication().canceler().withUuid(uuid).run(); + Assertions.assertThat(cancel).as("cancel error").returns(null, Result::getError); + + eventually(() -> client.cluster().replication().getter().withUuid(uuid).run().getResult() + .getStatus().getState() == ReplicateOperationState.CANCELLED, + 25, "replication was not cancelled"); + + // Act: delete + Result delete = client.cluster().replication().deleter().withUuid(uuid).run(); + Assertions.assertThat(delete).as("delete error").returns(null, Result::getError); + + eventually(() -> client.cluster().replication().allGetter().run().getResult().isEmpty(), + 15, "replication was not deleted"); + } + + private static void deleteAllReplications(int timeoutSeconds) { + Replication replication = client.cluster().replication(); + replication.allDeleter().run(); + eventually(() -> replication.allGetter().run().getResult().isEmpty(), + timeoutSeconds, + "did not delete existing replications"); + } + + private static void eventually(Supplier cond, int timeoutSeconds, String... message) { + CompletableFuture check = CompletableFuture.runAsync(() -> { + while (!Thread.currentThread().isInterrupted() && !cond.get()) { + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + }); + + try { + check.get(timeoutSeconds, TimeUnit.SECONDS); + } catch (TimeoutException ex) { + check.cancel(true); + Assertions.fail(message.length >= 0 ? message[0] : null, ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + Assertions.fail(ex); + } catch (ExecutionException ex) { + throw new RuntimeException(ex); + } + } +} From d0f250f3c74c4728ee3ea3b3f42e452eb36bc76f Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 14 Jul 2025 14:25:10 +0200 Subject: [PATCH 5/7] feat: add replication ops to async client --- .../java/io/weaviate/client/base/Result.java | 54 +++++ .../client/v1/async/cluster/Cluster.java | 17 +- .../v1/async/cluster/api/Replicator.java | 79 +++++++ .../cluster/api/ShardingStateQuerier.java | 51 ++++ .../cluster/api/replication/Replication.java | 45 ++++ .../api/ReplicationAllDeleter.java | 25 ++ .../replication/api/ReplicationAllGetter.java | 28 +++ .../replication/api/ReplicationCanceler.java | 31 +++ .../replication/api/ReplicationDeleter.java | 31 +++ .../replication/api/ReplicationGetter.java | 43 ++++ .../replication/api/ReplicationQuerier.java | 63 +++++ .../replication/api/ReplicationAllGetter.java | 3 +- .../async/cluster/ClientReplicateTest.java | 223 ++++++++++++++++++ 13 files changed, 690 insertions(+), 3 deletions(-) create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/Replicator.java create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/ShardingStateQuerier.java create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/replication/Replication.java create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationAllDeleter.java create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationAllGetter.java create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationCanceler.java create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationDeleter.java create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationGetter.java create mode 100644 src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationQuerier.java create mode 100644 src/test/java/io/weaviate/integration/client/async/cluster/ClientReplicateTest.java diff --git a/src/main/java/io/weaviate/client/base/Result.java b/src/main/java/io/weaviate/client/base/Result.java index df29e31bb..de5f4746b 100644 --- a/src/main/java/io/weaviate/client/base/Result.java +++ b/src/main/java/io/weaviate/client/base/Result.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.Future; import java.util.function.Function; import java.util.stream.Collectors; @@ -90,6 +91,33 @@ public static Result map(Response response, Function map) { return new Result<>(response, body); } + /** + * Apply {@code map} function to {@code Response::getBody} and return + * {@link Future} with the transformed body. + * + *

+ * A {@code null}-body is passed as-is. + * + *

+ * Usage: + * + *

{@code @Override
+   * public Future run(FutureCallback> callback) {
+   *   // Deserializes into Person.class but returns Person's firstName or null.
+   *   return sendGetRequest("/person", callback, Result.mapParser(Person.class, Person::getFirstName));
+   * }
+   * }
+ */ + public static ResponseParser mapParser(Class cls, Function map) { + return new ResponseParser() { + @Override + public Result parse(HttpResponse response, String body, ContentType contentType) { + Response resp = this.serializer.toResponse(response.getCode(), body, cls); + return Result.map(resp, map); + } + }; + } + /** * Convert {@code T[]} response to a {@code List} response. * This is handy for all request handlers which returns lists, @@ -109,6 +137,32 @@ public static Result> toList(Response response) { return new Result<>(response, Arrays.asList(response.getBody())); } + /** + * Convert {@code T[]} response to a {@code List} response. + * This is handy for all request handlers which returns lists, + * as the current client does not support deserializing into a parametrized + * {@code List.class}. + * + *

+ * Usage: + * + *

{@code @Override
+   * public Future> run(FutureCallback> callback) {
+   *   return sendGetRequest("/names", callback, Result.toListParser(String[].class));
+   * }
+   * }
+ */ + public static ResponseParser> toListParser(Class cls) { + return new ResponseParser>() { + + @Override + public Result> parse(HttpResponse response, String body, ContentType contentType) { + Response resp = this.serializer.toResponse(response.getCode(), body, cls); + return Result.toList(resp); + } + }; + } + /** * Convert {@code Result} response to a {@code Result}. * The result contains true if status code is in 100-299 range. diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/Cluster.java b/src/main/java/io/weaviate/client/v1/async/cluster/Cluster.java index e6fb33699..f6271489d 100644 --- a/src/main/java/io/weaviate/client/v1/async/cluster/Cluster.java +++ b/src/main/java/io/weaviate/client/v1/async/cluster/Cluster.java @@ -1,10 +1,14 @@ package io.weaviate.client.v1.async.cluster; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + import io.weaviate.client.Config; import io.weaviate.client.v1.async.cluster.api.NodesStatusGetter; +import io.weaviate.client.v1.async.cluster.api.Replicator; +import io.weaviate.client.v1.async.cluster.api.ShardingStateQuerier; +import io.weaviate.client.v1.async.cluster.api.replication.Replication; import io.weaviate.client.v1.auth.provider.AccessTokenProvider; import lombok.RequiredArgsConstructor; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; @RequiredArgsConstructor public class Cluster { @@ -13,8 +17,19 @@ public class Cluster { private final Config config; private final AccessTokenProvider tokenProvider; + public Replication replication() { + return new Replication(client, config, tokenProvider); + } public NodesStatusGetter nodesStatusGetter() { return new NodesStatusGetter(client, config, tokenProvider); } + + public Replicator replicator() { + return new Replicator(client, config, tokenProvider); + } + + public ShardingStateQuerier shardingStateQuerier() { + return new ShardingStateQuerier(client, config, tokenProvider); + } } diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/Replicator.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/Replicator.java new file mode 100644 index 000000000..df3542ae8 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/Replicator.java @@ -0,0 +1,79 @@ +package io.weaviate.client.v1.async.cluster.api; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client.Config; +import io.weaviate.client.base.AsyncBaseClient; +import io.weaviate.client.base.AsyncClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; +import io.weaviate.client.v1.cluster.model.ReplicationType; +import lombok.Getter; + +public class Replicator extends AsyncBaseClient implements AsyncClientResult { + private String className; + private String shard; + private String sourceNode; + private String targetNode; + private ReplicationType replicationType; + + public Replicator(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) { + super(httpClient, config, tokenProvider); + } + + public Replicator withClassName(String className) { + this.className = className; + return this; + } + + public Replicator withShard(String shard) { + this.shard = shard; + return this; + } + + public Replicator withSourceNode(String sourceNode) { + this.sourceNode = sourceNode; + return this; + } + + public Replicator withTargetNode(String targetNode) { + this.targetNode = targetNode; + return this; + } + + public Replicator withReplicationType(ReplicationType replicationType) { + this.replicationType = replicationType; + return this; + } + + class RequestBody { + @SerializedName("collection") + String className = Replicator.this.className; + @SerializedName("shard") + String shard = Replicator.this.shard; + @SerializedName("sourceNode") + String sourceNode = Replicator.this.sourceNode; + @SerializedName("targetNode") + String targetNode = Replicator.this.targetNode; + @SerializedName("type") + ReplicationType replicationType = Replicator.this.replicationType; + } + + @Getter + static class ResponseBody { + @SerializedName("id") + String replicationId; + } + + @Override + public Future> run(FutureCallback> callback) { + return sendPostRequest("/replication/replicate", new RequestBody(), + callback, Result.mapParser(ResponseBody.class, ResponseBody::getReplicationId)); + + } +} diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/ShardingStateQuerier.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/ShardingStateQuerier.java new file mode 100644 index 000000000..dbf6beaed --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/ShardingStateQuerier.java @@ -0,0 +1,51 @@ +package io.weaviate.client.v1.async.cluster.api; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client.Config; +import io.weaviate.client.base.AsyncBaseClient; +import io.weaviate.client.base.AsyncClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.util.UrlEncoder; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; +import io.weaviate.client.v1.cluster.model.ShardingState; +import lombok.Getter; + +public class ShardingStateQuerier extends AsyncBaseClient implements AsyncClientResult { + private String className; + private String shard; + + public ShardingStateQuerier(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) { + super(httpClient, config, tokenProvider); + } + + public ShardingStateQuerier withClassName(String className) { + this.className = className; + return this; + } + + public ShardingStateQuerier withShard(String shard) { + this.shard = shard; + return this; + } + + @Getter + static class ResponseBody { + @SerializedName("shardingState") + ShardingState state; + } + + @Override + public Future> run(FutureCallback> callback) { + String path = "/replication/sharding-state?" + UrlEncoder.encodeQueryParam("collection", className); + if (shard != null) { + path += "&" + UrlEncoder.encodeQueryParam("shard", shard); + } + return sendGetRequest(path, callback, Result.mapParser(ResponseBody.class, ResponseBody::getState)); + } +} diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/Replication.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/Replication.java new file mode 100644 index 000000000..272a71ae0 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/Replication.java @@ -0,0 +1,45 @@ +package io.weaviate.client.v1.async.cluster.api.replication; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; + +import io.weaviate.client.Config; +import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationAllDeleter; +import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationAllGetter; +import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationCanceler; +import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationDeleter; +import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationGetter; +import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationQuerier; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class Replication { + + private final CloseableHttpAsyncClient client; + private final Config config; + private final AccessTokenProvider tokenProvider; + + public ReplicationGetter getter() { + return new ReplicationGetter(client, config, tokenProvider); + } + + public ReplicationAllGetter allGetter() { + return new ReplicationAllGetter(client, config, tokenProvider); + } + + public ReplicationQuerier querier() { + return new ReplicationQuerier(client, config, tokenProvider); + } + + public ReplicationCanceler canceler() { + return new ReplicationCanceler(client, config, tokenProvider); + } + + public ReplicationDeleter deleter() { + return new ReplicationDeleter(client, config, tokenProvider); + } + + public ReplicationAllDeleter allDeleter() { + return new ReplicationAllDeleter(client, config, tokenProvider); + } +} diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationAllDeleter.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationAllDeleter.java new file mode 100644 index 000000000..053b7a69e --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationAllDeleter.java @@ -0,0 +1,25 @@ +package io.weaviate.client.v1.async.cluster.api.replication.api; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + +import io.weaviate.client.Config; +import io.weaviate.client.base.AsyncBaseClient; +import io.weaviate.client.base.AsyncClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; + +public class ReplicationAllDeleter extends AsyncBaseClient implements AsyncClientResult { + + public ReplicationAllDeleter(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) { + super(httpClient, config, tokenProvider); + } + + @Override + public Future> run(FutureCallback> callback) { + return sendDeleteRequest("/replication/replicate", null, callback, Result.voidToBooleanParser()); + + } +} diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationAllGetter.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationAllGetter.java new file mode 100644 index 000000000..52f043ef2 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationAllGetter.java @@ -0,0 +1,28 @@ +package io.weaviate.client.v1.async.cluster.api.replication.api; + +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + +import io.weaviate.client.Config; +import io.weaviate.client.base.AsyncBaseClient; +import io.weaviate.client.base.AsyncClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; + +public class ReplicationAllGetter extends AsyncBaseClient> + implements AsyncClientResult> { + + public ReplicationAllGetter(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) { + super(httpClient, config, tokenProvider); + } + + @Override + public Future>> run(FutureCallback>> callback) { + return sendGetRequest("/replication/replicate/list?includeHistory=true", callback, + Result.toListParser(ReplicateOperation[].class)); + } +} diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationCanceler.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationCanceler.java new file mode 100644 index 000000000..2ff83e915 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationCanceler.java @@ -0,0 +1,31 @@ +package io.weaviate.client.v1.async.cluster.api.replication.api; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + +import io.weaviate.client.Config; +import io.weaviate.client.base.AsyncBaseClient; +import io.weaviate.client.base.AsyncClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; + +public class ReplicationCanceler extends AsyncBaseClient implements AsyncClientResult { + private String uuid; + + public ReplicationCanceler(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) { + super(httpClient, config, tokenProvider); + } + + public ReplicationCanceler withUuid(String uuid) { + this.uuid = uuid; + return this; + } + + @Override + public Future> run(FutureCallback> callback) { + return sendPostRequest("/replication/replicate/" + uuid + "/cancel", null, + callback, Result.voidToBooleanParser()); + } +} diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationDeleter.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationDeleter.java new file mode 100644 index 000000000..415a9c0a0 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationDeleter.java @@ -0,0 +1,31 @@ +package io.weaviate.client.v1.async.cluster.api.replication.api; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + +import io.weaviate.client.Config; +import io.weaviate.client.base.AsyncBaseClient; +import io.weaviate.client.base.AsyncClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; + +public class ReplicationDeleter extends AsyncBaseClient implements AsyncClientResult { + private String uuid; + + public ReplicationDeleter(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) { + super(httpClient, config, tokenProvider); + } + + public ReplicationDeleter withUuid(String uuid) { + this.uuid = uuid; + return this; + } + + @Override + public Future> run(FutureCallback> callback) { + return sendDeleteRequest("/replication/replicate/" + uuid, null, + callback, Result.voidToBooleanParser()); + } +} diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationGetter.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationGetter.java new file mode 100644 index 000000000..d5d444933 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationGetter.java @@ -0,0 +1,43 @@ +package io.weaviate.client.v1.async.cluster.api.replication.api; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + +import io.weaviate.client.Config; +import io.weaviate.client.base.AsyncBaseClient; +import io.weaviate.client.base.AsyncClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.util.UrlEncoder; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; + +public class ReplicationGetter extends AsyncBaseClient + implements AsyncClientResult { + private String uuid; + private Boolean includeHistory; + + public ReplicationGetter(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) { + super(httpClient, config, tokenProvider); + } + + public ReplicationGetter withUuid(String uuid) { + this.uuid = uuid; + return this; + } + + public ReplicationGetter includeHistory(boolean includeHistory) { + this.includeHistory = includeHistory; + return this; + } + + @Override + public Future> run(FutureCallback> callback) { + String path = "/replication/replicate/" + uuid; + if (includeHistory != null) { + path += "?" + UrlEncoder.encodeQueryParam("includeHistory", includeHistory.toString()); + } + return sendGetRequest(path, ReplicateOperation.class, callback); + } +} diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationQuerier.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationQuerier.java new file mode 100644 index 000000000..eee1f6298 --- /dev/null +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationQuerier.java @@ -0,0 +1,63 @@ +package io.weaviate.client.v1.async.cluster.api.replication.api; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.core5.concurrent.FutureCallback; + +import io.weaviate.client.Config; +import io.weaviate.client.base.AsyncBaseClient; +import io.weaviate.client.base.AsyncClientResult; +import io.weaviate.client.base.Result; +import io.weaviate.client.base.util.UrlEncoder; +import io.weaviate.client.v1.auth.provider.AccessTokenProvider; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; + +public class ReplicationQuerier extends AsyncBaseClient> + implements AsyncClientResult> { + private Map queryParams = new HashMap<>(); + + public ReplicationQuerier(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) { + super(httpClient, config, tokenProvider); + } + + public ReplicationQuerier withClassName(String className) { + this.queryParams.put("collection", className); + return this; + } + + public ReplicationQuerier withShard(String shard) { + this.queryParams.put("shard", shard); + return this; + } + + public ReplicationQuerier withTargetNode(String targetNode) { + this.queryParams.put("targetNode", targetNode); + return this; + } + + public ReplicationQuerier includeHistory(boolean includeHistory) { + this.queryParams.put("includeHistory", includeHistory); + return this; + } + + @Override + public Future>> run(FutureCallback>> callback) { + String path = "/replication/replicate/list"; + + List query = new ArrayList<>(); + for (Entry qp : queryParams.entrySet()) { + query.add(UrlEncoder.encodeQueryParam(qp.getKey(), qp.getValue().toString())); + } + + if (!query.isEmpty()) { + path += "?" + String.join("&", query); + } + return sendGetRequest(path, callback, Result.toListParser(ReplicateOperation[].class)); + } +} diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java index 8e36528c9..358957c1e 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationAllGetter.java @@ -18,7 +18,6 @@ public ReplicationAllGetter(HttpClient httpClient, Config config) { @Override public Result> run() { - String path = "/replication/replicate/list?includeHistory=true"; - return Result.toList(sendGetRequest(path, ReplicateOperation[].class)); + return Result.toList(sendGetRequest("/replication/replicate/list?includeHistory=true", ReplicateOperation[].class)); } } diff --git a/src/test/java/io/weaviate/integration/client/async/cluster/ClientReplicateTest.java b/src/test/java/io/weaviate/integration/client/async/cluster/ClientReplicateTest.java new file mode 100644 index 000000000..e83abbdb6 --- /dev/null +++ b/src/test/java/io/weaviate/integration/client/async/cluster/ClientReplicateTest.java @@ -0,0 +1,223 @@ +package io.weaviate.integration.client.async.cluster; + +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.assertj.core.api.Assertions; +import org.assertj.core.util.Arrays; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import io.weaviate.client.Config; +import io.weaviate.client.WeaviateClient; +import io.weaviate.client.base.Result; +import io.weaviate.client.v1.async.WeaviateAsyncClient; +import io.weaviate.client.v1.async.cluster.api.replication.Replication; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation; +import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperationState; +import io.weaviate.client.v1.cluster.model.NodeStatusOutput; +import io.weaviate.client.v1.cluster.model.NodesStatusResponse; +import io.weaviate.client.v1.cluster.model.ReplicationType; +import io.weaviate.client.v1.cluster.model.ShardReplicas; +import io.weaviate.client.v1.cluster.model.ShardingState; +import io.weaviate.client.v1.schema.model.WeaviateClass; +import io.weaviate.integration.client.WeaviateDockerComposeCluster; + +public class ClientReplicateTest { + @ClassRule + public static WeaviateDockerComposeCluster cluster = new WeaviateDockerComposeCluster(); + + private static WeaviateAsyncClient client; + + @Before + public void before() { + Config config = new Config("http", cluster.getHttpHost0Address()); + client = new WeaviateClient(config).async(); + } + + private static final String CLASSNAME = "ShardDweller"; + + @After + public void afterEach() { + client.schema().classDeleter().withClassName(CLASSNAME).run(); + } + + @AfterClass + public static void afterAll() { + client.close(); + } + + @Test + public void testQueryShardingState() throws InterruptedException, ExecutionException { + // Arrange + Boolean created = client.schema().classCreator() + .withClass(WeaviateClass.builder().className(CLASSNAME).build()) + .run().get().getResult(); + assumeTrue(created, "created test collection"); + + NodesStatusResponse nodes = client.cluster().nodesStatusGetter() + .withClassName(CLASSNAME) + .withOutput(NodeStatusOutput.VERBOSE) + .run().get().getResult(); + + assumeTrue(nodes != null, "nodes status result is not null"); + assumeTrue(!Arrays.isArrayEmpty(nodes.getNodes()), "there're 1+ nodes in the cluster"); + String wantShard = nodes.getNodes()[0].getShards()[0].getName(); + + ShardingState shardingState; + + // Act: query by collection name + shardingState = client.cluster().shardingStateQuerier() + .withClassName(CLASSNAME) + .run().get().getResult(); + Assertions.assertThat(shardingState.getShards()) + .as("shard present in the sharding state output (by collection)") + .extracting(ShardReplicas::getName).contains(wantShard); + + // Act: query by collection + shard name + shardingState = client.cluster().shardingStateQuerier() + .withClassName(CLASSNAME) + .withShard(wantShard) + .run().get().getResult(); + Assertions.assertThat(shardingState.getShards()) + .as("shard present in the sharding state output (by collection+shard)") + .extracting(ShardReplicas::getName).contains(wantShard); + + ShardingState inexistent; + // Act: query inexistent + inexistent = client.cluster().shardingStateQuerier() + .withClassName("Unknown") + .run().get().getResult(); + Assertions.assertThat(inexistent).isNull(); + } + + @Test + /** + * This test starts a replication operation between two nodes, + * queries for its status, then cancels the replication and eventually deletes + * it. + * + * Note that assertions that use {@link #eventually} helper may be flaky. + */ + public void testReplicateLifecycle() throws InterruptedException, ExecutionException { + // Arrange + Boolean created = client.schema().classCreator() + .withClass(WeaviateClass.builder().className(CLASSNAME).build()) + .run().get().getResult(); + assumeTrue(created, "created test collection"); + + NodesStatusResponse nodes = client.cluster().nodesStatusGetter() + .withClassName(CLASSNAME) + .withOutput(NodeStatusOutput.VERBOSE) + .run().get().getResult(); + + assumeTrue(nodes != null, "nodes status result is not null"); + assumeTrue(nodes.getNodes().length >= 2, "there're 2+ nodes in the cluster"); + + String srcNode = nodes.getNodes()[0].getName(); + String tgtNode = nodes.getNodes()[1].getName(); + String wantShard = nodes.getNodes()[0].getShards()[0].getName(); + + deleteAllReplications(5); + + // Act: kick-off replication + String uuid = client.cluster().replicator() + .withClassName(CLASSNAME) + .withShard(wantShard) + .withSourceNode(srcNode) + .withTargetNode(tgtNode) + .run().get().getResult(); + assumeTrue(uuid != null, "replication started with valid uuid"); + + // Act: get status + ReplicateOperation status_1 = client.cluster().replication().getter() + .withUuid(uuid).run().get().getResult(); + + Assertions.assertThat(status_1).isNotNull() + .as("expected replication status") + .returns(CLASSNAME, ReplicateOperation::getClassName) + .returns(wantShard, ReplicateOperation::getShard) + .returns(srcNode, ReplicateOperation::getSourceNode) + .returns(tgtNode, ReplicateOperation::getTargetNode) + .returns(ReplicationType.COPY, ReplicateOperation::getTransferType) + .returns(null, ReplicateOperation::getStatusHistory) + .extracting(ReplicateOperation::getStatus).isNotNull(); + + // Act: get status with history + ReplicateOperation status_2 = client.cluster().replication().getter() + .withUuid(uuid).includeHistory(true) + .run().get().getResult(); + + Assertions.assertThat(status_2).isNotNull() + .as("includes replication status history") + .extracting(ReplicateOperation::getStatusHistory).isNotNull(); + + // Act: query status + List operations = client.cluster().replication().querier() + .withClassName(CLASSNAME).withShard(wantShard).withTargetNode(tgtNode) + .run().get().getResult(); + + Assertions.assertThat(operations).as("no. replications").hasSize(1); + + // Act: cancel + Result cancel = client.cluster().replication().canceler().withUuid(uuid).run().get(); + Assertions.assertThat(cancel).as("cancel error").returns(null, Result::getError); + + eventually(() -> client.cluster().replication().getter().withUuid(uuid).run().get().getResult() + .getStatus().getState() == ReplicateOperationState.CANCELLED, + 25, "replication was not cancelled"); + + // Act: delete + Result delete = client.cluster().replication().deleter().withUuid(uuid).run().get(); + Assertions.assertThat(delete).as("delete error").returns(null, Result::getError); + + eventually(() -> client.cluster().replication().allGetter().run().get().getResult().isEmpty(), + 15, "replication was not deleted"); + } + + private static void deleteAllReplications(int timeoutSeconds) { + Replication replication = client.cluster().replication(); + replication.allDeleter().run(); + eventually(() -> replication.allGetter().run().get().getResult().isEmpty(), + timeoutSeconds, + "did not delete existing replications"); + } + + private static void eventually(Callable cond, int timeoutSeconds, String... message) { + CompletableFuture check = CompletableFuture.runAsync(() -> { + try { + while (!Thread.currentThread().isInterrupted() && !cond.call()) { + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } catch (Exception e) { + // Propagate to callee + throw new RuntimeException(e); + } + }); + + try { + check.get(timeoutSeconds, TimeUnit.SECONDS); + } catch (TimeoutException ex) { + check.cancel(true); + Assertions.fail(message.length >= 0 ? message[0] : null, ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + Assertions.fail(ex); + } catch (ExecutionException ex) { + throw new RuntimeException(ex); + } + } +} From 2554157766fed8bdf0293466d03d18003e08052a Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 22 Jul 2025 10:29:35 +0200 Subject: [PATCH 6/7] chore: rename .includeHistory -> .withIncludeHistory --- .../v1/async/cluster/api/replication/api/ReplicationGetter.java | 2 +- .../async/cluster/api/replication/api/ReplicationQuerier.java | 2 +- .../v1/cluster/api/replication/api/ReplicationGetter.java | 2 +- .../v1/cluster/api/replication/api/ReplicationQuerier.java | 2 +- .../integration/client/async/cluster/ClientReplicateTest.java | 2 +- .../integration/client/cluster/ClientReplicateTest.java | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationGetter.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationGetter.java index d5d444933..b97ee0048 100644 --- a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationGetter.java +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationGetter.java @@ -27,7 +27,7 @@ public ReplicationGetter withUuid(String uuid) { return this; } - public ReplicationGetter includeHistory(boolean includeHistory) { + public ReplicationGetter withIncludeHistory(boolean includeHistory) { this.includeHistory = includeHistory; return this; } diff --git a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationQuerier.java b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationQuerier.java index eee1f6298..0e931bf12 100644 --- a/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationQuerier.java +++ b/src/main/java/io/weaviate/client/v1/async/cluster/api/replication/api/ReplicationQuerier.java @@ -41,7 +41,7 @@ public ReplicationQuerier withTargetNode(String targetNode) { return this; } - public ReplicationQuerier includeHistory(boolean includeHistory) { + public ReplicationQuerier withIncludeHistory(boolean includeHistory) { this.queryParams.put("includeHistory", includeHistory); return this; } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationGetter.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationGetter.java index 4c54fbbec..8daa34ae4 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationGetter.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationGetter.java @@ -21,7 +21,7 @@ public ReplicationGetter withUuid(String uuid) { return this; } - public ReplicationGetter includeHistory(boolean includeHistory) { + public ReplicationGetter withIncludeHistory(boolean includeHistory) { this.includeHistory = includeHistory; return this; } diff --git a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java index 6f796bbcd..d2ca4824a 100644 --- a/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java +++ b/src/main/java/io/weaviate/client/v1/cluster/api/replication/api/ReplicationQuerier.java @@ -37,7 +37,7 @@ public ReplicationQuerier withTargetNode(String targetNode) { return this; } - public ReplicationQuerier includeHistory(boolean includeHistory) { + public ReplicationQuerier withIncludeHistory(boolean includeHistory) { this.queryParams.put("includeHistory", includeHistory); return this; } diff --git a/src/test/java/io/weaviate/integration/client/async/cluster/ClientReplicateTest.java b/src/test/java/io/weaviate/integration/client/async/cluster/ClientReplicateTest.java index e83abbdb6..39a8abf04 100644 --- a/src/test/java/io/weaviate/integration/client/async/cluster/ClientReplicateTest.java +++ b/src/test/java/io/weaviate/integration/client/async/cluster/ClientReplicateTest.java @@ -154,7 +154,7 @@ public void testReplicateLifecycle() throws InterruptedException, ExecutionExcep // Act: get status with history ReplicateOperation status_2 = client.cluster().replication().getter() - .withUuid(uuid).includeHistory(true) + .withUuid(uuid).withIncludeHistory(true) .run().get().getResult(); Assertions.assertThat(status_2).isNotNull() diff --git a/src/test/java/io/weaviate/integration/client/cluster/ClientReplicateTest.java b/src/test/java/io/weaviate/integration/client/cluster/ClientReplicateTest.java index 98e893844..7aef20940 100644 --- a/src/test/java/io/weaviate/integration/client/cluster/ClientReplicateTest.java +++ b/src/test/java/io/weaviate/integration/client/cluster/ClientReplicateTest.java @@ -147,7 +147,7 @@ public void testReplicateLifecycle() { // Act: get status with history ReplicateOperation status_2 = client.cluster().replication().getter() - .withUuid(uuid).includeHistory(true) + .withUuid(uuid).withIncludeHistory(true) .run().getResult(); Assertions.assertThat(status_2).isNotNull() From c233cc23b22e773743e23dd97ac27547355b54f1 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 22 Jul 2025 10:35:48 +0200 Subject: [PATCH 7/7] fix: add missing parameter --- .../java/io/weaviate/client/v1/rbac/model/PermissionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/weaviate/client/v1/rbac/model/PermissionTest.java b/src/test/java/io/weaviate/client/v1/rbac/model/PermissionTest.java index 6e0574088..f864aa6f9 100644 --- a/src/test/java/io/weaviate/client/v1/rbac/model/PermissionTest.java +++ b/src/test/java/io/weaviate/client/v1/rbac/model/PermissionTest.java @@ -174,7 +174,7 @@ public static Object[][] groupedConstructors() { }, }, { - Permission.alias("PizzaAlias", + Permission.alias("PizzaAlias", "Pizza", AliasPermission.Action.CREATE, AliasPermission.Action.READ, AliasPermission.Action.UPDATE,