From 38456e709c80e42e7c6a5db1037721d7915d1a74 Mon Sep 17 00:00:00 2001 From: kafkiansky Date: Sun, 22 Mar 2026 17:35:44 +0300 Subject: [PATCH] Fix graceful shutdown --- src/Client.php | 43 ++++++++++++-------- src/Internal/Connection/ConnectionState.php | 16 ++++++++ src/Internal/Connection/SocketConnection.php | 34 ++++++++-------- 3 files changed, 59 insertions(+), 34 deletions(-) create mode 100644 src/Internal/Connection/ConnectionState.php diff --git a/src/Client.php b/src/Client.php index a9eb709..55020b8 100644 --- a/src/Client.php +++ b/src/Client.php @@ -7,6 +7,7 @@ use Amp\Cancellation; use Amp\Future; use Amp\Pipeline; +use Thesis\Nats\Exception\ConnectionWasClosed; use Thesis\Nats\Internal\Connection; use Thesis\Nats\Internal\Hooks; use Thesis\Nats\Internal\Id; @@ -21,6 +22,7 @@ use Thesis\Nats\Serialization\Serializer; use Thesis\Nats\Serialization\ValinorSerializer; use function Amp\async; +use function Amp\weakClosure; /** * @api @@ -331,36 +333,43 @@ private function disconnect( return; } - [$subscribers, $this->subscribers] = [$this->subscribers, []]; + $completes = []; - /** @var ?Subscription $subscription */ - foreach ($subscribers as [$_, $subscription]) { + foreach ($this->subscribers as [$_, $subscription]) { if ($subscription !== null) { - $do($subscription); + $completes[] = async($do, $subscription); } } - $this->rpc?->await($cancellation)?->shutdown($cancellation); + $rpc = $this->rpc?->await($cancellation); - $this->rpc = null; - $this->connection = null; - $connection->close(); + if ($rpc !== null) { + $completes[] = async($rpc->shutdown(...), $cancellation); + } + + try { + Future\await($completes, $cancellation); + } finally { + $this->subscribers = []; + $this->rpc = null; + $this->connection = null; + $connection->close(); + } } private function connection(?Cancellation $cancellation = null): Connection\Connection { - $connectionFactory = $this->connectionFactory; - $invokeSubscriber = $this->invokeSubscriber(...); + $this->connection ??= async(weakClosure(function () use ($cancellation): Connection\Connection { + $connection = $this->connectionFactory->connect(); + $connection->hooks()->onMessage($this->invokeSubscriber(...)); + $connection->hooks()->onClose(function () use ($cancellation): void { + $e = new ConnectionWasClosed(); - $this->connection ??= async(static function () use ( - $connectionFactory, - $invokeSubscriber, - ): Connection\Connection { - $connection = $connectionFactory->connect(); - $connection->hooks()->onMessage($invokeSubscriber); + $this->disconnect(static fn(Subscription $subscription) => $subscription->error($e, $cancellation)); + }); return $connection; - }); + })); return $this->connection->await($cancellation); } diff --git a/src/Internal/Connection/ConnectionState.php b/src/Internal/Connection/ConnectionState.php new file mode 100644 index 0000000..e7d835f --- /dev/null +++ b/src/Internal/Connection/ConnectionState.php @@ -0,0 +1,16 @@ +running) { + if ($this->state !== ConnectionState::Alive) { $this->run(); } @@ -124,8 +124,7 @@ public function info(): ConnectionInfo public function close(): void { $this->hooks->dispatch(Hooks\ConnectionClosed::Event); - - $this->running = false; + $this->state = ConnectionState::GracefulClosed; $this->socket->close(); } @@ -134,15 +133,17 @@ private function run(): void $framer = $this->framer; $queue = $this->queue; $hooks = $this->hooks; - $running = &$this->running; + $socket = $this->socket; + $state = &$this->state; EventLoop::queue(static function () use ( $framer, $queue, $hooks, - &$running, + $socket, + &$state, ): void { - while ($running) { + while ($state === ConnectionState::Alive) { try { while (($frame = $framer->readFrame()) !== null) { $event = match (true) { @@ -170,13 +171,16 @@ private function run(): void $deferred->error($e); } } finally { - $running = false; - $hooks->dispatch(Hooks\ConnectionClosed::Event); + if ($state !== ConnectionState::GracefulClosed) { // @phpstan-ignore notIdentical.alwaysTrue + $state = ConnectionState::Closed; + $socket->close(); + $hooks->dispatch(Hooks\ConnectionClosed::Event); + } } } }); - $this->running = true; + $this->state = ConnectionState::Alive; } /** @@ -184,14 +188,10 @@ private function run(): void */ private function generateSignature(?string $nonce, ?string $nkey): ?string { - if ($nonce === null) { - return null; - } - - if ($nkey === null) { - return null; + if ($nonce !== null && $nkey !== null) { + return $this->signer->sign($nonce, $nkey); } - return $this->signer->sign($nonce, $nkey); + return null; } }