From 1385a43e73dfca95e102bbdf947bdda70d2616f3 Mon Sep 17 00:00:00 2001 From: ShrooqAyman Date: Mon, 15 Jun 2026 16:09:52 +0200 Subject: [PATCH] introduce unified ETL pipeline and API based data retrieval --- .gitignore | 6 +- app.py | 217 +++++++++++++--- .../get_affiliationproductionovertime.py | 5 +- functions/get_annualproduction.py | 8 +- functions/get_authorlocalimpact.py | 5 +- functions/get_authorproductionovertime.py | 5 +- functions/get_averagecitations.py | 5 +- functions/get_bradfordlaw.py | 5 +- functions/get_correspondingauthorcountries.py | 5 +- functions/get_countriesproductionovertime.py | 5 +- functions/get_data.py | 126 +++++----- functions/get_database.py | 2 + functions/get_filters.py | 5 +- functions/get_localcitedreferences.py | 5 +- functions/get_localcitedsources.py | 5 +- functions/get_lotkalaw.py | 5 +- functions/get_maininformations.py | 5 +- functions/get_relevantaffiliations.py | 5 +- functions/get_relevantauthors.py | 5 +- functions/get_relevantsources.py | 5 +- functions/get_sourcesproduction.py | 5 +- functions/get_table.py | 5 +- www/services/api_retrievers.py | 237 ++++++++++++++++++ www/services/data_validation.py | 87 +++++++ www/services/extractors.py | 79 ++++++ www/services/mappings.py | 105 ++++++++ www/services/metatagextraction.py | 38 +++ www/services/normalizers.py | 131 ++++++++++ www/services/parsers.py | 21 ++ www/services/schema.py | 16 ++ www/services/standardizer.py | 111 ++++++++ www/services/validators.py | 40 +++ 32 files changed, 1195 insertions(+), 114 deletions(-) create mode 100644 www/services/api_retrievers.py create mode 100644 www/services/data_validation.py create mode 100644 www/services/extractors.py create mode 100644 www/services/mappings.py create mode 100644 www/services/normalizers.py create mode 100644 www/services/schema.py create mode 100644 www/services/standardizer.py create mode 100644 www/services/validators.py diff --git a/.gitignore b/.gitignore index 23b99e089..393f32a70 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ __pycache__/ bibliovenv/ Bibenv/ -.idea/ \ No newline at end of file +.idea/ +venv/ +venv311/ +__pycache__/ +*.pyc \ No newline at end of file diff --git a/app.py b/app.py index f0891f894..6aaf18fa1 100644 --- a/app.py +++ b/app.py @@ -63,12 +63,13 @@ from shiny import reactive, render from shinywidgets import render_widget from shiny.express import ui, input, render +from www.services.standardizer import convert2df +from www.services.api_retrievers import fetch_openalex, fetch_pubmed # Setup the Directory for static assets - optimized for performance base_dir = tempfile.gettempdir() # Use system temp dir instead of creating new temp file express.app_opts(static_assets=base_dir, debug=False) - -# --- Toggle button --- +sidebar_open = reactive.Value(False)# --- Toggle button --- # This button toggles the visibility of the sidebar(s) in the UI. ui.tags.button("☰", id="toggleSidebar", class_="sidebar-toggle") @@ -751,15 +752,17 @@ def mostra(): reset_all_analyses() # Reset analysis results when sample is loaded @render.express() - @reactive.event(input.Dataset) def show_data(): text = get_data(input, database, df, reset_all_analyses) text - ui.HTML(init_itables()) - + @render.ui - @reactive.event(input.start_button) def show_table(): + data = df.get() + + if data is None: + return ui.p("Please upload a dataset") + table_ui, _, _ = get_table(database, df) return table_ui @@ -853,9 +856,134 @@ def indicator_types_ui_all(): """ ), - with ui.nav_panel("None", value="API"): - ui.h3("🚧 Warning: API is under construction 🚧") + with ui.nav_panel("API", value="API"): + + with ui.card(id="api_card", style="margin: 20px; padding: 20px; border-radius: 10px; box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1); background-color: #f9f9f9;"): + ui.tags.script(""" + function hideApiCard() { + const card = document.getElementById("api_card"); + if (card) card.style.display = "none"; + } + """) + + with ui.tags.div(): + ui.h3("🌐 API Data Import", style="color: #5567BB;") + ui.input_text("api_topic", "Search Topic") + + ui.input_select( + "api_source", + "Source", + { + "openalex": "OpenAlex", + "pubmed": "PubMed" + } + ) + + ui.input_numeric("api_limit", "Limit", value=50) + ui.input_action_button("api_start", "Fetch Data", icon=ICONS["play"]) + + + @render.express() + @reactive.event(input.api_start) + def api_load(): + + topic = input.api_topic() + source = input.api_source() + limit = input.api_limit() + + if not topic: + ui.notification_show("⚠️write a topic to search", duration=5, close_button=True) + return + + if source == "openalex": + data = fetch_openalex(topic, limit) + data = convert2df( + source="openalex", + raw_df=data + ) + + elif source == "pubmed": + data = fetch_pubmed(topic, limit) + data = convert2df( + source="pubmed_api", + raw_df=data + ) + + ui.markdown(f"

Data of {source}

