From 7fa6b3e52d1d686042466710e05c8a3225f4792a Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 27 Feb 2026 12:52:34 +0100 Subject: [PATCH 1/2] #714 Release all locks when Pramen job exits abnormally. --- .../co/absa/pramen/api/lock/TokenLock.scala | 2 + .../pramen/core/lock/TokenLockAllow.scala | 2 +- .../absa/pramen/core/lock/TokenLockBase.scala | 13 +++- .../core/lock/TokenLockFactoryAllow.scala | 2 +- .../pramen/core/lock/TokenLockRegistry.scala | 68 +++++++++++++++++++ .../pramen/core/state/PipelineStateImpl.scala | 2 + .../core/mocks/lock/TokenLockMock.scala | 2 +- .../core/tests/lock/TokenLockJdbcSuite.scala | 10 ++- 8 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLock.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLock.scala index f9bef01d..5caa4b44 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLock.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLock.scala @@ -41,4 +41,6 @@ trait TokenLock extends AutoCloseable { def tryAcquire(): Boolean def release(): Unit + + def token: String } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockAllow.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockAllow.scala index 9f3490f6..84d6a161 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockAllow.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockAllow.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.lock import za.co.absa.pramen.api.lock.TokenLock -class TokenLockAllow extends TokenLock { +class TokenLockAllow(override val token: String) extends TokenLock { override def tryAcquire(): Boolean = true override def release(): Unit = {} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala index 7f2bc723..823591db 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala @@ -32,7 +32,7 @@ import scala.util.control.NonFatal * * @param token the unique identifier for the lock (across multiple JVM processes and Spark jobs). */ -abstract class TokenLockBase(token: String) extends TokenLock { +abstract class TokenLockBase(override val token: String) extends TokenLock { import TokenLockBase._ private val log = LoggerFactory.getLogger(this.getClass) @@ -65,7 +65,7 @@ abstract class TokenLockBase(token: String) extends TokenLock { * Note: Unlike standard lock implementations, this returns false even when the current instance already owns the lock. */ override def tryAcquire(): Boolean = synchronized { - if (lockAcquired) { + val isAcquired = if (lockAcquired) { false } else { if (tryAcquireGuardLock(lockAcquireRetries, 0)) { @@ -79,6 +79,12 @@ abstract class TokenLockBase(token: String) extends TokenLock { false } } + + if (isAcquired) { + TokenLockRegistry.registerLock(this) + } + + isAcquired } override def release(): Unit = { @@ -96,6 +102,7 @@ abstract class TokenLockBase(token: String) extends TokenLock { watcherThreadOpt = None releaseGuardLock() JvmUtils.safeRemoveShutdownHook(shutdownHook) + TokenLockRegistry.unregisterLock(this) log.info(s"Lock released: '$escapedToken'.") } } @@ -104,7 +111,7 @@ abstract class TokenLockBase(token: String) extends TokenLock { release() } - protected def isAcquired: Boolean = synchronized { + private[core] def isAcquired: Boolean = synchronized { lockAcquired } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryAllow.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryAllow.scala index 1949ca40..ddd3d968 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryAllow.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryAllow.scala @@ -20,6 +20,6 @@ import za.co.absa.pramen.api.lock.{TokenLock, TokenLockFactory} class TokenLockFactoryAllow extends TokenLockFactory { override def getLock(token: String): TokenLock = { - new TokenLockAllow + new TokenLockAllow(token) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala new file mode 100644 index 00000000..0628f981 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala @@ -0,0 +1,68 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.lock + +import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.lock.TokenLock + +import java.util.concurrent.locks.ReentrantLock +import scala.collection.mutable.ListBuffer +import scala.util.control.NonFatal + +object TokenLockRegistry { + private val log = LoggerFactory.getLogger(this.getClass) + private var currentLocks = new ListBuffer[TokenLock] + private val registryLock = new ReentrantLock() + + private[core] def registerLock(lock: TokenLock): Unit = { + registryLock.lock() + try { + currentLocks += lock + } finally { + registryLock.unlock() + } + } + + private[core] def unregisterLock(lock: TokenLock): Unit = { + registryLock.lock() + try { + currentLocks -= lock + } finally { + registryLock.unlock() + } + } + + private[core] def releaseAllLocks(): Unit = { + registryLock.lock() + try { + // Making a copy because the `l.release()` can call `unregisterLock()` modifying + // the mutable list buffer while iterating. + val currentListCopy = currentLocks.toList + currentListCopy.foreach { l => + try { + l.release() + } catch { + case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}") + } + } + currentLocks.clear() + } finally { + registryLock.unlock() + } + } + +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index e2c9153e..6a7a18cf 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -26,6 +26,7 @@ import za.co.absa.pramen.core.app.config.RuntimeConfig.{DRY_RUN, EMAIL_IF_NO_CHA import za.co.absa.pramen.core.app.config.{HookConfig, RuntimeConfig} import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, WARN_THROUGHPUT_RPS} import za.co.absa.pramen.core.exceptions.OsSignalException +import za.co.absa.pramen.core.lock.TokenLockRegistry import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager} import za.co.absa.pramen.core.notify.PipelineNotificationTargetFactory import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail} @@ -209,6 +210,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification runCustomShutdownHook() removeSignalHandlers() sendNotificationEmail() + TokenLockRegistry.releaseAllLocks() } private lazy val shutdownHook = new Thread() { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/lock/TokenLockMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/lock/TokenLockMock.scala index 9d20da28..e04d0ada 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/lock/TokenLockMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/lock/TokenLockMock.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.mocks.lock import za.co.absa.pramen.api.lock.TokenLock -class TokenLockMock extends TokenLock { +class TokenLockMock(override val token: String = "mock") extends TokenLock { var acquired = false override def tryAcquire(): Boolean = this.synchronized { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala index 1d459108..507f1d6c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.api.lock.TokenLock import za.co.absa.pramen.core.fixtures.RelationalDbFixture -import za.co.absa.pramen.core.lock.TokenLockJdbc +import za.co.absa.pramen.core.lock.{TokenLockBase, TokenLockJdbc, TokenLockRegistry} import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc} import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.utils.UsingUtils @@ -98,6 +98,14 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor lock1.release() } } + + "lock registry releases all locks" in { + val lock1 = getLock("token1") + assert(lock1.tryAcquire()) + + TokenLockRegistry.releaseAllLocks() + assert(!lock1.asInstanceOf[TokenLockBase].isAcquired) + } } private def getLock(token: String): TokenLock = { From 1ade7f559a5cc27f123867e135ce0cab4aa8eaaa Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 4 Mar 2026 09:07:13 +0100 Subject: [PATCH 2/2] #714 Protect against exceptions on lock release. --- .../za/co/absa/pramen/core/lock/TokenLockBase.scala | 9 ++++++--- .../za/co/absa/pramen/core/lock/TokenLockRegistry.scala | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala index 823591db..1d2f6a8e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala @@ -100,9 +100,12 @@ abstract class TokenLockBase(override val token: String) extends TokenLock { if (wasAcquired) { watcherThreadOpt.foreach(_.interrupt()) watcherThreadOpt = None - releaseGuardLock() - JvmUtils.safeRemoveShutdownHook(shutdownHook) - TokenLockRegistry.unregisterLock(this) + try { + releaseGuardLock() + } finally { + JvmUtils.safeRemoveShutdownHook(shutdownHook) + TokenLockRegistry.unregisterLock(this) + } log.info(s"Lock released: '$escapedToken'.") } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala index 0628f981..e9a5577a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala @@ -56,7 +56,7 @@ object TokenLockRegistry { try { l.release() } catch { - case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}") + case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}", ex) } } currentLocks.clear()