Skip to content

#716 fFx schema registry configuration for Kafka source and sink#717

Merged
yruslan merged 4 commits intomainfrom
bugfix/716-fix-schema-registry-in-kafka-source
Mar 12, 2026
Merged

#716 fFx schema registry configuration for Kafka source and sink#717
yruslan merged 4 commits intomainfrom
bugfix/716-fix-schema-registry-in-kafka-source

Conversation

@yruslan
Copy link
Collaborator

@yruslan yruslan commented Mar 12, 2026

Closes #716

Summary by CodeRabbit

  • Documentation

    • Updated configuration examples to wrap Schema Registry settings in an option { … } block for Sources and Sinks.
  • Configuration

    • Schema Registry options are now namespaced under schema.registry.option.* and collected via the option block.
  • Tests

    • Added a comprehensive Kafka Avro configuration test suite and moved existing tests to the updated package layout.
  • Chores

    • Improved logging of Schema Registry options with sensitive fields redacted via a centralized redaction set.

yruslan added 2 commits March 11, 2026 15:53
…ptions have 'schema.registry' prefix and some are not.

This would allow users to decide.
@coderabbitai
Copy link

coderabbitai bot commented Mar 12, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0b5dacad-ccca-43af-ae37-48efe2c8489c

📥 Commits

Reviewing files that changed from the base of the PR and between 4d78acd and bbb4a7d.

📒 Files selected for processing (2)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala

Walkthrough

Wrapped Schema Registry options in README examples under option { ... }; changed how KafkaAvroConfig collects/keys schema-registry extra options; added logging of schema-registry extra options in KafkaAvroSource; moved several test packages and added unit tests for KafkaAvroConfig.fromConfig.

Changes

Cohort / File(s) Summary
Documentation
README.md
Wrapped Schema Registry arbitrary options in option { ... } blocks and namespaced SSL keys under schema.registry.ssl. within those blocks (formatting/configuration change only).
Kafka Avro Config
pramen/extras/src/main/scala/.../writer/model/KafkaAvroConfig.scala
Changed the schema-registry extra-options prefix/collection logic (renamed prefix and removed prior re-keying to schema.registry.option.*). No public API signature changes.
Source & Writer Logging
pramen/extras/src/main/scala/.../source/KafkaAvroSource.scala, pramen/extras/src/main/scala/.../writer/TableWriterKafka.scala
Added logging call for Schema Registry extra options in KafkaAvroSource (redacting auth); replaced hard-coded redaction set with centralized KAFKA_TOKENS_TO_REDACT in TableWriterKafka.
Tests — New & Moves
pramen/extras/src/test/scala/.../writer/model/KafkaAvroConfigSuite.scala, pramen/extras/src/test/scala/.../writer/..., pramen/extras/src/test/scala/.../sink/..., pramen/extras/src/test/scala/.../avro/AvroUtilsSuite.scala, pramen/extras/src/test/scala/.../infofile/InfoFileGenerationSuite.scala
Added KafkaAvroConfigSuite with comprehensive tests for fromConfig; moved many test files from za.co.absa.pramen.extras.tests.* packages into component packages (...extras.writer, ...extras.sink, etc.) and adjusted imports (no logic changes).

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Poem

🐰 I hop through configs with a curious twitch,
Nesting options tidy in an option-y niche.
Tests find new homes, names neat and bright,
SSL tucked and auth redacted from sight.
A nibble, a log — then a joyful twitch.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title references issue #716 and describes fixing schema registry configuration for Kafka source and sink, which aligns with the main objective of the PR.
Linked Issues check ✅ Passed The PR addresses the core requirement from #716 by removing schema.registry prefix from configuration keys and implementing a namespaced structure within schema registry sections.
Out of Scope Changes check ✅ Passed All changes align with the PR objective: configuration restructuring, test package reorganization, adding test coverage, and centralizing redaction constants.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch bugfix/716-fix-schema-registry-in-kafka-source

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala (2)

38-60: ⚠️ Potential issue | 🟠 Major

Redact the prefixed auth key before supporting it here.

