Skip to content
Open
8 changes: 7 additions & 1 deletion drivers/mssql/internal/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ func normalizeBoundaryValue(value any, pkCols []string, columnType string) strin
return utils.ConvertToString(value)
}

value = normalizeMSSQLValue(value, columnType)
return utils.ConvertToString(value)
}

func normalizeMSSQLValue(value any, columnType string) any {
columnType = strings.ToLower(columnType)

switch v := value.(type) {
Expand Down Expand Up @@ -328,7 +333,8 @@ func normalizeBoundaryValue(value any, pkCols []string, columnType string) strin
return utils.HexEncode(v)
}
}
return utils.ConvertToString(value)

return value
}

// getTableExtremes returns MIN and MAX key values for the given PK columns
Expand Down
28 changes: 27 additions & 1 deletion drivers/mssql/internal/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,31 @@ func (m *MSSQL) FetchMaxCursorValues(ctx context.Context, stream types.StreamInt
if err != nil {
return nil, nil, err
}

primaryCursor, secondaryCursor := stream.Cursor()
maxPrimaryCursorValue, err = m.normalizeCursorValue(ctx, stream, primaryCursor, maxPrimaryCursorValue)
if err != nil {
return nil, nil, err
}
if secondaryCursor != "" {
maxSecondaryCursorValue, err = m.normalizeCursorValue(ctx, stream, secondaryCursor, maxSecondaryCursorValue)
if err != nil {
return nil, nil, err
}
}

return maxPrimaryCursorValue, maxSecondaryCursorValue, nil
}
}

func (m *MSSQL) normalizeCursorValue(ctx context.Context, stream types.StreamInterface, cursorField string, value any) (any, error) {
if value == nil {
return value, nil
}

var dataType string
if err := m.client.QueryRowContext(ctx, jdbc.MSSQLColumnTypeQuery(), stream.Namespace(), stream.Name(), cursorField).Scan(&dataType); err != nil {
return nil, fmt.Errorf("failed to get MSSQL cursor type: %s", err)
}

return normalizeMSSQLValue(value, dataType), nil
}
5 changes: 5 additions & 0 deletions pkg/jdbc/jdbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,11 @@ func GetMaxCursorValues(ctx context.Context, client *sqlx.DB, driverType constan
var maxPrimaryCursorValue, maxSecondaryCursorValue any

bytesConverter := func(value any) any {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, instead of removing this entirely, let’s add an MSSQL-specific check; otherwise, we’d need to test all other drivers.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will not break anything. As there is no change in the code flow. I am doing the same thing now but in a different place

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have also verified with 3 models

Copy link
Copy Markdown
Collaborator

@vikaxsh vikaxsh Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, for now we should implement for mssql only

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Done

// MSSQL has type-aware normalization in the driver.
if driverType == constants.MSSQL {
return value
}

switch v := value.(type) {
case []byte:
return string(v)
Expand Down
Loading