Implement graceful shutdown coordination for service lifecycle #823
Open
filinvadim wants to merge 4 commits into
Implements an industry-standard, load-balancer-friendly shutdown sequence
so the service can be rolled / restarted without dropping in-flight work.
The new GracefulShutdown service listens for ContextClosedEvent and runs
the following ordered phases before bean destruction:
1. Mark the service unhealthy (/health returns 503) and sleep for
`dshackle.shutdown.health-grace-seconds` so the load balancer notices
and stops sending new traffic.
2. Stop accepting new requests on inbound servers (gRPC: server.shutdown,
HTTP/WS proxy: 503 + DisposableServer.disposeNow).
3. Signal long-running streams (gRPC subscriptions, eth_subscribe) to
terminate cleanly via a shared cancel signal.
4. Wait up to `dshackle.shutdown.drain-timeout-seconds` for tracked
in-flight requests to complete; force-cancel anything still pending.
5. Hand off to the standard @PreDestroy lifecycle so upstream connections
close and the access log queue is flushed.
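The ordered phases above can be sketched in plain Kotlin. This is a minimal illustration, not the PR's GracefulShutdown class: the name `ShutdownSketch`, its methods, and the polling drain loop are assumptions made for the example. Spring wiring, the stream cancel signal, and the force timeout are left out.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified coordinator; names and structure are illustrative only.
class ShutdownSketch(
    private val healthGraceMillis: Long = 5_000,   // mirrors health-grace-seconds=5
    private val drainTimeoutMillis: Long = 30_000, // mirrors drain-timeout-seconds=30
) {
    private val shuttingDown = AtomicBoolean(false)
    private val inFlight = AtomicInteger(0)
    private val stopAccepting = CopyOnWriteArrayList<() -> Unit>()
    private val forceClose = CopyOnWriteArrayList<() -> Unit>()

    // A health endpoint would answer 503 once this returns true.
    fun isShuttingDown(): Boolean = shuttingDown.get()

    fun registerStopAccepting(hook: () -> Unit) { stopAccepting.add(hook) }
    fun registerForceClose(hook: () -> Unit) { forceClose.add(hook) }
    fun requestStarted() { inFlight.incrementAndGet() }
    fun requestFinished() { inFlight.decrementAndGet() }

    fun run() {
        // Phase 1: flip the flag once, then give the load balancer time to notice.
        if (!shuttingDown.compareAndSet(false, true)) return
        Thread.sleep(healthGraceMillis)
        // Phase 2: stop accepting new requests.
        for (hook in stopAccepting) hook()
        // Phase 3: a real implementation would emit the stream cancel signal here.
        // Phase 4: wait for in-flight requests, bounded by the drain timeout.
        val deadline = System.nanoTime() + drainTimeoutMillis * 1_000_000
        while (inFlight.get() > 0 && System.nanoTime() < deadline) Thread.sleep(10)
        // Assumption: force-close hooks run whether or not the drain finished in time.
        for (hook in forceClose) hook()
        // Phase 5: bean destruction (@PreDestroy) would follow after this returns.
    }
}

fun main() {
    val events = mutableListOf<String>()
    val sketch = ShutdownSketch(healthGraceMillis = 50, drainTimeoutMillis = 200)
    sketch.registerStopAccepting { events += "stop-accepting" }
    sketch.registerForceClose { events += "force-close" }
    sketch.run()
    println(events) // prints [stop-accepting, force-close]
}
```

The compareAndSet guard makes a second call to `run()` a no-op, which matters when more than one path can trigger shutdown.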
Wiring:
- GrpcServer registers stop-accepting (server.shutdown) and force-close
(awaitTermination then shutdownNow) hooks.
- ProxyServer stores its DisposableServer and registers a force-close hook
(DisposableServer.disposeNow with timeout); ProxyStarter has a
@PreDestroy fallback.
- HealthCheckSetup returns SHUTTING DOWN / 503 once the coordinator marks
the service as shutting down.
- BlockchainRpc.nativeCall and proxy HTTP/WS handlers now register their
request flux with the in-flight tracker; nativeSubscribe, subscribeHead,
subscribeStatus, subscribeNodeStatus and subscribeChainStatus, plus the
eth_subscribe websocket stream, are wired through takeUntilOther on the
cancel signal so subscriptions complete cleanly.
- AccessLogWriter gets a @PreDestroy that drains its event queue so no
access-log lines are lost on shutdown.
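The takeUntilOther wiring for subscriptions could look roughly like the sketch below. It assumes Reactor is on the classpath and uses `Sinks.many().replay().latest()` as the signal source. The class and method names (`StreamCancelSketch`, `cancelStreams`, `untilShutdown`) are hypothetical and may not match the PR.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import java.time.Duration

// Hypothetical holder for the shared cancel signal; not the PR's actual class.
class StreamCancelSketch {
    // A replaying sink keeps the last emitted value, so a stream that subscribes
    // after shutdown has started still observes the signal.
    private val sink = Sinks.many().replay().latest<Boolean>()

    fun cancelSignal(): Flux<Boolean> = sink.asFlux()

    fun cancelStreams() {
        sink.tryEmitNext(true)
    }

    // Wraps a long-running stream so that it completes when the signal fires.
    fun <T : Any> untilShutdown(stream: Flux<T>): Flux<T> =
        stream.takeUntilOther(cancelSignal())
}

fun main() {
    val sketch = StreamCancelSketch()
    val ticks = sketch.untilShutdown(Flux.interval(Duration.ofMillis(20)))
    // Fire the signal after roughly 100 ms; the otherwise endless interval then completes.
    Thread(Runnable {
        Thread.sleep(100)
        sketch.cancelStreams()
    }).start()
    val received = ticks.collectList().block(Duration.ofSeconds(2))
    println("stream completed after ${received?.size} ticks")
}
```

Because the wrapped stream completes rather than errors, subscribers see a normal end of stream instead of a failure.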
Configuration (with defaults):
- dshackle.shutdown.health-grace-seconds=5
- dshackle.shutdown.drain-timeout-seconds=30
- dshackle.shutdown.force-timeout-seconds=10
https://claude.ai/code/session_01NuLxxbuTAUyXcSg5ghN8PT
Reactor declares Mono<T> and Flux<T> with @NonNullApi, so T is bounded to Any (non-nullable) on the Kotlin side. The previous Mono.using/Flux.using approach plus the unbounded type parameter `<T>` produced compile errors. Replace with doOnSubscribe/doFinally and constrain `T : Any` on trackMono and trackFlux. doFinally fires for complete, error, and cancel, so in-flight bookkeeping stays correct.
https://claude.ai/code/session_01NuLxxbuTAUyXcSg5ghN8PT
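A minimal sketch of the doOnSubscribe/doFinally bookkeeping described in this commit, assuming Reactor is on the classpath. `InFlightTrackerSketch` and `inFlightCount()` are hypothetical names; only `trackMono`, `trackFlux`, and the `T : Any` bound come from the commit message.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical tracker mirroring the approach described above; the real class may differ.
class InFlightTrackerSketch {
    private val inFlight = AtomicInteger(0)

    fun inFlightCount(): Int = inFlight.get()

    // `T : Any` matches Reactor's non-null type parameter as seen from Kotlin.
    fun <T : Any> trackMono(source: Mono<T>): Mono<T> =
        source
            .doOnSubscribe { inFlight.incrementAndGet() }
            .doFinally { inFlight.decrementAndGet() } // complete, error, or cancel

    fun <T : Any> trackFlux(source: Flux<T>): Flux<T> =
        source
            .doOnSubscribe { inFlight.incrementAndGet() }
            .doFinally { inFlight.decrementAndGet() }
}

fun main() {
    val tracker = InFlightTrackerSketch()
    // Mono.never() stays pending, which stands in for a slow upstream call.
    val pending = tracker.trackMono(Mono.never<String>()).subscribe()
    println(tracker.inFlightCount()) // 1 while the call is pending
    pending.dispose()                // cancellation also triggers doFinally
    println(tracker.inFlightCount()) // 0 after the cancel
}
```

The counter is incremented on subscription rather than when the pipeline is assembled, so a request that is built but never subscribed does not hold up the drain phase.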
Pull request overview
Implements a coordinated graceful shutdown flow for Dshackle, aiming to stop taking new traffic, allow in-flight work to drain, cancel long-running streams, and then force-close remaining inbound servers/resources during Spring context shutdown.
Changes:
- Added a new `GracefulShutdown` Spring service that orchestrates shutdown phases, provides a stream-cancel signal, and tracks in-flight requests.
- Integrated shutdown behavior into inbound servers (gRPC + HTTP/WS proxy) and into long-running subscriptions (gRPC + WS) to terminate cleanly.
- Updated health and access logging components to participate in shutdown (health returns unhealthy; access log flushes on `@PreDestroy`), and updated tests for new constructor dependencies.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/main/kotlin/io/emeraldpay/dshackle/GracefulShutdown.kt | New shutdown coordinator (phases, hooks, in-flight tracking, stream cancel signal). |
| src/main/kotlin/io/emeraldpay/dshackle/GrpcServer.kt | Registers shutdown hooks for gRPC stop-accepting + forced termination; improves @PreDestroy fallback. |
| src/main/kotlin/io/emeraldpay/dshackle/rpc/BlockchainRpc.kt | Tracks gRPC native calls as in-flight and terminates streaming RPCs on shutdown signal. |
| src/main/kotlin/io/emeraldpay/dshackle/proxy/HttpHandler.kt | Rejects new HTTP traffic during shutdown; tracks proxy requests as in-flight. |
| src/main/kotlin/io/emeraldpay/dshackle/proxy/WebsocketHandler.kt | Cancels subscription streams on shutdown signal; tracks non-subscription RPC calls. |
| src/main/kotlin/io/emeraldpay/dshackle/proxy/ProxyServer.kt | Stores DisposableServer, adds stop(), and registers force-close hook for shutdown coordination. |
| src/main/kotlin/io/emeraldpay/dshackle/ProxyStarter.kt | Wires GracefulShutdown into proxy server startup; adds @PreDestroy fallback stop. |
| src/main/kotlin/io/emeraldpay/dshackle/monitoring/HealthCheckSetup.kt | Health endpoint returns unhealthy (503) when shutdown begins. |
| src/main/kotlin/io/emeraldpay/dshackle/monitoring/accesslog/AccessLogWriter.kt | Flushes queued access log entries on @PreDestroy. |
| src/test/groovy/io/emeraldpay/dshackle/proxy/WebsocketHandlerSpec.groovy | Updates tests for new WebsocketHandler constructor dependency. |
| src/test/groovy/io/emeraldpay/dshackle/proxy/HttpHandlerSpec.groovy | Updates tests for new HttpHandler constructor dependency. |
| src/test/groovy/io/emeraldpay/dshackle/proxy/ProxyServerSpec.groovy | Updates tests for new ProxyServer constructor dependency. |
| src/test/groovy/io/emeraldpay/dshackle/monitoring/HealthCheckSetupSpec.groovy | Updates tests for new HealthCheckSetup constructor dependency. |
Comment on lines +59 to +61

    private val drainComplete = AtomicBoolean(false)
    private val inFlight = AtomicInteger(0)

Comment on lines +105 to +112

    log.info("Flushing access log queue ({} entries pending)", queue.size)
    scheduler.shutdown()
    try {
        scheduler.awaitTermination(2, TimeUnit.SECONDS)
    } catch (e: InterruptedException) {
        Thread.currentThread().interrupt()
    }
    // Drain any remaining queued events. The runner uses a fixed batch limit,

Comment on lines 116 to +121

    - serverBuilder
    + val server = serverBuilder
          .route(this::setupRoutes)
          .runOn(MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()))
          .bindNow()
    + this.disposableServer = server

Comment on lines +122 to +128

        gracefulShutdown.registerForceClose("http-proxy-server") {
            stop()
        }
    }

    /**

Comment on lines 193 to +200

      val eventHandler: AccessHandlerHttp.RequestHandler = eventHandlerFactory.call()
      val proxyCall = readRpcJson.convertToNativeCall(ProxyCall.RpcType.SINGLE, listOf(call))
    - Mono.from(execute(blockchain, proxyCall, eventHandler))
    -     // thought the event handler is used in execute
    -     // it still needs to be closed at the end, so it can render the logs
    -     .doFinally { eventHandler.close() }
    + gracefulShutdown.trackMono(
    +     Mono.from(execute(blockchain, proxyCall, eventHandler))
    +         // thought the event handler is used in execute
    +         // it still needs to be closed at the end, so it can render the logs
    +         .doFinally { eventHandler.close() },
    + )

        srv.shutdownNow()
        srv.awaitTermination(5, TimeUnit.SECONDS)
    }
    log.info("GRPC Server shot down")

ktlint:
- Reorder imports to lexicographic order in HttpHandler/ProxyServer/WebsocketHandler.
GracefulShutdown:
- Remove unused drainComplete field.
AccessLogWriter:
- Don't reschedule the flush runner once the scheduler is shut down; swallow RejectedExecutionException from the scheduler/shutdown race.
- Synchronize flush() to prevent the periodic runner and the @PreDestroy drain loop from interleaving writes to the access log file.
ProxyServer:
- Store the Netty MultiThreadIoEventLoopGroup and shutdownGracefully it in stop(), so the worker threads no longer leak after shutdown.
- Register a registerStopAccepting hook that closes the server (listening) channel; existing accepted child channels keep serving in-flight requests until the drain phase completes.
WebsocketHandler:
- Reject new non-subscription RPC calls with a JSON-RPC error during shutdown, mirroring the HTTP-side 503 behavior so new work is not accepted into the drain window.
GrpcServer:
- Fix log-message typo "shot down" -> "shut down".
https://claude.ai/code/session_01NuLxxbuTAUyXcSg5ghN8PT
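The AccessLogWriter changes in this commit can be illustrated with a standard-library sketch. `QueueFlusherSketch`, its `sink` callback (standing in for the log file), and the 100 ms period are assumptions for the example. Only the three behaviors come from the commit message: no rescheduling after shutdown, swallowing RejectedExecutionException, and a synchronized flush().

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the access log writer; `sink` replaces the real file output.
class QueueFlusherSketch(
    private val sink: (String) -> Unit,
    private val batchLimit: Int = 100,
) {
    private val queue = ConcurrentLinkedQueue<String>()
    private val scheduler = Executors.newSingleThreadScheduledExecutor()

    fun submit(line: String) { queue.add(line) }

    fun start() = scheduleNext()

    private fun scheduleNext() {
        // Do not reschedule once shutdown has begun.
        if (scheduler.isShutdown) return
        try {
            scheduler.schedule(Runnable { flush(); scheduleNext() }, 100, TimeUnit.MILLISECONDS)
        } catch (e: RejectedExecutionException) {
            // shutdown() won the race against the isShutdown check; nothing left to schedule.
        }
    }

    // Synchronized so the periodic runner and the final drain never interleave writes.
    @Synchronized
    fun flush(): Int {
        var written = 0
        while (written < batchLimit) {
            val line = queue.poll() ?: break
            sink(line)
            written++
        }
        return written
    }

    // Would be annotated with @PreDestroy in a Spring-managed bean.
    fun stop() {
        scheduler.shutdown()
        try {
            scheduler.awaitTermination(2, TimeUnit.SECONDS)
        } catch (e: InterruptedException) {
            Thread.currentThread().interrupt()
        }
        // flush() writes at most batchLimit lines per call, so loop until the queue is empty.
        while (flush() > 0) { /* keep draining */ }
    }
}

fun main() {
    val written = mutableListOf<String>()
    val flusher = QueueFlusherSketch(sink = { written.add(it) }, batchLimit = 2)
    flusher.start()
    repeat(5) { flusher.submit("request-$it") }
    flusher.stop()
    println("flushed ${written.size} lines")
}
```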
The previous wiring relied solely on a ContextClosedEvent listener. On SIGINT/SIGTERM, Spring Boot's default shutdown hook calls context.close(), which both publishes the event AND tears down beans, and these run concurrently with the listener. As a result, @PreDestroy bean destruction could race the in-flight drain (or, depending on environment, the listener might not see the event at all), and Ctrl+C appeared to kill the process immediately without graceful behavior.
Fix:
- Disable Spring Boot's auto-registered shutdown hook (app.setRegisterShutdownHook(false)).
- Register our own JVM shutdown hook in main() that synchronously runs GracefulShutdown.triggerShutdown() and only then calls context.close() to release @PreDestroy beans.
- Extract the shutdown orchestration into a public triggerShutdown() method on GracefulShutdown so both the JVM hook (signal path) and the ContextClosedEvent listener (programmatic / test path) drive the same idempotent sequence guarded by an AtomicBoolean.
This makes SIGINT (Ctrl+C) and SIGTERM both go through the full sequence without racing bean destruction.
https://claude.ai/code/session_01NuLxxbuTAUyXcSg5ghN8PT
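The hook ordering described in this commit could be sketched as follows, using only the standard library. `TriggerSketch` and `installShutdownHook` are hypothetical names, and `closeContext` stands in for the Spring `context.close()` call. In the real application the Spring Boot hook would first be disabled with `app.setRegisterShutdownHook(false)`, as the commit states.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the coordinator; only the idempotence guard is shown.
class TriggerSketch(private val phases: () -> Unit) {
    private val started = AtomicBoolean(false)

    // Callable from both the JVM hook and an event listener:
    // only the first caller runs the phases, later callers return false.
    fun triggerShutdown(): Boolean {
        if (!started.compareAndSet(false, true)) return false
        phases()
        return true
    }
}

// `closeContext` stands in for context.close(); it runs only after the phases return.
fun installShutdownHook(shutdown: TriggerSketch, closeContext: () -> Unit): Thread {
    val hook = Thread(Runnable {
        shutdown.triggerShutdown()
        closeContext()
    }, "graceful-shutdown")
    Runtime.getRuntime().addShutdownHook(hook)
    return hook
}

fun main() {
    val shutdown = TriggerSketch { println("draining in-flight work") }
    installShutdownHook(shutdown) { println("closing application context") }
    println("running; the hook fires on SIGINT/SIGTERM or at normal JVM exit")
}
```

Running the drain synchronously inside the hook thread is what keeps bean destruction from starting early: `closeContext` is not reached until `triggerShutdown()` has returned.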
Summary
This PR implements a comprehensive graceful shutdown mechanism that coordinates the orderly termination of the Dshackle service, following industry-standard load-balancer-friendly patterns. The implementation ensures in-flight requests complete cleanly, long-running streams terminate gracefully, and resources are properly released before the application exits.
Key Changes
- New `GracefulShutdown` service: A Spring-managed coordinator that orchestrates a 4-phase shutdown sequence. Provides configurable timeouts via properties: `dshackle.shutdown.health-grace-seconds`, `dshackle.shutdown.drain-timeout-seconds`, and `dshackle.shutdown.force-timeout-seconds`
- Request tracking: Added `trackMono()` and `trackFlux()` methods to count in-flight requests, enabling the coordinator to wait for active calls to complete before forcing shutdown
- HTTP/WS proxy integration: `ProxyServer.stop()` method to cleanly dispose the Netty server
- gRPC server integration: `shutdownNow()` if drain timeout is exceeded; `@PreDestroy` cleanup as a safety net
- WebSocket subscription handling: Long-running subscriptions now listen to `streamsCancelSignal()` and terminate cleanly when shutdown begins
- gRPC BlockchainRpc: Track all native call requests as in-flight to ensure they complete before shutdown
- Health check endpoint: Integrated with graceful shutdown to reject requests during shutdown
- Access log flushing: Added `@PreDestroy` method to flush remaining queued log entries during shutdown
- ProxyStarter: Added fallback `@PreDestroy` to ensure proxy server stops even if graceful shutdown coordinator doesn't run
Implementation Details
- `AtomicBoolean` and `AtomicInteger` for thread-safe shutdown state and in-flight request counting
- `ApplicationListener<ContextClosedEvent>` with `HIGHEST_PRECEDENCE` to run before standard bean destruction
- `Sinks.replay()` for the streams cancel signal so late subscribers still receive the shutdown notification
https://claude.ai/code/session_01NuLxxbuTAUyXcSg5ghN8PT