Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"thesis/byte-reader-writer": "^0.1.0",
"thesis/byte-writer": "^0.2.1",
"thesis/endian": "^0.1.0",
"thesis/package-version": "^0.1.2",
"thesis/sync-once": "^0.1.1",
"thesis/time-span": "^0.2.0"
},
Expand Down
2 changes: 1 addition & 1 deletion src/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ classId: Protocol\ClassType::BASIC,
properties: MessageProperties::fromMessage($message),
);

foreach (Internal\chunks($message->body, $this->properties->maxFrame()) as $chunk) {
foreach (Internal\chunks($message->body, $this->properties->frameMax) as $chunk) {
yield new Protocol\Body(
channelId: $this->channelId,
body: $chunk,
Expand Down
2 changes: 1 addition & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ final class Client
public function __construct(
public readonly Config $config,
) {
$properties = Properties::createDefault();
$properties = new Properties();
$this->hooks = new Hooks();

$this->connectionFactory = new AmqpConnectionFactory(
Expand Down
6 changes: 3 additions & 3 deletions src/Internal/Delivery/ConsumerTagGenerator.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
final class ConsumerTagGenerator
{
private const int TAG_LENGTH_MAX = 0xFF;
private const string PACKAGE_NAME = 'thesis/amqp';
private const string DEFAULT_INFIX = 'thesis/amqp';

/** @var non-empty-string */
private readonly string $infix;
Expand All @@ -21,7 +21,7 @@ final class ConsumerTagGenerator
public function __construct()
{
$command = $_SERVER['argv'][0] ?? null; // @phpstan-ignore offsetAccess.nonOffsetAccessible
$this->infix = \is_string($command) && $command !== '' ? $command : self::PACKAGE_NAME;
$this->infix = \is_string($command) && $command !== '' ? $command : self::DEFAULT_INFIX;
}

/**
Expand All @@ -34,7 +34,7 @@ public function next(): string
$suffix = \sprintf('-%d', ++$this->consumerId);

if (\strlen($prefix) + \strlen($infix) + \strlen($suffix) > self::TAG_LENGTH_MAX) {
$infix = self::PACKAGE_NAME;
$infix = self::DEFAULT_INFIX;
}

return "{$prefix}{$infix}{$suffix}";
Expand Down
39 changes: 27 additions & 12 deletions src/Internal/Io/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,22 @@
use Thesis\Amqp\Internal\Protocol\Auth;
use Thesis\Amqp\Internal\Protocol\Frame;
use Thesis\Amqp\Scheme;
use function Thesis\Package\version;

/**
* @internal
*/
final readonly class AmqpConnectionFactory
{
private const string PRODUCT = 'AMQP 0.9.1 Client';
private const string PLATFORM = 'php';
private const string PACKAGE = 'thesis/amqp';
private const array CAPABILITIES = [
'connection.blocked' => true,
'basic.nack' => true,
'publisher_confirms' => true,
];

public function __construct(
private Config $config,
private Properties $properties,
Expand All @@ -33,25 +43,30 @@ public function connect(): AmqpConnection
$start = $connection->rpc(Frame\ProtocolHeader::frame, Frame\ConnectionStart::class);

$tune = $connection->rpc(
Protocol\Method::connectionStartOk($this->properties->toArray(), Auth\Mechanism::select(
$this->config->sasl,
$start->mechanisms,
)),
Protocol\Method::connectionStartOk(
clientProperties: [
'product' => self::PRODUCT,
'version' => version(self::PACKAGE),
'platform' => self::PLATFORM,
'capabilities' => self::CAPABILITIES,
],
auth: Auth\Mechanism::select($this->config->sasl, $start->mechanisms),
),
Frame\ConnectionTune::class,
);

[$heartbeat, $channelMax, $frameMax] = [
$this->config->heartbeat($tune->heartbeat),
$this->config->channelMax($tune->channelMax),
$this->config->frameMax($tune->frameMax),
];
$heartbeat = $this->config->heartbeat($tune->heartbeat);
$this->properties->channelMax = $this->config->channelMax($tune->channelMax);
$this->properties->frameMax = $this->config->frameMax($tune->frameMax);

$connection->rpc(
Protocol\Method::connectionTuneOk($channelMax, $frameMax, $heartbeat),
Protocol\Method::connectionTuneOk(
channelMax: $this->properties->channelMax,
frameMax: $this->properties->frameMax,
heartbeat: $heartbeat,
),
);

$this->properties->tune($channelMax, $frameMax);

if ($heartbeat > 0) {
$connection->heartbeat($heartbeat);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Internal/Io/ChannelFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ function (Frame\ChannelCloseOk|Frame\ChannelClose $frame) use ($channelId, $conn
*/
private function allocateChannelId(): int
{
for ($id = $this->channelId; $id <= $this->properties->maxChannel(); ++$id) {
for ($id = $this->channelId; $id <= $this->properties->channelMax; ++$id) {
if (!isset($this->channels[$id])) {
$this->channelId = $id + 1;

Expand All @@ -118,6 +118,6 @@ private function allocateChannelId(): int
}
}

throw Exception\NoAvailableChannel::forMaxChannel($this->properties->maxChannel());
throw Exception\NoAvailableChannel::forMaxChannel($this->properties->channelMax);
}
}
87 changes: 6 additions & 81 deletions src/Internal/Properties.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,87 +9,12 @@
*/
final class Properties
{
private const string DEFAULT_PLATFORM = 'php';

/** @var non-negative-int */
private int $maxChannel = 0xFFFF;

/** @var positive-int */
private int $maxFrame = 0xFFFF;

/** @var array<string, bool> */
private array $capabilities = [
'connection.blocked' => true,
'basic.nack' => true,
'publisher_confirms' => true,
];

/** @var non-empty-string */
private string $product = 'AMQP 0.9.1 Client';

/** @var non-empty-string */
private readonly string $version;

/** @var non-empty-string */
private readonly string $platform;

public static function createDefault(): self
{
return new self();
}

/**
* @param non-negative-int $maxChannel
* @param positive-int $maxFrame
*/
public function tune(
int $maxChannel,
int $maxFrame,
): void {
$this->maxChannel = $maxChannel;
$this->maxFrame = $maxFrame;
}

/**
* @param non-empty-string $capability
* @param non-negative-int $channelMax
* @param positive-int $frameMax
*/
public function capable(string $capability): bool
{
return $this->capabilities[$capability] ?? false;
}

/**
* @return non-negative-int
*/
public function maxChannel(): int
{
return $this->maxChannel;
}

/**
* @return positive-int
*/
public function maxFrame(): int
{
return $this->maxFrame;
}

/**
* @return array<string, mixed>
*/
public function toArray(): array
{
return [
'product' => $this->product,
'version' => $this->version,
'platform' => $this->platform,
'capabilities' => $this->capabilities,
];
}

private function __construct()
{
$this->version = VersionProvider::provide();
$this->platform = self::DEFAULT_PLATFORM;
}
public function __construct(
public int $channelMax = 0xFFFF,
public int $frameMax = 0xFFFF,
) {}
}
45 changes: 0 additions & 45 deletions src/Internal/VersionProvider.php

This file was deleted.

29 changes: 0 additions & 29 deletions tests/Internal/PropertiesTest.php

This file was deleted.