Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions drivers/postgres/internal/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (p *Postgres) prepareWALJSConfig(streams ...types.StreamInterface) (*waljs.
InitialWaitTime: time.Duration(p.cdcConfig.InitialWaitTime) * time.Second,
Tables: types.NewSet(streams...),
Publication: p.cdcConfig.Publication,
PluginArgs: p.cdcConfig.PluginArgs,
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions drivers/postgres/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type CDC struct {
InitialWaitTime int `json:"initial_wait_time"`
// Publications used when OutputPlugin is pgoutput
Publication string `json:"publication"`
// PluginArgs allows custom replication plugin arguments
// Format: key-value pairs e.g., {"include-unchanged-toast": "false", "format-version": "2"}
PluginArgs map[string]string `json:"plugin_args"`
}

func (c *Config) Validate() error {
Expand Down
8 changes: 8 additions & 0 deletions drivers/postgres/resources/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@
"type": "string",
"title": "Publication",
"description": "Publication defines which tables need to be consumed"
},
"plugin_args": {
"type": "object",
"title": "Plugin Arguments",
"description": "Custom replication plugin arguments as key-value pairs (optional). Example: {\"include-unchanged-toast\": \"false\", \"format-version\": \"2\"}",
"additionalProperties": {
"type": "string"
}
}
},
"required": [
Expand Down
303 changes: 303 additions & 0 deletions pkg/waljs/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
package waljs

import (
"encoding/json"
"fmt"

"github.com/datazip-inc/olake/utils/typeutils"
"github.com/jackc/pgtype"
)

// PgtypeDecoder decodes PostgreSQL column payloads into Go values using the
// pgtype package. It carries the pgtype.ConnInfo that its methods pass to
// every pgtype decode call and use for OID lookups.
type PgtypeDecoder struct {
// connInfo is handed to each pgtype DecodeBinary/DecodeText call and is
// queried via DataTypeForOID for OIDs without a dedicated helper.
connInfo *pgtype.ConnInfo
}

// NewPgtypeDecoder returns a PgtypeDecoder whose ConnInfo is freshly created
// with pgtype.NewConnInfo.
func NewPgtypeDecoder() *PgtypeDecoder {
	decoder := new(PgtypeDecoder)
	decoder.connInfo = pgtype.NewConnInfo()
	return decoder
}

// DecodeBinary converts a raw PostgreSQL column payload into a Go value,
// selecting the decoder from the column's type OID.
//
// A nil payload yields typeutils.ErrNullValue. OIDs that have a dedicated
// helper are dispatched to it directly. Every other OID is looked up in the
// decoder's ConnInfo: the registered value is tried as a binary decoder
// first and as a text decoder second, and the first attempt that succeeds is
// converted through extractGoValue. When the OID is not registered, or
// neither attempt succeeds, the payload is returned as a plain string with a
// nil error.
func (d *PgtypeDecoder) DecodeBinary(data []byte, oid uint32) (interface{}, error) {
	if data == nil {
		return nil, typeutils.ErrNullValue
	}

	// Dedicated helpers for the commonly seen types.
	switch oid {
	case pgtype.TextOID, pgtype.VarcharOID, pgtype.BPCharOID:
		return d.decodeText(data)
	case pgtype.BoolOID:
		return d.decodeBool(data)
	case pgtype.Int2OID:
		return d.decodeInt2(data)
	case pgtype.Int4OID:
		return d.decodeInt4(data)
	case pgtype.Int8OID:
		return d.decodeInt8(data)
	case pgtype.Float4OID:
		return d.decodeFloat4(data)
	case pgtype.Float8OID:
		return d.decodeFloat8(data)
	case pgtype.NumericOID:
		return d.decodeNumeric(data)
	case pgtype.DateOID:
		return d.decodeDate(data)
	case pgtype.TimestampOID:
		return d.decodeTimestamp(data)
	case pgtype.TimestamptzOID:
		return d.decodeTimestamptz(data)
	case pgtype.UUIDOID:
		return d.decodeUUID(data)
	case pgtype.ByteaOID:
		return d.decodeBytea(data)
	case pgtype.JSONOID:
		return d.decodeJSON(data)
	case pgtype.JSONBOID:
		return d.decodeJSONB(data)
	}

	// Everything else goes through whatever ConnInfo has registered for the OID.
	registered, found := d.connInfo.DataTypeForOID(oid)
	if !found {
		// Unregistered OID: surface the payload as text.
		return string(data), nil
	}
	target := registered.Value

	// Binary decoding is preferred; a failure is not reported, it only
	// triggers the next attempt.
	if bin, isBinary := target.(pgtype.BinaryDecoder); isBinary && bin.DecodeBinary(d.connInfo, data) == nil {
		return d.extractGoValue(target, oid)
	}

	// Second attempt: interpret the payload in text format.
	if txt, isText := target.(pgtype.TextDecoder); isText && txt.DecodeText(d.connInfo, data) == nil {
		return d.extractGoValue(target, oid)
	}

	// Last resort: the payload itself as a string.
	return string(data), nil
}

// Type-specific decoders
// decodeJSON decodes a json column payload.
//
// The payload is decoded as binary first and as text only if that fails;
// when both fail it is returned unchanged as a string with a nil error. A
// decoded value whose status is not pgtype.Present yields
// typeutils.ErrNullValue; otherwise its bytes are handed to parseJSON.
func (d *PgtypeDecoder) decodeJSON(data []byte) (interface{}, error) {
	var doc pgtype.JSON
	// && short-circuits, so the text attempt runs only after a binary failure.
	if doc.DecodeBinary(d.connInfo, data) != nil && doc.DecodeText(d.connInfo, data) != nil {
		return string(data), nil
	}
	if doc.Status == pgtype.Present {
		return d.parseJSON(doc.Bytes)
	}
	return nil, typeutils.ErrNullValue
}

// decodeJSONB decodes a jsonb column payload.
//
// The payload is decoded as binary first and as text only if that fails;
// when both fail it is returned unchanged as a string with a nil error. A
// decoded value whose status is not pgtype.Present yields
// typeutils.ErrNullValue; otherwise its bytes are handed to parseJSON.
func (d *PgtypeDecoder) decodeJSONB(data []byte) (interface{}, error) {
	var doc pgtype.JSONB
	// && short-circuits, so the text attempt runs only after a binary failure.
	if doc.DecodeBinary(d.connInfo, data) != nil && doc.DecodeText(d.connInfo, data) != nil {
		return string(data), nil
	}
	if doc.Status == pgtype.Present {
		return d.parseJSON(doc.Bytes)
	}
	return nil, typeutils.ErrNullValue
}
Comment on lines +91 to +117
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silent error handling: When decoding fails (line 93-97, 107-111), the function silently falls back to string representation without logging. While this ensures data isn't lost, it makes debugging difficult if types are decoded incorrectly. Consider adding logger.Warnf or logger.Debugf calls when falling back to string, so operators can detect and investigate decoding issues. This is especially important for newly added types like geometric and network types.

Copilot uses AI. Check for mistakes.

// parseJSON turns a JSON document into a Go value.
//
// The document is unmarshalled as an object (map[string]interface{}) first
// and as an array ([]interface{}) second; the first attempt that succeeds is
// returned. Anything else, including JSON scalars and invalid JSON, is
// returned verbatim as a string. The returned error is always nil.
func (d *PgtypeDecoder) parseJSON(data []byte) (interface{}, error) {
	var (
		object map[string]interface{}
		list   []interface{}
	)

	// Cases are evaluated top to bottom, so the array attempt only runs when
	// the object attempt has failed.
	switch {
	case json.Unmarshal(data, &object) == nil:
		return object, nil
	case json.Unmarshal(data, &list) == nil:
		return list, nil
	default:
		return string(data), nil
	}
}

// decodeUUID decodes a binary uuid payload into hyphenated hexadecimal text.
//
// If pgtype cannot decode the payload, the payload is returned unchanged as a
// string with a nil error. A decoded value whose status is not pgtype.Present
// yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeUUID(data []byte) (interface{}, error) {
var v pgtype.UUID
if err := v.DecodeBinary(d.connInfo, data); err != nil {
// Best-effort fallback: hand the payload back unparsed rather than failing.
return string(data), nil
}
if v.Status != pgtype.Present {
return nil, typeutils.ErrNullValue
}
// %x renders every byte of a slice as two lowercase hex digits, so the
// 4-2-2-2-6 byte groups below produce the 8-4-4-4-12 character layout.
return fmt.Sprintf("%x-%x-%x-%x-%x", v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16]), nil
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UUID formatting is not RFC 4122 compliant: the format string uses lowercase 'x' which produces lowercase hex digits, but standard UUID representation uses uppercase for the alphabetic hex digits in many implementations. More importantly, this manual formatting doesn't include proper handling of the version and variant bits. Consider using a standard UUID library or the pgtype.UUID.String() method if available, which would ensure RFC 4122 compliance.

