diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index e5b446d259..a3059ade53 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -524,6 +524,14 @@ func (w *writer) appendRow2Group(dml *commonEvent.DMLEvent, progress *partitionP group = util.NewEventsGroup(progress.partition, tableID) progress.eventsGroup[tableID] = group } + if commitTs < progress.watermark { + log.Warn("DML Event fallback row, since less than the partition watermark, ignore it", + zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition), + zap.Uint64("commitTs", commitTs), zap.Any("offset", offset), + zap.Uint64("watermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset), + zap.String("schema", schema), zap.String("table", table)) + return + } if commitTs >= group.HighWatermark { group.Append(dml, false) log.Info("DML event append to the group", diff --git a/cmd/pulsar-consumer/writer.go b/cmd/pulsar-consumer/writer.go index 2b4cc86c15..defc2383f4 100644 --- a/cmd/pulsar-consumer/writer.go +++ b/cmd/pulsar-consumer/writer.go @@ -434,6 +434,13 @@ func (w *writer) appendRow2Group(dml *commonEvent.DMLEvent, progress *partitionP group = util.NewEventsGroup(progress.partition, tableID) progress.eventsGroup[tableID] = group } + if commitTs < progress.watermark { + log.Warn("DML Event fallback row, since less than the partition watermark, ignore it", + zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition), + zap.Uint64("commitTs", commitTs), zap.Uint64("watermark", progress.watermark), + zap.String("schema", schema), zap.String("table", table)) + return + } if commitTs >= group.HighWatermark { group.Append(dml, false) log.Info("DML event append to the group", diff --git a/pkg/sink/codec/common/ddl.go b/pkg/sink/codec/common/ddl.go index 79de6f7ef0..c90acf7fc3 100644 --- a/pkg/sink/codec/common/ddl.go +++ b/pkg/sink/codec/common/ddl.go @@ -160,7 +160,6 @@ func GetBlockedTables( tableName = ddl.TableName action = timodel.ActionType(ddl.Type) ) - blockedTableIDs := accessor.GetBlockedTables(schemaName, tableName) if action == timodel.ActionRenameTable { stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "") if err != nil { @@ -175,9 +174,8 @@ func GetBlockedTables( ddl.ExtraSchemaName = schemaName ddl.ExtraTableName = tableName - extraTableIDs := accessor.GetBlockedTables(schemaName, tableName) - blockedTableIDs = append(blockedTableIDs, extraTableIDs...) } + blockedTableIDs := accessor.GetBlockedTables(schemaName, tableName) if action == timodel.ActionExchangeTablePartition { stmt, err := parser.New().ParseOneStmt(ddl.Query, "", "") diff --git a/pkg/sink/codec/debezium/helper.go b/pkg/sink/codec/debezium/helper.go index 58a4da19fe..f23f3333b4 100644 --- a/pkg/sink/codec/debezium/helper.go +++ b/pkg/sink/codec/debezium/helper.go @@ -117,11 +117,11 @@ func parseBit(s string, n int) string { } func getValueFromDefault(defaultVal any, tp *types.FieldType) any { - // defaultValue shoul be string + // defaultValue should be string // see https://github.com/pingcap/tidb/blob/72b1b7c564c301de33a4bd335a05770c78528db4/pkg/ddl/add_column.go#L791 val, ok := defaultVal.(string) if !ok { - log.Debug("default value is not string", zap.Any("defaultValue", defaultVal)) + log.Warn("default value is not string", zap.Any("defaultValue", defaultVal)) return defaultVal } // TODO: more data types need be consider