[FLINK-38943][runtime] Support Adaptive Partition Selection for RescalePartitioner and RebalancePartitioner #27446
base: master
046c85b to
e5ea873
Compare
|
Hi @davidradl @X-czh, could you help take a look? Thanks a lot.
e5ea873 to
b95259b
Compare
|
@flinkbot run azure |
|
@RocMarshal Thanks for the quick contribution. I'll take a look later this week. |
| <td><h5>taskmanager.network.adaptive-partitioner.enabled</h5></td> | ||
| <td style="word-wrap: break-word;">false</td> | ||
| <td>Boolean</td> | ||
| <td>Whether to enabled adaptive partitioner feature for rescale and rebalance partitioners based on the loading of the downstream tasks.</td> |
nit: enabled => enable
Thanks, @davidradl
The naming follows the conclusion here
@RocMarshal Understood, but the English is not correct.
Hi @davidradl,
Apologies for the misunderstanding, and thank you for the correction.
Updated.
| <td><h5>taskmanager.network.adaptive-partitioner.max-traverse-size</h5></td> | ||
| <td style="word-wrap: break-word;">4</td> | ||
| <td>Integer</td> | ||
| <td>How many channels to traverse at most when looking for the idlest channel for rescale and rebalance partitioners when enabled <code class="highlighter-rouge">taskmanager.network.adaptive-partitioner.enabled</code>.</td> |
Should document the default.
nit: How many channels to traverse at most -> Maximum number of channels to traverse,
Line 19 is the default value description.
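For readers following the option description above, a minimal sketch of what "traverse at most N channels looking for the idlest one" could look like. This is not the code from this PR: the class name, the `backlog` array as a load metric, and the tie-breaking rule (first candidate wins) are all assumptions made for illustration.

```java
/** Hypothetical sketch of bounded "idlest channel" selection; not the PR's implementation. */
final class AdaptiveChannelSelectorSketch {
    private final int numberOfSubpartitions;
    private final int maxTraverseSize;
    // Start before the first channel so the first round-robin step lands on channel 0.
    private int currentChannel = -1;

    AdaptiveChannelSelectorSketch(int numberOfSubpartitions, int maxTraverseSize) {
        this.numberOfSubpartitions = numberOfSubpartitions;
        // Never inspect more channels than actually exist.
        this.maxTraverseSize = Math.min(maxTraverseSize, numberOfSubpartitions);
    }

    /** backlog[i] is an assumed load metric for channel i, e.g. the number of queued buffers. */
    int selectChannel(int[] backlog) {
        // The plain round-robin choice is the first candidate.
        int start = (currentChannel + 1) % numberOfSubpartitions;
        int best = start;
        // Inspect up to maxTraverseSize - 1 further channels and keep the least loaded one.
        for (int i = 1; i < maxTraverseSize; i++) {
            int candidate = (start + i) % numberOfSubpartitions;
            if (backlog[candidate] < backlog[best]) {
                best = candidate;
            }
        }
        currentChannel = best;
        return best;
    }

    public static void main(String[] args) {
        AdaptiveChannelSelectorSketch selector = new AdaptiveChannelSelectorSketch(6, 4);
        int[] backlog = {5, 3, 0, 7, 1, 2};
        System.out.println(selector.selectChannel(backlog)); // inspects channels 0..3
        System.out.println(selector.selectChannel(backlog)); // inspects channels 3, 4, 5, 0
    }
}
```

With the default of 4, each record costs at most four load lookups, which bounds the per-record overhead regardless of the downstream parallelism.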
| .withDescription( | ||
| Description.builder() | ||
| .text( | ||
| "How many channels to traverse at most when looking for the idlest channel for rescale and rebalance partitioners when enabled %s.", |
nit: when enabled %s.", -> when %s is enabled.",
| super(writer, timeout, taskName); | ||
| this.numberOfSubpartitions = writer.getNumberOfSubpartitions(); | ||
| this.maxTraverseSize = Math.min(maxTraverseSize, numberOfSubpartitions); | ||
| this.currentChannel = -1; |
nit: does this need to be in the constructor? Why not initialise the field with -1 at its declaration?
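A small sketch of the suggestion, assuming a simplified stand-in class (`ChannelCursorSketch` is hypothetical and not part of the PR). Since Java `int` fields default to 0, the -1 has to be written somewhere; a field initializer keeps it next to the declaration and out of the constructor.

```java
/** Hypothetical stand-in for the writer class in the diff; only the cursor field is modelled. */
final class ChannelCursorSketch {
    // Initialised at the declaration instead of in the constructor.
    private int currentChannel = -1;
    private final int numberOfSubpartitions;

    ChannelCursorSketch(int numberOfSubpartitions) {
        // The constructor only handles values that depend on its arguments.
        this.numberOfSubpartitions = numberOfSubpartitions;
    }

    /** Advances the cursor round-robin; the first call returns channel 0. */
    int next() {
        currentChannel = (currentChannel + 1) % numberOfSubpartitions;
        return currentChannel;
    }

    public static void main(String[] args) {
        ChannelCursorSketch cursor = new ChannelCursorSketch(3);
        System.out.println(cursor.next());
    }
}
```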
| public RecordWriterBuilder<T> setMaxTraverseSize(int maxTraverseSize) { | ||
| Preconditions.checkState( | ||
| maxTraverseSize > 1, "The maxTraverseSize must be greater than 1."); |
I am curious why it cannot be one.
IIUC, if it is set to 1, this is equivalent to not enabling the adaptive partition selection feature.
Explicitly enabling an option that provides no benefit, even in theory, is generally not acceptable,
unless the number of downstream partitions is exactly 1.
Please correct me if I am wrong.
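To illustrate the argument above, a minimal sketch under assumed semantics (the `select` helper, the static `backlog` array, and the class name are hypothetical, not taken from the PR). With a traverse size of 1 only the round-robin candidate is ever inspected, so the load values cannot influence the result.

```java
/** Hypothetical sketch showing that a traverse size of 1 degenerates to plain round-robin. */
final class TraverseSizeOneSketch {

    /** Picks the least loaded of up to maxTraverseSize channels following currentChannel. */
    static int select(int currentChannel, int[] backlog, int maxTraverseSize) {
        int n = backlog.length;
        int start = (currentChannel + 1) % n;
        int best = start;
        // With maxTraverseSize == 1 this loop body never runs, so best == start.
        for (int i = 1; i < Math.min(maxTraverseSize, n); i++) {
            int candidate = (start + i) % n;
            if (backlog[candidate] < backlog[best]) {
                best = candidate;
            }
        }
        return best;
    }

    /** Returns the first `count` selections for a fixed (assumed) backlog snapshot. */
    static int[] firstPicks(int[] backlog, int maxTraverseSize, int count) {
        int[] picks = new int[count];
        int current = -1;
        for (int i = 0; i < count; i++) {
            current = select(current, backlog, maxTraverseSize);
            picks[i] = current;
        }
        return picks;
    }

    public static void main(String[] args) {
        int[] backlog = {9, 0, 4};
        // Traverse size 1 ignores the load; traverse size 2 skips the overloaded channel 0.
        System.out.println(java.util.Arrays.toString(firstPicks(backlog, 1, 4)));
        System.out.println(java.util.Arrays.toString(firstPicks(backlog, 2, 4)));
    }
}
```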
| public RecordWriterBuilder<T> setMaxTraverseSize(int maxTraverseSize) { | ||
| Preconditions.checkState( | ||
| maxTraverseSize > 1, "The maxTraverseSize must be greater than 1."); |
Is there a better place to check this, such as configuration validation, rather than the setter?
Moved it before the RecordWriter construction.
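A rough sketch of the idea discussed in this thread: validate the configured value once, before any writer is built, so the builder's setter can stay a plain assignment. The class and method names below are hypothetical and the exception type is an assumption; the actual placement and message in the PR may differ.

```java
/** Hypothetical sketch of validating the traverse size before constructing the writer. */
final class TraverseSizeValidationSketch {

    /** Returns the value unchanged if it is usable, otherwise fails fast with a clear message. */
    static int validateMaxTraverseSize(int configured) {
        // A value of 1 would only ever inspect the round-robin candidate, so it is rejected.
        if (configured <= 1) {
            throw new IllegalArgumentException(
                    "The maxTraverseSize must be greater than 1, but was " + configured + ".");
        }
        return configured;
    }

    public static void main(String[] args) {
        // 4 is the documented default of taskmanager.network.adaptive-partitioner.max-traverse-size.
        int maxTraverseSize = validateMaxTraverseSize(4);
        System.out.println("Validated maxTraverseSize = " + maxTraverseSize);
        // Only after this point would the writer be constructed with the validated value.
    }
}
```

Failing at this point reports a bad configuration value as an invalid argument rather than as an illegal builder state, which is arguably closer to what went wrong.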
…lePartitioner & RebalancePartitioner
b95259b to
750f38c
Compare
RocMarshal
left a comment
Thanks @davidradl for the review.
I updated the related lines based on your comments.
PTAL ~
| </tr> | ||
| <tr> | ||
| <td><h5>taskmanager.network.adaptive-partitioner.max-traverse-size</h5></td> | ||
| <td style="word-wrap: break-word;">4</td> |
This line is the default value description.
What is the purpose of the change
[FLINK-38943][runtime] Support Adaptive Partition Selection for RescalePartitioner and RebalancePartitioner
Brief change log
Introduce the following:
Verifying this change
This change added tests and can be verified as follows:
The benchmark for this change is here
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation