diff --git a/ds4_server.c b/ds4_server.c index 435491fe..99539c58 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -9208,15 +9208,103 @@ typedef struct { /* SSE keepalive during long prefill: send HTTP/SSE headers ahead of * generation and emit a `:` comment line every few seconds so HTTP/TCP * idle timeouts on the client side don't close the connection while the - * server is busy doing prefill. */ + * server is busy doing prefill. The wall-clock keepalive thread (see + * server_prefill_keepalive) emits comments independently of the prefill + * progress callback so a stall inside a single prefill chunk still keeps + * the connection alive. */ int fd; bool stream; bool enable_cors; bool headers_sent; bool stream_failed; double last_keepalive; + struct server_prefill_keepalive *keepalive; } server_prefill_progress; +/* Wall-clock keepalive thread state. Decoupled from the prefill_chunk + * progress callback because internal stalls inside a single chunk (slow + * matmul tile, KV disk fetch, etc.) prevent the callback from firing for + * minutes at a time; the server would then sit silent on the socket and the + * client would hit its body-idle timeout. The thread ticks roughly once a + * second and writes `: prefill\n\n` whenever 5s have elapsed since the last + * keepalive event. Both the thread and the progress callback acquire `mu` + * before touching the fd or any of the shared keepalive fields on + * server_prefill_progress. */ +typedef struct server_prefill_keepalive { + server_prefill_progress *progress; + pthread_t thread; + pthread_mutex_t mu; + pthread_cond_t cv; + bool active; + bool stop; +} server_prefill_keepalive; + +static void *server_prefill_keepalive_thread(void *ud) { + server_prefill_keepalive *ka = ud; + server_prefill_progress *p = ka->progress; + pthread_mutex_lock(&ka->mu); + while (!ka->stop) { + struct timeval tv; + gettimeofday(&tv, NULL); + struct timespec ts; + ts.tv_sec = tv.tv_sec + 1; + ts.tv_nsec = (long)tv.tv_usec * 1000; + pthread_cond_timedwait(&ka->cv, &ka->mu, &ts); + if (ka->stop) break; + if (!p->stream || p->fd < 0 || p->stream_failed) continue; + double now = now_sec(); + if (!p->headers_sent) { + p->headers_sent = true; + if (sse_headers(p->fd, p->enable_cors)) { + p->last_keepalive = now; + } else { + p->stream_failed = true; + } + } else if (now - p->last_keepalive >= 5.0) { + static const char ka_msg[] = ": prefill\n\n"; + if (send_all(p->fd, ka_msg, sizeof(ka_msg) - 1)) { + p->last_keepalive = now; + } else { + p->stream_failed = true; + } + } + } + pthread_mutex_unlock(&ka->mu); + return NULL; +} + +static void server_prefill_keepalive_attach(server_prefill_keepalive *ka, + server_prefill_progress *p) { + memset(ka, 0, sizeof(*ka)); + ka->progress = p; + if (!p || !p->stream || p->fd < 0) return; + pthread_mutex_init(&ka->mu, NULL); + pthread_cond_init(&ka->cv, NULL); + if (pthread_create(&ka->thread, NULL, + server_prefill_keepalive_thread, ka) != 0) { + pthread_mutex_destroy(&ka->mu); + pthread_cond_destroy(&ka->cv); + return; + } + ka->active = true; + p->keepalive = ka; +} + +static void server_prefill_keepalive_detach(server_prefill_keepalive *ka) { + if (!ka->active) return; + pthread_mutex_lock(&ka->mu); + ka->stop = true; + pthread_cond_signal(&ka->cv); + pthread_mutex_unlock(&ka->mu); + pthread_join(ka->thread, NULL); + pthread_mutex_destroy(&ka->mu); + pthread_cond_destroy(&ka->cv); + if (ka->progress && ka->progress->keepalive == ka) { + ka->progress->keepalive = NULL; + } + ka->active = false; +} + static void request_ctx_span(char *buf, size_t len, int cached, int prompt) { int suffix = prompt - cached; if (suffix < 0) suffix = 0; @@ -9354,23 +9442,30 @@ static void server_progress_cb(void *ud, const char *event, int current, int tot * comment line (`:` prefix, ignored by SSE clients) every few seconds. * Best-effort: if the client has already gone away, the writes fail * silently and the outer code will discover the closed socket the next - * time it tries to stream a real event. */ - if (p->stream && p->fd >= 0 && !p->stream_failed) { - if (!p->headers_sent) { - p->headers_sent = true; - if (sse_headers(p->fd, p->enable_cors)) { - p->last_keepalive = now; - } else { - p->stream_failed = true; - } - } else if (now - p->last_keepalive >= 5.0) { - static const char ka[] = ": prefill\n\n"; - if (send_all(p->fd, ka, sizeof(ka) - 1)) { - p->last_keepalive = now; - } else { - p->stream_failed = true; + * time it tries to stream a real event. The wall-clock keepalive thread + * (when attached) races with this callback on the same shared state, so + * the critical section is serialized through the keepalive mutex. */ + if (p->stream && p->fd >= 0) { + server_prefill_keepalive *ka = p->keepalive; + if (ka) pthread_mutex_lock(&ka->mu); + if (!p->stream_failed) { + if (!p->headers_sent) { + p->headers_sent = true; + if (sse_headers(p->fd, p->enable_cors)) { + p->last_keepalive = now; + } else { + p->stream_failed = true; + } + } else if (now - p->last_keepalive >= 5.0) { + static const char ka_msg[] = ": prefill\n\n"; + if (send_all(p->fd, ka_msg, sizeof(ka_msg) - 1)) { + p->last_keepalive = now; + } else { + p->stream_failed = true; + } } } + if (ka) pthread_mutex_unlock(&ka->mu); } double elapsed = now - p->t0; if (p->seen && current == p->last_current) { @@ -9639,9 +9734,12 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct .headers_sent = true, }; snprintf(rebuild_progress.ctx, sizeof(rebuild_progress.ctx), "%s", rebuild_ctx); + server_prefill_keepalive rebuild_ka; + server_prefill_keepalive_attach(&rebuild_ka, &rebuild_progress); ds4_session_set_progress(s->session, server_progress_cb, &rebuild_progress); if (ds4_session_sync(s->session, sync_prompt, sync_err, sizeof(sync_err)) == 0) { ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&rebuild_ka); const double rebuild_sec = now_sec() - rebuild_t0; if (loaded > 0) { server_log(DS4_LOG_KVCACHE, @@ -9660,6 +9758,7 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct } } else { ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&rebuild_ka); server_log(DS4_LOG_KVCACHE, "ds4-server: tool checkpoint rebuild failed ctx=%s request_ctx=%s source=%s cached=%d replay=%d target=%d error=\"%s\"", rebuild_ctx, ctx, source, loaded, replay_tokens, @@ -9906,6 +10005,8 @@ static void generate_job(server *s, job *j) { ctx_span, req_flags[0] ? " " : "", req_flags); + server_prefill_keepalive keepalive; + server_prefill_keepalive_attach(&keepalive, &progress); ds4_session_set_progress(s->session, server_progress_cb, &progress); int cold_store_len = 0; @@ -9943,6 +10044,7 @@ static void generate_job(server *s, job *j) { ds4_tokens_free(&prefix); ds4_tokens_free(&effective_prompt); ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&keepalive); kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last, cold_store_len); trace_event(s, trace_id, "prefill failed: %s", err); @@ -9963,6 +10065,7 @@ static void generate_job(server *s, job *j) { if (ds4_session_sync(s->session, prompt_for_sync, err, sizeof(err)) != 0) { ds4_tokens_free(&effective_prompt); ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&keepalive); kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last, cold_store_len); trace_event(s, trace_id, "prefill failed: %s", err); @@ -9975,6 +10078,7 @@ static void generate_job(server *s, job *j) { if (!anthropic_live_continuation) anthropic_live_clear(s); if (!thinking_live_continuation) thinking_live_clear(s); ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&keepalive); kv_cache_maybe_store_continued(s); server_log(DS4_LOG_PREFILL, "ds4-server: %s ctx=%s%s%s prompt done %.3fs", @@ -11755,6 +11859,80 @@ static void test_cors_sse_headers(void) { close(sv[1]); } +/* Without a wall-clock keepalive thread, a prefill stall that lasts longer + * than the client's idle timeout drops the socket because the progress + * callback never fires during the stall. The thread must emit + * `: prefill\n\n` even when nothing calls back. */ +static void test_prefill_keepalive_thread_emits_without_progress_callback(void) { + int sv[2]; + TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + if (sv[0] < 0 || sv[1] < 0) return; + + server_prefill_progress progress = { + .fd = sv[0], + .stream = true, + .enable_cors = false, + .headers_sent = true, + /* Force the very first tick to clear the 5s guard. */ + .last_keepalive = now_sec() - 10.0, + }; + server_prefill_keepalive ka; + server_prefill_keepalive_attach(&ka, &progress); + TEST_ASSERT(ka.active); + + /* The thread waits up to one second between ticks; sleep long enough + * for at least one tick to fire without invoking server_progress_cb. */ + struct timespec ts = {.tv_sec = 1, .tv_nsec = 500 * 1000 * 1000L}; + nanosleep(&ts, NULL); + + server_prefill_keepalive_detach(&ka); + shutdown(sv[0], SHUT_WR); + char *out = read_socket_text(sv[1]); + TEST_ASSERT(out != NULL && strstr(out, ": prefill\n\n") != NULL); + TEST_ASSERT(!progress.stream_failed); + + free(out); + close(sv[0]); + close(sv[1]); +} + +/* When the progress callback never fires (stall before the first chunk), + * the thread is still responsible for sending SSE headers so the client + * sees a live response. */ +static void test_prefill_keepalive_thread_sends_headers_when_callback_silent(void) { + int sv[2]; + TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + if (sv[0] < 0 || sv[1] < 0) return; + + server_prefill_progress progress = { + .fd = sv[0], + .stream = true, + .enable_cors = true, + .headers_sent = false, + .last_keepalive = 0.0, + }; + server_prefill_keepalive ka; + server_prefill_keepalive_attach(&ka, &progress); + TEST_ASSERT(ka.active); + + struct timespec ts = {.tv_sec = 1, .tv_nsec = 500 * 1000 * 1000L}; + nanosleep(&ts, NULL); + + server_prefill_keepalive_detach(&ka); + shutdown(sv[0], SHUT_WR); + char *out = read_socket_text(sv[1]); + TEST_ASSERT(out != NULL); + TEST_ASSERT(strstr(out, "HTTP/1.1 200 OK") != NULL); + TEST_ASSERT(strstr(out, "Content-Type: text/event-stream") != NULL); + TEST_ASSERT(strstr(out, "Access-Control-Allow-Origin: *") != NULL); + TEST_ASSERT(progress.headers_sent); + TEST_ASSERT(!progress.stream_failed); + + free(out); + close(sv[0]); + close(sv[1]); +} + static void test_anthropic_live_stream_sends_incremental_blocks(void) { int sv[2]; TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); @@ -14715,6 +14893,8 @@ static void ds4_server_unit_tests_run(void) { test_cors_headers_are_opt_in(); test_cors_preflight_response_is_no_content(); test_cors_sse_headers(); + test_prefill_keepalive_thread_emits_without_progress_callback(); + test_prefill_keepalive_thread_sends_headers_when_callback_silent(); test_anthropic_live_stream_sends_incremental_blocks(); test_anthropic_usage_reports_cache_details(); test_anthropic_tool_stream_sends_live_tool_use();