Skip to content

story(issue-42): add SR-aware Produce/Consume to kafka Client#72

Merged
Zaba505 merged 3 commits into
mainfrom
worktree-issue-42
May 20, 2026
Merged

story(issue-42): add SR-aware Produce/Consume to kafka Client#72
Zaba505 merged 3 commits into
mainfrom
worktree-issue-42

Conversation

@Zaba505
Copy link
Copy Markdown
Member

@Zaba505 Zaba505 commented May 20, 2026

Summary

  • Extend *Client.Produce with keySchemaId / valueSchemaId opts — when non-zero, the field is prefixed with the Confluent wire-format header (0x00 || uint32be(schemaID) || payload) via sr.ConfluentHeader.AppendEncode.
  • Extend *Client.Consume with schemaRegistryAware opt — when true, each record's key/value is run through sr.ConfluentHeader.DecodeID; framed records surface their parsed IDs on the new ConsumedRecord.KeySchemaID / ValueSchemaID fields and have their framing bytes stripped before encodeBytes. Plaintext (and malformed-framing) records pass through with *SchemaID = 0 and bytes untouched.
  • Payload (de)serialization (Avro / Protobuf / JSON Schema) stays out of scope per the issue — follow-up stories will add native serde on top of this primitive.

Closes #42.

Test plan

  • dagger -m daggerverse/kafka/tests call schema-registry-framed-produce-consume-round-trip — happy path: register schema → produce framed → consume → assert ValueSchemaID and payload match.
  • dagger -m daggerverse/kafka/tests call schema-registry-plaintext-consume-unframed — negative path: plaintext record consumed with schemaRegistryAware=true surfaces ValueSchemaID=0 and unmodified value.
  • dagger -m daggerverse/kafka/tests call schema-registry-register-lookup-round-trip — existing SR admin round-trip unaffected.

🤖 Generated with Claude Code

Adds Confluent wire-format framing (0x00 || uint32be(schemaID) || payload)
on the Client data path so produced records are accepted by Schema-Registry-
aware consumers and consumed records expose their embedded schema IDs.
Framing is laid down / parsed via github.com/twmb/franz-go/pkg/sr rather
than hand-rolled. Payload (de)serialization remains out of scope — only the
5-byte header is added.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@Zaba505 Zaba505 self-assigned this May 20, 2026
@Zaba505 Zaba505 added the enhancement New feature or request label May 20, 2026
@Zaba505 Zaba505 requested a review from Copilot May 20, 2026 03:06
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds Confluent Schema Registry (SR) wire-format awareness to the daggerverse/kafka module’s data path, enabling framed produce and SR-aware consume while keeping payload (de)serialization out of scope.

Changes:

  • Extend Client.Produce to optionally prefix key/value with the Confluent framing header and add Client.Consume support for detecting/stripping framing and surfacing schema IDs.
  • Add schema-registry framed and plaintext negative-path tests in daggerverse/kafka/tests.
  • Document SR framing usage in the Kafka module README and add the franz-go/pkg/sr dependency.

Reviewed changes

Copilot reviewed 5 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
daggerverse/kafka/client.go Adds SR framing to Produce, SR-aware decoding to Consume, and new schema ID fields on ConsumedRecord.
daggerverse/kafka/tests/tests_schema_registry.go Adds new round-trip and negative-path tests for SR framing behavior.
daggerverse/kafka/tests/main.go Wires the new SR tests into the schema registry test job set.
daggerverse/kafka/README.md Documents how to produce/consume Confluent wire-format framed records.
daggerverse/kafka/go.mod Adds github.com/twmb/franz-go/pkg/sr requirement (and reorders requirements).
daggerverse/kafka/go.sum Adds checksums for github.com/twmb/franz-go/pkg/sr.

Comment thread daggerverse/kafka/client.go
Comment thread daggerverse/kafka/client.go Outdated
Comment thread daggerverse/kafka/client.go
- Validate keySchemaID/valueSchemaID >= 0 and propagate AppendEncode errors
  instead of silently sending unframed bytes.
- Rename Produce params keySchemaId/valueSchemaId -> keySchemaID/valueSchemaID
  to match the ConsumedRecord.{Key,Value}SchemaID initialism used elsewhere.
- Document schemaRegistryAware behavior (header detection, 5-byte strip,
  KeySchemaID/ValueSchemaID population) on the Consume doc comment so it
  surfaces in `dagger functions` and generated SDKs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 6 changed files in this pull request and generated 1 comment.

Comment thread daggerverse/kafka/README.md Outdated
Resolve ValueSchemaID/Value via their generated `(ctx)` methods so the
snippet matches the actual Go API surface; field-style access in the
prior version was misleading.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@Zaba505 Zaba505 merged commit 0d42124 into main May 20, 2026
1 check passed
@Zaba505 Zaba505 deleted the worktree-issue-42 branch May 20, 2026 21:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

story(daggerverse): add SR-aware Produce/Consume to kafka Client

2 participants