diff --git a/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala b/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala index 73743406..c4adc9b0 100644 --- a/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala +++ b/admin/app/com/lucidchart/piezo/admin/controllers/Triggers.scala @@ -355,17 +355,17 @@ class Triggers( } } - def triggerJobOneTime(group: String, name: String, id: Long): Action[AnyContent] = Action { request => + def triggerJobOneTime(group: String, name: String, oneTimeJobId: Long): Action[AnyContent] = Action { request => val jobKey = new JobKey(name, group) if (scheduler.checkExists(jobKey)) { try { // Only run a trigger, if we haven't seen this id before - if (jobHistoryModel.addOneTimeJobIfNotExists(jobKey, id)) { + jobHistoryModel.addOneTimeJobIfNotExists(jobKey, oneTimeJobId).foreach { fireInstanceId => // Single run trigger has its id passed to the scheduler via the job-data-map. The WorkerJobListener will // use that id to update the existing record in job_history table // Piezo-admin expects job-data-map values for triggers to be stored as strings - val jobDataMap = jobHistoryModel.createJobDataMapForOneTimeJob(id.toString) + val jobDataMap = jobHistoryModel.createJobDataMapForOneTimeJob(fireInstanceId) scheduler.triggerJob(jobKey, jobDataMap) } Ok @@ -373,7 +373,7 @@ class Triggers( case e: SchedulerException => { logger.error( "Exception caught triggering job one-time %s %s - %s. -- %s" - .format(group, name, id, e.getLocalizedMessage), + .format(group, name, oneTimeJobId, e.getLocalizedMessage), e, ) InternalServerError diff --git a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala index b5fed937..72a15389 100644 --- a/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala +++ b/worker/src/main/scala/com/lucidchart/piezo/JobHistoryModel.scala @@ -23,16 +23,20 @@ class JobHistoryModel(getConnection: () => Connection) { // Trigger Group for records that aren't deletable private final val oneTimeJobTriggerGroup = "ONE_TIME_JOB" - final def oneTimeTriggerKey(fireInstanceId: Long): TriggerKey = - TriggerKey(fireInstanceId.toString, oneTimeJobTriggerGroup) + final def oneTimeTriggerKey(fireInstanceId: String): TriggerKey = + TriggerKey(fireInstanceId, oneTimeJobTriggerGroup) + + // Makes the job id unique per job, instead of globally unique + final def getFireInstanceIdFromOneTimeJobId(group: String, name: String, oneTimeJobId: Long): String = + s"${group}_${name}_$oneTimeJobId" // Methods to store the one-time-job id in a job-data-map final val jobDataMapOneTimeJobKey = "OneTimeJobId" final def getOneTimeJobIdFromDataMap(jobDataMap: JobDataMap): Option[String] = Option( jobDataMap.getString(jobDataMapOneTimeJobKey), ) - final def createJobDataMapForOneTimeJob(id: String): JobDataMap = new JobDataMap( - java.util.Map.of(jobDataMapOneTimeJobKey, id), + final def createJobDataMapForOneTimeJob(fireInstanceId: String): JobDataMap = new JobDataMap( + java.util.Map.of(jobDataMapOneTimeJobKey, fireInstanceId), ) def addJob( @@ -207,15 +211,18 @@ class JobHistoryModel(getConnection: () => Connection) { } /** - * Check if we have already triggered a one-time-job with the given trigger key and fireInstanceId. + * Check if we have already triggered a one-time-job with the given trigger key and fireInstanceId. Returns the + * fireInstanceId of the one-time-job, if it doesn't exist * * This is useful for seeing if a one-time job has already been triggered, to ensure that triggering a one-time job * with the same instance id is an idempotent operation. If the one-time job has not been triggered, the same * transaction is used to add the one-time-job to the database, to avoid race conditions */ - def addOneTimeJobIfNotExists(jobKey: JobKey, fireInstanceId: Long): Boolean = { + def addOneTimeJobIfNotExists(jobKey: JobKey, oneTimeJobId: Long): Option[String] = { val connection = getConnection() + // Make the job id unique for that job, instead of globally unique + val fireInstanceId: String = getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, oneTimeJobId) // Use a trigger key that the database won't clean up in "JobHistoryCleanup" val triggerKey: TriggerKey = oneTimeTriggerKey(fireInstanceId) @@ -238,16 +245,19 @@ class JobHistoryModel(getConnection: () => Connection) { VALUES(?, ?, ?, ?, ?, ?, ?) """.stripMargin, ) - prepared.setString(1, fireInstanceId.toString) + prepared.setString(1, fireInstanceId) prepared.setString(2, jobKey.getName) prepared.setString(3, jobKey.getGroup) prepared.setString(4, triggerKey.getName) prepared.setString(5, triggerKey.getGroup) prepared.setBoolean(6, true) prepared.setObject(7, triggerStartTime) - // Check if we actually inserted the row, - // if we didn't, then the firs_instance_id was already in use - prepared.executeUpdate() > 0 + // Check if we actually inserted the row, to determine whether to return the fire_instance_id + if (prepared.executeUpdate() > 0) { + Some(fireInstanceId) + } else { + None + } } finally { connection.close() } diff --git a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala index 7f7af018..2d7d4d09 100644 --- a/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala +++ b/worker/src/test/scala/com/lucidchart/piezo/ModelTest.scala @@ -155,10 +155,10 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val jobKey = new JobKey("blahjz123", "blahzg123") val scheduledStart = java.time.Instant.now() val temporaryFireInstanceId = "FireInstanceId" - val permanentFireInstanceId = 123456789 - val permanentFireInstanceIdString = permanentFireInstanceId.toString + val permanentFireInstanceIdLong = 123456789 + val permanentFireInstanceIdString = jobHistoryModel.getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, permanentFireInstanceIdLong) jobHistoryModel.addJob(temporaryFireInstanceId, jobKey, temporaryTriggerKey, scheduledStart, 1, true) - jobHistoryModel.addOneTimeJobIfNotExists(jobKey, permanentFireInstanceId) + jobHistoryModel.addOneTimeJobIfNotExists(jobKey, permanentFireInstanceIdLong) jobHistoryModel .getJob(jobKey) @@ -184,15 +184,16 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val jobHistoryModel = new JobHistoryModel(getConnection) val jobKey = new JobKey("blahjzasd", "blahzgasd") - val fireInstanceId: Long = 123123123 + val permanentFireInstanceIdLong = 123123123 + val fireInstanceId = jobHistoryModel.getFireInstanceIdFromOneTimeJobId(jobKey.getGroup, jobKey.getName, permanentFireInstanceIdLong) val combinedFutures: Future[Set[Boolean]] = Future.sequence( Set( Future { - jobHistoryModel.addOneTimeJobIfNotExists(jobKey, fireInstanceId) + jobHistoryModel.addOneTimeJobIfNotExists(jobKey, permanentFireInstanceIdLong).isDefined }, Future { - jobHistoryModel.addOneTimeJobIfNotExists(jobKey, fireInstanceId) + jobHistoryModel.addOneTimeJobIfNotExists(jobKey, permanentFireInstanceIdLong).isDefined }, ), ) @@ -207,7 +208,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { val fireTime = java.time.Instant.now().truncatedTo(SECONDS) val instanceDurationInMillis: Long = 3000 jobHistoryModel.completeOneTimeJob( - fireInstanceId.toString, + fireInstanceId, fireTime, instanceDurationInMillis, true, @@ -221,7 +222,7 @@ class ModelTest extends Specification with BeforeAll with AfterAll { .map(record => (record.fire_instance_id, record.finish.map(_.getEpochSecond))) must containTheSameElementsAs( List( ( - fireInstanceId.toString, + fireInstanceId, Some(expectedFinishSeconds), ), ),