Skip to content

Commit 4935ba0

Browse files
committed
fix(streaming): stabilize multi-resume event mirroring
1 parent f7d593c commit 4935ba0

3 files changed

Lines changed: 325 additions & 61 deletions

File tree

code-rs/app-server/src/code_message_processor.rs

Lines changed: 111 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -930,18 +930,70 @@ impl CodexMessageProcessor {
930930
return;
931931
}
932932
};
933-
let Ok(conversation) = self
933+
let conversation = match self
934934
.conversation_manager
935935
.get_conversation(conversation_id)
936936
.await
937-
else {
938-
let error = JSONRPCErrorError {
939-
code: INVALID_REQUEST_ERROR_CODE,
940-
message: format!("conversation not found: {conversation_id}"),
941-
data: None,
942-
};
943-
self.outgoing.send_error(request_id, error).await;
944-
return;
937+
{
938+
Ok(conversation) => conversation,
939+
Err(_) => {
940+
// A listener may attach before this broker process has resumed the
941+
// requested thread. Hydrate from disk so independently started
942+
// clients can still subscribe to the same conversation.
943+
let thread_id = conversation_id.to_string();
944+
let rollout_path = match resolve_rollout_path(&self.config.code_home, &thread_id, None).await {
945+
Ok(path) => path,
946+
Err(_) => {
947+
let error = JSONRPCErrorError {
948+
code: INVALID_REQUEST_ERROR_CODE,
949+
message: format!("conversation not found: {conversation_id}"),
950+
data: None,
951+
};
952+
self.outgoing.send_error(request_id, error).await;
953+
return;
954+
}
955+
};
956+
957+
let config_for_resume = self
958+
.thread_configs
959+
.get(&thread_id)
960+
.cloned()
961+
.unwrap_or_else(|| (*self.config).clone());
962+
963+
match self
964+
.conversation_manager
965+
.resume_conversation_from_rollout(
966+
config_for_resume.clone(),
967+
rollout_path,
968+
self.conversation_manager.auth_manager(),
969+
)
970+
.await
971+
{
972+
Ok(NewConversation {
973+
conversation,
974+
conversation_id: resumed_id,
975+
..
976+
}) => {
977+
let resumed_thread_id = resumed_id.to_string();
978+
self.loaded_threads.insert(resumed_thread_id.clone());
979+
self.thread_configs
980+
.entry(resumed_thread_id.clone())
981+
.or_insert(config_for_resume);
982+
self.start_thread_listener(resumed_thread_id, conversation.clone())
983+
.await;
984+
conversation
985+
}
986+
Err(err) => {
987+
let error = JSONRPCErrorError {
988+
code: INTERNAL_ERROR_CODE,
989+
message: format!("failed to load conversation for listener: {err}"),
990+
data: None,
991+
};
992+
self.outgoing.send_error(request_id, error).await;
993+
return;
994+
}
995+
}
996+
}
945997
};
946998

947999
let subscription_id = Uuid::new_v4();
@@ -2086,6 +2138,18 @@ impl CodexMessageProcessor {
20862138
.await;
20872139
match submit {
20882140
Ok(turn_id) => {
2141+
if let Some(user_message_event) = user_message_event_from_v2_input(&input_items) {
2142+
let synthetic_user_event = Event {
2143+
id: turn_id.clone(),
2144+
event_seq: 0,
2145+
msg: EventMsg::UserMessage(user_message_event),
2146+
order: None,
2147+
};
2148+
self.conversation_streams
2149+
.publish(conversation_id, synthetic_user_event)
2150+
.await;
2151+
}
2152+
20892153
let thread_id = params.thread_id.clone();
20902154
let turn = v2::Turn {
20912155
id: turn_id.clone(),
@@ -5368,6 +5432,44 @@ fn map_v2_user_input(input: v2::UserInput) -> CoreInputItem {
53685432
}
53695433
}
53705434

5435+
fn user_message_event_from_v2_input(
5436+
input: &[v2::UserInput],
5437+
) -> Option<code_protocol::protocol::UserMessageEvent> {
5438+
let mut text_parts: Vec<String> = Vec::new();
5439+
let mut image_urls: Vec<String> = Vec::new();
5440+
let mut local_images: Vec<PathBuf> = Vec::new();
5441+
5442+
for item in input {
5443+
match item {
5444+
v2::UserInput::Text { text, .. } => {
5445+
if !text.trim().is_empty() {
5446+
text_parts.push(text.clone());
5447+
}
5448+
}
5449+
v2::UserInput::Image { url } => image_urls.push(url.clone()),
5450+
v2::UserInput::LocalImage { path } => local_images.push(path.clone()),
5451+
v2::UserInput::Skill { name, path } => {
5452+
text_parts.push(format!("skill:{name} ({})", path.display()));
5453+
}
5454+
v2::UserInput::Mention { name, path } => {
5455+
text_parts.push(format!("mention:{name} ({path})"));
5456+
}
5457+
}
5458+
}
5459+
5460+
if text_parts.is_empty() && image_urls.is_empty() && local_images.is_empty() {
5461+
return None;
5462+
}
5463+
5464+
Some(code_protocol::protocol::UserMessageEvent {
5465+
message: text_parts.join("\n"),
5466+
kind: Some(code_protocol::protocol::InputMessageKind::Plain),
5467+
images: (!image_urls.is_empty()).then_some(image_urls),
5468+
local_images,
5469+
text_elements: Vec::new(),
5470+
})
5471+
}
5472+
53715473
async fn send_v2_notification(outgoing: &OutgoingMessageSender, notification: ServerNotification) {
53725474
let method = notification.to_string();
53735475
let params = match notification.to_params() {

code-rs/app-server/src/conversation_streams.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,15 @@ impl ConversationStreams {
7676

7777
sender.subscribe()
7878
}
79+
80+
pub(crate) async fn publish(&self, conversation_id: ConversationId, event: Event) {
81+
let sender = {
82+
let streams = self.streams.lock().await;
83+
streams.get(&conversation_id).map(|entry| entry.sender.clone())
84+
};
85+
86+
if let Some(sender) = sender {
87+
let _ = sender.send(event);
88+
}
89+
}
7990
}

0 commit comments

Comments
 (0)