11package io .weaviate .client6 .v1 .internal .grpc ;
22
3+ import static java .util .Objects .requireNonNull ;
4+
5+ import java .util .OptionalInt ;
6+ import java .util .concurrent .CompletableFuture ;
7+ import java .util .concurrent .Executor ;
8+ import java .util .concurrent .TimeUnit ;
9+
10+ import javax .net .ssl .SSLException ;
11+
312import com .google .common .util .concurrent .FutureCallback ;
413import com .google .common .util .concurrent .Futures ;
514import com .google .common .util .concurrent .ListenableFuture ;
1120import io .grpc .netty .shaded .io .netty .handler .ssl .SslContext ;
1221import io .grpc .stub .AbstractStub ;
1322import io .grpc .stub .MetadataUtils ;
23+ import io .grpc .stub .StreamObserver ;
1424import io .weaviate .client6 .v1 .api .WeaviateApiException ;
1525import io .weaviate .client6 .v1 .internal .Proxy ;
1626import io .weaviate .client6 .v1 .internal .grpc .protocol .WeaviateGrpc ;
1727import io .weaviate .client6 .v1 .internal .grpc .protocol .WeaviateGrpc .WeaviateBlockingStub ;
1828import io .weaviate .client6 .v1 .internal .grpc .protocol .WeaviateGrpc .WeaviateFutureStub ;
29+ import io .weaviate .client6 .v1 .internal .grpc .protocol .WeaviateProtoBatch .BatchStreamReply ;
30+ import io .weaviate .client6 .v1 .internal .grpc .protocol .WeaviateProtoBatch .BatchStreamRequest ;
1931
2032import javax .net .ssl .SSLException ;
2133import java .net .InetSocketAddress ;
2436import java .util .concurrent .TimeUnit ;
2537
2638public final class DefaultGrpcTransport implements GrpcTransport {
39+ /**
40+ * ListenableFuture callbacks are executed
41+ * in the same thread they are called from.
42+ */
43+ private static final Executor FUTURE_CALLBACK_EXECUTOR = Runnable ::run ;
44+
45+ private final GrpcChannelOptions transportOptions ;
2746 private final ManagedChannel channel ;
2847
2948 private final WeaviateBlockingStub blockingStub ;
3049 private final WeaviateFutureStub futureStub ;
3150
32- private final GrpcChannelOptions transportOptions ;
33-
3451 private TokenCallCredentials callCredentials ;
3552
3653 public DefaultGrpcTransport (GrpcChannelOptions transportOptions ) {
37- this .transportOptions = transportOptions ;
38- this .channel = buildChannel (transportOptions );
39-
40- var blockingStub = WeaviateGrpc .newBlockingStub (channel )
41- .withInterceptors (MetadataUtils .newAttachHeadersInterceptor (transportOptions .headers ()));
42-
43- var futureStub = WeaviateGrpc .newFutureStub (channel )
44- .withInterceptors (MetadataUtils .newAttachHeadersInterceptor (transportOptions .headers ()));
45-
46- if (transportOptions .maxMessageSize () != null ) {
47- var max = transportOptions .maxMessageSize ();
48- blockingStub = blockingStub .withMaxInboundMessageSize (max ).withMaxOutboundMessageSize (max );
49- futureStub = futureStub .withMaxInboundMessageSize (max ).withMaxOutboundMessageSize (max );
50- }
54+ requireNonNull (transportOptions , "transportOptions is null" );
5155
56+ this .transportOptions = transportOptions ;
5257 if (transportOptions .tokenProvider () != null ) {
5358 this .callCredentials = new TokenCallCredentials (transportOptions .tokenProvider ());
54- blockingStub = blockingStub .withCallCredentials (callCredentials );
55- futureStub = futureStub .withCallCredentials (callCredentials );
5659 }
5760
58- this .blockingStub = blockingStub ;
59- this .futureStub = futureStub ;
61+ this .channel = buildChannel (transportOptions );
62+ this .blockingStub = configure (WeaviateGrpc .newBlockingStub (channel ));
63+ this .futureStub = configure (WeaviateGrpc .newFutureStub (channel ));
6064 }
6165
6266 private <StubT extends AbstractStub <StubT >> StubT applyTimeout (StubT stub , Rpc <?, ?, ?, ?> rpc ) {
6367 if (transportOptions .timeout () == null ) {
6468 return stub ;
6569 }
66- var timeout = rpc .isInsert ()
70+ int timeout = rpc .isInsert ()
6771 ? transportOptions .timeout ().insertSeconds ()
6872 : transportOptions .timeout ().querySeconds ();
6973 return stub .withDeadlineAfter (timeout , TimeUnit .SECONDS );
7074 }
7175
76+ @ Override
77+ public OptionalInt maxMessageSizeBytes () {
78+ return transportOptions .maxMessageSize ();
79+ }
80+
7281 @ Override
7382 public <RequestT , RequestM , ReplyM , ResponseT > ResponseT performRequest (RequestT request ,
7483 Rpc <RequestT , RequestM , ResponseT , ReplyM > rpc ) {
@@ -98,7 +107,9 @@ public <RequestT, RequestM, ReplyM, ResponseT> CompletableFuture<ResponseT> perf
98107 * reusing the thread in which the original future is completed.
99108 */
100109 private static final <T > CompletableFuture <T > toCompletableFuture (ListenableFuture <T > listenable ) {
101- var completable = new CompletableFuture <T >();
110+ requireNonNull (listenable , "listenable is null" );
111+
112+ CompletableFuture <T > completable = new CompletableFuture <>();
102113 Futures .addCallback (listenable , new FutureCallback <T >() {
103114
104115 @ Override
@@ -115,13 +126,14 @@ public void onFailure(Throwable t) {
115126 completable .completeExceptionally (t );
116127 }
117128
118- }, Runnable :: run );
129+ }, FUTURE_CALLBACK_EXECUTOR );
119130 return completable ;
120131 }
121132
122133 private static ManagedChannel buildChannel (GrpcChannelOptions transportOptions ) {
123- var channel = NettyChannelBuilder . forAddress (transportOptions . host (), transportOptions . port () );
134+ requireNonNull (transportOptions , " transportOptions is null" );
124135
136+ NettyChannelBuilder channel = NettyChannelBuilder .forAddress (transportOptions .host (), transportOptions .port ());
125137 if (transportOptions .isSecure ()) {
126138 channel .useTransportSecurity ();
127139 } else {
@@ -163,15 +175,39 @@ private static ManagedChannel buildChannel(GrpcChannelOptions transportOptions)
163175 }
164176
165177 channel .intercept (MetadataUtils .newAttachHeadersInterceptor (transportOptions .headers ()));
166-
167178 return channel .build ();
168179 }
169180
181+ @ Override
182+ public StreamObserver <BatchStreamRequest > createStream (StreamObserver <BatchStreamReply > recv ) {
183+ return configure (WeaviateGrpc .newStub (channel )).batchStream (recv );
184+ }
185+
186+ /** Apply common configuration to a stub. */
187+ private <S extends AbstractStub <S >> S configure (S stub ) {
188+ requireNonNull (stub , "stub is null" );
189+
190+ stub = stub .withInterceptors (MetadataUtils .newAttachHeadersInterceptor (transportOptions .headers ()));
191+ if (transportOptions .maxMessageSize ().isPresent ()) {
192+ int max = transportOptions .maxMessageSize ().getAsInt ();
193+ stub = stub .withMaxInboundMessageSize (max ).withMaxOutboundMessageSize (max );
194+ }
195+ if (callCredentials != null ) {
196+ stub = stub .withCallCredentials (callCredentials );
197+ }
198+ return stub ;
199+ }
200+
170201 @ Override
171202 public void close () throws Exception {
172203 channel .shutdown ();
173204 if (callCredentials != null ) {
174205 callCredentials .close ();
175206 }
176207 }
208+
209+ @ Override
210+ public String host () {
211+ return transportOptions .host ();
212+ }
177213}
0 commit comments