Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion database/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,8 @@ BEGIN
END;
$$ LANGUAGE plpgsql;

SELECT create_hypertable('cml_data', 'time');
SELECT create_hypertable('cml_data', 'time');

-- Index is created by the archive_loader service after bulk data load (faster COPY).
-- If no archive data is loaded, create it manually:
-- CREATE INDEX idx_cml_data_cml_id ON cml_data (cml_id, time DESC);
71 changes: 48 additions & 23 deletions database/init_archive_data.sh
Original file line number Diff line number Diff line change
@@ -1,46 +1,79 @@
#!/bin/bash
set -e

# This script loads archive data into the database during initialization
# It runs after the schema is created (init.sql) but before the database
# accepts external connections.
# This script loads archive data into the database.
# It can run either:
# - during PostgreSQL init (mounted in /docker-entrypoint-initdb.d/), OR
# - as a standalone service after the DB is healthy (set PGHOST to connect remotely).
#
# Environment variables:
# ARCHIVE_DATA_DIR Path to the directory with metadata_archive.csv / data_archive.csv
# (default: /docker-entrypoint-initdb.d/archive_data)
# PGHOST Hostname of the PostgreSQL server (blank = Unix socket, i.e. init-time)
# PGUSER / POSTGRES_USER Database user (PGUSER takes precedence)
# PGDATABASE / POSTGRES_DB Database name (PGDATABASE takes precedence)
# PGPASSWORD Database password (required for remote connections)

echo "Loading archive data into database..."

ARCHIVE_DATA_DIR="/docker-entrypoint-initdb.d/archive_data"
ARCHIVE_DATA_DIR="${ARCHIVE_DATA_DIR:-/docker-entrypoint-initdb.d/archive_data}"

# Check if archive data exists (should be included in the repo)
if [ ! -f "$ARCHIVE_DATA_DIR/metadata_archive.csv.gz" ] || [ ! -f "$ARCHIVE_DATA_DIR/data_archive.csv.gz" ]; then
# Resolve credentials: prefer psql env vars, fall back to Postgres Docker image vars.
DB_USER="${PGUSER:-${POSTGRES_USER:-myuser}}"
DB_NAME="${PGDATABASE:-${POSTGRES_DB:-mydatabase}}"

# Build the common psql flags. Add -h only when PGHOST is set (external connection).
PSQL_FLAGS="-v ON_ERROR_STOP=1 --username $DB_USER --dbname $DB_NAME"
if [ -n "${PGHOST:-}" ]; then
PSQL_FLAGS="$PSQL_FLAGS --host $PGHOST"
fi

# Check if archive data exists
if [ ! -f "$ARCHIVE_DATA_DIR/metadata_archive.csv" ] || [ ! -f "$ARCHIVE_DATA_DIR/data_archive.csv" ]; then
echo "Warning: Archive data files not found. Skipping archive data load."
echo "Hint: Run 'python mno_data_source_simulator/generate_archive.py' to generate archive data."
exit 0
fi

# Load metadata first (required for foreign key references)
# Load metadata first (required for foreign key references).
# Use a temp table + INSERT ON CONFLICT DO NOTHING so that metadata already
# inserted by the parser (from a real-time upload) doesn't abort this script.
echo "Loading metadata archive..."
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
\COPY cml_metadata FROM PROGRAM 'gunzip -c $ARCHIVE_DATA_DIR/metadata_archive.csv.gz' WITH (FORMAT csv, HEADER true);
psql $PSQL_FLAGS <<-EOSQL
CREATE TEMP TABLE tmp_cml_metadata (LIKE cml_metadata INCLUDING ALL);
\COPY tmp_cml_metadata FROM '$ARCHIVE_DATA_DIR/metadata_archive.csv' WITH (FORMAT csv, HEADER true);
INSERT INTO cml_metadata SELECT * FROM tmp_cml_metadata ON CONFLICT DO NOTHING;
DROP TABLE tmp_cml_metadata;
EOSQL

METADATA_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_metadata;")
METADATA_COUNT=$(psql $PSQL_FLAGS -t -c "SELECT COUNT(*) FROM cml_metadata;")
echo "Loaded $METADATA_COUNT metadata records"

# Load time-series data using COPY for maximum speed
echo "Loading time-series archive data (this may take 10-30 seconds)..."
START_TIME=$(date +%s)

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
\COPY cml_data (time, cml_id, sublink_id, tsl, rsl) FROM PROGRAM 'gunzip -c $ARCHIVE_DATA_DIR/data_archive.csv.gz' WITH (FORMAT csv, HEADER true);
psql $PSQL_FLAGS <<-EOSQL
\COPY cml_data (time, cml_id, sublink_id, tsl, rsl) FROM '$ARCHIVE_DATA_DIR/data_archive.csv' WITH (FORMAT csv, HEADER true);
EOSQL

