Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -975,19 +975,32 @@ pramen.sources = [
sasl.jaas.config = "..."
sasl.mechanism = "..."
security.protocol = "..."

ssl.truststore.location = "..."
ssl.truststore.password = "..."
ssl.truststore.type = "..."
ssl.keystore.location = "..."
ssl.keystore.password = "..."
ssl.keystore.type = "..."
ssl.key.password = "..."
}

schema.registry {
url = "https://my.schema.registry:8081"
value.naming.strategy = "topic.name"
#key.naming.strategy = "topic.name"

# Arbitrary options for Schema registry
basic.auth.credentials.source = "..."
basic.auth.user.info = "..."
ssl.truststore.location = "..."
ssl.truststore.password = "..."
ssl.truststore.type = "..."
option {
# Arbitrary options for Schema registry
basic.auth.credentials.source = "..."
basic.auth.user.info = "..."
schema.registry.ssl.truststore.location = "..."
schema.registry.ssl.truststore.password = "..."
schema.registry.ssl.truststore.type = "..."
schema.registry.ssl.keystore.location = "..."
schema.registry.ssl.keystore.password = "..."
schema.registry.ssl.keystore.type = "..."
}
}
}
]
Expand Down Expand Up @@ -1049,18 +1062,31 @@ pramen.sinks = [
sasl.jaas.config = "..."
sasl.mechanism = "..."
security.protocol = "..."

ssl.truststore.location = "..."
ssl.truststore.password = "..."
ssl.truststore.type = "..."
ssl.keystore.location = "..."
ssl.keystore.password = "..."
ssl.keystore.type = "..."
ssl.key.password = "..."
}

schema.registry {
url = "https://my.schema.registry:8081"
value.naming.strategy = "topic.name"

# Arbitrary options for Schema registry
basic.auth.credentials.source = "..."
basic.auth.user.info = "..."
ssl.truststore.location = "..."
ssl.truststore.password = "..."
ssl.truststore.type = "..."
option {
# Arbitrary options for Schema registry
basic.auth.credentials.source = "..."
basic.auth.user.info = "..."
schema.registry.ssl.truststore.location = "..."
schema.registry.ssl.truststore.password = "..."
schema.registry.ssl.truststore.type = "..."
schema.registry.ssl.keystore.location = "..."
schema.registry.ssl.keystore.password = "..."
schema.registry.ssl.keystore.type = "..."
}
}
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ class KafkaAvroSource(sourceConfig: Config,
AbrisConfig.SCHEMA_REGISTRY_URL -> kafkaAvroConfig.schemaRegistryUrl
) ++ kafkaAvroConfig.schemaRegistryExtraOptions

ConfigUtils.logExtraOptions("Schema registry options", schemaRegistryClientConfig, KAFKA_TOKENS_TO_REDACT)

val abrisValueBase = AbrisConfig
.fromConfluentAvro.downloadReaderSchemaByLatestVersion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class TableWriterKafka(topicName: String,
AbrisConfig.SCHEMA_REGISTRY_URL -> writerConf.kafkaAvroConfig.schemaRegistryUrl
) ++ writerConf.kafkaAvroConfig.schemaRegistryExtraOptions

ConfigUtils.logExtraOptions("Schema registry options", schemaRegistryClientConfig, Set("basic.auth.user.info"))
ConfigUtils.logExtraOptions("Schema registry options", schemaRegistryClientConfig, KAFKA_TOKENS_TO_REDACT)