Suggested change
return fmt.Sprintf("%x-%x-%x-%x-%x", v.Bytes[0:4], v.Bytes[4:6], v.Bytes[6:8], v.Bytes[8:10], v.Bytes[10:16]), nil
text, err := v.EncodeText(d.connInfo, nil)
if err != nil {
return nil, err
}
return string(text), nil

Copilot uses AI. Check for mistakes.
}

// decodeInt8 decodes a binary int8 payload and returns the Int field of the
// resulting pgtype.Int8. Decode errors are returned as-is; a value whose
// status is not pgtype.Present yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeInt8(data []byte) (interface{}, error) {
	parsed := pgtype.Int8{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Int, nil
}

// decodeInt4 decodes a binary int4 payload and returns the Int field of the
// resulting pgtype.Int4. Decode errors are returned as-is; a value whose
// status is not pgtype.Present yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeInt4(data []byte) (interface{}, error) {
	parsed := pgtype.Int4{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Int, nil
}

// decodeInt2 decodes a binary int2 payload and returns the Int field of the
// resulting pgtype.Int2. Decode errors are returned as-is; a value whose
// status is not pgtype.Present yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeInt2(data []byte) (interface{}, error) {
	parsed := pgtype.Int2{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Int, nil
}

// decodeFloat8 decodes a binary float8 payload and returns the Float field of
// the resulting pgtype.Float8. Decode errors are returned as-is; a value
// whose status is not pgtype.Present yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeFloat8(data []byte) (interface{}, error) {
	parsed := pgtype.Float8{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Float, nil
}