END_TIME=$(date +%s)
DURATION=$((END_TIME - START_TIME))

DATA_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_data;")
DATA_COUNT=$(psql $PSQL_FLAGS -t -c "SELECT COUNT(*) FROM cml_data;")
echo "Loaded $DATA_COUNT data records in $DURATION seconds"

# Build index after bulk load (much faster than maintaining it during COPY)
echo "Building index on cml_data..."
INDEX_START=$(date +%s)
psql $PSQL_FLAGS <<-EOSQL
CREATE INDEX IF NOT EXISTS idx_cml_data_cml_id ON cml_data (cml_id, time DESC);
EOSQL
INDEX_END=$(date +%s)
echo "Index built in $((INDEX_END - INDEX_START)) seconds"

# Display time range of loaded data
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
psql $PSQL_FLAGS <<-EOSQL
SELECT
'Archive data time range:' as info,
MIN(time) as start_time,
Expand All @@ -49,13 +82,5 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-E
FROM cml_data;
EOSQL

# Populate cml_stats for all loaded CMLs
echo "Populating CML statistics..."
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
SELECT update_cml_stats(cml_id::text) FROM (SELECT DISTINCT cml_id FROM cml_metadata) t;
EOSQL

STATS_COUNT=$(psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" -t -c "SELECT COUNT(*) FROM cml_stats;")
echo "Generated statistics for $STATS_COUNT CMLs"

echo "Archive data successfully loaded!"
# Note: cml_stats is populated by the parser's background stats thread on startup.
42 changes: 38 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
services:
archive_generator:
build: ./mno_data_source_simulator
command: python generate_archive.py --days ${ARCHIVE_DAYS:-30} --interval-seconds ${ARCHIVE_INTERVAL_SECONDS:-300}
environment:
- ARCHIVE_OUTPUT_DIR=/archive_output
- NETCDF_FILE=/app/example_data/openMRG_cmls_20150827_12hours.nc
volumes:
- ./parser/example_data:/app/example_data:ro
- archive_data:/archive_output

mno_simulator:
build: ./mno_data_source_simulator
depends_on:
Expand All @@ -22,7 +32,8 @@ services:
ports:
- "5004:5004"
depends_on:
- database
database:
condition: service_healthy
environment:
- DATABASE_URL=postgresql://myuser:mypassword@database:5432/mydatabase
- PARSER_INCOMING_DIR=/app/data/incoming
Expand All @@ -36,13 +47,35 @@ services:
- parser_archived:/app/data/archived
- parser_quarantine:/app/data/quarantine

archive_loader:
image: timescale/timescaledb:latest-pg13
depends_on:
archive_generator:
condition: service_completed_successfully
database:
condition: service_healthy
environment:
- PGHOST=database
- PGUSER=myuser
- PGPASSWORD=mypassword
- PGDATABASE=mydatabase
- ARCHIVE_DATA_DIR=/archive_data
volumes:
- archive_data:/archive_data:ro
- ./database/init_archive_data.sh:/init_archive_data.sh:ro
command: ["/bin/bash", "/init_archive_data.sh"]
restart: "no"

database:
build: ./database
ports:
- "5432:5432"
volumes:
- ./database/archive_data:/docker-entrypoint-initdb.d/archive_data:ro
- ./database/init_archive_data.sh:/docker-entrypoint-initdb.d/99-load-archive.sh:ro
healthcheck:
test: ["CMD-SHELL", "pg_isready -U myuser -d mydatabase"]
interval: 5s
timeout: 5s
retries: 10
start_period: 10s

processor:
build: ./processor
Expand Down Expand Up @@ -143,6 +176,7 @@ services:
- testing

volumes:
archive_data:
sftp_uploads:
webserver_data_staged:
webserver_data_archived:
Expand Down
1 change: 1 addition & 0 deletions mno_data_source_simulator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY data_generator.py .
COPY sftp_uploader.py .
COPY main.py .
COPY generate_archive.py .
COPY config.yml .

# Create directory for example data
Expand Down
6 changes: 3 additions & 3 deletions mno_data_source_simulator/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ sftp:
known_hosts_path: "/app/ssh_keys/known_hosts"
remote_path: "/uploads"
# Upload frequency in seconds
upload_frequency_seconds: 30
upload_frequency_seconds: 10
# Connection timeout in seconds
connection_timeout: 30

# Data generation configuration
generator:
# How often to generate new data points (in seconds)
generation_frequency_seconds: 30
generation_frequency_seconds: 10
# Number of timestamps to include in each generated file
# (timestamps will be spaced by time_resolution_seconds)
timestamps_per_file: 3
timestamps_per_file: 1
# Time resolution between timestamps within a file (in seconds)
time_resolution_seconds: 10
# Directory where generated CSV files will be written
Expand Down
Loading