Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,32 @@ import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.*
import org.apache.kafka.connect.data.SchemaBuilder

class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val debeziumPropertiesKey: String = "temporal"

private var serverTimezone: String = "UTC"
override val handlers: List<RelationalColumnCustomConverter.Handler> =
listOf(
DatetimeMillisHandler,
DatetimeMicrosHandler,
DateHandler,
TimeHandler,
TimestampHandler
DatetimeMillisHandler(),
DatetimeMicrosHandler(),
DateHandler(),
TimeHandler(),
TimestampHandler()
)

data object DatetimeMillisHandler : RelationalColumnCustomConverter.Handler {
override fun configure(props: Properties?) {
serverTimezone = props?.getProperty("connectionTimezone") ?: "UTC"
}

inner class DatetimeMillisHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("DATETIME", ignoreCase = true) &&
Expand All @@ -48,7 +55,13 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null || it == 0) {
Converted("1970-01-01T00:00:00.000000")
} else {
NoConversion
}
},
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
Expand All @@ -71,7 +84,7 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
)
}

data object DatetimeMicrosHandler : RelationalColumnCustomConverter.Handler {
inner class DatetimeMicrosHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("DATETIME", ignoreCase = true) && column.length().orElse(0) > 3
Expand All @@ -80,7 +93,13 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null || it == 0) {
Converted("1970-01-01T00:00:00.000000")
}else {
NoConversion
}
},
PartialConverter {
if (it is LocalDateTime) {
Converted(it.format(LocalDateTimeCodec.formatter))
Expand All @@ -103,7 +122,7 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
)
}

data object DateHandler : RelationalColumnCustomConverter.Handler {
inner class DateHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("DATE", ignoreCase = true)
Expand All @@ -112,7 +131,13 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null || it == 0) {
Converted("1970-01-01")
} else {
NoConversion
}
},
PartialConverter {
if (it is LocalDate) {
Converted(it.format(LocalDateCodec.formatter))
Expand All @@ -132,7 +157,7 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
)
}

data object TimeHandler : RelationalColumnCustomConverter.Handler {
inner class TimeHandler : RelationalColumnCustomConverter.Handler {

override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("TIME", ignoreCase = true)
Expand Down Expand Up @@ -163,19 +188,31 @@ class MySqlSourceCdcTemporalConverter : RelationalColumnCustomConverter {
)
}

data object TimestampHandler : RelationalColumnCustomConverter.Handler {
inner class TimestampHandler : RelationalColumnCustomConverter.Handler {
override fun matches(column: RelationalColumn): Boolean =
column.typeName().equals("TIMESTAMP", ignoreCase = true)

override fun outputSchemaBuilder(): SchemaBuilder = SchemaBuilder.string()

override val partialConverters: List<PartialConverter> =
listOf(
NullFallThrough,
PartialConverter {
if (it == null || it == 0) {
Converted("1970-01-01T00:00:00.000000")
} else {
NoConversion
}
},
PartialConverter {
if (it is ZonedDateTime) {
val offsetDateTime: OffsetDateTime = it.toOffsetDateTime()
Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter))
if (serverTimezone != "UTC") {
val offsetDateTime = Instant.parse(it.toInstant().toString()).atZone(ZoneId.of(serverTimezone)).toLocalDateTime()
val formattedValue = offsetDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS"))
Converted(formattedValue)
}else {
val offsetDateTime: OffsetDateTime = it.toOffsetDateTime()
Converted(offsetDateTime.format(OffsetDateTimeCodec.formatter))
}
} else {
NoConversion
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,13 +434,15 @@ class MySqlSourceDebeziumOperations(
.withOffset()
.withSchemaHistory()
.withConverters(
MySqlSourceCdcBooleanConverter::class,
MySqlSourceCdcTemporalConverter::class
)

cdcIncrementalConfiguration.serverTimezone
?.takeUnless { it.isBlank() }
?.let { dbzPropertiesBuilder.withDatabase("connectionTimezone", it) }
?.let {
dbzPropertiesBuilder.withDatabase("connectionTimezone", it)
dbzPropertiesBuilder.with("temporal.connectionTimezone", it)
}

dbzPropertiesBuilder.buildMap()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ class MySqlSourceOperations :

private fun leafType(type: SystemType): JdbcFieldType<*> {
return when (MysqlType.getByName(type.typeName)) {
MysqlType.BIT -> if (type.precision == 1) BooleanFieldType else BytesFieldType
MysqlType.BOOLEAN -> BooleanFieldType
MysqlType.BIT,
MysqlType.TINYINT,
MysqlType.TINYINT_UNSIGNED,
MysqlType.YEAR -> ShortFieldType
Expand Down
Loading