diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index ccca5e1f52..77ac2e202d 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -516,6 +516,26 @@ func getSchemaID(tableMap map[int64]*BasicTableInfo, tableID int64) int64 { return tableInfo.SchemaID } +// schemaName should be "Name.O" +func findSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName string) (int64, bool) { + for id, info := range databaseMap { + if info.Name == schemaName { + return id, true + } + } + return 0, false +} + +// tableName should be "Name.O" +func findTableIDByName(tableMap map[int64]*BasicTableInfo, schemaID int64, tableName string) (int64, bool) { + for id, info := range tableMap { + if info.SchemaID == schemaID && info.Name == tableName { + return id, true + } + } + return 0, false +} + // ======= // buildPersistedDDLEventFunc start // ======= @@ -587,9 +607,55 @@ func buildPersistedDDLEventForCreateTable(args buildPersistedDDLEventFuncArgs) P event := buildPersistedDDLEventCommon(args) event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) event.TableName = event.TableInfo.Name.O + setReferTableForCreateTableLike(&event, args) return event } +func setReferTableForCreateTableLike(event *PersistedDDLEvent, args buildPersistedDDLEventFuncArgs) { + if event.Query == "" { + return + } + stmt, err := parser.New().ParseOneStmt(event.Query, "", "") + if err != nil { + log.Error("parse create table ddl failed", + zap.String("query", event.Query), + zap.Error(err)) + return + } + createStmt, ok := stmt.(*ast.CreateTableStmt) + if !ok || createStmt.ReferTable == nil { + return + } + refTable := createStmt.ReferTable.Name.O + refSchema := createStmt.ReferTable.Schema.O + if refSchema == "" { + refSchema = event.SchemaName + } + refSchemaID, ok := findSchemaIDByName(args.databaseMap, refSchema) + if !ok { + log.Warn("refer schema not found for create table like", + zap.String("schema", refSchema), + zap.String("table", refTable), + zap.String("query", event.Query)) + return + } + refTableID, ok := findTableIDByName(args.tableMap, refSchemaID, refTable) + if !ok { + log.Warn("refer table not found for create table like", + zap.String("schema", refSchema), + zap.String("table", refTable), + zap.String("query", event.Query)) + return + } + event.ExtraTableID = refTableID + if partitions, ok := args.partitionMap[refTableID]; ok { + event.ReferTablePartitionIDs = event.ReferTablePartitionIDs[:0] + for id := range partitions { + event.ReferTablePartitionIDs = append(event.ReferTablePartitionIDs, id) + } + } +} + func buildPersistedDDLEventForDropTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { event := buildPersistedDDLEventCommon(args) event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) @@ -871,8 +937,8 @@ func updateDDLHistoryForSchemaDDL(args updateDDLHistoryFuncArgs) []uint64 { func updateDDLHistoryForAddDropTable(args updateDDLHistoryFuncArgs) []uint64 { args.appendTableTriggerDDLHistory(args.ddlEvent.FinishedTs) - // Note: for create table, this ddl event will not be sent to table dispatchers. - // add it to ddl history is just for building table info store. + // Note: for create table, this ddl event will not be sent to table dispatchers by default. + // adding it to ddl history is just for building table info store. if isPartitionTable(args.ddlEvent.TableInfo) { // for partition table, we only care the ddl history of physical table ids. for _, partitionID := range getAllPartitionIDs(args.ddlEvent.TableInfo) { @@ -881,6 +947,13 @@ func updateDDLHistoryForAddDropTable(args updateDDLHistoryFuncArgs) []uint64 { } else { args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.TableID) } + if args.ddlEvent.Type == byte(model.ActionCreateTable) && args.ddlEvent.ExtraTableID != 0 { + if len(args.ddlEvent.ReferTablePartitionIDs) > 0 { + args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.ReferTablePartitionIDs...) + } else { + args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.ExtraTableID) + } + } return args.tableTriggerDDLHistory } @@ -1382,6 +1455,21 @@ func extractTableInfoFuncForSingleTableDDL(event *PersistedDDLEvent, tableID int return common.WrapTableInfo(event.SchemaName, event.TableInfo), false } } + + // The DDL "CREATE TABLE ... LIKE ..." may be added to the ddl history of the referenced table + // (or its physical partition IDs) to ensure those tables are blocked while this DDL is executing. + // It does not change the schema of the referenced table itself, so we should ignore it when + // building the table info store for the referenced table. + if event.Type == byte(model.ActionCreateTable) && event.ExtraTableID != 0 { + if tableID == event.ExtraTableID { + return nil, false + } + for _, id := range event.ReferTablePartitionIDs { + if tableID == id { + return nil, false + } + } + } log.Panic("should not reach here", zap.Any("type", event.Type), zap.String("query", event.Query), @@ -1753,6 +1841,13 @@ func buildDDLEventForNewTableDDL(rawEvent *PersistedDDLEvent, tableFilter filter InfluenceType: commonEvent.InfluenceTypeNormal, TableIDs: []int64{common.DDLSpanTableID}, } + if rawEvent.ExtraTableID != 0 { + if len(rawEvent.ReferTablePartitionIDs) > 0 { + ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, rawEvent.ReferTablePartitionIDs...) + } else { + ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, rawEvent.ExtraTableID) + } + } if isPartitionTable(rawEvent.TableInfo) { physicalIDs := getAllPartitionIDs(rawEvent.TableInfo) ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, len(physicalIDs)) @@ -1781,6 +1876,28 @@ func buildDDLEventForNewTableDDL(rawEvent *PersistedDDLEvent, tableFilter filter }, }, } + if rawEvent.Query != "" { + stmt, err := parser.New().ParseOneStmt(rawEvent.Query, "", "") + if err != nil { + log.Error("parse create table ddl failed", + zap.String("query", rawEvent.Query), + zap.Error(err)) + return ddlEvent, false, err + } + if createStmt, ok := stmt.(*ast.CreateTableStmt); ok && createStmt.ReferTable != nil { + refTable := createStmt.ReferTable.Name.O + refSchema := createStmt.ReferTable.Schema.O + if refSchema == "" { + refSchema = rawEvent.SchemaName + } + ddlEvent.BlockedTableNames = []commonEvent.SchemaTableName{ + { + SchemaName: refSchema, + TableName: refTable, + }, + } + } + } return ddlEvent, true, err } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index aff8bacd35..fd22723db9 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -3129,3 +3129,171 @@ func TestRenameTable(t *testing.T) { }) assert.Equal(t, "RENAME TABLE `test`.`t1` TO `test`.`t2`", ddl.Query) } + +func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) { + cases := []struct { + name string + query string + expected []commonEvent.SchemaTableName + }{ + { + name: "default schema", + query: "CREATE TABLE `b` LIKE `a`", + expected: []commonEvent.SchemaTableName{ + {SchemaName: "test", TableName: "a"}, + }, + }, + { + name: "explicit schema", + query: "CREATE TABLE `b` LIKE `other`.`a`", + expected: []commonEvent.SchemaTableName{ + {SchemaName: "other", TableName: "a"}, + }, + }, + } + + for _, tc := range cases { + rawEvent := &PersistedDDLEvent{ + Type: byte(model.ActionCreateTable), + SchemaID: 1, + TableID: 2, + SchemaName: "test", + TableName: "b", + Query: tc.query, + TableInfo: &model.TableInfo{}, + } + + ddlEvent, ok, err := buildDDLEventForNewTableDDL(rawEvent, nil, 0) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, tc.expected, ddlEvent.BlockedTableNames) + } +} + +func TestBuildPersistedDDLEventForCreateTableLikeSetsReferTableID(t *testing.T) { + cases := []struct { + name string + query string + partitionIDs []int64 + expectedReferID int64 + }{ + { + name: "non partition refer table", + query: "CREATE TABLE `b` LIKE `a`", + partitionIDs: nil, + expectedReferID: 101, + }, + { + name: "partition refer table", + query: "CREATE TABLE `b` LIKE `a`", + partitionIDs: []int64{111, 112}, + expectedReferID: 101, + }, + } + + for _, tc := range cases { + job := buildCreateTableJobForTest(100, 200, "b", 1010) + job.Query = tc.query + partitionMap := map[int64]BasicPartitionInfo{} + if len(tc.partitionIDs) > 0 { + partitionInfo := make(BasicPartitionInfo) + for _, id := range tc.partitionIDs { + partitionInfo[id] = nil + } + partitionMap[tc.expectedReferID] = partitionInfo + } + ddl := buildPersistedDDLEventForCreateTable(buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + 100: {Name: "test", Tables: map[int64]bool{101: true, 200: true}}, + }, + tableMap: map[int64]*BasicTableInfo{ + 101: {SchemaID: 100, Name: "a"}, + 200: {SchemaID: 100, Name: "b"}, + }, + partitionMap: partitionMap, + }) + require.Equal(t, tc.expectedReferID, ddl.ExtraTableID, tc.name) + if len(tc.partitionIDs) > 0 { + require.ElementsMatch(t, tc.partitionIDs, ddl.ReferTablePartitionIDs, tc.name) + } else { + require.Empty(t, ddl.ReferTablePartitionIDs, tc.name) + } + } +} + +func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTables(t *testing.T) { + rawEvent := &PersistedDDLEvent{ + Type: byte(model.ActionCreateTable), + SchemaID: 1, + TableID: 2, + SchemaName: "test", + TableName: "b", + Query: "CREATE TABLE `b` LIKE `a`", + TableInfo: &model.TableInfo{}, + ExtraTableID: 101, + } + ddlEvent, ok, err := buildDDLEventForNewTableDDL(rawEvent, nil, 0) + require.NoError(t, err) + require.True(t, ok) + require.ElementsMatch(t, []int64{common.DDLSpanTableID, 101}, ddlEvent.BlockedTables.TableIDs) + + rawEvent.ReferTablePartitionIDs = []int64{111, 112} + ddlEvent, ok, err = buildDDLEventForNewTableDDL(rawEvent, nil, 0) + require.NoError(t, err) + require.True(t, ok) + require.ElementsMatch(t, []int64{common.DDLSpanTableID, 111, 112}, ddlEvent.BlockedTables.TableIDs) +} + +func TestUpdateDDLHistoryForAddDropTable_CreateTableLikeAddsReferTable(t *testing.T) { + args := updateDDLHistoryFuncArgs{ + ddlEvent: &PersistedDDLEvent{ + Type: byte(model.ActionCreateTable), + TableID: 200, + ExtraTableID: 101, + FinishedTs: 10, + TableInfo: &model.TableInfo{}, + }, + tablesDDLHistory: map[int64][]uint64{}, + tableTriggerDDLHistory: []uint64{}, + } + updateDDLHistoryForAddDropTable(args) + require.Equal(t, []uint64{10}, args.tablesDDLHistory[200]) + require.Equal(t, []uint64{10}, args.tablesDDLHistory[101]) + + args.ddlEvent.ReferTablePartitionIDs = []int64{111, 112} + args.tablesDDLHistory = map[int64][]uint64{} + args.tableTriggerDDLHistory = []uint64{} + updateDDLHistoryForAddDropTable(args) + require.Equal(t, []uint64{10}, args.tablesDDLHistory[200]) + require.Equal(t, []uint64{10}, args.tablesDDLHistory[111]) + require.Equal(t, []uint64{10}, args.tablesDDLHistory[112]) + require.Empty(t, args.tablesDDLHistory[101]) +} + +func TestExtractTableInfoFuncForSingleTableDDL_CreateTableLikeReferTableIgnored(t *testing.T) { + rawEvent := &PersistedDDLEvent{ + Type: byte(model.ActionCreateTable), + TableID: 140, + ExtraTableID: 138, + Query: "CREATE TABLE `b` LIKE `a`", + } + + require.NotPanics(t, func() { + tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 138) + require.Nil(t, tableInfo) + require.False(t, deleted) + }) + + rawEvent.ReferTablePartitionIDs = []int64{111, 112} + require.NotPanics(t, func() { + tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 111) + require.Nil(t, tableInfo) + require.False(t, deleted) + }) + require.NotPanics(t, func() { + tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 112) + require.Nil(t, tableInfo) + require.False(t, deleted) + }) +} diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index 1dfb164ebd..5b16ed4eb3 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -61,6 +61,9 @@ type PersistedDDLEvent struct { // the following fields are only set when the ddl job involves a partition table // it is the partition info of the table before this ddl PrevPartitions []int64 `msg:"prev_partitions"` + // ReferTablePartitionIDs is only set for CREATE TABLE ... LIKE ... when the referenced table is partitioned. + // It records the physical partition IDs of the referenced table. + ReferTablePartitionIDs []int64 `msg:"refer_table_partitions"` Query string `msg:"query"` SchemaVersion int64 `msg:"schema_version"` diff --git a/logservice/schemastore/types_gen.go b/logservice/schemastore/types_gen.go index a08a59b2d6..d5f5b033f1 100644 --- a/logservice/schemastore/types_gen.go +++ b/logservice/schemastore/types_gen.go @@ -198,6 +198,25 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { return } } + case "refer_table_partitions": + var zb0008 uint32 + zb0008, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "ReferTablePartitionIDs") + return + } + if cap(z.ReferTablePartitionIDs) >= int(zb0008) { + z.ReferTablePartitionIDs = (z.ReferTablePartitionIDs)[:zb0008] + } else { + z.ReferTablePartitionIDs = make([]int64, zb0008) + } + for za0007 := range z.ReferTablePartitionIDs { + z.ReferTablePartitionIDs[za0007], err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "ReferTablePartitionIDs", za0007) + return + } + } case "query": z.Query, err = dc.ReadString() if err != nil { @@ -235,21 +254,21 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { return } case "multi_table_info_value": - var zb0008 uint32 - zb0008, err = dc.ReadArrayHeader() + var zb0009 uint32 + zb0009, err = dc.ReadArrayHeader() if err != nil { err = msgp.WrapError(err, "MultipleTableInfosValue") return } - if cap(z.MultipleTableInfosValue) >= int(zb0008) { - z.MultipleTableInfosValue = (z.MultipleTableInfosValue)[:zb0008] + if cap(z.MultipleTableInfosValue) >= int(zb0009) { + z.MultipleTableInfosValue = (z.MultipleTableInfosValue)[:zb0009] } else { - z.MultipleTableInfosValue = make([][]byte, zb0008) + z.MultipleTableInfosValue = make([][]byte, zb0009) } - for za0007 := range z.MultipleTableInfosValue { - z.MultipleTableInfosValue[za0007], err = dc.ReadBytes(z.MultipleTableInfosValue[za0007]) + for za0008 := range z.MultipleTableInfosValue { + z.MultipleTableInfosValue[za0008], err = dc.ReadBytes(z.MultipleTableInfosValue[za0008]) if err != nil { - err = msgp.WrapError(err, "MultipleTableInfosValue", za0007) + err = msgp.WrapError(err, "MultipleTableInfosValue", za0008) return } } @@ -278,9 +297,9 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 25 + // map header, size 26 // write "id" - err = en.Append(0xde, 0x0, 0x19, 0xa2, 0x69, 0x64) + err = en.Append(0xde, 0x0, 0x1a, 0xa2, 0x69, 0x64) if err != nil { return } @@ -481,6 +500,23 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { return } } + // write "refer_table_partitions" + err = en.Append(0xb6, 0x72, 0x65, 0x66, 0x65, 0x72, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.ReferTablePartitionIDs))) + if err != nil { + err = msgp.WrapError(err, "ReferTablePartitionIDs") + return + } + for za0007 := range z.ReferTablePartitionIDs { + err = en.WriteInt64(z.ReferTablePartitionIDs[za0007]) + if err != nil { + err = msgp.WrapError(err, "ReferTablePartitionIDs", za0007) + return + } + } // write "query" err = en.Append(0xa5, 0x71, 0x75, 0x65, 0x72, 0x79) if err != nil { @@ -551,10 +587,10 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "MultipleTableInfosValue") return } - for za0007 := range z.MultipleTableInfosValue { - err = en.WriteBytes(z.MultipleTableInfosValue[za0007]) + for za0008 := range z.MultipleTableInfosValue { + err = en.WriteBytes(z.MultipleTableInfosValue[za0008]) if err != nil { - err = msgp.WrapError(err, "MultipleTableInfosValue", za0007) + err = msgp.WrapError(err, "MultipleTableInfosValue", za0008) return } } @@ -584,9 +620,9 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 25 + // map header, size 26 // string "id" - o = append(o, 0xde, 0x0, 0x19, 0xa2, 0x69, 0x64) + o = append(o, 0xde, 0x0, 0x1a, 0xa2, 0x69, 0x64) o = msgp.AppendInt64(o, z.ID) // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) @@ -651,6 +687,12 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { for za0006 := range z.PrevPartitions { o = msgp.AppendInt64(o, z.PrevPartitions[za0006]) } + // string "refer_table_partitions" + o = append(o, 0xb6, 0x72, 0x65, 0x66, 0x65, 0x72, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.ReferTablePartitionIDs))) + for za0007 := range z.ReferTablePartitionIDs { + o = msgp.AppendInt64(o, z.ReferTablePartitionIDs[za0007]) + } // string "query" o = append(o, 0xa5, 0x71, 0x75, 0x65, 0x72, 0x79) o = msgp.AppendString(o, z.Query) @@ -672,8 +714,8 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "multi_table_info_value" o = append(o, 0xb6, 0x6d, 0x75, 0x6c, 0x74, 0x69, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) o = msgp.AppendArrayHeader(o, uint32(len(z.MultipleTableInfosValue))) - for za0007 := range z.MultipleTableInfosValue { - o = msgp.AppendBytes(o, z.MultipleTableInfosValue[za0007]) + for za0008 := range z.MultipleTableInfosValue { + o = msgp.AppendBytes(o, z.MultipleTableInfosValue[za0008]) } // string "bdr_role" o = append(o, 0xa8, 0x62, 0x64, 0x72, 0x5f, 0x72, 0x6f, 0x6c, 0x65) @@ -876,6 +918,25 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { return } } + case "refer_table_partitions": + var zb0008 uint32 + zb0008, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReferTablePartitionIDs") + return + } + if cap(z.ReferTablePartitionIDs) >= int(zb0008) { + z.ReferTablePartitionIDs = (z.ReferTablePartitionIDs)[:zb0008] + } else { + z.ReferTablePartitionIDs = make([]int64, zb0008) + } + for za0007 := range z.ReferTablePartitionIDs { + z.ReferTablePartitionIDs[za0007], bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReferTablePartitionIDs", za0007) + return + } + } case "query": z.Query, bts, err = msgp.ReadStringBytes(bts) if err != nil { @@ -913,21 +974,21 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { return } case "multi_table_info_value": - var zb0008 uint32 - zb0008, bts, err = msgp.ReadArrayHeaderBytes(bts) + var zb0009 uint32 + zb0009, bts, err = msgp.ReadArrayHeaderBytes(bts) if err != nil { err = msgp.WrapError(err, "MultipleTableInfosValue") return } - if cap(z.MultipleTableInfosValue) >= int(zb0008) { - z.MultipleTableInfosValue = (z.MultipleTableInfosValue)[:zb0008] + if cap(z.MultipleTableInfosValue) >= int(zb0009) { + z.MultipleTableInfosValue = (z.MultipleTableInfosValue)[:zb0009] } else { - z.MultipleTableInfosValue = make([][]byte, zb0008) + z.MultipleTableInfosValue = make([][]byte, zb0009) } - for za0007 := range z.MultipleTableInfosValue { - z.MultipleTableInfosValue[za0007], bts, err = msgp.ReadBytesBytes(bts, z.MultipleTableInfosValue[za0007]) + for za0008 := range z.MultipleTableInfosValue { + z.MultipleTableInfosValue[za0008], bts, err = msgp.ReadBytesBytes(bts, z.MultipleTableInfosValue[za0008]) if err != nil { - err = msgp.WrapError(err, "MultipleTableInfosValue", za0007) + err = msgp.WrapError(err, "MultipleTableInfosValue", za0008) return } } @@ -969,9 +1030,9 @@ func (z *PersistedDDLEvent) Msgsize() (s int) { for za0005 := range z.ExtraTableNames { s += msgp.StringPrefixSize + len(z.ExtraTableNames[za0005]) } - s += 16 + msgp.ArrayHeaderSize + (len(z.PrevPartitions) * (msgp.Int64Size)) + 6 + msgp.StringPrefixSize + len(z.Query) + 15 + msgp.Int64Size + 12 + msgp.Uint64Size + 9 + msgp.Uint64Size + 17 + msgp.BytesPrefixSize + len(z.TableInfoValue) + 23 + msgp.BytesPrefixSize + len(z.ExtraTableInfoValue) + 23 + msgp.ArrayHeaderSize - for za0007 := range z.MultipleTableInfosValue { - s += msgp.BytesPrefixSize + len(z.MultipleTableInfosValue[za0007]) + s += 16 + msgp.ArrayHeaderSize + (len(z.PrevPartitions) * (msgp.Int64Size)) + 23 + msgp.ArrayHeaderSize + (len(z.ReferTablePartitionIDs) * (msgp.Int64Size)) + 6 + msgp.StringPrefixSize + len(z.Query) + 15 + msgp.Int64Size + 12 + msgp.Uint64Size + 9 + msgp.Uint64Size + 17 + msgp.BytesPrefixSize + len(z.TableInfoValue) + 23 + msgp.BytesPrefixSize + len(z.ExtraTableInfoValue) + 23 + msgp.ArrayHeaderSize + for za0008 := range z.MultipleTableInfosValue { + s += msgp.BytesPrefixSize + len(z.MultipleTableInfosValue[za0008]) } s += 9 + msgp.StringPrefixSize + len(z.BDRRole) + 17 + msgp.Uint64Size return diff --git a/pkg/sink/mysql/helper.go b/pkg/sink/mysql/helper.go index 4b89c1f0ee..4dbc7a9891 100644 --- a/pkg/sink/mysql/helper.go +++ b/pkg/sink/mysql/helper.go @@ -397,17 +397,6 @@ func needSwitchDB(event *commonEvent.DDLEvent) bool { return true } -func needWaitAsyncExecDone(t timodel.ActionType) bool { - switch t { - case timodel.ActionCreateTable, timodel.ActionCreateTables: - return false - case timodel.ActionCreateSchema: - return false - default: - return true - } -} - func getTiDBVersion(db *sql.DB) version.ServerInfo { versionInfo, err := export.SelectVersion(db) if err != nil { diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 710fc63360..b1f29539db 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -192,10 +192,6 @@ func (w *Writer) waitDDLDone(ctx context.Context, ddl *commonEvent.DDLEvent, ddl // waitAsyncDDLDone wait previous ddl func (w *Writer) waitAsyncDDLDone(event *commonEvent.DDLEvent) { - if !needWaitAsyncExecDone(event.GetDDLType()) { - return - } - switch event.GetBlockedTables().InfluenceType { // db-class, all-class ddl with not affect by async ddl, just return case commonEvent.InfluenceTypeDB, commonEvent.InfluenceTypeAll: diff --git a/pkg/sink/mysql/mysql_writer_test.go b/pkg/sink/mysql/mysql_writer_test.go index 0cbc2ba5c5..3087c94d6b 100644 --- a/pkg/sink/mysql/mysql_writer_test.go +++ b/pkg/sink/mysql/mysql_writer_test.go @@ -445,6 +445,32 @@ func TestMysqlWriter_RemoveDDLTsTable(t *testing.T) { require.NoError(t, err) } +func TestWaitAsyncDDLDone_CreateTableLikeShouldQueryDownstreamAddIndexJob(t *testing.T) { + writer, db, mock := newTestMysqlWriterForTiDB(t) + defer db.Close() + + event := &commonEvent.DDLEvent{ + Type: byte(timodel.ActionCreateTable), + BlockedTables: &commonEvent.InfluencedTables{ + InfluenceType: commonEvent.InfluenceTypeNormal, + TableIDs: []int64{0}, + }, + BlockedTableNames: []commonEvent.SchemaTableName{ + { + SchemaName: "test", + TableName: "a", + }, + }, + } + + expected := fmt.Sprintf(checkRunningAddIndexSQL, "test", "a") + mock.ExpectQuery(expected). + WillReturnRows(sqlmock.NewRows([]string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "STATE", "QUERY"})) + + writer.waitAsyncDDLDone(event) + require.NoError(t, mock.ExpectationsWereMet()) +} + // Test the async ddl can be write successfully func TestMysqlWriter_AsyncDDL(t *testing.T) { writer, db, mock := newTestMysqlWriterForTiDB(t) diff --git a/tests/integration_tests/ddl_wait/run.sh b/tests/integration_tests/ddl_wait/run.sh index 6f0d53598d..847b39c83e 100755 --- a/tests/integration_tests/ddl_wait/run.sh +++ b/tests/integration_tests/ddl_wait/run.sh @@ -32,10 +32,25 @@ function run() { cdc_cli_changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id} run_sql "alter table test.t modify column col decimal(30,10);" - run_sql "alter table test.t add index (col);" + run_sql "alter table test.t add index idx_col (col);" + # The downstream add index DDL may finish quickly with fast reorg enabled, + # so we need a short fixed-interval polling to avoid missing the running window. + for i in $(seq 1 120); do + run_sql 'SELECT JOB_ID FROM information_schema.ddl_jobs WHERE DB_NAME = "test" AND TABLE_NAME = "t" AND JOB_TYPE LIKE "add index%" AND (STATE = "running" OR STATE = "queueing") LIMIT 1;' \ + "${DOWN_TIDB_HOST}" "${DOWN_TIDB_PORT}" >/dev/null 2>&1 || true + if check_contains 'JOB_ID:' >/dev/null 2>&1; then + break + fi + sleep 0.5 + done + check_contains 'JOB_ID:' + run_sql "create table test.t_like like test.t;" run_sql "insert into test.t values (1, 1);" run_sql "create table test.finish_mark (a int primary key);" check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists test.t_like ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + run_sql "show index from test.t_like where Key_name='idx_col';" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "Key_name: idx_col" # ensure all dml / ddl related to test.t finish check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300