From bd289397e62c2965785b3bf450c11affe6ecb21d Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Tue, 19 Aug 2025 12:07:08 -0700 Subject: [PATCH 01/24] Update ckgo version to 2.11.0 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d25c4dc8c9..7920ba36bf 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/sso v0.0.1 github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.2.0 github.com/confluentinc/cmf-sdk-go v0.0.4 - github.com/confluentinc/confluent-kafka-go/v2 v2.8.0 + github.com/confluentinc/confluent-kafka-go/v2 v2.11.0 github.com/confluentinc/go-editor v0.11.0 github.com/confluentinc/go-prompt v0.2.40 github.com/confluentinc/go-ps1 v1.0.2 diff --git a/go.sum b/go.sum index 16407e4dd3..011e90683d 100644 --- a/go.sum +++ b/go.sum @@ -262,8 +262,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.2.0 h1:ALDmQhIbzKjOz8xVgOx github.com/confluentinc/ccloud-sdk-go-v2/tableflow v0.2.0/go.mod h1:9eUl1KxQurBT6HgO/Whd7v2mfCBeqkA7FV8IGAjCOPY= github.com/confluentinc/cmf-sdk-go v0.0.4 h1:IAzACCIgcp0OAah9pvr6xtqaLUoQNoorxynNzIH5dQQ= github.com/confluentinc/cmf-sdk-go v0.0.4/go.mod h1:xY6OWDUc4UtKMgmade99v1C8/YovgWTl8tzeZ23LrEg= -github.com/confluentinc/confluent-kafka-go/v2 v2.8.0 h1:0HlcSNWg4LpLA9nIjzUMIqWHI+w0S68UN7alXAc3TeA= -github.com/confluentinc/confluent-kafka-go/v2 v2.8.0/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves= +github.com/confluentinc/confluent-kafka-go/v2 v2.11.0 h1:rsqfCqZXAHjWQp4TuRgiNPuW1BlF3xO/5+TsE9iHApw= +github.com/confluentinc/confluent-kafka-go/v2 v2.11.0/go.mod h1:hScqtFIGUI1wqHIgM3mjoqEou4VweGGGX7dMpcUKves= github.com/confluentinc/go-editor v0.11.0 h1:fcEALYHj7xV/fRSp54/IHi2DS4GlZMJWVgrYvi/llvU= github.com/confluentinc/go-editor v0.11.0/go.mod h1:nEjwqdqx8S7ZGjXsDvRgawsA04Fu2P/KAtA8fa5afMI= github.com/confluentinc/go-prompt v0.2.40 h1:tveghQJ+FVOVvF0dgQaZEm7YZSQ3r3tyuLMHy7w4PK0= From 00b133d28ab6dc281f8e167d9360e625d39d7c90 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Wed, 20 Aug 2025 16:44:15 -0700 Subject: [PATCH 02/24] repair two tests --- pkg/serdes/serdes_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index f4b3297d06..7289cb9bd4 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -419,7 +419,7 @@ func TestJsonSerdesInvalid(t *testing.T) { req.Regexp("unexpected end of JSON input$", err) _, err = deserializationProvider.Deserialize("topic1", brokenBytes) - req.Regexp("unknown magic byte$", err) + req.Regexp("unknown magic byte[\\s\\d]*$", err) invalidString := `{"f2": "abc"}` invalidBytes := []byte{123, 34, 102, 50, 34, 58, 34, 97, 115, 100, 34, 125} @@ -428,7 +428,7 @@ func TestJsonSerdesInvalid(t *testing.T) { req.Regexp("missing properties: 'f1'$", err) _, err = deserializationProvider.Deserialize("topic1", invalidBytes) - req.Regexp("unknown magic byte$", err) + req.Regexp("unknown magic byte[\\s\\d]*$", err) } func TestJsonSerdesNestedValid(t *testing.T) { @@ -696,7 +696,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.EqualError(err, "the protobuf document is invalid") _, err = deserializationProvider.Deserialize("topic1", brokenBytes) - req.Regexp("^failed to deserialize payload:.*Subject Not Found$", err) + req.Regexp("^failed to deserialize payload: parsed invalid message index count", err) invalidString := `{"page":"abc"}` invalidBytes := []byte{0, 12, 3, 97, 98, 99, 16, 1, 24, 2} @@ -705,7 +705,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.EqualError(err, "the protobuf document is invalid") _, err = deserializationProvider.Deserialize("topic1", invalidBytes) - req.Regexp("^failed to deserialize payload:.*Subject Not Found$", err) + req.Regexp("^failed to deserialize payload: parsed invalid message index count", err) } func TestProtobufSerdesNestedValid(t *testing.T) { From 5f89e2c67af28ec3be5cb6461b653f5f743d9623 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Tue, 16 Sep 2025 14:25:18 -0700 Subject: [PATCH 03/24] remove validation on json deserialize (unnecessary) --- pkg/serdes/json_deserialization_provider.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index bc5d3dc76f..61e19b1800 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -47,7 +47,6 @@ func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } serdeConfig := jsonschema.NewDeserializerConfig() - serdeConfig.EnableValidation = true // local KMS secret is only set and used during local testing with ruleSet if localKmsSecretValue := os.Getenv(localKmsSecretMacro); srClientUrl == mockClientUrl && localKmsSecretValue != "" { From 7a6d9aa7d28f85bc62fbab31faa0f15761e3c0a9 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Mon, 22 Sep 2025 11:04:20 -0700 Subject: [PATCH 04/24] don't load schema for avro or json --- internal/kafka/confluent_kafka.go | 5 ++--- pkg/serdes/serdes.go | 4 ++++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index 587d76feac..d56b89fd32 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -10,7 +10,6 @@ import ( "os/signal" "path/filepath" "regexp" - "slices" "strings" "time" @@ -251,7 +250,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - if slices.Contains(serdes.SchemaBasedFormats, h.KeyFormat) { + if serdes.IsProtobufSchema(h.KeyFormat) { // avro and json schema don't need to load schema schemaPath, referencePathMap, err := h.RequestSchema(message.Key) if err != nil { return err @@ -284,7 +283,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - if slices.Contains(serdes.SchemaBasedFormats, h.ValueFormat) { + if serdes.IsProtobufSchema(h.ValueFormat) { // avro and json schema don't need to load schema schemaPath, referencePathMap, err := h.RequestSchema(message.Value) if err != nil { return err diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index 9d5a8880b8..89f67c6dcd 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -130,3 +130,7 @@ func GetDeserializationProvider(valueFormat string) (DeserializationProvider, er return nil, fmt.Errorf(errors.UnknownValueFormatErrorMsg) } } + +func IsProtobufSchema(valueFormat string) bool { + return valueFormat == protobufSchemaName +} From 49d832c8ff283272cfaa6e5d2efe792a542a0294 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Mon, 22 Sep 2025 17:49:41 -0700 Subject: [PATCH 05/24] add deserialize with header for avro --- internal/asyncapi/command_export.go | 2 +- internal/kafka/confluent_kafka.go | 4 +-- pkg/serdes/avro_deserialization_provider.go | 21 +++++++++--- pkg/serdes/double_deserialization_provider.go | 4 ++- .../integer_deserialization_provider.go | 4 ++- pkg/serdes/json_deserialization_provider.go | 8 +++-- .../protobuf_deserialization_provider.go | 3 +- pkg/serdes/serdes.go | 3 +- pkg/serdes/serdes_test.go | 34 +++++++++---------- pkg/serdes/string_deserialization_provider.go | 4 ++- 10 files changed, 55 insertions(+), 32 deletions(-) diff --git a/internal/asyncapi/command_export.go b/internal/asyncapi/command_export.go index ace7d3aed6..3632df2a9c 100644 --- a/internal/asyncapi/command_export.go +++ b/internal/asyncapi/command_export.go @@ -327,7 +327,7 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content } } - jsonMessage, err := deserializationProvider.Deserialize(topicName, value) + jsonMessage, err := deserializationProvider.Deserialize(topicName, message.Headers, value) if err != nil { return nil, fmt.Errorf("failed to deserialize example: %v", err) } diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index d56b89fd32..f350f74f7a 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -260,7 +260,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { } } - jsonMessage, err := keyDeserializer.Deserialize(h.Topic, message.Key) + jsonMessage, err := keyDeserializer.Deserialize(h.Topic, message.Headers, message.Key) if err != nil { return err } @@ -315,7 +315,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { } func getMessageString(message *ckgo.Message, valueDeserializer serdes.DeserializationProvider, properties ConsumerProperties, topic string) (string, error) { - messageString, err := valueDeserializer.Deserialize(topic, message.Value) + messageString, err := valueDeserializer.Deserialize(topic, message.Headers, message.Value) if err != nil { return "", err } diff --git a/pkg/serdes/avro_deserialization_provider.go b/pkg/serdes/avro_deserialization_provider.go index aa9f259414..b3d74beee5 100644 --- a/pkg/serdes/avro_deserialization_provider.go +++ b/pkg/serdes/avro_deserialization_provider.go @@ -5,9 +5,12 @@ import ( "fmt" "os" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avrov2" + + "github.com/confluentinc/cli/v4/pkg/log" ) type AvroDeserializationProvider struct { @@ -35,7 +38,8 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } else if srAuth.Token != "" { serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") } else { - return fmt.Errorf("schema registry client authentication should be provider to initialize deserializer") + serdeClientConfig = schemaregistry.NewConfig(srClientUrl) + log.CliLogger.Info("initializing deserializer with no schema registry client authentication") } serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath @@ -80,11 +84,18 @@ func (a *AvroDeserializationProvider) LoadSchema(_ string, _ map[string]string) return nil } -func (a *AvroDeserializationProvider) Deserialize(topic string, payload []byte) (string, error) { +func (a *AvroDeserializationProvider) Deserialize(topic string, headers []kafka.Header, payload []byte) (string, error) { message := make(map[string]any) - err := a.deser.DeserializeInto(topic, payload, &message) - if err != nil { - return "", fmt.Errorf("failed to deserialize payload: %w", err) + if len(headers) > 0 { + err := a.deser.DeserializeWithHeadersInto(topic, headers, payload, &message) + if err != nil { + return "", fmt.Errorf("failed to deserialize payload: %w", err) + } + } else { + err := a.deser.DeserializeInto(topic, payload, &message) + if err != nil { + return "", fmt.Errorf("failed to deserialize payload: %w", err) + } } jsonBytes, err := json.Marshal(message) if err != nil { diff --git a/pkg/serdes/double_deserialization_provider.go b/pkg/serdes/double_deserialization_provider.go index 71e72be9e3..83b91ed20c 100644 --- a/pkg/serdes/double_deserialization_provider.go +++ b/pkg/serdes/double_deserialization_provider.go @@ -4,6 +4,8 @@ import ( "encoding/binary" "fmt" "math" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) type DoubleDeserializationProvider struct{} @@ -16,7 +18,7 @@ func (DoubleDeserializationProvider) LoadSchema(_ string, _ map[string]string) e return nil } -func (DoubleDeserializationProvider) Deserialize(_ string, data []byte) (string, error) { +func (DoubleDeserializationProvider) Deserialize(_ string, _ []kafka.Header, data []byte) (string, error) { if len(data) == 0 { return "", nil } diff --git a/pkg/serdes/integer_deserialization_provider.go b/pkg/serdes/integer_deserialization_provider.go index 800f689d34..a332314438 100644 --- a/pkg/serdes/integer_deserialization_provider.go +++ b/pkg/serdes/integer_deserialization_provider.go @@ -3,6 +3,8 @@ package serdes import ( "encoding/binary" "fmt" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) type IntegerDeserializationProvider struct{} @@ -15,7 +17,7 @@ func (IntegerDeserializationProvider) LoadSchema(_ string, _ map[string]string) return nil } -func (IntegerDeserializationProvider) Deserialize(_ string, data []byte) (string, error) { +func (IntegerDeserializationProvider) Deserialize(_ string, _ []kafka.Header, data []byte) (string, error) { if len(data) == 0 { return "", nil } diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index 61e19b1800..ede52b490b 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -5,9 +5,12 @@ import ( "fmt" "os" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/jsonschema" + + "github.com/confluentinc/cli/v4/pkg/log" ) type JsonDeserializationProvider struct { @@ -34,7 +37,8 @@ func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } else if srAuth.Token != "" { serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") } else { - return fmt.Errorf("schema registry client authentication should be provider to initialize deserializer") + serdeClientConfig = schemaregistry.NewConfig(srClientUrl) + log.CliLogger.Info("initializing deserializer with no schema registry client authentication") } serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath @@ -79,7 +83,7 @@ func (j *JsonDeserializationProvider) LoadSchema(_ string, _ map[string]string) return nil } -func (j *JsonDeserializationProvider) Deserialize(topic string, payload []byte) (string, error) { +func (j *JsonDeserializationProvider) Deserialize(topic string, headers []kafka.Header, payload []byte) (string, error) { message := make(map[string]interface{}) err := j.deser.DeserializeInto(topic, payload, &message) if err != nil { diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index bdbd0363e5..66603a8589 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -8,6 +8,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" gproto "google.golang.org/protobuf/proto" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/protobuf" @@ -89,7 +90,7 @@ func (p *ProtobufDeserializationProvider) LoadSchema(schemaPath string, referenc return nil } -func (p *ProtobufDeserializationProvider) Deserialize(topic string, payload []byte) (string, error) { +func (p *ProtobufDeserializationProvider) Deserialize(topic string, headers []kafka.Header, payload []byte) (string, error) { // Register the protobuf message err := p.deser.ProtoRegistry.RegisterMessage(p.message.ProtoReflect().Type()) re := regexp.MustCompile(`message .* is already registered`) diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index 89f67c6dcd..56a327aef2 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -3,6 +3,7 @@ package serdes import ( "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/cli/v4/pkg/errors" @@ -75,7 +76,7 @@ type SerializationProvider interface { type DeserializationProvider interface { InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient any) error LoadSchema(string, map[string]string) error - Deserialize(string, []byte) (string, error) + Deserialize(string, []kafka.Header, []byte) (string, error) } func FormatTranslation(backendValueFormat string) (string, error) { diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 7289cb9bd4..87360b0efc 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -79,7 +79,7 @@ func TestStringSerdes(t *testing.T) { deserializationProvider, _ := GetDeserializationProvider(stringSchemaName) data = []byte{115, 111, 109, 101, 83, 116, 114, 105, 110, 103} - str, err := deserializationProvider.Deserialize("", data) + str, err := deserializationProvider.Deserialize("", nil, data) req.Nil(err) req.Equal(str, "someString") } @@ -122,7 +122,7 @@ func TestAvroSerdesValid(t *testing.T) { err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", data) + actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) req.Equal(expectedString, actualString) @@ -164,7 +164,7 @@ func TestAvroSerdesInvalid(t *testing.T) { _, err = serializationProvider.Serialize("topic1", brokenString) req.Regexp(`cannot decode textual record "myRecord": short buffer`, err) - _, err = deserializationProvider.Deserialize("topic1", brokenBytes) + _, err = deserializationProvider.Deserialize("topic1", nil, brokenBytes) req.Regexp("unexpected EOF$", err) invalidString := `{"f2": "abc"}` @@ -210,7 +210,7 @@ func TestAvroSerdesNestedValid(t *testing.T) { err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", expectedBytes) + actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) req.Nil(err) req.Equal(expectedString, actualString) @@ -270,7 +270,7 @@ func TestAvroSerdesValidWithRuleSet(t *testing.T) { err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", data) + actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) req.Equal(expectedString, actualString) @@ -315,7 +315,7 @@ func TestJsonSerdesValid(t *testing.T) { err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", expectedBytes) + actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) req.Nil(err) req.Equal(expectedString, actualString) } @@ -379,7 +379,7 @@ func TestJsonSerdesReference(t *testing.T) { err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", expectedBytes) + actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) req.Nil(err) req.Equal(expectedString, actualString) } @@ -418,7 +418,7 @@ func TestJsonSerdesInvalid(t *testing.T) { _, err = serializationProvider.Serialize("topic1", brokenString) req.Regexp("unexpected end of JSON input$", err) - _, err = deserializationProvider.Deserialize("topic1", brokenBytes) + _, err = deserializationProvider.Deserialize("topic1", nil, brokenBytes) req.Regexp("unknown magic byte[\\s\\d]*$", err) invalidString := `{"f2": "abc"}` @@ -427,7 +427,7 @@ func TestJsonSerdesInvalid(t *testing.T) { _, err = serializationProvider.Serialize("topic1", invalidString) req.Regexp("missing properties: 'f1'$", err) - _, err = deserializationProvider.Deserialize("topic1", invalidBytes) + _, err = deserializationProvider.Deserialize("topic1", nil, invalidBytes) req.Regexp("unknown magic byte[\\s\\d]*$", err) } @@ -471,7 +471,7 @@ func TestJsonSerdesNestedValid(t *testing.T) { err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", expectedBytes) + actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) req.Nil(err) req.Equal(expectedString, actualString) @@ -530,7 +530,7 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", data) + actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) req.Equal(expectedString, actualString) @@ -574,7 +574,7 @@ func TestProtobufSerdesValid(t *testing.T) { req.Nil(err) err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", data) + actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) req.JSONEq(expectedString, actualString) } @@ -650,7 +650,7 @@ message Person { req.Nil(err) err = deserializationProvider.LoadSchema(schemaPath, map[string]string{"address.proto": referencePath}) req.Nil(err) - str, err := deserializationProvider.Deserialize("topic1", data) + str, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) req.JSONEq(str, expectedString) } @@ -695,7 +695,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { _, err = serializationProvider.Serialize("topic1", brokenString) req.EqualError(err, "the protobuf document is invalid") - _, err = deserializationProvider.Deserialize("topic1", brokenBytes) + _, err = deserializationProvider.Deserialize("topic1", nil, brokenBytes) req.Regexp("^failed to deserialize payload: parsed invalid message index count", err) invalidString := `{"page":"abc"}` @@ -704,7 +704,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { _, err = serializationProvider.Serialize("topic1", invalidString) req.EqualError(err, "the protobuf document is invalid") - _, err = deserializationProvider.Deserialize("topic1", invalidBytes) + _, err = deserializationProvider.Deserialize("topic1", nil, invalidBytes) req.Regexp("^failed to deserialize payload: parsed invalid message index count", err) } @@ -754,7 +754,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { req.Nil(err) err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", data) + actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) req.JSONEq(expectedString, actualString) } @@ -823,7 +823,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { req.Nil(err) err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) - actualString, err := deserializationProvider.Deserialize("topic1", data) + actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) req.JSONEq(expectedString, actualString) } diff --git a/pkg/serdes/string_deserialization_provider.go b/pkg/serdes/string_deserialization_provider.go index 0c351fc6dd..11f728af9c 100644 --- a/pkg/serdes/string_deserialization_provider.go +++ b/pkg/serdes/string_deserialization_provider.go @@ -1,5 +1,7 @@ package serdes +import "github.com/confluentinc/confluent-kafka-go/v2/kafka" + type StringDeserializationProvider struct{} func (s *StringDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ any) error { @@ -10,7 +12,7 @@ func (s *StringDeserializationProvider) LoadSchema(_ string, _ map[string]string return nil } -func (s *StringDeserializationProvider) Deserialize(_ string, data []byte) (string, error) { +func (s *StringDeserializationProvider) Deserialize(_ string, _ []kafka.Header, data []byte) (string, error) { message := string(data) return message, nil } From ec001de27f5ab042da2765e7666450176cb7b147 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Mon, 22 Sep 2025 18:10:54 -0700 Subject: [PATCH 06/24] add deserialize with header for jsonschema --- pkg/serdes/json_deserialization_provider.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index ede52b490b..78ec05f901 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -85,9 +85,16 @@ func (j *JsonDeserializationProvider) LoadSchema(_ string, _ map[string]string) func (j *JsonDeserializationProvider) Deserialize(topic string, headers []kafka.Header, payload []byte) (string, error) { message := make(map[string]interface{}) - err := j.deser.DeserializeInto(topic, payload, &message) - if err != nil { - return "", fmt.Errorf("failed to deserialize payload: %w", err) + if len(headers) > 0 { + err := j.deser.DeserializeWithHeadersInto(topic, headers, payload, &message) + if err != nil { + return "", fmt.Errorf("failed to deserialize payload: %w", err) + } + } else { + err := j.deser.DeserializeInto(topic, payload, &message) + if err != nil { + return "", fmt.Errorf("failed to deserialize payload: %w", err) + } } jsonBytes, err := json.Marshal(message) if err != nil { From fc1843311db34b7288fe3b43d14507f832f059a4 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Fri, 3 Oct 2025 16:41:47 -0700 Subject: [PATCH 07/24] Add deserialization with header for protobuf --- internal/asyncapi/command_export.go | 12 +- internal/kafka/confluent_kafka.go | 78 +------------ pkg/serdes/avro_deserialization_provider.go | 2 +- pkg/serdes/double_deserialization_provider.go | 3 +- .../integer_deserialization_provider.go | 3 +- pkg/serdes/json_deserialization_provider.go | 2 +- .../protobuf_deserialization_provider.go | 107 +++++++++++++++++- pkg/serdes/serdes.go | 3 +- pkg/serdes/serdes_test.go | 51 +++++++-- pkg/serdes/string_deserialization_provider.go | 7 +- 10 files changed, 165 insertions(+), 103 deletions(-) diff --git a/internal/asyncapi/command_export.go b/internal/asyncapi/command_export.go index 3632df2a9c..386aa210f0 100644 --- a/internal/asyncapi/command_export.go +++ b/internal/asyncapi/command_export.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "os" - "slices" "strconv" "strings" "time" @@ -15,6 +14,7 @@ import ( "github.com/swaggest/go-asyncapi/spec-2.4.0" ckgo "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/cli/v4/internal/kafka" "github.com/confluentinc/cli/v4/pkg/auth" @@ -317,14 +317,8 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content Subject: topicName + "-value", Properties: kafka.ConsumerProperties{}, } - if slices.Contains(serdes.SchemaBasedFormats, valueFormat) { - schemaPath, referencePathMap, err := groupHandler.RequestSchema(value) - if err != nil { - return nil, err - } - if err := deserializationProvider.LoadSchema(schemaPath, referencePathMap); err != nil { - return nil, err - } + if err := deserializationProvider.LoadSchema(groupHandler.Subject, groupHandler.Properties.SchemaPath, serde.ValueSerde, message); err != nil { + return nil, err } jsonMessage, err := deserializationProvider.Deserialize(topicName, message.Headers, value) diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index f350f74f7a..4ebb0726b7 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -2,13 +2,10 @@ package kafka import ( "context" - "encoding/binary" - "encoding/json" "fmt" "io" "os" "os/signal" - "path/filepath" "regexp" "strings" "time" @@ -16,8 +13,8 @@ import ( "github.com/spf13/cobra" ckgo "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/mds-sdk-go-public/mdsv1" - srsdk "github.com/confluentinc/schema-registry-sdk-go" "github.com/confluentinc/cli/v4/pkg/config" "github.com/confluentinc/cli/v4/pkg/errors" @@ -26,7 +23,6 @@ import ( "github.com/confluentinc/cli/v4/pkg/output" "github.com/confluentinc/cli/v4/pkg/schemaregistry" "github.com/confluentinc/cli/v4/pkg/serdes" - "github.com/confluentinc/cli/v4/pkg/utils" ) const ( @@ -250,14 +246,8 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - if serdes.IsProtobufSchema(h.KeyFormat) { // avro and json schema don't need to load schema - schemaPath, referencePathMap, err := h.RequestSchema(message.Key) - if err != nil { - return err - } - if err := keyDeserializer.LoadSchema(schemaPath, referencePathMap); err != nil { - return err - } + if err := keyDeserializer.LoadSchema(h.Subject, h.Properties.SchemaPath, serde.KeySerde, message); err != nil { + return err } jsonMessage, err := keyDeserializer.Deserialize(h.Topic, message.Headers, message.Key) @@ -283,14 +273,8 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { return err } - if serdes.IsProtobufSchema(h.ValueFormat) { // avro and json schema don't need to load schema - schemaPath, referencePathMap, err := h.RequestSchema(message.Value) - if err != nil { - return err - } - if err := valueDeserializer.LoadSchema(schemaPath, referencePathMap); err != nil { - return err - } + if err := valueDeserializer.LoadSchema(h.Subject, h.Properties.SchemaPath, serde.ValueSerde, message); err != nil { + return err } messageString, err := getMessageString(message, valueDeserializer, h.Properties, h.Topic) @@ -389,58 +373,6 @@ func (c *command) runConsumer(consumer *ckgo.Consumer, groupHandler *GroupHandle return nil } -func (h *GroupHandler) RequestSchema(value []byte) (string, map[string]string, error) { - if len(value) == 0 || value[0] != 0x0 { - return "", nil, errors.NewErrorWithSuggestions("unknown magic byte", fmt.Sprintf("Check that all messages from this topic are in the %s format.", h.ValueFormat)) - } - if len(value) < messageOffset { - return "", nil, fmt.Errorf("failed to find schema ID in topic data") - } - - // Retrieve schema from cluster only if schema is specified. - schemaID := int32(binary.BigEndian.Uint32(value[1:messageOffset])) // schema id is stored as a part of message meta info - - // Create temporary file to store schema retrieved (also for cache). Retry if get error retrieving schema or writing temp schema file - tempStorePath := filepath.Join(h.Properties.SchemaPath, fmt.Sprintf("%s-%d.txt", h.Subject, schemaID)) - tempRefStorePath := filepath.Join(h.Properties.SchemaPath, fmt.Sprintf("%s-%d.ref", h.Subject, schemaID)) - var references []srsdk.SchemaReference - if !utils.FileExists(tempStorePath) || !utils.FileExists(tempRefStorePath) { - // TODO: add handler for writing schema failure - schemaString, err := h.SrClient.GetSchema(schemaID, h.Subject) - if err != nil { - return "", nil, err - } - if err := os.WriteFile(tempStorePath, []byte(schemaString.GetSchema()), 0644); err != nil { - return "", nil, err - } - - refBytes, err := json.Marshal(schemaString.References) - if err != nil { - return "", nil, err - } - if err := os.WriteFile(tempRefStorePath, refBytes, 0644); err != nil { - return "", nil, err - } - references = schemaString.GetReferences() - } else { - refBlob, err := os.ReadFile(tempRefStorePath) - if err != nil { - return "", nil, err - } - if err := json.Unmarshal(refBlob, &references); err != nil { - return "", nil, err - } - } - - // Store the references in temporary files - referencePathMap, err := schemaregistry.StoreSchemaReferences(h.Properties.SchemaPath, references, h.SrClient) - if err != nil { - return "", nil, err - } - - return tempStorePath, referencePathMap, nil -} - func getFullHeaders(headers []ckgo.Header) []string { headerStrings := make([]string, len(headers)) for i, header := range headers { diff --git a/pkg/serdes/avro_deserialization_provider.go b/pkg/serdes/avro_deserialization_provider.go index b3d74beee5..84689eadff 100644 --- a/pkg/serdes/avro_deserialization_provider.go +++ b/pkg/serdes/avro_deserialization_provider.go @@ -80,7 +80,7 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, return nil } -func (a *AvroDeserializationProvider) LoadSchema(_ string, _ map[string]string) error { +func (a *AvroDeserializationProvider) LoadSchema(_ string, _ string, _ serde.Type, _ *kafka.Message) error { return nil } diff --git a/pkg/serdes/double_deserialization_provider.go b/pkg/serdes/double_deserialization_provider.go index 83b91ed20c..f04c368be2 100644 --- a/pkg/serdes/double_deserialization_provider.go +++ b/pkg/serdes/double_deserialization_provider.go @@ -6,6 +6,7 @@ import ( "math" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" ) type DoubleDeserializationProvider struct{} @@ -14,7 +15,7 @@ func (DoubleDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRe return nil } -func (DoubleDeserializationProvider) LoadSchema(_ string, _ map[string]string) error { +func (DoubleDeserializationProvider) LoadSchema(_ string, _ string, _ serde.Type, _ *kafka.Message) error { return nil } diff --git a/pkg/serdes/integer_deserialization_provider.go b/pkg/serdes/integer_deserialization_provider.go index a332314438..fd5a8420da 100644 --- a/pkg/serdes/integer_deserialization_provider.go +++ b/pkg/serdes/integer_deserialization_provider.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" ) type IntegerDeserializationProvider struct{} @@ -13,7 +14,7 @@ func (IntegerDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaR return nil } -func (IntegerDeserializationProvider) LoadSchema(_ string, _ map[string]string) error { +func (IntegerDeserializationProvider) LoadSchema(_ string, _ string, _ serde.Type, _ *kafka.Message) error { return nil } diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index 78ec05f901..53d154f442 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -79,7 +79,7 @@ func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, return nil } -func (j *JsonDeserializationProvider) LoadSchema(_ string, _ map[string]string) error { +func (j *JsonDeserializationProvider) LoadSchema(_ string, _ string, _ serde.Type, _ *kafka.Message) error { return nil } diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index 66603a8589..628c86b04c 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -1,10 +1,14 @@ package serdes import ( + "encoding/json" "fmt" "os" + "path/filepath" "regexp" + "strconv" + "github.com/google/uuid" "google.golang.org/protobuf/encoding/protojson" gproto "google.golang.org/protobuf/proto" @@ -12,6 +16,8 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/protobuf" + + "github.com/confluentinc/cli/v4/pkg/utils" ) type ProtobufDeserializationProvider struct { @@ -81,8 +87,13 @@ func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srCluste return nil } -func (p *ProtobufDeserializationProvider) LoadSchema(schemaPath string, referencePathMap map[string]string) error { - message, err := parseMessage(schemaPath, referencePathMap) +func (p *ProtobufDeserializationProvider) LoadSchema(subject, schemaPath string, serdeType serde.Type, kafkaMessage *kafka.Message) error { + updatedSchemaPath, referencePathMap, err := p.requestSchema(subject, schemaPath, serdeType, kafkaMessage) + if err != nil { + return err + } + + message, err := parseMessage(updatedSchemaPath, referencePathMap) if err != nil { return err } @@ -90,6 +101,98 @@ func (p *ProtobufDeserializationProvider) LoadSchema(schemaPath string, referenc return nil } +func (p *ProtobufDeserializationProvider) requestSchema(subject, schemaPath string, serdeType serde.Type, message *kafka.Message) (string, map[string]string, error) { + if message == nil { + return "", nil, fmt.Errorf("kafka message is nil") + } + + schemaID := serde.SchemaID{} + _, err := serde.DualSchemaIDDeserializer(subject, serdeType, message.Headers, message.Value, &schemaID) + if err != nil { + return "", nil, err + } + + var idString string + if schemaID.ID > 0 { // integer schema ID from the message prefix + idString = strconv.Itoa(int(schemaID.ID)) + } else if schemaID.GUID != uuid.Nil { // GUID schema ID from the header + idString = schemaID.GUID.String() + } + + tempStorePath := filepath.Join(schemaPath, fmt.Sprintf("%s-%s.txt", subject, idString)) + tempRefStorePath := filepath.Join(schemaPath, fmt.Sprintf("%s-%s.ref", subject, idString)) + var references []schemaregistry.Reference + if !utils.FileExists(tempStorePath) || !utils.FileExists(tempRefStorePath) { + var schemaInfo schemaregistry.SchemaInfo + if schemaID.ID > 0 { + schemaInfo, err = p.deser.Client.GetBySubjectAndID(subject, schemaID.ID) + if err != nil { + return "", nil, err + } + } else if schemaID.GUID != uuid.Nil { + schemaInfo, err = p.deser.Client.GetByGUID(schemaID.GUID.String()) + if err != nil { + return "", nil, err + } + } + + if err := os.WriteFile(tempStorePath, []byte(schemaInfo.Schema), 0644); err != nil { + return "", nil, err + } + + refBytes, err := json.Marshal(schemaInfo.References) + if err != nil { + return "", nil, err + } + if err := os.WriteFile(tempRefStorePath, refBytes, 0644); err != nil { + return "", nil, err + } + references = schemaInfo.References + } else { + refBytes, err := os.ReadFile(tempRefStorePath) + if err != nil { + return "", nil, err + } + if err := json.Unmarshal(refBytes, &references); err != nil { + return "", nil, err + } + } + + referencePathMap := map[string]string{} + for _, ref := range references { + refTempStorePath := filepath.Join(schemaPath, ref.Name) + if !utils.FileExists(refTempStorePath) { + schemaMetadata, err := p.deser.Client.GetSchemaMetadata(ref.Subject, ref.Version) + if err != nil { + return "", nil, err + } + + var refSchemaInfo schemaregistry.SchemaInfo + if schemaMetadata.ID > 0 { + refSchemaInfo, err = p.deser.Client.GetBySubjectAndID(ref.Subject, schemaMetadata.ID) + if err != nil { + return "", nil, err + } + } else if schemaMetadata.GUID != "" { + refSchemaInfo, err = p.deser.Client.GetByGUID(schemaMetadata.GUID) + if err != nil { + return "", nil, err + } + } + + if err := os.MkdirAll(filepath.Dir(refTempStorePath), 0755); err != nil { + return "", nil, err + } + if err := os.WriteFile(refTempStorePath, []byte(refSchemaInfo.Schema), 0644); err != nil { + return "", nil, err + } + } + referencePathMap[ref.Name] = refTempStorePath + } + + return tempStorePath, referencePathMap, nil +} + func (p *ProtobufDeserializationProvider) Deserialize(topic string, headers []kafka.Header, payload []byte) (string, error) { // Register the protobuf message err := p.deser.ProtoRegistry.RegisterMessage(p.message.ProtoReflect().Type()) diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index 56a327aef2..c859ba3d55 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -5,6 +5,7 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/cli/v4/pkg/errors" ) @@ -75,7 +76,7 @@ type SerializationProvider interface { type DeserializationProvider interface { InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient any) error - LoadSchema(string, map[string]string) error + LoadSchema(string, string, serde.Type, *kafka.Message) error Deserialize(string, []kafka.Header, []byte) (string, error) } diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 87360b0efc..839186fa94 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -12,7 +12,9 @@ import ( "github.com/stretchr/testify/require" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" ) var tempDir string @@ -313,7 +315,7 @@ func TestJsonSerdesValid(t *testing.T) { err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) + err = deserializationProvider.LoadSchema("topic1-value", schemaPath, serde.ValueSerde, nil) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) req.Nil(err) @@ -377,7 +379,7 @@ func TestJsonSerdesReference(t *testing.T) { err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) + err = deserializationProvider.LoadSchema("topic1-value", schemaPath, serde.ValueSerde, nil) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, expectedBytes) req.Nil(err) @@ -539,6 +541,10 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { func TestProtobufSerdesValid(t *testing.T) { req := require.New(t) + tempDir, err := os.MkdirTemp(tempDir, "protobuf") + req.NoError(err) + defer os.RemoveAll(tempDir) + schemaString := ` syntax = "proto3"; message Person { @@ -552,7 +558,7 @@ func TestProtobufSerdesValid(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -572,7 +578,7 @@ func TestProtobufSerdesValid(t *testing.T) { deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) @@ -582,6 +588,10 @@ func TestProtobufSerdesValid(t *testing.T) { func TestProtobufSerdesReference(t *testing.T) { req := require.New(t) + tempDir, err := os.MkdirTemp(tempDir, "protobuf") + req.NoError(err) + defer os.RemoveAll(tempDir) + referenceString := `syntax = "proto3"; package test; @@ -614,7 +624,7 @@ message Person { expectedString := `{"name":"abc","address":{"city":"LA"},"result":2}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{"address.proto": referencePath}) req.Nil(err) @@ -648,7 +658,7 @@ message Person { deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - err = deserializationProvider.LoadSchema(schemaPath, map[string]string{"address.proto": referencePath}) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) str, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) @@ -658,6 +668,10 @@ message Person { func TestProtobufSerdesInvalid(t *testing.T) { req := require.New(t) + tempDir, err := os.MkdirTemp(tempDir, "protobuf") + req.NoError(err) + defer os.RemoveAll(tempDir) + schemaString := ` syntax = "proto3"; message Person { @@ -669,7 +683,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -683,10 +697,14 @@ func TestProtobufSerdesInvalid(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) + exampleString := `{"name":"abc","page":1,"result":2}` + data, err := serializationProvider.Serialize("topic1", exampleString) + req.Nil(err) + deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) brokenString := `{"name":"abc` @@ -711,6 +729,10 @@ func TestProtobufSerdesInvalid(t *testing.T) { func TestProtobufSerdesNestedValid(t *testing.T) { req := require.New(t) + tempDir, err := os.MkdirTemp(tempDir, "protobuf") + req.NoError(err) + defer os.RemoveAll(tempDir) + schemaString := ` syntax = "proto3"; message Input { @@ -732,7 +754,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { expectedString := `{"name":"abc","id":2,"add":{"zip":"123","street":"def"},"phones":{"number":"234"}}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -752,7 +774,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) @@ -761,6 +783,11 @@ func TestProtobufSerdesNestedValid(t *testing.T) { func TestProtobufSerdesValidWithRuleSet(t *testing.T) { req := require.New(t) + + tempDir, err := os.MkdirTemp(tempDir, "protobuf") + req.NoError(err) + defer os.RemoveAll(tempDir) + t.Setenv(localKmsSecretMacro, localKmsSecretValueDefault) schemaString := ` @@ -782,7 +809,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { expectedString := `{"name":"abc","page":1,"result":2.5}` serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) req.Nil(err) err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) req.Nil(err) @@ -821,7 +848,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) req.Nil(err) - err = deserializationProvider.LoadSchema(schemaPath, map[string]string{}) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) actualString, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) diff --git a/pkg/serdes/string_deserialization_provider.go b/pkg/serdes/string_deserialization_provider.go index 11f728af9c..c6c856d412 100644 --- a/pkg/serdes/string_deserialization_provider.go +++ b/pkg/serdes/string_deserialization_provider.go @@ -1,6 +1,9 @@ package serdes -import "github.com/confluentinc/confluent-kafka-go/v2/kafka" +import ( + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" +) type StringDeserializationProvider struct{} @@ -8,7 +11,7 @@ func (s *StringDeserializationProvider) InitDeserializer(_, _, _ string, _ Schem return nil } -func (s *StringDeserializationProvider) LoadSchema(_ string, _ map[string]string) error { +func (s *StringDeserializationProvider) LoadSchema(_ string, _ string, _ serde.Type, _ *kafka.Message) error { return nil } From 28181fa72f43a713523dd08a38d3dc352f5589de Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Fri, 3 Oct 2025 16:55:56 -0700 Subject: [PATCH 08/24] missing from the last commit --- pkg/serdes/protobuf_deserialization_provider.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index 628c86b04c..21f7ec7212 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -204,9 +204,17 @@ func (p *ProtobufDeserializationProvider) Deserialize(topic string, headers []ka } // Deserialize the payload into the msgObj - msgObj, err := p.deser.Deserialize(topic, payload) - if err != nil { - return "", fmt.Errorf("failed to deserialize payload: %w", err) + var msgObj interface{} + if len(headers) > 0 { + msgObj, err = p.deser.DeserializeWithHeaders(topic, headers, payload) + if err != nil { + return "", fmt.Errorf("failed to deserialize payload: %w", err) + } + } else { + msgObj, err = p.deser.Deserialize(topic, payload) + if err != nil { + return "", fmt.Errorf("failed to deserialize payload: %w", err) + } } // Use protojson library to marshal the message to JSON in a compact format From 3ea7c0b9ee7c4c19e797314b7e9e624bb08d5ff1 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Fri, 3 Oct 2025 16:59:32 -0700 Subject: [PATCH 09/24] fix lint --- internal/kafka/confluent_kafka.go | 2 -- pkg/serdes/protobuf_deserialization_provider.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index 4ebb0726b7..4de369fa41 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -26,8 +26,6 @@ import ( ) const ( - messageOffset = 5 // Schema ID is stored at the [1:5] bytes of a message as meta info (when valid) - // required fields of SASL/oauthbearer configuration principalClaimNameKey = "principalClaimName" principalKey = "principal" diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index 21f7ec7212..9c017e77b2 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -114,7 +114,7 @@ func (p *ProtobufDeserializationProvider) requestSchema(subject, schemaPath stri var idString string if schemaID.ID > 0 { // integer schema ID from the message prefix - idString = strconv.Itoa(int(schemaID.ID)) + idString = strconv.Itoa(schemaID.ID) } else if schemaID.GUID != uuid.Nil { // GUID schema ID from the header idString = schemaID.GUID.String() } From b49807f1413df833d01618177190288d221e204b Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Fri, 3 Oct 2025 18:51:08 -0700 Subject: [PATCH 10/24] some bug fixes --- internal/kafka/command_topic_consume.go | 1 + pkg/serdes/protobuf_deserialization_provider.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/kafka/command_topic_consume.go b/internal/kafka/command_topic_consume.go index 375f6786a6..b80faa9b59 100644 --- a/internal/kafka/command_topic_consume.go +++ b/internal/kafka/command_topic_consume.go @@ -446,6 +446,7 @@ func (c *command) consumeOnPrem(cmd *cobra.Command, args []string) error { KeyFormat: keyFormat, ValueFormat: valueFormat, Out: cmd.OutOrStdout(), + Subject: topicName, Topic: topicName, Properties: ConsumerProperties{ Delimiter: delimiter, diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index 9c017e77b2..e1d95e3856 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -17,6 +17,7 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/protobuf" + "github.com/confluentinc/cli/v4/pkg/log" "github.com/confluentinc/cli/v4/pkg/utils" ) @@ -46,7 +47,8 @@ func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srCluste } else if srAuth.Token != "" { serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") } else { - return fmt.Errorf("schema registry client authentication should be provider to initialize deserializer") + serdeClientConfig = schemaregistry.NewConfig(srClientUrl) + log.CliLogger.Info("initializing deserializer with no schema registry client authentication") } serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath From 83f49134828db48501be182499219b2c03f503ef Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Wed, 15 Oct 2025 13:44:03 -0700 Subject: [PATCH 11/24] Add unit tests for header deserialization --- internal/kafka/command_topic_produce.go | 4 +- pkg/serdes/avro_serialization_provider.go | 22 ++- pkg/serdes/double_serialization_provider.go | 12 +- pkg/serdes/integer_serialization_provider.go | 12 +- pkg/serdes/json_serialization_provider.go | 16 +- pkg/serdes/protobuf_serialization_provider.go | 16 +- pkg/serdes/serdes.go | 3 +- pkg/serdes/serdes_test.go | 172 ++++++++++++++++-- pkg/serdes/string_serialization_provider.go | 14 +- 9 files changed, 222 insertions(+), 49 deletions(-) diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index 3b5ed7a6c5..99b5d76948 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -396,7 +396,7 @@ func serializeMessage(_, _ []byte, topic, data, delimiter string, parseKey bool, return nil, nil, err } - serializedKey, err = keySerializer.Serialize(topic, key) + _, serializedKey, err = keySerializer.Serialize(topic, key) if err != nil { return nil, nil, err } @@ -404,7 +404,7 @@ func serializeMessage(_, _ []byte, topic, data, delimiter string, parseKey bool, val = value } - serializedValue, err := valueSerializer.Serialize(topic, val) + _, serializedValue, err := valueSerializer.Serialize(topic, val) if err != nil { return nil, nil, err } diff --git a/pkg/serdes/avro_serialization_provider.go b/pkg/serdes/avro_serialization_provider.go index ddb8632010..5b342eeda8 100644 --- a/pkg/serdes/avro_serialization_provider.go +++ b/pkg/serdes/avro_serialization_provider.go @@ -6,6 +6,7 @@ import ( "github.com/linkedin/goavro/v2" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/cel" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/encryption" @@ -106,25 +107,25 @@ func (a *AvroSerializationProvider) GetSchemaName() string { return avroSchemaBackendName } -func (a *AvroSerializationProvider) Serialize(topic, message string) ([]byte, error) { +func (a *AvroSerializationProvider) Serialize(topic, message string) ([]kafka.Header, []byte, error) { // Step#1: Fetch the schemaInfo based on subject and schema ID schemaObj, err := a.GetSchemaRegistryClient().GetBySubjectAndID(topic+"-"+a.mode, a.schemaId) if err != nil { - return nil, fmt.Errorf("failed to serialize message: %w", err) + return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } // Step#2: Prepare the Codec based on schemaInfo schemaString := schemaObj.Schema codec, err := goavro.NewCodec(schemaString) if err != nil { - return nil, fmt.Errorf("failed to serialize message: %w", err) + return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } // Step#3: Convert the Avro message data in JSON text format into Go native // data types in accordance with the Avro schema supplied when creating the Codec object, _, err := codec.NativeFromTextual([]byte(message)) if err != nil { - return nil, fmt.Errorf("failed to serialize message: %w", err) + return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } // Step#4: Fetch the Go native data object, cast it into generic map for Serialize() @@ -134,13 +135,13 @@ func (a *AvroSerializationProvider) Serialize(topic, message string) ([]byte, er // Passing the Go native object directly could cause issues during ruleSet execution v, ok := object.(map[string]interface{}) if !ok { - return nil, fmt.Errorf("failed to serialize message: unexpected message type assertion result") + return nil, nil, fmt.Errorf("failed to serialize message: unexpected message type assertion result") } - payload, err := a.ser.Serialize(topic, &v) + headers, payload, err := a.ser.SerializeWithHeaders(topic, &v) if err != nil { - return nil, fmt.Errorf("failed to serialize message: %w", err) + return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } - return payload, nil + return headers, payload, nil } // GetSchemaRegistryClient This getter function is used in mock testing @@ -148,3 +149,8 @@ func (a *AvroSerializationProvider) Serialize(topic, message string) ([]byte, er func (a *AvroSerializationProvider) GetSchemaRegistryClient() schemaregistry.Client { return a.ser.Client } + +// For unit testing purposes +func (a *AvroSerializationProvider) SetSchemaIDSerializer(headerSerializer serde.SchemaIDSerializerFunc) { + a.ser.SchemaIDSerializer = headerSerializer +} diff --git a/pkg/serdes/double_serialization_provider.go b/pkg/serdes/double_serialization_provider.go index 1d58db4310..93bcfcf287 100644 --- a/pkg/serdes/double_serialization_provider.go +++ b/pkg/serdes/double_serialization_provider.go @@ -5,7 +5,9 @@ import ( "math" "strconv" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" ) type DoubleSerializationProvider struct{} @@ -18,16 +20,16 @@ func (DoubleSerializationProvider) LoadSchema(_ string, _ map[string]string) err return nil } -func (DoubleSerializationProvider) Serialize(_, message string) ([]byte, error) { +func (DoubleSerializationProvider) Serialize(_, message string) ([]kafka.Header, []byte, error) { f, err := strconv.ParseFloat(message, 64) if err != nil { - return nil, err + return nil, nil, err } buf := make([]byte, 8) binary.LittleEndian.PutUint64(buf, math.Float64bits(f)) - return buf, nil + return nil, buf, nil } func (DoubleSerializationProvider) GetSchemaName() string { @@ -37,3 +39,7 @@ func (DoubleSerializationProvider) GetSchemaName() string { func (DoubleSerializationProvider) GetSchemaRegistryClient() schemaregistry.Client { return nil } + +func (DoubleSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) { + return +} diff --git a/pkg/serdes/integer_serialization_provider.go b/pkg/serdes/integer_serialization_provider.go index 4e24954db6..a4b8ea953f 100644 --- a/pkg/serdes/integer_serialization_provider.go +++ b/pkg/serdes/integer_serialization_provider.go @@ -4,7 +4,9 @@ import ( "encoding/binary" "strconv" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" ) type IntegerSerializationProvider struct{} @@ -17,16 +19,16 @@ func (IntegerSerializationProvider) LoadSchema(_ string, _ map[string]string) er return nil } -func (IntegerSerializationProvider) Serialize(_, message string) ([]byte, error) { +func (IntegerSerializationProvider) Serialize(_, message string) ([]kafka.Header, []byte, error) { i, err := strconv.ParseUint(message, 10, 32) if err != nil { - return nil, err + return nil, nil, err } buf := make([]byte, 4) binary.LittleEndian.PutUint32(buf, uint32(i)) - return buf, nil + return nil, buf, nil } func (IntegerSerializationProvider) GetSchemaName() string { @@ -36,3 +38,7 @@ func (IntegerSerializationProvider) GetSchemaName() string { func (IntegerSerializationProvider) GetSchemaRegistryClient() schemaregistry.Client { return nil } + +func (IntegerSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) { + return +} diff --git a/pkg/serdes/json_serialization_provider.go b/pkg/serdes/json_serialization_provider.go index 1e093b14df..54a4a03ade 100644 --- a/pkg/serdes/json_serialization_provider.go +++ b/pkg/serdes/json_serialization_provider.go @@ -5,6 +5,7 @@ import ( "fmt" "os" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/cel" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/encryption" @@ -99,19 +100,19 @@ func (j *JsonSerializationProvider) GetSchemaName() string { return jsonSchemaBackendName } -func (j *JsonSerializationProvider) Serialize(topic, message string) ([]byte, error) { +func (j *JsonSerializationProvider) Serialize(topic, message string) ([]kafka.Header, []byte, error) { // Convert the plain string message from customer type-in in CLI terminal into generic map var result map[string]any err := json.Unmarshal([]byte(message), &result) if err != nil { - return nil, fmt.Errorf("failed to convert message string into generic map for serialization: %w", err) + return nil, nil, fmt.Errorf("failed to convert message string into generic map for serialization: %w", err) } - payload, err := j.ser.Serialize(topic, &result) + headers, payload, err := j.ser.SerializeWithHeaders(topic, &result) if err != nil { - return nil, fmt.Errorf("failed to serialize message: %w", err) + return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } - return payload, nil + return headers, payload, nil } // GetSchemaRegistryClient This getter function is used in mock testing @@ -119,3 +120,8 @@ func (j *JsonSerializationProvider) Serialize(topic, message string) ([]byte, er func (j *JsonSerializationProvider) GetSchemaRegistryClient() schemaregistry.Client { return j.ser.Client } + +// For unit testing purposes +func (j *JsonSerializationProvider) SetSchemaIDSerializer(headerSerializer serde.SchemaIDSerializerFunc) { + j.ser.SchemaIDSerializer = headerSerializer +} diff --git a/pkg/serdes/protobuf_serialization_provider.go b/pkg/serdes/protobuf_serialization_provider.go index ddbe7ed154..4e6e9c4179 100644 --- a/pkg/serdes/protobuf_serialization_provider.go +++ b/pkg/serdes/protobuf_serialization_provider.go @@ -14,6 +14,7 @@ import ( gproto "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/dynamicpb" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/cel" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/encryption" @@ -123,17 +124,17 @@ func (p *ProtobufSerializationProvider) GetSchemaName() string { return protobufSchemaBackendName } -func (p *ProtobufSerializationProvider) Serialize(topic, message string) ([]byte, error) { +func (p *ProtobufSerializationProvider) Serialize(topic, message string) ([]kafka.Header, []byte, error) { // Need to materialize the message into the schema of p.message if err := protojson.Unmarshal([]byte(message), p.message); err != nil { - return nil, fmt.Errorf(errors.ProtoDocumentInvalidErrorMsg) + return nil, nil, fmt.Errorf(errors.ProtoDocumentInvalidErrorMsg) } - payload, err := p.ser.Serialize(topic, p.message) + headers, payload, err := p.ser.SerializeWithHeaders(topic, p.message) if err != nil { - return nil, fmt.Errorf("failed to serialize message: %w", err) + return nil, nil, fmt.Errorf("failed to serialize message: %w", err) } - return payload, nil + return headers, payload, nil } func parseMessage(schemaPath string, referencePathMap map[string]string) (gproto.Message, error) { @@ -224,3 +225,8 @@ func copyBuiltInProtoFiles(destinationDir string) error { return nil }) } + +// For unit testing purposes +func (p *ProtobufSerializationProvider) SetSchemaIDSerializer(headerSerializer serde.SchemaIDSerializerFunc) { + p.ser.SchemaIDSerializer = headerSerializer +} diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index c859ba3d55..47bb66c6b0 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -69,9 +69,10 @@ type SchemaRegistryAuth struct { type SerializationProvider interface { InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error LoadSchema(string, map[string]string) error - Serialize(string, string) ([]byte, error) + Serialize(string, string) ([]kafka.Header, []byte, error) GetSchemaName() string GetSchemaRegistryClient() schemaregistry.Client + SetSchemaIDSerializer(headerSerializer serde.SchemaIDSerializerFunc) // For unit testing purposes } type DeserializationProvider interface { diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 839186fa94..5a42b8d028 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -74,7 +74,7 @@ func TestStringSerdes(t *testing.T) { serializationProvider, _ := GetSerializationProvider(stringSchemaName) expectedBytes := []byte{115, 111, 109, 101, 83, 116, 114, 105, 110, 103} - data, err := serializationProvider.Serialize("", "someString") + _, data, err := serializationProvider.Serialize("", "someString") req.Nil(err) result := bytes.Compare(data, expectedBytes) req.Zero(result) @@ -113,7 +113,7 @@ func TestAvroSerdesValid(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) result := bytes.Compare(expectedBytes, data) @@ -130,6 +130,48 @@ func TestAvroSerdesValid(t *testing.T) { req.Equal(expectedString, actualString) } +func TestAvroSerdesValidWithHeaders(t *testing.T) { + req := require.New(t) + + schemaString := `{"type":"record","name":"myRecord","fields":[{"name":"f1","type":"int"}]}` + schemaPath := filepath.Join(tempDir, "avro-schema.txt") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + expectedString := `{"f1":123}` + expectedBytes := []byte{246, 1} + + // Initialize the mock serializer and use latest schemaId + serializationProvider, _ := GetSerializationProvider(avroSchemaName) + err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) + + // Explicitly register the schema to have a schemaId with mock SR client + client := serializationProvider.GetSchemaRegistryClient() + info := schemaregistry.SchemaInfo{ + Schema: schemaString, + SchemaType: "AVRO", + } + _, err = client.Register("topic1-value", info, false) + req.Nil(err) + + headers, data, err := serializationProvider.Serialize("topic1", expectedString) + req.Nil(err) + + result := bytes.Compare(expectedBytes, data) + req.Zero(result) + + // Initialize the mock deserializer + deserializationProvider, _ := GetDeserializationProvider(avroSchemaName) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + req.Nil(err) + + actualString, err := deserializationProvider.Deserialize("topic1", headers, data) + req.Nil(err) + + req.Equal(expectedString, actualString) +} + func TestAvroSerdesInvalid(t *testing.T) { req := require.New(t) @@ -163,14 +205,14 @@ func TestAvroSerdesInvalid(t *testing.T) { brokenString := `{"f1"` brokenBytes := []byte{0, 0, 0, 0, 1, 6, 97} - _, err = serializationProvider.Serialize("topic1", brokenString) + _, _, err = serializationProvider.Serialize("topic1", brokenString) req.Regexp(`cannot decode textual record "myRecord": short buffer`, err) _, err = deserializationProvider.Deserialize("topic1", nil, brokenBytes) req.Regexp("unexpected EOF$", err) invalidString := `{"f2": "abc"}` - _, err = serializationProvider.Serialize("topic1", invalidString) + _, _, err = serializationProvider.Serialize("topic1", invalidString) req.Regexp(`cannot decode textual map: cannot determine codec: "f2"$`, err) } @@ -201,7 +243,7 @@ func TestAvroSerdesNestedValid(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) result := bytes.Compare(expectedBytes, data) @@ -264,7 +306,7 @@ func TestAvroSerdesValidWithRuleSet(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) // Initialize the mock deserializer @@ -304,7 +346,7 @@ func TestJsonSerdesValid(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) result := bytes.Compare(expectedBytes, data) @@ -322,6 +364,47 @@ func TestJsonSerdesValid(t *testing.T) { req.Equal(expectedString, actualString) } +func TestJsonSerdesValidWithHeaders(t *testing.T) { + req := require.New(t) + + schemaString := `{"type":"object","properties":{"f1":{"type":"string"}},"required":["f1"]}` + schemaPath := filepath.Join(tempDir, "json-schema.json") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + expectedString := `{"f1":"asd"}` + expectedBytes := []byte{123, 34, 102, 49, 34, 58, 34, 97, 115, 100, 34, 125} + + // Initialize the mock serializer and use latest schemaId + serializationProvider, _ := GetSerializationProvider(jsonSchemaName) + err := serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) + + // Explicitly register the schema to have a schemaId with mock SR client + client := serializationProvider.GetSchemaRegistryClient() + info := schemaregistry.SchemaInfo{ + Schema: schemaString, + SchemaType: "JSON", + } + _, err = client.Register("topic1-value", info, false) + req.Nil(err) + + headers, data, err := serializationProvider.Serialize("topic1", expectedString) + req.Nil(err) + + result := bytes.Compare(expectedBytes, data) + req.Zero(result) + + // Initialize the mock deserializer + deserializationProvider, _ := GetDeserializationProvider(jsonSchemaName) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + req.Nil(err) + + actualString, err := deserializationProvider.Deserialize("topic1", headers, expectedBytes) + req.Nil(err) + req.Equal(expectedString, actualString) +} + func TestJsonSerdesReference(t *testing.T) { req := require.New(t) @@ -368,7 +451,7 @@ func TestJsonSerdesReference(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) result := bytes.Compare(expectedBytes, data) @@ -417,7 +500,7 @@ func TestJsonSerdesInvalid(t *testing.T) { brokenString := `{"f1":` brokenBytes := []byte{123, 34, 102, 50} - _, err = serializationProvider.Serialize("topic1", brokenString) + _, _, err = serializationProvider.Serialize("topic1", brokenString) req.Regexp("unexpected end of JSON input$", err) _, err = deserializationProvider.Deserialize("topic1", nil, brokenBytes) @@ -426,7 +509,7 @@ func TestJsonSerdesInvalid(t *testing.T) { invalidString := `{"f2": "abc"}` invalidBytes := []byte{123, 34, 102, 50, 34, 58, 34, 97, 115, 100, 34, 125} - _, err = serializationProvider.Serialize("topic1", invalidString) + _, _, err = serializationProvider.Serialize("topic1", invalidString) req.Regexp("missing properties: 'f1'$", err) _, err = deserializationProvider.Deserialize("topic1", nil, invalidBytes) @@ -462,7 +545,7 @@ func TestJsonSerdesNestedValid(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) result := bytes.Compare(expectedBytes, data) @@ -524,7 +607,7 @@ func TestJsonSerdesValidWithRuleSet(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) // Initialize the mock deserializer @@ -572,7 +655,7 @@ func TestProtobufSerdesValid(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) @@ -585,6 +668,57 @@ func TestProtobufSerdesValid(t *testing.T) { req.JSONEq(expectedString, actualString) } +func TestProtobufSerdesValidWithHeaders(t *testing.T) { + req := require.New(t) + + tempDir, err := os.MkdirTemp(tempDir, "protobuf") + req.NoError(err) + defer os.RemoveAll(tempDir) + + schemaString := ` + syntax = "proto3"; + message Person { + string name = 1; + int32 page = 2; + double result = 3; + }` + schemaPath := filepath.Join(tempDir, "person-schema.proto") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + expectedString := `{"name":"abc","page":1,"result":2.5}` + + serializationProvider, _ := GetSerializationProvider(protobufSchemaName) + err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) + err = serializationProvider.LoadSchema(schemaPath, map[string]string{}) + req.Nil(err) + + // Explicitly register the schema to have a schemaId with mock SR client + client := serializationProvider.GetSchemaRegistryClient() + info := schemaregistry.SchemaInfo{ + Schema: schemaString, + SchemaType: "PROTOBUF", + } + _, err = client.Register("topic1-value", info, false) + req.Nil(err) + + headers, data, err := serializationProvider.Serialize("topic1", expectedString) + req.Nil(err) + + deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + req.Nil(err) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{ + Value: data, + Headers: headers, + }) + req.Nil(err) + actualString, err := deserializationProvider.Deserialize("topic1", headers, data) + req.Nil(err) + req.JSONEq(expectedString, actualString) +} + func TestProtobufSerdesReference(t *testing.T) { req := require.New(t) @@ -652,7 +786,7 @@ message Person { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) @@ -698,7 +832,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { req.Nil(err) exampleString := `{"name":"abc","page":1,"result":2}` - data, err := serializationProvider.Serialize("topic1", exampleString) + _, data, err := serializationProvider.Serialize("topic1", exampleString) req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) @@ -710,7 +844,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { brokenString := `{"name":"abc` brokenBytes := []byte{0, 10, 3, 97, 98, 99, 16} - _, err = serializationProvider.Serialize("topic1", brokenString) + _, _, err = serializationProvider.Serialize("topic1", brokenString) req.EqualError(err, "the protobuf document is invalid") _, err = deserializationProvider.Deserialize("topic1", nil, brokenBytes) @@ -719,7 +853,7 @@ func TestProtobufSerdesInvalid(t *testing.T) { invalidString := `{"page":"abc"}` invalidBytes := []byte{0, 12, 3, 97, 98, 99, 16, 1, 24, 2} - _, err = serializationProvider.Serialize("topic1", invalidString) + _, _, err = serializationProvider.Serialize("topic1", invalidString) req.EqualError(err, "the protobuf document is invalid") _, err = deserializationProvider.Deserialize("topic1", nil, invalidBytes) @@ -768,7 +902,7 @@ func TestProtobufSerdesNestedValid(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) @@ -842,7 +976,7 @@ func TestProtobufSerdesValidWithRuleSet(t *testing.T) { _, err = client.Register("topic1-value", info, false) req.Nil(err) - data, err := serializationProvider.Serialize("topic1", expectedString) + _, data, err := serializationProvider.Serialize("topic1", expectedString) req.Nil(err) deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) diff --git a/pkg/serdes/string_serialization_provider.go b/pkg/serdes/string_serialization_provider.go index b9e990c1df..76eaa6346d 100644 --- a/pkg/serdes/string_serialization_provider.go +++ b/pkg/serdes/string_serialization_provider.go @@ -1,6 +1,10 @@ package serdes -import "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" +import ( + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" +) type StringSerializationProvider struct{} @@ -12,8 +16,8 @@ func (s *StringSerializationProvider) LoadSchema(_ string, _ map[string]string) return nil } -func (s *StringSerializationProvider) Serialize(_, message string) ([]byte, error) { - return []byte(message), nil +func (s *StringSerializationProvider) Serialize(_, message string) ([]kafka.Header, []byte, error) { + return nil, []byte(message), nil } func (s *StringSerializationProvider) GetSchemaName() string { @@ -23,3 +27,7 @@ func (s *StringSerializationProvider) GetSchemaName() string { func (s *StringSerializationProvider) GetSchemaRegistryClient() schemaregistry.Client { return nil } + +func (s *StringSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) { + return +} From 47f886eb09c91fa6fd6814ecf50395ed013bef2b Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Wed, 15 Oct 2025 14:37:06 -0700 Subject: [PATCH 12/24] fix linter --- pkg/serdes/double_serialization_provider.go | 4 +--- pkg/serdes/integer_serialization_provider.go | 4 +--- pkg/serdes/string_serialization_provider.go | 4 +--- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/pkg/serdes/double_serialization_provider.go b/pkg/serdes/double_serialization_provider.go index 93bcfcf287..e17c17da25 100644 --- a/pkg/serdes/double_serialization_provider.go +++ b/pkg/serdes/double_serialization_provider.go @@ -40,6 +40,4 @@ func (DoubleSerializationProvider) GetSchemaRegistryClient() schemaregistry.Clie return nil } -func (DoubleSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) { - return -} +func (DoubleSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) {} diff --git a/pkg/serdes/integer_serialization_provider.go b/pkg/serdes/integer_serialization_provider.go index a4b8ea953f..12b8de571b 100644 --- a/pkg/serdes/integer_serialization_provider.go +++ b/pkg/serdes/integer_serialization_provider.go @@ -39,6 +39,4 @@ func (IntegerSerializationProvider) GetSchemaRegistryClient() schemaregistry.Cli return nil } -func (IntegerSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) { - return -} +func (IntegerSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) {} diff --git a/pkg/serdes/string_serialization_provider.go b/pkg/serdes/string_serialization_provider.go index 76eaa6346d..a88000ed6a 100644 --- a/pkg/serdes/string_serialization_provider.go +++ b/pkg/serdes/string_serialization_provider.go @@ -28,6 +28,4 @@ func (s *StringSerializationProvider) GetSchemaRegistryClient() schemaregistry.C return nil } -func (s *StringSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) { - return -} +func (s *StringSerializationProvider) SetSchemaIDSerializer(_ serde.SchemaIDSerializerFunc) {} From a454cc44d4a9e731c92a9ed0c9f5830eb7ea703e Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Wed, 15 Oct 2025 15:48:54 -0700 Subject: [PATCH 13/24] test protobuf requestSchema path where reference files aren't already there --- pkg/serdes/serdes_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 5a42b8d028..1ce1e418bf 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -797,6 +797,15 @@ message Person { str, err := deserializationProvider.Deserialize("topic1", nil, data) req.Nil(err) req.JSONEq(str, expectedString) + + // Deserialize again but without the reference file already stored locally + err = os.Remove(referencePath) + req.Nil(err) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) + req.Nil(err) + str, err = deserializationProvider.Deserialize("topic1", nil, data) + req.NotNil(err) + req.JSONEq(str, expectedString) } func TestProtobufSerdesInvalid(t *testing.T) { From 5a633ee1fcd637b914c194d79ccb0f119be94b17 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Wed, 15 Oct 2025 16:33:04 -0700 Subject: [PATCH 14/24] fix typo from debugging --- pkg/serdes/serdes_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 1ce1e418bf..e37fc169f4 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -804,7 +804,7 @@ message Person { err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data}) req.Nil(err) str, err = deserializationProvider.Deserialize("topic1", nil, data) - req.NotNil(err) + req.Nil(err) req.JSONEq(str, expectedString) } From 50a6826f3eb9a99cd02b083b77dcea7b7bb7bfe1 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Thu, 16 Oct 2025 10:23:24 -0700 Subject: [PATCH 15/24] reduce code duplication and test serdes sr config --- pkg/serdes/avro_deserialization_provider.go | 35 +++------------ pkg/serdes/double_deserialization_provider.go | 7 ++- .../integer_deserialization_provider.go | 7 ++- pkg/serdes/json_deserialization_provider.go | 33 +++----------- .../protobuf_deserialization_provider.go | 33 +++----------- pkg/serdes/serdes.go | 25 ++++++++++- pkg/serdes/serdes_test.go | 43 +++++++++++++++++++ pkg/serdes/string_deserialization_provider.go | 7 ++- 8 files changed, 103 insertions(+), 87 deletions(-) diff --git a/pkg/serdes/avro_deserialization_provider.go b/pkg/serdes/avro_deserialization_provider.go index 84689eadff..0b2e6c8221 100644 --- a/pkg/serdes/avro_deserialization_provider.go +++ b/pkg/serdes/avro_deserialization_provider.go @@ -9,44 +9,18 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avrov2" - - "github.com/confluentinc/cli/v4/pkg/log" ) type AvroDeserializationProvider struct { deser *avrov2.Deserializer } -func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient any) error { +func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache // If existingClient is nil (which is normal case), ser and deser don't have to share the same client. - var serdeClient schemaregistry.Client - var err error - var ok bool - - if existingClient != nil { - serdeClient, ok = existingClient.(schemaregistry.Client) - if !ok { - return fmt.Errorf("failed to cast existing schema registry client to expected type") - } - } else { - var serdeClientConfig *schemaregistry.Config - if srAuth.ApiKey != "" && srAuth.ApiSecret != "" { - serdeClientConfig = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) - } else if srAuth.Token != "" { - serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") - } else { - serdeClientConfig = schemaregistry.NewConfig(srClientUrl) - log.CliLogger.Info("initializing deserializer with no schema registry client authentication") - } - serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath - serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath - serdeClientConfig.SslKeyLocation = srAuth.ClientKeyPath - serdeClient, err = schemaregistry.NewClient(serdeClientConfig) - } - + serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, existingClient) if err != nil { return fmt.Errorf("failed to create deserializer-specific Schema Registry client: %w", err) } @@ -71,7 +45,6 @@ func (a *AvroDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, } deser, err := avrov2.NewDeserializer(serdeClient, serdeType, serdeConfig) - if err != nil { return fmt.Errorf("failed to initialize AVRO deserializer: %w", err) } @@ -104,3 +77,7 @@ func (a *AvroDeserializationProvider) Deserialize(topic string, headers []kafka. return string(jsonBytes), nil } + +func (a *AvroDeserializationProvider) GetSchemaRegistryClient() schemaregistry.Client { + return a.deser.Client +} diff --git a/pkg/serdes/double_deserialization_provider.go b/pkg/serdes/double_deserialization_provider.go index f04c368be2..d24f2d3e67 100644 --- a/pkg/serdes/double_deserialization_provider.go +++ b/pkg/serdes/double_deserialization_provider.go @@ -6,12 +6,13 @@ import ( "math" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" ) type DoubleDeserializationProvider struct{} -func (DoubleDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ any) error { +func (DoubleDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } @@ -31,3 +32,7 @@ func (DoubleDeserializationProvider) Deserialize(_ string, _ []kafka.Header, dat message := fmt.Sprintf("%f", math.Float64frombits(binary.LittleEndian.Uint64(data))) return message, nil } + +func (DoubleDeserializationProvider) GetSchemaRegistryClient() schemaregistry.Client { + return nil +} diff --git a/pkg/serdes/integer_deserialization_provider.go b/pkg/serdes/integer_deserialization_provider.go index fd5a8420da..564b3ae724 100644 --- a/pkg/serdes/integer_deserialization_provider.go +++ b/pkg/serdes/integer_deserialization_provider.go @@ -5,12 +5,13 @@ import ( "fmt" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" ) type IntegerDeserializationProvider struct{} -func (IntegerDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ any) error { +func (IntegerDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } @@ -30,3 +31,7 @@ func (IntegerDeserializationProvider) Deserialize(_ string, _ []kafka.Header, da message := fmt.Sprintf("%d", binary.LittleEndian.Uint32(data)) return message, nil } + +func (IntegerDeserializationProvider) GetSchemaRegistryClient() schemaregistry.Client { + return nil +} diff --git a/pkg/serdes/json_deserialization_provider.go b/pkg/serdes/json_deserialization_provider.go index 53d154f442..713f45bfde 100644 --- a/pkg/serdes/json_deserialization_provider.go +++ b/pkg/serdes/json_deserialization_provider.go @@ -9,43 +9,18 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/jsonschema" - - "github.com/confluentinc/cli/v4/pkg/log" ) type JsonDeserializationProvider struct { deser *jsonschema.Deserializer } -func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient any) error { +func (j *JsonDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache // If existingClient is nil (which is normal case), ser and deser don't have to share the same client. - var serdeClient schemaregistry.Client - var err error - var ok bool - if existingClient != nil { - serdeClient, ok = existingClient.(schemaregistry.Client) - if !ok { - return fmt.Errorf("failed to cast existing schema registry client to expected type") - } - } else { - var serdeClientConfig *schemaregistry.Config - if srAuth.ApiKey != "" && srAuth.ApiSecret != "" { - serdeClientConfig = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) - } else if srAuth.Token != "" { - serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") - } else { - serdeClientConfig = schemaregistry.NewConfig(srClientUrl) - log.CliLogger.Info("initializing deserializer with no schema registry client authentication") - } - serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath - serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath - serdeClientConfig.SslKeyLocation = srAuth.ClientKeyPath - serdeClient, err = schemaregistry.NewClient(serdeClientConfig) - } - + serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, existingClient) if err != nil { return fmt.Errorf("failed to create deserializer-specific Schema Registry client: %w", err) } @@ -103,3 +78,7 @@ func (j *JsonDeserializationProvider) Deserialize(topic string, headers []kafka. return string(jsonBytes), nil } + +func (j *JsonDeserializationProvider) GetSchemaRegistryClient() schemaregistry.Client { + return j.deser.Client +} diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index e1d95e3856..b2b70b82b8 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -17,7 +17,6 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/protobuf" - "github.com/confluentinc/cli/v4/pkg/log" "github.com/confluentinc/cli/v4/pkg/utils" ) @@ -26,36 +25,12 @@ type ProtobufDeserializationProvider struct { message gproto.Message } -func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient any) error { +func (p *ProtobufDeserializationProvider) InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error { // Note: Now Serializer/Deserializer are tightly coupled with Schema Registry // If existingClient is not nil, we should share this client between ser and deser. // As the shared client is referred as mock client to store the same set of schemas in cache // If existingClient is nil (which is normal case), ser and deser don't have to share the same client. - var serdeClient schemaregistry.Client - var err error - var ok bool - - if existingClient != nil { - serdeClient, ok = existingClient.(schemaregistry.Client) - if !ok { - return fmt.Errorf("failed to cast existing schema registry client to expected type") - } - } else { - var serdeClientConfig *schemaregistry.Config - if srAuth.ApiKey != "" && srAuth.ApiSecret != "" { - serdeClientConfig = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) - } else if srAuth.Token != "" { - serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") - } else { - serdeClientConfig = schemaregistry.NewConfig(srClientUrl) - log.CliLogger.Info("initializing deserializer with no schema registry client authentication") - } - serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath - serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath - serdeClientConfig.SslKeyLocation = srAuth.ClientKeyPath - serdeClient, err = schemaregistry.NewClient(serdeClientConfig) - } - + serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, existingClient) if err != nil { return fmt.Errorf("failed to create deserializer-specific Schema Registry client: %w", err) } @@ -233,3 +208,7 @@ func (p *ProtobufDeserializationProvider) Deserialize(topic string, headers []ka return string(jsonBytes), nil } + +func (p *ProtobufDeserializationProvider) GetSchemaRegistryClient() schemaregistry.Client { + return p.deser.Client +} diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index 47bb66c6b0..0bcd116c9a 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -8,6 +8,7 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/log" ) var DekAlgorithms = []string{ @@ -76,9 +77,10 @@ type SerializationProvider interface { } type DeserializationProvider interface { - InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient any) error + InitDeserializer(srClientUrl, srClusterId, mode string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) error LoadSchema(string, string, serde.Type, *kafka.Message) error Deserialize(string, []kafka.Header, []byte) (string, error) + GetSchemaRegistryClient() schemaregistry.Client } func FormatTranslation(backendValueFormat string) (string, error) { @@ -137,3 +139,24 @@ func GetDeserializationProvider(valueFormat string) (DeserializationProvider, er func IsProtobufSchema(valueFormat string) bool { return valueFormat == protobufSchemaName } + +func initSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegistryAuth, existingClient schemaregistry.Client) (schemaregistry.Client, error) { + if existingClient != nil { + return existingClient, nil + } + + var serdeClientConfig *schemaregistry.Config + if srAuth.ApiKey != "" && srAuth.ApiSecret != "" { + serdeClientConfig = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) + } else if srAuth.Token != "" { + serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") + } else { + serdeClientConfig = schemaregistry.NewConfig(srClientUrl) + log.CliLogger.Info("initializing deserializer with no schema registry client authentication") + } + serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath + serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath + serdeClientConfig.SslKeyLocation = srAuth.ClientKeyPath + + return schemaregistry.NewClient(serdeClientConfig) +} diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index e37fc169f4..739641edce 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -40,6 +40,49 @@ func TestMain(m *testing.M) { os.Exit(code) } +func TestInitSchemaRegistryClient(t *testing.T) { + req := require.New(t) + + // Basic Auth + provider, err := GetDeserializationProvider(avroSchemaName) + req.Nil(err) + err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ + ApiKey: "key", + ApiSecret: "secret", + }, nil) + req.Nil(err) + config := provider.GetSchemaRegistryClient().Config() + req.Equal(config.SchemaRegistryURL, mockClientUrl) + req.Equal(config.BasicAuthUserInfo, "key:secret") + req.Equal(config.BasicAuthCredentialsSource, "USER_INFO") + + // Bearer Auth + serde.GlobalRuleRegistry().Clear() + err = provider.InitDeserializer(mockClientUrl, "lsrc-abc123", "value", SchemaRegistryAuth{Token: "token"}, nil) + req.Nil(err) + config = provider.GetSchemaRegistryClient().Config() + req.Equal(config.SchemaRegistryURL, mockClientUrl) + req.Equal(config.BearerAuthToken, "token") + req.Equal(config.BearerAuthLogicalCluster, "lsrc-abc123") + req.Equal(config.BearerAuthCredentialsSource, "STATIC_TOKEN") + + // No Auth (and also mTLS) + serde.GlobalRuleRegistry().Clear() + err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ + CertificateAuthorityPath: "ca.cert", + ClientCertPath: "client.crt", + ClientKeyPath: "client.key", + }, nil) + req.Nil(err) + config = provider.GetSchemaRegistryClient().Config() + req.Equal(config.SchemaRegistryURL, mockClientUrl) + req.Equal(config.BasicAuthCredentialsSource, "") + req.Equal(config.BearerAuthCredentialsSource, "") + req.Equal(config.SslCaLocation, "ca.cert") + req.Equal(config.SslCertificateLocation, "client.crt") + req.Equal(config.SslKeyLocation, "client.key") +} + func TestGetSerializationProvider(t *testing.T) { req := require.New(t) valueFormats := []string{avroSchemaName, jsonSchemaName, protobufSchemaName} diff --git a/pkg/serdes/string_deserialization_provider.go b/pkg/serdes/string_deserialization_provider.go index c6c856d412..0d1891a424 100644 --- a/pkg/serdes/string_deserialization_provider.go +++ b/pkg/serdes/string_deserialization_provider.go @@ -2,12 +2,13 @@ package serdes import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry" "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" ) type StringDeserializationProvider struct{} -func (s *StringDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ any) error { +func (s *StringDeserializationProvider) InitDeserializer(_, _, _ string, _ SchemaRegistryAuth, _ schemaregistry.Client) error { return nil } @@ -19,3 +20,7 @@ func (s *StringDeserializationProvider) Deserialize(_ string, _ []kafka.Header, message := string(data) return message, nil } + +func (s *StringDeserializationProvider) GetSchemaRegistryClient() schemaregistry.Client { + return nil +} From 5d256ba45fb43f9c3cb4d1f0e4d5671570cb2881 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Thu, 16 Oct 2025 11:00:27 -0700 Subject: [PATCH 16/24] increase test coverage --- pkg/serdes/serdes_test.go | 94 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 739641edce..0be0d47568 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -58,6 +58,8 @@ func TestInitSchemaRegistryClient(t *testing.T) { // Bearer Auth serde.GlobalRuleRegistry().Clear() + provider, err = GetDeserializationProvider(jsonSchemaName) + req.Nil(err) err = provider.InitDeserializer(mockClientUrl, "lsrc-abc123", "value", SchemaRegistryAuth{Token: "token"}, nil) req.Nil(err) config = provider.GetSchemaRegistryClient().Config() @@ -68,6 +70,8 @@ func TestInitSchemaRegistryClient(t *testing.T) { // No Auth (and also mTLS) serde.GlobalRuleRegistry().Clear() + provider, err = GetDeserializationProvider(protobufSchemaName) + req.Nil(err) err = provider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{ CertificateAuthorityPath: "ca.cert", ClientCertPath: "client.crt", @@ -851,6 +855,96 @@ message Person { req.JSONEq(str, expectedString) } +func TestProtobufSerdesReferenceWithHeaders(t *testing.T) { + req := require.New(t) + + tempDir, err := os.MkdirTemp(tempDir, "protobuf") + req.NoError(err) + defer os.RemoveAll(tempDir) + + referenceString := `syntax = "proto3"; + +package test; + +message Address { + string city = 1; +} +` + + // Reference schema should be registered from user side prior to be used as reference + // So subject and schema version will be known value at this time + referencePath := filepath.Join(tempDir, "address.proto") + req.NoError(os.WriteFile(referencePath, []byte(referenceString), 0644)) + + schemaString := `syntax = "proto3"; + +package test; + +import "address.proto"; + +message Person { + string name = 1; + test.Address address = 2; + int32 result = 3; +} +` + schemaPath := filepath.Join(tempDir, "person.proto") + req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) + + expectedString := `{"name":"abc","address":{"city":"LA"},"result":2}` + + serializationProvider, _ := GetSerializationProvider(protobufSchemaName) + err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) + req.Nil(err) + serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) + err = serializationProvider.LoadSchema(schemaPath, map[string]string{"address.proto": referencePath}) + req.Nil(err) + + // Explicitly register the reference schema and root schema to have a schemaId with mock SR client + client := serializationProvider.GetSchemaRegistryClient() + referenceInfo := schemaregistry.SchemaInfo{ + Schema: referenceString, + SchemaType: "PROTOBUF", + } + _, err = client.Register("address.proto", referenceInfo, false) + req.Nil(err) + + info := schemaregistry.SchemaInfo{ + Schema: schemaString, + SchemaType: "PROTOBUF", + References: []schemaregistry.Reference{ + { + Name: "address.proto", + Subject: "address.proto", + Version: 1, + }, + }, + } + _, err = client.Register("topic1-value", info, false) + req.Nil(err) + + headers, data, err := serializationProvider.Serialize("topic1", expectedString) + req.Nil(err) + + deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) + err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) + req.Nil(err) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data, Headers: headers}) + req.Nil(err) + str, err := deserializationProvider.Deserialize("topic1", headers, data) + req.Nil(err) + req.JSONEq(str, expectedString) + + // Deserialize again but without the reference file already stored locally + err = os.Remove(referencePath) + req.Nil(err) + err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data, Headers: headers}) + req.Nil(err) + str, err = deserializationProvider.Deserialize("topic1", headers, data) + req.Nil(err) + req.JSONEq(str, expectedString) +} + func TestProtobufSerdesInvalid(t *testing.T) { req := require.New(t) From fca7168e3e7028c62782d6627aa9017efbb558c3 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Thu, 16 Oct 2025 12:16:24 -0700 Subject: [PATCH 17/24] remove redundant test --- pkg/serdes/serdes_test.go | 90 --------------------------------------- 1 file changed, 90 deletions(-) diff --git a/pkg/serdes/serdes_test.go b/pkg/serdes/serdes_test.go index 0be0d47568..b52c8b4d03 100644 --- a/pkg/serdes/serdes_test.go +++ b/pkg/serdes/serdes_test.go @@ -855,96 +855,6 @@ message Person { req.JSONEq(str, expectedString) } -func TestProtobufSerdesReferenceWithHeaders(t *testing.T) { - req := require.New(t) - - tempDir, err := os.MkdirTemp(tempDir, "protobuf") - req.NoError(err) - defer os.RemoveAll(tempDir) - - referenceString := `syntax = "proto3"; - -package test; - -message Address { - string city = 1; -} -` - - // Reference schema should be registered from user side prior to be used as reference - // So subject and schema version will be known value at this time - referencePath := filepath.Join(tempDir, "address.proto") - req.NoError(os.WriteFile(referencePath, []byte(referenceString), 0644)) - - schemaString := `syntax = "proto3"; - -package test; - -import "address.proto"; - -message Person { - string name = 1; - test.Address address = 2; - int32 result = 3; -} -` - schemaPath := filepath.Join(tempDir, "person.proto") - req.NoError(os.WriteFile(schemaPath, []byte(schemaString), 0644)) - - expectedString := `{"name":"abc","address":{"city":"LA"},"result":2}` - - serializationProvider, _ := GetSerializationProvider(protobufSchemaName) - err = serializationProvider.InitSerializer(mockClientUrl, "", "value", -1, SchemaRegistryAuth{}) - req.Nil(err) - serializationProvider.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) - err = serializationProvider.LoadSchema(schemaPath, map[string]string{"address.proto": referencePath}) - req.Nil(err) - - // Explicitly register the reference schema and root schema to have a schemaId with mock SR client - client := serializationProvider.GetSchemaRegistryClient() - referenceInfo := schemaregistry.SchemaInfo{ - Schema: referenceString, - SchemaType: "PROTOBUF", - } - _, err = client.Register("address.proto", referenceInfo, false) - req.Nil(err) - - info := schemaregistry.SchemaInfo{ - Schema: schemaString, - SchemaType: "PROTOBUF", - References: []schemaregistry.Reference{ - { - Name: "address.proto", - Subject: "address.proto", - Version: 1, - }, - }, - } - _, err = client.Register("topic1-value", info, false) - req.Nil(err) - - headers, data, err := serializationProvider.Serialize("topic1", expectedString) - req.Nil(err) - - deserializationProvider, _ := GetDeserializationProvider(protobufSchemaName) - err = deserializationProvider.InitDeserializer(mockClientUrl, "", "value", SchemaRegistryAuth{}, client) - req.Nil(err) - err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data, Headers: headers}) - req.Nil(err) - str, err := deserializationProvider.Deserialize("topic1", headers, data) - req.Nil(err) - req.JSONEq(str, expectedString) - - // Deserialize again but without the reference file already stored locally - err = os.Remove(referencePath) - req.Nil(err) - err = deserializationProvider.LoadSchema("topic1-value", tempDir, serde.ValueSerde, &kafka.Message{Value: data, Headers: headers}) - req.Nil(err) - str, err = deserializationProvider.Deserialize("topic1", headers, data) - req.Nil(err) - req.JSONEq(str, expectedString) -} - func TestProtobufSerdesInvalid(t *testing.T) { req := require.New(t) From 3438f1bd507e2ce4675708b3df9411b490b1ad99 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Fri, 17 Oct 2025 15:01:13 -0700 Subject: [PATCH 18/24] Add support for serializing schema id in header --- internal/kafka/command_topic_produce.go | 48 ++++++++++++++----- internal/kafka/confluent_kafka.go | 21 ++++++++ pkg/serdes/avro_serialization_provider.go | 16 ++----- pkg/serdes/json_serialization_provider.go | 16 ++----- pkg/serdes/protobuf_serialization_provider.go | 16 ++----- pkg/serdes/serdes.go | 2 +- 6 files changed, 67 insertions(+), 52 deletions(-) diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index 99b5d76948..ec9c15ea98 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -15,6 +15,7 @@ import ( "github.com/spf13/cobra" ckgo "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde" srsdk "github.com/confluentinc/schema-registry-sdk-go" "github.com/confluentinc/cli/v4/pkg/auth" @@ -63,6 +64,7 @@ func (c *command) newProduceCommand() *cobra.Command { pcmd.AddProducerConfigFileFlag(cmd) cmd.Flags().String("schema-registry-endpoint", "", "Endpoint for Schema Registry cluster.") cmd.Flags().StringSlice("headers", nil, `A comma-separated list of headers formatted as "key:value".`) + cmd.Flags().Bool("schema-id-header", false, "Serialize schema ID in the header instead of the message prefix.") // cloud-only flags cmd.Flags().String("key-references", "", "The path to the message key schema references file.") @@ -155,6 +157,15 @@ func (c *command) produceCloud(cmd *cobra.Command, args []string) error { return err } + schemaIdHeader, err := cmd.Flags().GetBool("schema-id-header") + if err != nil { + return err + } + if schemaIdHeader { + keySerializer.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) + valueSerializer.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) + } + parseKey, err := cmd.Flags().GetBool("parse-key") if err != nil { return err @@ -206,6 +217,15 @@ func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error { return err } + schemaIdHeader, err := cmd.Flags().GetBool("schema-id-header") + if err != nil { + return err + } + if schemaIdHeader { + keySerializer.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) + valueSerializer.SetSchemaIDSerializer(serde.HeaderSchemaIDSerializer) + } + parseKey, err := cmd.Flags().GetBool("parse-key") if err != nil { return err @@ -359,7 +379,7 @@ func GetProduceMessage(cmd *cobra.Command, keyMetaInfo, valueMetaInfo []byte, to return nil, err } - key, value, err := serializeMessage(keyMetaInfo, valueMetaInfo, topic, data, delimiter, parseKey, keySerializer, valueSerializer) + serializerHeaders, key, value, err := serializeMessage(keyMetaInfo, valueMetaInfo, topic, data, delimiter, parseKey, keySerializer, valueSerializer) if err != nil { return nil, err } @@ -369,8 +389,9 @@ func GetProduceMessage(cmd *cobra.Command, keyMetaInfo, valueMetaInfo []byte, to Topic: &topic, Partition: ckgo.PartitionAny, }, - Key: key, - Value: value, + Key: key, + Value: value, + Headers: serializerHeaders, } // This error is intentionally ignored because `confluent local kafka topic produce` does not define this flag @@ -380,36 +401,39 @@ func GetProduceMessage(cmd *cobra.Command, keyMetaInfo, valueMetaInfo []byte, to if err != nil { return nil, err } - message.Headers = parsedHeaders + message.Headers = append(message.Headers, parsedHeaders...) } return message, nil } -func serializeMessage(_, _ []byte, topic, data, delimiter string, parseKey bool, keySerializer, valueSerializer serdes.SerializationProvider) ([]byte, []byte, error) { +func serializeMessage(_, _ []byte, topic, data, delimiter string, parseKey bool, keySerializer, valueSerializer serdes.SerializationProvider) ([]ckgo.Header, []byte, []byte, error) { var serializedKey []byte + headers := []ckgo.Header{} + var keyHeader []ckgo.Header val := data if parseKey { schemaBased := keySerializer.GetSchemaName() != "" key, value, err := getKeyAndValue(schemaBased, data, delimiter) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - _, serializedKey, err = keySerializer.Serialize(topic, key) + keyHeader, serializedKey, err = keySerializer.Serialize(topic, key) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - + headers = append(headers, keyHeader...) val = value } - _, serializedValue, err := valueSerializer.Serialize(topic, val) + valueHeader, serializedValue, err := valueSerializer.Serialize(topic, val) if err != nil { - return nil, nil, err + return nil, nil, nil, err } + headers = append(headers, valueHeader...) - return serializedKey, serializedValue, nil + return headers, serializedKey, serializedValue, nil } func getKeyAndValue(schemaBased bool, data, delimiter string) (string, string, error) { diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index 4de369fa41..9f14f3df57 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -284,6 +284,7 @@ func ConsumeMessage(message *ckgo.Message, h *GroupHandler) error { } if message.Headers != nil { + message.Headers = unmarshalSchemaIdHeader(message.Headers) var headers any = message.Headers if h.Properties.FullHeader { headers = getFullHeaders(message.Headers) @@ -388,3 +389,23 @@ func getHeaderString(header ckgo.Header) string { return fmt.Sprintf(`%s="%s"`, header.Key, string(header.Value)) } } + +func unmarshalSchemaIdHeader(headers []ckgo.Header) []ckgo.Header { + for i, header := range headers { + if header.Key != "__key_schema_id" && header.Key != "__value_schema_id" { + continue + } + + schemaID := serde.SchemaID{} + if _, err := schemaID.FromBytes(header.Value); err != nil { + continue + } + + headers[i] = ckgo.Header{ + Key: header.Key, + Value: []byte(schemaID.GUID.String()), + } + } + + return headers +} diff --git a/pkg/serdes/avro_serialization_provider.go b/pkg/serdes/avro_serialization_provider.go index 5b342eeda8..e985b97fd9 100644 --- a/pkg/serdes/avro_serialization_provider.go +++ b/pkg/serdes/avro_serialization_provider.go @@ -27,20 +27,10 @@ type AvroSerializationProvider struct { } func (a *AvroSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { - var serdeClientConfig *schemaregistry.Config - if srClientUrl == mockClientUrl { - serdeClientConfig = schemaregistry.NewConfig(srClientUrl) - } else if srAuth.ApiKey != "" && srAuth.ApiSecret != "" { - serdeClientConfig = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) - } else if srAuth.Token != "" { - serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") - } else { - return fmt.Errorf("schema registry client authentication should be provider to initialize serializer") + serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) + if err != nil { + return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) } - serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath - serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath - serdeClientConfig.SslKeyLocation = srAuth.ClientKeyPath - serdeClient, err := schemaregistry.NewClient(serdeClientConfig) // Register the KMS drivers and the field-level encryption executor awskms.Register() diff --git a/pkg/serdes/json_serialization_provider.go b/pkg/serdes/json_serialization_provider.go index 54a4a03ade..75db104201 100644 --- a/pkg/serdes/json_serialization_provider.go +++ b/pkg/serdes/json_serialization_provider.go @@ -24,20 +24,10 @@ type JsonSerializationProvider struct { } func (j *JsonSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { - var serdeClientConfig *schemaregistry.Config - if srClientUrl == mockClientUrl { - serdeClientConfig = schemaregistry.NewConfig(srClientUrl) - } else if srAuth.ApiKey != "" && srAuth.ApiSecret != "" { - serdeClientConfig = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) - } else if srAuth.Token != "" { - serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") - } else { - return fmt.Errorf("schema registry client authentication should be provider to initialize serializer") + serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) + if err != nil { + return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) } - serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath - serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath - serdeClientConfig.SslKeyLocation = srAuth.ClientKeyPath - serdeClient, err := schemaregistry.NewClient(serdeClientConfig) // Register the KMS drivers and the field-level encryption executor awskms.Register() diff --git a/pkg/serdes/protobuf_serialization_provider.go b/pkg/serdes/protobuf_serialization_provider.go index 4e6e9c4179..f2af66ea1a 100644 --- a/pkg/serdes/protobuf_serialization_provider.go +++ b/pkg/serdes/protobuf_serialization_provider.go @@ -44,20 +44,10 @@ type ProtobufSerializationProvider struct { } func (p *ProtobufSerializationProvider) InitSerializer(srClientUrl, srClusterId, mode string, schemaId int, srAuth SchemaRegistryAuth) error { - var serdeClientConfig *schemaregistry.Config - if srClientUrl == mockClientUrl { - serdeClientConfig = schemaregistry.NewConfig(srClientUrl) - } else if srAuth.ApiKey != "" && srAuth.ApiSecret != "" { - serdeClientConfig = schemaregistry.NewConfigWithBasicAuthentication(srClientUrl, srAuth.ApiKey, srAuth.ApiSecret) - } else if srAuth.Token != "" { - serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") - } else { - return fmt.Errorf("schema registry client authentication should be provider to initialize serializer") + serdeClient, err := initSchemaRegistryClient(srClientUrl, srClusterId, srAuth, nil) + if err != nil { + return fmt.Errorf("failed to create serializer-specific Schema Registry client: %w", err) } - serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath - serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath - serdeClientConfig.SslKeyLocation = srAuth.ClientKeyPath - serdeClient, err := schemaregistry.NewClient(serdeClientConfig) // Register the KMS drivers and the field-level encryption executor awskms.Register() diff --git a/pkg/serdes/serdes.go b/pkg/serdes/serdes.go index 0bcd116c9a..be1e0a4b3b 100644 --- a/pkg/serdes/serdes.go +++ b/pkg/serdes/serdes.go @@ -152,7 +152,7 @@ func initSchemaRegistryClient(srClientUrl, srClusterId string, srAuth SchemaRegi serdeClientConfig = schemaregistry.NewConfigWithBearerAuthentication(srClientUrl, srAuth.Token, srClusterId, "") } else { serdeClientConfig = schemaregistry.NewConfig(srClientUrl) - log.CliLogger.Info("initializing deserializer with no schema registry client authentication") + log.CliLogger.Info("initializing schema registry client with no authentication") } serdeClientConfig.SslCaLocation = srAuth.CertificateAuthorityPath serdeClientConfig.SslCertificateLocation = srAuth.ClientCertPath From 1d68e6f8c650f19c4e37b7322cf4be9fc8d4c176 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Fri, 17 Oct 2025 15:52:29 -0700 Subject: [PATCH 19/24] update golden files --- test/fixtures/output/kafka/topic/produce-help-onprem.golden | 1 + test/fixtures/output/kafka/topic/produce-help.golden | 1 + test/fixtures/output/kafka/topic/produce-no-schema.golden | 6 +++++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/test/fixtures/output/kafka/topic/produce-help-onprem.golden b/test/fixtures/output/kafka/topic/produce-help-onprem.golden index 7733aadde2..88e54f6a62 100644 --- a/test/fixtures/output/kafka/topic/produce-help-onprem.golden +++ b/test/fixtures/output/kafka/topic/produce-help-onprem.golden @@ -23,6 +23,7 @@ Flags: --config-file string The path to the configuration file for the producer client, in JSON or Avro format. --schema-registry-endpoint string Endpoint for Schema Registry cluster. --headers strings A comma-separated list of headers formatted as "key:value". + --schema-id-header Serialize schema ID in the header instead of the message prefix. --key-references string The path to the message key schema references file. --api-key string API key. --api-secret string API secret. diff --git a/test/fixtures/output/kafka/topic/produce-help.golden b/test/fixtures/output/kafka/topic/produce-help.golden index 7733aadde2..88e54f6a62 100644 --- a/test/fixtures/output/kafka/topic/produce-help.golden +++ b/test/fixtures/output/kafka/topic/produce-help.golden @@ -23,6 +23,7 @@ Flags: --config-file string The path to the configuration file for the producer client, in JSON or Avro format. --schema-registry-endpoint string Endpoint for Schema Registry cluster. --headers strings A comma-separated list of headers formatted as "key:value". + --schema-id-header Serialize schema ID in the header instead of the message prefix. --key-references string The path to the message key schema references file. --api-key string API key. --api-secret string API secret. diff --git a/test/fixtures/output/kafka/topic/produce-no-schema.golden b/test/fixtures/output/kafka/topic/produce-no-schema.golden index 80dc9cd831..eb2607c1e0 100644 --- a/test/fixtures/output/kafka/topic/produce-no-schema.golden +++ b/test/fixtures/output/kafka/topic/produce-no-schema.golden @@ -1 +1,5 @@ -Error: schema registry client authentication should be provider to initialize serializer +Error: failed to load schema: error compiling .proto files: read .: is a directory + + +Suggestions: + Specify a schema by passing a schema ID or the path to a schema file to the `--schema` flag. From 955c3581a33fda3e66889db82bdc15f48c95aace Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Fri, 17 Oct 2025 17:21:35 -0700 Subject: [PATCH 20/24] error out early if no schema is provided to prevent writing temp proto files to the current directory --- pkg/serdes/protobuf_serialization_provider.go | 4 ++++ test/fixtures/output/kafka/topic/produce-no-schema.golden | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/serdes/protobuf_serialization_provider.go b/pkg/serdes/protobuf_serialization_provider.go index f2af66ea1a..c5de37200f 100644 --- a/pkg/serdes/protobuf_serialization_provider.go +++ b/pkg/serdes/protobuf_serialization_provider.go @@ -128,6 +128,10 @@ func (p *ProtobufSerializationProvider) Serialize(topic, message string) ([]kafk } func parseMessage(schemaPath string, referencePathMap map[string]string) (gproto.Message, error) { + if schemaPath == "" { + return nil, fmt.Errorf("schema path is empty") + } + // Collect import paths importPath := filepath.Dir(schemaPath) importPaths := []string{importPath} diff --git a/test/fixtures/output/kafka/topic/produce-no-schema.golden b/test/fixtures/output/kafka/topic/produce-no-schema.golden index eb2607c1e0..74fb8ffa3a 100644 --- a/test/fixtures/output/kafka/topic/produce-no-schema.golden +++ b/test/fixtures/output/kafka/topic/produce-no-schema.golden @@ -1,5 +1,4 @@ -Error: failed to load schema: error compiling .proto files: read .: is a directory - +Error: failed to load schema: schema path is empty Suggestions: Specify a schema by passing a schema ID or the path to a schema file to the `--schema` flag. From a2d596a6c9dcc68af7072212cb99c730432f57d0 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Fri, 17 Oct 2025 19:15:55 -0700 Subject: [PATCH 21/24] Only show telemetry client instance id message for produce/consume when the verbosity is set to info --- internal/kafka/confluent_kafka_configs.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/internal/kafka/confluent_kafka_configs.go b/internal/kafka/confluent_kafka_configs.go index 9c25e2bdb8..2c2f2592fb 100644 --- a/internal/kafka/confluent_kafka_configs.go +++ b/internal/kafka/confluent_kafka_configs.go @@ -334,23 +334,33 @@ func getPartitionsByIndex(partitions []ckgo.TopicPartition, partitionFilter Part } func SetProducerDebugOption(configMap *ckgo.ConfigMap) error { + // Note: log_levels are based on syslog levels switch log.CliLogger.Level { + case log.WARN: + return configMap.Set("log_level=4") // Warn level and above + case log.INFO: + return configMap.Set("log_level=6") // Info level and above case log.DEBUG: return configMap.Set("debug=broker, topic, msg, protocol") case log.TRACE, log.UNSAFE_TRACE: return configMap.Set("debug=all") } - return nil + return configMap.Set("log_level=3") // error level and above } func SetConsumerDebugOption(configMap *ckgo.ConfigMap) error { + // Note: log_levels are based on syslog levels switch log.CliLogger.Level { + case log.WARN: + return configMap.Set("log_level=4") // Warn level and above + case log.INFO: + return configMap.Set("log_level=6") // Info level and above case log.DEBUG: return configMap.Set("debug=broker, topic, msg, protocol, consumer, cgrp, fetch") case log.TRACE, log.UNSAFE_TRACE: return configMap.Set("debug=all") } - return nil + return configMap.Set("log_level=3") // error level and above } func newProducerWithOverwrittenConfigs(configMap *ckgo.ConfigMap, configPath string, configStrings []string) (*ckgo.Producer, error) { From 930ea817961138033624bf34fc75cc986b4dd8b2 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Mon, 20 Oct 2025 20:05:38 -0700 Subject: [PATCH 22/24] fix bug in protobuf key schemas introduced by refactor --- pkg/serdes/protobuf_deserialization_provider.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/serdes/protobuf_deserialization_provider.go b/pkg/serdes/protobuf_deserialization_provider.go index b2b70b82b8..695f176465 100644 --- a/pkg/serdes/protobuf_deserialization_provider.go +++ b/pkg/serdes/protobuf_deserialization_provider.go @@ -84,7 +84,11 @@ func (p *ProtobufDeserializationProvider) requestSchema(subject, schemaPath stri } schemaID := serde.SchemaID{} - _, err := serde.DualSchemaIDDeserializer(subject, serdeType, message.Headers, message.Value, &schemaID) + payload := message.Value + if serdeType == serde.KeySerde { + payload = message.Key + } + _, err := serde.DualSchemaIDDeserializer(subject, serdeType, message.Headers, payload, &schemaID) if err != nil { return "", nil, err } From f1ad3f95a875faabfe9f87561724028763f071ed Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Tue, 21 Oct 2025 17:20:26 -0700 Subject: [PATCH 23/24] Remove obsolete comment --- pkg/serdes/avro_serialization_provider.go | 1 - pkg/serdes/json_serialization_provider.go | 1 - pkg/serdes/protobuf_serialization_provider.go | 1 - 3 files changed, 3 deletions(-) diff --git a/pkg/serdes/avro_serialization_provider.go b/pkg/serdes/avro_serialization_provider.go index e985b97fd9..c71e68b516 100644 --- a/pkg/serdes/avro_serialization_provider.go +++ b/pkg/serdes/avro_serialization_provider.go @@ -140,7 +140,6 @@ func (a *AvroSerializationProvider) GetSchemaRegistryClient() schemaregistry.Cli return a.ser.Client } -// For unit testing purposes func (a *AvroSerializationProvider) SetSchemaIDSerializer(headerSerializer serde.SchemaIDSerializerFunc) { a.ser.SchemaIDSerializer = headerSerializer } diff --git a/pkg/serdes/json_serialization_provider.go b/pkg/serdes/json_serialization_provider.go index 75db104201..600fa6d810 100644 --- a/pkg/serdes/json_serialization_provider.go +++ b/pkg/serdes/json_serialization_provider.go @@ -111,7 +111,6 @@ func (j *JsonSerializationProvider) GetSchemaRegistryClient() schemaregistry.Cli return j.ser.Client } -// For unit testing purposes func (j *JsonSerializationProvider) SetSchemaIDSerializer(headerSerializer serde.SchemaIDSerializerFunc) { j.ser.SchemaIDSerializer = headerSerializer } diff --git a/pkg/serdes/protobuf_serialization_provider.go b/pkg/serdes/protobuf_serialization_provider.go index c5de37200f..2f8e2ce45b 100644 --- a/pkg/serdes/protobuf_serialization_provider.go +++ b/pkg/serdes/protobuf_serialization_provider.go @@ -220,7 +220,6 @@ func copyBuiltInProtoFiles(destinationDir string) error { }) } -// For unit testing purposes func (p *ProtobufSerializationProvider) SetSchemaIDSerializer(headerSerializer serde.SchemaIDSerializerFunc) { p.ser.SchemaIDSerializer = headerSerializer } From 7bfedd62932462a8bb6ffabfc0a1fa9344bafc37 Mon Sep 17 00:00:00 2001 From: Steven Gagniere Date: Wed, 22 Oct 2025 13:12:34 -0700 Subject: [PATCH 24/24] Fix the same bug in asyncapi export --- internal/asyncapi/command_export.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/asyncapi/command_export.go b/internal/asyncapi/command_export.go index ace7d3aed6..c0b79fb6fa 100644 --- a/internal/asyncapi/command_export.go +++ b/internal/asyncapi/command_export.go @@ -708,7 +708,7 @@ func addComponents(reflector asyncapi.Reflector, messages map[string]spec.Messag } func createConsumer(broker string, clusterCreds *config.APIKeyPair, group string) (*ckgo.Consumer, error) { - consumer, err := ckgo.NewConsumer(&ckgo.ConfigMap{ + configMap := &ckgo.ConfigMap{ "bootstrap.servers": broker, "sasl.mechanisms": "PLAIN", "security.protocol": "SASL_SSL", @@ -717,7 +717,11 @@ func createConsumer(broker string, clusterCreds *config.APIKeyPair, group string "group.id": group, "auto.offset.reset": "earliest", "enable.auto.commit": "false", - }) + } + if err := kafka.SetConsumerDebugOption(configMap); err != nil { + return nil, fmt.Errorf("failed to create Kafka consumer: %w", err) + } + consumer, err := ckgo.NewConsumer(configMap) if err != nil { return nil, fmt.Errorf("failed to create Kafka consumer: %w", err) }