Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 117 additions & 2 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,24 @@ func getSchemaID(tableMap map[int64]*BasicTableInfo, tableID int64) int64 {
return tableInfo.SchemaID
}

func findSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName string) (int64, bool) {
for id, info := range databaseMap {
if strings.EqualFold(info.Name, schemaName) {
return id, true
}
}
return 0, false
}

func findTableIDByName(tableMap map[int64]*BasicTableInfo, schemaID int64, tableName string) (int64, bool) {
for id, info := range tableMap {
if info.SchemaID == schemaID && strings.EqualFold(info.Name, tableName) {
return id, true
}
}
return 0, false
}

// =======
// buildPersistedDDLEventFunc start
// =======
Expand Down Expand Up @@ -587,9 +605,55 @@ func buildPersistedDDLEventForCreateTable(args buildPersistedDDLEventFuncArgs) P
event := buildPersistedDDLEventCommon(args)
event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
event.TableName = event.TableInfo.Name.O
setReferTableForCreateTableLike(&event, args)
return event
}

func setReferTableForCreateTableLike(event *PersistedDDLEvent, args buildPersistedDDLEventFuncArgs) {
if event.Query == "" {
return
}
stmt, err := parser.New().ParseOneStmt(event.Query, "", "")
if err != nil {
log.Error("parse create table ddl failed",
zap.String("query", event.Query),
zap.Error(err))
return
}
createStmt, ok := stmt.(*ast.CreateTableStmt)
if !ok || createStmt.ReferTable == nil {
return
}
refTable := createStmt.ReferTable.Name.O
refSchema := createStmt.ReferTable.Schema.O
if refSchema == "" {
refSchema = event.SchemaName
}
refSchemaID, ok := findSchemaIDByName(args.databaseMap, refSchema)
if !ok {
log.Warn("refer schema not found for create table like",
zap.String("schema", refSchema),
zap.String("table", refTable),
zap.String("query", event.Query))
return
}
refTableID, ok := findTableIDByName(args.tableMap, refSchemaID, refTable)
if !ok {
log.Warn("refer table not found for create table like",
zap.String("schema", refSchema),
zap.String("table", refTable),
zap.String("query", event.Query))
return
}
event.ExtraTableID = refTableID
if partitions, ok := args.partitionMap[refTableID]; ok {
event.ReferTablePartitionIDs = event.ReferTablePartitionIDs[:0]
for id := range partitions {
event.ReferTablePartitionIDs = append(event.ReferTablePartitionIDs, id)
}
}
}

func buildPersistedDDLEventForDropTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
event := buildPersistedDDLEventCommon(args)
event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
Expand Down Expand Up @@ -871,8 +935,8 @@ func updateDDLHistoryForSchemaDDL(args updateDDLHistoryFuncArgs) []uint64 {

func updateDDLHistoryForAddDropTable(args updateDDLHistoryFuncArgs) []uint64 {
args.appendTableTriggerDDLHistory(args.ddlEvent.FinishedTs)
// Note: for create table, this ddl event will not be sent to table dispatchers.
// add it to ddl history is just for building table info store.
// Note: for create table, this ddl event will not be sent to table dispatchers by default.
// adding it to ddl history is just for building table info store.
if isPartitionTable(args.ddlEvent.TableInfo) {
// for partition table, we only care the ddl history of physical table ids.
for _, partitionID := range getAllPartitionIDs(args.ddlEvent.TableInfo) {
Expand All @@ -881,6 +945,13 @@ func updateDDLHistoryForAddDropTable(args updateDDLHistoryFuncArgs) []uint64 {
} else {
args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.TableID)
}
if args.ddlEvent.Type == byte(model.ActionCreateTable) && args.ddlEvent.ExtraTableID != 0 {
if len(args.ddlEvent.ReferTablePartitionIDs) > 0 {
args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.ReferTablePartitionIDs...)
} else {
args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.ExtraTableID)
}
}
return args.tableTriggerDDLHistory
}

Expand Down Expand Up @@ -1382,6 +1453,21 @@ func extractTableInfoFuncForSingleTableDDL(event *PersistedDDLEvent, tableID int
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
}
}

