Refactor AsyncQueue: replace-on-insert + symmetric async/asyncThrowing#54
Conversation
ee0e571 to
264441a
Compare
| let dependencies: [RegisteredTask] = candidates.reduce(into: []) { result, bucket in | ||
| let (candidateMetadata, tasks) = bucket | ||
| guard candidateMetadata.isDependency(of: metadata) else { return } | ||
| result.append(contentsOf: tasks) |
There was a problem hiding this comment.
Doesn't really matter, but wouldn't flatMap make more sense here?
| /// build the Task using those, and registers it as a dependency candidate. | ||
| private func withDependencies<T: AnyTask>( | ||
| for metadata: TaskMetadata, | ||
| _ makeTask: (UUID, [RegisteredTask]) -> T |
There was a problem hiding this comment.
| _ makeTask: (UUID, [RegisteredTask]) -> T | |
| _ makeTask: (_ newTaskId: UUID, _ dependencies: [RegisteredTask]) -> T |
| return try await operation() | ||
| /// Atomically computes the dependencies for a new task with `metadata`, lets `makeTask` | ||
| /// build the Task using those, and registers it as a dependency candidate. | ||
| private func withDependencies<T: AnyTask>( |
There was a problem hiding this comment.
withDependencies does not sound like it has a side effect to me but it does (it registered the new task). I think something like registerTask would be a better name.
| for dependency in dependencies { | ||
| await dependency.task.waitForCompletion() | ||
| } |
There was a problem hiding this comment.
This should happen concurrently so that the priority of all dependencies get elevated at one if a high-priority task is enqueued instead of bumping them one-by-one.
There was a problem hiding this comment.
Nice find. Updated to use concurrentForEach. Also updated concurrentForEach to use withDiscardingTaskGroup, because we don't care the result of each sub task.
6cd21be to
955e579
Compare
1. Replace-on-insert for self-serializing metadata. When a task's metadata is self-serializing (e.g. Serial), only the latest task is stored - earlier in-flight tasks are removed on insert because the new task transitively depends on them, so future schedulings only need to wait on the latest. The "wait only on last entry of bucket" special case moves from the read path to the write path; this simplifies the dependency building. Renames reflect the new semantics: pendingTasks -> dependencyCandidates (the dict holds tasks visible to future schedulings, not all in-flight tasks), PendingTask -> RegisteredTask. Also drop PendingTask's unused generic parameter and make it private. 2. Make async() and asyncThrowing() symmetric. Extract registerTask(for:_:), which atomically computes dependencies and registers a caller-built Task in a single lock scope. Generic over T: AnyTask, so it returns the same Task type the body produces — no throwing/non-throwing overloads. Extract runRegistered, which waits for dependencies, runs the operation, and removes the task on exit. rethrows propagates throwing-ness, so async and asyncThrowing now differ only in `try`. No more Task<_, Never> wrapping Task<_, any Error>.
concurrentForEach never iterates child results, so the queue of Void results in withTaskGroup<Void> is pure overhead. Switch to withDiscardingTaskGroup, which drops child results immediately and is the idiomatic choice for fire-and-forget concurrent work.
955e579 to
70befb3
Compare
Refactor
AsyncQueueand also some doc-comment improvements and test case additions.No observable behavior change intended.
Replace-on-insert for self-serializing metadata
Simplifies overall dependency tracking: if a metadata is self-serializing, a new task only needs to depend on the last entry (it transitively covers earlier ones), so we only need to remember the last one. Replace rather than append on insert, and the read path becomes a single
reduce(into:)collecting tasks from buckets whose metadata is a dependency.Renames to reflect the new semantics:
pendingTasks→dependencyCandidates(no longer holds all in-flight tasks; only those visible to future schedulings)PendingTask→RegisteredTask(also dropped its generic parameter and made itprivate)Symmetric
async()andasyncThrowing()Previously the non-throwing
asyncwas implemented by wrappingasyncThrowing'sTask<_, any Error>in a secondTask<_, Never>with apreconditionFailurein the catch. Two Tasks per call, plus a "the only throwing call here must beoperation" invariant the compiler couldn't enforce.Now both share the same scaffolding via two helpers:
withDependencies(for:_:): atomically computes dependencies and registers a caller-built Task in a single lock scope. Generic overT: AnyTask, so it returns the same Task type the body produces.runRegistered: waits for dependencies, runs the operation, removes the task on exit. Usesrethrowsto propagate throwing-ness from the operation closure.Now
asyncandasyncThrowingnow have parallel structure that differs only intry. Therethrowscontract enforces the no-other-throws invariant for free.