story(issue-73): add JSON serde to kafka Client Produce/Consume#77
Merged
Conversation
Layers JSON wire-format enforcement on top of the Confluent framing primitive from #42: Produce canonicalises via encoding/json before framing; Consume validates via json.Valid after frame strip. Defaults are pass-through so existing callers are unaffected. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR extends the daggerverse/kafka Dagger module’s Client.Produce / Client.Consume APIs with optional JSON wire-format enforcement, allowing callers to canonicalize JSON before producing and validate JSON after consuming, while composing cleanly with existing Confluent Schema Registry framing options.
Changes:
- Add
keySerializeAs/valueSerializeAstoClient.ProduceandkeyDeserializeAs/valueDeserializeAstoClient.Consume, supporting""(pass-through) and"JSON". - Implement JSON canonicalization on produce (parse + re-marshal) and JSON validation on consume (
json.Valid). - Add schema-registry-focused tests for malformed JSON rejection and framed JSON round-trip; document the new options and composition order in the README.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| daggerverse/kafka/client.go | Adds JSON serialize/deserialize modes and integrates them into Produce/Consume pipelines. |
| daggerverse/kafka/README.md | Documents JSON serde options and how they compose with Schema Registry framing. |
| daggerverse/kafka/tests/tests_schema_registry.go | Adds tests for malformed JSON rejection and framed JSON round-trip with canonicalization. |
| daggerverse/kafka/tests/main.go | Registers the new schema-registry JSON serde tests in the schema registry test group. |
Copilot review on #77 flagged two payload-mutation bugs in canonicalJSON: json.Unmarshal into any decodes numbers as float64 (silently losing precision above 2^53), and json.Marshal HTML-escapes <, >, & in strings. Both violate the "canonicalise without corrupting payload" intent. Switch to json.Decoder with UseNumber() so large integers and high-precision decimals round-trip as json.Number, and use json.Encoder with SetEscapeHTML(false) so string contents pass through verbatim. Also reject trailing tokens after the first JSON value so callers can't smuggle extra documents through the validator. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
keySerializeAs/valueSerializeAsto*Client.ProduceandkeyDeserializeAs/valueDeserializeAsto*Client.Consume."JSON"is the only non-empty value accepted; default""is pass-through.encoding/json(parse + re-marshal) before framing — invalid JSON is rejected before any broker I/O. Consume validates viajson.Validafter frame strip.valueSchemaID/schemaRegistryAwareopts so a single call can both frame and validate. No new dependencies —encoding/jsononly.Closes #73.
Test plan
dagger developindaggerverse/kafka/anddaggerverse/kafka/tests/dagger -m daggerverse/kafka/tests call schema-registry-jsonserialize-rejects-malformed-input(negative, no broker — 13s)dagger -m daggerverse/kafka/tests call schema-registry-jsonframed-produce-consume-round-trip --kafka-image-tag=4.2.0(SR+JSON round-trip, asserts byte-equality against canonical form — 23s)dagger -m daggerverse/kafka/tests call all --parallel=4🤖 Generated with Claude Code