// The DDL "CREATE TABLE ... LIKE ..." may be added to the ddl history of the referenced table
// (or its physical partition IDs) to ensure those tables are blocked while this DDL is executing.
// It does not change the schema of the referenced table itself, so we should ignore it when
// building the table info store for the referenced table.
if event.Type == byte(model.ActionCreateTable) && event.ExtraTableID != 0 {
if tableID == event.ExtraTableID {
return nil, false
}
for _, id := range event.ReferTablePartitionIDs {
if tableID == id {
return nil, false
}
}
}
log.Panic("should not reach here",
zap.Any("type", event.Type),
zap.String("query", event.Query),
Expand Down Expand Up @@ -1753,6 +1839,13 @@ func buildDDLEventForNewTableDDL(rawEvent *PersistedDDLEvent, tableFilter filter
InfluenceType: commonEvent.InfluenceTypeNormal,
TableIDs: []int64{common.DDLSpanTableID},
}
if rawEvent.ExtraTableID != 0 {
if len(rawEvent.ReferTablePartitionIDs) > 0 {
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, rawEvent.ReferTablePartitionIDs...)
} else {
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, rawEvent.ExtraTableID)
}
}
if isPartitionTable(rawEvent.TableInfo) {
physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, len(physicalIDs))
Expand Down Expand Up @@ -1781,6 +1874,28 @@ func buildDDLEventForNewTableDDL(rawEvent *PersistedDDLEvent, tableFilter filter
},
},
}
if rawEvent.Query != "" {
stmt, err := parser.New().ParseOneStmt(rawEvent.Query, "", "")
if err != nil {
log.Error("parse create table ddl failed",
zap.String("query", rawEvent.Query),
zap.Error(err))
return ddlEvent, false, err
}
if createStmt, ok := stmt.(*ast.CreateTableStmt); ok && createStmt.ReferTable != nil {
refTable := createStmt.ReferTable.Name.O
refSchema := createStmt.ReferTable.Schema.O
if refSchema == "" {
refSchema = rawEvent.SchemaName
}
ddlEvent.BlockedTableNames = []commonEvent.SchemaTableName{
{
SchemaName: refSchema,
TableName: refTable,
},
}
}
}
return ddlEvent, true, err
}

Expand Down
168 changes: 168 additions & 0 deletions logservice/schemastore/persist_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3129,3 +3129,171 @@ func TestRenameTable(t *testing.T) {
})
assert.Equal(t, "RENAME TABLE `test`.`t1` TO `test`.`t2`", ddl.Query)
}

func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) {
cases := []struct {
name string
query string
expected []commonEvent.SchemaTableName
}{
{
name: "default schema",
query: "CREATE TABLE `b` LIKE `a`",
expected: []commonEvent.SchemaTableName{
{SchemaName: "test", TableName: "a"},
},
},
{
name: "explicit schema",
query: "CREATE TABLE `b` LIKE `other`.`a`",
expected: []commonEvent.SchemaTableName{
{SchemaName: "other", TableName: "a"},
},
},
}

for _, tc := range cases {
rawEvent := &PersistedDDLEvent{
Type: byte(model.ActionCreateTable),
SchemaID: 1,
TableID: 2,
SchemaName: "test",
TableName: "b",
Query: tc.query,
TableInfo: &model.TableInfo{},
}

ddlEvent, ok, err := buildDDLEventForNewTableDDL(rawEvent, nil, 0)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, tc.expected, ddlEvent.BlockedTableNames)
}
}

func TestBuildPersistedDDLEventForCreateTableLikeSetsReferTableID(t *testing.T) {
cases := []struct {
name string
query string
partitionIDs []int64
expectedReferID int64
}{
{
name: "non partition refer table",
query: "CREATE TABLE `b` LIKE `a`",
partitionIDs: nil,
expectedReferID: 101,
},
{
name: "partition refer table",
query: "CREATE TABLE `b` LIKE `a`",
partitionIDs: []int64{111, 112},
expectedReferID: 101,
},
}

for _, tc := range cases {
job := buildCreateTableJobForTest(100, 200, "b", 1010)
job.Query = tc.query
partitionMap := map[int64]BasicPartitionInfo{}
if len(tc.partitionIDs) > 0 {
partitionInfo := make(BasicPartitionInfo)
for _, id := range tc.partitionIDs {
partitionInfo[id] = nil
}
partitionMap[tc.expectedReferID] = partitionInfo
}
ddl := buildPersistedDDLEventForCreateTable(buildPersistedDDLEventFuncArgs{
job: job,
databaseMap: map[int64]*BasicDatabaseInfo{
100: {Name: "test", Tables: map[int64]bool{101: true, 200: true}},
},
tableMap: map[int64]*BasicTableInfo{
101: {SchemaID: 100, Name: "a"},
200: {SchemaID: 100, Name: "b"},
},
partitionMap: partitionMap,
})
require.Equal(t, tc.expectedReferID, ddl.ExtraTableID, tc.name)
if len(tc.partitionIDs) > 0 {
require.ElementsMatch(t, tc.partitionIDs, ddl.ReferTablePartitionIDs, tc.name)
} else {
require.Empty(t, ddl.ReferTablePartitionIDs, tc.name)
}
}
}

