-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.php
More file actions
130 lines (115 loc) · 3.81 KB
/
main.php
File metadata and controls
130 lines (115 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
<?php
declare(strict_types=1);
/*
* cancellation — two scenarios over the §10.4 / §10.5 control surface:
* - `cancel`: cooperative termination with a deadline.
* - `interrupt`: pause the job and route through a human, no termination.
*/
require __DIR__ . '/../../vendor/autoload.php';
use function Amp\delay;
use Arcp\Client\ARCPClient;
use Arcp\Clock\SystemClock;
use Arcp\Envelope\Envelope;
use Arcp\Errors\FailedPreconditionException;
use Arcp\Ids\JobId;
use Arcp\Ids\MessageId;
use Arcp\Messages\Control\Interrupt;
// Deadline, in milliseconds, that scenarioCancel() passes to cancelJob().
const CANCEL_DEADLINE_MS = 5_000;
/**
 * Kicks off the demo's long-running job.
 *
 * Sends a `demo.long_running` tool.invoke, which the runtime promotes into a
 * long-running job. A real implementation would read the job id from the
 * JobAccepted (sent synchronously alongside the ToolResult future); this
 * sample stops short of that.
 *
 * @throws \RuntimeException always, once the invoke call has been issued,
 *                           because the job-id lookup is not implemented.
 */
function startLongJob(ARCPClient $client): JobId
{
    $toolName = 'demo.long_running';
    $toolArgs = ['work_seconds' => 600];

    $client->invokeTool($toolName, $toolArgs);

    throw new \RuntimeException('not implemented');
}
/**
 * Cooperative cancel. Runtime drives target to a clean checkpoint
 * inside `deadline_ms` before terminating; escalates to ABORTED on
 * timeout (RFC §10.4).
 *
 * @param ARCPClient $client     Client used to issue the cancel.
 * @param JobId      $jobId      Job to cancel.
 * @param string     $reason     Reason forwarded with the cancel request.
 * @param int        $deadlineMs Forwarded to the client as the cancel deadline.
 *
 * @throws FailedPreconditionException When the cancel is refused; the client
 *                                     surfaces `cancel.refused` as
 *                                     FailedPrecondition.
 */
function cancelJob(ARCPClient $client, JobId $jobId, string $reason, int $deadlineMs): void
{
    // No try/catch here: a catch that only rethrows adds nothing, so a
    // FailedPreconditionException simply propagates to the caller unchanged.
    $client->cancelJob($jobId, $reason, $deadlineMs);
}
/**
 * Interrupts a job by sending an Interrupt control envelope over the
 * client's session transport.
 *
 * Distinct from cancel: pauses the job (`blocked`), runtime emits
 * `human.input.request`. Job is NOT terminated (RFC §10.5).
 *
 * @param ARCPClient $client Client whose session transport carries the envelope.
 * @param JobId      $jobId  Job to interrupt; used as both the Interrupt
 *                           target id and the envelope's job id.
 * @param string     $prompt Prompt attached to the Interrupt payload.
 */
function interruptJob(ARCPClient $client, JobId $jobId, string $prompt): void
{
    $client->session->transport->send(new Envelope(
        id: MessageId::random(),
        payload: new Interrupt(target: 'job', targetId: (string) $jobId, prompt: $prompt),
        // Parenthesised on purpose: `new Foo()->bar()` without the outer
        // parentheses only parses on PHP >= 8.4.
        timestamp: (new SystemClock())->now(),
        sessionId: $client->session->sessionId,
        jobId: $jobId,
    ));
}
/**
 * Placeholder for waiting on a job's terminal envelope.
 *
 * @throws \RuntimeException always; this sample does not implement the wait.
 */
function awaitTerminal(ARCPClient $client, JobId $jobId): Envelope
{
// Real impl subscribes to the session, filters by jobId, returns
// the first envelope whose type ∈ {job.completed, job.failed,
// job.cancelled}.
throw new \RuntimeException('not implemented');
}
/**
 * Cancel scenario: start the long job, wait for it to get going, request a
 * cooperative cancel, then print the type of the terminal envelope.
 *
 * The client is closed on every exit path, including when a step throws.
 */
function scenarioCancel(): void
{
    /** @var ARCPClient $client */
    $client = elided(); // transport, identity, auth elided

    try {
        $longJob = startLongJob($client);

        // Let the job actually start before asking it to stop.
        delay(2.0);

        cancelJob($client, $longJob, 'user_aborted', CANCEL_DEADLINE_MS);
        print "cancel ack\n";

        $lastEnvelope = awaitTerminal($client, $longJob);
        echo sprintf("terminal: %s\n", $lastEnvelope->type());
    } finally {
        $client->close();
    }
}
/**
 * Interrupt scenario: start the long job, interrupt it with a prompt, then
 * subscribe to `human.input.request` envelopes and print the ones that
 * belong to that job.
 *
 * The client is closed on every exit path, including when a step throws.
 */
function scenarioInterrupt(): void
{
    /** @var ARCPClient $client */
    $client = elided();

    try {
        $pausedJob = startLongJob($client);
        delay(2.0);
        interruptJob($client, $pausedJob, 'Pause and ask before touching production tables.');

        // Runtime now emits human.input.request; answer via human_input sample.
        $filter = ['types' => ['human.input.request']];
        $announce = static function (Envelope $env) use ($pausedJob): void {
            // Ignore envelopes that carry no job id or belong to another job.
            if ($env->jobId === null || (string) $env->jobId !== (string) $pausedJob) {
                return;
            }

            printf("awaiting human: %s\n", $env->payload::typeName());
        };

        // NOTE(review): the subscription is registered after the interrupt has
        // been sent and the client is closed immediately afterwards — confirm
        // the callback can still observe the request.
        $client->subscribe($filter, $announce);
    } finally {
        $client->close();
    }
}
/**
 * Picks the scenario named by the first command-line argument and runs it.
 *
 * Defaults to `cancel` when no argument is given. An unrecognised name is
 * reported on STDERR and the script exits with status 1.
 */
function main(): void
{
    /** @var list<string> $argv */
    global $argv;

    $which = (string) ($argv[1] ?? 'cancel');

    if ($which === 'cancel') {
        scenarioCancel();

        return;
    }

    if ($which === 'interrupt') {
        scenarioInterrupt();

        return;
    }

    fwrite(STDERR, "unknown scenario: {$which}\n");
    exit(1);
}
/**
 * Stand-in for client construction, which this sample leaves out.
 *
 * @throws \RuntimeException always; no client is built here.
 */
function elided(): ARCPClient
{
throw new \RuntimeException('not implemented');
}
// Script entry point: run the scenario selected on the command line.
main();