-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-38909] Fix Unable to delete S3 checkpoint due to presence of default file #27423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| } | ||
| fs.delete(exclusiveCheckpointDir, false); | ||
| // Recursively delete the checkpoint directory and all its contents | ||
| fs.delete(exclusiveCheckpointDir, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the change looks good.
I was wondering if the deletion fails (e.g. for permissions reasons ) should we look to catch and log that error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
I thought about this. Even before this change, errors from the underlying filesystem handler were already caught and logged, and those logs were sufficient to debug the issue.
If we find that permission-related failures need special handling, we can add a targeted catch for that specific error type. For now, I think the existing logging is sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our environment, MinIO automatically generates a default.txt file by default, which prevents us from deleting the checkpoint directory after Flink creates checkpoints.
Since this appears to be a straightforward issue to resolve, I hope this PR will be merged soon.
| } | ||
| fs.delete(exclusiveCheckpointDir, false); | ||
| // Recursively delete the checkpoint directory and all its contents | ||
| fs.delete(exclusiveCheckpointDir, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM too.
What is the purpose of the change
This pull request fixes a critical bug (FLINK-38909) that causes checkpoint cleanup to fail with a
PathIsNotEmptyDirectoryException. The root cause was an incorrect, non-recursive delete call on a checkpoint's storage location, which by design contains multiple files.A completed Flink checkpoint always consists of multiple data files and a metadata file, grouped under a common path (
exclusiveCheckpointDir). This logical location is never empty. Attempting to delete it with a non-recursivedelete(path, false)command is fundamentally incorrect and guaranteed to fail on any compliant file system. This bug leads to orphaned checkpoint data and storage leaks.This fix corrects the logic by using a recursive delete, ensuring that all files and objects associated with a checkpoint's location are properly removed, regardless of the underlying filesystem's architecture.
Brief change log
FsCompletedCheckpointStorageLocation.disposeStorageLocation(), the filesystem call was changed tofs.delete(exclusiveCheckpointDir, true). This enables recursive deletion, ensuring the entire directory tree of a checkpoint is properly removed.Verifying this change
This change added tests and can be verified as follows:
FsCompletedCheckpointStorageLocationTestto specifically reproduce the bug and validate the fix. This test simulates a real, non-empty checkpoint by creating a storage location with subdirectories and files. It then calls thedisposeStorageLocation()method and asserts that no exception is thrown and the location is completely removed.Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation