-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfileops.py
More file actions
218 lines (187 loc) · 8.75 KB
/
fileops.py
File metadata and controls
218 lines (187 loc) · 8.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python3
import os
from PyQt6 import QtWidgets, QtCore
class FileOperation(QtCore.QObject):
def __init__(self, parent):
self.parent = parent
super().__init__(self.parent)
# Initialize the progress dialog once, but do not show it unless necessary.
self.progress_dialog = QtWidgets.QProgressDialog("Performing operation...", "Cancel", 0, 100, self.parent)
self.progress_dialog.setWindowTitle("Progress")
self.progress_dialog.setWindowModality(QtCore.Qt.WindowModality.WindowModal)
self.progress_dialog.setAutoClose(True)
self.progress_dialog.setAutoReset(True)
self.show_window_timer = QtCore.QTimer(self.parent)
self.show_window_timer.timeout.connect(self.show_progress_dialog)
self.show_window_timer.setSingleShot(True)
self.operation_finished = False
def cancel(self):
if hasattr(self, "op_thread"):
self.op_thread.cancel()
self.progress_dialog.close()
def showError(self, message: str):
# Disconnect the timer and close the progress dialog if visible.
try:
self.show_window_timer.timeout.disconnect()
except:
pass
if self.progress_dialog.isVisible():
self.progress_dialog.close()
# Create and execute an error message box.
err_box = QtWidgets.QMessageBox(self.parent)
err_box.setIcon(QtWidgets.QMessageBox.Icon.Critical)
err_box.setWindowTitle("Error")
err_box.setText(message)
err_box.setStandardButtons(QtWidgets.QMessageBox.StandardButton.Ok)
err_box.exec()
def askOverwrite(self, dest):
"""
Ask the user what to do if a destination file already exists.
Returns one of:
"this" - just overwrite this file,
"all" - overwrite all without asking again,
"none" - cancel the entire operation.
"""
msg_box = QtWidgets.QMessageBox(self.parent)
msg_box.setIcon(QtWidgets.QMessageBox.Icon.Question)
msg_box.setWindowTitle("Overwrite File?")
msg_box.setText(f"The file {dest!r} already exists.\nDo you want to overwrite it?")
this_button = msg_box.addButton("This", QtWidgets.QMessageBox.ButtonRole.AcceptRole)
all_button = msg_box.addButton("All", QtWidgets.QMessageBox.ButtonRole.AcceptRole)
none_button = msg_box.addButton("None", QtWidgets.QMessageBox.ButtonRole.RejectRole)
msg_box.exec()
clicked = msg_box.clickedButton()
if clicked == this_button:
return "this"
elif clicked == all_button:
return "all"
else:
return "none"
def run(self, operations, op_type):
# Check if there are any operations; if not, show an error immediately.
if not operations:
QtWidgets.QMessageBox.critical(self.parent, "Error", "No operations provided!")
return
# Pre-check the operations: verify sources, prevent self-overwrite, confirm overwrites, and calculate total size.
try:
total_size = 0
global_decision = None # Determines if "all" overwriting is confirmed.
valid_operations = []
for src, dest in operations:
# Prevent overwriting itself by comparing normalized absolute paths.
if os.path.abspath(src) == os.path.abspath(dest):
raise ValueError(f"Source and destination are the same: {src}")
return
# Verify the source exists.
if not os.path.exists(src):
raise FileNotFoundError(f"Source file does not exist: {src}")
return
# Ask for confirmation if the destination file exists.
if os.path.exists(dest):
if global_decision is None:
decision = self.askOverwrite(dest)
if decision == "none":
QtWidgets.QMessageBox.information(self.parent, "Operation Cancelled",
"File operation cancelled by user.")
return # Cancel the entire operation.
elif decision == "all":
global_decision = "all"
# If decision is "this" or global_decision is already set to "all", proceed.
valid_operations.append((src, dest))
total_size += os.path.getsize(src)
if total_size == 0:
raise ValueError("The total size of the files to process is zero.")
except Exception as e:
QtWidgets.QMessageBox.critical(self.parent, "Error", str(e))
return
# At this point no errors, so show the progress dialog.
self.progress_dialog.setValue(0)
self.operation_finished = False
self.show_window_timer.start(1000)
self.op_thread = FileOperationThread(valid_operations, op_type, total_size)
self.op_thread.progress.connect(self.progress_dialog.setValue)
self.op_thread.error.connect(self.showError)
self.op_thread.finished.connect(self.operation_finished_slot)
self.progress_dialog.canceled.connect(self.op_thread.cancel)
self.op_thread.start()
def show_progress_dialog(self):
# Only show the dialog if the operation isn't finished yet and progress is still below 30%.
if not self.operation_finished and self.progress_dialog.value() < 30:
self.progress_dialog.show()
def operation_finished_slot(self):
self.operation_finished = True
self.progress_dialog.close()
try:
self.show_window_timer.timeout.disconnect()
except:
pass
if hasattr(self.parent, "refresh_view"):
self.parent.refresh_view()
class FileOperationThread(QtCore.QThread):
progress = QtCore.pyqtSignal(int)
finished = QtCore.pyqtSignal()
error = QtCore.pyqtSignal(str)
def __init__(self, operations, op_type, total_size, parent=None):
super().__init__(parent)
self.operations = operations
self.op_type = op_type
self.total_size = total_size
self._isCanceled = False
def run(self):
try:
copied_size = 0
for src, dest in self.operations:
if self._isCanceled:
self.error.emit("Operation cancelled by user.")
return
if not os.path.exists(src):
self.error.emit(f"Source file not found: {src}")
return
file_size = os.path.getsize(src)
try:
with open(src, "rb") as fsrc, open(dest, "wb") as fdest:
while True:
if self._isCanceled:
self.error.emit("Operation cancelled by user.")
return
chunk = fsrc.read(65536)
if not chunk:
break
fdest.write(chunk)
copied_size += len(chunk)
progress_percentage = min(int((copied_size / self.total_size) * 100), 100)
self.progress.emit(progress_percentage)
except Exception as file_error:
self.error.emit(f"Error processing file {src} → {dest}: {str(file_error)}")
return
if self.op_type == "move":
if os.path.exists(dest) and os.path.getsize(dest) == file_size:
try:
os.remove(src)
except Exception as rem_err:
self.error.emit(f"Could not remove source file {src}: {str(rem_err)}")
return
else:
self.error.emit(f"File move failed for {src} → {dest} (destination invalid).")
return
self.finished.emit()
except Exception as e:
self.error.emit(str(e))
def cancel(self):
self._isCanceled = True
if __name__ == "__main__":
# Test run of a copy operation.
app = QtWidgets.QApplication([])
main_window = QtWidgets.QWidget()
# Dummy refresh_view method for testing purposes.
def refresh_view():
print("Refreshed view.")
main_window.refresh_view = refresh_view
op = FileOperation(main_window)
# Test case: trying to copy a file to itself should trigger an error.
src = "test.txt"
dest = "test.txt" # Using the same file for src and dest.
operations = [(src, dest)]
op.run(operations, "copy")
main_window.show()
app.exec()