") + df.set(data) + ui.tags.script("hideApiCard();") + database= f"{source} API" + reset_all_analyses() + + @render.express() + def show_data(): + text = get_data(input, database, df, reset_all_analyses) + text + @render.ui + def show_table(): + data = df.get() + + if data is None: + return ui.p("Please upload a dataset") + + table_ui, _, _ = get_table(database, df) + return table_ui + + ui.notification_show( + f"✅ Data loaded from {source} (API)", + duration=5 + ) + # -------- ADVICE BUTTON -------- + @render.ui + @reactive.event(input.advice_modal_completeness) + def show_advice_notification(): + return ui.notification_show( + ui.div( + ui.h4("Your metadata have no critical issues", style="font-size: 30px; text-align: center;"), + ui.input_action_button("close_advice_modal_notification", "OK", + style="display: block; margin: 20px auto;") + ), + duration=None, # La notifica rimane finché non viene chiusa + close_button=False, # Disabilita la X per la chiusura + id="advice_modal_notification", + ) + + # Aggiungi l'evento di chiusura al bottone OK + @reactive.effect + @reactive.event(input.close_advice_modal_notification) + def close_advice_notification(): + ui.notification_remove(id="advice_modal_notification") + + # -------- REPORT BUTTON -------- + @render.ui + @reactive.event(input.report_modal_completeness) + def show_missing_data_report(): + _, missingData, _ = get_table(database, df, modal=False) + dataframe = pd.read_html(io.StringIO(missingData)) + report_excel.set(add_to_report(report_choices, report_excel, [dataframe[0]], [], "missingdata")) + selection.set(selection.get() + (f"{list(report_choices.get().keys())[-1]}",)) + return ui.notification_show("✅ Missing data added to report", duration=5, close_button=False) + + # -------- SAVE BUTTON -------- + completeness_table_download_folder = str(Path.home() / "Downloads") + todaydate = datetime.today().strftime("%Y-%m-%d") + completeness_table_image_path = os.path.join(completeness_table_download_folder, f"missingDataTable-{todaydate}.png") + @render.ui + @reactive.event(input.save_modal_completeness) + def save_dataframe_image(): + _, _, fig = get_table(database, df, dpi=dpi.get(), modal=False) + fig.write_image(completeness_table_image_path) + return ui.notification_show(f"✅ Missing data image saved into {completeness_table_image_path}", duration=5, close_button=False) + + # Loader indicator + @render.ui + def indicator_types_ui(): + return ui.busy_indicators.use( + spinners=input.api_start() > 0 + ) + + with ui.nav_panel("None", value="collections"): ui.h3("🚧 Warning: Merge Collection is under construction 🚧") @@ -8185,8 +8313,10 @@ def update_plot_settings(): # --- Sidebar Management --- @render.express() -@reactive.event(input.start_button) def toggle_sidebar(): + if df.get() is None: + return + with ui.tags.div(id="sidebar_2", class_="custom-sidebar"): with ui.accordion(id="sidebar_accordion_data", multiple=False, open=False): # Info Section @@ -8301,6 +8431,14 @@ def toggle_sidebar(): """ ) + ui.tags.script(""" + setTimeout(function() { + if (typeof setSidebarState === "function") { + setSidebarState(true); + } + }, 0); + """) + # --- Javascript for Sidebar --- ui.tags.script(""" @@ -8309,9 +8447,18 @@ def toggle_sidebar(): const sidebar = document.getElementById("sidebar"); const sidebar_2 = document.getElementById("sidebar_2"); const content = document.getElementById("mainContent"); - if (sidebar) sidebar.classList.toggle("sidebar-hidden", !show); - if (sidebar_2) sidebar_2.classList.toggle("sidebar-hidden", !show); - if (content) content.classList.toggle("full-width", !show); + + if (sidebar) { + sidebar.classList.toggle("sidebar-hidden", !show); + } + + if (sidebar_2 && sidebar_2.classList) { + sidebar_2.classList.toggle("sidebar-hidden", !show); + } + + if (content && content.classList) { + content.classList.toggle("full-width", !show); + } } // Hide sidebars on page load @@ -8320,33 +8467,45 @@ def toggle_sidebar(): }); // Toggle both sidebars on button click - document.getElementById("toggleSidebar").addEventListener("click", function() { - const sidebar = document.getElementById("sidebar"); - // If either sidebar is visible, hide both; otherwise, show both - const isVisible = sidebar && !sidebar.classList.contains("sidebar-hidden"); - setSidebarState(!isVisible); + document.addEventListener("click", function(e) { + if (e.target && e.target.id === "toggleSidebar") { + + const sidebar = document.getElementById("sidebar"); + const isVisible = sidebar && !sidebar.classList.contains("sidebar-hidden"); + + setSidebarState(!isVisible); + } }); - // Listen for Shiny events that might add/remove sidebar_2 dynamically - // and keep them in sync + // Safe MutationObserver (prevents null crash) const observer = new MutationObserver(function(mutations) { mutations.forEach(function(mutation) { - if (mutation.addedNodes.length > 0 || mutation.removedNodes.length > 0) { - // Always keep both sidebars in the same state - const sidebar = document.getElementById("sidebar"); - const sidebar_2 = document.getElementById("sidebar_2"); - if (sidebar && sidebar_2) { - const sidebarHidden = sidebar.classList.contains("sidebar-hidden"); - sidebar_2.classList.toggle("sidebar-hidden", sidebarHidden); - } + + const sidebar = document.getElementById("sidebar"); + const sidebar_2 = document.getElementById("sidebar_2"); + + if (!sidebar || !sidebar_2) return; + + if (sidebar.classList && sidebar_2.classList) { + const sidebarHidden = sidebar.classList.contains("sidebar-hidden"); + sidebar_2.classList.toggle("sidebar-hidden", sidebarHidden); } }); }); + observer.observe(document.body, { childList: true, subtree: true }); - // Show both sidebars when 'start_button' is clicked + // Show sidebars when buttons clicked (safe) document.addEventListener("click", function(e) { - if (e.target && e.target.id === "start_button") { + const target = e.target && e.target.closest + ? e.target.closest("#start_button, #api_start") + : e.target; + + if ( + target && + (target.id === "start_button" || target.id === "api_start") + ) { + console.log("Start button clicked - showing sidebars"); setSidebarState(true); } }); diff --git a/functions/get_affiliationproductionovertime.py b/functions/get_affiliationproductionovertime.py index e1b87f583..d838241cf 100644 --- a/functions/get_affiliationproductionovertime.py +++ b/functions/get_affiliationproductionovertime.py @@ -12,7 +12,10 @@ def get_affiliation_production_over_time(df, top_k_affiliations): Returns: A Plotly figure object representing the affiliation's production over time. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() AFF = data["AU_UN"].dropna().apply(lambda x: [aff for aff in x if aff.strip() != ""]) nAFF = [len(aff) for aff in AFF] diff --git a/functions/get_annualproduction.py b/functions/get_annualproduction.py index dd27105c2..8b96ac9dd 100644 --- a/functions/get_annualproduction.py +++ b/functions/get_annualproduction.py @@ -11,7 +11,13 @@ def get_annual_production(df): Returns: A Plotly figure object representing the annual scientific production. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() + + data["PY"] = pd.to_numeric(data["PY"], errors="coerce").fillna(0).astype(int) + data = data[data["PY"] > 0] # Calculate the number of publications per year publications_per_year = data["PY"].value_counts().sort_index().reset_index() diff --git a/functions/get_authorlocalimpact.py b/functions/get_authorlocalimpact.py index 74a68e263..ac81d1671 100644 --- a/functions/get_authorlocalimpact.py +++ b/functions/get_authorlocalimpact.py @@ -13,7 +13,10 @@ def get_authors_local_impact(df, num_of_authors_local_impact, author_local_impac Returns: A Plotly figure object and a DataFrame of the most impactful sources. """ - df = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + df = df.get() + else: + df = df.copy() today = pd.Timestamp.now().year # Ensure 'TC' and 'PY' are numeric diff --git a/functions/get_authorproductionovertime.py b/functions/get_authorproductionovertime.py index 65edaca96..0294938fb 100644 --- a/functions/get_authorproductionovertime.py +++ b/functions/get_authorproductionovertime.py @@ -16,7 +16,10 @@ def get_author_production_over_time(df, top_k_authors): table_authors_production (pd.DataFrame): Table summarizing authors' production with TC and TCpY. table_documents (pd.DataFrame): Detailed table with additional document information. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() # Ensure "PY" is numeric data["PY"] = pd.to_numeric(data["PY"], errors="coerce") diff --git a/functions/get_averagecitations.py b/functions/get_averagecitations.py index d752aa9b7..1692988f5 100644 --- a/functions/get_averagecitations.py +++ b/functions/get_averagecitations.py @@ -11,7 +11,10 @@ def get_average_citations(df): Returns: A Plotly figure object representing the average citations per year. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() # Calculate the current year current_year = pd.Timestamp.now().year + 1 diff --git a/functions/get_bradfordlaw.py b/functions/get_bradfordlaw.py index 86580591f..b58f25373 100644 --- a/functions/get_bradfordlaw.py +++ b/functions/get_bradfordlaw.py @@ -12,7 +12,10 @@ def get_bradford_law(df): A Plotly figure object and a DataFrame of the Bradford's Law zones. """ # Sort data by frequency of occurrence (equivalent to R's sort(table(M$SO), decreasing = TRUE)) - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() source_counts = data["SO"].value_counts() # Total number of sources diff --git a/functions/get_correspondingauthorcountries.py b/functions/get_correspondingauthorcountries.py index 5ba9832b2..a244c2ec3 100644 --- a/functions/get_correspondingauthorcountries.py +++ b/functions/get_correspondingauthorcountries.py @@ -15,7 +15,10 @@ def get_corresponding_author_countries(df, top_k_countries): # Estrai i metadati "AU_CO" e "AU1_CO" e verifica il tipo di dati df = metaTagExtraction(df, Field="AU_CO") # Assumendo che `metaTagExtraction` sia già definita df = metaTagExtraction(df, Field="AU1_CO") - data = df.get() # Se `df` è un oggetto reattivo + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() # Se `df` è un oggetto reattivo # Assicurati che le colonne siano di tipo stringa e rimuovi righe con valori mancanti data = data.dropna(subset=["AU1_CO", "AU_CO"]) diff --git a/functions/get_countriesproductionovertime.py b/functions/get_countriesproductionovertime.py index aede25bbd..a8fc536f8 100644 --- a/functions/get_countriesproductionovertime.py +++ b/functions/get_countriesproductionovertime.py @@ -13,7 +13,10 @@ def get_countries_production_over_time(df, top_k_countries): A Plotly figure object representing the country's production over time. """ df = metaTagExtraction(df, "AU_CO") - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() AFF = pd.Series(data["AU_CO"]).dropna().apply(lambda x: [aff.strip() for aff in x if aff.strip() != ""]) nAFF = [len(aff) for aff in AFF] diff --git a/functions/get_data.py b/functions/get_data.py index 16baed992..109008c4e 100644 --- a/functions/get_data.py +++ b/functions/get_data.py @@ -1,82 +1,76 @@ from www.services import * +from www.services.standardizer import convert2df + +from io import StringIO +import pandas as pd def get_data(input, database, df, reset_callback=None): - """ - Handle the data upload and display process. - + """Load uploaded bibliographic files into the app's reactive DataFrame. + + Source exports supported by the ETL path are standardized through + ``convert2df``; other sources continue through the existing JSON conversion + path. All uploaded files are concatenated before updating ``df``. + Args: - input: An object that provides user input methods. - database: The name of the database. - df: A DataFrame object to store the data. - reset_callback: Function to call to reset analysis results (optional) - + input: Shiny input object that exposes uploaded files and source choices. + database: Current database label; kept for compatibility with callers. + df: Reactive value that receives the final standardized DataFrame. + reset_callback: Optional callable used to clear previous analysis state. + Returns: - A message indicating the status of the data upload. + A Shiny UI element describing success or the processing error. """ - file: list[FileInfo] | None = input.Dataset() - - if file is None: - text = ui.h5("Please select a file to begin importing your data.") - - elif input.select() == "1A": - ui.update_action_button("action_button_save", disabled=False) - + + files = input.Dataset() + + if not files: + return ui.h5("Please select a file to begin importing your data.") + + try: + frames = [] + source = input.database() author = input.author() - - try: - # Check if multiple files are selected - if len(file) > 1: - # Process multiple files - json = process_multiple_files(file, source, author) - df.set(pd.read_json(StringIO(json))) - # Reset all analysis results when new dataset is loaded - if reset_callback: - reset_callback() - text = ui.p( - f"{database}'s files uploaded and processed successfully! " - f"{len(file)} files have been processed and combined. " - f"The dataset contains {df.get().shape[0]} rows and {df.get().shape[1]} columns." + + for f in files: + + if source in ["scopus", "pubmed", "dimensions"]: + + standardized_df = convert2df( + source=source, + file_path=f["datapath"] ) + else: - # Process single file (original logic) - type = file[0]["name"] - json = biblio_json(file[0]["datapath"], source, type, author) - df.set(pd.read_json(StringIO(json))) - # Reset all analysis results when new dataset is loaded - if reset_callback: - reset_callback() - - if type.endswith(".zip"): - text = ui.p( - f"{database}'s ZIP archive uploaded and extracted successfully! " - f"Multiple files have been processed and combined. " - f"The dataset contains {df.get().shape[0]} rows and {df.get().shape[1]} columns." - ) - else: - text = ui.p( - f"{database}'s file uploaded successfully! You can now proceed to analyze your data. " - f"The dataset contains {df.get().shape[0]} rows and {df.get().shape[1]} columns." - ) - except Exception as e: - text = ui.div( - ui.h5("Error processing file(s):", style="color: red;"), - ui.p(str(e), style="color: red;"), - ui.p("Please check that your files are in the correct format and try again.", style="color: gray;") - ) - - elif input.select() == "1B": - df.set(pd.read_excel(file[0]["datapath"])) - # Reset all analysis results when new dataset is loaded + file_name = f["name"] + + json_data = biblio_json( + f["datapath"], + source, + file_name, + author + ) + + standardized_df = pd.read_json(StringIO(json_data)) + + frames.append(standardized_df) + + final_df = pd.concat(frames, ignore_index=True) + + df.set(final_df) + if reset_callback: reset_callback() - text = ui.p( - f"{database}'s file uploaded successfully! You can now proceed to analyze your data. " - f"The dataset contains {df.get().shape[0]} rows and {df.get().shape[1]} columns." + + return ui.p( + f"✅ Upload successful: {len(files)} file(s)\n" + f"Rows: {final_df.shape[0]} | Columns: {final_df.shape[1]}" ) - else: - text = "" + except Exception as e: - return text + return ui.div( + ui.h5("Error processing file(s):", style="color: red;"), + ui.p(str(e), style="color: red;") + ) diff --git a/functions/get_database.py b/functions/get_database.py index 5c5d4edc5..7d391b8d3 100644 --- a/functions/get_database.py +++ b/functions/get_database.py @@ -1,3 +1,5 @@ +"""Helpers for resolving the selected bibliographic database label.""" + from www.services import * diff --git a/functions/get_filters.py b/functions/get_filters.py index 206c215aa..90f5975f4 100644 --- a/functions/get_filters.py +++ b/functions/get_filters.py @@ -12,7 +12,10 @@ def get_filters(df): Returns: A DataFrame with additional columns for filters and metrics. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() # Calculate the minimum and maximum publication years data["Min_Year"] = data["PY"].min() diff --git a/functions/get_localcitedreferences.py b/functions/get_localcitedreferences.py index 68ea11fef..17d008803 100644 --- a/functions/get_localcitedreferences.py +++ b/functions/get_localcitedreferences.py @@ -13,7 +13,10 @@ def get_local_cited_refs(df, num_of_cited_refs, field_separator): Returns: A Plotly figure object and a DataFrame of the most local cited sources. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() if isinstance(data["CR"].iloc[0], list): # Check if the first element is a list # Flatten the 'CR' column containing lists diff --git a/functions/get_localcitedsources.py b/functions/get_localcitedsources.py index 74b261455..0fb1f848d 100644 --- a/functions/get_localcitedsources.py +++ b/functions/get_localcitedsources.py @@ -16,7 +16,10 @@ def get_local_cited_sources(df, num_of_cited_sources): # Extract metadata tags for cited sources df = metaTagExtraction(df, "CR_SO") - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() if isinstance(data["CR_SO"].iloc[0], list): # Check if the first element is a list # Flatten the 'CR_SO' column containing lists diff --git a/functions/get_lotkalaw.py b/functions/get_lotkalaw.py index 94545fda2..71f9c77b5 100644 --- a/functions/get_lotkalaw.py +++ b/functions/get_lotkalaw.py @@ -14,7 +14,10 @@ def get_lotka_law(df): """ # Calculate Lotka's Law - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() # Author Productivity (Lotka's Law) authors = pd.Series([author.strip() for sublist in data['AU'] for author in sublist]) diff --git a/functions/get_maininformations.py b/functions/get_maininformations.py index 97443abdb..368d5cd11 100644 --- a/functions/get_maininformations.py +++ b/functions/get_maininformations.py @@ -12,7 +12,10 @@ def get_main_informations(df, log=False): Returns: A DataFrame with additional columns for filters and metrics. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() #### Min and Max Year #### start_time = time.time() diff --git a/functions/get_relevantaffiliations.py b/functions/get_relevantaffiliations.py index b86e36509..56c6c58b6 100644 --- a/functions/get_relevantaffiliations.py +++ b/functions/get_relevantaffiliations.py @@ -13,7 +13,10 @@ def get_relevant_affiliations(df, num_of_affiliations, disambiguation): Returns: A Plotly figure object and a DataFrame of the most relevant authors. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() if disambiguation == "yes": # Extract affiliations from the "AU_UN" field diff --git a/functions/get_relevantauthors.py b/functions/get_relevantauthors.py index cdf960151..50bc2e2be 100644 --- a/functions/get_relevantauthors.py +++ b/functions/get_relevantauthors.py @@ -13,7 +13,10 @@ def get_relevant_authors(df, num_of_authors, frequency="N. of Documents"): Returns: A Plotly figure object and a DataFrame of the most relevant authors. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() # Drop rows with missing values data = data.dropna(subset=["AU"]) diff --git a/functions/get_relevantsources.py b/functions/get_relevantsources.py index dccd8d3e5..cad17616d 100644 --- a/functions/get_relevantsources.py +++ b/functions/get_relevantsources.py @@ -12,7 +12,10 @@ def get_relevant_sources(df, num_of_sources): Returns: A Plotly figure object and a DataFrame of the most relevant sources. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() # Drop rows with missing values data = data.dropna(subset=["SO"]) diff --git a/functions/get_sourcesproduction.py b/functions/get_sourcesproduction.py index 0795668d7..81a375291 100644 --- a/functions/get_sourcesproduction.py +++ b/functions/get_sourcesproduction.py @@ -13,7 +13,10 @@ def get_sources_production(df, num_of_sources_production, occurences): Returns: A Plotly figure object representing the sources' production over time. """ - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() # Calculate the number of publications per year for each source WSO = cocMatrix(df, Field="SO") diff --git a/functions/get_table.py b/functions/get_table.py index 75b9c91d8..7c20d7ddd 100644 --- a/functions/get_table.py +++ b/functions/get_table.py @@ -79,7 +79,10 @@ def get_table(database, df, dpi=300, filter=False, modal=True): A DataTable object if data is available, otherwise a message indicating no data. """ # Retrieve the data from the DataFrame - data = df.get() + if hasattr(df, "get") and not isinstance(df, pd.DataFrame): + data = df.get() + else: + data = df.copy() table_html = "" fig = None diff --git a/www/services/api_retrievers.py b/www/services/api_retrievers.py new file mode 100644 index 000000000..6d92b2a38 --- /dev/null +++ b/www/services/api_retrievers.py @@ -0,0 +1,237 @@ +"""Optional API retrievers for the advanced project level.""" + +from __future__ import annotations + +import time +from typing import Any + +import pandas as pd +import requests +from bs4 import BeautifulSoup +import xml.etree.ElementTree as ET + + +def fetch_openalex(query: str, max_records: int = 200, per_page: int = 50, polite_email: str | None = None) -> pd.DataFrame: + """Fetch works from OpenAlex and return a raw DataFrame for convert2df(raw_df=...). + + Handles cursor pagination and basic retry/backoff. The returned columns are + intentionally close to OPENALEX_MAPPING in mappings.py. + """ + rows: list[dict[str, Any]] = [] + cursor = "*" + headers = {"User-Agent": f"bibliometrix-python-etl ({polite_email})"} if polite_email else None + + while len(rows) < max_records: + params = { + "search": query, + "per-page": min(per_page, max_records - len(rows)), + "cursor": cursor, + } + if polite_email: + params["mailto"] = polite_email + + for attempt in range(3): + response = requests.get("https://api.openalex.org/works", params=params, headers=headers, timeout=200) + if response.status_code in {429, 500, 502, 503, 504}: + time.sleep(2 ** attempt) + continue + response.raise_for_status() + break + else: + response.raise_for_status() + + payload = response.json() + for work in payload.get("results", []): + primary_location = work.get("primary_location") or {} + source = (primary_location.get("source") or {}).get("display_name", "") + authorships = work.get("authorships") or [] + authors = [a.get("author", {}).get("display_name", "") for a in authorships] + institutions = [] + for authorship in authorships: + institutions.extend(i.get("display_name", "") for i in authorship.get("institutions", [])) + keywords = [k.get("display_name", "") for k in work.get("keywords", [])] + + abstract = "" + inverted = work.get("abstract_inverted_index") or {} + if inverted: + positions = [] + for word, indexes in inverted.items(): + for index in indexes: + positions.append((index, word)) + abstract = " ".join(word for _, word in sorted(positions)) + + ids = work.get("ids") or {} + rows.append({ + "id": work.get("id", ""), + "doi": work.get("doi", ""), + "pmid": ids.get("pmid", ""), + "title": work.get("title", ""), + "publication_year": work.get("publication_year", ""), + "type": work.get("type", ""), + "cited_by_count": work.get("cited_by_count", 0), + "abstract": abstract, + "source": source, + "authors": authors, + "authorships": authors, + "institutions": institutions, + "keywords": keywords, + "referenced_works": work.get("referenced_works", []), + }) + + next_cursor = payload.get("meta", {}).get("next_cursor") + if not next_cursor or next_cursor == cursor: + break + cursor = next_cursor + + return pd.DataFrame(rows) + +def fetch_pubmed( + query: str, + max_records: int = 200, +) -> pd.DataFrame: + """ + Fetch publications from PubMed API and return a raw DataFrame + suitable for convert2df(raw_df=...). + """ + + # ---------- Search ---------- + search_params = { + "db": "pubmed", + "term": query, + "retmax": max_records, + "retmode": "json", + } + + response = requests.get( + "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi", + params=search_params, + timeout=30, + ) + + response.raise_for_status() + + pmids = response.json().get("esearchresult", {}).get("idlist", []) + + if not pmids: + return pd.DataFrame() + + # ---------- Fetch ---------- + fetch_params = { + "db": "pubmed", + "id": ",".join(pmids), + "retmode": "xml", + } + + response = requests.get( + "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi", + params=fetch_params, + timeout=60, + ) + + response.raise_for_status() + + root = ET.fromstring(response.text) + + rows = [] + + for article in root.findall(".//PubmedArticle"): + + try: + + pmid = article.findtext(".//PMID", default="") + + title = article.findtext(".//ArticleTitle", default="") + + abstract = " ".join( + x.text or "" + for x in article.findall(".//AbstractText") + ) + + year = article.findtext(".//PubDate/Year", default="") + + journal = article.findtext(".//Journal/Title", default="") + + journal_abbreviation = article.findtext( + ".//ISOAbbreviation", + default="" + ) + + language = article.findtext( + ".//Language", + default="" + ) + + doi = "" + + for aid in article.findall(".//ArticleId"): + if aid.attrib.get("IdType") == "doi": + doi = aid.text or "" + break + + # Authors + authors = [] + + for author in article.findall(".//Author"): + + last = author.findtext("LastName", default="") + first = author.findtext("ForeName", default="") + + if last or first: + authors.append( + f"{last}, {first}" + ) + + # Affiliations + affiliations = [] + + for aff in article.findall(".//Affiliation"): + if aff.text: + affiliations.append(aff.text) + + # Keywords + keywords = [] + + for kw in article.findall(".//Keyword"): + if kw.text: + keywords.append(kw.text) + + # MeSH Terms + mesh_terms = [] + + for mesh in article.findall(".//MeshHeading"): + descriptor = mesh.find("DescriptorName") + + if descriptor is not None: + mesh_terms.append(descriptor.text) + + # Publication Types + publication_types = [] + + for pt in article.findall(".//PublicationType"): + if pt.text: + publication_types.append(pt.text) + + rows.append( + { + "pmid": pmid, + "doi": doi, + "title": title, + "publication_year": year, + "journal": journal, + "journal_abbreviation": journal_abbreviation, + "abstract": abstract, + "authors": authors, + "keywords": keywords, + "mesh_terms": mesh_terms, + "affiliations": affiliations, + "language": language, + "publication_type": publication_types, + } + ) + + except Exception: + continue + + time.sleep(0.01) + + return pd.DataFrame(rows) diff --git a/www/services/data_validation.py b/www/services/data_validation.py new file mode 100644 index 000000000..549a37ab9 --- /dev/null +++ b/www/services/data_validation.py @@ -0,0 +1,87 @@ +"""Legacy DataFrame validation and coercion helpers. + +This module checks that mapped bibliographic data has the WoS-style columns and +basic value types expected by downstream analysis functions. +""" + +from www.services.utils import * + +def is_df_valid(df: pd.DataFrame): + """Validates and coerces a bibliometric DataFrame to the required schema. + + This function performs a structural integrity check on the DataFrame to + ensure it contains the mandatory WoS-style columns. It validates column + types (string, integer, and list-of-strings) and attempts to coerce + invalid or missing data where appropriate (e.g., filling missing + strings, converting numeric types). + + Args: + df (pd.DataFrame): The DataFrame to validate, containing mapped + bibliographic records. + + Returns: + tuple: A tuple containing: + - bool: True if the DataFrame passes validation (or was + successfully coerced), False otherwise. + - pd.DataFrame or None: The cleaned DataFrame if valid, + otherwise None. + """ + # Copy the dataframe to avoid modifying the original parsed object directly + df = df.copy() + + # PY and TC removed from STR_COLUMNS + STR_COLUMNS = [ + "DB", "UT", "DI", "PMID", "TI", "SO", "JI", "DT", "LA", + "RP", "AB", "VL", "IS", "BP", "EP", "SR" + ] + + LST_COLUMNS = [ + "AU", "AF", "C1", "CR", "DE", "ID" + ] + + # Added INT_COLUMNS + INT_COLUMNS = [ + "PY", "TC" + ] + + # 1. Verify all expected columns exist + for col in STR_COLUMNS + LST_COLUMNS + INT_COLUMNS: + if col not in df.columns: + print(f"The tag {col} is missing. The dataframe is not valid.") + return (False, None) + + # 2. Check and cast STR_COLUMNS + for col in STR_COLUMNS: + is_pure_str = df[col].map(lambda x: isinstance(x, str)).all() + + if is_pure_str: + print(f"Column {col} type: clean str ✅") + else: + print(f"The column {col} contains non-string types or illegal None values! Coercing...") + df[col] = df[col].fillna("").astype("string") + + # 3. Check and cast INT_COLUMNS + for col in INT_COLUMNS: + is_pure_int = df[col].map(lambda x: isinstance(x, int)).all() + + if is_pure_int: + print(f"Column {col} type: clean int ✅") + else: + print(f"The column {col} contains non-int types or missing values! Coercing...") + # to_numeric safely parses strings like "2023", fills unparseable/NaNs with 0, and casts to int + df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int) + + # 4. Strict Check for LST_COLUMNS + for col in LST_COLUMNS: + is_valid_list = df[col].map( + lambda x: isinstance(x, list) and all(isinstance(i, str) for i in x) + ).all() + + if not is_valid_list: + print(f"The column {col} fails constraints. Found non-list types, None values, or non-string elements inside lists.") + return (False, None) + + print(f"Column {col} type: clean list of strings ✅") + + print("The dataset is in a valid state. Proceeding.") + return (True, df) diff --git a/www/services/extractors.py b/www/services/extractors.py new file mode 100644 index 000000000..2662361d1 --- /dev/null +++ b/www/services/extractors.py @@ -0,0 +1,79 @@ +"""Source extraction functions for manual bibliographic exports.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pandas as pd + +try: + from parsers import parse_pubmed_data +except Exception: # pragma: no cover - allows local testing outside Shiny package path + parse_pubmed_data = None + + +def extract_table(path: str | Path) -> pd.DataFrame: + """Read CSV/XLSX tabular exports into a DataFrame.""" + path = Path(path) + suffix = path.suffix.lower() + if suffix == ".csv": + return pd.read_csv(path) + if suffix in {".xlsx", ".xls"}: + return pd.read_excel(path) + raise ValueError(f"Unsupported tabular file type: {suffix}") + + +def extract_pubmed_txt(path): + """ + Parse a PubMed MEDLINE/plain text export into a pandas DataFrame. + Supports tags such as PMID, TI, AB, AU, FAU, JT, TA, DP, PT, LA, MH, AID. + """ + import pandas as pd + + records = [] + current = {} + current_tag = None + + with open(path, "r", encoding="utf-8", errors="ignore") as f: + for line in f: + raw = line.rstrip("\n") + + if not raw.strip(): + if current: + records.append(current) + current = {} + current_tag = None + continue + + if len(raw) >= 6 and raw[4:6] == "- ": + tag = raw[:4].strip() + value = raw[6:].strip() + current_tag = tag + + if tag in current: + if isinstance(current[tag], list): + current[tag].append(value) + else: + current[tag] = [current[tag], value] + else: + current[tag] = value + + elif current_tag: + if isinstance(current.get(current_tag), list): + current[current_tag][-1] += " " + raw.strip() + else: + current[current_tag] = str(current.get(current_tag, "")) + " " + raw.strip() + + if current: + records.append(current) + + return pd.DataFrame(records) + +def extract(source: str, path: str | Path) -> pd.DataFrame: + """Dispatch extraction by source name and file extension.""" + source_key = source.lower().strip() + path = Path(path) + if source_key == "pubmed" and path.suffix.lower() == ".txt": + return extract_pubmed_txt(path) + return extract_table(path) diff --git a/www/services/mappings.py b/www/services/mappings.py new file mode 100644 index 000000000..af2b7a73f --- /dev/null +++ b/www/services/mappings.py @@ -0,0 +1,105 @@ +"""Column mappings from source-specific exports to WoS-like field tags.""" + +SCOPUS_CSV_MAPPING = { + "EID": "UT", + "Authors": "AU", + "Author full names": "AF", + "Title": "TI", + "Year": "PY", + "Source title": "SO", + "Abbreviated Source Title": "JI", + "DOI": "DI", + "Cited by": "TC", + "Abstract": "AB", + "Author Keywords": "DE", + "Index Keywords": "ID", + "Affiliations": "C1", + "References": "CR", + "Volume": "VL", + "Issue": "IS", + "Page start": "BP", + "Page end": "EP", + "Document Type": "DT", + "Language of Original Document": "LA", +} + +DIMENSIONS_MAPPING = { + "Publication ID": "UT", + "DOI": "DI", + "Title": "TI", + "Source title": "SO", + "Journal": "SO", + "Year": "PY", + "Publication Year": "PY", + "Document Type": "DT", + "Language": "LA", + "Times cited": "TC", + "Citations": "TC", + "Authors": "AU", + "Authors Affiliations": "C1", + "Abstract": "AB", + "Volume": "VL", + "Issue": "IS", + "Pagination": "BP", +} + +PUBMED_MAPPING = { + "PMID": "PMID", + "AID": "DI", + "TI": "TI", + "JT": "SO", + "TA": "JI", + "DP": "PY", + "PT": "DT", + "LA": "LA", + "AU": "AU", + "FAU": "AF", + "AD": "C1", + "MH": "DE", + "OT": "ID", + "AB": "AB", + "VI": "VL", + "IP": "IS", + "PG": "BP", +} + +OPENALEX_MAPPING = { + "id": "UT", + "doi": "DI", + "pmid": "PMID", + "title": "TI", + "publication_year": "PY", + "type": "DT", + "cited_by_count": "TC", + "abstract": "AB", + "source": "SO", + "journal": "SO", + "authors": "AU", + "authorships": "AF", + "institutions": "C1", + "keywords": "DE", + "referenced_works": "CR", +} +PUBMED_API_MAPPING = { + "authors": "AU", + "title": "TI", + "journal": "SO", + "journal_abbreviation": "JI", + "publication_year": "PY", + "abstract": "AB", + "keywords": "DE", + "mesh_terms": "ID", + "doi": "DI", + "pmid": "PM", + "language": "LA", + "affiliations": "C1", + "publication_type": "DT", +} + +MAPPINGS = { + "scopus": SCOPUS_CSV_MAPPING, + "dimensions": DIMENSIONS_MAPPING, + "pubmed": PUBMED_MAPPING, + "openalex": OPENALEX_MAPPING, + "pubmed_api": PUBMED_API_MAPPING, +} diff --git a/www/services/metatagextraction.py b/www/services/metatagextraction.py index 5e1f8b9c8..ec745ba00 100644 --- a/www/services/metatagextraction.py +++ b/www/services/metatagextraction.py @@ -1,3 +1,10 @@ +"""Metadata tag extraction helpers for bibliographic DataFrames. + +These functions derive secondary tags such as source references, cited-author +fields, cited-source fields, countries, and affiliations from standardized ETL +columns. +""" + from .utils import * @@ -47,6 +54,7 @@ def metaTagExtraction(df, Field="AU_CO", sep=";", aff_disamb=False): def SR(M): + """Build unique short-reference tags from first author, year, and journal.""" listAU = M["AU"].apply(lambda l: [x.strip() for x in l]) if M["DB"].iloc[0].lower() == "scopus": listAU = listAU.apply(lambda l: [x.replace(" ", ",").replace(",,", ",").replace(" ", "") for x in l]) @@ -74,6 +82,7 @@ def SR(M): # TO BE DONE def CR_AU(M): + """Extract cited-reference author names into the ``CR_AU`` column.""" listCAU = M["CR"].apply(lambda x: x if isinstance(x, list) else []).apply(lambda l: [x for x in l if len(x) > 10]) FCAU = listCAU.apply(lambda l: [x.split(",")[0].strip() for x in l]) M["CR_AU"] = FCAU.apply(lambda l: ";".join(l)) @@ -82,6 +91,7 @@ def CR_AU(M): def CR_SO(M): + """Extract cited-reference source names into the ``CR_SO`` column.""" listCAU = M["CR"].apply(lambda x: x if isinstance(x, list) else []) if M["DB"].iloc[0].upper() != "SCOPUS": FCAU = listCAU.apply(lambda l: [x.split(",")[2].strip() for x in l if len(x.split(",")) > 2]) @@ -94,6 +104,15 @@ def CR_SO(M): def AU_CO(M, log=False): + """Extract all author countries from affiliation strings. + + Args: + M: Bibliographic DataFrame with ``C1`` and ``RP`` affiliation fields. + log: When True, write extracted country lists to ``affiliations.txt``. + + Returns: + The input DataFrame with an ``AU_CO`` column added or updated. + """ # Read the list of countries with open("www/static/countries.txt", "r") as file: countries = file.read().splitlines() @@ -145,6 +164,15 @@ def AU_CO(M, log=False): def AU1_CO(M, log=False): + """Extract the first author's country from affiliation strings. + + Args: + M: Bibliographic DataFrame with ``C1`` and ``RP`` affiliation fields. + log: When True, write extracted countries to ``first_author_countries.txt``. + + Returns: + The input DataFrame with an ``AU1_CO`` column added or updated. + """ # Read the list of countries with open("www/static/countries.txt", "r") as file: countries = file.read().splitlines() @@ -204,6 +232,15 @@ def AU1_CO(M, log=False): # TO BE DONE def AU_UN(M, sep): + """Extract author and first-author organization names from affiliations. + + Args: + M: Bibliographic DataFrame with affiliation and reprint-address fields. + sep: Separator used to split affiliation strings. + + Returns: + The input DataFrame with ``AU_UN``, ``AU1_UN``, and ``AU_UN_NR`` fields. + """ C1 = M["C1"].fillna(M["RP"]) AFF = C1.str.replace(r"\[.*?\] ", "", regex=True) indna = AFF.isna() @@ -217,6 +254,7 @@ def AU_UN(M, sep): "CONSORTIUM", "OBSERVAT", "AGRI", "MIT ", "INFN", "SUNY "] def extract_affiliations(l): + """Return normalized institution names for one list of affiliations.""" index = [] for item in l: item = item.replace("(REPRINT AUTHOR)", "") diff --git a/www/services/normalizers.py b/www/services/normalizers.py new file mode 100644 index 000000000..ab8a67c20 --- /dev/null +++ b/www/services/normalizers.py @@ -0,0 +1,131 @@ +"""Normalization helpers for the bibliometrix-python ETL.""" + +from __future__ import annotations + +import math +import re +from typing import Any, Iterable + +import pandas as pd + +from .schema import INTEGER_COLUMNS, MULTI_VALUE_COLUMNS, SCALAR_COLUMNS, STANDARD_COLUMNS + + +def is_missing(value: Any) -> bool: + """Return True for None, NaN, pandas NA, and empty-like missing values.""" + if value is None: + return True + try: + if pd.isna(value): + return True + except (TypeError, ValueError): + return False + return False + + +def split_multi_value(value: Any) -> list[str]: + """Convert a raw multi-value cell into a clean list of strings. + + Supports common bibliographic delimiters: semicolon, pipe, newline, and repeated + PubMed-style fields already joined with semicolons by parsers.py. + """ + if is_missing(value): + return [] + if isinstance(value, list): + raw_items = value + elif isinstance(value, tuple) or isinstance(value, set): + raw_items = list(value) + else: + text = str(value).strip() + if not text or text.lower() in {"nan", "none", "null"}: + return [] + raw_items = re.split(r"\s*;\s*|\s*\|\s*|\n+", text) + + cleaned: list[str] = [] + for item in raw_items: + if is_missing(item): + continue + item_text = re.sub(r"\s+", " ", str(item)).strip() + if item_text and item_text.lower() not in {"nan", "none", "null"}: + cleaned.append(item_text) + return cleaned + + +def normalize_scalar(value: Any) -> str: + """Convert a scalar bibliographic value to a non-null string.""" + if is_missing(value): + return "" + text = re.sub(r"\s+", " ", str(value)).strip() + if text.lower() in {"nan", "none", "null"}: + return "" + return text + + +def normalize_int(value: Any) -> int: + """Convert citation counters and other integer columns to int, defaulting to 0.""" + if is_missing(value): + return 0 + text = str(value).strip() + if not text or text.lower() in {"nan", "none", "null"}: + return 0 + match = re.search(r"-?\d+", text.replace(",", "")) + return int(match.group(0)) if match else 0 + + +def normalize_year(value: Any) -> str: + """Extract a four-digit publication year as a string.""" + text = normalize_scalar(value) + match = re.search(r"(18|19|20|21)\d{2}", text) + return match.group(0) if match else "" + + +def normalize_doi(value: Any) -> str: + """Extract and normalize a DOI when possible.""" + text = normalize_scalar(value) + if not text: + return "" + # PubMed AID may contain: 10.xxxx [doi] + match = re.search(r"10\.\d{4,9}/[^\s;]+", text, flags=re.IGNORECASE) + doi = match.group(0) if match else text + return doi.replace("[doi]", "").strip().lower() + + +def ensure_standard_columns(df: pd.DataFrame) -> pd.DataFrame: + """Add all missing standard columns with type-aware empty values.""" + result = df.copy() + for col in STANDARD_COLUMNS: + if col not in result.columns: + if col in MULTI_VALUE_COLUMNS: + result[col] = [[] for _ in range(len(result))] + elif col in INTEGER_COLUMNS: + result[col] = 0 + else: + result[col] = "" + return result + + +def normalize_types(df: pd.DataFrame) -> pd.DataFrame: + """Enforce the target type contract for all standard columns.""" + result = ensure_standard_columns(df) + + for col in MULTI_VALUE_COLUMNS: + result[col] = result[col].apply(split_multi_value) + + for col in INTEGER_COLUMNS: + result[col] = result[col].apply(normalize_int) + + for col in SCALAR_COLUMNS: + result[col] = result[col].apply(normalize_scalar) + + result["PY"] = result["PY"].apply(normalize_year) + result["DI"] = result["DI"].apply(normalize_doi) + return result[STANDARD_COLUMNS] + + +def serialize_lists_for_csv(df: pd.DataFrame, delimiter: str = ";") -> pd.DataFrame: + """Return a CSV-safe copy where list columns are serialized with semicolons.""" + result = df.copy() + for col in MULTI_VALUE_COLUMNS: + if col in result.columns: + result[col] = result[col].apply(lambda xs: delimiter.join(xs) if isinstance(xs, list) else normalize_scalar(xs)) + return result diff --git a/www/services/parsers.py b/www/services/parsers.py index 72b9d370e..ca28e6558 100644 --- a/www/services/parsers.py +++ b/www/services/parsers.py @@ -1,8 +1,19 @@ +"""Parsers for bibliographic exports that are not already tabular. + +Each parser returns a list of record dictionaries, preserving source tags so +the ETL mapping layer can later transform them into the standard schema. +""" + from .utils import * #### WEB OF SCIENCE PARSER #### def parse_wos_data(datapath): # PARSER FOR WEB OF SCIENCE TXT and CIW + """Parse a Web of Science TXT or CIW export into record dictionaries. + + Continuation lines are appended to the current tag, while repeated records + are separated by the WoS ``ER`` marker. + """ elem_data = [] data = {} current_key = None @@ -41,6 +52,11 @@ def parse_wos_data(datapath): # PARSER FOR WEB OF SCIENCE TXT and CIW #### PUBMED PARSER #### def parse_pubmed_data(datapath): # PARSER FOR PUBMED TXT + """Parse a PubMed plain-text export into record dictionaries. + + PubMed tags are read from ``KEY - value`` lines; continuation lines are + joined to the previous tag and repeated tags are semicolon-separated. + """ data = [] current_record = {} @@ -78,6 +94,11 @@ def parse_pubmed_data(datapath): # PARSER FOR PUBMED TXT #### COCHRANE PARSER #### def parse_cochrane_data(datapath): + """Parse a Cochrane citation text export into record dictionaries. + + Records are detected from blank lines and ``Record #`` markers. Field + values use ``KEY: value`` tags and continuation lines extend the latest tag. + """ data = [] current_record = {} diff --git a/www/services/schema.py b/www/services/schema.py new file mode 100644 index 000000000..2c8ec87be --- /dev/null +++ b/www/services/schema.py @@ -0,0 +1,16 @@ +"""Target WoS-like schema for bibliometrix-python ETL. + +The analytical functions in bibliometrix-python mostly expect Web of Science style +field tags. This module defines the minimal stable contract produced by the ETL. +""" + +STANDARD_COLUMNS = [ + "DB", "UT", "DI", "PMID", "TI", "SO", "JI", "PY", "DT", "LA", + "TC", "AU", "AF", "C1", "RP", "CR", "DE", "ID", "AB", + "VL", "IS", "BP", "EP", "SR", +] + +MULTI_VALUE_COLUMNS = {"AU", "AF", "C1", "CR", "DE", "ID"} +INTEGER_COLUMNS = {"TC"} + +SCALAR_COLUMNS = set(STANDARD_COLUMNS) - MULTI_VALUE_COLUMNS - INTEGER_COLUMNS diff --git a/www/services/standardizer.py b/www/services/standardizer.py new file mode 100644 index 000000000..2af1632bd --- /dev/null +++ b/www/services/standardizer.py @@ -0,0 +1,111 @@ +"""Unified convert2df-like ETL entry point for bibliometrix-python. + +This module converts heterogeneous bibliographic exports into a stable WoS-like +schema expected by many bibliometrix-python analytical functions. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pandas as pd + +from .extractors import extract +from .mappings import MAPPINGS +from .normalizers import ensure_standard_columns, normalize_types, serialize_lists_for_csv, split_multi_value +from .validators import validate_standard_df + + +def _rename_with_mapping(df, mapping): + """ + Rename source columns into standard WoS-like columns. + Handles duplicate target columns safely by taking the first non-empty value. + """ + import pandas as pd + + renamed = df.rename(columns=mapping) + result = pd.DataFrame(index=df.index) + + for col in renamed.columns.unique(): + selected = renamed.loc[:, renamed.columns == col] + + if selected.shape[1] == 1: + result[col] = selected.iloc[:, 0] + else: + result[col] = selected.bfill(axis=1).iloc[:, 0] + + return result +def generate_sr(row: pd.Series) -> str: + """Generate a stable short reference key: FirstAuthor, Year, Source. + + If the repository exposes an SR helper later, replace this function body with + a call to that helper to satisfy the project instruction exactly. + """ + authors = row.get("AU", []) + first_author = "" + if isinstance(authors, list) and authors: + first_author = authors[0] + elif isinstance(authors, str): + parsed = split_multi_value(authors) + first_author = parsed[0] if parsed else "" + + surname = first_author.split(",")[0].split(" ")[0].strip().upper() if first_author else "" + year = str(row.get("PY", "")).strip() + source = str(row.get("SO", "")).strip().upper() + parts = [part for part in [surname, year, source] if part] + return ", ".join(parts) + + +def standardize_dataframe(df: pd.DataFrame, source: str) -> pd.DataFrame: + """Transform an already-loaded raw DataFrame into the standard schema.""" + source_key = source.lower().strip() + if source_key not in MAPPINGS: + supported = ", ".join(sorted(MAPPINGS)) + raise ValueError(f"Unsupported source '{source}'. Supported sources: {supported}") + + mapped = _rename_with_mapping(df, MAPPINGS[source_key]) + mapped = ensure_standard_columns(mapped) + mapped["DB"] = source_key.upper() + normalized = normalize_types(mapped) + + # UT fallback: use PMID or DOI if the source has no native unique id. + normalized["UT"] = normalized.apply( + lambda row: row["UT"] or row["PMID"] or row["DI"], axis=1 + ) + normalized["SR"] = normalized.apply(generate_sr, axis=1) + + validate_standard_df(normalized) + if "PY" in normalized.columns: + normalized["PY"] = ( + pd.to_numeric(normalized["PY"], errors="coerce") + .fillna(0) + .astype(int) + ) + return normalized + + +def convert2df(source: str, file_path: str | Path | None = None, raw_df: pd.DataFrame | None = None) -> pd.DataFrame: + """Extract, transform, and validate bibliographic data. + + Args: + source: Bibliographic source name: scopus, pubmed, dimensions, openalex. + file_path: Path to a manually exported file. Required unless raw_df is given. + raw_df: Optional pre-loaded raw DataFrame, useful for API results or tests. + + Returns: + A standardized pandas DataFrame with WoS-like field tags. + """ + if raw_df is None: + if file_path is None: + raise ValueError("Either file_path or raw_df must be provided.") + raw_df = extract(source, file_path) + return standardize_dataframe(raw_df, source) + + +def convert2csv(source: str, file_path: str | Path, output_path: str | Path) -> pd.DataFrame: + """Standardize a source export and save it as a semicolon-serialized CSV.""" + df = convert2df(source=source, file_path=file_path) + csv_df = serialize_lists_for_csv(df) + csv_df.to_csv(output_path, index=False) + return df diff --git a/www/services/validators.py b/www/services/validators.py new file mode 100644 index 000000000..597583cc4 --- /dev/null +++ b/www/services/validators.py @@ -0,0 +1,40 @@ +"""Validation layer for standardized bibliographic DataFrames.""" + +from __future__ import annotations + +import pandas as pd + +from .schema import INTEGER_COLUMNS, MULTI_VALUE_COLUMNS, STANDARD_COLUMNS + + +class ValidationError(ValueError): + """Raised when a standardized DataFrame violates the ETL contract.""" + + +def validate_standard_df(df: pd.DataFrame) -> None: + """Validate mandatory columns, null safety, and type contracts. + + Raises: + ValidationError: if the dataframe is not safe for downstream analysis. + """ + missing = [col for col in STANDARD_COLUMNS if col not in df.columns] + if missing: + raise ValidationError(f"Missing mandatory columns: {missing}") + + # Avoid DataFrame-wide pd.isna because list cells can produce ambiguous arrays. + for col in STANDARD_COLUMNS: + for idx, value in df[col].items(): + if value is None: + raise ValidationError(f"None value found at row={idx}, column={col}") + if not isinstance(value, list) and pd.isna(value): + raise ValidationError(f"NaN value found at row={idx}, column={col}") + + for col in MULTI_VALUE_COLUMNS: + bad_rows = [idx for idx, value in df[col].items() if not isinstance(value, list) or not all(isinstance(x, str) for x in value)] + if bad_rows: + raise ValidationError(f"Column {col} must contain list[str]. Bad rows: {bad_rows[:10]}") + + for col in INTEGER_COLUMNS: + bad_rows = [idx for idx, value in df[col].items() if not isinstance(value, int)] + if bad_rows: + raise ValidationError(f"Column {col} must contain int. Bad rows: {bad_rows[:10]}")