Skip to content

Refactor AsyncQueue: replace-on-insert + symmetric async/asyncThrowing#54

Merged
rintaro merged 2 commits into
swiftlang:mainfrom
rintaro:asyncqueue-refactor
May 28, 2026
Merged

Refactor AsyncQueue: replace-on-insert + symmetric async/asyncThrowing#54
rintaro merged 2 commits into
swiftlang:mainfrom
rintaro:asyncqueue-refactor

Conversation

@rintaro
Copy link
Copy Markdown
Member

@rintaro rintaro commented May 22, 2026

Refactor AsyncQueue and also some doc-comment improvements and test case additions.
No observable behavior change intended.

Replace-on-insert for self-serializing metadata

Simplifies overall dependency tracking: if a metadata is self-serializing, a new task only needs to depend on the last entry (it transitively covers earlier ones), so we only need to remember the last one. Replace rather than append on insert, and the read path becomes a single reduce(into:) collecting tasks from buckets whose metadata is a dependency.

Renames to reflect the new semantics:

  • pendingTasksdependencyCandidates (no longer holds all in-flight tasks; only those visible to future schedulings)
  • PendingTaskRegisteredTask (also dropped its generic parameter and made it private)

Symmetric async() and asyncThrowing()

Previously the non-throwing async was implemented by wrapping asyncThrowing's Task<_, any Error> in a second Task<_, Never> with a preconditionFailure in the catch. Two Tasks per call, plus a "the only throwing call here must be operation" invariant the compiler couldn't enforce.

Now both share the same scaffolding via two helpers:

  • withDependencies(for:_:): atomically computes dependencies and registers a caller-built Task in a single lock scope. Generic over T: AnyTask, so it returns the same Task type the body produces.
  • runRegistered: waits for dependencies, runs the operation, removes the task on exit. Uses rethrows to propagate throwing-ness from the operation closure.

Now async and asyncThrowing now have parallel structure that differs only in try. The rethrows contract enforces the no-other-throws invariant for free.

@rintaro rintaro force-pushed the asyncqueue-refactor branch 4 times, most recently from ee0e571 to 264441a Compare May 22, 2026 18:56
Copy link
Copy Markdown

@hamishknight hamishknight left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice cleanup!

Comment on lines +118 to +121
let dependencies: [RegisteredTask] = candidates.reduce(into: []) { result, bucket in
let (candidateMetadata, tasks) = bucket
guard candidateMetadata.isDependency(of: metadata) else { return }
result.append(contentsOf: tasks)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't really matter, but wouldn't flatMap make more sense here?

/// build the Task using those, and registers it as a dependency candidate.
private func withDependencies<T: AnyTask>(
for metadata: TaskMetadata,
_ makeTask: (UUID, [RegisteredTask]) -> T
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_ makeTask: (UUID, [RegisteredTask]) -> T
_ makeTask: (_ newTaskId: UUID, _ dependencies: [RegisteredTask]) -> T

return try await operation()
/// Atomically computes the dependencies for a new task with `metadata`, lets `makeTask`
/// build the Task using those, and registers it as a dependency candidate.
private func withDependencies<T: AnyTask>(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withDependencies does not sound like it has a side effect to me but it does (it registered the new task). I think something like registerTask would be a better name.

Comment on lines +152 to +154
for dependency in dependencies {
await dependency.task.waitForCompletion()
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should happen concurrently so that the priority of all dependencies get elevated at one if a high-priority task is enqueued instead of bumping them one-by-one.

Copy link
Copy Markdown
Member Author

@rintaro rintaro May 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find. Updated to use concurrentForEach. Also updated concurrentForEach to use withDiscardingTaskGroup, because we don't care the result of each sub task.

@rintaro rintaro force-pushed the asyncqueue-refactor branch 3 times, most recently from 6cd21be to 955e579 Compare May 26, 2026 18:49
rintaro added 2 commits May 27, 2026 15:16
1. Replace-on-insert for self-serializing metadata.
   When a task's metadata is self-serializing (e.g. Serial), only
   the latest task is stored - earlier in-flight tasks are removed
   on insert because the new task transitively depends on them, so
   future schedulings only need to wait on the latest. The "wait
   only on last entry of bucket" special case moves from the read
   path to the write path; this simplifies the dependency building.

   Renames reflect the new semantics: pendingTasks ->
   dependencyCandidates (the dict holds tasks visible to future
   schedulings, not all in-flight tasks), PendingTask ->
   RegisteredTask. Also drop PendingTask's unused generic parameter
   and make it private.

2. Make async() and asyncThrowing() symmetric.
   Extract registerTask(for:_:), which atomically computes
   dependencies and registers a caller-built Task in a single lock
   scope. Generic over T: AnyTask, so it returns the same Task
   type the body produces — no throwing/non-throwing overloads.

   Extract runRegistered, which waits for dependencies, runs the
   operation, and removes the task on exit. rethrows propagates
   throwing-ness, so async and asyncThrowing now differ only in
   `try`. No more Task<_, Never> wrapping Task<_, any Error>.
concurrentForEach never iterates child results, so the queue of
Void results in withTaskGroup<Void> is pure overhead. Switch to
withDiscardingTaskGroup, which drops child results immediately
and is the idiomatic choice for fire-and-forget concurrent work.
@rintaro rintaro force-pushed the asyncqueue-refactor branch from 955e579 to 70befb3 Compare May 27, 2026 22:16
@rintaro rintaro merged commit 7dcfd72 into swiftlang:main May 28, 2026
209 of 212 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants