Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions src/main/java/io/weaviate/client/base/Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -66,6 +67,102 @@ public <C> Result<C> toErrorResult() {
WeaviateErrorResponse.builder().error(this.error.getMessages()).build());
}

/**
* Apply {@code map} function to {@code Response::getBody} and return
* {@link Result} with the transformed body.
*
* <p>
* A {@code null}-body is passed as-is.
*
* <p>
* Usage:
*
* <pre>{@code @Override
* public Result<String> run() {
* // Deserializes into Person.class but returns Person's firstName or null.
* return Result.map(sendGetRequest("/person", Person.class), Person::getFirstName);
* }
* }</pre>
*/
public static <T, R> Result<R> map(Response<T> response, Function<T, R> map) {
R body = response.getBody() != null
? map.apply(response.getBody())
: null;
return new Result<>(response, body);
}

/**
* Apply {@code map} function to {@code Response::getBody} and return
* {@link Future} with the transformed body.
*
* <p>
* A {@code null}-body is passed as-is.
*
* <p>
* Usage:
*
* <pre>{@code @Override
* public Future<String> run(FutureCallback<Result<String>> callback) {
* // Deserializes into Person.class but returns Person's firstName or null.
* return sendGetRequest("/person", callback, Result.mapParser(Person.class, Person::getFirstName));
* }
* }</pre>
*/
public static <T, R> ResponseParser<R> mapParser(Class<T> cls, Function<T, R> map) {
return new ResponseParser<R>() {
@Override
public Result<R> parse(HttpResponse response, String body, ContentType contentType) {
Response<T> resp = this.serializer.toResponse(response.getCode(), body, cls);
return Result.map(resp, map);
}
};
}

/**
* Convert {@code T[]} response to a {@code List<T>} response.
* This is handy for all request handlers which returns lists,
* as the current client does not support deserializing into a parametrized
* {@code List.class}.
*
* <p>
* Usage:
*
* <pre>{@code @Override
* public Result<List<String>> run() {
* return Result.toList(sendGetRequest("/names", String[].class));
* }
* }</pre>
*/
public static <T> Result<List<T>> toList(Response<T[]> response) {
return new Result<>(response, Arrays.asList(response.getBody()));
}

/**
* Convert {@code T[]} response to a {@code List<T>} response.
* This is handy for all request handlers which returns lists,
* as the current client does not support deserializing into a parametrized
* {@code List.class}.
*
* <p>
* Usage:
*
* <pre>{@code @Override
* public Future<List<String>> run(FutureCallback<List<String>> callback) {
* return sendGetRequest("/names", callback, Result.toListParser(String[].class));
* }
* }</pre>
*/
public static <T> ResponseParser<List<T>> toListParser(Class<T[]> cls) {
return new ResponseParser<List<T>>() {

@Override
public Result<List<T>> parse(HttpResponse response, String body, ContentType contentType) {
Response<T[]> resp = this.serializer.toResponse(response.getCode(), body, cls);
return Result.toList(resp);
}
};
}

