diff --git a/src/Client.php b/src/Client.php index cb33c66..a9eb709 100644 --- a/src/Client.php +++ b/src/Client.php @@ -331,8 +331,10 @@ private function disconnect( return; } + [$subscribers, $this->subscribers] = [$this->subscribers, []]; + /** @var ?Subscription $subscription */ - foreach ($this->subscribers as [$_, $subscription]) { + foreach ($subscribers as [$_, $subscription]) { if ($subscription !== null) { $do($subscription); } diff --git a/src/Exception/ConnectionWasClosed.php b/src/Exception/ConnectionWasClosed.php new file mode 100644 index 0000000..1960f78 --- /dev/null +++ b/src/Exception/ConnectionWasClosed.php @@ -0,0 +1,12 @@ +complete($frame); } } - - return; } catch (\Throwable $e) { foreach ($queue as $deferred) { $deferred->error($e); } - + } finally { $running = false; $hooks->dispatch(Hooks\ConnectionClosed::Event); - - return; } } }); diff --git a/src/Internal/Protocol/decodeHeaders.php b/src/Internal/Protocol/decodeHeaders.php index d8094c6..7cc6523 100644 --- a/src/Internal/Protocol/decodeHeaders.php +++ b/src/Internal/Protocol/decodeHeaders.php @@ -38,7 +38,7 @@ function decodeHeaders(string $encoded): Headers } foreach ($lines as $line) { - $keypair = explode(': ', $line); + $keypair = explode(': ', $line, limit: 2); if (\count($keypair) !== 2) { throw new \InvalidArgumentException(\sprintf('Invalid msg header line "%s" received.', $line)); } diff --git a/src/Internal/Rpc/Handler.php b/src/Internal/Rpc/Handler.php index 5e9e51a..2d8936f 100644 --- a/src/Internal/Rpc/Handler.php +++ b/src/Internal/Rpc/Handler.php @@ -9,6 +9,7 @@ use Amp\Future; use Thesis\Nats\Client; use Thesis\Nats\Delivery; +use Thesis\Nats\Exception\ConnectionWasClosed; use Thesis\Nats\Exception\RequestHasNoResponders; use Thesis\Nats\Header\StatusCode; use Thesis\Nats\Internal\Id; @@ -20,8 +21,8 @@ */ final class Handler { - /** @var array */ - private array $futures = []; + /** @var array */ + private array $pendings = []; /** @var non-empty-string */ private readonly string $inboxId; @@ -40,21 +41,24 @@ public function __construct() */ public function setup(\Closure $subscribe, ?Cancellation $cancellation = null): void { - $futures = &$this->futures; + $pendings = &$this->pendings; $inboxId = $this->inboxId; $this->unsubscribe = $subscribe( "{$inboxId}*", static function (Delivery $delivery) use ( - &$futures, + &$pendings, $inboxId, ): void { $replyTo = ReplyTo::parse($inboxId, $delivery->subject); try { - ($futures[$replyTo->token] ?? static fn() => null)($delivery); + $pending = $pendings[$replyTo->token] ?? null; + if ($pending !== null) { + $pending($delivery); + } } finally { - unset($futures[$replyTo->token]); + unset($pendings[$replyTo->token]); } }, $cancellation, @@ -63,13 +67,17 @@ static function (Delivery $delivery) use ( public function shutdown(?Cancellation $cancellation = null): void { - try { - if ($this->unsubscribe !== null) { - ($this->unsubscribe)($cancellation); - } - } finally { - $this->unsubscribe = null; - $this->futures = []; + [$pendings, $this->pendings] = [$this->pendings, []]; + + $e = new ConnectionWasClosed(); + + foreach ($pendings as $pending) { + $pending->deferred->error($e); + } + + [$unsubscribe, $this->unsubscribe] = [$this->unsubscribe, null]; + if ($unsubscribe !== null) { + $unsubscribe($cancellation); } } @@ -86,13 +94,16 @@ public function request( /** @var DeferredFuture $deferred */ $deferred = new DeferredFuture(); - $this->futures[$replyTo->token] = static function (Delivery $delivery) use ($deferred): void { - if ($delivery->message->headers?->get(StatusCode::Header) === Status::NoResponders) { - $deferred->error(new RequestHasNoResponders()); - } else { - $deferred->complete($delivery); - } - }; + $this->pendings[$replyTo->token] = new PendingRequest( + handle: static function (Delivery $delivery) use ($deferred): void { + if ($delivery->message->headers?->get(StatusCode::Header) === Status::NoResponders) { + $deferred->error(new RequestHasNoResponders()); + } else { + $deferred->complete($delivery); + } + }, + deferred: $deferred, + ); $client->publish($subject, $message, $replyTo->subject); diff --git a/src/Internal/Rpc/PendingRequest.php b/src/Internal/Rpc/PendingRequest.php new file mode 100644 index 0000000..e9912e9 --- /dev/null +++ b/src/Internal/Rpc/PendingRequest.php @@ -0,0 +1,28 @@ + $deferred + */ + public function __construct( + public \Closure $handle, + public DeferredFuture $deferred, + ) {} + + public function __invoke(Delivery $delivery): void + { + ($this->handle)($delivery); + } +} diff --git a/tests/Internal/Protocol/HeadersTest.php b/tests/Internal/Protocol/HeadersTest.php index 7295e1a..abf43bc 100644 --- a/tests/Internal/Protocol/HeadersTest.php +++ b/tests/Internal/Protocol/HeadersTest.php @@ -44,6 +44,10 @@ final class HeadersTest extends TestCase new Headers(['X' => ['Y'], StatusCode::Header->value => ['100'], StatusDescription::Header->value => ['idle heartbeat']]), "NATS/1.0 100 idle heartbeat\r\nX: Y\r\n\r\n", ])] + #[TestWith([ + new Headers(['Bar' => ['Baz: Foo']]), + "NATS/1.0\r\nBar: Baz: Foo\r\n\r\n", + ])] public function testEncode(Headers $headers, string $encoded): void { self::assertEquals($encoded, encodeHeaders($headers));