story(issue-42): add SR-aware Produce/Consume to kafka Client#72
Merged
Conversation
Adds Confluent wire-format framing (0x00 || uint32be(schemaID) || payload) on the Client data path so produced records are accepted by Schema-Registry- aware consumers and consumed records expose their embedded schema IDs. Framing is laid down / parsed via github.com/twmb/franz-go/pkg/sr rather than hand-rolled. Payload (de)serialization remains out of scope — only the 5-byte header is added. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds Confluent Schema Registry (SR) wire-format awareness to the daggerverse/kafka module’s data path, enabling framed produce and SR-aware consume while keeping payload (de)serialization out of scope.
Changes:
- Extend
Client.Produceto optionally prefix key/value with the Confluent framing header and addClient.Consumesupport for detecting/stripping framing and surfacing schema IDs. - Add schema-registry framed and plaintext negative-path tests in
daggerverse/kafka/tests. - Document SR framing usage in the Kafka module README and add the
franz-go/pkg/srdependency.
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| daggerverse/kafka/client.go | Adds SR framing to Produce, SR-aware decoding to Consume, and new schema ID fields on ConsumedRecord. |
| daggerverse/kafka/tests/tests_schema_registry.go | Adds new round-trip and negative-path tests for SR framing behavior. |
| daggerverse/kafka/tests/main.go | Wires the new SR tests into the schema registry test job set. |
| daggerverse/kafka/README.md | Documents how to produce/consume Confluent wire-format framed records. |
| daggerverse/kafka/go.mod | Adds github.com/twmb/franz-go/pkg/sr requirement (and reorders requirements). |
| daggerverse/kafka/go.sum | Adds checksums for github.com/twmb/franz-go/pkg/sr. |
- Validate keySchemaID/valueSchemaID >= 0 and propagate AppendEncode errors
instead of silently sending unframed bytes.
- Rename Produce params keySchemaId/valueSchemaId -> keySchemaID/valueSchemaID
to match the ConsumedRecord.{Key,Value}SchemaID initialism used elsewhere.
- Document schemaRegistryAware behavior (header detection, 5-byte strip,
KeySchemaID/ValueSchemaID population) on the Consume doc comment so it
surfaces in `dagger functions` and generated SDKs.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Resolve ValueSchemaID/Value via their generated `(ctx)` methods so the snippet matches the actual Go API surface; field-style access in the prior version was misleading. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
*Client.ProducewithkeySchemaId/valueSchemaIdopts — when non-zero, the field is prefixed with the Confluent wire-format header (0x00 || uint32be(schemaID) || payload) viasr.ConfluentHeader.AppendEncode.*Client.ConsumewithschemaRegistryAwareopt — when true, each record's key/value is run throughsr.ConfluentHeader.DecodeID; framed records surface their parsed IDs on the newConsumedRecord.KeySchemaID/ValueSchemaIDfields and have their framing bytes stripped beforeencodeBytes. Plaintext (and malformed-framing) records pass through with*SchemaID = 0and bytes untouched.Closes #42.
Test plan
dagger -m daggerverse/kafka/tests call schema-registry-framed-produce-consume-round-trip— happy path: register schema → produce framed → consume → assertValueSchemaIDand payload match.dagger -m daggerverse/kafka/tests call schema-registry-plaintext-consume-unframed— negative path: plaintext record consumed withschemaRegistryAware=truesurfacesValueSchemaID=0and unmodified value.dagger -m daggerverse/kafka/tests call schema-registry-register-lookup-round-trip— existing SR admin round-trip unaffected.🤖 Generated with Claude Code