diff --git a/bleep-bsp/src/scala/bleep/analysis/CompilationCoordinator.scala b/bleep-bsp/src/scala/bleep/analysis/CompilationCoordinator.scala deleted file mode 100644 index 93ca4ac67..000000000 --- a/bleep-bsp/src/scala/bleep/analysis/CompilationCoordinator.scala +++ /dev/null @@ -1,393 +0,0 @@ -package bleep.analysis - -import cats.effect.{Deferred, IO} -import cats.syntax.all.* -import java.nio.file.{Files, Path, StandardOpenOption} -import java.nio.channels.{FileChannel, FileLock as JFileLock} -import java.security.MessageDigest -import java.time.Instant -import scala.concurrent.duration.* -import scala.collection.concurrent.TrieMap - -/** Content-addressed compilation key. - * - * A CompileKey uniquely identifies a compilation based on its inputs: - * - Source file contents (not paths - content matters) - * - Classpath contents - * - Compiler configuration - * - * Same key = same compilation result (deterministic). - */ -case class CompileKey(value: String) - -object CompileKey { - - /** Compute a compile key from inputs */ - def compute( - sources: Map[Path, String], - classpath: Seq[Path], - config: LanguageConfig - ): CompileKey = { - val digest = MessageDigest.getInstance("SHA-256") - - // Hash source contents (sorted by path for determinism) - sources.toSeq.sortBy(_._1.toString).foreach { case (path, content) => - digest.update(path.toString.getBytes("UTF-8")) - digest.update(0.toByte) // separator - digest.update(content.getBytes("UTF-8")) - digest.update(0.toByte) - } - - // Hash classpath (by modification time and size for speed) - classpath.sorted.foreach { cp => - digest.update(cp.toString.getBytes("UTF-8")) - if Files.exists(cp) then { - val mtime = Files.getLastModifiedTime(cp).toMillis - val size = if Files.isRegularFile(cp) then Files.size(cp) else 0L - digest.update(s":$mtime:$size".getBytes("UTF-8")) - } - digest.update(0.toByte) - } - - // Hash config - val configStr = config match { - case ScalaConfig(version, options) => s"scala:$version:${options.mkString(",")}" - case KotlinConfig(version, jvmTarget, options) => s"kotlin:$version:$jvmTarget:${options.mkString(",")}" - case JavaConfig(release, options, ecjVersion) => s"java:${release.getOrElse("default")}:${options.mkString(",")}:ecj=${ecjVersion.getOrElse("javac")}" - } - digest.update(configStr.getBytes("UTF-8")) - - val hash = digest.digest().map("%02x".format(_)).mkString - CompileKey(hash) - } -} - -/** Cross-process file lock with stale detection. - * - * Uses Java's FileChannel.lock() for cross-process coordination. Acts as a mutex only - no result sharing across processes. Detects stale locks from crashed - * processes by checking PID and timeout. - */ -object CrossProcessLock { - private val lockTimeout = 30.minutes // Max time a compile can hold a lock - - /** Try to acquire a lock, with stale lock recovery */ - def tryAcquire(lockPath: Path): IO[Option[AcquiredLock]] = - for { - _ <- IO.blocking(Files.createDirectories(lockPath.getParent)) - channelAndLock <- IO.blocking { - val channel = FileChannel.open( - lockPath, - StandardOpenOption.CREATE, - StandardOpenOption.WRITE - ) - val lock = channel.tryLock() - (channel, Option(lock)) - } - result <- channelAndLock match { - case (channel, Some(lock)) => - // Got the lock - write our PID - for { - _ <- writeLockInfo(lockPath, ProcessHandle.current().pid(), Instant.now()) - } yield Some(AcquiredLock(lock, channel, lockPath)) - - case (channel, None) => - // Lock held by another process - check if stale - for { - _ <- IO.blocking(channel.close()) - stale <- isLockStale(lockPath) - result <- - if stale then - // Break stale lock and retry - breakLock(lockPath) >> tryAcquire(lockPath) - else IO.pure(None) - } yield result - } - } yield result - - /** Acquire a lock, waiting if necessary */ - def acquire(lockPath: Path, timeout: FiniteDuration): IO[Option[AcquiredLock]] = { - def loop(remaining: FiniteDuration): IO[Option[AcquiredLock]] = - if remaining <= 0.millis then IO.pure(None) - else - tryAcquire(lockPath).flatMap { - case Some(lock) => IO.pure(Some(lock)) - case None => - IO.sleep(100.millis) >> loop(remaining - 100.millis) - } - loop(timeout) - } - - private def writeLockInfo(lockPath: Path, pid: Long, started: Instant): IO[Unit] = { - val infoPath = lockPath.resolveSibling("lock.info") - IO.blocking { - val _ = Files.writeString(infoPath, s"$pid\n${started.toEpochMilli}") - () - } - } - - private def readLockInfo(lockPath: Path): IO[Option[(Long, Instant)]] = { - val infoPath = lockPath.resolveSibling("lock.info") - IO.blocking { - if Files.exists(infoPath) then { - val lines = Files.readString(infoPath).split("\n") - if lines.length >= 2 then Some((lines(0).toLong, Instant.ofEpochMilli(lines(1).toLong))) - else None - } else None - }.handleError(_ => None) - } - - private def isLockStale(lockPath: Path): IO[Boolean] = - readLockInfo(lockPath).flatMap { - case None => IO.pure(true) // No info = stale - case Some((pid, started)) => - for { - processAlive <- IO.blocking { - ProcessHandle.of(pid).map(_.isAlive).orElse(false) - } - timedOut = started.plusMillis(lockTimeout.toMillis).isBefore(Instant.now()) - } yield !processAlive || timedOut - } - - private def breakLock(lockPath: Path): IO[Unit] = { - val infoPath = lockPath.resolveSibling("lock.info") - IO.blocking { - Files.deleteIfExists(lockPath) - Files.deleteIfExists(infoPath) - }.void - } - - case class AcquiredLock(lock: JFileLock, channel: FileChannel, path: Path) { - def release: IO[Unit] = IO - .blocking { - try lock.release() - finally channel.close() - val _ = Files.deleteIfExists(path.resolveSibling("lock.info")) - () - } - .handleError(_ => ()) - } -} - -/** In-process compilation state for sharing results between fibers. - * - * Tracks in-flight compilations within this process using Deferred to share results. Multiple fibers requesting the same compile (by CompileKey) will share - * the result. - * - * Note: This is process-local. Cross-process coordination uses file locks but does NOT share results - each process compiles independently. - */ -class InProcessCompilationState { - // Map of CompileKey -> Deferred result (for waiting) - private val inFlight = TrieMap[String, Deferred[IO, CompilationResult]]() - - /** Check if a compilation is already in flight in this process */ - def getInFlight(key: CompileKey): IO[Option[Deferred[IO, CompilationResult]]] = - IO.pure(inFlight.get(key.value)) - - /** Try to register an in-flight compilation. Returns Left(existing) if already registered, Right(new) if we registered. - */ - def tryRegister(key: CompileKey): IO[Either[Deferred[IO, CompilationResult], Deferred[IO, CompilationResult]]] = - for { - deferred <- Deferred[IO, CompilationResult] - result <- IO { - inFlight.putIfAbsent(key.value, deferred) match { - case Some(existing) => Left(existing) - case None => Right(deferred) - } - } - } yield result - - /** Complete an in-flight compilation and remove it */ - def complete(key: CompileKey, result: CompilationResult): IO[Unit] = - for { - maybeDeferred <- IO(inFlight.remove(key.value)) - _ <- maybeDeferred.traverse_(_.complete(result)) - } yield () -} - -/** Compilation coordinator that handles in-process deduplication. - * - * Provides the following guarantees: - * 1. Same inputs (CompileKey) will not be compiled twice within a process - * 2. Multiple fibers requesting same compile share the result via Deferred - * 3. Cross-process: File lock prevents concurrent writes, but no result sharing - * 4. Changed files get new CompileKeys (no interference with in-flight compiles) - * - * Usage: - * ```scala - * val coordinator = CompilationCoordinator.create(lockDir) - * coordinator.compile(sources, classpath, outputDir, config) - * ``` - */ -class CompilationCoordinator( - lockDir: Path, - inProcessState: InProcessCompilationState -) { - - /** Compile with in-process deduplication. - * - * @param sources - * source files (path -> content) - * @param classpath - * compilation classpath - * @param outputDir - * where to write results - * @param config - * compiler configuration - * @return - * compilation result - */ - def compile( - sources: Map[Path, String], - classpath: Seq[Path], - outputDir: Path, - config: LanguageConfig - ): IO[CompilationResult] = { - val key = CompileKey.compute(sources, classpath, config) - compileWithKey(key, sources, classpath, outputDir, config) - } - - private def compileWithKey( - key: CompileKey, - sources: Map[Path, String], - classpath: Seq[Path], - outputDir: Path, - config: LanguageConfig - ): IO[CompilationResult] = - // Try to register as the compiler for this key - inProcessState.tryRegister(key).flatMap { - case Left(existing) => - // Another fiber is already compiling - wait for their result - existing.get - - case Right(deferred) => - // We're responsible for compiling - // Use guaranteeCase to ensure deferred is completed even on error/cancellation - doCompile(sources, classpath, outputDir, config) - .guaranteeCase { - case cats.effect.Outcome.Succeeded(fa) => - fa.flatMap(result => inProcessState.complete(key, result)) - case cats.effect.Outcome.Errored(e) => - // On error, complete with failure result so waiting fibers don't hang - inProcessState.complete( - key, - CompilationFailure( - List( - CompilerError(None, 0, 0, s"Internal error: ${e.getMessage}", None, CompilerError.Severity.Error) - ) - ) - ) - case cats.effect.Outcome.Canceled() => - // On cancellation, complete with cancelled result - inProcessState.complete(key, CompilationCancelled) - } - } - - private def doCompile( - sources: Map[Path, String], - classpath: Seq[Path], - outputDir: Path, - config: LanguageConfig - ): IO[CompilationResult] = { - // Use output directory for lock to prevent concurrent writes to same location - // Use SHA-256 hash to avoid collision issues with hashCode() - val digest = MessageDigest.getInstance("SHA-256") - val pathHash = digest - .digest(outputDir.toString.getBytes("UTF-8")) - .take(16) - .map("%02x".format(_)) - .mkString - val lockPath = lockDir.resolve(s"$pathHash.lock") - - CrossProcessLock.acquire(lockPath, 10.minutes).flatMap { - case Some(lock) => - performCompilation(sources, classpath, outputDir, config) - .guarantee(lock.release) - - case None => - // Couldn't acquire lock after timeout - compile anyway (might fail) - // This is a fallback for edge cases - performCompilation(sources, classpath, outputDir, config) - } - } - - private def performCompilation( - sources: Map[Path, String], - classpath: Seq[Path], - outputDir: Path, - config: LanguageConfig - ): IO[CompilationResult] = - IO.blocking { - Files.createDirectories(outputDir) - - config match { - case kotlinConfig: KotlinConfig => - compileMixedKotlinJava(sources, classpath, outputDir, kotlinConfig) - case _ => - val sourceFiles = sources.map { case (path, content) => SourceFile(path, content) }.toSeq - val input = CompilationInput(sourceFiles, classpath, outputDir, config) - val compiler = Compiler.forConfig(config) - compiler.compile(input, DiagnosticListener.noop, CancellationToken.never) - } - } - - /** Mixed Kotlin+Java compilation: Java first, then Kotlin. - * - * Java is compiled first so that Kotlin can resolve Java types from the output directory. This handles the common case of generated Java sources (Avro, - * OpenAPI) in Kotlin projects. For the reverse case (Java depending on Kotlin), a compile order flag could be added to the model later. - */ - private def compileMixedKotlinJava( - sources: Map[Path, String], - classpath: Seq[Path], - outputDir: Path, - kotlinConfig: KotlinConfig - ): CompilationResult = { - val kotlinSources = sources.filter { case (p, _) => p.toString.endsWith(".kt") || p.toString.endsWith(".kts") } - val javaSources = sources.filter { case (p, _) => p.toString.endsWith(".java") } - - if (javaSources.isEmpty) { - // Pure Kotlin — no split needed - val sourceFiles = sources.map { case (path, content) => SourceFile(path, content) }.toSeq - val input = CompilationInput(sourceFiles, classpath, outputDir, kotlinConfig) - return KotlinSourceCompiler.compile(input, DiagnosticListener.noop, CancellationToken.never) - } - - // Step 1: Compile Java with javac - val javaSourceFiles = javaSources.map { case (path, content) => SourceFile(path, content) }.toSeq - val javaConfig = JavaConfig(release = Some(kotlinConfig.jvmTarget.toIntOption.getOrElse(11))) - val javaInput = CompilationInput(javaSourceFiles, classpath, outputDir, javaConfig) - val javaResult = Compiler.forConfig(javaConfig).compile(javaInput, DiagnosticListener.noop, CancellationToken.never) - - javaResult match { - case _: CompilationSuccess => - if (kotlinSources.nonEmpty) { - // Step 2: Compile Kotlin — only .kt files, Java .class files already in outputDir - val kotlinSourceFiles = kotlinSources.map { case (path, content) => SourceFile(path, content) }.toSeq - val kotlinClasspath = classpath :+ outputDir - val kotlinInput = CompilationInput(kotlinSourceFiles, kotlinClasspath, outputDir, kotlinConfig) - KotlinSourceCompiler.compile(kotlinInput, DiagnosticListener.noop, CancellationToken.never) - } else javaResult - case failed => failed - } - } -} - -object CompilationCoordinator { - - /** Create a new compilation coordinator with shared in-process state */ - def create(lockDir: Path): IO[CompilationCoordinator] = - IO { - val inProcessState = new InProcessCompilationState() - new CompilationCoordinator(lockDir, inProcessState) - } - - /** Create with default lock directory */ - def create(workspaceRoot: Path, name: String): IO[CompilationCoordinator] = - create(workspaceRoot.resolve(".bleep").resolve("locks").resolve(name)) - - /** Shared global instance for the process (singleton pattern) */ - private val globalState = new InProcessCompilationState() - - /** Get a coordinator using shared global in-process state */ - def withGlobalState(lockDir: Path): CompilationCoordinator = - new CompilationCoordinator(lockDir, globalState) -} diff --git a/bleep-bsp/src/scala/bleep/analysis/ParallelProjectCompiler.scala b/bleep-bsp/src/scala/bleep/analysis/ParallelProjectCompiler.scala index 38f6f5eda..8aff41e40 100644 --- a/bleep-bsp/src/scala/bleep/analysis/ParallelProjectCompiler.scala +++ b/bleep-bsp/src/scala/bleep/analysis/ParallelProjectCompiler.scala @@ -126,7 +126,8 @@ object ParallelProjectCompiler { // This ensures new tasks start as soon as a slot opens, not when an entire batch finishes. def loop( runningRef: Ref[IO, Set[String]], - signalRef: Ref[IO, Deferred[IO, Unit]] + signalRef: Ref[IO, Deferred[IO, Unit]], + supervisor: cats.effect.std.Supervisor[IO] ): IO[Unit] = for { completed <- completedRef.get @@ -142,7 +143,7 @@ object ParallelProjectCompiler { // returning, otherwise Zinc keeps writing class files after the "Cancelled" response has been sent. if (running.nonEmpty) signalRef.get.flatMap(_.get) >> - Deferred[IO, Unit].flatMap(s => signalRef.set(s) >> loop(runningRef, signalRef)) + Deferred[IO, Unit].flatMap(s => signalRef.set(s) >> loop(runningRef, signalRef, supervisor)) else IO.unit } else { val hasFailures = completed.values.exists(!_.isSuccess) @@ -157,26 +158,31 @@ object ParallelProjectCompiler { } else if (hasFailures) { // Failures exist but tasks still running — wait for them to drain signalRef.get.flatMap(_.get) >> - Deferred[IO, Unit].flatMap(s => signalRef.set(s) >> loop(runningRef, signalRef)) + Deferred[IO, Unit].flatMap(s => signalRef.set(s) >> loop(runningRef, signalRef, supervisor)) } else { val ready = dag.ready(completedNames) -- completedNames -- running val availableSlots = parallelism - running.size val toStart = ready.toList.take(availableSlots) - // Launch each task as a fire-and-forget fiber + // Spawn each compile via a Supervisor: if the surrounding fiber is cancelled + // mid-build, every still-running supervised fiber is cancelled on resource + // release. Previously these were `.start.void` orphans — a cancelled outer + // would leave them running, with Zinc writing class files indefinitely. toStart.traverse_ { name => runningRef.update(_ + name) >> - (progressListener.onProjectStarted(name) >> - compileProject(dag.projects(name), dag, deferreds, diagnosticListener, cancellationToken).flatMap { result => - progressListener.onProjectFinished(name, result) >> - completedRef.update(_ + (name -> result)) >> - deferreds(name).complete(result).attempt.void - }) - .guarantee( - runningRef.update(_ - name) >> - signalRef.get.flatMap(_.complete(()).attempt.void) + supervisor + .supervise( + (progressListener.onProjectStarted(name) >> + compileProject(dag.projects(name), dag, deferreds, diagnosticListener, cancellationToken).flatMap { result => + progressListener.onProjectFinished(name, result) >> + completedRef.update(_ + (name -> result)) >> + deferreds(name).complete(result).attempt.void + }) + .guarantee( + runningRef.update(_ - name) >> + signalRef.get.flatMap(_.complete(()).attempt.void) + ) ) - .start .void } >> // If nothing is running and nothing was started, we're stuck or done @@ -198,19 +204,21 @@ object ParallelProjectCompiler { } else { // Wait for any task to complete, then re-evaluate signalRef.get.flatMap(_.get) >> - Deferred[IO, Unit].flatMap(s => signalRef.set(s) >> loop(runningRef, signalRef)) + Deferred[IO, Unit].flatMap(s => signalRef.set(s) >> loop(runningRef, signalRef, supervisor)) } } } } } yield () - for { - runningRef <- Ref.of[IO, Set[String]](Set.empty) - initialSignal <- Deferred[IO, Unit] - signalRef <- Ref.of[IO, Deferred[IO, Unit]](initialSignal) - _ <- loop(runningRef, signalRef) - } yield () + cats.effect.std.Supervisor[IO](await = false).use { supervisor => + for { + runningRef <- Ref.of[IO, Set[String]](Set.empty) + initialSignal <- Deferred[IO, Unit] + signalRef <- Ref.of[IO, Deferred[IO, Unit]](initialSignal) + _ <- loop(runningRef, signalRef, supervisor) + } yield () + } } private def compileProject( diff --git a/bleep-bsp/src/scala/bleep/analysis/ZincBridge.scala b/bleep-bsp/src/scala/bleep/analysis/ZincBridge.scala index 7e1189f98..b60040ef4 100644 --- a/bleep-bsp/src/scala/bleep/analysis/ZincBridge.scala +++ b/bleep-bsp/src/scala/bleep/analysis/ZincBridge.scala @@ -62,6 +62,24 @@ object ZincBridge { /** Singleton incremental compiler — stateless, thread-safe. */ private lazy val incrementalCompiler: IncrementalCompiler = ZincUtil.defaultIncrementalCompiler + /** Counter for naming each ECJ compile thread uniquely. Used by jstack-readable thread names and by the abandoned-thread registry below. + */ + private[analysis] val ecjThreadCounter: java.util.concurrent.atomic.AtomicLong = new java.util.concurrent.atomic.AtomicLong(0) + + /** Threads that didn't terminate within the 30s post-cancel join. Identity-set so a thread that eventually self-completes (via the finally in its run body) + * removes itself; operators reading `abandonedEcjThreadsSnapshot` get an accurate live count. See `runEcj` for context — known leak when the + * CompilationProgress bridge is unavailable and the user cancels mid-compile. + */ + private[analysis] val abandonedEcjThreads: java.util.Set[Thread] = + java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap[Thread, java.lang.Boolean]()) + + /** Snapshot of currently-leaked ECJ compile threads. Intended for diagnostics (oncall, support). */ + def abandonedEcjThreadsSnapshot: List[(String, Long)] = { + val out = List.newBuilder[(String, Long)] + abandonedEcjThreads.forEach(t => out += ((t.getName, t.threadId()))) + out.result() + } + /** Cached ZincScalaInstance per Scala version. Immutable, safe to share. */ private val scalaInstanceCache = new java.util.concurrent.ConcurrentHashMap[String, ZincScalaInstance]() @@ -1294,7 +1312,13 @@ private class EcjCompiler( val successRef = new java.util.concurrent.atomic.AtomicReference[java.lang.Boolean](java.lang.Boolean.FALSE) val exceptionRef = new java.util.concurrent.atomic.AtomicReference[Throwable](null) - val compileThread = new Thread("ecj-compiler") { + val threadName = s"ecj-compiler-${ZincBridge.ecjThreadCounter.incrementAndGet()}" + val compileThread = new Thread(threadName) { + // Daemon so the JVM can exit even if we end up abandoning this thread on a cancel-and- + // ECJ-won't-stop scenario (see comments below). Note: ECJ shouldn't normally leak a + // thread; this only kicks in when the CompilationProgress bridge is unavailable AND the + // user cancelled mid-compile. + setDaemon(true) override def run(): Unit = try successRef.set(compileMethod.invoke(ecjMain, args.toArray).asInstanceOf[java.lang.Boolean]) @@ -1302,7 +1326,10 @@ private class EcjCompiler( case _: InterruptedException => () case e: java.lang.reflect.InvocationTargetException if e.getCause.isInstanceOf[InterruptedException] => () case e: Throwable => exceptionRef.set(e) - } + } finally + // Always deregister, whether the thread completed normally or was abandoned and + // later self-terminated. + ZincBridge.abandonedEcjThreads.remove(this): Unit } // IMPORTANT: Do NOT call compileThread.interrupt(). Java NIO spec says @@ -1332,7 +1359,16 @@ private class EcjCompiler( if (cancellationToken.isCancelled && compileThread.isAlive) { compileThread.join(30000) // 30 second timeout if (compileThread.isAlive) { - System.err.println(s"[ZincBridge] WARNING: ECJ thread did not exit within 30s after cancellation") + // Thread is being abandoned: register it so we have visibility (jstack name shows + // the counter; the registry lets ChildProcessDiagnostics surface a count). The + // thread itself stays alive — ECJ will continue writing class files to disk and + // pinning its in-memory symbol tables until it self-completes. TODO: per-compile + // ECJ classloader so abandoning the thread also lets GC reclaim ECJ's symbol table + // memory. + ZincBridge.abandonedEcjThreads.add(compileThread): Unit + System.err.println( + s"[ZincBridge] WARNING: ECJ thread '$threadName' did not exit within 30s after cancellation (now ${ZincBridge.abandonedEcjThreads.size} abandoned threads)" + ) } } diff --git a/bleep-bsp/src/scala/bleep/bsp/BspDiagnosticTracker.scala b/bleep-bsp/src/scala/bleep/bsp/BspDiagnosticTracker.scala index cca26c204..9d9f51dec 100644 --- a/bleep-bsp/src/scala/bleep/bsp/BspDiagnosticTracker.scala +++ b/bleep-bsp/src/scala/bleep/bsp/BspDiagnosticTracker.scala @@ -1,6 +1,7 @@ package bleep.bsp import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicReference import scala.jdk.CollectionConverters.* /** Tracks BSP diagnostic state across compilation cycles to implement the reset protocol. @@ -12,20 +13,25 @@ import scala.jdk.CollectionConverters.* * * Without this, clients like Metals show "sticky" errors that persist after being fixed (issue #526). * - * Thread-safe: diagnostic listeners from concurrent project compilations can call `recordDiagnostic` safely. + * Thread-safe: diagnostic listeners from concurrent project compilations within the same cycle can call `recordDiagnostic` safely. + * + * Lifetime: ONE tracker per build operation. Don't share a tracker across concurrent operations — `handleCompile` and `handleTest` running side-by-side each + * need their own. Sharing causes `startCycle` from one to wipe the other's in-flight state. */ class BspDiagnosticTracker { /** Key for tracking: (document URI string, target URI string) */ private type FileTarget = (String, String) - /** Files that had diagnostics in the previous compilation cycle */ - @volatile private var previousFiles: Set[FileTarget] = Set.empty - - /** Files that have received at least one diagnostic in the current cycle. `add()` returns true if the element was newly added (= first diagnostic = reset). + /** Atomic swap-on-startCycle. The "current cycle" set is held inside this ref so a `startCycle` rotates the entire set in one CAS, instead of two non-atomic + * ops (snapshot, clear) that allowed concurrent `recordDiagnostic` callers to land in the "old" set after the snapshot and get wiped by the clear. + * + * `previousFiles` is captured by the rotation, also atomically. */ - private val currentFiles: ConcurrentHashMap.KeySetView[FileTarget, java.lang.Boolean] = - ConcurrentHashMap.newKeySet() + private val currentRef: AtomicReference[ConcurrentHashMap.KeySetView[FileTarget, java.lang.Boolean]] = + new AtomicReference(ConcurrentHashMap.newKeySet()) + + @volatile private var previousFiles: Set[FileTarget] = Set.empty /** Record a diagnostic being published. Returns the `reset` value to use. * @@ -34,17 +40,18 @@ class BspDiagnosticTracker { * append). */ def recordDiagnostic(docUri: String, targetUri: String): Boolean = - currentFiles.add((docUri, targetUri)) // ConcurrentHashMap.KeySetView.add returns true if newly added + currentRef.get().add((docUri, targetUri)) // ConcurrentHashMap.KeySetView.add returns true if newly added /** Prepare for a new compilation cycle. Moves current cycle's files to "previous" for clearing detection. Must be called before compilation begins. */ def startCycle(): Unit = { - previousFiles = Set.from(currentFiles.asScala) - currentFiles.clear() + val fresh = ConcurrentHashMap.newKeySet[FileTarget]() + val prior = currentRef.getAndSet(fresh) + previousFiles = prior.asScala.toSet } /** Get (docUri, targetUri) pairs that had diagnostics last cycle but not this one. These need empty diagnostics with `reset = true` to clear stale errors in * the client. */ def filesToClear(): Set[FileTarget] = - previousFiles -- currentFiles.asScala.toSet + previousFiles -- currentRef.get().asScala.toSet } diff --git a/bleep-bsp/src/scala/bleep/bsp/BspMetrics.scala b/bleep-bsp/src/scala/bleep/bsp/BspMetrics.scala index 5443a953b..ac04bd10e 100644 --- a/bleep-bsp/src/scala/bleep/bsp/BspMetrics.scala +++ b/bleep-bsp/src/scala/bleep/bsp/BspMetrics.scala @@ -4,7 +4,8 @@ import java.io.{BufferedWriter, FileWriter} import java.lang.management.ManagementFactory import java.nio.file.{Files, Path} import java.util.Locale -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong} import scala.jdk.CollectionConverters._ /** Lightweight metrics collector for the BSP server. @@ -16,8 +17,21 @@ object BspMetrics { @volatile private var writer: BufferedWriter = scala.compiletime.uninitialized @volatile private var samplerThread: Thread = scala.compiletime.uninitialized + @volatile private var writerThread: Thread = scala.compiletime.uninitialized @volatile private var metricsPath: Path = scala.compiletime.uninitialized + /** Bounded queue of pending event JSON lines. A dedicated writer thread drains this into the BufferedWriter so event-producing threads (every compile/test + * phase) don't serialize on BufferedWriter.flush — which under high event volume turned `recordCompilePhase` into a synchronous fsync per call. Cap is + * generous: tens of thousands of events queue up to ~16MB before backpressure kicks in. + */ + private val pendingEvents = new LinkedBlockingQueue[String](200000) + + /** Counter of dropped events (queue saturated). Surfaced in the shutdown summary. */ + private val droppedEvents = new AtomicInteger(0) + + /** Flipped by `shutdown()`. The writer thread polls this so it can drain and exit. */ + private val shuttingDown = new AtomicBoolean(false) + // High watermarks private val maxConcurrentCompiles = AtomicInteger(0) private val maxActiveConnections = AtomicInteger(0) @@ -66,6 +80,43 @@ object BspMetrics { t.setDaemon(true) t.start() samplerThread = t + + val writerT = new Thread("bsp-metrics-writer") { + override def run(): Unit = { + val batch = new java.util.ArrayList[String](256) + try + while (!(shuttingDown.get() && pendingEvents.isEmpty)) { + // Block for the first event; then drain whatever else is queued in one batch. + val first = pendingEvents.poll(250, java.util.concurrent.TimeUnit.MILLISECONDS) + if (first != null) { + batch.clear() + batch.add(first) + pendingEvents.drainTo(batch, 1024) + val w = writer + if (w != null) { + // Single synchronized region per batch, single flush at the end — far less + // contention with shutdown.close than per-event sync+flush. + w.synchronized { + try { + val it = batch.iterator() + while (it.hasNext) { + w.write(it.next()) + w.newLine() + } + w.flush() + } catch { case _: Exception => () } + } + } + } + } + catch { + case _: InterruptedException => () + } + } + } + writerT.setDaemon(true) + writerT.start() + writerThread = writerT } def shutdown(): Unit = { @@ -74,11 +125,20 @@ object BspMetrics { t.interrupt() t.join(2000) } + // Enqueue the summary BEFORE flipping shuttingDown so the writer drains it. writeSummary() + shuttingDown.set(true) + val wt = writerThread + if (wt != null) { + // Give the writer up to 2s to drain. If it can't, we lose some tail events — + // acceptable at shutdown. + wt.join(2000) + } val w = writer if (w != null) { w.synchronized { - w.close() + try w.close() + catch { case _: Exception => () } } } } @@ -226,9 +286,10 @@ object BspMetrics { private def writeSummary(): Unit = { val threadBean = ManagementFactory.getThreadMXBean - writeEvent( + writeEventSync( s"""{"type":"summary","ts":${now()},"max_concurrent_compiles":${maxConcurrentCompiles.get()},"max_active_connections":${maxActiveConnections - .get()},"peak_threads":${threadBean.getPeakThreadCount},"max_heap_used_mb":${maxHeapUsedBytes.get() / (1024 * 1024)}}""" + .get()},"peak_threads":${threadBean.getPeakThreadCount},"max_heap_used_mb":${maxHeapUsedBytes.get() / (1024 * 1024)},"dropped_events":${droppedEvents + .get()}}""" ) } @@ -236,13 +297,26 @@ object BspMetrics { private def now(): Long = System.currentTimeMillis() - private def writeEvent(json: String): Unit = { + /** Enqueue an event for the writer thread. Non-blocking — if the queue is full (extreme load or writer thread stuck), the event is dropped and counted in + * `droppedEvents`. This is the right tradeoff for metrics: never block a compile to record a metric, surface the loss in the summary instead. + */ + private def writeEvent(json: String): Unit = + if (writer != null) { + if (!pendingEvents.offer(json)) droppedEvents.incrementAndGet(): Unit + } + + /** Synchronously write — used only at shutdown for the summary, when the writer thread is about to be joined and we want the summary to land on disk + * regardless of queue draining. + */ + private def writeEventSync(json: String): Unit = { val w = writer if (w != null) { w.synchronized { - w.write(json) - w.newLine() - w.flush() + try { + w.write(json) + w.newLine() + w.flush() + } catch { case _: Exception => () } } } } diff --git a/bleep-bsp/src/scala/bleep/bsp/BspServerDaemon.scala b/bleep-bsp/src/scala/bleep/bsp/BspServerDaemon.scala index 1fdf74996..af224a184 100644 --- a/bleep-bsp/src/scala/bleep/bsp/BspServerDaemon.scala +++ b/bleep-bsp/src/scala/bleep/bsp/BspServerDaemon.scala @@ -196,10 +196,13 @@ object BspServerDaemon { val connectionCounter = AtomicInteger(0) val activeClientThreads = ConcurrentHashMap.newKeySet[Thread]() - // Server-wide compile semaphore: limits total concurrent compilations across all connections + // Server-wide compile semaphore: limits total concurrent compilations across all connections. + // FIFO-fair so an IDE typing rapidly can't starve a parallel `bleep test` (or vice versa) — + // unfair semaphores give better throughput but allow indefinite waiting under sustained + // contention, which is the wrong tradeoff for a mixed-traffic build server. val numCores = Runtime.getRuntime.availableProcessors() - val compileSemaphore = new java.util.concurrent.Semaphore(numCores) - logger.info(s"Compile semaphore: $numCores permits (based on available processors)") + val compileSemaphore = new java.util.concurrent.Semaphore(numCores, /* fair = */ true) + logger.info(s"Compile semaphore: $numCores permits (based on available processors, fair)") // NOTE: Do NOT redirect stdout — Zinc writes massive amounts of data to // stdout which would bloat the log file to tens of GB. diff --git a/bleep-bsp/src/scala/bleep/bsp/InProcessBspServer.scala b/bleep-bsp/src/scala/bleep/bsp/InProcessBspServer.scala index 97d064ef7..399614bc8 100644 --- a/bleep-bsp/src/scala/bleep/bsp/InProcessBspServer.scala +++ b/bleep-bsp/src/scala/bleep/bsp/InProcessBspServer.scala @@ -4,6 +4,7 @@ import cats.effect.{IO, Resource} import ryddig.Logger import java.io.{PipedInputStream, PipedOutputStream} +import java.util.concurrent.CompletableFuture /** Creates an in-process BSP server connected via piped streams. * @@ -20,36 +21,46 @@ object InProcessBspServer { val clientIn = new PipedInputStream(1048576) // 1MB buffer val serverOut = new PipedOutputStream(clientIn) // server writes -> client reads + // Use CompletableFuture (not a Deferred) so the server thread can signal exit without + // bouncing through cats-effect from a non-IO thread. IO.fromCompletableFuture bridges + // it back into IO for callers. + val exited = new CompletableFuture[java.lang.Integer]() + // Start BSP server in a daemon thread val serverThread = new Thread("in-process-bsp-server") { setDaemon(true) - override def run(): Unit = + override def run(): Unit = { + var exitCode: java.lang.Integer = 0 try { - val semaphore = new java.util.concurrent.Semaphore(Runtime.getRuntime.availableProcessors()) + val semaphore = new java.util.concurrent.Semaphore(Runtime.getRuntime.availableProcessors(), /* fair = */ true) val server = new MultiWorkspaceBspServer(serverIn, serverOut, logger, compileSemaphore = semaphore, heapMonitor = HeapMonitor.system) server.run() } catch { - case e: Exception => + case e: Throwable => + exitCode = 1 logger.error(s"In-process BSP server failed: ${e.getClass.getName}: ${e.getMessage}", e) } finally { try serverOut.close() catch { case _: Exception => () } try serverIn.close() catch { case _: Exception => () } + exited.complete(exitCode): Unit } + } } serverThread.start() - new InProcessConnection(clientIn, clientOut) + new InProcessConnection(clientIn, clientOut, exited) } )(_.close) private class InProcessConnection( val input: java.io.InputStream, - val output: java.io.OutputStream + val output: java.io.OutputStream, + exited: CompletableFuture[java.lang.Integer] ) extends BspConnection { - def serverExited: IO[Int] = IO.never + def serverExited: IO[Int] = IO.fromCompletableFuture(IO.pure(exited)).map(_.intValue) def close: IO[Unit] = IO.blocking { try output.close() catch { case _: Exception => () } diff --git a/bleep-bsp/src/scala/bleep/bsp/JsonRpcTransport.scala b/bleep-bsp/src/scala/bleep/bsp/JsonRpcTransport.scala index 79fe0733f..f1f7c2595 100644 --- a/bleep-bsp/src/scala/bleep/bsp/JsonRpcTransport.scala +++ b/bleep-bsp/src/scala/bleep/bsp/JsonRpcTransport.scala @@ -383,8 +383,16 @@ class JsonRpcTransport( } def close(): Unit = { - bufferedIn.close() - bufferedOut.close() + // Synchronize close against concurrent reads/writes: a `close` racing a `sendMessage` + // can leave bytes in the buffer or trip a write-on-closed-stream IOException. + readLock.synchronized { + try bufferedIn.close() + catch { case _: Exception => () } + } + synchronized { + try bufferedOut.close() + catch { case _: Exception => () } + } } } diff --git a/bleep-bsp/src/scala/bleep/bsp/MultiWorkspaceBspServer.scala b/bleep-bsp/src/scala/bleep/bsp/MultiWorkspaceBspServer.scala index 23323fe3d..4f27df456 100644 --- a/bleep-bsp/src/scala/bleep/bsp/MultiWorkspaceBspServer.scala +++ b/bleep-bsp/src/scala/bleep/bsp/MultiWorkspaceBspServer.scala @@ -122,8 +122,9 @@ class MultiWorkspaceBspServer( /** Operation IDs registered by this connection (for cleanup on disconnect) */ private val myOperationIds = ConcurrentHashMap.newKeySet[String]() - /** Tracks diagnostic state across compilation cycles for the BSP reset protocol (issue #526). */ - private val diagnosticTracker = BspDiagnosticTracker() + // Diagnostic trackers are scoped per build operation (handleCompile / handleTest) — created + // at the top of those methods, threaded through compileProject. Sharing one tracker across + // concurrent operations would let one `startCycle` wipe another's in-flight diagnostic state. /** Run the server message loop with concurrent request handling. * @@ -1385,6 +1386,8 @@ class MultiWorkspaceBspServer( private def handleCompile(params: CompileParams, cancellation: CancellationToken): CompileResult = { val started = getActiveBuild.fold(msg => throw BspException(JsonRpcErrorCodes.InternalError, msg), identity) + // Per-operation tracker — see field-level comment on diagnosticTracker for why we don't share. + val diagnosticTracker = new BspDiagnosticTracker diagnosticTracker.startCycle() // Parse link options from arguments @@ -1576,7 +1579,7 @@ class MultiWorkspaceBspServer( // KSP doesn't need an equivalent map: the runner emits files to disk that the project's source set picks up directly; no compile-time data flow. val apResults = new java.util.concurrent.ConcurrentHashMap[CrossProjectName, AnnotationProcessorResult]() - val compileHandler = makeCompileHandler(started, workspace, params.originId, serverConfig.effectiveHeapPressureThreshold, apResults) + val compileHandler = makeCompileHandler(started, workspace, params.originId, serverConfig.effectiveHeapPressureThreshold, apResults, diagnosticTracker) val sourcegenHandler = makeSourcegenHandler(started, params.originId) // Create link handler @@ -1653,7 +1656,7 @@ class MultiWorkspaceBspServer( } // Clear stale diagnostics for files that had errors last cycle but not this one - clearStaleDiagnostics() + clearStaleDiagnostics(diagnosticTracker) ioResult match { case Success(dag) => @@ -1803,6 +1806,11 @@ class MultiWorkspaceBspServer( private def handleTest(params: TestParams, cancellation: CancellationToken): TestResult = { val started = getActiveBuild.fold(msg => throw BspException(JsonRpcErrorCodes.InternalError, msg), identity) + // Per-operation diagnostic tracker — keeps test-pipeline compiles' diagnostic state isolated + // from any concurrent handleCompile, which otherwise race on startCycle. + val diagnosticTracker = new BspDiagnosticTracker + diagnosticTracker.startCycle() + val testProjects = params.targets.flatMap { targetId => crossNameFromTargetId(started, targetId) }.toSet @@ -1945,7 +1953,8 @@ class MultiWorkspaceBspServer( // intermediate compile-time data flow, so no equivalent map. val apResults = new java.util.concurrent.ConcurrentHashMap[CrossProjectName, AnnotationProcessorResult]() - val compileHandler = makeCompileHandler(started, workspace, params.originId, serverConfig.effectiveHeapPressureThreshold, apResults) + val compileHandler = + makeCompileHandler(started, workspace, params.originId, serverConfig.effectiveHeapPressureThreshold, apResults, diagnosticTracker) val sourcegenHandler = makeSourcegenHandler(started, params.originId) val includeTagsSet = testOptions.includeTags.toSet @@ -2318,7 +2327,8 @@ class MultiWorkspaceBspServer( workspace: Path, originId: Option[String], heapPressureThreshold: Double, - apResults: java.util.concurrent.ConcurrentHashMap[CrossProjectName, AnnotationProcessorResult] + apResults: java.util.concurrent.ConcurrentHashMap[CrossProjectName, AnnotationProcessorResult], + diagnosticTracker: BspDiagnosticTracker ): (TaskDag.CompileTask, Deferred[IO, KillReason]) => IO[TaskDag.TaskResult] = (compileTask, taskKillSignal) => { val projectName = compileTask.project.value @@ -2352,7 +2362,7 @@ class MultiWorkspaceBspServer( waitForHeapPressure(projectName, originId, heapPressureThreshold) >> { val compileStartTime = System.currentTimeMillis() IO(BspMetrics.recordCompileStart(projectName, wsStr)) >> - compileProject(started, compileTask.project, originId, token, depAnalyses, apFlags) + compileProject(started, compileTask.project, originId, token, depAnalyses, apFlags, diagnosticTracker) .guaranteeCase { case cats.effect.Outcome.Succeeded(resultIO) => resultIO.flatMap { result => @@ -2383,7 +2393,8 @@ class MultiWorkspaceBspServer( originId: Option[String], cancellation: CancellationToken, dependencyAnalyses: Map[Path, Path], - additionalJavaOptions: List[String] + additionalJavaOptions: List[String], + diagnosticTracker: BspDiagnosticTracker ): IO[TaskDag.TaskResult] = { val config = BleepBuildConverter.toProjectConfig(project, started.resolvedProject(project), started, additionalJavaOptions) val compiler = ProjectCompiler.forLanguage(config.language) @@ -3185,19 +3196,24 @@ class MultiWorkspaceBspServer( case _ => IO.unit } - /** Wrap event processing with dead-client detection and kill signal propagation. */ + /** Wrap event processing with dead-client detection and kill signal propagation. + * + * Only the disconnection-handling tail is uncancelable — we want the killSignal completion + log to run atomically once we observe `clientDisconnected`. + * `processEvent` itself stays cancelable so a slow `sendNotification` to a wedged client can be interrupted by the outer build-cancel rather than pinning + * the consumer fiber. + */ private def withDeadClientDetection( killSignal: Deferred[IO, KillReason], contextLabel: String )(processEvent: IO[Unit]): IO[Unit] = - IO.uncancelable { _ => - processEvent >> IO.whenA(clientDisconnected.get()) { - IO.raiseError(new java.io.IOException("Client disconnected (detected via sendNotification)")) - } - }.handleErrorWith { + (processEvent >> IO.whenA(clientDisconnected.get()) { + IO.raiseError(new java.io.IOException("Client disconnected (detected via sendNotification)")) + }).handleErrorWith { case error: java.io.IOException => - IO(logger.withContext("error", error.getMessage).error(s"$contextLabel event send failed (connection dead)")) >> - killSignal.complete(KillReason.DeadClient).attempt >> IO.raiseError(error) + IO.uncancelable { _ => + IO(logger.withContext("error", error.getMessage).error(s"$contextLabel event send failed (connection dead)")) >> + killSignal.complete(KillReason.DeadClient).attempt.void + } >> IO.raiseError(error) case error => IO(logger.withContext("error", error.getMessage).error(s"$contextLabel event processing failed")) >> IO.raiseError(error) @@ -3520,7 +3536,7 @@ class MultiWorkspaceBspServer( } /** Send empty diagnostics with reset=true for files that had errors in the previous compilation but are now clean. */ - private def clearStaleDiagnostics(): Unit = + private def clearStaleDiagnostics(diagnosticTracker: BspDiagnosticTracker): Unit = diagnosticTracker.filesToClear().foreach { case (docUri, targetUri) => val publishParams = PublishDiagnosticsParams( textDocument = TextDocumentIdentifier(Uri(java.net.URI.create(docUri))), @@ -3887,29 +3903,32 @@ object MultiWorkspaceBspServer { model.Dep.Java("org.junit.vintage", "junit-vintage-engine", "5.9.1") ) - /** Process-wide memoization of the [[externalTestRunnerDeps]] resolution. + /** Per-resolver memoization of the [[externalTestRunnerDeps]] resolution. * - * The four deps don't change across workspaces or bleep versions, so resolving them once per JVM avoids re-running Coursier on every inner-bleep - * `commands.test`. Without this cache, each test workspace's [[InProcessBspServer]] (a fresh [[MultiWorkspaceBspServer]] per `commands.test` call) - * re-fetches the same artifacts; under CI's CPU contention with two parallel test JVMs that's enough to trip the 120 s suite-idle timeout in #580. + * The four deps don't change across workspaces or bleep versions, so resolving them once per resolver instance avoids re-running Coursier on every + * inner-bleep `commands.test`. Without this cache, each test workspace's [[InProcessBspServer]] (a fresh [[MultiWorkspaceBspServer]] per `commands.test` + * call) re-fetches the same artifacts; under CI's CPU contention with two parallel test JVMs that's enough to trip the 120 s suite-idle timeout in #580. * - * `AtomicReference` so we can populate it lock-free on first call and read it without synchronization on every subsequent call. The compute may run twice if - * two callers race; harmless — the resolved jars are identical, and Coursier's own disk cache handles concurrent downloads. + * Keyed by resolver-instance identity (not process-wide) so two BSP servers configured with different resolver settings — different mirrors, repositories, + * credentials — don't share jars resolved against the wrong config. Same resolver instance reused across calls within a server still hits the cache. */ - private val cachedExternalTestRunnerJars: java.util.concurrent.atomic.AtomicReference[List[Path]] = - new java.util.concurrent.atomic.AtomicReference[List[Path]](null) + private val cachedExternalTestRunnerJars: java.util.concurrent.ConcurrentHashMap[CoursierResolver, List[Path]] = + new java.util.concurrent.ConcurrentHashMap[CoursierResolver, List[Path]]() private def fetchExternalTestRunnerDeps(started: Started): List[Path] = { - val cached = cachedExternalTestRunnerJars.get() + val resolver = started.resolver + val cached = cachedExternalTestRunnerJars.get(resolver) if (cached != null) return cached - val result = started.resolver.force( + val result = resolver.force( externalTestRunnerDeps, model.VersionCombo.Jvm(model.VersionScala.Scala3), libraryVersionSchemes = SortedSet.empty[model.LibraryVersionScheme], context = "resolving bleep-test-runner external deps", model.IgnoreEvictionErrors.No ) - cachedExternalTestRunnerJars.compareAndSet(null, result.jars) - cachedExternalTestRunnerJars.get() + // putIfAbsent: identical resolver from two threads is harmless (Coursier's disk cache handles + // concurrent downloads, and the jars resolved against the same config are identical). + val existing = cachedExternalTestRunnerJars.putIfAbsent(resolver, result.jars) + if (existing != null) existing else result.jars } } diff --git a/bleep-bsp/src/scala/bleep/bsp/Outcome.scala b/bleep-bsp/src/scala/bleep/bsp/Outcome.scala index 14eef3b1d..4c58f06f2 100644 --- a/bleep-bsp/src/scala/bleep/bsp/Outcome.scala +++ b/bleep-bsp/src/scala/bleep/bsp/Outcome.scala @@ -1,6 +1,7 @@ package bleep.bsp import bleep.bsp.protocol.KillReason +import cats.effect.std.Dispatcher import cats.effect.{Deferred, IO} /** Shared ADT types for explicit outcome tracking. @@ -146,18 +147,36 @@ object Outcome { def asKillIO(killSignal: Deferred[IO, KillReason]): IO[KillReason] = killSignal.get + /** Process-wide Dispatcher used to bridge non-IO completion paths (toolchain callbacks, thread.run finalizers) back into CE without `unsafeRunSync`. + * + * `unsafeRunSync` from inside a callback that may fire on a CE compute thread is forbidden — even if the IO being run is non-blocking, it stalls the calling + * thread until completion and reaches into runtime internals. `dispatcher.unsafeRunAndForget` is the legal escape hatch: it submits the IO to the runtime + * without blocking the caller. + * + * The Dispatcher's lifecycle is the JVM's. `.allocated` gives us the value and a release IO; we keep the value and discard the release. This is a deliberate + * controlled leak — the alternative (passing a Dispatcher through every call to `fromCancellationToken`) cascades through dozens of callsites for no + * functional gain at the JVM-lifetime boundary. + */ + private lazy val cancellationBridgeDispatcher: Dispatcher[IO] = { + val (dispatcher, _) = Dispatcher.parallel[IO](await = false).allocated.unsafeRunSync()(using cats.effect.unsafe.IORuntime.global) + dispatcher + } + /** Create a kill signal from a CancellationToken. * - * This bridges the CancellationToken interface to the Deferred-based kill signal. The Deferred will be completed with UserRequest when the CancellationToken - * is cancelled. Uses the token's onCancel callback for instant notification (no polling). + * Bridges the [[bleep.analysis.CancellationToken]] (callback-driven, used by toolchain code that predates CE) to a [[Deferred]] (the CE-side kill signal + * that fibers can race against and `tryGet`/`get`/`complete`). + * + * The token's `onCancel` callback fires on whatever thread calls `token.cancel()` — could be the BSP message-reader thread, the libdaemonjvm shutdown + * thread, or even a CE compute thread (when [[bridgeKillSignal]]'s `.background` fiber observes the outer kill signal and propagates it back to a token). + * Routing the `Deferred.complete` through [[cancellationBridgeDispatcher]] keeps the completion non-blocking and CE-runtime-safe regardless of caller + * thread. */ def fromCancellationToken(cancellation: bleep.analysis.CancellationToken): IO[Deferred[IO, KillReason]] = Deferred[IO, KillReason].flatTap { deferred => IO { cancellation.onCancel { () => - // Complete the kill signal from the cancellation callback thread. - // unsafeRunSync is safe here — Deferred.complete is non-blocking. - deferred.complete(KillReason.UserRequest).attempt.unsafeRunSync()(using cats.effect.unsafe.IORuntime.global): Unit + cancellationBridgeDispatcher.unsafeRunAndForget(deferred.complete(KillReason.UserRequest).attempt.void) } } } @@ -189,6 +208,23 @@ object Outcome { * * Use `IO.interruptibleMany` for plain in-JVM blocking work without those concerns. */ + /** Identity-set of threads spawned by `runInFreshThread` that didn't terminate after their surrounding IO was cancelled. Native compilers (Scala.js/Scala + * Native linker, JNI calls) frequently don't respect `Thread.interrupt()` — the IO returns immediately on cancel but the underlying thread runs to natural + * completion. This registry gives operators visibility into the leak so it can be diagnosed (jstack shows the named threads; a count surfaces here). + * + * Each registered thread is daemon-marked so it doesn't block JVM exit. + */ + val runawayThreads: java.util.Set[Thread] = + java.util.Collections.newSetFromMap(new java.util.concurrent.ConcurrentHashMap[Thread, java.lang.Boolean]()) + + /** Snapshot of currently-running threads spawned by [[runInFreshThread]] that were marked for cancellation but haven't finished. Use for diagnostics. + */ + def runawayThreadsSnapshot: List[(String, Long)] = { + val out = List.newBuilder[(String, Long)] + runawayThreads.forEach(t => out += ((t.getName, t.threadId()))) + out.result() + } + def runInFreshThread[A]( name: String, contextClassLoader: Option[ClassLoader], @@ -197,16 +233,26 @@ object Outcome { fromCancellationToken(cancellation).flatMap { killSignal => val workIO: IO[Either[Throwable, A]] = IO.async[Either[Throwable, A]] { cb => IO.delay { + val threadRef = new java.util.concurrent.atomic.AtomicReference[Thread](null) val runnable: Runnable = () => try cb(Right(Right(work))) catch { case e: Throwable => cb(Right(Left(e))) } + finally { + val t = threadRef.get() + if (t != null) runawayThreads.remove(t): Unit + } val t = new Thread(runnable, name) + threadRef.set(t) contextClassLoader.foreach(t.setContextClassLoader) t.setDaemon(true) t.start() Some(IO.delay { cancellation.cancel() t.interrupt() + // Register only if the thread is still alive shortly after interrupt — most native + // compilers ignore interrupt and keep running. Adding here even if the thread is + // about to self-complete is harmless: the run-body's finally removes it. + if (t.isAlive) runawayThreads.add(t): Unit }) } } @@ -233,26 +279,26 @@ object Outcome { * Usage: `bridgeKillSignal(killSignal).use { cancellation => ...do the work that takes a CancellationToken... }`. */ def bridgeKillSignal(killSignal: Deferred[IO, KillReason]): cats.effect.Resource[IO, bleep.analysis.CancellationToken] = { - import java.util.concurrent.atomic.AtomicBoolean - import scala.collection.mutable.ListBuffer - + // Match [[bleep.analysis.CancellationToken.create]] — CopyOnWriteArrayList so `cancel`'s + // foreach over callbacks doesn't hold any lock. A callback that recursively calls + // `onCancel` (e.g. a nested resource registering its own listener) would deadlock under a + // `synchronized` block; with COWAL the snapshot-iteration is fully unsynchronized and the + // re-entrant `onCancel` lands cleanly. Snapshot vs live-list semantics match the original + // factory exactly. val mkToken: IO[bleep.analysis.CancellationToken] = IO.delay { - val cancelled = new AtomicBoolean(false) - val callbacks = ListBuffer[() => Unit]() + val cancelled = new java.util.concurrent.atomic.AtomicBoolean(false) + val callbacks = new java.util.concurrent.CopyOnWriteArrayList[() => Unit]() new bleep.analysis.CancellationToken { def isCancelled: Boolean = cancelled.get() def cancel(): Unit = if (cancelled.compareAndSet(false, true)) { - callbacks.synchronized { - callbacks.foreach(cb => cb()) - } + callbacks.forEach(cb => cb()) } - def onCancel(callback: () => Unit): Unit = - callbacks.synchronized { - if (cancelled.get()) callback() - else callbacks += callback - }: Unit + def onCancel(callback: () => Unit): Unit = { + callbacks.add(callback) + if (cancelled.get()) callback() + } } } diff --git a/bleep-bsp/src/scala/bleep/bsp/ProjectLock.scala b/bleep-bsp/src/scala/bleep/bsp/ProjectLock.scala index 370408e53..3f6b47c2d 100644 --- a/bleep-bsp/src/scala/bleep/bsp/ProjectLock.scala +++ b/bleep-bsp/src/scala/bleep/bsp/ProjectLock.scala @@ -7,6 +7,7 @@ import java.io.RandomAccessFile import java.nio.channels.{FileChannel, FileLock, OverlappingFileLockException} import java.nio.file.{Files, Path} import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.locks.ReentrantLock /** Per-project read/write lock for compile and link operations. * @@ -38,10 +39,15 @@ object ProjectLock { } private val states = new ConcurrentHashMap[CrossProjectName, ProjectState]() - private val monitors = new ConcurrentHashMap[CrossProjectName, AnyRef]() - private def getMonitor(project: CrossProjectName): AnyRef = - monitors.computeIfAbsent(project, _ => new AnyRef) + /** Per-project fair lock. Fairness (FIFO) prevents an exclusive waiter from being starved by a continuous stream of shared acquirers — under sustained IDE + * compile bursts mixed with `bleep test` runs, the previous JVM-monitor model gave no ordering guarantees and could leave an exclusive wait queued for the + * full 5-minute timeout. + */ + private val locks = new ConcurrentHashMap[CrossProjectName, ReentrantLock]() + + private def getLock(project: CrossProjectName): ReentrantLock = + locks.computeIfAbsent(project, _ => new ReentrantLock( /* fair = */ true)) private def getState(project: CrossProjectName): ProjectState = states.computeIfAbsent(project, _ => new ProjectState) @@ -83,17 +89,22 @@ object ProjectLock { def attempt(remaining: Int, notified: Boolean): IO[Boolean] = IO.blocking { - val monitor = getMonitor(project) - val state = getState(project) - monitor.synchronized { - // JDK's createDirectories has a known TOCTOU race when parallel threads create overlapping - // trees — the internal symlink check uses NOFOLLOW_LINKS and can fail when the final - // component is a symlink-to-directory created by another thread. - try Files.createDirectories(outputDir) - catch { - case _: java.nio.file.FileAlreadyExistsException if Files.isDirectory(outputDir) => () - } + // Create the output dir OUTSIDE the per-project lock. mkdir is idempotent + safe to call + // from many threads concurrently; doing it under the lock just serializes a stat syscall + // that has no contention by itself. + // + // JDK's createDirectories has a known TOCTOU race when parallel threads create overlapping + // trees — the internal symlink check uses NOFOLLOW_LINKS and can fail when the final + // component is a symlink-to-directory created by another thread. + try Files.createDirectories(outputDir) + catch { + case _: java.nio.file.FileAlreadyExistsException if Files.isDirectory(outputDir) => () + } + val lock = getLock(project) + val state = getState(project) + lock.lock() + try { mode match { case LockMode.Shared => if (state.exclusiveLock.isDefined) throw new LockNotAcquiredException(project) @@ -150,7 +161,7 @@ object ProjectLock { } } notified - } + } finally lock.unlock() }.handleErrorWith { case _: LockNotAcquiredException if remaining > 1 => IO(if (!notified) onContention()) >> @@ -172,21 +183,29 @@ object ProjectLock { private def releaseLock(project: CrossProjectName, mode: LockMode): IO[Unit] = IO.blocking { - val monitor = getMonitor(project) val state = states.get(project) - if (state != null) monitor.synchronized { - mode match { - case LockMode.Shared => - state.sharedHolderCount -= 1 - if (state.sharedHolderCount <= 0) { - state.sharedFileLock.foreach(closeQuietly) - state.sharedFileLock = None - state.sharedHolderCount = 0 - } - case LockMode.Exclusive => - state.exclusiveLock.foreach(closeQuietly) - state.exclusiveLock = None - } + if (state != null) { + val lock = getLock(project) + // Snapshot the LockInfo that needs closing under the lock; do the actual file-handle + // closure outside the lock. Closing a FileChannel under load can trip slow paths + // (Windows file system filters, NFS) we don't want to hold the per-project lock for. + var toClose: Option[LockInfo] = None + lock.lock() + try + mode match { + case LockMode.Shared => + state.sharedHolderCount -= 1 + if (state.sharedHolderCount <= 0) { + toClose = state.sharedFileLock + state.sharedFileLock = None + state.sharedHolderCount = 0 + } + case LockMode.Exclusive => + toClose = state.exclusiveLock + state.exclusiveLock = None + } + finally lock.unlock() + toClose.foreach(closeQuietly) } } diff --git a/bleep-bsp/src/scala/bleep/bsp/SharedWorkspaceState.scala b/bleep-bsp/src/scala/bleep/bsp/SharedWorkspaceState.scala index e54874390..1267333e6 100644 --- a/bleep-bsp/src/scala/bleep/bsp/SharedWorkspaceState.scala +++ b/bleep-bsp/src/scala/bleep/bsp/SharedWorkspaceState.scala @@ -31,23 +31,22 @@ object SharedWorkspaceState { ops.put(work.operationId, work): Unit } - /** Unregister a specific operation by ID. */ + /** Unregister a specific operation by ID. + * + * We intentionally do NOT remove the empty inner map: doing so races with a concurrent `register` on the same workspace. `ConcurrentHashMap.remove(K, V)` + * compares values by `equals`, and Java's `AbstractMap.equals` is content-based — so a `register` that snuck in between our `isEmpty` check and the + * `remove(K, V)` call would leave the inner map *appearing* identical to itself (a single new entry), and we'd drop it, losing the just-registered work. The + * inner map is bounded by the number of workspaces this server sees (one in the common case, a handful in tests), so the bookkeeping cost is negligible. + */ def unregister(workspace: Path, operationId: String): Unit = { val ops = activeWork.get(workspace) - if (ops != null) { - ops.remove(operationId) - // Clean up empty inner maps to avoid memory leak - if (ops.isEmpty) activeWork.remove(workspace, ops): Unit - } + if (ops != null) ops.remove(operationId): Unit } /** Unregister specific operations by ID (connection cleanup). */ def unregisterAll(workspace: Path, operationIds: Iterable[String]): Unit = { val ops = activeWork.get(workspace) - if (ops != null) { - operationIds.foreach(ops.remove) - if (ops.isEmpty) activeWork.remove(workspace, ops): Unit - } + if (ops != null) operationIds.foreach(ops.remove) } /** Get all active operations for a workspace. */ diff --git a/bleep-bsp/src/scala/bleep/bsp/SourceGenRunner.scala b/bleep-bsp/src/scala/bleep/bsp/SourceGenRunner.scala index d7f83f6f2..ca6feb2f1 100644 --- a/bleep-bsp/src/scala/bleep/bsp/SourceGenRunner.scala +++ b/bleep-bsp/src/scala/bleep/bsp/SourceGenRunner.scala @@ -258,15 +258,20 @@ object SourceGenRunner { * * Uses cats-effect Semaphore instead of ReentrantLock because IO fibers can switch threads between lock/unlock, causing IllegalMonitorStateException with * ReentrantLock. + * + * Keyed on `(scriptProject, mainClass)` rather than `mainClass` alone — two different script projects can legitimately share a main-class name (e.g. both + * declare a `scripts.Generate`), and serializing across project boundaries because of that coincidence would block independent work. */ - private val scriptSemaphores = new ConcurrentHashMap[String, cats.effect.std.Semaphore[IO]]() + private case class ScriptKey(scriptProject: CrossProjectName, mainClass: String) + + private val scriptSemaphores = new ConcurrentHashMap[ScriptKey, cats.effect.std.Semaphore[IO]]() - private def getScriptSemaphore(scriptMain: String): IO[cats.effect.std.Semaphore[IO]] = - IO.delay(scriptSemaphores.get(scriptMain)).flatMap { + private def getScriptSemaphore(key: ScriptKey): IO[cats.effect.std.Semaphore[IO]] = + IO.delay(scriptSemaphores.get(key)).flatMap { case null => cats.effect.std.Semaphore[IO](1).flatMap { sem => IO.delay { - val existing = scriptSemaphores.putIfAbsent(scriptMain, sem) + val existing = scriptSemaphores.putIfAbsent(key, sem) if (existing != null) existing else sem } } @@ -343,7 +348,7 @@ object SourceGenRunner { killSignal: Deferred[IO, KillReason], listener: SourceGenListener ): IO[Option[String]] = - getScriptSemaphore(script.main).flatMap { sem => + getScriptSemaphore(ScriptKey(script.project, script.main)).flatMap { sem => sem.permit.use { _ => val stillNeeded = projectsNeedingRegeneration(started, script, forProjects) if (stillNeeded.isEmpty) { diff --git a/bleep-bsp/src/scala/bleep/bsp/TaskDag.scala b/bleep-bsp/src/scala/bleep/bsp/TaskDag.scala index 0b71798fa..c1544c797 100644 --- a/bleep-bsp/src/scala/bleep/bsp/TaskDag.scala +++ b/bleep-bsp/src/scala/bleep/bsp/TaskDag.scala @@ -938,11 +938,16 @@ object TaskDag { } case tt: TestSuiteTask => - // Make test execution uncancelable so it always completes and reports status. - // Without this, fiber cancellation could abort mid-test and leave no status event. - IO.uncancelable { _ => - withRecovery(s"Test ${tt.suiteName.value}", taskKill)(handlers.test(tt, taskKill)) - } + // Tests handle their own cancellation: the kill-signal Deferred (`taskKill`) + // is racked by handlers.test internally (e.g. TestRunner.runSuite races + // suite-execution vs idle-timeout vs killSignal.get). `withRecovery` catches + // fiber cancellation via outcome.embed and emits a Killed TaskResult, and the + // outer executeTask still runs the TaskFinished emit after — so a cancelled + // test reports a structured Killed status without us blocking cancellation + // entirely. Previously this branch wrapped the whole thing in IO.uncancelable + // "so status events always fire", but that meant a wedged test framework + // could pin the BSP fiber indefinitely and block server shutdown. + withRecovery(s"Test ${tt.suiteName.value}", taskKill)(handlers.test(tt, taskKill)) case sgt: SourcegenTask => withRecovery(s"Sourcegen ${sgt.script.main}", taskKill) { @@ -1017,13 +1022,18 @@ object TaskDag { _ <- dagRef.update(_.kill(task.id)) } yield () - // Use Deferred signaling instead of polling for task completion. - // The signalRef holds the current Deferred so running tasks can always signal the latest one. + // Coalescing wakeup channel. Every task-completion does `wakeup.tryOffer(())` — non-blocking, + // dropped if a wakeup is already pending (no point queuing N wakeups when the loop will + // re-read everything anyway). The loop `take`s one wakeup per iteration. This replaces the + // prior pattern of rotating a Deferred under a Ref, which conflated "wake the loop" with + // "broadcast a signal", had a race window where completions could land between the rotate + // and the next get, and made the deadlock-detection path fire spurious false positives on + // very fast no-op tasks. def loop( dagRef: Ref[IO, Dag], runningRef: Ref[IO, Set[TaskId]], taskKillSignals: Ref[IO, Map[TaskId, Deferred[IO, KillReason]]], - signalRef: Ref[IO, Deferred[IO, Unit]], + wakeup: Queue[IO, Unit], supervisor: cats.effect.std.Supervisor[IO] ): IO[Unit] = for { @@ -1048,12 +1058,9 @@ object TaskDag { killTask(dag.tasks(taskId), maybeKilled.get, dagRef) } } else if (maybeKilled.isDefined) { - // Kill requested but tasks still running - wait for signal + // Kill requested but tasks still running - wait for any to complete IO(System.err.println(s"[DAG] Kill requested (${maybeKilled.get}), waiting for ${running.size} running tasks: ${running.mkString(", ")}")) >> - signalRef.get.flatMap(_.get) >> - Deferred[IO, Unit].flatMap { newSignal => - signalRef.set(newSignal) >> loop(dagRef, runningRef, taskKillSignals, signalRef, supervisor) - } + wakeup.take >> loop(dagRef, runningRef, taskKillSignals, wakeup, supervisor) } else { // Normal execution // Skip tasks with failed dependencies @@ -1067,59 +1074,42 @@ object TaskDag { depCounts = dag.dependentsCount availableSlots = maxParallelism - running.size tasksToStart = readyTasks.toList.sortBy(t => -depCounts.getOrElse(t.id, 0)).take(availableSlots) - // Start tasks - they will signal completion via the signalRef + // Start tasks. The guarantee fires both runningRef cleanup and a wakeup; the + // wakeup is `tryOffer` so concurrent completions coalesce on the bounded(1) queue. _ <- tasksToStart.toList.parTraverse_ { task => runningRef.update(_ + task.id) >> supervisor .supervise( executeTask(task, dagRef, taskKillSignals) .guarantee( - runningRef.update(_ - task.id) >> - // Signal the CURRENT deferred (read from ref to avoid stale reference). - // .attempt handles already-completed Deferred (multiple tasks finishing concurrently). - signalRef.get.flatMap(_.complete(()).attempt.void) + runningRef.update(_ - task.id) >> wakeup.tryOffer(()).void ) ) .void } - // If no tasks are running, we're done or in error state + // Re-read state. If nothing is running, the DAG is either complete, in a + // transient gap (skips just opened up new ready tasks), or genuinely stuck. + newDag <- dagRef.get newRunning <- runningRef.get _ <- - if (newRunning.isEmpty) { - // Nothing running - check if we're complete or stuck - dagRef.get.flatMap { newDag => - if (newDag.isComplete) IO.unit - else { - // Check if we're actually stuck: no ready tasks and nothing to skip - val stillReady = newDag.ready - val stillToSkip = newDag.toSkip - if (stillReady.isEmpty && stillToSkip.isEmpty) { - // Deadlock: nothing running, nothing ready, nothing to skip, but DAG not complete - val remaining = newDag.tasks.keySet -- newDag.finished - val stuckDetails = remaining.toList.map { taskId => - val task = newDag.tasks(taskId) - val unsatisfied = task.dependencies.filterNot(newDag.finished.contains) - s" $taskId (waiting for: ${unsatisfied.mkString(", ")})" - } - IO.raiseError( - new RuntimeException( - s"DAG deadlock: ${remaining.size} tasks stuck:\n${stuckDetails.mkString("\n")}" - ) - ) - } else { - // Some tasks became ready (e.g., from skipping) — retry - Deferred[IO, Unit].flatMap { newSignal => - signalRef.set(newSignal) >> loop(dagRef, runningRef, taskKillSignals, signalRef, supervisor) - } - } - } + if (newDag.isComplete) IO.unit + else if (newRunning.isEmpty && newDag.ready.isEmpty && newDag.toSkip.isEmpty) { + val remaining = newDag.tasks.keySet -- newDag.finished + val stuckDetails = remaining.toList.map { taskId => + val task = newDag.tasks(taskId) + val unsatisfied = task.dependencies.filterNot(newDag.finished.contains) + s" $taskId (waiting for: ${unsatisfied.mkString(", ")})" } + IO.raiseError( + new RuntimeException( + s"DAG deadlock: ${remaining.size} tasks stuck:\n${stuckDetails.mkString("\n")}" + ) + ) + } else if (newRunning.isEmpty) { + // No tasks running but progress still possible — re-evaluate without waiting. + loop(dagRef, runningRef, taskKillSignals, wakeup, supervisor) } else { - // Wait for any task to complete, then create fresh signal and continue - signalRef.get.flatMap(_.get) >> - Deferred[IO, Unit].flatMap { newSignal => - signalRef.set(newSignal) >> loop(dagRef, runningRef, taskKillSignals, signalRef, supervisor) - } + wakeup.take >> loop(dagRef, runningRef, taskKillSignals, wakeup, supervisor) } } yield () } @@ -1133,9 +1123,8 @@ object TaskDag { dagRef <- Ref.of[IO, Dag](initialDag) runningRef <- Ref.of[IO, Set[TaskId]](Set.empty) taskKillSignals <- Ref.of[IO, Map[TaskId, Deferred[IO, KillReason]]](Map.empty) - initialSignal <- Deferred[IO, Unit] - signalRef <- Ref.of[IO, Deferred[IO, Unit]](initialSignal) - _ <- loop(dagRef, runningRef, taskKillSignals, signalRef, supervisor) + wakeup <- Queue.bounded[IO, Unit](1) + _ <- loop(dagRef, runningRef, taskKillSignals, wakeup, supervisor) finalDag <- dagRef.get } yield finalDag } diff --git a/bleep-bsp/src/scala/bleep/bsp/TestRunner.scala b/bleep-bsp/src/scala/bleep/bsp/TestRunner.scala index 4e8aed2d3..fe35f2842 100644 --- a/bleep-bsp/src/scala/bleep/bsp/TestRunner.scala +++ b/bleep-bsp/src/scala/bleep/bsp/TestRunner.scala @@ -257,10 +257,16 @@ object TestRunner { // just sees "Suite idle timeout after 120s" with no idea what the JVM was doing. // jstack writes to its own stdout, decoupled from the test JVM's stdio, so the // protocol stream doesn't get polluted. + // + // Bound the dump itself to 5s — jstack attaches via the JVM tool interface, which + // can stall during a long GC pause or kernel signal handling. Without this cap a + // wedged JVM keeps the TimedOut → SuiteTimedOut protocol event from ever firing, + // turning a "suite stuck" into "BSP appears stuck". def captureThreadDump: IO[Option[String]] = - jvm.dumpThreads.attempt.map { - case Right(lines) if lines.nonEmpty => Some(lines.mkString("\n")) - case _ => None + IO.race(jvm.dumpThreads.attempt, IO.sleep(5.seconds)).map { + case Left(Right(lines)) if lines.nonEmpty => Some(lines.mkString("\n")) + case Left(_) => None + case Right(_) => Some("(thread dump timed out after 5s — JVM unresponsive)") } // NOTE: For timeout/kill/error cases, we do NOT emit SuiteFinished here.