This change allows users to pass schema.registry.basic.auth.user.info via the new option block, but pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/TableWriterKafka.scala:123-135 currently masks only basic.auth.user.info when logging Schema Registry options. Using the prefixed form will leak credentials into logs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala`
around lines 38 - 60, The schema registry extra options added in
KafkaAvroConfig.fromConfig (SCHEMA_EXTRA_OPTIONS /
schemaRegistryExtraProperties) must redact the prefixed auth key before it's
used or logged; update the mapping that builds schemaRegistryExtraProperties
(the same place you prefix keys like you do for KAFKA_EXTRA_OPTIONS) to detect
keys that equal or end with "basic.auth.user.info" (and/or
"schema.registry.basic.auth.user.info") and replace their values with a redacted
placeholder (e.g. "***REDACTED***") so credentials cannot leak when
TableWriterKafka logs schema registry options (see TableWriterKafka masking
logic around basic.auth.user.info).

38-60: ⚠️ Potential issue | 🟠 Major

Preserve legacy schema.registry.* extras while adding the new option block.

Switching the lookup to schema.registry.option means existing configs that already define auth/SSL properties directly under schema.registry.* are now ignored. That makes this bugfix a breaking migration for secured registries. Please merge the legacy location with the new option block and let the new block win on collisions.

💡 Suggested change
-  val schemaRegistryExtraProperties = ConfigUtils.getExtraOptions(conf, SCHEMA_EXTRA_OPTIONS)
+  val legacySchemaRegistryExtraProperties =
+    if (conf.hasPath("schema.registry")) {
+      ConfigUtils.getFlatConfig(conf.getConfig("schema.registry"))
+        .collect {
+          case (k, v)
+              if k != "url" &&
+                 !k.startsWith("key.") &&
+                 !k.startsWith("value.") &&
+                 !k.startsWith("option.") =>
+            k -> v.toString
+        }
+    } else {
+      Map.empty[String, String]
+    }
+
+  val schemaRegistryExtraProperties =
+    legacySchemaRegistryExtraProperties ++ ConfigUtils.getExtraOptions(conf, SCHEMA_EXTRA_OPTIONS)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala`
around lines 38 - 60, The code currently only reads extras from
SCHEMA_EXTRA_OPTIONS ("schema.registry.option") and drops legacy extras directly
under "schema.registry.*", causing config breakage; update fromConfig to also
read legacy extras (use ConfigUtils.getExtraOptions with the legacy
"schema.registry" base), merge that map with the map from SCHEMA_EXTRA_OPTIONS,
and ensure when combining the two maps the values from SCHEMA_EXTRA_OPTIONS
override legacy entries on key collisions; adjust the subsequent usage of
schemaRegistryExtraProperties to use the merged map (refer to fromConfig,
SCHEMA_EXTRA_OPTIONS, schemaRegistryExtraProperties, and
ConfigUtils.getExtraOptions).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In
`@pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala`:
- Around line 38-60: The schema registry extra options added in
KafkaAvroConfig.fromConfig (SCHEMA_EXTRA_OPTIONS /
schemaRegistryExtraProperties) must redact the prefixed auth key before it's
used or logged; update the mapping that builds schemaRegistryExtraProperties
(the same place you prefix keys like you do for KAFKA_EXTRA_OPTIONS) to detect
keys that equal or end with "basic.auth.user.info" (and/or
"schema.registry.basic.auth.user.info") and replace their values with a redacted
placeholder (e.g. "***REDACTED***") so credentials cannot leak when
TableWriterKafka logs schema registry options (see TableWriterKafka masking
logic around basic.auth.user.info).
- Around line 38-60: The code currently only reads extras from
SCHEMA_EXTRA_OPTIONS ("schema.registry.option") and drops legacy extras directly
under "schema.registry.*", causing config breakage; update fromConfig to also
read legacy extras (use ConfigUtils.getExtraOptions with the legacy
"schema.registry" base), merge that map with the map from SCHEMA_EXTRA_OPTIONS,
and ensure when combining the two maps the values from SCHEMA_EXTRA_OPTIONS
override legacy entries on key collisions; adjust the subsequent usage of
schemaRegistryExtraProperties to use the merged map (refer to fromConfig,
SCHEMA_EXTRA_OPTIONS, schemaRegistryExtraProperties, and
ConfigUtils.getExtraOptions).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 73257dbc-e26b-42ba-802e-bb7873968a7c

📥 Commits

Reviewing files that changed from the base of the PR and between 70d2a69 and d4d4c6f.

📒 Files selected for processing (13)
  • README.md
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfig.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/avro/AvroUtilsSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/infofile/InfoFileGenerationSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusConfigSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusSinkSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/EnceladusUtilsSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationConfigSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/sink/StandardizationSinkSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/TableWriterKafkaSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroConfigSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/KafkaAvroWriterConfigSuite.scala
  • pramen/extras/src/test/scala/za/co/absa/pramen/extras/writer/model/NamingStrategySuite.scala

@github-actions
Copy link

github-actions bot commented Mar 12, 2026

Unit Test Coverage

Overall Project 84.37% -0.01% 🍏
Files changed 33.33%

Module Coverage
pramen-extras Jacoco Report 66.3% -0.13%
Files
Module File Coverage
pramen-extras Jacoco Report KafkaAvroConfig.scala 90.44% 🍏
TableWriterKafka.scala 9.15% -1.37%
KafkaAvroSource.scala 0% -0.75%

@yruslan yruslan merged commit 52c81d9 into main Mar 12, 2026
7 checks passed
@yruslan yruslan deleted the bugfix/716-fix-schema-registry-in-kafka-source branch March 12, 2026 12:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kafka source uses deprecated config for the Schema Registry

2 participants