3 changes: 3 additions & 0 deletions .jules/bolt.md
@@ -0,0 +1,3 @@
+## 2025-04-12 - TaskGroup Sliding Window Concurrency Optimization
+**Learning:** When using Swift's `withTaskGroup` for high-volume concurrent processing (e.g., scanning hundreds of PIDs), grouping tasks into static chunks and waiting for each chunk to finish causes tail latency. The entire chunk waits for its slowest task to complete, temporarily dropping concurrency to 1.
+**Action:** Always use a sliding window approach with an iterator (e.g., `makeIterator()`). Seed the group up to the `maxConcurrency` limit, and within the `for await result in group` loop, immediately add a new task using `iterator.next()`. This keeps the worker pool consistently saturated at max concurrency.
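The note above describes the pattern abstractly. A minimal, self-contained sketch of the sliding window (the `processAll` name and the doubling workload are illustrative assumptions, not from this PR):

```swift
/// Sliding-window concurrency sketch: seed the group up to
/// `maxConcurrency`, then refill one slot per completion so the
/// pool never drains between "chunks".
func processAll(_ items: [Int], maxConcurrency: Int) async -> [Int] {
    var results: [Int] = []
    await withTaskGroup(of: Int.self) { group in
        var iterator = items.makeIterator()

        // Seed the group with up to maxConcurrency tasks.
        for _ in 0..<maxConcurrency {
            guard let item = iterator.next() else { break }
            group.addTask { item * 2 } // placeholder work
        }

        // Each completion frees a slot; refill it immediately.
        for await result in group {
            results.append(result)
            if let next = iterator.next() {
                group.addTask { next * 2 }
            }
        }
    }
    return results
}
```

In-flight work stays at `min(maxConcurrency, remaining items)` throughout, rather than sawtoothing down to 1 at each chunk boundary as in the static-chunking version.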
40 changes: 24 additions & 16 deletions Sources/Cacheout/Memory/ProcessMemoryScanner.swift
@@ -97,29 +97,37 @@ actor ProcessMemoryScanner {
     ///
     /// Returns the collected entries and the count of EPERM failures.
     private func scanPIDs(_ pids: [pid_t]) async -> (entries: [ProcessEntryDTO], epermCount: Int) {
-        // Chunk PIDs to cap concurrency at maxConcurrency.
-        let chunks = stride(from: 0, to: pids.count, by: maxConcurrency).map {
-            Array(pids[$0..<min($0 + maxConcurrency, pids.count)])
-        }
-
         var allEntries: [ProcessEntryDTO] = []
         var totalEperm = 0
 
-        for chunk in chunks {
-            await withTaskGroup(of: ScanPIDResult.self) { group in
-                for pid in chunk {
+        // Use a sliding window iterator to cap concurrency at maxConcurrency
+        // while avoiding tail latency from static chunking.
+        await withTaskGroup(of: ScanPIDResult.self) { group in
+            var pidIterator = pids.makeIterator()
+
+            // Seed the group with up to maxConcurrency tasks
+            for _ in 0..<maxConcurrency {
+                if let pid = pidIterator.next() {
                     group.addTask { [self] in
                         self.scanSinglePID(pid)
                     }
                 }
-                for await result in group {
-                    switch result {
-                    case .success(let entry):
-                        allEntries.append(entry)
-                    case .eperm:
-                        totalEperm += 1
-                    case .otherError:
-                        break
-                    }
+            }
+
+            // As each task completes, process the result and add a new task if available
+            for await result in group {
+                switch result {
+                case .success(let entry):
+                    allEntries.append(entry)
+                case .eperm:
+                    totalEperm += 1
+                case .otherError:
+                    break
+                }
+
+                if let nextPid = pidIterator.next() {
+                    group.addTask { [self] in
+                        self.scanSinglePID(nextPid)
+                    }
                 }
             }
         }