Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ trait TokenLock extends AutoCloseable {
def tryAcquire(): Boolean

def release(): Unit

def token: String
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.lock

import za.co.absa.pramen.api.lock.TokenLock

class TokenLockAllow extends TokenLock {
class TokenLockAllow(override val token: String) extends TokenLock {
override def tryAcquire(): Boolean = true

override def release(): Unit = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.util.control.NonFatal
*
* @param token the unique identifier for the lock (across multiple JVM processes and Spark jobs).
*/
abstract class TokenLockBase(token: String) extends TokenLock {
abstract class TokenLockBase(override val token: String) extends TokenLock {
import TokenLockBase._

private val log = LoggerFactory.getLogger(this.getClass)
Expand Down Expand Up @@ -65,7 +65,7 @@ abstract class TokenLockBase(token: String) extends TokenLock {
* Note: Unlike standard lock implementations, this returns false even when the current instance already owns the lock.
*/
override def tryAcquire(): Boolean = synchronized {
if (lockAcquired) {
val isAcquired = if (lockAcquired) {
false
} else {
if (tryAcquireGuardLock(lockAcquireRetries, 0)) {
Expand All @@ -79,6 +79,12 @@ abstract class TokenLockBase(token: String) extends TokenLock {
false
}
}

if (isAcquired) {
TokenLockRegistry.registerLock(this)
}

isAcquired
}

override def release(): Unit = {
Expand All @@ -94,8 +100,12 @@ abstract class TokenLockBase(token: String) extends TokenLock {
if (wasAcquired) {
watcherThreadOpt.foreach(_.interrupt())
watcherThreadOpt = None
releaseGuardLock()
JvmUtils.safeRemoveShutdownHook(shutdownHook)
try {
releaseGuardLock()
} finally {
JvmUtils.safeRemoveShutdownHook(shutdownHook)
TokenLockRegistry.unregisterLock(this)
}
log.info(s"Lock released: '$escapedToken'.")
}
}
Expand All @@ -104,7 +114,7 @@ abstract class TokenLockBase(token: String) extends TokenLock {
release()
}

protected def isAcquired: Boolean = synchronized {
private[core] def isAcquired: Boolean = synchronized {
lockAcquired
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ import za.co.absa.pramen.api.lock.{TokenLock, TokenLockFactory}

class TokenLockFactoryAllow extends TokenLockFactory {
override def getLock(token: String): TokenLock = {
new TokenLockAllow
new TokenLockAllow(token)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.lock

import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.lock.TokenLock

import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

object TokenLockRegistry {
private val log = LoggerFactory.getLogger(this.getClass)
private var currentLocks = new ListBuffer[TokenLock]
private val registryLock = new ReentrantLock()

private[core] def registerLock(lock: TokenLock): Unit = {
registryLock.lock()
try {
currentLocks += lock
} finally {
registryLock.unlock()
}
}

private[core] def unregisterLock(lock: TokenLock): Unit = {
registryLock.lock()
try {
currentLocks -= lock
} finally {
registryLock.unlock()
}
}

private[core] def releaseAllLocks(): Unit = {
registryLock.lock()
try {
// Making a copy because the `l.release()` can call `unregisterLock()` modifying
// the mutable list buffer while iterating.
val currentListCopy = currentLocks.toList
currentListCopy.foreach { l =>
try {
l.release()
} catch {
case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}", ex)
}
}
currentLocks.clear()
} finally {
registryLock.unlock()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import za.co.absa.pramen.core.app.config.RuntimeConfig.{DRY_RUN, EMAIL_IF_NO_CHA
import za.co.absa.pramen.core.app.config.{HookConfig, RuntimeConfig}
import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, WARN_THROUGHPUT_RPS}
import za.co.absa.pramen.core.exceptions.OsSignalException
import za.co.absa.pramen.core.lock.TokenLockRegistry
import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager}
import za.co.absa.pramen.core.notify.PipelineNotificationTargetFactory
import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail}
Expand Down Expand Up @@ -209,6 +210,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
runCustomShutdownHook()
removeSignalHandlers()
sendNotificationEmail()
TokenLockRegistry.releaseAllLocks()
}

private lazy val shutdownHook = new Thread() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.mocks.lock

import za.co.absa.pramen.api.lock.TokenLock

class TokenLockMock extends TokenLock {
class TokenLockMock(override val token: String = "mock") extends TokenLock {
var acquired = false

override def tryAcquire(): Boolean = this.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import za.co.absa.pramen.api.lock.TokenLock
import za.co.absa.pramen.core.fixtures.RelationalDbFixture
import za.co.absa.pramen.core.lock.TokenLockJdbc
import za.co.absa.pramen.core.lock.{TokenLockBase, TokenLockJdbc, TokenLockRegistry}
import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc}
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.utils.UsingUtils
Expand Down Expand Up @@ -98,6 +98,14 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor
lock1.release()
}
}

"lock registry releases all locks" in {
val lock1 = getLock("token1")
assert(lock1.tryAcquire())

TokenLockRegistry.releaseAllLocks()
assert(!lock1.asInstanceOf[TokenLockBase].isAcquired)
}
}

private def getLock(token: String): TokenLock = {
Expand Down
Loading