From 9dbd6db275f7d3ce70e33f1e845ca0ca588b5c8a Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 13:46:16 +0100 Subject: [PATCH 1/8] feat: configurable archive data generation at startup - generate_archive.py: add --days, --interval-seconds CLI args and ARCHIVE_DAYS, ARCHIVE_INTERVAL_SECONDS env var support; timestamps computed at runtime so archive is always relative to 'now' - Dockerfile: copy generate_archive.py into the simulator image - docker-compose.yml: add archive_generator service that runs before database starts (service_completed_successfully dependency); switch archive data from bind-mount to named volume - config.yml: generation frequency 30s->10s, 1 timestamp per file --- docker-compose.yml | 16 +++- mno_data_source_simulator/Dockerfile | 1 + mno_data_source_simulator/config.yml | 6 +- mno_data_source_simulator/generate_archive.py | 85 +++++++++++++++---- 4 files changed, 86 insertions(+), 22 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index bab31c8..d2fe207 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,14 @@ services: + archive_generator: + build: ./mno_data_source_simulator + command: python generate_archive.py --days ${ARCHIVE_DAYS:-30} --interval-seconds ${ARCHIVE_INTERVAL_SECONDS:-300} + environment: + - ARCHIVE_OUTPUT_DIR=/archive_output + - NETCDF_FILE=/app/example_data/openMRG_cmls_20150827_12hours.nc + volumes: + - ./parser/example_data:/app/example_data:ro + - archive_data:/archive_output + mno_simulator: build: ./mno_data_source_simulator depends_on: @@ -40,8 +50,11 @@ services: build: ./database ports: - "5432:5432" + depends_on: + archive_generator: + condition: service_completed_successfully volumes: - - ./database/archive_data:/docker-entrypoint-initdb.d/archive_data:ro + - archive_data:/docker-entrypoint-initdb.d/archive_data:ro - ./database/init_archive_data.sh:/docker-entrypoint-initdb.d/99-load-archive.sh:ro processor: @@ -143,6 +156,7 @@ services: - testing volumes: + 
archive_data: sftp_uploads: webserver_data_staged: webserver_data_archived: diff --git a/mno_data_source_simulator/Dockerfile b/mno_data_source_simulator/Dockerfile index cd86ea9..379ab8f 100644 --- a/mno_data_source_simulator/Dockerfile +++ b/mno_data_source_simulator/Dockerfile @@ -10,6 +10,7 @@ RUN pip install --no-cache-dir -r requirements.txt COPY data_generator.py . COPY sftp_uploader.py . COPY main.py . +COPY generate_archive.py . COPY config.yml . # Create directory for example data diff --git a/mno_data_source_simulator/config.yml b/mno_data_source_simulator/config.yml index 264afbb..f40dbe5 100644 --- a/mno_data_source_simulator/config.yml +++ b/mno_data_source_simulator/config.yml @@ -21,17 +21,17 @@ sftp: known_hosts_path: "/app/ssh_keys/known_hosts" remote_path: "/uploads" # Upload frequency in seconds - upload_frequency_seconds: 30 + upload_frequency_seconds: 10 # Connection timeout in seconds connection_timeout: 30 # Data generation configuration generator: # How often to generate new data points (in seconds) - generation_frequency_seconds: 30 + generation_frequency_seconds: 10 # Number of timestamps to include in each generated file # (timestamps will be spaced by time_resolution_seconds) - timestamps_per_file: 3 + timestamps_per_file: 1 # Time resolution between timestamps within a file (in seconds) time_resolution_seconds: 10 # Directory where generated CSV files will be written diff --git a/mno_data_source_simulator/generate_archive.py b/mno_data_source_simulator/generate_archive.py index bad2ac8..69fe9eb 100755 --- a/mno_data_source_simulator/generate_archive.py +++ b/mno_data_source_simulator/generate_archive.py @@ -5,8 +5,19 @@ This script uses the existing CMLDataGenerator to create archive data with real RSL/TSL values from the NetCDF file, but with fake timestamps spanning the configured archive period. 
+ +Usage: + python generate_archive.py [--days N] [--interval-seconds S] [--output-dir PATH] [--netcdf-file PATH] + +Environment variables (fallbacks for CLI args): + ARCHIVE_DAYS Number of days of history to generate (default: 7) + ARCHIVE_INTERVAL_SECONDS Time resolution in seconds between data points (default: 10) + ARCHIVE_OUTPUT_DIR Output directory for archive files + NETCDF_FILE Path to the NetCDF source file """ +import argparse +import os import sys import gzip from pathlib import Path @@ -23,24 +34,27 @@ ) logger = logging.getLogger(__name__) -# Configuration -NETCDF_FILE = "../parser/example_data/openMRG_cmls_20150827_12hours.nc" -ARCHIVE_DAYS = 7 # Archive period in days (reduced for demo purposes) -TIME_INTERVAL_MINUTES = 5 # Resample to 5-minute intervals (reduces data size) -ARCHIVE_END_DATE = datetime.now() -ARCHIVE_START_DATE = ARCHIVE_END_DATE - timedelta(days=ARCHIVE_DAYS) -OUTPUT_DIR = "../database/archive_data" +# Defaults (overridable via CLI args or environment variables) +DEFAULT_NETCDF_FILE = "../parser/example_data/openMRG_cmls_20150827_12hours.nc" +DEFAULT_OUTPUT_DIR = "../database/archive_data" +DEFAULT_ARCHIVE_DAYS = 7 +DEFAULT_INTERVAL_SECONDS = 300 # 5-minute default; use 10 for raw real-time resolution # Output files (gzipped) METADATA_OUTPUT = "metadata_archive.csv.gz" DATA_OUTPUT = "data_archive.csv.gz" -def generate_archive_data(): +def generate_archive_data(archive_days, output_dir, netcdf_file, interval_seconds): """Generate archive metadata and time-series data.""" - netcdf_path = Path(__file__).parent / NETCDF_FILE - output_path = Path(__file__).parent / OUTPUT_DIR + netcdf_path = Path(netcdf_file) + if not netcdf_path.is_absolute(): + netcdf_path = Path(__file__).parent / netcdf_file + + output_path = Path(output_dir) + if not output_path.is_absolute(): + output_path = Path(__file__).parent / output_dir if not netcdf_path.exists(): logger.error(f"NetCDF file not found: {netcdf_path}") @@ -48,18 +62,21 @@ def 
generate_archive_data(): output_path.mkdir(parents=True, exist_ok=True) + archive_end_date = datetime.now() + archive_start_date = archive_end_date - timedelta(days=archive_days) + logger.info("=" * 60) logger.info("Generating Archive Data from NetCDF") logger.info("=" * 60) logger.info(f"NetCDF file: {netcdf_path}") logger.info( - f"Archive period: {ARCHIVE_START_DATE} to {ARCHIVE_END_DATE} ({ARCHIVE_DAYS} days)" + f"Archive period: {archive_start_date} to {archive_end_date} ({archive_days} days)" ) # Initialize the data generator generator = CMLDataGenerator( netcdf_file=str(netcdf_path), - loop_duration_seconds=ARCHIVE_DAYS * 24 * 3600, # Loop over archive period + loop_duration_seconds=archive_days * 24 * 3600, # Loop over archive period ) # Generate and save metadata using existing function @@ -75,19 +92,19 @@ def generate_archive_data(): # Generate timestamps for the archive period with configured interval logger.info(f"\nGenerating time-series data...") - logger.info(f" Time interval: {TIME_INTERVAL_MINUTES} minutes") + logger.info(f" Time interval: {interval_seconds} seconds") timestamps = pd.date_range( - start=ARCHIVE_START_DATE, - end=ARCHIVE_END_DATE, - freq=f"{TIME_INTERVAL_MINUTES}min", + start=archive_start_date, + end=archive_end_date, + freq=f"{interval_seconds}s", ) logger.info(f" Total timestamps: {len(timestamps):,}") logger.info(f" Total rows (estimate): {len(timestamps) * len(metadata_df):,}") # Set the generator's loop start time to archive start - generator.loop_start_time = ARCHIVE_START_DATE + generator.loop_start_time = archive_start_date # Generate data in batches using existing generate_data function batch_size = 100 @@ -131,4 +148,36 @@ def generate_archive_data(): if __name__ == "__main__": - generate_archive_data() + parser = argparse.ArgumentParser( + description="Generate archive CML data for database initialization." 
+ ) + parser.add_argument( + "--days", + type=int, + default=int(os.getenv("ARCHIVE_DAYS", str(DEFAULT_ARCHIVE_DAYS))), + help=f"Number of days of archive data to generate (default: {DEFAULT_ARCHIVE_DAYS}, or ARCHIVE_DAYS env var)", + ) + parser.add_argument( + "--interval-seconds", + type=int, + default=int(os.getenv("ARCHIVE_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS))), + help=f"Time resolution in seconds between archive data points (default: {DEFAULT_INTERVAL_SECONDS}, or ARCHIVE_INTERVAL_SECONDS env var)", + ) + parser.add_argument( + "--output-dir", + default=os.getenv("ARCHIVE_OUTPUT_DIR", DEFAULT_OUTPUT_DIR), + help="Output directory for archive files (default: ../database/archive_data, or ARCHIVE_OUTPUT_DIR env var)", + ) + parser.add_argument( + "--netcdf-file", + default=os.getenv("NETCDF_FILE", DEFAULT_NETCDF_FILE), + help="Path to the NetCDF source file (default: ../parser/example_data/..., or NETCDF_FILE env var)", + ) + args = parser.parse_args() + + generate_archive_data( + archive_days=args.days, + output_dir=args.output_dir, + netcdf_file=args.netcdf_file, + interval_seconds=args.interval_seconds, + ) From 2b9198d0ce5a42bdbe47308782d24868a5818876 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 13:46:30 +0100 Subject: [PATCH 2/8] fix: decouple cml_stats refresh from file ingestion to fix real-time lag update_cml_stats was doing a full table scan for all CMLs on every incoming 10-second file, causing ~4 minute processing delays. 
- db_writer.py: remove update_cml_stats from write_rawdata; add refresh_stats() method intended for background use only - main.py: run stats refresh in a dedicated daemon thread on a configurable timer (STATS_REFRESH_INTERVAL env var, default 60s) with its own DB connection so inserts are never blocked - init.sql: add idx_cml_data_cml_id index on cml_data(cml_id, time DESC) to speed up per-CML stats queries as data grows --- database/init.sql | 4 +++- parser/db_writer.py | 31 +++++++++++++++++++++++-------- parser/main.py | 27 +++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/database/init.sql b/database/init.sql index ca67ad3..3758236 100644 --- a/database/init.sql +++ b/database/init.sql @@ -77,4 +77,6 @@ BEGIN END; $$ LANGUAGE plpgsql; -SELECT create_hypertable('cml_data', 'time'); \ No newline at end of file +SELECT create_hypertable('cml_data', 'time'); + +CREATE INDEX idx_cml_data_cml_id ON cml_data (cml_id, time DESC); \ No newline at end of file diff --git a/parser/db_writer.py b/parser/db_writer.py index c410ba3..58f0c25 100644 --- a/parser/db_writer.py +++ b/parser/db_writer.py @@ -288,9 +288,6 @@ def write_rawdata(self, df) -> int: ) records = [tuple(x) for x in df_subset.to_numpy()] - # Get unique CML IDs for stats update - unique_cml_ids = df_subset["cml_id"].unique().tolist() - sql = "INSERT INTO cml_data (time, cml_id, sublink_id, rsl, tsl) VALUES %s" rows_written = self._with_connection_retry( @@ -299,15 +296,33 @@ def write_rawdata(self, df) -> int: ) ) - # Update stats for affected CMLs and commit once afterward - if rows_written > 0 and unique_cml_ids: - self._update_stats_for_cmls(unique_cml_ids) - + # Commit immediately after insert; stats are updated separately try: if self.conn: self.conn.commit() except Exception: - logger.exception("Failed to commit raw data + stats update") + logger.exception("Failed to commit raw data") raise return rows_written + + def refresh_stats(self) -> None: + """Recalculate 
cml_stats for all known CMLs. Intended to be called + from a background thread on a slow timer so it never blocks inserts.""" + cur = self.conn.cursor() + try: + cur.execute( + "SELECT update_cml_stats(cml_id::text) " + "FROM (SELECT DISTINCT cml_id FROM cml_metadata) t" + ) + self.conn.commit() + logger.info("Refreshed cml_stats for all CMLs") + except Exception: + try: + self.conn.rollback() + except Exception: + pass + logger.exception("Failed to refresh cml_stats") + finally: + if cur and not cur.closed: + cur.close() diff --git a/parser/main.py b/parser/main.py index 7513089..f192722 100644 --- a/parser/main.py +++ b/parser/main.py @@ -6,6 +6,7 @@ import os import time import logging +import threading from pathlib import Path from .file_watcher import FileWatcher @@ -26,6 +27,8 @@ class Config: "PROCESS_EXISTING_ON_STARTUP", "True" ).lower() in ("1", "true", "yes") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") + # How often (seconds) to recalculate aggregate CML stats in the background + STATS_REFRESH_INTERVAL = int(os.getenv("STATS_REFRESH_INTERVAL", "60")) def setup_logging(): @@ -85,12 +88,36 @@ def on_new_file(filepath): ) watcher.start() + # Background thread: refresh cml_stats on a slow timer so it never + # blocks file processing. + stop_event = threading.Event() + + def stats_loop(): + # Use a separate DBWriter connection so stats queries don't contend + # with the insert connection. 
+ stats_db = DBWriter(Config.DATABASE_URL) + try: + stats_db.connect() + except Exception: + logger.exception("Stats thread: could not connect to DB") + return + while not stop_event.wait(Config.STATS_REFRESH_INTERVAL): + try: + stats_db.refresh_stats() + except Exception: + logger.exception("Stats thread: refresh_stats failed") + stats_db.close() + + stats_thread = threading.Thread(target=stats_loop, daemon=True, name="stats-refresh") + stats_thread.start() + try: while True: time.sleep(1) except KeyboardInterrupt: logger.info("Shutting down parser service") finally: + stop_event.set() watcher.stop() db_writer.close() From 268c0f9d3236e57307a3580983c0f8d63dbdb4c5 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 17:18:01 +0100 Subject: [PATCH 3/8] perf: speed up archive generation with numpy caching and drop gzip Previously generate_archive.py called isel()+to_dataframe() once per archive timestamp (up to 864,000 calls for 10 days at 10s), and wrote the output through gzip compression. Combined: ~20 min for 10 days. 
- Pre-cache all unique NetCDF time slices (max 720) into numpy arrays upfront; use np.tile/repeat/concatenate to build batches of 5000 timestamps at once -- eliminates repeated xarray overhead - Drop gzip entirely: files live only in a Docker volume, compression was unnecessary and dominated write time - Output filenames change from .csv.gz to .csv - Update test to reflect new function signature and plain-file output --- mno_data_source_simulator/generate_archive.py | 82 ++++++++++++++----- .../tests/test_generate_archive.py | 17 ++-- 2 files changed, 72 insertions(+), 27 deletions(-) diff --git a/mno_data_source_simulator/generate_archive.py b/mno_data_source_simulator/generate_archive.py index 69fe9eb..26aea0d 100755 --- a/mno_data_source_simulator/generate_archive.py +++ b/mno_data_source_simulator/generate_archive.py @@ -19,10 +19,10 @@ import argparse import os import sys -import gzip from pathlib import Path from datetime import datetime, timedelta import logging +import numpy as np import pandas as pd from data_generator import CMLDataGenerator @@ -40,9 +40,9 @@ DEFAULT_ARCHIVE_DAYS = 7 DEFAULT_INTERVAL_SECONDS = 300 # 5-minute default; use 10 for raw real-time resolution -# Output files (gzipped) -METADATA_OUTPUT = "metadata_archive.csv.gz" -DATA_OUTPUT = "data_archive.csv.gz" +# Output files +METADATA_OUTPUT = "metadata_archive.csv" +DATA_OUTPUT = "data_archive.csv" def generate_archive_data(archive_days, output_dir, netcdf_file, interval_seconds): @@ -84,8 +84,7 @@ def generate_archive_data(archive_days, output_dir, netcdf_file, interval_second metadata_path = output_path / METADATA_OUTPUT metadata_df = generator.get_metadata_dataframe() - with gzip.open(metadata_path, "wt") as f: - metadata_df.to_csv(f, index=False) + metadata_df.to_csv(metadata_path, index=False) logger.info(f"Saved {len(metadata_df)} metadata rows to {metadata_path}") logger.info(f" Unique CML IDs: {metadata_df['cml_id'].nunique()}") @@ -106,29 +105,72 @@ def 
generate_archive_data(archive_days, output_dir, netcdf_file, interval_second # Set the generator's loop start time to archive start generator.loop_start_time = archive_start_date - # Generate data in batches using existing generate_data function - batch_size = 100 + # --- Fast numpy-cached generation --- + # Map each archive timestamp to a NetCDF index (cycles through 720 steps) + all_indices = np.array( + [generator._get_netcdf_index_for_timestamp(ts) for ts in timestamps] + ) + unique_indices = np.unique(all_indices) + logger.info( + f" Unique NetCDF time slices needed: {len(unique_indices)} " + f"(of {len(generator.original_time_points)} in file)" + ) + + # Pre-cache RSL/TSL arrays for each unique NetCDF index (one isel call each) + logger.info(" Pre-caching NetCDF slices...") + base_df = ( + generator.dataset.isel(time=int(unique_indices[0])) + .to_dataframe() + .reset_index()[["cml_id", "sublink_id", "tsl", "rsl"]] + ) + cml_ids = base_df["cml_id"].values + sublink_ids = base_df["sublink_id"].values + n_links = len(cml_ids) + + rsl_cache = {} + tsl_cache = {} + for idx in unique_indices: + df_slice = ( + generator.dataset.isel(time=int(idx)).to_dataframe().reset_index() + ) + rsl_cache[idx] = df_slice["rsl"].values + tsl_cache[idx] = df_slice["tsl"].values + logger.info(f" Cached {len(unique_indices)} slices, generating output...") + + # Write in batches using pre-cached numpy arrays + batch_size = 5000 # timestamps per batch (not rows) total_rows = 0 data_path = output_path / DATA_OUTPUT - with gzip.open(data_path, "wt") as f: + with open(data_path, "w") as f: first_batch = True - for i in range(0, len(timestamps), batch_size): - batch_timestamps = timestamps[i : i + batch_size] - - # Use existing generate_data function - df = generator.generate_data(batch_timestamps) - - # Write to gzipped CSV + batch_ts = timestamps[i : i + batch_size] + batch_indices = all_indices[i : i + batch_size] + batch_n = len(batch_ts) + + time_col = np.repeat(batch_ts.values, 
n_links) + cml_col = np.tile(cml_ids, batch_n) + sub_col = np.tile(sublink_ids, batch_n) + tsl_col = np.concatenate([tsl_cache[idx] for idx in batch_indices]) + rsl_col = np.concatenate([rsl_cache[idx] for idx in batch_indices]) + + df = pd.DataFrame( + { + "time": time_col, + "cml_id": cml_col, + "sublink_id": sub_col, + "tsl": tsl_col, + "rsl": rsl_col, + } + ) df.to_csv(f, index=False, header=first_batch) first_batch = False - total_rows += len(df) - # Progress indicator every 10% - if (i + batch_size) % (len(timestamps) // 10) < batch_size: - progress = min(100, ((i + batch_size) / len(timestamps)) * 100) + progress_interval = max(batch_size, len(timestamps) // 10) + if (i + batch_size) % progress_interval < batch_size: + progress = min(100, (i + batch_size) / len(timestamps) * 100) logger.info(f" Progress: {progress:.0f}% ({total_rows:,} rows)") logger.info(f"\nSaved {total_rows:,} data rows to {data_path}") diff --git a/mno_data_source_simulator/tests/test_generate_archive.py b/mno_data_source_simulator/tests/test_generate_archive.py index fac2d9e..079ac82 100644 --- a/mno_data_source_simulator/tests/test_generate_archive.py +++ b/mno_data_source_simulator/tests/test_generate_archive.py @@ -10,11 +10,11 @@ @patch("generate_archive.Path.mkdir") @patch("generate_archive.Path.exists") @patch("generate_archive.CMLDataGenerator") -@patch("generate_archive.gzip.open", new_callable=mock_open) -def test_generate_archive_creates_gzipped_files( - mock_gzip, mock_generator_class, mock_exists, mock_mkdir +@patch("builtins.open", new_callable=mock_open) +def test_generate_archive_creates_files( + mock_open_fn, mock_generator_class, mock_exists, mock_mkdir ): - """Test generate_archive_data() creates compressed metadata and data files.""" + """Test generate_archive_data() creates metadata and data CSV files.""" from generate_archive import generate_archive_data mock_exists.return_value = True @@ -38,10 +38,13 @@ def test_generate_archive_creates_gzipped_files( with 
patch("generate_archive.Path.stat") as mock_stat: mock_stat.return_value.st_size = 1024 - generate_archive_data() + generate_archive_data( + archive_days=1, + output_dir="/tmp/test_archive", + netcdf_file="/fake/file.nc", + interval_seconds=300, + ) - # Verify gzipped files created (critical for demo setup) - assert mock_gzip.call_count == 2 mock_generator.close.assert_called_once() From 18fe266898e2df88ffb4580d42bd5d57fcf5e7c0 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 17:18:16 +0100 Subject: [PATCH 4/8] perf: speed up database initialization by ~3x - Move index creation from init.sql to after \COPY in init_archive_data.sh: building the index on an empty table was maintained during bulk load, doubling COPY time; post-load index build is a single sequential scan - Drop update_cml_stats call from init script: it scanned all rows for 364 CMLs before the DB was ready, adding 1-2 min; the parser's background stats thread handles this immediately on startup instead - Result: 2-day 10s archive init time reduced from ~5 min to ~98 s --- database/init.sql | 4 +++- database/init_archive_data.sh | 25 +++++++++++++------------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/database/init.sql b/database/init.sql index 3758236..abb2a94 100644 --- a/database/init.sql +++ b/database/init.sql @@ -79,4 +79,6 @@ $$ LANGUAGE plpgsql; SELECT create_hypertable('cml_data', 'time'); -CREATE INDEX idx_cml_data_cml_id ON cml_data (cml_id, time DESC); \ No newline at end of file +-- Index is created after bulk data load in 99-load-archive.sh for faster COPY. +-- If no archive data is loaded, create it manually: +-- CREATE INDEX idx_cml_data_cml_id ON cml_data (cml_id, time DESC); \ No newline at end of file diff --git a/database/init_archive_data.sh b/database/init_archive_data.sh index b515da3..5592f97 100755 --- a/database/init_archive_data.sh +++ b/database/init_archive_data.sh @@ -10,7 +10,7 @@ echo "Loading archive data into database..." 
ARCHIVE_DATA_DIR="/docker-entrypoint-initdb.d/archive_data" # Check if archive data exists (should be included in the repo) -if [ ! -f "$ARCHIVE_DATA_DIR/metadata_archive.csv.gz" ] || [ ! -f "$ARCHIVE_DATA_DIR/data_archive.csv.gz" ]; then +if [ ! -f "$ARCHIVE_DATA_DIR/metadata_archive.csv" ] || [ ! -f "$ARCHIVE_DATA_DIR/data_archive.csv" ]; then echo "Warning: Archive data files not found. Skipping archive data load." echo "Hint: Run 'python mno_data_source_simulator/generate_archive.py' to generate archive data." exit 0 @@ -19,7 +19,7 @@ fi # Load metadata first (required for foreign key references) echo "Loading metadata archive..." psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL - \COPY cml_metadata FROM PROGRAM 'gunzip -c $ARCHIVE_DATA_DIR/metadata_archive.csv.gz' WITH (FORMAT csv, HEADER true); + \COPY cml_metadata FROM '$ARCHIVE_DATA_DIR/metadata_archive.csv' WITH (FORMAT csv, HEADER true); EOSQL METADATA_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_metadata;") @@ -30,7 +30,7 @@ echo "Loading time-series archive data (this may take 10-30 seconds)..." START_TIME=$(date +%s) psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL - \COPY cml_data (time, cml_id, sublink_id, tsl, rsl) FROM PROGRAM 'gunzip -c $ARCHIVE_DATA_DIR/data_archive.csv.gz' WITH (FORMAT csv, HEADER true); + \COPY cml_data (time, cml_id, sublink_id, tsl, rsl) FROM '$ARCHIVE_DATA_DIR/data_archive.csv' WITH (FORMAT csv, HEADER true); EOSQL END_TIME=$(date +%s) @@ -39,6 +39,15 @@ DURATION=$((END_TIME - START_TIME)) DATA_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_data;") echo "Loaded $DATA_COUNT data records in $DURATION seconds" +# Build index after bulk load (much faster than maintaining it during COPY) +echo "Building index on cml_data..." 
+INDEX_START=$(date +%s) +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL + CREATE INDEX idx_cml_data_cml_id ON cml_data (cml_id, time DESC); +EOSQL +INDEX_END=$(date +%s) +echo "Index built in $((INDEX_END - INDEX_START)) seconds" + # Display time range of loaded data psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL SELECT @@ -49,13 +58,5 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-E FROM cml_data; EOSQL -# Populate cml_stats for all loaded CMLs -echo "Populating CML statistics..." -psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL - SELECT update_cml_stats(cml_id::text) FROM (SELECT DISTINCT cml_id FROM cml_metadata) t; -EOSQL - -STATS_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_stats;") -echo "Generated statistics for $STATS_COUNT CMLs" - echo "Archive data successfully loaded!" +# Note: cml_stats is populated by the parser's background stats thread on startup. From 675f37870399f12d8f7aaae6b11d65879e19dc2c Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 17:18:30 +0100 Subject: [PATCH 5/8] fix: run initial stats refresh immediately on parser startup The background stats thread previously waited a full STATS_REFRESH_INTERVAL (60s) before its first run, so Grafana dashboards backed by cml_stats showed no data for up to a minute after the parser came up. Run refresh_stats() once immediately after connecting, then fall into the timed loop as before. 
--- parser/main.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/parser/main.py b/parser/main.py index f192722..f067312 100644 --- a/parser/main.py +++ b/parser/main.py @@ -101,6 +101,12 @@ def stats_loop(): except Exception: logger.exception("Stats thread: could not connect to DB") return + # Run immediately on startup so Grafana has fresh stats without + # waiting a full interval after the backlog is processed. + try: + stats_db.refresh_stats() + except Exception: + logger.exception("Stats thread: initial refresh_stats failed") while not stop_event.wait(Config.STATS_REFRESH_INTERVAL): try: stats_db.refresh_stats() From 921d39727de62c340d094d3b7e99f85398e1dbe7 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 19:43:54 +0100 Subject: [PATCH 6/8] fix: make archive load script idempotent and dual-mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Refactor init_archive_data.sh to work both as a PostgreSQL init-phase script (Unix socket, no PGHOST) and as a standalone post-startup container (PGHOST set for TCP connection) - Credentials resolve via PGUSER/PGDATABASE env vars with fallback to POSTGRES_USER/POSTGRES_DB so the script works in both contexts - Replace bare \COPY cml_metadata with a temp-table approach: COPY → tmp, INSERT INTO cml_metadata ON CONFLICT DO NOTHING so the script never aborts when the parser has already inserted metadata from a real-time upload before the loader runs - Change CREATE INDEX to CREATE INDEX IF NOT EXISTS for the same reason - Update comment in init.sql to reference archive_loader service --- database/init.sql | 2 +- database/init_archive_data.sh | 52 +++++++++++++++++++++++++---------- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/database/init.sql b/database/init.sql index abb2a94..3483d87 100644 --- a/database/init.sql +++ b/database/init.sql @@ -79,6 +79,6 @@ $$ LANGUAGE plpgsql; SELECT create_hypertable('cml_data', 'time'); --- Index 
is created after bulk data load in 99-load-archive.sh for faster COPY. +-- Index is created by the archive_loader service after bulk data load (faster COPY). -- If no archive data is loaded, create it manually: -- CREATE INDEX idx_cml_data_cml_id ON cml_data (cml_id, time DESC); \ No newline at end of file diff --git a/database/init_archive_data.sh b/database/init_archive_data.sh index 5592f97..9b09177 100755 --- a/database/init_archive_data.sh +++ b/database/init_archive_data.sh @@ -1,55 +1,79 @@ #!/bin/bash set -e -# This script loads archive data into the database during initialization -# It runs after the schema is created (init.sql) but before the database -# accepts external connections. +# This script loads archive data into the database. +# It can run either: +# - during PostgreSQL init (mounted in /docker-entrypoint-initdb.d/), OR +# - as a standalone service after the DB is healthy (set PGHOST to connect remotely). +# +# Environment variables: +# ARCHIVE_DATA_DIR Path to the directory with metadata_archive.csv / data_archive.csv +# (default: /docker-entrypoint-initdb.d/archive_data) +# PGHOST Hostname of the PostgreSQL server (blank = Unix socket, i.e. init-time) +# PGUSER / POSTGRES_USER Database user (PGUSER takes precedence) +# PGDATABASE / POSTGRES_DB Database name (PGDATABASE takes precedence) +# PGPASSWORD Database password (required for remote connections) echo "Loading archive data into database..." -ARCHIVE_DATA_DIR="/docker-entrypoint-initdb.d/archive_data" +ARCHIVE_DATA_DIR="${ARCHIVE_DATA_DIR:-/docker-entrypoint-initdb.d/archive_data}" -# Check if archive data exists (should be included in the repo) +# Resolve credentials: prefer psql env vars, fall back to Postgres Docker image vars. +DB_USER="${PGUSER:-${POSTGRES_USER:-myuser}}" +DB_NAME="${PGDATABASE:-${POSTGRES_DB:-mydatabase}}" + +# Build the common psql flags. Add -h only when PGHOST is set (external connection). 
+PSQL_FLAGS="-v ON_ERROR_STOP=1 --username $DB_USER --dbname $DB_NAME" +if [ -n "${PGHOST:-}" ]; then + PSQL_FLAGS="$PSQL_FLAGS --host $PGHOST" +fi + +# Check if archive data exists if [ ! -f "$ARCHIVE_DATA_DIR/metadata_archive.csv" ] || [ ! -f "$ARCHIVE_DATA_DIR/data_archive.csv" ]; then echo "Warning: Archive data files not found. Skipping archive data load." echo "Hint: Run 'python mno_data_source_simulator/generate_archive.py' to generate archive data." exit 0 fi -# Load metadata first (required for foreign key references) +# Load metadata first (required for foreign key references). +# Use a temp table + INSERT ON CONFLICT DO NOTHING so that metadata already +# inserted by the parser (from a real-time upload) doesn't abort this script. echo "Loading metadata archive..." -psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL - \COPY cml_metadata FROM '$ARCHIVE_DATA_DIR/metadata_archive.csv' WITH (FORMAT csv, HEADER true); +psql $PSQL_FLAGS <<-EOSQL + CREATE TEMP TABLE tmp_cml_metadata (LIKE cml_metadata INCLUDING ALL); + \COPY tmp_cml_metadata FROM '$ARCHIVE_DATA_DIR/metadata_archive.csv' WITH (FORMAT csv, HEADER true); + INSERT INTO cml_metadata SELECT * FROM tmp_cml_metadata ON CONFLICT DO NOTHING; + DROP TABLE tmp_cml_metadata; EOSQL -METADATA_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_metadata;") +METADATA_COUNT=$(psql $PSQL_FLAGS -t -c "SELECT COUNT(*) FROM cml_metadata;") echo "Loaded $METADATA_COUNT metadata records" # Load time-series data using COPY for maximum speed echo "Loading time-series archive data (this may take 10-30 seconds)..." 
START_TIME=$(date +%s) -psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL +psql $PSQL_FLAGS <<-EOSQL \COPY cml_data (time, cml_id, sublink_id, tsl, rsl) FROM '$ARCHIVE_DATA_DIR/data_archive.csv' WITH (FORMAT csv, HEADER true); EOSQL END_TIME=$(date +%s) DURATION=$((END_TIME - START_TIME)) -DATA_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_data;") +DATA_COUNT=$(psql $PSQL_FLAGS -t -c "SELECT COUNT(*) FROM cml_data;") echo "Loaded $DATA_COUNT data records in $DURATION seconds" # Build index after bulk load (much faster than maintaining it during COPY) echo "Building index on cml_data..." INDEX_START=$(date +%s) -psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL - CREATE INDEX idx_cml_data_cml_id ON cml_data (cml_id, time DESC); +psql $PSQL_FLAGS <<-EOSQL + CREATE INDEX IF NOT EXISTS idx_cml_data_cml_id ON cml_data (cml_id, time DESC); EOSQL INDEX_END=$(date +%s) echo "Index built in $((INDEX_END - INDEX_START)) seconds" # Display time range of loaded data -psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL +psql $PSQL_FLAGS <<-EOSQL SELECT 'Archive data time range:' as info, MIN(time) as start_time, From e47f59e027427920a8bd0ecbad4aaafcdab11b4e Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 19:44:31 +0100 Subject: [PATCH 7/8] fix: eliminate startup delay by decoupling archive load from DB init MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the archive data was bulk-loaded inside PostgreSQL's init phase (docker-entrypoint-initdb.d), which blocked all TCP connections for 1-3 minutes. Services that depended on 'database' would start, fail to connect, and the parser's stats thread would exit permanently. 
New startup sequence:

    archive_generator → generates CSVs (runs before DB starts)
    database          → schema-only init, healthy in ~6 s
    parser            → starts immediately when DB is healthy (~10 s after T0)
    archive_loader    → loads CSVs after DB is healthy, in background

Changes:

- docker-compose.yml:
  - Add healthcheck to database (pg_isready, 5 s interval)
  - Add archive_loader service: runs init_archive_data.sh as a standalone
    container after archive_generator completes and DB is healthy; does
    not block any other service
  - Remove archive volume mounts from database service (no longer loaded
    during DB init)
  - Change parser depends_on to condition: service_healthy so it only
    starts when the DB is actually accepting connections

- parser/main.py:
  - Stats thread retries DB connection every 5 s instead of giving up
    after 3 attempts; prevents permanent silent failure if DB is
    momentarily unreachable

Result: real-time data flows within ~10 s of docker compose up; archive
history appears in Grafana ~90 s later without any gap or delay in
real-time ingestion.
--- docker-compose.yml | 34 +++++++++++++++++++++++++++------- parser/main.py | 15 +++++++++++---- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index d2fe207..089fd6c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -32,7 +32,8 @@ services: ports: - "5004:5004" depends_on: - - database + database: + condition: service_healthy environment: - DATABASE_URL=postgresql://myuser:mypassword@database:5432/mydatabase - PARSER_INCOMING_DIR=/app/data/incoming @@ -46,16 +47,35 @@ services: - parser_archived:/app/data/archived - parser_quarantine:/app/data/quarantine - database: - build: ./database - ports: - - "5432:5432" + archive_loader: + image: timescale/timescaledb:latest-pg13 depends_on: archive_generator: condition: service_completed_successfully + database: + condition: service_healthy + environment: + - PGHOST=database + - PGUSER=myuser + - PGPASSWORD=mypassword + - PGDATABASE=mydatabase + - ARCHIVE_DATA_DIR=/archive_data volumes: - - archive_data:/docker-entrypoint-initdb.d/archive_data:ro - - ./database/init_archive_data.sh:/docker-entrypoint-initdb.d/99-load-archive.sh:ro + - archive_data:/archive_data:ro + - ./database/init_archive_data.sh:/init_archive_data.sh:ro + command: ["/bin/bash", "/init_archive_data.sh"] + restart: "no" + + database: + build: ./database + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U myuser -d mydatabase"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 10s processor: build: ./processor diff --git a/parser/main.py b/parser/main.py index f067312..b2ea262 100644 --- a/parser/main.py +++ b/parser/main.py @@ -96,11 +96,18 @@ def stats_loop(): # Use a separate DBWriter connection so stats queries don't contend # with the insert connection. 
stats_db = DBWriter(Config.DATABASE_URL) - try: - stats_db.connect() - except Exception: - logger.exception("Stats thread: could not connect to DB") + + # Keep retrying until the DB is reachable (e.g. if it starts slowly). + while not stop_event.is_set(): + try: + stats_db.connect() + break + except Exception: + logger.warning("Stats thread: DB not ready, retrying in 5s...") + stop_event.wait(5) + if stop_event.is_set(): return + # Run immediately on startup so Grafana has fresh stats without # waiting a full interval after the backlog is processed. try: From 64649c10921371bc475f0d4394b7d67bc62a1802 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 19:53:00 +0100 Subject: [PATCH 8/8] fix(tests): update test_generate_archive.py for refactored generate_archive_data After the numpy-cached rewrite: - generate_archive_data() requires 4 positional args (archive_days, output_dir, netcdf_file, interval_seconds) -- update both tests to pass them explicitly - The generation path no longer calls generator.generate_data(); it accesses generator.dataset.isel(), _get_netcdf_index_for_timestamp(), and original_time_points directly -- mock those instead - pandas to_csv() calls pathlib.Path.is_dir() internally; patch it to return True so the mock_open approach still works - Add numpy import (used for mock attribute setup) --- .../tests/test_generate_archive.py | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/mno_data_source_simulator/tests/test_generate_archive.py b/mno_data_source_simulator/tests/test_generate_archive.py index 079ac82..b109693 100644 --- a/mno_data_source_simulator/tests/test_generate_archive.py +++ b/mno_data_source_simulator/tests/test_generate_archive.py @@ -2,6 +2,7 @@ from pathlib import Path from unittest.mock import patch, MagicMock, mock_open import pandas as pd +import numpy as np import sys sys.path.insert(0, str(Path(__file__).parent.parent)) @@ -19,6 +20,16 @@ def 
test_generate_archive_creates_files( mock_exists.return_value = True + # Minimal per-link DataFrame returned by dataset.isel().to_dataframe().reset_index() + slice_df = pd.DataFrame( + { + "cml_id": ["101", "102"], + "sublink_id": ["sublink_1", "sublink_1"], + "tsl": [50.0, 51.0], + "rsl": [-60.0, -61.0], + } + ) + mock_generator = MagicMock() mock_generator_class.return_value = mock_generator mock_generator.get_metadata_dataframe.return_value = pd.DataFrame( @@ -27,16 +38,15 @@ def test_generate_archive_creates_files( "sublink_id": ["sublink_1", "sublink_1"], } ) - mock_generator.generate_data.return_value = pd.DataFrame( - { - "time": pd.date_range("2024-01-01", periods=2, freq="5min"), - "cml_id": ["101", "101"], - "tsl": [50.0, 51.0], - "rsl": [-60.0, -61.0], - } + # Internal attributes used by the numpy-cached generation path + mock_generator.original_time_points = list(range(720)) + mock_generator._get_netcdf_index_for_timestamp.return_value = 0 + mock_generator.dataset.isel.return_value.to_dataframe.return_value.reset_index.return_value = ( + slice_df ) - with patch("generate_archive.Path.stat") as mock_stat: + with patch("generate_archive.Path.stat") as mock_stat, \ + patch("pathlib.Path.is_dir", return_value=True): mock_stat.return_value.st_size = 1024 generate_archive_data( archive_days=1, @@ -56,4 +66,9 @@ def test_generate_archive_fails_if_netcdf_missing(mock_exists): mock_exists.return_value = False with pytest.raises(SystemExit): - generate_archive_data() + generate_archive_data( + archive_days=1, + output_dir="/tmp/test_archive", + netcdf_file="/fake/file.nc", + interval_seconds=300, + )