-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEnvelope.php
More file actions
98 lines (92 loc) · 3.19 KB
/
Envelope.php
File metadata and controls
98 lines (92 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
<?php
declare(strict_types=1);
namespace Arcp\Envelope;
use Arcp\Ids\IdempotencyKey;
use Arcp\Ids\JobId;
use Arcp\Ids\MessageId;
use Arcp\Ids\SessionId;
use Arcp\Ids\SpanId;
use Arcp\Ids\StreamId;
use Arcp\Ids\SubscriptionId;
use Arcp\Ids\TraceId;
use Arcp\Version;
/**
* Canonical ARCP message container (RFC §6.1).
*
* Required fields: `arcp`, `id`, `type`, `timestamp`, `payload`.
* Conditional fields are populated based on context (sessions, jobs,
* streams, subscriptions). `correlation_id` and `causation_id` are
* **distinct**: `correlation_id` answers "which command does this
* respond to?", `causation_id` answers "which immediately upstream
* message produced this?".
*
* @phpstan-type ExtensionMap array<string, mixed>
*/
final readonly class Envelope
{
/**
* @param ExtensionMap $extensions
*/
public function __construct(
public MessageId $id,
public MessageType $payload,
public \DateTimeImmutable $timestamp,
public Priority $priority = Priority::Normal,
public ?SessionId $sessionId = null,
public ?JobId $jobId = null,
public ?StreamId $streamId = null,
public ?SubscriptionId $subscriptionId = null,
public ?TraceId $traceId = null,
public ?SpanId $spanId = null,
public ?SpanId $parentSpanId = null,
public ?MessageId $correlationId = null,
public ?MessageId $causationId = null,
public ?IdempotencyKey $idempotencyKey = null,
public ?string $source = null,
public ?string $target = null,
public string $arcp = Version::PROTOCOL_VERSION,
public array $extensions = [],
) {
if (trim($this->arcp) === '') {
throw new \InvalidArgumentException('envelope.arcp must be non-empty');
}
if ($this->source !== null && trim($this->source) === '') {
throw new \InvalidArgumentException('envelope.source must be non-empty when present');
}
if ($this->target !== null && trim($this->target) === '') {
throw new \InvalidArgumentException('envelope.target must be non-empty when present');
}
}
public function type(): string
{
return $this->payload::typeName();
}
/**
* Return a copy with a different `correlationId`. Used when the
* runtime answers a command and needs to point at the incoming
* message id.
*/
public function withCorrelationId(?MessageId $correlationId): self
{
return new self(
id: $this->id,
payload: $this->payload,
timestamp: $this->timestamp,
priority: $this->priority,
sessionId: $this->sessionId,
jobId: $this->jobId,
streamId: $this->streamId,
subscriptionId: $this->subscriptionId,
traceId: $this->traceId,
spanId: $this->spanId,
parentSpanId: $this->parentSpanId,
correlationId: $correlationId,
causationId: $this->causationId,
idempotencyKey: $this->idempotencyKey,
source: $this->source,
target: $this->target,
arcp: $this->arcp,
extensions: $this->extensions,
);
}
}