#716 fFx schema registry configuration for Kafka source and sink#717
#716 fFx schema registry configuration for Kafka source and sink#717
Conversation
…as classes being tested.
…ptions have 'schema.registry' prefix and some are not. This would allow users to decide.
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
WalkthroughWrapped Schema Registry options in README examples under Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (2)
38-60:⚠️ Potential issue | 🟠 MajorRedact the prefixed auth key before supporting it here.
This change allows users to pass
schema.registry.basic.auth.user.infovia the newoptionblock, butpramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala:123-135currently masks onlybasic.auth.user.infowhen logging Schema Registry options. Using the prefixed form will leak credentials into logs.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala` around lines 38 - 60, The schema registry extra options added in KafkaAvroConfig.fromConfig (SCHEMA_EXTRA_OPTIONS / schemaRegistryExtraProperties) must redact the prefixed auth key before it's used or logged; update the mapping that builds schemaRegistryExtraProperties (the same place you prefix keys like you do for KAFKA_EXTRA_OPTIONS) to detect keys that equal or end with "basic.auth.user.info" (and/or "schema.registry.basic.auth.user.info") and replace their values with a redacted placeholder (e.g. "***REDACTED***") so credentials cannot leak when TableWriterKafka logs schema registry options (see TableWriterKafka masking logic around basic.auth.user.info).
38-60:⚠️ Potential issue | 🟠 MajorPreserve legacy
schema.registry.*extras while adding the newoptionblock.Switching the lookup to
schema.registry.optionmeans existing configs that already define auth/SSL properties directly underschema.registry.*are now ignored. That makes this bugfix a breaking migration for secured registries. Please merge the legacy location with the newoptionblock and let the new block win on collisions.💡 Suggested change
- val schemaRegistryExtraProperties = ConfigUtils.getExtraOptions(conf, SCHEMA_EXTRA_OPTIONS) + val legacySchemaRegistryExtraProperties = + if (conf.hasPath("schema.registry")) { + ConfigUtils.getFlatConfig(conf.getConfig("schema.registry")) + .collect { + case (k, v) + if k != "url" && + !k.startsWith("key.") && + !k.startsWith("value.") && + !k.startsWith("option.") => + k -> v.toString + } + } else { + Map.empty[String, String] + } + + val schemaRegistryExtraProperties = + legacySchemaRegistryExtraProperties ++ ConfigUtils.getExtraOptions(conf, SCHEMA_EXTRA_OPTIONS)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala` around lines 38 - 60, The code currently only reads extras from SCHEMA_EXTRA_OPTIONS ("schema.registry.option") and drops legacy extras directly under "schema.registry.*", causing config breakage; update fromConfig to also read legacy extras (use ConfigUtils.getExtraOptions with the legacy "schema.registry" base), merge that map with the map from SCHEMA_EXTRA_OPTIONS, and ensure when combining the two maps the values from SCHEMA_EXTRA_OPTIONS override legacy entries on key collisions; adjust the subsequent usage of schemaRegistryExtraProperties to use the merged map (refer to fromConfig, SCHEMA_EXTRA_OPTIONS, schemaRegistryExtraProperties, and ConfigUtils.getExtraOptions).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In
`@pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala`:
- Around line 38-60: The schema registry extra options added in
KafkaAvroConfig.fromConfig (SCHEMA_EXTRA_OPTIONS /
schemaRegistryExtraProperties) must redact the prefixed auth key before it's
used or logged; update the mapping that builds schemaRegistryExtraProperties
(the same place you prefix keys like you do for KAFKA_EXTRA_OPTIONS) to detect
keys that equal or end with "basic.auth.user.info" (and/or
"schema.registry.basic.auth.user.info") and replace their values with a redacted
placeholder (e.g. "***REDACTED***") so credentials cannot leak when
TableWriterKafka logs schema registry options (see TableWriterKafka masking
logic around basic.auth.user.info).
- Around line 38-60: The code currently only reads extras from
SCHEMA_EXTRA_OPTIONS ("schema.registry.option") and drops legacy extras directly
under "schema.registry.*", causing config breakage; update fromConfig to also
read legacy extras (use ConfigUtils.getExtraOptions with the legacy
"schema.registry" base), merge that map with the map from SCHEMA_EXTRA_OPTIONS,
and ensure when combining the two maps the values from SCHEMA_EXTRA_OPTIONS
override legacy entries on key collisions; adjust the subsequent usage of
schemaRegistryExtraProperties to use the merged map (refer to fromConfig,
SCHEMA_EXTRA_OPTIONS, schemaRegistryExtraProperties, and
ConfigUtils.getExtraOptions).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 73257dbc-e26b-42ba-802e-bb7873968a7c
📒 Files selected for processing (13)
README.mdpramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/avro/AvroUtilsSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/infofile/InfoFileGenerationSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusConfigSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusSinkSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusUtilsSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationConfigSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationSinkSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/TableWriterKafkaSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfigSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfigSuite.scalapramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/NamingStrategySuite.scala
Unit Test Coverage
Files
|
Closes #716
Summary by CodeRabbit
Documentation
Configuration
Tests
Chores