diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala index 0e367496..119873b0 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala @@ -155,6 +155,14 @@ class KafkaAvroSource(sourceConfig: Config, val q = "\"" + val proposedStartingOffset = kafkaAvroConfig.extraOptions + .get("kafka.startingOffsets") + .map(v => s"{$q$topic$q: $v}") + + val proposedEndingOffset = kafkaAvroConfig.extraOptions + .get("kafka.endingOffsets") + .map(v => s"{$q$topic$q: $v}") + val startingOffsets = offsetFromOpt match { case Some(offset) => // The starting offset is inclusive in Spark. @@ -169,7 +177,7 @@ class KafkaAvroSource(sourceConfig: Config, Map("startingOffsets" -> s"{$q$topic$q: ${offsetFrom.valueString}}") case None => - Map("startingOffsets" -> "earliest") + Map("startingOffsets" -> proposedStartingOffset.getOrElse("earliest")) } val endingOffsets = offsetToOpt match { @@ -180,7 +188,7 @@ class KafkaAvroSource(sourceConfig: Config, val offsetTo = offset.asInstanceOf[KafkaValue].increment Map("endingOffsets" -> s"{$q$topic$q: ${offsetTo.valueString}}") case None => - Map("endingOffsets" -> "latest") + Map("endingOffsets" -> proposedEndingOffset.getOrElse("latest")) } val kafkaOptions = kafkaAvroConfig.extraOptions ++ startingOffsets ++ endingOffsets +