-
Notifications
You must be signed in to change notification settings - Fork 215
feat: Enhanced Postgres type support and CDC plugin configuration (#802) #833
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,303 @@ | ||||||||||||||||||||||||
| package waljs | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||
| "encoding/json" | ||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| "github.com/datazip-inc/olake/utils/typeutils" | ||||||||||||||||||||||||
| "github.com/jackc/pgtype" | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // PgtypeDecoder uses pgtype for structured decoding of PostgreSQL binary data | ||||||||||||||||||||||||
| type PgtypeDecoder struct { | ||||||||||||||||||||||||
| connInfo *pgtype.ConnInfo | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // NewPgtypeDecoder creates a decoder with registered pgtype handlers | ||||||||||||||||||||||||
| func NewPgtypeDecoder() *PgtypeDecoder { | ||||||||||||||||||||||||
| connInfo := pgtype.NewConnInfo() | ||||||||||||||||||||||||
| return &PgtypeDecoder{ | ||||||||||||||||||||||||
| connInfo: connInfo, | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // DecodeBinary decodes PostgreSQL binary data based on OID into a Go value | ||||||||||||||||||||||||
| // This eliminates the need to convert binary -> string -> type | ||||||||||||||||||||||||
| func (d *PgtypeDecoder) DecodeBinary(data []byte, oid uint32) (interface{}, error) { | ||||||||||||||||||||||||
| if data == nil { | ||||||||||||||||||||||||
| return nil, typeutils.ErrNullValue | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // Handle common types with direct decoding | ||||||||||||||||||||||||
| switch oid { | ||||||||||||||||||||||||
| case pgtype.JSONOID: | ||||||||||||||||||||||||
| return d.decodeJSON(data) | ||||||||||||||||||||||||
| case pgtype.JSONBOID: | ||||||||||||||||||||||||
| return d.decodeJSONB(data) | ||||||||||||||||||||||||
| case pgtype.UUIDOID: | ||||||||||||||||||||||||
| return d.decodeUUID(data) | ||||||||||||||||||||||||
| case pgtype.Int8OID: | ||||||||||||||||||||||||
| return d.decodeInt8(data) | ||||||||||||||||||||||||
| case pgtype.Int4OID: | ||||||||||||||||||||||||
| return d.decodeInt4(data) | ||||||||||||||||||||||||
| case pgtype.Int2OID: | ||||||||||||||||||||||||
| return d.decodeInt2(data) | ||||||||||||||||||||||||
| case pgtype.Float8OID: | ||||||||||||||||||||||||
| return d.decodeFloat8(data) | ||||||||||||||||||||||||
| case pgtype.Float4OID: | ||||||||||||||||||||||||
| return d.decodeFloat4(data) | ||||||||||||||||||||||||
| case pgtype.BoolOID: | ||||||||||||||||||||||||
| return d.decodeBool(data) | ||||||||||||||||||||||||
| case pgtype.TimestampOID: | ||||||||||||||||||||||||
| return d.decodeTimestamp(data) | ||||||||||||||||||||||||
| case pgtype.TimestamptzOID: | ||||||||||||||||||||||||
| return d.decodeTimestamptz(data) | ||||||||||||||||||||||||
| case pgtype.DateOID: | ||||||||||||||||||||||||
| return d.decodeDate(data) | ||||||||||||||||||||||||
| case pgtype.ByteaOID: | ||||||||||||||||||||||||
| return d.decodeBytea(data) | ||||||||||||||||||||||||
| case pgtype.NumericOID: | ||||||||||||||||||||||||
| return d.decodeNumeric(data) | ||||||||||||||||||||||||
| case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID: | ||||||||||||||||||||||||
| return d.decodeText(data) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // For unknown types or arrays, try generic decode or fall back to string | ||||||||||||||||||||||||
| dt, ok := d.connInfo.DataTypeForOID(oid) | ||||||||||||||||||||||||
| if !ok { | ||||||||||||||||||||||||
| // Try as text for unknown types | ||||||||||||||||||||||||
| return string(data), nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| value := dt.Value | ||||||||||||||||||||||||
| if decoder, ok := value.(pgtype.BinaryDecoder); ok { | ||||||||||||||||||||||||
| if err := decoder.DecodeBinary(d.connInfo, data); err == nil { | ||||||||||||||||||||||||
| return d.extractGoValue(value, oid) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // Fallback to text decode | ||||||||||||||||||||||||
| if decoder, ok := value.(pgtype.TextDecoder); ok { | ||||||||||||||||||||||||
| if err := decoder.DecodeText(d.connInfo, data); err == nil { | ||||||||||||||||||||||||
| return d.extractGoValue(value, oid) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // Final fallback to string | ||||||||||||||||||||||||
| return string(data), nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // Type-specific decoders | ||||||||||||||||||||||||
| func (d *PgtypeDecoder) decodeJSON(data []byte) (interface{}, error) { | ||||||||||||||||||||||||
| var v pgtype.JSON | ||||||||||||||||||||||||
| if err := v.DecodeBinary(d.connInfo, data); err != nil { | ||||||||||||||||||||||||
| // Try text format | ||||||||||||||||||||||||
| if err := v.DecodeText(d.connInfo, data); err != nil { | ||||||||||||||||||||||||
| return string(data), nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if v.Status != pgtype.Present { | ||||||||||||||||||||||||
| return nil, typeutils.ErrNullValue | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| return d.parseJSON(v.Bytes) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func (d *PgtypeDecoder) decodeJSONB(data []byte) (interface{}, error) { | ||||||||||||||||||||||||
| var v pgtype.JSONB | ||||||||||||||||||||||||
| if err := v.DecodeBinary(d.connInfo, data); err != nil { | ||||||||||||||||||||||||
| // Try text format | ||||||||||||||||||||||||
| if err := v.DecodeText(d.connInfo, data); err != nil { | ||||||||||||||||||||||||
| return string(data), nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if v.Status != pgtype.Present { | ||||||||||||||||||||||||
| return nil, typeutils.ErrNullValue | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| return d.parseJSON(v.Bytes) | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func (d *PgtypeDecoder) parseJSON(data []byte) (interface{}, error) { | ||||||||||||||||||||||||
| // Try to parse as map first | ||||||||||||||||||||||||
| var mapResult map[string]interface{} | ||||||||||||||||||||||||
| if err := json.Unmarshal(data, &mapResult); err == nil { | ||||||||||||||||||||||||
| return mapResult, nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // Try to parse as array | ||||||||||||||||||||||||
| var arrayResult []interface{} | ||||||||||||||||||||||||
| if err := json.Unmarshal(data, &arrayResult); err == nil { | ||||||||||||||||||||||||
| return arrayResult, nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| // Fallback to string | ||||||||||||||||||||||||
| return string(data), nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func (d *PgtypeDecoder) decodeUUID(data []byte) (interface{}, error) { | ||||||||||||||||||||||||
| var v pgtype.UUID | ||||||||||||||||||||||||
| if err := v.DecodeBinary(d.connInfo, data); err != nil { | ||||||||||||||||||||||||
| return string(data), nil | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| if v.Status != pgtype.Present { | ||||||||||||||||||||||||
| return nil, typeutils.ErrNullValue | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
| return fmt.Sprintf("%x-%x-%x-%x-%x", v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16]), nil | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
| return fmt.Sprintf("%x-%x-%x-%x-%x", v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16]), nil | |
| text, err := v.EncodeText(d.connInfo, nil) | |
| if err != nil { | |
| return nil, err | |
| } | |
| return string(text), nil |
Copilot
AI
Feb 18, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential precision loss for NUMERIC types: PostgreSQL NUMERIC can have arbitrary precision, but this code converts it to float64 which has limited precision (53 bits of mantissa, roughly 15-17 decimal digits). Large or high-precision NUMERIC values may lose precision. Consider using string representation or a decimal library for exact numeric values, especially for financial data.
| // Convert numeric to float64 | |
| var f float64 | |
| if err := v.AssignTo(&f); err != nil { | |
| return string(data), nil | |
| } | |
| return f, nil | |
| var s string | |
| if err := v.AssignTo(&s); err != nil { | |
| return string(data), nil | |
| } | |
| return s, nil |
Copilot
AI
Feb 18, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing test coverage for decoder: A new 303-line decoder implementation has been added with no corresponding test file. Given the complexity of handling 60+ PostgreSQL types, including arrays, geometric types, and network types, this creates a significant risk. Consider adding unit tests to verify correct decoding of at least the most common types (JSON, JSONB, arrays, UUID, NUMERIC) and edge cases (null values, malformed data).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Silent error handling: When decoding fails (line 93-97, 107-111), the function silently falls back to string representation without logging. While this ensures data isn't lost, it makes debugging difficult if types are decoded incorrectly. Consider adding logger.Warnf or logger.Debugf calls when falling back to string, so operators can detect and investigate decoding issues. This is especially important for newly added types like geometric and network types.