val schemaId = schemaIdOpt match {
case Some(id) => id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object KafkaAvroConfig {
val SCHEMA_REGISTRY_URL = "schema.registry.url"
val SCHEMA_REGISTRY_KEY_PREFIX = "schema.registry.key"
val SCHEMA_REGISTRY_VALUE_PREFIX = "schema.registry.value"
val SCHEMA_EXTRA_OPTIONS = "schema.registry"
val SCHEMA_EXTRA_OPTIONS = "schema.registry.option"
val KAFKA_EXTRA_OPTIONS = "kafka"

def fromConfig(conf: Config): KafkaAvroConfig = {
Expand All @@ -58,9 +58,6 @@ object KafkaAvroConfig {
}

val schemaRegistryExtraProperties = ConfigUtils.getExtraOptions(conf, SCHEMA_EXTRA_OPTIONS)
.map {
case (k, v) => (s"$SCHEMA_EXTRA_OPTIONS.$k", v)
}

KafkaAvroConfig(
brokers = conf.getString(KAFKA_BROKERS_KEY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.avro
package za.co.absa.pramen.extras.avro

import org.apache.spark.sql.functions.struct
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.extras.NestedDataFrameFactory
import za.co.absa.pramen.extras.utils.ResourceUtils.getResourceString
import za.co.absa.pramen.extras.utils.JsonUtils
import za.co.absa.pramen.extras.avro.AvroUtils
import za.co.absa.pramen.extras.base.SparkTestBase
import za.co.absa.pramen.extras.fixtures.TextComparisonFixture
import za.co.absa.pramen.extras.utils.JsonUtils
import za.co.absa.pramen.extras.utils.ResourceUtils.getResourceString

class AvroUtilsSuite extends AnyWordSpec with SparkTestBase with TextComparisonFixture {
import spark.implicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.builtin.infofile
package za.co.absa.pramen.extras.infofile

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.fs.Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.sink
package za.co.absa.pramen.extras.sink

import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.extras.base.SparkTestBase
import za.co.absa.pramen.extras.sink.EnceladusConfig

import java.time.ZoneId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.sink
package za.co.absa.pramen.extras.sink

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.Path
Expand All @@ -23,7 +23,6 @@ import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.extras.base.SparkTestBase
import za.co.absa.pramen.extras.fixtures.{TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.extras.mocks.QueryExecutorSpy
import za.co.absa.pramen.extras.sink.EnceladusSink
import za.co.absa.pramen.extras.sink.EnceladusSink.{DATASET_NAME_KEY, DATASET_VERSION_KEY, HIVE_TABLE_KEY}
import za.co.absa.pramen.extras.utils.FsUtils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,22 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.sink
package za.co.absa.pramen.extras.sink

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.extras.base.SparkTestBase
import za.co.absa.pramen.extras.fixtures.{TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.extras.mocks.QueryExecutorSpy
import za.co.absa.pramen.extras.sink.EnceladusUtils
import za.co.absa.pramen.extras.utils.FsUtils

import java.nio.file.{Files, Paths}
import java.time.LocalDate

class EnceladusUtilsSuite extends AnyWordSpec with SparkTestBase with TextComparisonFixture with TempDirFixture {
import za.co.absa.pramen.extras.sink.InfoVersionStatus._
import spark.implicits._
import za.co.absa.pramen.extras.sink.InfoVersionStatus._

private val infoDate = LocalDate.of(2022, 2, 18)
private val rawPathPattern = "{year}/{month}/{day}/v{version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.sink
package za.co.absa.pramen.extras.sink

import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.extras.base.SparkTestBase
import za.co.absa.pramen.extras.sink.StandardizationConfig

import java.time.ZoneId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.sink
package za.co.absa.pramen.extras.sink

import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.apache.hadoop.fs.Path
Expand All @@ -24,7 +24,6 @@ import za.co.absa.pramen.core.utils.hive.{HiveHelperSql, HiveQueryTemplates}
import za.co.absa.pramen.extras.base.SparkTestBase
import za.co.absa.pramen.extras.fixtures.{TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.extras.mocks.QueryExecutorMock
import za.co.absa.pramen.extras.sink.{StandardizationConfig, StandardizationSink}
import za.co.absa.pramen.extras.utils.FsUtils

import java.nio.file.{Files, Paths}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.writer
package za.co.absa.pramen.extras.writer

import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.extras.base.SparkTestBase
import za.co.absa.pramen.extras.writer.TableWriterKafka

class TableWriterKafkaSuite extends AnyWordSpec with SparkTestBase {
"TableWriterKafka" should {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.writer.model

import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpec

class KafkaAvroConfigSuite extends AnyWordSpec {
"fromConfig" should {
"read a minimalistic config with brokers and schema registry URL" in {
val conf = ConfigFactory.parseString(
"""kafka.bootstrap.servers = "localhost:9092"
|schema.registry.url = "localhost:8081"
|schema.registry.value.naming.strategy = "topic.name"
|""".stripMargin)

val kafkaConfig = KafkaAvroConfig.fromConfig(conf)

assert(kafkaConfig.brokers == "localhost:9092")
assert(kafkaConfig.schemaRegistryUrl == "localhost:8081")
assert(kafkaConfig.valueNamingStrategy.namingStrategy == "topic.name")
assert(kafkaConfig.keyNamingStrategy.isEmpty)
}

"read a config with key naming strategy" in {
val conf = ConfigFactory.parseString(
"""kafka.bootstrap.servers = "localhost:9092"
|schema.registry.url = "localhost:8081"
|schema.registry.value.naming.strategy = "topic.name"
|schema.registry.key.naming.strategy = "topic.name"
|""".stripMargin)

val kafkaConfig = KafkaAvroConfig.fromConfig(conf)

assert(kafkaConfig.brokers == "localhost:9092")
assert(kafkaConfig.schemaRegistryUrl == "localhost:8081")
assert(kafkaConfig.valueNamingStrategy.namingStrategy == "topic.name")
assert(kafkaConfig.keyNamingStrategy.isDefined)
assert(kafkaConfig.keyNamingStrategy.get.namingStrategy == "topic.name")
}

"read a config with record.name naming strategy" in {
val conf = ConfigFactory.parseString(
"""kafka.bootstrap.servers = "localhost:9092"
|schema.registry.url = "localhost:8081"
|schema.registry.value {
| naming.strategy = "record.name"
| schema.record.name = "MyRecord"
| schema.record.namespace = "com.example"
|}
|""".stripMargin)

val kafkaConfig = KafkaAvroConfig.fromConfig(conf)

assert(kafkaConfig.brokers == "localhost:9092")
assert(kafkaConfig.schemaRegistryUrl == "localhost:8081")
assert(kafkaConfig.valueNamingStrategy.namingStrategy == "record.name")
assert(kafkaConfig.valueNamingStrategy.recordName.contains("MyRecord"))
assert(kafkaConfig.valueNamingStrategy.recordNamespace.contains("com.example"))
}

"read a config with extra auth properties" in {
val conf = ConfigFactory.parseString(
"""kafka.bootstrap.servers = "localhost:9092"
|schema.registry.url = "localhost:8081"
|schema.registry.value.naming.strategy = "topic.name"
|schema.registry.option {
| basic.auth.credentials.source = "USER_INFO"
| basic.auth.user.info = "test:12345"
| ssl.truststore.location = "cacerts"
| ssl.truststore.type = "JKS"
|}
|""".stripMargin)

val kafkaConfig = KafkaAvroConfig.fromConfig(conf)

assert(kafkaConfig.brokers == "localhost:9092")
assert(kafkaConfig.schemaRegistryUrl == "localhost:8081")
assert(kafkaConfig.valueNamingStrategy.namingStrategy == "topic.name")
assert(kafkaConfig.keyNamingStrategy.isEmpty)
assert(kafkaConfig.schemaRegistryExtraOptions("basic.auth.credentials.source") == "USER_INFO")
assert(kafkaConfig.schemaRegistryExtraOptions("basic.auth.user.info") == "test:12345")
assert(kafkaConfig.schemaRegistryExtraOptions("ssl.truststore.location") == "cacerts")
assert(kafkaConfig.schemaRegistryExtraOptions("ssl.truststore.type") == "JKS")
}

"throw an exception when bootstrap servers are missing" in {
val conf = ConfigFactory.parseString(
"""schema.registry.url = "localhost:8081"
|schema.registry.value.naming.strategy = "topic.name"
|""".stripMargin)

intercept[com.typesafe.config.ConfigException.Missing] {
KafkaAvroConfig.fromConfig(conf)
}
}

"throw an exception when schema registry URL is missing" in {
val conf = ConfigFactory.parseString(
"""kafka.bootstrap.servers = "localhost:9092"
|schema.registry.value.naming.strategy = "topic.name"
|""".stripMargin)

intercept[com.typesafe.config.ConfigException.Missing] {
KafkaAvroConfig.fromConfig(conf)
}
}

"throw an exception when value naming strategy is missing" in {
val conf = ConfigFactory.parseString(
"""kafka.bootstrap.servers = "localhost:9092"
|schema.registry.url = "localhost:8081"
|""".stripMargin)

intercept[com.typesafe.config.ConfigException.Missing] {
KafkaAvroConfig.fromConfig(conf)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.writer.model
package za.co.absa.pramen.extras.writer.model

import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import org.scalatest.wordspec.AnyWordSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.pramen.extras.tests.writer.model
package za.co.absa.pramen.extras.writer.model

import com.typesafe.config.ConfigException.Missing
import com.typesafe.config.ConfigFactory
Expand Down
Loading