diff --git a/src/Client.php b/src/Client.php index f3149cb..cb33c66 100644 --- a/src/Client.php +++ b/src/Client.php @@ -143,6 +143,7 @@ public function subscribeIterator( * @param ?non-empty-string $queueGroup * @param positive-int $bufferSize * @throws NatsException + * @throws \Throwable */ public function subscribe( string $subject, @@ -164,7 +165,13 @@ public function subscribe( $this->subscribers[$subscriptionId] = [$handler->push(...), $subscription = $handler->subscription]; - $this->connection($cancellation)->execute(Internal\Command::sub($subject, $subscriptionId, $queueGroup)); + try { + $this->connection($cancellation)->execute(Internal\Command::sub($subject, $subscriptionId, $queueGroup)); + } catch (\Throwable $e) { + unset($this->subscribers[$subscriptionId]); + + throw $e; + } return $subscription; } @@ -247,6 +254,7 @@ public function __destruct() * @param callable(Delivery): void $handler * @return \Closure(?Cancellation=): void * @throws NatsException + * @throws \Throwable */ private function subscribeCallback( string $subject, @@ -264,7 +272,13 @@ static function (Delivery $delivery) use ($handler): bool { null, ]; - $this->connection($cancellation)->execute(Internal\Command::sub($subject, $subscriptionId)); + try { + $this->connection($cancellation)->execute(Internal\Command::sub($subject, $subscriptionId)); + } catch (\Throwable $e) { + unset($this->subscribers[$subscriptionId]); + + throw $e; + } return function (?Cancellation $cancellation = null) use ($subscriptionId): void { $this->unsubscribe($subscriptionId, $cancellation); diff --git a/src/Internal/Protocol/decodeHeaders.php b/src/Internal/Protocol/decodeHeaders.php index ef5d0c6..d8094c6 100644 --- a/src/Internal/Protocol/decodeHeaders.php +++ b/src/Internal/Protocol/decodeHeaders.php @@ -63,7 +63,6 @@ function parseStatus(string $line): ?array if (\count($chunks) > 1) { /** * @var numeric-string $code - * @phpstan-ignore offsetAccess.notFound */ $code = $chunks[1]; $description = implode(' ', \array_slice($chunks, 2)); diff --git a/src/JetStream/Internal/PullMessageHandler.php b/src/JetStream/Internal/PullMessageHandler.php index 63c0b64..198f94c 100644 --- a/src/JetStream/Internal/PullMessageHandler.php +++ b/src/JetStream/Internal/PullMessageHandler.php @@ -123,8 +123,8 @@ private function digest(NatsDelivery $delivery, Subscription $subscription): voi } elseif ($statusDescription?->is(Description::LeadershipChange) ?? false) { $this->reset(); } elseif ($statusDescription?->is(Description::MaxBytesExceeded, Description::BatchCompleted, Description::RequestTimeout) ?? false) { - $messagesLeft = $delivery->message->headers?->get(Header\PendingMessages::header()) ?? 0; - $bytesLeft = $delivery->message->headers?->get(Header\PendingBytes::header()) ?? 0; + $messagesLeft = $delivery->message->headers->get(Header\PendingMessages::header()) ?? 0; + $bytesLeft = $delivery->message->headers->get(Header\PendingBytes::header()) ?? 0; $this->pendingMessages -= $messagesLeft;