/**
* Convert {@code Result<Void>} response to a {@code Result<Boolean>}.
* The result contains true if status code is in 100-299 range.
Expand Down
17 changes: 16 additions & 1 deletion src/main/java/io/weaviate/client/v1/async/cluster/Cluster.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package io.weaviate.client.v1.async.cluster;

import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

import io.weaviate.client.Config;
import io.weaviate.client.v1.async.cluster.api.NodesStatusGetter;
import io.weaviate.client.v1.async.cluster.api.Replicator;
import io.weaviate.client.v1.async.cluster.api.ShardingStateQuerier;
import io.weaviate.client.v1.async.cluster.api.replication.Replication;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import lombok.RequiredArgsConstructor;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

@RequiredArgsConstructor
public class Cluster {
Expand All @@ -13,8 +17,19 @@ public class Cluster {
private final Config config;
private final AccessTokenProvider tokenProvider;

public Replication replication() {
return new Replication(client, config, tokenProvider);
}

public NodesStatusGetter nodesStatusGetter() {
return new NodesStatusGetter(client, config, tokenProvider);
}

public Replicator replicator() {
return new Replicator(client, config, tokenProvider);
}

public ShardingStateQuerier shardingStateQuerier() {
return new ShardingStateQuerier(client, config, tokenProvider);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.weaviate.client.v1.async.cluster.api;

import java.util.concurrent.Future;

import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;

import com.google.gson.annotations.SerializedName;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.cluster.model.ReplicationType;
import lombok.Getter;

public class Replicator extends AsyncBaseClient<String> implements AsyncClientResult<String> {
private String className;
private String shard;
private String sourceNode;
private String targetNode;
private ReplicationType replicationType;

public Replicator(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
super(httpClient, config, tokenProvider);
}

public Replicator withClassName(String className) {
this.className = className;
return this;
}

public Replicator withShard(String shard) {
this.shard = shard;
return this;
}

public Replicator withSourceNode(String sourceNode) {
this.sourceNode = sourceNode;
return this;
}

public Replicator withTargetNode(String targetNode) {
this.targetNode = targetNode;
return this;
}

public Replicator withReplicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}

class RequestBody {
@SerializedName("collection")
String className = Replicator.this.className;
@SerializedName("shard")
String shard = Replicator.this.shard;
@SerializedName("sourceNode")
String sourceNode = Replicator.this.sourceNode;
@SerializedName("targetNode")
String targetNode = Replicator.this.targetNode;
@SerializedName("type")
ReplicationType replicationType = Replicator.this.replicationType;
}

@Getter
static class ResponseBody {
@SerializedName("id")
String replicationId;
}

@Override
public Future<Result<String>> run(FutureCallback<Result<String>> callback) {
return sendPostRequest("/replication/replicate", new RequestBody(),
callback, Result.mapParser(ResponseBody.class, ResponseBody::getReplicationId));

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.weaviate.client.v1.async.cluster.api;

import java.util.concurrent.Future;

import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;

import com.google.gson.annotations.SerializedName;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.cluster.model.ShardingState;
import lombok.Getter;

public class ShardingStateQuerier extends AsyncBaseClient<ShardingState> implements AsyncClientResult<ShardingState> {
private String className;
private String shard;

public ShardingStateQuerier(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
super(httpClient, config, tokenProvider);
}

public ShardingStateQuerier withClassName(String className) {
this.className = className;
return this;
}

public ShardingStateQuerier withShard(String shard) {
this.shard = shard;
return this;
}

@Getter
static class ResponseBody {
@SerializedName("shardingState")
ShardingState state;
}

@Override
public Future<Result<ShardingState>> run(FutureCallback<Result<ShardingState>> callback) {
String path = "/replication/sharding-state?" + UrlEncoder.encodeQueryParam("collection", className);
if (shard != null) {
path += "&" + UrlEncoder.encodeQueryParam("shard", shard);
}
return sendGetRequest(path, callback, Result.mapParser(ResponseBody.class, ResponseBody::getState));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.weaviate.client.v1.async.cluster.api.replication;

import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

import io.weaviate.client.Config;
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationAllDeleter;
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationAllGetter;
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationCanceler;
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationDeleter;
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationGetter;
import io.weaviate.client.v1.async.cluster.api.replication.api.ReplicationQuerier;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class Replication {

private final CloseableHttpAsyncClient client;
private final Config config;
private final AccessTokenProvider tokenProvider;

public ReplicationGetter getter() {
return new ReplicationGetter(client, config, tokenProvider);
}

public ReplicationAllGetter allGetter() {
return new ReplicationAllGetter(client, config, tokenProvider);
}

public ReplicationQuerier querier() {
return new ReplicationQuerier(client, config, tokenProvider);
}

public ReplicationCanceler canceler() {
return new ReplicationCanceler(client, config, tokenProvider);
}

public ReplicationDeleter deleter() {
return new ReplicationDeleter(client, config, tokenProvider);
}

public ReplicationAllDeleter allDeleter() {
return new ReplicationAllDeleter(client, config, tokenProvider);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.weaviate.client.v1.async.cluster.api.replication.api;

import java.util.concurrent.Future;

import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;

public class ReplicationAllDeleter extends AsyncBaseClient<Boolean> implements AsyncClientResult<Boolean> {

public ReplicationAllDeleter(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
super(httpClient, config, tokenProvider);
}

@Override
public Future<Result<Boolean>> run(FutureCallback<Result<Boolean>> callback) {
return sendDeleteRequest("/replication/replicate", null, callback, Result.voidToBooleanParser());

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.weaviate.client.v1.async.cluster.api.replication.api;

import java.util.List;
import java.util.concurrent.Future;

import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.cluster.api.replication.model.ReplicateOperation;

public class ReplicationAllGetter extends AsyncBaseClient<List<ReplicateOperation>>
implements AsyncClientResult<List<ReplicateOperation>> {

public ReplicationAllGetter(CloseableHttpAsyncClient httpClient, Config config, AccessTokenProvider tokenProvider) {
super(httpClient, config, tokenProvider);
}

@Override
public Future<Result<List<ReplicateOperation>>> run(FutureCallback<Result<List<ReplicateOperation>>> callback) {
return sendGetRequest("/replication/replicate/list?includeHistory=true", callback,
Result.toListParser(ReplicateOperation[].class));
}
}
Loading