From a41807b20ca8859c3f0f9f8db9a6f5abc7610f6c Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 11 Mar 2026 15:53:33 +0100 Subject: [PATCH 1/4] #716 Move unit tests out of 'tests' package and use the same package as classes being tested. --- .../pramen/extras/{tests => }/avro/AvroUtilsSuite.scala | 7 +++---- .../builtin => }/infofile/InfoFileGenerationSuite.scala | 2 +- .../extras/{tests => }/sink/EnceladusConfigSuite.scala | 3 +-- .../extras/{tests => }/sink/EnceladusSinkSuite.scala | 3 +-- .../extras/{tests => }/sink/EnceladusUtilsSuite.scala | 5 ++--- .../{tests => }/sink/StandardizationConfigSuite.scala | 3 +-- .../extras/{tests => }/sink/StandardizationSinkSuite.scala | 3 +-- .../extras/{tests => }/writer/TableWriterKafkaSuite.scala | 3 +-- .../writer/model/KafkaAvroWriterConfigSuite.scala | 2 +- .../{tests => }/writer/model/NamingStrategySuite.scala | 2 +- 10 files changed, 13 insertions(+), 20 deletions(-) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/avro/AvroUtilsSuite.scala (97%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests/builtin => }/infofile/InfoFileGenerationSuite.scala (99%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/sink/EnceladusConfigSuite.scala (97%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/sink/EnceladusSinkSuite.scala (98%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/sink/EnceladusUtilsSuite.scala (99%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/sink/StandardizationConfigSuite.scala (97%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/sink/StandardizationSinkSuite.scala (98%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/writer/TableWriterKafkaSuite.scala (97%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/writer/model/KafkaAvroWriterConfigSuite.scala (98%) rename pramen/extras/src/test/scala/za/co/absa/pramen/extras/{tests => }/writer/model/NamingStrategySuite.scala (98%) diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/avro/AvroUtilsSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/avro/AvroUtilsSuite.scala similarity index 97% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/avro/AvroUtilsSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/avro/AvroUtilsSuite.scala index 8e0f1e111..2cfbf37b2 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/avro/AvroUtilsSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/avro/AvroUtilsSuite.scala @@ -14,16 +14,15 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.avro +package za.co.absa.pramen.extras.avro import org.apache.spark.sql.functions.struct import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.extras.NestedDataFrameFactory -import za.co.absa.pramen.extras.utils.ResourceUtils.getResourceString -import za.co.absa.pramen.extras.utils.JsonUtils -import za.co.absa.pramen.extras.avro.AvroUtils import za.co.absa.pramen.extras.base.SparkTestBase import za.co.absa.pramen.extras.fixtures.TextComparisonFixture +import za.co.absa.pramen.extras.utils.JsonUtils +import za.co.absa.pramen.extras.utils.ResourceUtils.getResourceString class AvroUtilsSuite extends AnyWordSpec with SparkTestBase with TextComparisonFixture { import spark.implicits._ diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/builtin/infofile/InfoFileGenerationSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/infofile/InfoFileGenerationSuite.scala similarity index 99% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/builtin/infofile/InfoFileGenerationSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/infofile/InfoFileGenerationSuite.scala index 8dd3063cb..9394de517 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/builtin/infofile/InfoFileGenerationSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/infofile/InfoFileGenerationSuite.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.builtin.infofile +package za.co.absa.pramen.extras.infofile import com.typesafe.config.{Config, ConfigFactory} import org.apache.hadoop.fs.Path diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusConfigSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusConfigSuite.scala similarity index 97% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusConfigSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusConfigSuite.scala index 2b4782ec4..f6e7b54de 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusConfigSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusConfigSuite.scala @@ -14,12 +14,11 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.sink +package za.co.absa.pramen.extras.sink import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.extras.base.SparkTestBase -import za.co.absa.pramen.extras.sink.EnceladusConfig import java.time.ZoneId diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusSinkSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusSinkSuite.scala similarity index 98% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusSinkSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusSinkSuite.scala index 57b750867..f41becec2 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusSinkSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusSinkSuite.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.sink +package za.co.absa.pramen.extras.sink import com.typesafe.config.ConfigFactory import org.apache.hadoop.fs.Path @@ -23,7 +23,6 @@ import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.extras.base.SparkTestBase import za.co.absa.pramen.extras.fixtures.{TempDirFixture, TextComparisonFixture} import za.co.absa.pramen.extras.mocks.QueryExecutorSpy -import za.co.absa.pramen.extras.sink.EnceladusSink import za.co.absa.pramen.extras.sink.EnceladusSink.{DATASET_NAME_KEY, DATASET_VERSION_KEY, HIVE_TABLE_KEY} import za.co.absa.pramen.extras.utils.FsUtils diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusUtilsSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusUtilsSuite.scala similarity index 99% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusUtilsSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusUtilsSuite.scala index 5ef236b32..3f4d88195 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/EnceladusUtilsSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusUtilsSuite.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.sink +package za.co.absa.pramen.extras.sink import org.apache.hadoop.fs.Path import org.apache.spark.sql.DataFrame @@ -22,15 +22,14 @@ import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.extras.base.SparkTestBase import za.co.absa.pramen.extras.fixtures.{TempDirFixture, TextComparisonFixture} import za.co.absa.pramen.extras.mocks.QueryExecutorSpy -import za.co.absa.pramen.extras.sink.EnceladusUtils import za.co.absa.pramen.extras.utils.FsUtils import java.nio.file.{Files, Paths} import java.time.LocalDate class EnceladusUtilsSuite extends AnyWordSpec with SparkTestBase with TextComparisonFixture with TempDirFixture { - import za.co.absa.pramen.extras.sink.InfoVersionStatus._ import spark.implicits._ + import za.co.absa.pramen.extras.sink.InfoVersionStatus._ private val infoDate = LocalDate.of(2022, 2, 18) private val rawPathPattern = "{year}/{month}/{day}/v{version}" diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/StandardizationConfigSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationConfigSuite.scala similarity index 97% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/StandardizationConfigSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationConfigSuite.scala index 39a065c36..4b667e77b 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/StandardizationConfigSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationConfigSuite.scala @@ -14,12 +14,11 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.sink +package za.co.absa.pramen.extras.sink import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.extras.base.SparkTestBase -import za.co.absa.pramen.extras.sink.StandardizationConfig import java.time.ZoneId diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/StandardizationSinkSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationSinkSuite.scala similarity index 98% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/StandardizationSinkSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationSinkSuite.scala index 4d667db59..f540d2a1f 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/sink/StandardizationSinkSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationSinkSuite.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.sink +package za.co.absa.pramen.extras.sink import com.typesafe.config.{ConfigFactory, ConfigValueFactory} import org.apache.hadoop.fs.Path @@ -24,7 +24,6 @@ import za.co.absa.pramen.core.utils.hive.{HiveHelperSql, HiveQueryTemplates} import za.co.absa.pramen.extras.base.SparkTestBase import za.co.absa.pramen.extras.fixtures.{TempDirFixture, TextComparisonFixture} import za.co.absa.pramen.extras.mocks.QueryExecutorMock -import za.co.absa.pramen.extras.sink.{StandardizationConfig, StandardizationSink} import za.co.absa.pramen.extras.utils.FsUtils import java.nio.file.{Files, Paths} diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/TableWriterKafkaSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/TableWriterKafkaSuite.scala similarity index 97% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/TableWriterKafkaSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/TableWriterKafkaSuite.scala index ade17c2f9..720953afb 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/TableWriterKafkaSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/TableWriterKafkaSuite.scala @@ -14,12 +14,11 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.writer +package za.co.absa.pramen.extras.writer import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.extras.base.SparkTestBase -import za.co.absa.pramen.extras.writer.TableWriterKafka class TableWriterKafkaSuite extends AnyWordSpec with SparkTestBase { "TableWriterKafka" should { diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/KafkaAvroWriterConfigSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfigSuite.scala similarity index 98% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/KafkaAvroWriterConfigSuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfigSuite.scala index c8d378e21..a116dc122 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/KafkaAvroWriterConfigSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfigSuite.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.writer.model +package za.co.absa.pramen.extras.writer.model import com.typesafe.config.{ConfigFactory, ConfigValueFactory} import org.scalatest.wordspec.AnyWordSpec diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/NamingStrategySuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/NamingStrategySuite.scala similarity index 98% rename from pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/NamingStrategySuite.scala rename to pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/NamingStrategySuite.scala index 20c3d5882..2210ba31d 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/tests/writer/model/NamingStrategySuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/NamingStrategySuite.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package za.co.absa.pramen.extras.tests.writer.model +package za.co.absa.pramen.extras.writer.model import com.typesafe.config.ConfigException.Missing import com.typesafe.config.ConfigFactory From d4d4c6ffa2ca093edc0e409bbadc22f1ad68f411 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 12 Mar 2026 10:09:40 +0100 Subject: [PATCH 2/4] #716 Move Schema registry options under 'option' block because some options have 'schema.registry' prefix and some are not. This would allow users to decide. --- README.md | 28 ++-- .../extras/writer/model/KafkaAvroConfig.scala | 5 +- .../writer/model/KafkaAvroConfigSuite.scala | 134 ++++++++++++++++++ 3 files changed, 151 insertions(+), 16 deletions(-) create mode 100644 pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfigSuite.scala diff --git a/README.md b/README.md index 1e81924e1..f5d7b7508 100644 --- a/README.md +++ b/README.md @@ -982,12 +982,14 @@ pramen.sources = [ value.naming.strategy = "topic.name" #key.naming.strategy = "topic.name" - # Arbitrary options for Schema registry - basic.auth.credentials.source = "..." - basic.auth.user.info = "..." - ssl.truststore.location = "..." - ssl.truststore.password = "..." - ssl.truststore.type = "..." + option { + # Arbitrary options for Schema registry + basic.auth.credentials.source = "..." + basic.auth.user.info = "..." + ssl.truststore.location = "..." + ssl.truststore.password = "..." + ssl.truststore.type = "..." + } } } ] @@ -1055,12 +1057,14 @@ pramen.sinks = [ url = "https://my.schema.registry:8081" value.naming.strategy = "topic.name" - # Arbitrary options for Schema registry - basic.auth.credentials.source = "..." - basic.auth.user.info = "..." - ssl.truststore.location = "..." - ssl.truststore.password = "..." - ssl.truststore.type = "..." + option { + # Arbitrary options for Schema registry + basic.auth.credentials.source = "..." + basic.auth.user.info = "..." + ssl.truststore.location = "..." + ssl.truststore.password = "..." + ssl.truststore.type = "..." + } } } ] diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala index f00d9f928..737d43b8b 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala @@ -35,7 +35,7 @@ object KafkaAvroConfig { val SCHEMA_REGISTRY_URL = "schema.registry.url" val SCHEMA_REGISTRY_KEY_PREFIX = "schema.registry.key" val SCHEMA_REGISTRY_VALUE_PREFIX = "schema.registry.value" - val SCHEMA_EXTRA_OPTIONS = "schema.registry" + val SCHEMA_EXTRA_OPTIONS = "schema.registry.option" val KAFKA_EXTRA_OPTIONS = "kafka" def fromConfig(conf: Config): KafkaAvroConfig = { @@ -58,9 +58,6 @@ object KafkaAvroConfig { } val schemaRegistryExtraProperties = ConfigUtils.getExtraOptions(conf, SCHEMA_EXTRA_OPTIONS) - .map { - case (k, v) => (s"$SCHEMA_EXTRA_OPTIONS.$k", v) - } KafkaAvroConfig( brokers = conf.getString(KAFKA_BROKERS_KEY), diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfigSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfigSuite.scala new file mode 100644 index 000000000..9c9dad053 --- /dev/null +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfigSuite.scala @@ -0,0 +1,134 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.extras.writer.model + +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpec + +class KafkaAvroConfigSuite extends AnyWordSpec { + "fromConfig" should { + "read a minimalistic config with brokers and schema registry URL" in { + val conf = ConfigFactory.parseString( + """kafka.bootstrap.servers = "localhost:9092" + |schema.registry.url = "localhost:8081" + |schema.registry.value.naming.strategy = "topic.name" + |""".stripMargin) + + val kafkaConfig = KafkaAvroConfig.fromConfig(conf) + + assert(kafkaConfig.brokers == "localhost:9092") + assert(kafkaConfig.schemaRegistryUrl == "localhost:8081") + assert(kafkaConfig.valueNamingStrategy.namingStrategy == "topic.name") + assert(kafkaConfig.keyNamingStrategy.isEmpty) + } + + "read a config with key naming strategy" in { + val conf = ConfigFactory.parseString( + """kafka.bootstrap.servers = "localhost:9092" + |schema.registry.url = "localhost:8081" + |schema.registry.value.naming.strategy = "topic.name" + |schema.registry.key.naming.strategy = "topic.name" + |""".stripMargin) + + val kafkaConfig = KafkaAvroConfig.fromConfig(conf) + + assert(kafkaConfig.brokers == "localhost:9092") + assert(kafkaConfig.schemaRegistryUrl == "localhost:8081") + assert(kafkaConfig.valueNamingStrategy.namingStrategy == "topic.name") + assert(kafkaConfig.keyNamingStrategy.isDefined) + assert(kafkaConfig.keyNamingStrategy.get.namingStrategy == "topic.name") + } + + "read a config with record.name naming strategy" in { + val conf = ConfigFactory.parseString( + """kafka.bootstrap.servers = "localhost:9092" + |schema.registry.url = "localhost:8081" + |schema.registry.value { + | naming.strategy = "record.name" + | schema.record.name = "MyRecord" + | schema.record.namespace = "com.example" + |} + |""".stripMargin) + + val kafkaConfig = KafkaAvroConfig.fromConfig(conf) + + assert(kafkaConfig.brokers == "localhost:9092") + assert(kafkaConfig.schemaRegistryUrl == "localhost:8081") + assert(kafkaConfig.valueNamingStrategy.namingStrategy == "record.name") + assert(kafkaConfig.valueNamingStrategy.recordName.contains("MyRecord")) + assert(kafkaConfig.valueNamingStrategy.recordNamespace.contains("com.example")) + } + + "read a config with extra auth properties" in { + val conf = ConfigFactory.parseString( + """kafka.bootstrap.servers = "localhost:9092" + |schema.registry.url = "localhost:8081" + |schema.registry.value.naming.strategy = "topic.name" + |schema.registry.option { + | basic.auth.credentials.source = "USER_INFO" + | basic.auth.user.info = "test:12345" + | ssl.truststore.location = "cacerts" + | ssl.truststore.type = "JKS" + |} + |""".stripMargin) + + val kafkaConfig = KafkaAvroConfig.fromConfig(conf) + + assert(kafkaConfig.brokers == "localhost:9092") + assert(kafkaConfig.schemaRegistryUrl == "localhost:8081") + assert(kafkaConfig.valueNamingStrategy.namingStrategy == "topic.name") + assert(kafkaConfig.keyNamingStrategy.isEmpty) + assert(kafkaConfig.schemaRegistryExtraOptions("basic.auth.credentials.source") == "USER_INFO") + assert(kafkaConfig.schemaRegistryExtraOptions("basic.auth.user.info") == "test:12345") + assert(kafkaConfig.schemaRegistryExtraOptions("ssl.truststore.location") == "cacerts") + assert(kafkaConfig.schemaRegistryExtraOptions("ssl.truststore.type") == "JKS") + } + + "throw an exception when bootstrap servers are missing" in { + val conf = ConfigFactory.parseString( + """schema.registry.url = "localhost:8081" + |schema.registry.value.naming.strategy = "topic.name" + |""".stripMargin) + + intercept[com.typesafe.config.ConfigException.Missing] { + KafkaAvroConfig.fromConfig(conf) + } + } + + "throw an exception when schema registry URL is missing" in { + val conf = ConfigFactory.parseString( + """kafka.bootstrap.servers = "localhost:9092" + |schema.registry.value.naming.strategy = "topic.name" + |""".stripMargin) + + intercept[com.typesafe.config.ConfigException.Missing] { + KafkaAvroConfig.fromConfig(conf) + } + } + + "throw an exception when value naming strategy is missing" in { + val conf = ConfigFactory.parseString( + """kafka.bootstrap.servers = "localhost:9092" + |schema.registry.url = "localhost:8081" + |""".stripMargin) + + intercept[com.typesafe.config.ConfigException.Missing] { + KafkaAvroConfig.fromConfig(conf) + } + } + } +} From 4d78acdebeea4062b1fbef358c1756c493c62b05 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 12 Mar 2026 11:52:43 +0100 Subject: [PATCH 3/4] #716 Fix config examples and which options are passed to the schema registry. --- README.md | 34 +++++++++++++++---- .../extras/source/KafkaAvroSource.scala | 2 ++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index f5d7b7508..0469876f7 100644 --- a/README.md +++ b/README.md @@ -975,6 +975,14 @@ pramen.sources = [ sasl.jaas.config = "..." sasl.mechanism = "..." security.protocol = "..." + + ssl.truststore.location = "..." + ssl.truststore.password = "..." + ssl.truststore.type = "..." + ssl.keystore.location = "..." + ssl.keystore.password = "..." + ssl.keystore.type = "..." + ssl.key.password = "..." } schema.registry { @@ -986,9 +994,12 @@ pramen.sources = [ # Arbitrary options for Schema registry basic.auth.credentials.source = "..." basic.auth.user.info = "..." - ssl.truststore.location = "..." - ssl.truststore.password = "..." - ssl.truststore.type = "..." + schema.registry.ssl.truststore.location = "..." + schema.registry.ssl.truststore.password = "..." + schema.registry.ssl.truststore.type = "..." + schema.registry.ssl.keystore.location = "..." + schema.registry.ssl.keystore.password = "..." + schema.registry.ssl.keystore.type = "..." } } } @@ -1051,6 +1062,14 @@ pramen.sinks = [ sasl.jaas.config = "..." sasl.mechanism = "..." security.protocol = "..." + + ssl.truststore.location = "..." + ssl.truststore.password = "..." + ssl.truststore.type = "..." + ssl.keystore.location = "..." + ssl.keystore.password = "..." + ssl.keystore.type = "..." + ssl.key.password = "..." } schema.registry { @@ -1061,9 +1080,12 @@ pramen.sinks = [ # Arbitrary options for Schema registry basic.auth.credentials.source = "..." basic.auth.user.info = "..." - ssl.truststore.location = "..." - ssl.truststore.password = "..." - ssl.truststore.type = "..." + schema.registry.ssl.truststore.location = "..." + schema.registry.ssl.truststore.password = "..." + schema.registry.ssl.truststore.type = "..." + schema.registry.ssl.keystore.location = "..." + schema.registry.ssl.keystore.password = "..." + schema.registry.ssl.keystore.type = "..." } } } diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala index 9862ff992..2458028bb 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala @@ -201,6 +201,8 @@ class KafkaAvroSource(sourceConfig: Config, AbrisConfig.SCHEMA_REGISTRY_URL -> kafkaAvroConfig.schemaRegistryUrl ) ++ kafkaAvroConfig.schemaRegistryExtraOptions + ConfigUtils.logExtraOptions("Schema registry options", schemaRegistryClientConfig, Set("basic.auth.user.info")) + val abrisValueBase = AbrisConfig .fromConfluentAvro.downloadReaderSchemaByLatestVersion From bbb4a7d7188f1fbec8995973ab0e835bf12fd93b Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 12 Mar 2026 12:00:06 +0100 Subject: [PATCH 4/4] #716 Fix secrets redacting for Schema Registry options. --- .../scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala | 2 +- .../za/co/absa/pramen/extras/writer/TableWriterKafka.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala index 2458028bb..0e3674964 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala @@ -201,7 +201,7 @@ class KafkaAvroSource(sourceConfig: Config, AbrisConfig.SCHEMA_REGISTRY_URL -> kafkaAvroConfig.schemaRegistryUrl ) ++ kafkaAvroConfig.schemaRegistryExtraOptions - ConfigUtils.logExtraOptions("Schema registry options", schemaRegistryClientConfig, Set("basic.auth.user.info")) + ConfigUtils.logExtraOptions("Schema registry options", schemaRegistryClientConfig, KAFKA_TOKENS_TO_REDACT) val abrisValueBase = AbrisConfig .fromConfluentAvro.downloadReaderSchemaByLatestVersion diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala index 867aadbe3..46f621066 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala @@ -125,7 +125,7 @@ class TableWriterKafka(topicName: String, AbrisConfig.SCHEMA_REGISTRY_URL -> writerConf.kafkaAvroConfig.schemaRegistryUrl ) ++ writerConf.kafkaAvroConfig.schemaRegistryExtraOptions - ConfigUtils.logExtraOptions("Schema registry options", schemaRegistryClientConfig, Set("basic.auth.user.info")) + ConfigUtils.logExtraOptions("Schema registry options", schemaRegistryClientConfig, KAFKA_TOKENS_TO_REDACT) val schemaId = schemaIdOpt match { case Some(id) => id