-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathStdioTransport.php
More file actions
121 lines (110 loc) · 3.53 KB
/
StdioTransport.php
File metadata and controls
121 lines (110 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
<?php
declare(strict_types=1);
namespace Arcp\Transport;
use Amp\ByteStream\ReadableResourceStream;
use Amp\ByteStream\ReadableStream;
use Amp\ByteStream\WritableResourceStream;
use Amp\ByteStream\WritableStream;
use Amp\Cancellation;
use Arcp\Envelope\Envelope;
use Arcp\Json\EnvelopeSerializer;
/**
* Newline-delimited JSON transport over arbitrary readable/writable
* streams (RFC §22 mandatory transport for stdio).
*
* Each envelope is encoded with {@see EnvelopeSerializer::encode()} and
* appended with `\n`. The reader buffers input until it sees a newline,
* decodes the envelope, and yields. EOF on read is reported as a
* `null` return from {@see receive()}.
*/
final class StdioTransport implements Transport
{
private string $readBuffer = '';
private bool $closed = false;
public function __construct(
private readonly ReadableStream $reader,
private readonly WritableStream $writer,
private readonly EnvelopeSerializer $serializer,
) {
}
public static function fromResources(mixed $readResource, mixed $writeResource, EnvelopeSerializer $serializer): self
{
if (!\is_resource($readResource) || !\is_resource($writeResource)) {
throw new \Arcp\Errors\InvalidArgumentException('stdio transport requires open resources');
}
return new self(
new ReadableResourceStream($readResource),
new WritableResourceStream($writeResource),
$serializer,
);
}
/** Wire the local process's stdin/stdout. */
public static function localProcess(EnvelopeSerializer $serializer): self
{
/** @var resource $in */
$in = \STDIN;
/** @var resource $out */
$out = \STDOUT;
return self::fromResources($in, $out, $serializer);
}
#[\Override]
public function send(Envelope $env, ?Cancellation $cancellation = null): void
{
if ($this->closed) {
throw new \RuntimeException('stdio transport closed');
}
$line = $this->serializer->encode($env) . "\n";
$this->writer->write($line);
}
#[\Override]
public function receive(?Cancellation $cancellation = null): ?Envelope
{
while (true) {
$newlinePos = strpos($this->readBuffer, "\n");
if ($newlinePos !== false) {
$line = substr($this->readBuffer, 0, $newlinePos);
$this->readBuffer = substr($this->readBuffer, $newlinePos + 1);
$line = rtrim($line, "\r");
if ($line === '') {
continue;
}
return $this->serializer->decode($line);
}
if ($this->closed) {
return null;
}
$chunk = $this->reader->read($cancellation);
if ($chunk === null) {
$this->closed = true;
if ($this->readBuffer === '') {
return null;
}
continue;
}
$this->readBuffer .= $chunk;
}
}
#[\Override]
public function close(): void
{
if ($this->closed) {
return;
}
$this->closed = true;
try {
$this->writer->end();
} catch (\Throwable) {
// already closed; ignore
}
try {
$this->reader->close();
} catch (\Throwable) {
// already closed; ignore
}
}
#[\Override]
public function isClosed(): bool
{
return $this->closed;
}
}