diff --git a/lochness/config/__init__.py b/lochness/config/__init__.py index b39bea71..5097a3d7 100644 --- a/lochness/config/__init__.py +++ b/lochness/config/__init__.py @@ -1,52 +1,70 @@ +""" +Module to read Lochness configuration file and keyring file. +""" + +import getpass as gp +import logging import os +import string +from typing import Any, Dict + +import cryptease as crypt import yaml -import logging import yaml.reader -import getpass as gp -import cryptease as crypt -import string logger = logging.getLogger(__name__) -def load(f: 'location', archive_base=None): - '''load configuration file and keyring''' - logger.debug('loading configuration') +def load(path: str, archive_base=None) -> Dict[str, Any]: + """ + Load configuration file and keyring + + Uses passphrase from environment variable NRG_KEYRING_PASS if available. + Otherwise, prompts user for passphrase. + + Args: + path (str): path to configuration file (yaml) + archive_base (str): path to the root of the archive - with open(os.path.expanduser(f), 'rb') as fp: - Lochness = _read_config_file(fp) + Returns: + Dict[str, Any]: configuration dictionary + """ + logger.debug("loading configuration") + Lochness = _read_config_file(path) if archive_base: - Lochness['phoenix_root'] = archive_base - if 'phoenix_root' not in Lochness: - raise ConfigError('need either --archive-base or ' - '\'phoenix_root\' in config file') - Lochness['phoenix_root'] = os.path.expanduser(Lochness['phoenix_root']) - Lochness['keyring_file'] = os.path.expanduser(Lochness['keyring_file']) + Lochness["phoenix_root"] = archive_base + if "phoenix_root" not in Lochness: + raise ConfigError( + "need either --archive-base or 'phoenix_root' in config file" + ) + Lochness["phoenix_root"] = os.path.expanduser(Lochness["phoenix_root"]) + Lochness["keyring_file"] = os.path.expanduser(Lochness["keyring_file"]) # box file pattern strings from the config to string template # regardless of the selected study in the args - if 'box' in Lochness: 
- for _, study_dict in Lochness['box'].items(): - for _, modality_values in study_dict['file_patterns'].items(): + if "box" in Lochness: + for _, study_dict in Lochness["box"].items(): + for _, modality_values in study_dict["file_patterns"].items(): for modality_dict in modality_values: - modality_dict['pattern'] = \ - string.Template(modality_dict['pattern']) + modality_dict["pattern"] = string.Template(modality_dict["pattern"]) - with open(Lochness['keyring_file'], 'rb') as fp: - logger.info('reading keyring file {0}'.format(Lochness['keyring_file'])) - if 'NRG_KEYRING_PASS' in os.environ: - load.passphrase = os.environ['NRG_KEYRING_PASS'] + with open(Lochness["keyring_file"], "rb") as fp: + logger.info(f"reading keyring file {Lochness["keyring_file"]}") + if "NRG_KEYRING_PASS" in os.environ: + load.passphrase = os.environ["NRG_KEYRING_PASS"] if load.passphrase is None: - load.passphrase = gp.getpass('enter passphrase: ') + load.passphrase = gp.getpass("enter passphrase: ") key = crypt.key_from_file(fp, load.passphrase) - content = b'' + content = b"" for chunk in crypt.decrypt(fp, key): content += chunk try: - Lochness['keyring'] = yaml.load(content, Loader=yaml.FullLoader) + Lochness["keyring"] = yaml.load(content, Loader=yaml.FullLoader) except yaml.reader.ReaderError: - raise KeyringError('could not decrypt keyring {0} (wrong passphrase?)'.format(Lochness['keyring_file'])) + raise KeyringError( + f"could not decrypt keyring {Lochness["keyring_file"]} (wrong passphrase?)" + ) return Lochness @@ -55,19 +73,24 @@ def load(f: 'location', archive_base=None): class KeyringError(Exception): - pass + """ + Generic keyring error. 
+ """ -def _read_config_file(fp): - '''helper to read lochness configuration file''' - try: - cfg = yaml.load(fp.read(), Loader=yaml.FullLoader) - except Exception as e: - raise ConfigError('failed to parse {0} with error: {1}'.format(fp.name, e)) - return cfg +def _read_config_file(path: str) -> Dict[str, Any]: + """helper to read lochness configuration file""" - -class ConfigError(Exception): - pass + expanded_path = os.path.expanduser(path) + with open(expanded_path, "rb") as fp: + try: + cfg = yaml.load(fp.read(), Loader=yaml.FullLoader) + except Exception as e: + raise ConfigError(f"failed to parse {expanded_path} with error: {e}") + return cfg +class ConfigError(Exception): + """ + Malformed configuration file. + """ diff --git a/lochness/db/__init__.py b/lochness/db/__init__.py new file mode 100644 index 00000000..6583d21d --- /dev/null +++ b/lochness/db/__init__.py @@ -0,0 +1,288 @@ +""" +Helper functions for interacting with a PostgreSQL database. +""" + +import json +import logging +import sys +from pathlib import Path +from typing import Callable, Dict, Literal, Optional, Any +import hashlib + +import pandas as pd +import psycopg2 +import sqlalchemy + +logger = logging.getLogger(__name__) + + +def compute_hash(file_path: Path, hash_type: str = "md5") -> str: + """ + Compute the hash digest of a file. + + Args: + file_path (Path): The path to the file. + hash_type (str, optional): The type of hash algorithm to use. Defaults to 'md5'. + + Returns: + str: The computed hash digest of the file. + """ + with open(file_path, "rb") as file: + file_hash = hashlib.file_digest(file, hash_type) + hash_str = file_hash.hexdigest() + + return hash_str + + +def handle_null(query: str) -> str: + """ + Replaces all occurrences of the string 'NULL' with the SQL NULL keyword in the given query. + + Args: + query (str): The SQL query to modify. + + Returns: + str: The modified SQL query with 'NULL' replaced with NULL. 
+ """ + query = query.replace("'NULL'", "NULL") + + return query + + +def handle_nan(query: str) -> str: + """ + Replaces all occurrences of the string 'nan' with the SQL NULL keyword in the given query. + + Args: + query (str): The SQL query to modify. + + Returns: + str: The modified SQL query with 'nan' replaced with NULL. + """ + query = query.replace("'nan'", "NULL") + + return query + + +def santize_string(string: str) -> str: + """ + Sanitizes a string by escaping single quotes. + + Args: + string (str): The string to sanitize. + + Returns: + str: The sanitized string. + """ + return string.replace("'", "''") + + +def sanitize_json(json_dict: dict) -> str: + """ + Sanitizes a JSON object by escaping single quotes in its string values. + + Args: + json_dict (dict): The JSON object to sanitize. + + Returns: + str: The sanitized JSON object. + """ + for key, value in json_dict.items(): + if isinstance(value, str): + json_dict[key] = santize_string(value) + + json_str = json.dumps(json_dict, default=str) + + # Replace NaN with null + json_str = json_str.replace("NaN", "null") + + return json_str + + +def on_failure(): + """ + Exits the program with exit code 1. + """ + sys.exit(1) + + +def get_db_credentials(lochness_config: Dict[str, Any]) -> Dict[str, str]: + """ + Retrieves the database credentials from the configuration file. + + Args: + lochness_config: Dict[str, Any]: The Lochness configuration dictionary. + db (str, optional): The section of the configuration file to use. + Defaults to "postgresql". + + Returns: + Dict[str, str]: A dictionary containing the database credentials. + """ + credentials = lochness_config["database"] + + return credentials + + +def execute_queries( + lochness_config: Dict[str, Any], + queries: list, + show_commands=True, + silent=False, + on_failure: Optional[Callable] = on_failure, +) -> list: + """ + Executes a list of SQL queries on a PostgreSQL database.
+ + Args: + lochness_config: Dict[str, Any]: The Lochness configuration dictionary. + queries (list): A list of SQL queries to execute. + show_commands (bool, optional): Whether to display the executed SQL queries. + Defaults to True. + show_progress (bool, optional): Whether to display a progress bar. Defaults to False. + silent (bool, optional): Whether to suppress output. Defaults to False. + db (str, optional): The section of the configuration file to use. + Defaults to "postgresql". + backup (bool, optional): Whether to save all executed queries to a file. + + Returns: + list: A list of tuples containing the results of the executed queries. + """ + command = None + output = [] + + try: + credentials = get_db_credentials(lochness_config=lochness_config) + conn: psycopg2.extensions.connection = psycopg2.connect(**credentials) # type: ignore + cur = conn.cursor() + + def execute_query(query: str): + if show_commands: + logger.debug("Executing query:") + logger.debug(f"[bold blue]{query}", extra={"markup": True}) + cur.execute(query) + try: + output.append(cur.fetchall()) + except psycopg2.ProgrammingError: + pass + for command in queries: + execute_query(command) + + cur.close() + + conn.commit() + + if not silent: + logger.debug( + f"[grey]Executed {len(queries)} SQL query(ies).", extra={"markup": True} + ) + except (Exception, psycopg2.DatabaseError) as e: + logger.error("[bold red]Error executing queries.", extra={"markup": True}) + if command is not None: + logger.error(f"[red]For query: {command}", extra={"markup": True}) + logger.error(e) + if on_failure is not None: + on_failure() + else: + raise e + finally: + if conn is not None: + conn.close() + + return output + + +def get_db_connection(lochness_config: Dict[str, Any]) -> sqlalchemy.engine.base.Engine: + """ + Establishes a connection to the PostgreSQL database using the provided configuration file. + + Args: + lochness_config (Dict[str, Any]): The Lochness configuration dictionary.
+ + Returns: + sqlalchemy.engine.base.Engine: The database connection engine. + """ + credentials = get_db_credentials(lochness_config=lochness_config) + engine = sqlalchemy.create_engine( + "postgresql+psycopg2://" + + credentials["user"] + + ":" + + credentials["password"] + + "@" + + credentials["host"] + + ":" + + credentials["port"] + + "/" + + credentials["database"] + ) + + return engine + + +def execute_sql( + lochness_config: Dict[str, Any], query: str, debug: bool = False +) -> pd.DataFrame: + """ + Executes a SQL query on a PostgreSQL database and returns the result as a pandas DataFrame. + + Args: + lochness_config: Dict[str, Any]: The Lochness configuration dictionary. + query (str): The SQL query to execute. + + Returns: + pd.DataFrame: A pandas DataFrame containing the result of the SQL query. + """ + engine = get_db_connection(lochness_config=lochness_config) + + if debug: + logger.debug(f"Executing query: {query}") + + df = pd.read_sql(query, engine) + + engine.dispose() + + return df + + +def fetch_record(lochness_config: Path, query: str) -> Optional[str]: + """ + Fetches a single record from the database using the provided SQL query. + + Args: + lochness_config (Dict[str, Any]): The Lochness configuration dictionary. + query (str): The SQL query to execute. + + Returns: + Optional[str]: The value of the first column of the first row of the result set, + or None if the result set is empty. + """ + df = execute_sql(lochness_config=lochness_config, query=query) + + # Check if there is a row + if df.shape[0] == 0: + return None + + value = df.iloc[0, 0] + + return str(value) + + +def df_to_table( + lochness_config: Dict[str, Any], + df: pd.DataFrame, + table_name: str, + if_exists: Literal["fail", "replace", "append"] = "replace", +) -> None: + """ + Writes a pandas DataFrame to a table in a PostgreSQL database. + + Args: + lochness_config (Dict[str, Any]): The Lochness configuration dictionary.
+ df (pd.DataFrame): The DataFrame to write to the database. + table_name (str): The name of the table to write to. + if_exists (Literal["fail", "replace", "append"], optional): What to do + if the table already exists. + """ + + engine = get_db_connection(lochness_config=lochness_config) + df.to_sql(table_name, engine, if_exists=if_exists, index=False) + engine.dispose() diff --git a/lochness/db/crawlers/__init__.py b/lochness/db/crawlers/__init__.py new file mode 100644 index 00000000..20c6921c --- /dev/null +++ b/lochness/db/crawlers/__init__.py @@ -0,0 +1,3 @@ +""" +Contains crawlers for Lochness. +""" diff --git a/lochness/db/crawlers/metadata.py b/lochness/db/crawlers/metadata.py new file mode 100644 index 00000000..1cc2f425 --- /dev/null +++ b/lochness/db/crawlers/metadata.py @@ -0,0 +1,57 @@ +""" +Imports metadata information into DB. +""" + +from typing import Any, Dict +import logging + +import pandas as pd + +from lochness import db +from lochness.db.models.subject import Subject +from lochness.db.models.study import Study + +logger = logging.getLogger("lochness.crawlers.metadata") + + +def import_metadata_df( + lochness_config: Dict[str, Any], metadata_df: pd.DataFrame, study_id: str +) -> None: + """ + Import subject metadata from a DataFrame into the database. + + Args: + metadata_df (pd.DataFrame): The DataFrame containing the columns: + 'Subject ID', 'Active', 'Consent', '...'. + Note.: This DataFrame is generally obtained from RPMS or REDCap modules. + study_id (str): The study ID. 
+ """ + logger.info(f"Importing metadata for study {study_id}") + queries = [] + + study = Study(study_id=study_id) + insert_study_sql = study.to_sql() + queries.append(insert_study_sql) + + logger.debug(f"Found {metadata_df.shape[0]} subjects for study {study_id}") + for _, row in metadata_df.iterrows(): + optional_notes = {} + for column in metadata_df.columns: + if column not in ["Subject ID", "Active", "Consent"]: + optional_notes[column] = row[column] + + consent_date = pd.to_datetime(row["Consent"]).to_pydatetime() + is_active = row["Active"] == 1 + subject = Subject( + study_id=study_id, + subject_id=row["Subject ID"], + is_active=is_active, + consent_date=consent_date, + optional_notes=optional_notes, + ) + + subject_sql = subject.to_sql() + queries.append(subject_sql) + + db.execute_queries(lochness_config=lochness_config, queries=queries, show_commands=False) + logger.info(f"Successfully imported metadata for study {study_id}") diff --git a/lochness/db/log/__init__.py b/lochness/db/log/__init__.py new file mode 100644 index 00000000..18d0e313 --- /dev/null +++ b/lochness/db/log/__init__.py @@ -0,0 +1,80 @@ +""" +Contains helper functions to log information to DB. +""" + +import logging +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +from lochness import db +from lochness.db.models.file_mappings import FileMapping +from lochness.db.models.files_audit_log import AuditLog +from lochness.db.models.phoenix_files import PhoenixFiles +from lochness.db.models.remote_files import RemoteFile + +logger = logging.getLogger("lochness.db.log") + + +def log_download( + lochness_config: Dict[str, Any], + remote_file_path: Path, + remote_name: str, + remote_hash: Optional[str], + remote_metadata: Dict[str, Any], + local_file_path: Path, + subject_id: str, + study_id: str, + modality: str, +) -> None: + """ + Logs a download operation to the audit_log table. 
+ + Adds entries in: + - audit_log + - file_mappings + - phoenix_files + - remote_files + """ + + logger.info(f"Logging download of {remote_file_path} to {local_file_path}") + + # Log download to audit_log + audit_log_entry = AuditLog( + source_file=remote_file_path, + destination_file=local_file_path, + action="download", + system=remote_name, + metadata={}, + timestamp=datetime.now(), + ) + + # Log to remote_files + remote_file = RemoteFile( + file_path=remote_file_path, + remote_name=remote_name, + hash_val=remote_hash, + last_checked=datetime.now(), + remote_metadata=remote_metadata, + ) + + # log to phoenix_files + phoenix_file = PhoenixFiles(file_path=local_file_path, with_hash=True) + + # log to file_mappings + file_mapping = FileMapping( + remote_file_path=remote_file_path, + local_file_path=local_file_path, + remote_name=remote_name, + subject_id=subject_id, + study_id=study_id, + modality=modality, + ) + + queries: List[str] = [] + queries.append(audit_log_entry.to_sql()) + queries.append(remote_file.to_sql()) + queries.append(phoenix_file.to_sql()) + queries.append(file_mapping.to_sql()) + + db.execute_queries(lochness_config=lochness_config, queries=queries) diff --git a/lochness/db/models/__init__.py b/lochness/db/models/__init__.py new file mode 100644 index 00000000..702557c4 --- /dev/null +++ b/lochness/db/models/__init__.py @@ -0,0 +1,72 @@ +""" +Contains DB models for Lochness. +""" + +from typing import Any, Dict, List, Union + +from lochness import db +from lochness.db.models.file_mappings import FileMapping +from lochness.db.models.files_audit_log import AuditLog +from lochness.db.models.phoenix_files import PhoenixFiles +from lochness.db.models.remote_files import RemoteFile +from lochness.db.models.study import Study +from lochness.db.models.subject import Subject + + +def flatten_list(coll: list) -> list: + """ + Flattens a list of lists into a single list. + + Args: + coll (list): List of lists. + + Returns: + list: Flattened list. 
+ """ + flat_list = [] + for i in coll: + if isinstance(i, list): + flat_list += flatten_list(i) + else: + flat_list.append(i) + return flat_list + + +def init_db(lochness_config: Dict[str, Any]): + """ + Initializes the database. + + WARNING: This will drop all tables and recreate them. + DO NOT RUN THIS IN PRODUCTION. + + Args: + lochness_config (Dict[str, Any]): The Lochness configuration dictionary. + """ + drop_queries_l: List[Union[str, List[str]]] = [ + AuditLog.drop_table_query(), + FileMapping.drop_table_query(), + RemoteFile.drop_table_query(), + PhoenixFiles.drop_table_query(), + Subject.drop_table_query(), + Study.drop_table_query(), + ] + + create_queries_l: List[Union[str, List[str]]] = [ + Study.init_table_query(), + Subject.init_table_query(), + PhoenixFiles.init_table_query(), + RemoteFile.init_table_query(), + FileMapping.init_table_query(), + AuditLog.init_table_query(), + ] + + drop_queries = flatten_list(drop_queries_l) + create_queries = flatten_list(create_queries_l) + + sql_queries: List[str] = drop_queries + create_queries + + db.execute_queries( + lochness_config=lochness_config, + queries=sql_queries, + show_commands=True, + ) diff --git a/lochness/db/models/file_mappings.py b/lochness/db/models/file_mappings.py new file mode 100644 index 00000000..d219d060 --- /dev/null +++ b/lochness/db/models/file_mappings.py @@ -0,0 +1,119 @@ +""" +File mappings are used to map files from one source to another. + +This module contains the FileMapping class, which represents a mapping between +a file on the local file system and a file on a remote file system. +""" + +from pathlib import Path + +from lochness import db + + +class FileMapping: + """ + Maps a file on a remote file system to a file on the local file system. + + Attributes: + remote_file_path (Path): The path to the file on the remote file system. + local_file_path (Path): The path to the file on the local file system. + remote_name (str): The name of the remote system.
+ subject_id (str): The subject ID associated with this asset. + study_id (str): The study ID associated with this asset. + modality (str): The modality associated with this asset. + """ + + def __init__( + self, + remote_file_path: Path, + local_file_path: Path, + remote_name: str, + subject_id: str, + study_id: str, + modality: str, + ): + """ + Initialize a FileMapping object. + + Args: + remote_file_path (Path): The path to the file on the remote file system. + local_file_path (Path): The path to the file on the local file system. + remote_name (str): The name of the remote system. + subject_id (str): The subject ID associated with this asset. + study_id (str): The study ID associated with this asset. + modality (str): The modality associated with this asset. + """ + self.remote_file_path = remote_file_path + self.local_file_path = local_file_path + self.remote_name = remote_name + self.subject_id = subject_id + self.study_id = study_id + self.modality = modality + + def __str__(self): + """ + Return a string representation of the FileMapping object. + """ + return f"FileMapping({self.remote_file_path},\ + {self.local_file_path}, {self.remote_name}, {self.subject_id}, \ + {self.modality})" + + def __repr__(self): + """ + Return a string representation of the FileMapping object. + """ + return self.__str__() + + @staticmethod + def init_table_query() -> str: + """ + Return the SQL query to create the 'file_mappings' table.
+ """ + sql_query = """ + CREATE TABLE file_mappings ( + remote_file_path TEXT NOT NULL, + remote_name TEXT NOT NULL, + local_file_path TEXT NOT NULL, + subject_id TEXT NOT NULL, + study_id TEXT NOT NULL, + modality TEXT NOT NULL, + PRIMARY KEY (remote_file_path, local_file_path, remote_name, subject_id, study_id), + FOREIGN KEY (remote_file_path, remote_name) REFERENCES remote_files(r_file_path, r_remote_name), + FOREIGN KEY (local_file_path) REFERENCES phoenix_files(p_file_path), + FOREIGN KEY (study_id, subject_id) REFERENCES subjects(study_id, subject_id) + ); + """ + + return sql_query + + @staticmethod + def drop_table_query() -> str: + """ + Return the SQL query to drop the 'file_mappings' table. + """ + sql_query = """ + DROP TABLE IF EXISTS file_mappings; + """ + + return sql_query + + def to_sql(self) -> str: + """ + Return the SQL query to insert the object into the 'file_mappings' table. + """ + remote_file_path = db.santize_string(self.remote_file_path) + local_file_path = db.santize_string(self.local_file_path) + remote_name = db.santize_string(self.remote_name) + subject_id = db.santize_string(self.subject_id) + study_id = db.santize_string(self.study_id) + modality = db.santize_string(self.modality) + + return f""" + INSERT INTO file_mappings ( + remote_file_path, local_file_path, remote_name, + subject_id, study_id, modality + ) VALUES ( + '{remote_file_path}', '{local_file_path}', '{remote_name}', + '{subject_id}', '{study_id}', '{modality}' + ON CONFLICT DO NOTHING; + """ diff --git a/lochness/db/models/files_audit_log.py b/lochness/db/models/files_audit_log.py new file mode 100644 index 00000000..fff7a779 --- /dev/null +++ b/lochness/db/models/files_audit_log.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +""" +AuditLog Model +""" +from typing import Dict +from datetime import datetime + +from lochness import db + + +class AuditLog: + """ + Represents an audit log entry. + + Attributes: + source_file (str): The source file. 
+ destination_file (str): The destination file. + system (str): The system the action was taken on. + action (str): The action taken. + metadata (Dict[str, str]): Metadata about the action. + timestamp (datetime): The time the action was taken. + """ + + def __init__( + self, + source_file: str, + destination_file: str, + system: str, + action: str, + metadata: Dict[str, str], + timestamp: datetime, + ): + """ + Initialize an AuditLog object. + + Args: + source_file (str): The source file. + destination_file (str): The destination file. + system (str): The system the action was taken on. e.g. 'local', 'dropbox', etc. + action (str): The action taken. e.g. 'move', 'delete', etc. + metadata (Dict[str, str]): Metadata about the action. + timestamp (datetime): The time the action was taken. + """ + + self.source_file = source_file + self.destination_file = destination_file + self.system = system + self.action = action + self.metadata = metadata + self.timestamp = timestamp + + def __str__(self): + """ + Return a string representation of the AuditLog object. + """ + return f""" +AuditLog( + {self.source_file}, + {self.destination_file}, + {self.system}, + {self.action}, + {self.metadata}, + {self.timestamp} +) +""" + + def __repr__(self): + """ + Return a string representation of the AuditLog object. + """ + return self.__str__() + + @staticmethod + def init_table_query() -> str: + """ + Return the SQL query to create the 'audit_log' table. + """ + sql_query = """ + CREATE TABLE audit_log ( + source_file TEXT NOT NULL, + destination_file TEXT, + system TEXT NOT NULL, + action TEXT NOT NULL, + metadata JSONB, + timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ); + """ + + return sql_query + + @staticmethod + def drop_table_query() -> str: + """ + Return the SQL query to drop the 'audit_log' table if it exists.
+ """ + sql_query = """ + DROP TABLE IF EXISTS audit_log CASCADE; + """ + + return sql_query + + def to_sql(self) -> str: + """ + Return the SQL query to insert the object into the 'audit_log' table. + """ + source_file = db.santize_string(self.source_file) + destination_file = db.santize_string(self.destination_file) + system = db.santize_string(self.system) + action = db.santize_string(self.action) + metadata = db.santize_string(self.metadata) + + sql_query = f""" + INSERT INTO audit_log ( + source_file, destination_file, system, + action, metadata, timestamp + ) VALUES ( + '{source_file}', '{destination_file}', '{system}', + '{action}', '{metadata}', '{self.timestamp}' + ); + """ + + return sql_query diff --git a/lochness/db/models/phoenix_files.py b/lochness/db/models/phoenix_files.py new file mode 100644 index 00000000..4fe95e8c --- /dev/null +++ b/lochness/db/models/phoenix_files.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python +""" +File Model +""" + +from pathlib import Path +from datetime import datetime + +from lochness import db + + +class PhoenixFiles: + """ + Represents a file on the PHOENIX file system. + + Attributes: + file_path (Path): The path to the file. + """ + + def __init__(self, file_path: Path, with_hash: bool = True): + """ + Initialize a File object. + + Automatically computes the file size and modification time. + + Args: + file_path (Path): The path to the file. + """ + self.file_path = file_path + + if not file_path.exists(): + raise FileNotFoundError(f"File not found: {file_path}") + + self.file_name = file_path.name + self.file_type = file_path.suffix + + self.file_size_mb = file_path.stat().st_size / 1024 / 1024 + self.m_time = datetime.fromtimestamp(file_path.stat().st_mtime) + if with_hash: + self.md5 = db.compute_hash(file_path=file_path, hash_type="md5") + else: + self.md5 = None + + def __str__(self): + """ + Return a string representation of the File object. 
+ """ + return f"PhoenixFile({self.file_name}, {self.file_type}, {self.file_size_mb}, \ + {self.file_path}, {self.m_time}, {self.md5})" + + def __repr__(self): + """ + Return a string representation of the File object. + """ + return self.__str__() + + @staticmethod + def init_table_query() -> str: + """ + Return the SQL query to create the 'phoenix_files' table. + """ + sql_query = """ + CREATE TABLE phoenix_files ( + p_file_name TEXT NOT NULL, + p_file_type TEXT NOT NULL, + p_file_size_mb FLOAT NOT NULL, + p_file_path TEXT PRIMARY KEY, + m_time TIMESTAMP NOT NULL, + md5 TEXT + ); + """ + + return sql_query + + @staticmethod + def drop_table_query() -> str: + """ + Return the SQL query to drop the 'phoenix_files' table if it exists. + """ + sql_query = """ + DROP TABLE IF EXISTS phoenix_files CASCADE; + """ + + return sql_query + + @staticmethod + def find_matches_by_hash_query(hash_val: str) -> str: + """ + Return the SQL query to find matching files by hash. + """ + sql_query = f""" + SELECT p_file_name, p_file_type, p_file_size_mb, p_file_path, m_time, md5 + FROM phoenix_files + WHERE md5 = '{hash_val}'; + """ + + return sql_query + + @staticmethod + def update_file_query(orig_path: Path, new_path: Path) -> str: + """ + Return the SQL query to update the p_file_path of a File object. + """ + orig_path = db.santize_string(str(orig_path)) + new_path = db.santize_string(str(new_path)) + + sql_query = f""" + UPDATE files + SET p_file_path = '{new_path}' + WHERE p_file_path = '{orig_path}'; + """ + + return sql_query + + def to_sql(self): + """ + Return the SQL query to insert the File object into the 'phoenix_files' table. 
+ """ + f_name = db.santize_string(self.file_name) + f_path = db.santize_string(str(self.file_path)) + + if self.md5 is None: + hash_val = "NULL" + else: + hash_val = self.md5 + + sql_query = f""" + INSERT INTO phoenix_files (p_file_name, p_file_type, p_file_size_mb, + p_file_path, m_time, md5) + VALUES ('{f_name}', '{self.file_type}', '{self.file_size_mb}', + '{f_path}', '{self.m_time}', '{hash_val}') + ON CONFLICT (p_file_path) DO UPDATE SET + p_file_name = excluded.p_file_name, + p_file_type = excluded.p_file_type, + p_file_size_mb = excluded.p_file_size_mb, + m_time = excluded.m_time, + md5 = excluded.md5; + """ + + sql_query = db.handle_null(sql_query) + + return sql_query diff --git a/lochness/db/models/remote_files.py b/lochness/db/models/remote_files.py new file mode 100644 index 00000000..7ca53968 --- /dev/null +++ b/lochness/db/models/remote_files.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python +""" +RemoteFile Model +""" + +from datetime import datetime +from typing import Dict + +from lochness import db + + +class RemoteFile: + """ + Represents a file on some remote file system. + + Attributes: + file_path (Path): The path to the file. + """ + + def __init__( + self, + file_path: str, + remote_name: str, + hash_val: str, + last_checked: datetime, + remote_metadata: Dict[str, str], + ): + """ + Initialize a RemoteFile object. + + Args: + file_path (Path): The path to the file. + remote_name (str): The name of the remote system. + hash_val (str): The hash value of the file, + as provided by the remote system. + last_checked (datetime): The last time the file was checked. + remote_metadata (Dict[str, str]): Metadata about the file, + """ + self.file_path = file_path + self.remote_name = remote_name + self.hash_val = hash_val + self.last_checked = last_checked + self.remote_metadata = remote_metadata + + def __str__(self): + """ + Return a string representation of the RemoteFile object. 
+ """ + return f"RemoteFile({self.file_path}, {self.remote_name}, {self.last_checked})" + + def __repr__(self): + """ + Return a string representation of the RemoteFile object. + """ + return self.__str__() + + @staticmethod + def init_table_query() -> str: + """ + Return the SQL query to create the 'remote_files' table. + """ + sql_query = """ + CREATE TABLE remote_files ( + r_file_path TEXT NOT NULL, + r_remote_name TEXT NOT NULL, + r_hash_val TEXT, + r_last_checked TIMESTAMP NOT NULL, + r_remote_metadata JSONB, + PRIMARY KEY (r_file_path, r_remote_name) + ); + """ + + return sql_query + + @staticmethod + def drop_table_query() -> str: + """ + Return the SQL query to drop the 'remote_files' table if it exists. + """ + sql_query = """ + DROP TABLE IF EXISTS remote_files CASCADE; + """ + + return sql_query + + @staticmethod + def find_matches_by_hash_query(hash_val: str) -> str: + """ + Return the SQL query to find matching remote_files by hash. + """ + sql_query = f""" + SELECT r_file_path, r_remote_name, r_hash_val, r_last_checked, r_remote_metadata + FROM remote_files + WHERE r_hash_val = '{hash_val}'; + """ + + return sql_query + + def to_sql(self): + """ + Return the SQL query to insert the RemoteFile object into the 'remote_files' table. + + Returns: + str: The SQL query.
+ """ + + file_path = db.santize_string(str(self.file_path)) + remote_name = db.santize_string(self.remote_name) + hash_val = db.santize_string(self.hash_val) + last_checked = db.santize_string(self.last_checked) + + metadata = db.sanitize_json(self.remote_metadata) + + sql_query = f""" + INSERT INTO remote_files ( + r_file_path, r_remote_name, r_hash_val, r_last_checked, r_remote_metadata + ) VALUES ( + '{file_path}', '{remote_name}', '{hash_val}', '{last_checked}', '{metadata}' + ) ON CONFLICT (file_path, remote_name) UPDATE SET + r_hash_val = excluded.r_hash_val, + r_last_checked = excluded.r_last_checked, + r_remote_metadata = excluded.r_remote_metadata; + """ + + return sql_query diff --git a/lochness/db/models/study.py b/lochness/db/models/study.py new file mode 100644 index 00000000..6b6af8c2 --- /dev/null +++ b/lochness/db/models/study.py @@ -0,0 +1,54 @@ +""" +Study Model +""" + +from lochness import db + + +class Study: + """ + Represents a study. + + Attributes: + study_id (str): The study ID. + """ + + def __init__(self, study_id: str): + self.study_id = study_id + + def __str__(self): + return f"Study({self.study_id})" + + def __repr__(self): + return self.__str__() + + @staticmethod + def init_table_query() -> str: + """ + Return the SQL query to create the 'study' table. + """ + return """ + CREATE TABLE IF NOT EXISTS study ( + study_id TEXT PRIMARY KEY + ); + """ + + @staticmethod + def drop_table_query() -> str: + """ + Return the SQL query to drop the 'study' table. + """ + return """ + DROP TABLE IF EXISTS study; + """ + + def to_sql(self): + """ + Return the SQL query to insert the object into the 'study' table. 
+ """ + study_id = db.santize_string(self.study_id) + + return f""" + INSERT INTO study (study_id) + VALUES ('{study_id}') ON CONFLICT DO NOTHING; + """ diff --git a/lochness/db/models/subject.py b/lochness/db/models/subject.py new file mode 100644 index 00000000..adc64d5f --- /dev/null +++ b/lochness/db/models/subject.py @@ -0,0 +1,91 @@ +""" +Subject Model +""" + +from datetime import datetime + +from lochness import db + + +class Subject: + """ + Represents a subject / study participant. + + Attributes: + study_id (str): The study ID. + subject_id (str): The subject ID. + is_active (bool): Whether or not the subject is active. + consent_date (datetime): The date the subject consented to the study. + optional_notes (dict): Optional notes about the subject. + """ + + def __init__( + self, + study_id: str, + subject_id: str, + is_active: bool, + consent_date: datetime, + optional_notes: dict, + ): + self.study_id = study_id + self.subject_id = subject_id + self.is_active = is_active + self.consent_date = consent_date + self.optional_notes = optional_notes + + def __str__(self): + return f"Subject({self.study_id}, {self.subject_id}, {self.is_active}, \ + {self.consent_date}, {self.optional_notes})" + + def __repr__(self): + return self.__str__() + + @staticmethod + def init_table_query() -> str: + """ + Return the SQL query to create the 'subjects' table. + """ + sql_query = """ + CREATE TABLE subjects ( + study_id TEXT NOT NULL REFERENCES study (study_id), + subject_id TEXT NOT NULL, + is_active BOOLEAN NOT NULL, + consent_date DATE NOT NULL, + optional_notes JSON, + PRIMARY KEY (study_id, subject_id) + ); + """ + + return sql_query + + @staticmethod + def drop_table_query() -> str: + """ + Return the SQL query to drop the 'subjects' table. + """ + sql_query = """ + DROP TABLE IF EXISTS subjects; + """ + + return sql_query + + def to_sql(self) -> str: + """ + Return the SQL query to insert the subject into the 'subjects' table. 
+ """ + + consent_date = self.consent_date.strftime("%Y-%m-%d") + optional_notes = db.sanitize_json(self.optional_notes) + + sql_query = f""" + INSERT INTO subjects (study_id, subject_id, is_active, + consent_date, optional_notes) + VALUES ('{db.santize_string(self.study_id)}', '{db.santize_string(self.subject_id)}', {self.is_active}, + '{consent_date}', '{optional_notes}') + ON CONFLICT(study_id, subject_id) DO UPDATE SET + is_active = excluded.is_active, + consent_date = excluded.consent_date, + optional_notes = excluded.optional_notes; + """ + + return sql_query diff --git a/lochness/mediaflux/__init__.py b/lochness/mediaflux/__init__.py index 1452f8fb..de040458 100644 --- a/lochness/mediaflux/__init__.py +++ b/lochness/mediaflux/__init__.py @@ -1,25 +1,24 @@ -import os, sys -import gzip +""" +Module to sync data from Mediaflux. +""" import logging -import importlib -import lochness +import os +import re import tempfile as tf -import cryptease as crypt -import lochness.net as net -from typing import Generator, Tuple +from distutils.spawn import find_executable +from os.path import basename, dirname, isfile +from os.path import join as pjoin from pathlib import Path -import hashlib -from io import BytesIO -import lochness.keyring as keyring -from os.path import join as pjoin, basename, dirname, isfile +from subprocess import DEVNULL, STDOUT, Popen + import cryptease as enc -import re -from subprocess import Popen, DEVNULL, STDOUT import pandas as pd -from numpy import nan -from distutils.spawn import find_executable + +import lochness +import lochness.keyring as keyring import lochness.tree as tree from lochness.cleaner import is_transferred_and_removed +from lochness.db import log as db_log logger = logging.getLogger(__name__) Module = lochness.lchop(__name__, 'lochness.') @@ -206,6 +205,19 @@ def sync_module(Lochness: 'lochness.config', stdout=DEVNULL, stderr=STDOUT) p.wait() + # log download to DB + db_log.log_download( + lochness_config=Lochness, + remote_file_path=Path(remote), + 
remote_name='mediaflux', + remote_hash=checksum, + remote_metadata={}, + local_file_path=Path(mf_local) / subpath.name, + subject_id=subject.id, + study_id=study_name, + modality=datatype + ) + # write checksum to local with open(prev_checksum_file, 'w') as fp: fp.write(checksum) diff --git a/lochness/redcap/__init__.py b/lochness/redcap/__init__.py index 3729be6a..3d7d55fe 100644 --- a/lochness/redcap/__init__.py +++ b/lochness/redcap/__init__.py @@ -1,21 +1,23 @@ -import os -import sys -import re +import collections as col +import datetime import json -import lochness import logging -import requests -import lochness.net as net -import collections as col -import lochness.tree as tree +import os +import re +import sys +import tempfile as tf from pathlib import Path -import pandas as pd -import datetime from typing import List, Union -import tempfile as tf -from lochness.redcap.process_piis import process_and_copy_db +import pandas as pd +import requests from requests.packages.urllib3.exceptions import InsecureRequestWarning + +import lochness +import lochness.net as net +import lochness.tree as tree +from lochness.db.crawlers import metadata as metadata_crawler + requests.packages.urllib3.disable_warnings(InsecureRequestWarning) @@ -211,6 +213,13 @@ def initialize_metadata(Lochness: 'Lochness object', same_df = df.reset_index(drop=True).equals(target_df) + # import into DB + metadata_crawler.import_metadata_df( + lochness_config=Lochness, + metadata_df=df, + study_id=study_name + ) + if same_df: pass else: diff --git a/lochness/rpms/__init__.py b/lochness/rpms/__init__.py index 7b38c0a3..225bf327 100644 --- a/lochness/rpms/__init__.py +++ b/lochness/rpms/__init__.py @@ -1,20 +1,23 @@ -import os -import yaml -import lochness +""" +Module to import data from RPMS +""" +import collections as col import logging -import zipfile +import os +import re import shutil +from datetime import datetime from pathlib import Path -import tempfile as tf -import collections as col 
+from time import sleep +from typing import Dict, List, Union + +import pandas as pd +import yaml + import lochness.net as net import lochness.tree as tree -from typing import List, Dict, Union -import pandas as pd -from datetime import datetime -from time import sleep -import re -from lochness.redcap.process_piis import process_and_copy_db +from lochness.db.crawlers import metadata as metadata_crawler + pd.set_option('mode.chained_assignment', None) @@ -353,6 +356,13 @@ def initialize_metadata(Lochness: 'Lochness object', df_final = df_final[main_cols + \ [x for x in df_final.columns if x not in main_cols]] + # import into DB + metadata_crawler.import_metadata_df( + lochness_config=Lochness, + metadata_df=df_final, + study_id=study_name + ) + general_path = Path(Lochness['phoenix_root']) / 'PROTECTED' metadata_study = general_path / study_name / f"{study_name}_metadata.csv" diff --git a/scripts/init_db.py b/scripts/init_db.py new file mode 100755 index 00000000..190f0fe0 --- /dev/null +++ b/scripts/init_db.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +""" +Initialize the database with the schema defined in lochness.db.models +""" + +import sys +from pathlib import Path + +file = Path(__file__).resolve() +parent = file.parent +ROOT = None +for candidate in file.parents: + if candidate.name == "lochness-dev": + ROOT = candidate +sys.path.append(str(ROOT)) + +# remove current directory (the folder containing this script) from path +try: + sys.path.remove(str(parent)) +except ValueError: + pass + +import logging + +import lochness.config as config +from lochness.db import models + +logger = logging.getLogger(__name__) +logargs = { + "level": logging.DEBUG, + "format": "%(asctime)s - %(process)d - %(name)s - %(levelname)s - %(message)s", +} +logging.basicConfig(**logargs) + + +if __name__ == "__main__": + logger.info("Initializing database...") + logger.debug( + "This will drop all tables and recreate them. DO NOT RUN THIS IN PRODUCTION." 
+ ) + + config_file = "/var/lib/prescient/soft/lochness-dev/scratch/config.yml" + logger.info(f"Loading config file: {config_file}") + lochness_config = config.load(path=config_file) + + logger.info("Initializing database...") + models.init_db(lochness_config) + + logger.info("Done!")