Skip to content

Commit 09819df

Browse files
chore(client): refactor closing / shutdown
1 parent 3e6fe1e commit 09819df

6 files changed

Lines changed: 92 additions & 3 deletions

File tree

braintrust-java-client-okhttp/src/main/kotlin/com/braintrustdata/api/client/okhttp/BraintrustOkHttpClient.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ class BraintrustOkHttpClient private constructor() {
126126
* The executor to use for running [AsyncStreamResponse.Handler] callbacks.
127127
*
128128
* Defaults to a dedicated cached thread pool.
129+
*
130+
* This class takes ownership of the executor and shuts it down, if possible, when closed.
129131
*/
130132
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
131133
clientOptions.streamHandlerExecutor(streamHandlerExecutor)

braintrust-java-client-okhttp/src/main/kotlin/com/braintrustdata/api/client/okhttp/BraintrustOkHttpClientAsync.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ class BraintrustOkHttpClientAsync private constructor() {
126126
* The executor to use for running [AsyncStreamResponse.Handler] callbacks.
127127
*
128128
* Defaults to a dedicated cached thread pool.
129+
*
130+
* This class takes ownership of the executor and shuts it down, if possible, when closed.
129131
*/
130132
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
131133
clientOptions.streamHandlerExecutor(streamHandlerExecutor)

braintrust-java-core/src/main/kotlin/com/braintrustdata/api/client/BraintrustClientAsyncImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class BraintrustClientAsyncImpl(private val clientOptions: ClientOptions) : Brai
172172

173173
override fun evals(): EvalServiceAsync = evals
174174

175-
override fun close() = clientOptions.httpClient.close()
175+
override fun close() = clientOptions.close()
176176

177177
class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) :
178178
BraintrustClientAsync.WithRawResponse {

braintrust-java-core/src/main/kotlin/com/braintrustdata/api/client/BraintrustClientImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ class BraintrustClientImpl(private val clientOptions: ClientOptions) : Braintrus
160160

161161
override fun evals(): EvalService = evals
162162

163-
override fun close() = clientOptions.httpClient.close()
163+
override fun close() = clientOptions.close()
164164

165165
class WithRawResponseImpl internal constructor(private val clientOptions: ClientOptions) :
166166
BraintrustClient.WithRawResponse {

braintrust-java-core/src/main/kotlin/com/braintrustdata/api/core/ClientOptions.kt

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import java.time.Clock
1313
import java.time.Duration
1414
import java.util.Optional
1515
import java.util.concurrent.Executor
16+
import java.util.concurrent.ExecutorService
1617
import java.util.concurrent.Executors
1718
import java.util.concurrent.ThreadFactory
1819
import java.util.concurrent.atomic.AtomicLong
@@ -26,6 +27,8 @@ private constructor(
2627
* The HTTP client to use in the SDK.
2728
*
2829
* Use the one published in `braintrust-java-client-okhttp` or implement your own.
30+
*
31+
* This class takes ownership of the client and closes it when closed.
2932
*/
3033
@get:JvmName("httpClient") val httpClient: HttpClient,
3134
/**
@@ -47,6 +50,8 @@ private constructor(
4750
* The executor to use for running [AsyncStreamResponse.Handler] callbacks.
4851
*
4952
* Defaults to a dedicated cached thread pool.
53+
*
54+
* This class takes ownership of the executor and shuts it down, if possible, when closed.
5055
*/
5156
@get:JvmName("streamHandlerExecutor") val streamHandlerExecutor: Executor,
5257
/**
@@ -170,6 +175,8 @@ private constructor(
170175
* The HTTP client to use in the SDK.
171176
*
172177
* Use the one published in `braintrust-java-client-okhttp` or implement your own.
178+
*
179+
* This class takes ownership of the client and closes it when closed.
173180
*/
174181
fun httpClient(httpClient: HttpClient) = apply {
175182
this.httpClient = PhantomReachableClosingHttpClient(httpClient)
@@ -198,9 +205,14 @@ private constructor(
198205
* The executor to use for running [AsyncStreamResponse.Handler] callbacks.
199206
*
200207
* Defaults to a dedicated cached thread pool.
208+
*
209+
* This class takes ownership of the executor and shuts it down, if possible, when closed.
201210
*/
202211
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
203-
this.streamHandlerExecutor = streamHandlerExecutor
212+
this.streamHandlerExecutor =
213+
if (streamHandlerExecutor is ExecutorService)
214+
PhantomReachableExecutorService(streamHandlerExecutor)
215+
else streamHandlerExecutor
204216
}
205217

206218
/**
@@ -440,4 +452,19 @@ private constructor(
440452
)
441453
}
442454
}
455+
456+
/**
457+
* Closes these client options, relinquishing any underlying resources.
458+
*
459+
* This is purposefully not inherited from [AutoCloseable] because the client options are
460+
* long-lived and usually should not be synchronously closed via try-with-resources.
461+
*
462+
* It's also usually not necessary to call this method at all. the default client automatically
463+
* releases threads and connections if they remain idle, but if you are writing an application
464+
* that needs to aggressively release unused resources, then you may call this method.
465+
*/
466+
fun close() {
467+
httpClient.close()
468+
(streamHandlerExecutor as? ExecutorService)?.shutdown()
469+
}
443470
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.braintrustdata.api.core
2+
3+
import java.util.concurrent.Callable
4+
import java.util.concurrent.ExecutorService
5+
import java.util.concurrent.Future
6+
import java.util.concurrent.TimeUnit
7+
8+
/**
9+
* A delegating wrapper around an [ExecutorService] that shuts it down once it's only phantom
10+
* reachable.
11+
*
12+
* This class ensures the [ExecutorService] is shut down even if the user forgets to do it.
13+
*/
14+
internal class PhantomReachableExecutorService(private val executorService: ExecutorService) :
15+
ExecutorService {
16+
init {
17+
closeWhenPhantomReachable(this) { executorService.shutdown() }
18+
}
19+
20+
override fun execute(command: Runnable) = executorService.execute(command)
21+
22+
override fun shutdown() = executorService.shutdown()
23+
24+
override fun shutdownNow(): MutableList<Runnable> = executorService.shutdownNow()
25+
26+
override fun isShutdown(): Boolean = executorService.isShutdown
27+
28+
override fun isTerminated(): Boolean = executorService.isTerminated
29+
30+
override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
31+
executorService.awaitTermination(timeout, unit)
32+
33+
override fun <T : Any?> submit(task: Callable<T>): Future<T> = executorService.submit(task)
34+
35+
override fun <T : Any?> submit(task: Runnable, result: T): Future<T> =
36+
executorService.submit(task, result)
37+
38+
override fun submit(task: Runnable): Future<*> = executorService.submit(task)
39+
40+
override fun <T : Any?> invokeAll(
41+
tasks: MutableCollection<out Callable<T>>
42+
): MutableList<Future<T>> = executorService.invokeAll(tasks)
43+
44+
override fun <T : Any?> invokeAll(
45+
tasks: MutableCollection<out Callable<T>>,
46+
timeout: Long,
47+
unit: TimeUnit,
48+
): MutableList<Future<T>> = executorService.invokeAll(tasks, timeout, unit)
49+
50+
override fun <T : Any?> invokeAny(tasks: MutableCollection<out Callable<T>>): T =
51+
executorService.invokeAny(tasks)
52+
53+
override fun <T : Any?> invokeAny(
54+
tasks: MutableCollection<out Callable<T>>,
55+
timeout: Long,
56+
unit: TimeUnit,
57+
): T = executorService.invokeAny(tasks, timeout, unit)
58+
}

0 commit comments

Comments
 (0)