func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTables(t *testing.T) {
rawEvent := &PersistedDDLEvent{
Type: byte(model.ActionCreateTable),
SchemaID: 1,
TableID: 2,
SchemaName: "test",
TableName: "b",
Query: "CREATE TABLE `b` LIKE `a`",
TableInfo: &model.TableInfo{},
ExtraTableID: 101,
}
ddlEvent, ok, err := buildDDLEventForNewTableDDL(rawEvent, nil, 0)
require.NoError(t, err)
require.True(t, ok)
require.ElementsMatch(t, []int64{common.DDLSpanTableID, 101}, ddlEvent.BlockedTables.TableIDs)

rawEvent.ReferTablePartitionIDs = []int64{111, 112}
ddlEvent, ok, err = buildDDLEventForNewTableDDL(rawEvent, nil, 0)
require.NoError(t, err)
require.True(t, ok)
require.ElementsMatch(t, []int64{common.DDLSpanTableID, 111, 112}, ddlEvent.BlockedTables.TableIDs)
}

func TestUpdateDDLHistoryForAddDropTable_CreateTableLikeAddsReferTable(t *testing.T) {
args := updateDDLHistoryFuncArgs{
ddlEvent: &PersistedDDLEvent{
Type: byte(model.ActionCreateTable),
TableID: 200,
ExtraTableID: 101,
FinishedTs: 10,
TableInfo: &model.TableInfo{},
},
tablesDDLHistory: map[int64][]uint64{},
tableTriggerDDLHistory: []uint64{},
}
updateDDLHistoryForAddDropTable(args)
require.Equal(t, []uint64{10}, args.tablesDDLHistory[200])
require.Equal(t, []uint64{10}, args.tablesDDLHistory[101])

args.ddlEvent.ReferTablePartitionIDs = []int64{111, 112}
args.tablesDDLHistory = map[int64][]uint64{}
args.tableTriggerDDLHistory = []uint64{}
updateDDLHistoryForAddDropTable(args)
require.Equal(t, []uint64{10}, args.tablesDDLHistory[200])
require.Equal(t, []uint64{10}, args.tablesDDLHistory[111])
require.Equal(t, []uint64{10}, args.tablesDDLHistory[112])
require.Empty(t, args.tablesDDLHistory[101])
}

func TestExtractTableInfoFuncForSingleTableDDL_CreateTableLikeReferTableIgnored(t *testing.T) {
rawEvent := &PersistedDDLEvent{
Type: byte(model.ActionCreateTable),
TableID: 140,
ExtraTableID: 138,
Query: "CREATE TABLE `b` LIKE `a`",
}

require.NotPanics(t, func() {
tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 138)
require.Nil(t, tableInfo)
require.False(t, deleted)
})

rawEvent.ReferTablePartitionIDs = []int64{111, 112}
require.NotPanics(t, func() {
tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 111)
require.Nil(t, tableInfo)
require.False(t, deleted)
})
require.NotPanics(t, func() {
tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 112)
require.Nil(t, tableInfo)
require.False(t, deleted)
})
}
3 changes: 3 additions & 0 deletions logservice/schemastore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type PersistedDDLEvent struct {
// the following fields are only set when the ddl job involves a partition table
// it is the partition info of the table before this ddl
PrevPartitions []int64 `msg:"prev_partitions"`
// ReferTablePartitionIDs is only set for CREATE TABLE ... LIKE ... when the referenced table is partitioned.
// It records the physical partition IDs of the referenced table.
ReferTablePartitionIDs []int64 `msg:"refer_table_partitions"`

Query string `msg:"query"`
SchemaVersion int64 `msg:"schema_version"`
Expand Down
Loading