Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,10 @@ private function disconnect(
return;
}

[$subscribers, $this->subscribers] = [$this->subscribers, []];

/** @var ?Subscription $subscription */
foreach ($this->subscribers as [$_, $subscription]) {
foreach ($subscribers as [$_, $subscription]) {
if ($subscription !== null) {
$do($subscription);
}
Expand Down
12 changes: 12 additions & 0 deletions src/Exception/ConnectionWasClosed.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Thesis\Nats\Exception;

use Thesis\Nats\NatsException;

/**
* @api
*/
final class ConnectionWasClosed extends NatsException {}
6 changes: 1 addition & 5 deletions src/Internal/Connection/SocketConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,13 @@ private function run(): void
$deferred->complete($frame);
}
}

return;
} catch (\Throwable $e) {
foreach ($queue as $deferred) {
$deferred->error($e);
}

} finally {
$running = false;
$hooks->dispatch(Hooks\ConnectionClosed::Event);

return;
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/Protocol/decodeHeaders.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function decodeHeaders(string $encoded): Headers
}

foreach ($lines as $line) {
$keypair = explode(': ', $line);
$keypair = explode(': ', $line, limit: 2);
if (\count($keypair) !== 2) {
throw new \InvalidArgumentException(\sprintf('Invalid msg header line "%s" received.', $line));
}
Expand Down
51 changes: 31 additions & 20 deletions src/Internal/Rpc/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Amp\Future;
use Thesis\Nats\Client;
use Thesis\Nats\Delivery;
use Thesis\Nats\Exception\ConnectionWasClosed;
use Thesis\Nats\Exception\RequestHasNoResponders;
use Thesis\Nats\Header\StatusCode;
use Thesis\Nats\Internal\Id;
Expand All @@ -20,8 +21,8 @@
*/
final class Handler
{
/** @var array<non-empty-string, callable(Delivery): void> */
private array $futures = [];
/** @var array<non-empty-string, PendingRequest> */
private array $pendings = [];

/** @var non-empty-string */
private readonly string $inboxId;
Expand All @@ -40,21 +41,24 @@ public function __construct()
*/
public function setup(\Closure $subscribe, ?Cancellation $cancellation = null): void
{
$futures = &$this->futures;
$pendings = &$this->pendings;
$inboxId = $this->inboxId;

$this->unsubscribe = $subscribe(
"{$inboxId}*",
static function (Delivery $delivery) use (
&$futures,
&$pendings,
$inboxId,
): void {
$replyTo = ReplyTo::parse($inboxId, $delivery->subject);

try {
($futures[$replyTo->token] ?? static fn() => null)($delivery);
$pending = $pendings[$replyTo->token] ?? null;
if ($pending !== null) {
$pending($delivery);
}
} finally {
unset($futures[$replyTo->token]);
unset($pendings[$replyTo->token]);
}
},
$cancellation,
Expand All @@ -63,13 +67,17 @@ static function (Delivery $delivery) use (

public function shutdown(?Cancellation $cancellation = null): void
{
try {
if ($this->unsubscribe !== null) {
($this->unsubscribe)($cancellation);
}
} finally {
$this->unsubscribe = null;
$this->futures = [];
[$pendings, $this->pendings] = [$this->pendings, []];

$e = new ConnectionWasClosed();

foreach ($pendings as $pending) {
$pending->deferred->error($e);
}

[$unsubscribe, $this->unsubscribe] = [$this->unsubscribe, null];
if ($unsubscribe !== null) {
$unsubscribe($cancellation);
}
}

Expand All @@ -86,13 +94,16 @@ public function request(

/** @var DeferredFuture<Delivery> $deferred */
$deferred = new DeferredFuture();
$this->futures[$replyTo->token] = static function (Delivery $delivery) use ($deferred): void {
if ($delivery->message->headers?->get(StatusCode::Header) === Status::NoResponders) {
$deferred->error(new RequestHasNoResponders());
} else {
$deferred->complete($delivery);
}
};
$this->pendings[$replyTo->token] = new PendingRequest(
handle: static function (Delivery $delivery) use ($deferred): void {
if ($delivery->message->headers?->get(StatusCode::Header) === Status::NoResponders) {
$deferred->error(new RequestHasNoResponders());
} else {
$deferred->complete($delivery);
}
},
deferred: $deferred,
);

$client->publish($subject, $message, $replyTo->subject);

Expand Down
28 changes: 28 additions & 0 deletions src/Internal/Rpc/PendingRequest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Thesis\Nats\Internal\Rpc;

use Amp\DeferredFuture;
use Thesis\Nats\Delivery;

/**
* @internal
*/
final readonly class PendingRequest
{
/**
* @param \Closure(Delivery): void $handle
* @param DeferredFuture<Delivery> $deferred
*/
public function __construct(
public \Closure $handle,
public DeferredFuture $deferred,
) {}

public function __invoke(Delivery $delivery): void
{
($this->handle)($delivery);
}
}
4 changes: 4 additions & 0 deletions tests/Internal/Protocol/HeadersTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ final class HeadersTest extends TestCase
new Headers(['X' => ['Y'], StatusCode::Header->value => ['100'], StatusDescription::Header->value => ['idle heartbeat']]),
"NATS/1.0 100 idle heartbeat\r\nX: Y\r\n\r\n",
])]
#[TestWith([
new Headers(['Bar' => ['Baz: Foo']]),
"NATS/1.0\r\nBar: Baz: Foo\r\n\r\n",
])]
public function testEncode(Headers $headers, string $encoded): void
{
self::assertEquals($encoded, encodeHeaders($headers));
Expand Down