Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion data_management/services/dlu_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def get_participant_by_redcap_id(self, redcap_id: str):
return self.db.get_data(
"SELECT * FROM data_management.redcap_participant WHERE redcap_id = %s",(redcap_id,),
)

def set_dlu_package_error(self, package_id: str):
return self.db.insert_data(
"UPDATE dlu_package_inventory SET dlu_error = 1 WHERE dlu_package_id = %s", (package_id,)
)


def insert_dlu_package(self, dpi_values: tuple, dmd_values: tuple):
Expand Down Expand Up @@ -212,7 +217,7 @@ def get_data_manager_data(self):
return result

def get_package(self, package_id: str) -> dict:
result = self.db.get_data("SELECT * dlu_package_inventory WHERE dlu_package_id = %s", (package_id,))
result = self.db.get_data("SELECT * FROM dlu_package_inventory WHERE dlu_package_id = %s", (package_id,))
if result:
return result[0]
else:
Expand Down
4 changes: 4 additions & 0 deletions data_management/services/dlu_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class DLUMongo:

def __init__(self, mongo_connection: MongoConnection):
self.package_collection = mongo_connection.packages.with_options(codec_options=CodecOptions(tz_aware=True))
self.state_collection = mongo_connection.state.with_options(codec_options=CodecOptions(tz_aware=True))

def get_modification_info(self, file_info: dict):
modifications = []
Expand Down Expand Up @@ -63,6 +64,9 @@ def update_package_files(self, package_id: str, file_info: dict) -> int:

result = self.package_collection.update_one({"_id": package_id}, {"$set": {"files": mongo_files, "modifications": final_modifications}})
return result.modified_count

def find_by_upload_failed(self):
return self.state_collection.find({"state": "UPLOAD_FAILED"})

def find_by_package_type_and_redcap_id(self, package_type: str, subject_id: str):
return self.package_collection.find_one({"subjectId": subject_id, "packageType": package_type})
Expand Down
16 changes: 15 additions & 1 deletion data_management/watch_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,20 @@ def is_directory_valid(self, directory_info, package_id):
self.dlu_management.update_dlu_package(package_id, {"globus_dlu_status": error_msg})
return False
return True


def mark_packages_with_error(self):
failed_uploads = self.dlu_mongo.find_by_upload_failed()
for failed_upload in failed_uploads:
package_id = failed_upload["packageId"]
package = self.dlu_management.get_package(package_id)
if package is not None:
if package['dlu_error'] == 1:
continue
else:
self.dlu_management.set_dlu_package_error(failed_upload["packageId"])
logger.info("Marked package " + failed_upload["packageId"] + " as error due to UPLOAD_FAILED in MongoDB")
Comment on lines +203 to +210

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Guard missing packageId before dereferencing failed state docs.

At Line 203, direct indexing (failed_upload["packageId"]) can raise KeyError and break this pass if any malformed state document is present. Add a defensive read and skip/log invalid rows.

Proposed fix
     def mark_packages_with_error(self):
         failed_uploads = self.dlu_mongo.find_by_upload_failed()
         for failed_upload in failed_uploads:
-            package_id = failed_upload["packageId"]
+            package_id = failed_upload.get("packageId")
+            if not package_id:
+                logger.warning("Skipping UPLOAD_FAILED record without packageId: %s", failed_upload.get("_id"))
+                continue
             package = self.dlu_management.get_package(package_id)
             if package is not None:
                 if package['dlu_error'] == 1:
                     continue
                 else:
-                    self.dlu_management.set_dlu_package_error(failed_upload["packageId"])
-                    logger.info("Marked package " + failed_upload["packageId"] + " as error due to UPLOAD_FAILED in MongoDB")
+                    self.dlu_management.set_dlu_package_error(package_id)
+                    logger.info("Marked package %s as error due to UPLOAD_FAILED in MongoDB", package_id)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
package_id = failed_upload["packageId"]
package = self.dlu_management.get_package(package_id)
if package is not None:
if package['dlu_error'] == 1:
continue
else:
self.dlu_management.set_dlu_package_error(failed_upload["packageId"])
logger.info("Marked package " + failed_upload["packageId"] + " as error due to UPLOAD_FAILED in MongoDB")
package_id = failed_upload.get("packageId")
if not package_id:
logger.warning("Skipping UPLOAD_FAILED record without packageId: %s", failed_upload.get("_id"))
continue
package = self.dlu_management.get_package(package_id)
if package is not None:
if package['dlu_error'] == 1:
continue
else:
self.dlu_management.set_dlu_package_error(package_id)
logger.info("Marked package %s as error due to UPLOAD_FAILED in MongoDB", package_id)




if __name__ == "__main__":
dlu_watcher = DLUWatcher()
Expand All @@ -205,4 +218,5 @@ def is_directory_valid(self, directory_info, package_id):
dlu_watcher.watch_for_packages()
dlu_watcher.watch_for_side_manifest_records()
dlu_watcher.fill_in_null_package_ids()
dlu_watcher.mark_packages_with_error()
time.sleep(60)
Loading