// decodeFloat4 decodes a binary float4 payload and returns the Float field of
// the resulting pgtype.Float4. Decode errors are returned as-is; a value
// whose status is not pgtype.Present yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeFloat4(data []byte) (interface{}, error) {
	parsed := pgtype.Float4{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Float, nil
}

// decodeBool decodes a binary bool payload and returns the Bool field of the
// resulting pgtype.Bool. Decode errors are returned as-is; a value whose
// status is not pgtype.Present yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeBool(data []byte) (interface{}, error) {
	parsed := pgtype.Bool{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Bool, nil
}

// decodeTimestamp decodes a binary timestamp payload and returns the Time
// field of the resulting pgtype.Timestamp. Decode errors are returned as-is;
// a value whose status is not pgtype.Present yields typeutils.ErrNullValue.
// Only Status and Time are inspected.
func (d *PgtypeDecoder) decodeTimestamp(data []byte) (interface{}, error) {
	parsed := pgtype.Timestamp{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Time, nil
}

// decodeTimestamptz decodes a binary timestamptz payload and returns the Time
// field of the resulting pgtype.Timestamptz. Decode errors are returned
// as-is; a value whose status is not pgtype.Present yields
// typeutils.ErrNullValue. Only Status and Time are inspected.
func (d *PgtypeDecoder) decodeTimestamptz(data []byte) (interface{}, error) {
	parsed := pgtype.Timestamptz{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Time, nil
}

// decodeDate decodes a binary date payload and returns the Time field of the
// resulting pgtype.Date. Decode errors are returned as-is; a value whose
// status is not pgtype.Present yields typeutils.ErrNullValue. Only Status
// and Time are inspected.
func (d *PgtypeDecoder) decodeDate(data []byte) (interface{}, error) {
	parsed := pgtype.Date{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Time, nil
}

// decodeBytea decodes a binary bytea payload and returns the Bytes field of
// the resulting pgtype.Bytea. Decode errors are returned as-is; a value
// whose status is not pgtype.Present yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeBytea(data []byte) (interface{}, error) {
	parsed := pgtype.Bytea{}
	err := parsed.DecodeBinary(d.connInfo, data)
	switch {
	case err != nil:
		return nil, err
	case parsed.Status != pgtype.Present:
		return nil, typeutils.ErrNullValue
	}
	return parsed.Bytes, nil
}

// decodeNumeric decodes a binary numeric payload and converts it to float64.
//
// Decode errors are returned as-is; a decoded value whose status is not
// pgtype.Present yields typeutils.ErrNullValue. If the value cannot be
// assigned to a float64, the original payload is returned as a string with a
// nil error.
func (d *PgtypeDecoder) decodeNumeric(data []byte) (interface{}, error) {
var v pgtype.Numeric
if err := v.DecodeBinary(d.connInfo, data); err != nil {
return nil, err
}
if v.Status != pgtype.Present {
return nil, typeutils.ErrNullValue
}
// Convert numeric to float64.
// NOTE(review): float64 has limited precision, so high-precision NUMERIC
// values presumably lose digits here — confirm whether callers need exact
// decimals.
var f float64
if err := v.AssignTo(&f); err != nil {
// NOTE(review): data is the raw input payload, not a decimal rendering of
// v — confirm this fallback is what consumers expect.
return string(data), nil
}
return f, nil
Comment on lines +265 to +270
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential precision loss for NUMERIC types: PostgreSQL NUMERIC can have arbitrary precision, but this code converts it to float64 which has limited precision (53 bits of mantissa, roughly 15-17 decimal digits). Large or high-precision NUMERIC values may lose precision. Consider using string representation or a decimal library for exact numeric values, especially for financial data.

Suggested change
// Convert numeric to float64
var f float64
if err := v.AssignTo(&f); err != nil {
return string(data), nil
}
return f, nil
var s string
if err := v.AssignTo(&s); err != nil {
return string(data), nil
}
return s, nil

Copilot uses AI. Check for mistakes.
}

// decodeText decodes a text-like payload and returns the String field of the
// resulting pgtype.Text. If pgtype cannot decode the payload, it is returned
// unchanged as a string with a nil error. A value whose status is not
// pgtype.Present yields typeutils.ErrNullValue.
func (d *PgtypeDecoder) decodeText(data []byte) (interface{}, error) {
	text := pgtype.Text{}
	if text.DecodeBinary(d.connInfo, data) != nil {
		// Best-effort fallback: the payload itself, as a string.
		return string(data), nil
	}
	if text.Status == pgtype.Present {
		return text.String, nil
	}
	return nil, typeutils.ErrNullValue
}

// extractGoValue unwraps an already-decoded pgtype value into a plain Go
// value for the generic decode path.
//
// If the value exposes Get(), its result is returned, with a nil result
// reported as typeutils.ErrNullValue. Otherwise the value is assigned to a
// string; if that fails too, its fmt "%v" rendering is returned. The oid
// parameter is currently unused.
func (d *PgtypeDecoder) extractGoValue(value pgtype.Value, oid uint32) (interface{}, error) {
	type getter interface{ Get() interface{} }

	if g, hasGet := value.(getter); hasGet {
		if got := g.Get(); got != nil {
			return got, nil
		}
		return nil, typeutils.ErrNullValue
	}

	// No Get(): try a string assignment before falling back to fmt.
	var text string
	if value.AssignTo(&text) != nil {
		return fmt.Sprintf("%v", value), nil
	}
	return text, nil
}
Comment on lines +1 to +303
Copy link

Copilot AI Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for decoder: A new 303-line decoder implementation has been added with no corresponding test file. Given the complexity of handling 60+ PostgreSQL types, including arrays, geometric types, and network types, this creates a significant risk. Consider adding unit tests to verify correct decoding of at least the most common types (JSON, JSONB, arrays, UUID, NUMERIC) and edge cases (null values, malformed data).

Copilot uses AI. Check for mistakes.
Loading
Loading