From f60ef752aef06f3d6fdeb54a868ff4a8cdd182b9 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 12 Mar 2026 14:34:53 +0100 Subject: [PATCH] Allow specifying starting and ending offsets via the Kafka source options. --- .../absa/pramen/extras/source/KafkaAvroSource.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala index 0e367496..119873b0 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala @@ -155,6 +155,14 @@ class KafkaAvroSource(sourceConfig: Config, val q = "\"" + val proposedStartingOffset = kafkaAvroConfig.extraOptions + .get("kafka.startingOffsets") + .map(v => s"{$q$topic$q: $v}") + + val proposedEndingOffset = kafkaAvroConfig.extraOptions + .get("kafka.endingOffsets") + .map(v => s"{$q$topic$q: $v}") + val startingOffsets = offsetFromOpt match { case Some(offset) => // The starting offset is inclusive in Spark. @@ -169,7 +177,7 @@ class KafkaAvroSource(sourceConfig: Config, Map("startingOffsets" -> s"{$q$topic$q: ${offsetFrom.valueString}}") case None => - Map("startingOffsets" -> "earliest") + Map("startingOffsets" -> proposedStartingOffset.getOrElse("earliest")) } val endingOffsets = offsetToOpt match { @@ -180,7 +188,7 @@ class KafkaAvroSource(sourceConfig: Config, val offsetTo = offset.asInstanceOf[KafkaValue].increment Map("endingOffsets" -> s"{$q$topic$q: ${offsetTo.valueString}}") case None => - Map("endingOffsets" -> "latest") + Map("endingOffsets" -> proposedEndingOffset.getOrElse("latest")) } val kafkaOptions = kafkaAvroConfig.extraOptions ++ startingOffsets ++ endingOffsets +