From 2d57575bc83ca7f4c121f9f0734bc82b94219983 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Tue, 12 May 2026 15:40:41 +0800 Subject: [PATCH] perf(payload-builder): wake poll on cancellation Poll a `WaitForCancellationFutureOwned` in `BlockPayloadJob::poll` so the service task is woken the moment `PayloadJobCancellation` fires, instead of piggybacking on the next chain event, command, or deadline. Removes the implicit invariant that cancels must run inside the service loop. --- crates/op-rbuilder/src/builder/cancellation.rs | 13 ++++--------- crates/op-rbuilder/src/builder/generator.rs | 18 ++++++++++++------ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/crates/op-rbuilder/src/builder/cancellation.rs b/crates/op-rbuilder/src/builder/cancellation.rs index 17b52b57c..3c9a165ce 100644 --- a/crates/op-rbuilder/src/builder/cancellation.rs +++ b/crates/op-rbuilder/src/builder/cancellation.rs @@ -54,11 +54,6 @@ impl PayloadJobCancellation { self.cancel_with(CancellationReason::Deadline); } - /// Returns true if any cancellation source has fired. - pub(crate) fn is_cancelled(&self) -> bool { - self.token.is_cancelled() - } - /// Returns true if cancelled with `Resolved` reason. pub(crate) fn is_resolved(&self) -> bool { self.reason() == Some(CancellationReason::Resolved) @@ -115,10 +110,10 @@ mod tests { #[tokio::test] async fn test_cancel_new_fcu() { let cancel = PayloadJobCancellation::new(); - assert!(!cancel.is_cancelled()); + assert!(!cancel.token().is_cancelled()); cancel.cancel_new_fcu(); - assert!(cancel.is_cancelled()); + assert!(cancel.token().is_cancelled()); assert!(cancel.is_new_fcu()); assert!(!cancel.is_resolved()); assert_eq!(cancel.reason(), Some(CancellationReason::NewFcu)); @@ -128,7 +123,7 @@ mod tests { async fn test_cancel_resolved() { let cancel = PayloadJobCancellation::new(); cancel.cancel_resolved(); - assert!(cancel.is_cancelled()); + assert!(cancel.token().is_cancelled()); assert!(cancel.is_resolved()); assert!(!cancel.is_new_fcu()); assert_eq!(cancel.reason(), Some(CancellationReason::Resolved)); @@ -138,7 +133,7 @@ mod tests { async fn test_cancel_deadline() { let cancel = PayloadJobCancellation::new(); cancel.cancel_deadline(); - assert!(cancel.is_cancelled()); + assert!(cancel.token().is_cancelled()); assert!(!cancel.is_new_fcu()); assert!(!cancel.is_resolved()); assert_eq!(cancel.reason(), Some(CancellationReason::Deadline)); diff --git a/crates/op-rbuilder/src/builder/generator.rs b/crates/op-rbuilder/src/builder/generator.rs index faf53c8f5..094284e36 100644 --- a/crates/op-rbuilder/src/builder/generator.rs +++ b/crates/op-rbuilder/src/builder/generator.rs @@ -21,6 +21,7 @@ use tokio::{ sync::watch, time::{Duration, Sleep}, }; +use tokio_util::sync::WaitForCancellationFutureOwned; use tracing::info; use super::cancellation::PayloadJobCancellation; @@ -178,12 +179,15 @@ where let deadline = Box::pin(tokio::time::sleep(deadline)); let config = PayloadConfig::new(Arc::new(parent_header.clone()), attributes); + let cancelled_fut = Box::pin(cancel.token().cancelled_owned()); + let mut job = BlockPayloadJob { executor: self.executor.clone(), builder: self.builder.clone(), config, payload_rx: None, cancel, + cancelled_fut, deadline, cached_reads: self .maybe_pre_cached(parent_header.hash()) @@ -238,6 +242,8 @@ where payload_rx: Option>>, /// Structured cancellation for the running job cancel: PayloadJobCancellation, + /// Future that resolves when `cancel` fires. + cancelled_fut: Pin>, /// Deadline at which the job is forcibly cancelled. deadline: Pin>, /// Caches all disk reads for the state the new payloads builds on. @@ -331,8 +337,8 @@ where return Poll::Ready(Ok(())); } - // If canceled via any source - if this.cancel.is_cancelled() { + // Poll the cancellation future so this task is woken when `PayloadJobCancellation` fires + if this.cancelled_fut.as_mut().poll(cx).is_ready() { tracing::debug!("Job cancelled"); return Poll::Ready(Ok(())); } @@ -458,7 +464,7 @@ mod tests { .expect("resolve should return payload"); assert_eq!(payload, MockPayload(7)); assert!(cancel.is_resolved()); - assert!(cancel.is_cancelled()); + assert!(cancel.token().is_cancelled()); } #[tokio::test] @@ -474,7 +480,7 @@ mod tests { assert_eq!(payload, MockPayload(2)); assert!(cancel.is_resolved()); - assert!(cancel.is_cancelled()); + assert!(cancel.token().is_cancelled()); } #[tokio::test] @@ -501,7 +507,7 @@ mod tests { let handle = tokio::spawn(ResolvePayload::new(Some(rx), cancel.clone())); sleep(Duration::from_millis(20)).await; - assert!(!cancel.is_cancelled()); + assert!(!cancel.token().is_cancelled()); tx.send_replace(Some(MockPayload(9))); let payload = timeout(Duration::from_secs(1), handle) @@ -512,6 +518,6 @@ mod tests { assert_eq!(payload, MockPayload(9)); assert!(cancel.is_resolved()); - assert!(cancel.is_cancelled()); + assert!(cancel.token().is_cancelled()); } }