-
Notifications
You must be signed in to change notification settings - Fork 297
ticdc: MySQL sink dispatcher routing #12435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release-8.5
Are you sure you want to change the base?
Conversation
|
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
This cherry pick PR is for a release branch and has not yet been approved by triage owners. To merge this cherry pick:
DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
Hi @a-cong. Thanks for your PR. I'm waiting for a pingcap member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
Welcome @a-cong! |
213bbd7 to
676be0e
Compare
cdc/model/sink.go
Outdated
| sourceTables, err := dmparser.FetchDDLTables(defaultSchema, stmts[0], conn.LCTableNamesSensitive) | ||
| if err != nil { | ||
| log.Warn("failed to fetch source tables for sink routing, keeping original query", | ||
| zap.String("query", d.Query), zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to fall back here with the warning or explicitly return an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use an explicit error to avoid data inconsistency.
cdc/model/sink.go
Outdated
| if err != nil { | ||
| // If rewriting fails, keep the original query | ||
| log.Warn("failed to rewrite DDL for sink routing, keeping original query", | ||
| zap.String("query", d.Query), zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same question here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements schema and table routing for MySQL-compatible sinks by extending the dispatchers configuration field, which was previously only used for MQ sinks (Kafka/Pulsar topic and partition routing). The implementation adds new TargetSchema and TargetTable fields to the TableName struct to maintain separation between source (for pullers) and target (for sinks) schemas and tables.
Key Changes:
- Extended
DispatchRuleconfiguration withSchemaRuleandTableRulefields supporting template expressions like{schema},{table}, and static values - Implemented
SinkRouterandDispatcherabstractions for schema/table name transformation - Added DDL query rewriting using DM parser's
FetchDDLTablesandRenameDDLTablefunctions to maintain correct table reference ordering
Reviewed changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
pkg/config/sink.go |
Added SchemaRule and TableRule fields to DispatchRule, updated comments, added ValidateRoutingExpression function |
pkg/config/replica_config.go |
Added validation for sink routing rules, ensuring they're only used with MySQL sinks |
pkg/config/replica_config_test.go |
Added comprehensive tests for routing validation |
cdc/sink/dispatcher/dispatcher.go |
Implemented Dispatcher interface and DynamicSchemaDispatcher with expression substitution |
cdc/sink/dispatcher/dispatcher_test.go |
Added extensive unit tests for dispatcher transformations |
cdc/sink/dispatcher/sink_router.go |
Implemented SinkRouter to manage routing rules with filter matching |
cdc/sink/dispatcher/sink_router_test.go |
Added comprehensive tests for router behavior including case sensitivity |
cdc/model/sink.go |
Added TargetSchema/TargetTable fields, QuoteSinkString() method, and DDL routing logic via applySinkRouting() |
cdc/model/sink_test.go |
Added extensive tests for routing including DDL rewriting edge cases |
cdc/entry/schema_storage.go |
Integrated SinkRouter, added routing application for both initial snapshots and new tables |
cdc/entry/schema_storage_test.go |
Added integration tests verifying routing on pre-existing and newly created tables |
cdc/entry/mounter_test.go |
Added tests verifying DML events have correct TargetSchema/TargetTable |
pkg/sqlmodel/row_change.go |
Changed to use QuoteSinkString() instead of QuoteString() for SQL generation |
pkg/sqlmodel/row_change_test.go |
Added comprehensive tests for schema, table, and combined routing in DML generation |
pkg/sqlmodel/multirow.go |
Changed to use QuoteSinkString() for multi-row operations |
pkg/sqlmodel/multirow_test.go |
Added tests for multi-row DML with routing |
cdc/sink/ddlsink/mysql/mysql_ddl_sink.go |
Updated USE statement to use TargetSchema when set |
cdc/model/changefeed.go |
Modified rmMQOnlyFields() to preserve dispatch rules with schema/table routing |
cdc/api/v2/model.go |
Added SchemaRule and TableRule to API model's DispatchRule |
cdc/processor/processor.go |
Integrated SinkRouter creation in DDL handler initialization |
cdc/owner/changefeed.go |
Integrated SinkRouter creation in schema storage initialization |
| Various test files | Updated function signatures to pass sinkRouter parameter to NewSchemaStorage |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| IndexName string `json:"index,omitempty"` | ||
| Columns []string `json:"columns,omitempty"` | ||
| TopicRule string `json:"topic,omitempty"` | ||
| SchemaRule string `json:"schema,omitempty"` | ||
| TableRule string `json:"table,omitempty"` | ||
| } |
Copilot
AI
Dec 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment for DispatchRule is outdated and doesn't reflect the extended functionality. It says "represents partition rule for a table" but it now also handles schema/table routing for MySQL sinks. This should be updated to match the comprehensive comment in pkg/config/sink.go which correctly describes both MQ (topic/partition) and MySQL (schema/table routing) use cases.
|
Could you provide a case to demonstrate this feature? It's better to add an integration test to cover it. |
|
/ok-to-test |
676be0e to
acded41
Compare
|
/retest |
9a8a32d to
fe61fe8
Compare
| @@ -0,0 +1,27 @@ | |||
| -- Test DML operations | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It suggested add more DDLs and DMLs mixed queries here, to cover more scenario.
ca8c7ad to
769181d
Compare
|
/retest |
b594bd3 to
d0fb962
Compare
For TRUNCATE TABLE, job.TableID is the OLD table ID which no longer exists in the snapshot after the DDL is applied. The snapshot only contains the NEW table with a different ID from job.BinlogInfo.TableInfo.ID. This fix ensures sink routing is applied to the correct (new) table ID so that DML events after TRUNCATE are routed to the correct destination. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
d0fb962 to
f94f22b
Compare
|
/test all |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: 3AceShowHand, wk989898 The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
|
/retest |
|
@benmeadowcroft PTAL |
|
I think this PR should be merged into the master branch first, and then cherry-pick to the release-8.5 branch. At the moment, it tries to merge into the release-8.5 directly, this is not standard operation. |
Oh, I thought we were not planning to merge this since we aren't upstreaming any more features in this repo - we will merge this change internally, but for upstreaming I will just merge this one when it's ready pingcap/ticdc#3704 |
got it, so let's only review the PR in the ticdc repository. |
What problem does this PR solve?
Issue Number: close #12434
What is changed and how it works?
dispatchersfield (currently used for mq sinks https://docs.pingcap.com/tidb/stable/ticdc-sink-to-kafka/#topic-dispatchers) to support schema/table routing for mysql sinks. The config looks like this:Dispatcherinterface routes the givensourceSchema, sourceTabletotargetSchema, targetTable.SinkRouterwraps the dispatchersTargetSchemaandTargetTablefields toTableNamestruct in cdc/model/sink.go, which are populated when the sink router is defined. This is handled in 2 ways - for tables that already exist when the changefeed is created/started, the fields are added to the existing snapshot of the schema. For tables that are created later, the fields are added to the new table before the new snapshot is savedTargetSchemaandTargetTablefor the USE statement inmysql_ddl_sink.go, and in new methodQuoteSinkString()which is called to construct mysql DMLs inpkg/sqlmodel/multirow.goandpkg/sqlmodel/row_change.goFetchDDLTablesandRenameDDLTablefrom the dm parser pkg to rewrite DDLs after they are constructed if the schema router is specified.FetchDDLTablesreturns all of the tables referenced in the DDL in the order that they appear in the DDL. We then use the schema router to map each source schema to its target schema and pass this list of tables toRenameDDLTableCheck List
Tests
Questions
Will it cause performance regression or break compatibility?
No, the additional latency for DDL rewrites should be minimal compared to barrier ts, and there should not be compatibility issues since this is a new feature
Do you need to update user documentation, design documentation or monitoring documentation?
Yes, this is a new feature for mysql sinks, will need to update https://docs.pingcap.com/tidb/stable/ticdc-sink-to-mysql/
Release note