
Conversation

@lidezhu
Collaborator

@lidezhu lidezhu commented Dec 26, 2025

What problem does this PR solve?

Issue Number: ref #3800

What is changed and how it works?

This pull request significantly refactors the log puller's event processing architecture to enhance performance and scalability, particularly for large tables. It introduces a regionEventProcessor that handles region-specific events concurrently using a sharded worker pool. This new processor acts as an intermediary, processing raw events from workers and transforming them into higher-level subscriptionEvents before they are fed into the DynamicStream. This design aims to optimize the event handling pipeline by distributing the processing load, enabling batching of resolved timestamps, and improving concurrency control, ultimately leading to a more efficient and maintainable system.
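To make the sharding idea above concrete, here is a minimal, hypothetical Go sketch (names such as regionEvent and shardedProcessor are invented for illustration and are not the PR's actual types): each worker owns a queue, and an event is routed by region ID modulo the worker count, so events for the same region are always processed by the same goroutine.

```go
// Hypothetical sketch of sharding region events across a fixed worker pool.
// Names and shapes are illustrative; they are not the PR's actual types.
package sketch

import "sync"

type regionEvent struct {
    regionID uint64
    payload  []byte
}

type shardedProcessor struct {
    queues []chan regionEvent
    wg     sync.WaitGroup
}

func newShardedProcessor(workerCount, queueSize int, handle func(regionEvent)) *shardedProcessor {
    p := &shardedProcessor{queues: make([]chan regionEvent, workerCount)}
    for i := range p.queues {
        q := make(chan regionEvent, queueSize)
        p.queues[i] = q
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for ev := range q {
                handle(ev) // events of one region always land on the same worker
            }
        }()
    }
    return p
}

// push routes an event to the worker that owns its region.
func (p *shardedProcessor) push(ev regionEvent) {
    p.queues[ev.regionID%uint64(len(p.queues))] <- ev
}

func (p *shardedProcessor) close() {
    for _, q := range p.queues {
        close(q)
    }
    p.wg.Wait()
}
```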

Highlights

  • Refactored Event Processing Pipeline: Introduced a new regionEventProcessor component to handle region-specific events, decoupling this logic from the DynamicStream's direct handler. This processor acts as an intermediary, receiving raw region events from workers, processing them, and then dispatching higher-level subscriptionEvents to the DynamicStream.
  • Parallel Event Processing: The new regionEventProcessor utilizes a sharded worker pool, distributing the processing of region events concurrently based on region ID. This design aims to improve throughput and efficiency in event handling, especially for scenarios with a single large table.
  • Streamlined DynamicStream Integration: The DynamicStream now interacts with a new subscriptionEventHandler that processes aggregated subscriptionEvents. This simplifies the DynamicStream's direct responsibilities and allows for more focused event handling, reducing overhead.
  • Improved Concurrency Control: The UpdateLockedRangeStateHeap method in region_range_lock.go now uses an exclusive Lock instead of a RLock, ensuring more robust and thread-safe concurrent updates to the locked range state.
  • Batch Processing of Resolved Timestamps: The regionEventProcessor now supports batch processing of resolved timestamps for multiple regions, which can reduce the number of events pushed to the DynamicStream and improve performance.
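The batching highlight above can be illustrated with a rough sketch (hypothetical types, not the PR's API): resolved timestamps collected for many regions are coalesced so that the downstream stream receives one event per batch instead of one event per region.

```go
// Hypothetical sketch: coalescing per-region resolved timestamps into
// batched events instead of pushing one event per region downstream.
package sketch

type resolvedTsBatch struct {
    regionIDs []uint64
    ts        uint64 // resolved ts shared by all regions in the batch
}

type eventSink interface {
    push(batch resolvedTsBatch)
}

// flushResolvedTs groups regions whose resolved ts advanced to the same value
// and emits one batched event per distinct timestamp rather than one per region.
func flushResolvedTs(resolved map[uint64]uint64, out eventSink) {
    byTs := make(map[uint64][]uint64)
    for regionID, ts := range resolved {
        byTs[ts] = append(byTs[ts], regionID)
    }
    for ts, regions := range byTs {
        out.push(resolvedTsBatch{regionIDs: regions, ts: ts})
    }
}
```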

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

@ti-chi-bot ti-chi-bot bot added the do-not-merge/needs-linked-issue, do-not-merge/work-in-progress (indicates that a PR should not merge because it is a work in progress), and release-note (denotes a PR that will be considered when it comes time to generate release notes) labels Dec 26, 2025
@ti-chi-bot

ti-chi-bot bot commented Dec 26, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL label (denotes a PR that changes 1000+ lines, ignoring generated files) Dec 26, 2025
@gemini-code-assist

Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request undertakes a significant refactoring of the event processing architecture within the log puller component. The primary goal is to enhance concurrency and modularity by introducing a dedicated regionEventProcessor. This new processor operates with a sharded worker pool, allowing region-specific events to be handled in parallel. It serves as an intermediate layer, processing raw events from workers and transforming them into higher-level subscriptionEvents before they are fed into the DynamicStream. This approach aims to optimize the event handling pipeline by distributing the processing load and ensuring proper synchronization for critical data structures, ultimately leading to a more scalable and maintainable system.

Highlights

  • Refactored Event Processing Pipeline: Introduced a new regionEventProcessor component to handle region-specific events, decoupling this logic from the DynamicStream's direct handler. This processor acts as an intermediary, receiving raw region events from workers, processing them, and then dispatching higher-level subscriptionEvents to the DynamicStream.
  • Parallel Event Processing: The new regionEventProcessor utilizes a sharded worker pool, distributing the processing of region events concurrently based on region ID. This design aims to improve throughput and efficiency in event handling.
  • Streamlined DynamicStream Integration: The DynamicStream now interacts with a new subscriptionEventHandler that processes aggregated subscriptionEvents. This simplifies the DynamicStream's direct responsibilities and allows for more focused event handling.
  • Improved Concurrency Control: The UpdateLockedRangeStateHeap method in region_range_lock.go now uses an exclusive Lock instead of a RLock, ensuring more robust and thread-safe concurrent updates to the locked range state.
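For readers unfamiliar with the Lock/RLock distinction, a generic illustration follows (not the PR's actual code): a method that mutates shared state, such as re-ordering a heap, needs the exclusive Lock, because RLock only excludes writers and admits multiple concurrent readers, which would race if those readers also mutate.

```go
// Generic illustration of the RLock-vs-Lock distinction; not the PR's code.
package sketch

import (
    "sort"
    "sync"
)

type lockedState struct {
    mu    sync.RWMutex
    items []uint64 // stand-in for the locked range state heap
}

// update mutates shared state, so it must hold the exclusive lock.
// An RLock here would let two goroutines append and re-sort concurrently,
// which is a data race even though both callers "only update".
func (s *lockedState) update(ts uint64) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.items = append(s.items, ts)
    sort.Slice(s.items, func(i, j int) bool { return s.items[i] < s.items[j] })
}

// min only reads, so the shared read lock is sufficient.
func (s *lockedState) min() (uint64, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    if len(s.items) == 0 {
        return 0, false
    }
    return s.items[0], true
}
```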




@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant refactoring to the region event handling logic by replacing the single-threaded regionEventHandler with a concurrent regionEventProcessor that uses a pool of workers. This change aims to improve performance by processing region events in parallel. The logic from the old handler has been moved into the new processor and a new subscriptionEventHandler. The changes are well-structured and improve the design. I've found a potential build-breaking issue due to Go version compatibility and an opportunity for code simplification.

@lidezhu lidezhu changed the title from [WIP] logpuller: optimize log puller performance under single large table scenario to logpuller: optimize log puller performance under single large table scenario Dec 30, 2025
@ti-chi-bot ti-chi-bot bot removed the do-not-merge/work-in-progress label (indicates that a PR should not merge because it is a work in progress) Dec 30, 2025
@lidezhu
Collaborator Author

lidezhu commented Dec 30, 2025

/gemini summary

@gemini-code-assist

Summary of Changes

This pull request significantly refactors the log puller's event processing architecture to enhance performance and scalability, particularly for large tables. It introduces a regionEventProcessor that handles region-specific events concurrently using a sharded worker pool. This new processor acts as an intermediary, processing raw events from workers and transforming them into higher-level subscriptionEvents before they are fed into the DynamicStream. This design aims to optimize the event handling pipeline by distributing the processing load, enabling batching of resolved timestamps, and improving concurrency control, ultimately leading to a more efficient and maintainable system.

Highlights

  • Refactored Event Processing Pipeline: Introduced a new regionEventProcessor component to handle region-specific events, decoupling this logic from the DynamicStream's direct handler. This processor acts as an intermediary, receiving raw region events from workers, processing them, and then dispatching higher-level subscriptionEvents to the DynamicStream.
  • Parallel Event Processing: The new regionEventProcessor utilizes a sharded worker pool, distributing the processing of region events concurrently based on region ID. This design aims to improve throughput and efficiency in event handling, especially for scenarios with a single large table.
  • Streamlined DynamicStream Integration: The DynamicStream now interacts with a new subscriptionEventHandler that processes aggregated subscriptionEvents. This simplifies the DynamicStream's direct responsibilities and allows for more focused event handling, reducing overhead.
  • Improved Concurrency Control: The UpdateLockedRangeStateHeap method in region_range_lock.go now uses an exclusive Lock instead of a RLock, ensuring more robust and thread-safe concurrent updates to the locked range state.
  • Batch Processing of Resolved Timestamps: The regionEventProcessor now supports batch processing of resolved timestamps for multiple regions, which can reduce the number of events pushed to the DynamicStream and improve performance.


Changelog
  • logservice/logpuller/region_event_handler.go
    • Removed the regionEventHandler file, as its functionality has been refactored into new components.
  • logservice/logpuller/region_event_handler_test.go
    • Removed the test file for regionEventHandler, with corresponding tests migrated to the new region_event_processor_test.go.
  • logservice/logpuller/region_event_processor.go
    • Added a new file introducing the regionEventProcessor to manage and process region events concurrently.
    • Implemented sharded worker pools for efficient event dispatching based on region ID.
    • Included logic for handling KV entries, resolved timestamps, and region errors within the processor.
  • logservice/logpuller/region_event_processor_test.go
    • Added a new test file to cover the functionality of the regionEventProcessor, including tests for out-of-order KV entries and resolved timestamp advancement across regions.
  • logservice/logpuller/region_request_worker.go
    • Updated the regionEvent structure to use a slice of regionFeedState.
    • Modified event dispatching to use the new regionEventProcessor instead of direct DynamicStream interaction.
    • Introduced collectRegionStates for efficient gathering of region states for batch processing.
    • Renamed receiveAndDispatchChangeEventsToProcessor to receiveAndDispatchChangeEvents for clarity.
  • logservice/logpuller/regionlock/region_range_lock.go
    • Changed UpdateLockedRangeStateHeap to use Lock() instead of RLock() for enhanced thread safety during updates.
  • logservice/logpuller/subscription_client.go
    • Imported the runtime package for dynamic worker count configuration.
    • Updated metrics handling, moving metricsResolvedTsCount and metricsEventCount.
    • Renamed kvEventsCacheMaxSize to pendingKVEntriesMaxSize and kvEventsCache to pendingKVEntries in subscribedSpan.
    • Added configuration options for RegionEventWorkerCount and RegionEventWorkerQueueSize.
    • Modified the DynamicStream to work with subscriptionEvent and subscriptionEventHandler.
    • Integrated and initialized the new regionEventProcessor.
    • Updated event pushing logic from pushRegionEventToDS to pushSubscriptionEventToDS.
    • Adjusted log messages related to event pushing pauses and resumes.
    • Ensured proper shutdown of regionEventProcessor during client closure.
  • logservice/logpuller/subscription_event_handler.go
    • Added a new file defining subscriptionEvent and subscriptionEventHandler to process events from the regionEventProcessor and feed them into the DynamicStream.
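As a purely illustrative sketch of the handler shape this entry describes (the subscriptionEvent and subscriptionEventHandler below are simplified stand-ins, not the PR's actual definitions), the handler consumes events aggregated from per-region processing and forwards them to a downstream consumer:

```go
// Hypothetical sketch of a subscription-level event handler; the real
// subscriptionEvent / subscriptionEventHandler in the PR may differ.
package sketch

type subscriptionEvent struct {
    subscriptionID uint64
    kvEntries      [][]byte // already-processed entries, if any
    resolvedTs     uint64   // 0 when the event carries no resolved ts
}

// subscriptionEventHandler feeds aggregated events into a downstream
// consumer, standing in here for the dynamic stream.
type subscriptionEventHandler struct {
    onEvent func(subscriptionEvent)
}

// Handle consumes a batch of events that belong to the same stream path.
func (h *subscriptionEventHandler) Handle(events ...subscriptionEvent) {
    for _, ev := range events {
        h.onEvent(ev)
    }
}
```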
Activity
  • The pull request was opened by lidezhu.
  • An automated check by ti-chi-bot[bot] indicated that the PR is not yet approved.
  • An initial summary of changes was provided by gemini-code-assist[bot].
  • Two review comments were made by gemini-code-assist[bot] regarding Go version compatibility for the min function and simplification of the growSlice function.
  • A test named pull-error-log-review failed, as reported by ti-chi-bot[bot].
  • The author lidezhu requested a new summary from gemini-code-assist[bot].

@lidezhu
Collaborator Author

lidezhu commented Dec 30, 2025

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant and well-executed refactoring of the log puller's event handling logic to improve performance, especially for scenarios with a single large table. The introduction of a parallel regionEventProcessor with a worker pool is a solid approach to leverage more CPU resources for processing region events. The optimization to batch resolved-ts events is also a great improvement.

I'm particularly impressed with the fix in region_range_lock.go that corrects a race condition by changing RLock to Lock. This is a critical correctness improvement.

The new architecture separating region event processing from subscription event handling (via dynstream) is much cleaner and removes previous workarounds for deadlocks. The code is well-structured and the changes are logical. I have a couple of minor suggestions for improving maintainability by extracting magic numbers into constants. Overall, this is a high-quality contribution.

Comment on lines +50 to +55
if workerCount <= 0 {
    workerCount = 1
}
if queueSize <= 0 {
    queueSize = 1024
}


medium

These default values for workerCount and queueSize are hardcoded. It would be better to define them as constants at the package level, for example, defaultRegionEventWorkerCount and defaultRegionEventWorkerQueueSize. This improves readability and makes it easier to manage these default values. Please also add the constant definitions at the package level.

Suggested change
if workerCount <= 0 {
    workerCount = 1
}
if queueSize <= 0 {
    queueSize = 1024
}
if workerCount <= 0 {
    workerCount = defaultRegionEventWorkerCount
}
if queueSize <= 0 {
    queueSize = defaultRegionEventWorkerQueueSize
}
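If this suggestion is adopted, the package-level constants could be declared as follows (a sketch: the values mirror the current hardcoded defaults, and the names are the ones proposed above, not existing identifiers):

```go
// Hypothetical package-level defaults, mirroring the current hardcoded values.
const (
    defaultRegionEventWorkerCount     = 1
    defaultRegionEventWorkerQueueSize = 1024
)
```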

if len(regionIDs) == 0 {
    return nil, 0, nil
}
missingSample = make([]uint64, 0, 8)


medium

The capacity 8 for missingSample is a magic number. It's good practice to define it as a named constant, for example const missingRegionSampleCapacity = 8, to improve code clarity and maintainability.

Suggested change
missingSample = make([]uint64, 0, 8)
missingSample = make([]uint64, 0, missingRegionSampleCapacity)
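Likewise, the named constant referenced here might be declared as follows (hypothetical, mirroring the current literal):

```go
// Hypothetical constant mirroring the current literal capacity of 8.
const missingRegionSampleCapacity = 8
```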

@ti-chi-bot

ti-chi-bot bot commented Dec 30, 2025

@lidezhu: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name: pull-error-log-review
Commit: 015858f
Details: link
Required: true
Rerun command: /test pull-error-log-review

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

@lidezhu lidezhu changed the title from logpuller: optimize log puller performance under single large table scenario to [DNM] logpuller: optimize log puller performance under single large table scenario Dec 30, 2025