From 95bd6900755ab0d2419ad846daaca6bcf5616628 Mon Sep 17 00:00:00 2001 From: Rintaro Ishizaki Date: Thu, 28 May 2026 02:08:14 -0700 Subject: [PATCH 1/2] withTaskPriorityChangedHandler: delegate to stdlib on macOS 26+ On platforms with SwiftStdlib 6.2 (macOS/iOS/macCatalyst 26+), delegate to withTaskPriorityEscalationHandler. The new API is event-driven (no polling) and integrated with the runtime, so it reacts immediately to escalations. Mark the wrapper @available(..., deprecated: 26.0). Once the min deployment target reaches 26, the deprecation warning fires at every call site, prompting migration to withTaskPriorityEscalationHandler and removal of this wrapper. Change the function from `throws` to `rethrows` following the stdlib version. Also, use `RefBox>` for the previous priority storage. Atomic is lighter, and actually simplifies the code. The old codepath is extracted as a function and kept tested even on platforms with built-in withTaskPriorityEscalationHandler. --- .../Task+WithPriorityChangedHandler.swift | 71 ++++++++++++------- .../AsyncUtilsTests.swift | 56 ++++++++++++++- 2 files changed, 99 insertions(+), 28 deletions(-) diff --git a/Sources/ToolsProtocolsSwiftExtensions/Task+WithPriorityChangedHandler.swift b/Sources/ToolsProtocolsSwiftExtensions/Task+WithPriorityChangedHandler.swift index ad974a0d4..74a8ca196 100644 --- a/Sources/ToolsProtocolsSwiftExtensions/Task+WithPriorityChangedHandler.swift +++ b/Sources/ToolsProtocolsSwiftExtensions/Task+WithPriorityChangedHandler.swift @@ -10,22 +10,50 @@ // //===----------------------------------------------------------------------===// +import Synchronization + /// Runs `operation`. If the task's priority changes while the operation is running, calls `taskPriorityChanged`. /// -/// Since Swift Concurrency doesn't support direct observation of a task's priority, this polls the task's priority at -/// `pollingInterval`. -/// The function assumes that the original priority of the task is `initialPriority`. If the task priority changed -/// compared to `initialPriority`, the `taskPriorityChanged` will be called. -// Workaround formatter issue: https://github.com/swiftlang/swift-format/issues/1081 -// swift-format-ignore -@_spi(SourceKitLSP) public func withTaskPriorityChangedHandler( +/// On platforms with the runtime-provided priority escalation hook (SwiftStdlib 6.2+), this delegates +/// to `withTaskPriorityEscalationHandler` and reacts immediately. Otherwise it polls +/// `Task.currentPriority` every `pollingInterval` and assumes the original priority is `initialPriority`; +/// the `taskPriorityChanged` callback fires when the polled priority differs. +@available(macOS, deprecated: 26.0, message: "Use withTaskPriorityEscalationHandler") +@available(iOS, deprecated: 26.0, message: "Use withTaskPriorityEscalationHandler") +@available(macCatalyst, deprecated: 26.0, message: "Use withTaskPriorityEscalationHandler") +@_spi(SourceKitLSP) @inlinable public func withTaskPriorityChangedHandler( initialPriority: TaskPriority = Task.currentPriority, pollingInterval: Duration = .seconds(0.1), @_inheritActorContext operation: nonisolated(nonsending) @escaping @Sendable () async throws -> T, taskPriorityChanged: @escaping @Sendable () -> Void -) async throws -> T { - let lastPriority = ThreadSafeBox(initialValue: initialPriority) - let result: T? = try await withThrowingTaskGroup(of: Optional.self) { taskGroup in +) async rethrows -> T { + if #available(macOS 26, iOS 26, macCatalyst 26, *) { + return try await withTaskPriorityEscalationHandler( + operation: operation, + onPriorityEscalated: { _, _ in taskPriorityChanged() } + ) + } else { + return try await withTaskPriorityChangedHandlerLegacy( + initialPriority: initialPriority, + pollingInterval: pollingInterval, + operation: operation, + taskPriorityChanged: taskPriorityChanged + ) + } +} + +/// Polling-based fallback for ``withTaskPriorityChangedHandler`` on platforms without +/// `withTaskPriorityEscalationHandler`. Exposed under `@_spi(Testing)` so tests can +/// exercise this path even on platforms where the inlinable wrapper would dispatch to +/// the stdlib hook. +@_spi(Testing) public func withTaskPriorityChangedHandlerLegacy( + initialPriority: TaskPriority, + pollingInterval: Duration, + @_inheritActorContext operation: nonisolated(nonsending) @escaping @Sendable () async throws -> T, + taskPriorityChanged: @escaping @Sendable () -> Void +) async rethrows -> T { + let lastPriority = RefBox(Atomic(initialPriority.rawValue)) + return try await withThrowingTaskGroup(of: Optional.self) { taskGroup in defer { // We leave this closure when either we have received a result or we registered cancellation. In either case, we // want to make sure that we don't leave the body task or the priority watching task running. @@ -36,15 +64,8 @@ if Task.isCancelled { break } - let newPriority = Task.currentPriority - let didChange = lastPriority.withLock { lastPriority in - if newPriority != lastPriority { - lastPriority = newPriority - return true - } - return false - } - if didChange { + let newPriority = Task.currentPriority.rawValue + if newPriority != lastPriority.value.exchange(newPriority, ordering: .relaxed) { taskPriorityChanged() } do { @@ -58,16 +79,12 @@ taskGroup.addTask { try await operation() } - // The first task that watches the priority never finishes unless it is cancelled, so we are effectively await the - // `operation` task here. - // We do need to await the observation task as well so that priority escalation also affects the observation task. + // The watcher loops forever until cancelled, so iterating the group effectively awaits + // `operation`. The watcher is structured into the same task group so it inherits the + // parent's priority and is automatically escalated alongside `operation`. for try await case let value? in taskGroup { return value } - return nil - } - guard let result else { - throw CancellationError() + preconditionFailure("Task group exits only via operation's value or throw") } - return result } diff --git a/Tests/ToolsProtocolsSwiftExtensionsTests/AsyncUtilsTests.swift b/Tests/ToolsProtocolsSwiftExtensionsTests/AsyncUtilsTests.swift index 5443151fc..2c68ec335 100644 --- a/Tests/ToolsProtocolsSwiftExtensionsTests/AsyncUtilsTests.swift +++ b/Tests/ToolsProtocolsSwiftExtensionsTests/AsyncUtilsTests.swift @@ -11,7 +11,7 @@ //===----------------------------------------------------------------------===// @_spi(SourceKitLSP) import SKLogging -@_spi(SourceKitLSP) import ToolsProtocolsSwiftExtensions +@_spi(SourceKitLSP) @_spi(Testing) import ToolsProtocolsSwiftExtensions import ToolsProtocolsTestSupport import XCTest @@ -73,4 +73,58 @@ final class AsyncUtilsTests: XCTestCase { try await task.value }.value } + + func testWithTaskPriorityChangedHandlerLegacyReturnsOptionalNilFromOperation() async throws { + // When the operation's `T` is itself an `Optional`, verify `nil` return + // value is propagated as the operation's result. + let result: String? = try await withTaskPriorityChangedHandlerLegacy( + initialPriority: Task.currentPriority, + pollingInterval: .seconds(0.1), + operation: { + let value: String? = nil + return value + }, + taskPriorityChanged: {} + ) + XCTAssertNil(result) + } + + func testWithTaskPriorityChangedHandlerLegacyDetectsPriorityEscalation() async throws { + let started = self.expectation(description: "Operation started") + let callbackFired = ThreadSafeBox(initialValue: false) + let task = Task(priority: .background) { + try await withTaskPriorityChangedHandlerLegacy( + initialPriority: .background, + pollingInterval: .seconds(0.05), + operation: { + started.fulfill() + try await repeatUntilExpectedResult(sleepInterval: .seconds(0.1)) { + return callbackFired.value + } + }, + taskPriorityChanged: { + callbackFired.withLock { $0 = true } + } + ) + } + try await fulfillmentOfOrThrow(started) + try await Task(priority: .high) { + try await task.value + }.value + XCTAssertTrue(callbackFired.value) + } + + func testWithTaskPriorityChangedHandlerLegacyRethrowsError() async throws { + struct TestError: Error {} + await assertThrowsError( + try await withTaskPriorityChangedHandlerLegacy( + initialPriority: Task.currentPriority, + pollingInterval: .seconds(0.1), + operation: { throw TestError() }, + taskPriorityChanged: {} + ) + ) { error in + XCTAssert(error is TestError, "Received unexpected error \(error)") + } + } } From f8a0aa37b55555a1f48ab320841d208b974a5803 Mon Sep 17 00:00:00 2001 From: Rintaro Ishizaki Date: Tue, 26 May 2026 10:45:36 -0700 Subject: [PATCH 2/2] Add WithTimeoutResult; unify withTimeout family MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce `withTimeoutResult(_:body:resultReceivedAfterTimeout:)` as a unified core of the `withTimeout` family. The existing `withTimeout` overloads become thin `@inlinable` switches over `WithTimeoutResult`, forwarding errors via `rethrows`. `withTimeoutResult` supports an optional late-result callback. Late errors after the timeout are dropped silently because the callback signature accepts only the success value, so errors have nowhere to go. Implementation note: `bodyTask` itself yields its outcome onto the internal `AsyncStream` instead of a separate forwarder task. Adding a forwarder that awaits `bodyTask.result` proved unreliable — priority escalation through the await chain didn't always reach `bodyTask`, leaving `Task.currentPriority` inside `body` stuck at the original priority. --- .../AsyncUtils.swift | 184 +++++++++--------- 1 file changed, 90 insertions(+), 94 deletions(-) diff --git a/Sources/ToolsProtocolsSwiftExtensions/AsyncUtils.swift b/Sources/ToolsProtocolsSwiftExtensions/AsyncUtils.swift index a4fd29c90..8011f0972 100644 --- a/Sources/ToolsProtocolsSwiftExtensions/AsyncUtils.swift +++ b/Sources/ToolsProtocolsSwiftExtensions/AsyncUtils.swift @@ -204,127 +204,123 @@ extension Collection where Self: Sendable, Element: Sendable { } } -/// Executes `body`. If it doesn't finish after `duration`, throws a `TimeoutError` and cancels `body`. +@_spi(SourceKitLSP) +public enum WithTimeoutResult: Sendable { + case result(T) + case timedOut +} + +/// Executes `body` with a `duration` timeout. /// -/// `TimeoutError` is thrown immediately an the function does not wait for `body` to honor the cancellation. +/// Returns `.result(value)` if `body` finishes within `duration`, otherwise `.timedOut`. /// -/// If a `handle` is passed in and this `withTimeout` call times out, the thrown `TimeoutError` contains this handle. -/// This way a caller can identify whether this call to `withTimeout` timed out or if a nested call timed out. -package func withTimeout( - _ duration: Duration, - handle: TimeoutHandle? = nil, - _ body: @escaping @Sendable () async throws -> T -) async throws -> T { - // Get the priority with which to launch the body task here so that we can pass the same priority as the initial - // priority to `withTaskPriorityChangedHandler`. Otherwise, we can get into a race condition where bodyTask gets - // launched with a low priority, then the priority gets elevated before we call with `withTaskPriorityChangedHandler`, - // we thus don't receive a `taskPriorityChanged` and hence never increase the priority of `bodyTask`. +/// On timeout: if `resultReceivedAfterTimeout` is provided, `body` keeps running and its +/// eventual result is passed to that callback. Otherwise, `body` is cancelled. +@_spi(SourceKitLSP) +public func withTimeoutResult( + _ timeout: Duration, + body: @escaping @Sendable () async throws -> T, + resultReceivedAfterTimeout: (@Sendable (_ result: T) async -> Void)? = nil +) async rethrows -> WithTimeoutResult { + // Capture the priority here so it stays consistent across `bodyTask`, timeoutTask`, + // and `withTaskPriorityChangedHandler`'s initial state. let priority = Task.currentPriority - var mutableTasks: [Task] = [] - let stream = AsyncThrowingStream { continuation in - let bodyTask = Task(priority: priority) { - do { - let result = try await body() - continuation.yield(result) - } catch { - continuation.yield(with: .failure(error)) - } - } - let timeoutTask = Task(priority: priority) { - try await Task.sleep(for: duration) - continuation.yield(with: .failure(TimeoutError(handle: handle))) - bodyTask.cancel() + let (stream, continuation) = AsyncStream>>.makeStream() + let bodyTask = Task(priority: priority) { + do { + let value = try await body() + continuation.yield(.result(.success(value))) + return value + } catch { + continuation.yield(.result(.failure(error))) + throw error } - mutableTasks = [bodyTask, timeoutTask] } - - let tasks = mutableTasks - - defer { - // Be extra careful and ensure that we don't leave `bodyTask` or `timeoutTask` running when `withTimeout` finishes, - // eg. if `withTaskPriorityChangedHandler` adds some behavior that never executes `body` if the task gets cancelled. - for task in tasks { - task.cancel() - } + let timeoutTask = Task(priority: priority) { + do { try await Task.sleep(for: timeout) } catch { return } + continuation.yield(.timedOut) } + // `bodyTask` is intentionally not cancelled here: it must keep running so the late-result + // dispatcher can deliver its value. Cancellation happens at the specific sites that own that + // decision. + defer { timeoutTask.cancel() } + return try await withTaskPriorityChangedHandler(initialPriority: priority) { - for try await value in stream { - return value - } - // The only reason for the loop above to terminate is if the Task got cancelled or if the stream finishes - // (which it never does). - if Task.isCancelled { - // Throwing a `CancellationError` will make us return from `withTimeout`. We will cancel the `bodyTask` from the - // `defer` method above. - throw CancellationError() - } else { - preconditionFailure("Continuation never finishes") - } - } taskPriorityChanged: { - for task in tasks { - Task(priority: Task.currentPriority) { - _ = try? await task.value + for await value in stream { + switch value { + case .result(let r): + return try .result(r.get()) + case .timedOut: + if let resultReceivedAfterTimeout { + // Late-result dispatch: await body and deliver via callback. + Task { try? await resultReceivedAfterTimeout(bodyTask.value) } + } else { + bodyTask.cancel() + } + return .timedOut } } + // The for-await exits without a return only if the consuming task is cancelled. + guard Task.isCancelled else { preconditionFailure("Continuation never finishes") } + + bodyTask.cancel() + throw CancellationError() + } taskPriorityChanged: { + // Spawning fresh tasks that await `bodyTask` and `timeoutTask` forces the runtime to + // escalate their priorities via the await chain so `body`'s `Task.currentPriority` + // reflects the elevated value. + let newPriority = Task.currentPriority + Task(priority: newPriority) { _ = await bodyTask.result } + Task(priority: newPriority) { _ = await timeoutTask.value } + } +} + +/// Executes `body`. If it doesn't finish after `duration`, throws a `TimeoutError` and cancels `body`. +/// +/// `TimeoutError` is thrown immediately; the function does not wait for `body` to honor the cancellation. +/// +/// If a `handle` is passed in and this `withTimeout` call times out, the thrown `TimeoutError` contains this handle. +/// This way a caller can identify whether this call to `withTimeout` timed out or if a nested call timed out. +@_spi(SourceKitLSP) @inlinable +public func withTimeout( + _ duration: Duration, + handle: TimeoutHandle? = nil, + _ body: @escaping @Sendable () async throws -> T +) async throws -> T { + switch try await withTimeoutResult(duration, body: body) { + case .result(let value): return value + case .timedOut: throw TimeoutError(handle: handle) } } /// Executes `body`. If it doesn't finish after `duration`, return `nil` and continue running body. When `body` returns -/// a value after the timeout, `resultReceivedAfterTimeout` is called. +/// a value or throws an error after the timeout, `resultReceivedAfterTimeout` is called with the outcome. /// /// - Important: `body` will not be cancelled when the timeout is received. Use the other overload of `withTimeout` if /// `body` should be cancelled after `timeout`. -package func withTimeout( +@_spi(SourceKitLSP) @inlinable +public func withTimeout( _ timeout: Duration, body: @escaping @Sendable () async throws -> T, resultReceivedAfterTimeout: @escaping @Sendable (_ result: T) async -> Void -) async throws -> T? { - let didHitTimeout = ThreadSafeBox(initialValue: false) - - let stream = AsyncThrowingStream { continuation in - Task { - try await Task.sleep(for: timeout) - didHitTimeout.withLock { $0 = true } - continuation.yield(nil) - } - - Task { - do { - let result = try await body() - if didHitTimeout.value { - await resultReceivedAfterTimeout(result) - } - continuation.yield(result) - } catch { - continuation.yield(with: .failure(error)) - } - } - } - - for try await value in stream { - return value - } - // The only reason for the loop above to terminate is if the Task got cancelled or if the continuation finishes - // (which it never does). - if Task.isCancelled { - throw CancellationError() - } else { - preconditionFailure("Continuation never finishes") +) async rethrows -> T? { + switch try await withTimeoutResult(timeout, body: body, resultReceivedAfterTimeout: resultReceivedAfterTimeout) { + case .result(let value): return value + case .timedOut: return nil } } /// Same as `withTimeout` above but allows `body` to return an optional value. -package func withTimeout( +@_spi(SourceKitLSP) @inlinable +public func withTimeout( _ timeout: Duration, body: @escaping @Sendable () async throws -> T?, resultReceivedAfterTimeout: @escaping @Sendable (_ result: T?) async -> Void -) async throws -> T? { - let result: T?? = try await withTimeout(timeout, body: body, resultReceivedAfterTimeout: resultReceivedAfterTimeout) - switch result { - case .none: return nil - case .some(.none): return nil - case .some(.some(let value)): return value +) async rethrows -> T? { + switch try await withTimeoutResult(timeout, body: body, resultReceivedAfterTimeout: resultReceivedAfterTimeout) { + case .result(let value): return value + case .timedOut: return nil } }