Skip to content

Conversation

@wk989898
Copy link
Collaborator

@wk989898 wk989898 commented Dec 31, 2025

What problem does this PR solve?

Issue Number: close #3662 revert #3845

What is changed and how it works?

This pull request addresses an issue where consumers might process DML events that are older than the current partition watermark, leading to potential data inconsistencies. By implementing a conditional check, the system now explicitly ignores these outdated events, ensuring that only relevant and ordered data is processed. This change enhances the robustness and correctness of data replication across different message queue integrations.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

@ti-chi-bot ti-chi-bot bot added release-note Denotes a PR that will be considered when it comes time to generate release notes. do-not-merge/needs-triage-completed labels Dec 31, 2025
@gemini-code-assist
Copy link

Summary of Changes

Hello @wk989898, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses an issue where consumers might process DML events that are older than the current partition watermark, leading to potential data inconsistencies. By implementing a conditional check, the system now explicitly ignores these outdated events, ensuring that only relevant and ordered data is processed. This change enhances the robustness and correctness of data replication across different message queue integrations.

Highlights

  • DML Event Filtering: Introduced a new check in the consumer logic to ignore Data Manipulation Language (DML) events that have a commit timestamp older than the current partition watermark. This prevents processing of outdated or 'fallback' events.
  • Cross-Consumer Application: The logic to ignore fallback DML events has been applied to both the Kafka consumer (cmd/kafka-consumer/writer.go) and the Pulsar consumer (cmd/pulsar-consumer/writer.go), ensuring consistent behavior across different message queues.
  • Improved Logging: A warning log message is now generated whenever a DML event is ignored due to its commit timestamp being less than the partition watermark, providing better visibility into such occurrences.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@ti-chi-bot ti-chi-bot bot added size/S Denotes a PR that changes 10-29 lines, ignoring generated files. and removed do-not-merge/needs-triage-completed labels Dec 31, 2025
Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a check to ignore DML events with a commit timestamp older than the partition's watermark. This prevents processing of "fallback" or out-of-order events that are already considered resolved. The change is applied to both the Kafka and Pulsar consumers.

My review focuses on improving the clarity of the new log messages. The core logic of the change is correct and effectively addresses the issue.

Comment on lines +528 to +532
log.Warn("DML Event fallback row, since less than the partition watermark, ignore it",
zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition),
zap.Uint64("commitTs", commitTs), zap.Any("offset", offset),
zap.Uint64("watermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", schema), zap.String("table", table))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The log message is a bit verbose. The reason for ignoring the event ("since less than the partition watermark") is already evident from the logged commitTs and watermark fields. A more concise message would improve log readability and maintainability.

Suggested change
log.Warn("DML Event fallback row, since less than the partition watermark, ignore it",
zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition),
zap.Uint64("commitTs", commitTs), zap.Any("offset", offset),
zap.Uint64("watermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", schema), zap.String("table", table))
log.Warn("Ignore fallback DML event",
zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition),
zap.Uint64("commitTs", commitTs), zap.Any("offset", offset),
zap.Uint64("watermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
zap.String("schema", schema), zap.String("table", table))

Comment on lines +438 to +441
log.Warn("DML Event fallback row, since less than the partition watermark, ignore it",
zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition),
zap.Uint64("commitTs", commitTs), zap.Uint64("watermark", progress.watermark),
zap.String("schema", schema), zap.String("table", table))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The log message is a bit verbose. The reason for ignoring the event ("since less than the partition watermark") is already evident from the logged commitTs and watermark fields. A more concise message would improve log readability and maintainability.

Suggested change
log.Warn("DML Event fallback row, since less than the partition watermark, ignore it",
zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition),
zap.Uint64("commitTs", commitTs), zap.Uint64("watermark", progress.watermark),
zap.String("schema", schema), zap.String("table", table))
log.Warn("Ignore fallback DML event",
zap.Int64("tableID", tableID), zap.Int32("partition", group.Partition),
zap.Uint64("commitTs", commitTs), zap.Uint64("watermark", progress.watermark),
zap.String("schema", schema), zap.String("table", table))

val, ok := defaultVal.(string)
if !ok {
log.Debug("default value is not string", zap.Any("defaultValue", defaultVal))
log.Warn("default value is not string", zap.Any("defaultValue", defaultVal))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is only used in the testing code, set it to panic.

@ti-chi-bot ti-chi-bot bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels Jan 4, 2026
@ti-chi-bot ti-chi-bot bot added the lgtm label Jan 4, 2026
@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 4, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand, wlwilliamx

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:
  • OWNERS [3AceShowHand,wlwilliamx]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot removed the needs-1-more-lgtm Indicates a PR needs 1 more LGTM. label Jan 4, 2026
@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 4, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-01-04 02:52:05.236310294 +0000 UTC m=+497881.054618725: ☑️ agreed by 3AceShowHand.
  • 2026-01-04 02:53:33.846115156 +0000 UTC m=+497969.664423578: ☑️ agreed by wlwilliamx.

@wk989898
Copy link
Collaborator Author

wk989898 commented Jan 4, 2026

/retest

@wk989898
Copy link
Collaborator Author

wk989898 commented Jan 4, 2026

/test pull-cdc-kafka-integration-heavy

1 similar comment
@wk989898
Copy link
Collaborator Author

wk989898 commented Jan 4, 2026

/test pull-cdc-kafka-integration-heavy

@wk989898
Copy link
Collaborator Author

wk989898 commented Jan 5, 2026

/retest

1 similar comment
@wk989898
Copy link
Collaborator Author

wk989898 commented Jan 5, 2026

/retest

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/S Denotes a PR that changes 10-29 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

data inconsistency between upstream and downstream after simulated one of kafka failure

3 participants