Conversation
Docker bind mounts on Linux preserve container UIDs, causing permission failures during builds, runtime writes, and cleanup. Switch postgres to a named volume, add Docker-based cleanup for container-owned files, and fix host.docker.internal resolution for replication.
```
logger = logs.get_logger()
HEARTBEAT_INTERVAL = 300
HEARTBEAT_INTERVAL = 600  # seconds
FAILURE_RETRY_SECONDS = 43200  # seconds
```
That's a really long time in seconds... Do we really want to wait this long?
Probably a poorly chosen balance when I was trying to reduce log clutter from repeated retries. Maybe 2 hours is better than 12? I can improve the readability here too.
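Roughly what I have in mind for readability (the values here are just the suggestion from this thread, not final):

```python
# Hypothetical re-spelling of the constants; the 2-hour value is only a proposal.
HEARTBEAT_INTERVAL = 10 * 60           # 10 minutes, in seconds
FAILURE_RETRY_SECONDS = 2 * 60 * 60    # 2 hours, in seconds (down from 12)
```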
```
update_filepath(db_address, cal.id, dest_dir)
logger.info(f"Downloaded {cal.filename}")
logger.info(f"Cached {cal.filename}")
return True
```
I don't generally like returning True unless it is a boolean check, e.g., is this object x type? I tend to raise an exception if something didn't work. This is kind of a stylistic thing, but it's a place to make the code more readable for future us.
This function takes a file that needs to be cached and results in three possible outcomes: downloaded, skipped, or failed. The return value is used by the caller to keep track of how many files were downloaded vs how many were already cached. So I still think this is the cleanest solution, but I'm open to alternatives if you have one in mind.
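If we did want something more explicit than a bare boolean, this is the kind of shape I'd consider (names here are illustrative, not what's in the PR):

```python
from enum import Enum

class CacheOutcome(Enum):
    DOWNLOADED = 'downloaded'
    SKIPPED = 'skipped'      # file was already cached
    FAILED = 'failed'

def cache_calibration(cal, db_address, dest_dir):
    """Illustrative stand-in for the function under discussion."""
    # ... download / skip / fail logic would live here ...
    return CacheOutcome.DOWNLOADED

# The caller can still tally downloads vs. already-cached files, e.g.:
# n_downloaded = sum(outcome is CacheOutcome.DOWNLOADED for outcome in outcomes)
```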
Pull request overview
This PR introduces “smart stacking” scaffolding for site deployments (DB tracking + subframe reduction + Redis-based notifications + per-camera stacking workers), while also refactoring local/site Docker Compose to treat Redis/RabbitMQ as external dependencies and polishing site deployment/E2E workflows.
Changes:
- Add site-only DB tables (`StackFrame` via `SiteBase`) and smart-stacking worker/supervisor plumbing.
- Externalize Redis/RabbitMQ into `docker-compose-dependencies.yml` and switch workers to `REDIS_URL`/`RABBITMQ_URL`.
- Expand/adjust unit + site E2E coverage and update site/local env templates and documentation accordingly.
Reviewed changes
Copilot reviewed 30 out of 32 changed files in this pull request and generated 4 comments.
Summary per file:
| File | Description |
|---|---|
| uv.lock | Dependency lock revision bump. |
| site-banzai-env.default | Site env defaults updated (absolute paths, broker URLs, smart stacking knobs). |
| local-banzai-env.default | Local env defaults updated to use broker URLs and queue names. |
| scripts/queue_images.py | Queueing script changed to publish absolute host paths. |
| README.rst | Docs updated for new dependencies compose flow and env vars. |
| pytest.ini | Adds smart stacking markers and updates recursion excludes. |
| pyproject.toml | Adds new console entry points for stacking/subframe components. |
| docker-compose-site.yml | Site compose updated for external brokers, absolute-path mounts, new stacking services, and e2e isolation knobs. |
| docker-compose-local.yml | Local compose updated for external brokers and configurable queue/worker naming. |
| docker-compose-dependencies.yml | New compose file for Redis + RabbitMQ dependencies. |
| banzai/utils/stage_utils.py | run_pipeline_stages now returns the processed images list. |
| banzai/utils/fits_utils.py | Adds optional attempt logging to download_from_s3. |
| banzai/stacking.py | New stacking worker loop, timeout handling, and supervisor process manager. |
| banzai/settings.py | Adds env-derived settings for subframe/stacking queues and broker URLs. |
| banzai/scheduling.py | Adds process_subframe Celery task and wiring for DB + Redis notifications. |
| banzai/main.py | Adds SubframeListener + CLI entrypoint to consume stack-queue messages and dispatch Celery tasks. |
| banzai/dbs.py | Introduces SiteBase, StackFrame, site-aware sessions, and stack frame CRUD/upsert helpers. |
| banzai/celeryconfig.py | Removes hardcoded broker_url from Celery config module. |
| banzai/cache/replication.py | Subscription naming/slot reuse improvements for replication setup. |
| banzai/cache/__init__.py | Ensures site DB schema creation and updated replication setup call signature. |
| banzai/cache/download_worker.py | Adds argparse CLI, improved logging/heartbeat, NULL-frameid handling, reconciliation and backoff logic. |
| banzai/tests/test_smart_stacking.py | New unit tests covering stacking DB ops, notifications, listener, task, supervisor, and resilience. |
| banzai/tests/test_replication.py | Updates replication tests for new subscription/slot behavior. |
| banzai/tests/test_download_worker.py | Updates tests for new download worker behavior and CLI parsing. |
| banzai/tests/test_dbs.py | Adds tests for site-table creation and site-aware session enforcement. |
| banzai/tests/test_cache_init.py | Updates cache-init expectations for site schema and replication call changes. |
| banzai/tests/site_e2e/test_site_e2e.py | Expands site E2E coverage (cached cal usage, drift reconciliation, stacking path, slot reuse). |
| banzai/tests/site_e2e/conftest.py | Improves e2e isolation/scoping, absolute path enforcement, compose helpers, and cleanup. |
| banzai/tests/site_e2e/site_e2e.env.template | Adds isolation knobs + absolute host path requirements + broker URLs. |
| banzai/tests/site_e2e/README.md | Updates e2e docs for isolation, absolute paths, and log watching. |
| .gitignore | Ignores local-banzai-env. |
| .dockerignore | Ignores data/ directory in Docker build context. |
```
# Queue each file — use absolute host paths (containers mount host dirs at the same path)
for filepath in sorted(fits_files):
    abs_path = os.path.abspath(filepath)
    with fits.open(filepath) as hdul:
        header = hdul['SCI'].header
        siteid = header.get('SITEID', '').strip()
        instrume = header.get('INSTRUME', '').strip()

    if siteid and instrume:
        # Use container path instead of host path
        container_path = f'{args.container_path}/{os.path.basename(filepath)}'

        post_to_archive_queue(
            filename=os.path.basename(filepath),
            broker_url=args.broker_url,
            exchange_name=args.exchange,
            path=container_path,
            path=abs_path,
            SITEID=siteid,
```
```
def on_message(self, body, message):
    """Validate and dispatch to Celery for processing."""
    if isinstance(body, str):
        try:
            body = json.loads(body)
        except json.JSONDecodeError as e:
            logger.error('Malformed JSON payload, discarding message', extra_tags={'error': str(e)})
            message.ack()
            return
    if not validate_message(body):
        logger.error('Invalid message received, missing required fields')
        message.ack()
        return
```
```
    'dateobs': stmt.excluded.dateobs,
    'filepath': stmt.excluded.filepath,
    'status': 'active',
    'completed_at': None,
```
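For context, this fragment is the `set_` mapping of a PostgreSQL ON CONFLICT upsert. A self-contained sketch of the pattern follows; the `frameid` conflict target and the helper name are assumptions, not necessarily the PR's exact code:

```python
from sqlalchemy.dialects.postgresql import insert

def upsert_stack_frame(session, table, values):
    # Insert the row, or refresh it in place if a frame with the same id exists.
    stmt = insert(table).values(**values)
    stmt = stmt.on_conflict_do_update(
        index_elements=['frameid'],            # assumed conflict target
        set_={
            'dateobs': stmt.excluded.dateobs,
            'filepath': stmt.excluded.filepath,
            'status': 'active',
            'completed_at': None,              # re-activating clears completion
        },
    )
    session.execute(stmt)
```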
```
REDIS_URL = os.getenv('REDIS_URL', 'redis://host.docker.internal:6379/0')
RABBITMQ_URL = os.getenv('RABBITMQ_URL', 'amqp://host.docker.internal:5672')
```
```
    time.sleep(30)


def create_parser():
```
I'm on the fence about whether I like this logic being a function. If it's reused anywhere, it should definitely be a function. It does help the readability of the other function a little at the cost of splitting the logic into multiple places. Maybe the function just needs a more descriptive name to signal to the reader that this is specific to this module and not some generic library function.
```
try:
    logger.info("Creating database schema...")
    dbs.create_db(args.db_address)
    dbs.create_db(args.db_address, site=True)
```
I'm not sure I like this variable being just "site" here. We use site a lot of places and it is a 3 letter site code generally. Maybe at_site? Or site_deploy?
```
# HOST_*_DIR paths must be absolute so that host-path:host-path volume
# mounts in docker-compose-site.yml resolve correctly (mirrors the
# production site-banzai-env contract).
for _key in ('HOST_RAW_DIR', 'HOST_CALS_DIR', 'HOST_REDUCED_DIR'):
```
Is there any way we can add sensible defaults for these values so that it doesn't just crash out if they are not set?
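Something along these lines is what I'm imagining (a sketch only; the temp-dir fallbacks are made up, the variable names are the ones above):

```python
import os
import tempfile

# Hypothetical defaults so unset HOST_*_DIR variables fall back to a temp tree
# instead of failing the e2e fixture outright.
_default_root = os.path.join(tempfile.gettempdir(), 'banzai-e2e')

for _key, _subdir in (('HOST_RAW_DIR', 'raw'),
                      ('HOST_CALS_DIR', 'cals'),
                      ('HOST_REDUCED_DIR', 'reduced')):
    path = os.path.abspath(os.environ.get(_key, os.path.join(_default_root, _subdir)))
    os.makedirs(path, exist_ok=True)
    os.environ[_key] = path  # keep the absolute-path contract described above
```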
```
LOCAL_DB_ADDRESS = os.environ.get(
    "LOCAL_DB_ADDRESS",
    "postgresql+psycopg://banzai@localhost:5442/banzai_local"
    f"postgresql+psycopg://banzai@localhost:{os.environ.get('SITE_PG_HOST_PORT', '5443')}/banzai_local"
```
Do we want to make the banzai site db hostname configurable?
It's currently configurable by setting the value of LOCAL_DB_ADDRESS; localhost is just part of the default value. So I think this is fine unless I'm not understanding your question.
```
def run_docker_compose(compose_file, *args, cwd=None, env=None):
def publish_to_queue(queue_name, body, broker_url='amqp://localhost:5672'):
```
What's the difference between this code and the code we use to post to the fits exchange? This is not an important thing if it is different enough, but it just smelled like code duplication.
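If they are close enough, this is the kind of shared helper I'm imagining (function and parameter names are illustrative, not existing project code):

```python
from kombu import Connection, Exchange

def publish_message(broker_url, body, exchange_name='', routing_key=''):
    """Publish JSON either to a named exchange or, with exchange_name='',
    to the default exchange (i.e. directly to the queue named by routing_key)."""
    with Connection(broker_url) as connection:
        producer = connection.Producer()
        producer.publish(body,
                         exchange=Exchange(exchange_name) if exchange_name else '',
                         routing_key=routing_key,
                         serializer='json')
```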
```
    shutil.rmtree(path, ignore_errors=True)


def run_docker_compose(compose_file, *args, cwd=None, env=None, project=None):
```
This may just be me, but I'm really nervous about python calling docker when this python code will likely be running in a docker container.
```
- `docker compose down` from the e2e fixture is project-scoped — it cannot
  tear down a docker-compose-site stack (project `banzai`).

The only manual collision risk is the host data directories (`HOST_RAW_DIR`,
```
This is another place sensible defaults could help.
```
DOWNLOAD_WORKER_POLL_INTERVAL=1

# Message brokers
REDIS_URL=redis://host.docker.internal:6379/0
```
I need to go look at the docker compose more carefully. You might want to actually create a Docker network here that isn't just the default. The way Docker does its DNS is different between Mac and Linux, so it can bite you.
```
result = subprocess.run(
    [
        'uv', 'run', 'python', str(queue_script),
        sys.executable, str(queue_script),
```
The subprocesses are getting nested deeply here. Maybe we should think a little more about this deploy.
| f"worker did not reconcile {target} back to {expected} after drift to {drifted_value!r}" | ||
| ) | ||
|
|
||
| @pytest.mark.e2e_site_cache |
I don't remember if there is, but is there a way to set a pytest run to skip these by default? Running the unit tests now requires -m "not e2e" -m "not e2e_site_cache", which is starting to get onerous for a newbie user just trying to spin up the project.
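Something like this in conftest.py is what I'm picturing (a sketch, not existing project code; the --run-e2e flag name is made up, the marker names are the ones in this PR):

```python
# conftest.py — skip e2e-marked tests unless explicitly requested
import pytest

E2E_MARKERS = {'e2e', 'e2e_site_cache'}

def pytest_addoption(parser):
    parser.addoption('--run-e2e', action='store_true', default=False,
                     help='also run tests marked e2e / e2e_site_cache')

def pytest_collection_modifyitems(config, items):
    if config.getoption('--run-e2e'):
        return
    skip_e2e = pytest.mark.skip(reason='e2e tests skipped by default; pass --run-e2e')
    for item in items:
        if E2E_MARKERS & {marker.name for marker in item.iter_markers()}:
            item.add_marker(skip_e2e)
```

The same effect is also possible with an `addopts = -m "not e2e and not e2e_site_cache"` line in pytest.ini, at the cost of making the override less discoverable.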
```
# Timeout
# ---------------------------------------------------------------------------


class TestTimeout:
```
I would check the naming conventions in pytest. Something generic like this may be a little hard to find if it doesn't print the module name right next to it.
```
    tries = Column(Integer, default=0)


class StackFrame(SiteBase):
```
I'm not sure I like the name of the table or class. frames_to_stack might be epsilon better.
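For scale, the rename would mostly just touch the table name (a stand-alone sketch; SiteBase here is a stand-in declarative base, not the PR's actual one):

```python
from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base

SiteBase = declarative_base()  # stand-in for the PR's SiteBase

class StackFrame(SiteBase):
    __tablename__ = 'frames_to_stack'  # table name suggested above
    id = Column(Integer, primary_key=True)
```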
```
    logger.info('Shutting down pipeline listener.')


class SubframeListener(ConsumerMixin):
```
This is probably a nit, but is there a way to refactor things to reuse more of the regular listener? I'm just trying to avoid code duplication where possible.
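Roughly the shape I'm picturing (class and attribute names are illustrative; only ConsumerMixin comes from the code above):

```python
from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin

class BaseListener(ConsumerMixin):
    """Shared kombu wiring; subclasses only override on_message."""
    def __init__(self, broker_url, queue_name):
        self.connection = Connection(broker_url)
        self.queue = Queue(queue_name)

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.queue], callbacks=[self.on_message])]

    def on_message(self, body, message):
        raise NotImplementedError

class SubframeListener(BaseListener):
    def on_message(self, body, message):
        # subframe-specific validation and Celery dispatch would go here
        message.ack()
```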
```
    dateobs = parse_date_obs(dateobs_str) if dateobs_str else None

    # Phase 1: Insert DB record before reduction so stacking worker can see it
    insert_stack_frame(
```
Question about the stacking infrastructure. Is the stacking worker checking a database or listening to a queue on say rabbitmq? It seems like a listener would be a better pattern as polling a db for changes is error prone.
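For comparison, the notification-driven shape I'm thinking of would look roughly like this (the key name and timeout are assumptions, not values from this PR):

```python
import redis

def wait_for_notification(redis_url, camera, timeout=30):
    """Block on a per-camera Redis list instead of polling the database."""
    client = redis.Redis.from_url(redis_url)
    popped = client.blpop(f'stack_notifications:{camera}', timeout=timeout)
    if popped is None:
        return None              # nothing arrived before the timeout
    _key, payload = popped
    return payload.decode()      # e.g. the reduced frame's identifier
```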
```
    redis_client.rename(key, drain_key)
except redis_lib.exceptions.ResponseError:
    return set()
raw = redis_client.lrange(drain_key, 0, -1)
```
This scares me. Can you say why we are having to drain queues manually?
```
def finalize_stack(db_address, moluid, status='complete'):
    """Mark stack complete and log mock stacking/JPEG/ingester operations."""
    dbs.mark_stack_complete(db_address, moluid, status=status)
    logger.info(f'Mock stacking complete for {moluid}', extra_tags={'moluid': moluid})
```
I understand these are mock warnings, but it opens a general point. Info messages should be logs we want to store in something like opensearch that can be used later. The temptation is to leave in debug messages when writing the code. If we can lower those to logger.debug that's fine. banzai is already really noisy and I'm hesitant to add more noise.
```
        self.workers[camera] = proc
        logger.info(f'Started stacking worker for camera {camera}')

    def monitor(self, check_interval=10):
```
It feels strange to have the monitoring function and the worker function in the same Python class. I'm struggling to visualize the whole infrastructure of processes; my sense is it kind of feels like overkill. Maybe we could add a process flow diagram to the docs?
```
        self.workers.clear()


def create_parser():
```
Another create_parser method. I'd try to use a little more descriptive name.
cmccully left a comment
I think this is a huge step forward. I've left some comments, some more significant than others. Once those are addressed, I'm happy to approve.
Originally this PR was just splitting Redis/RabbitMQ out of the site/local docker-compose files, but it ended up bundling several dependent changes together rather than stacking PRs that kept overriding each other.
What's in here now:
Smart stacking scaffolding

- `StackFrame` model + `SiteBase` declarative base for site-only tables (`dbs.py`)
- `SubframeListener` + `process_subframe` Celery task: validates incoming RabbitMQ messages, inserts a tracking row, runs the reduction pipeline, updates the row with the reduced filepath, and pushes a Redis notification per camera (`main.py`, `scheduling.py`)
- `stacking.py`: per-camera worker loop drains Redis notifications, checks group completion (all reduced AND all expected frames present, or `is_last`), and finalizes complete or timed-out stacks; supervisor spawns one worker per camera
- `banzai-subframe-listener`, `banzai-subframe-worker`, `banzai-stacking-supervisor` services + matching env vars
- Unit tests in `test_smart_stacking.py` and a new site-E2E phase

Redis / RabbitMQ as external dependencies

- Moved out of `docker-compose-{site,local}.yml` into `docker-compose-dependencies.yml` so they can be left running across pipeline restarts (and pointed at existing site infrastructure)
- Workers read `REDIS_URL`/`RABBITMQ_URL`; no more hardcoded `redis://redis:6379/0`

Site deployment fixes & polish (from running this at a site)

- Download worker: … triggers, NULL-`frameid` handling, per-`frameid` failure backoff, filepath reconciliation when replication overwrites with NULL, "waiting for replication" log while `calimages` is empty during initialization
- Replication: `SLOT_NAME` override, reuse existing slot when recreating subscription
- `created_at` (not `dateobs`) for stack-timeout — handles NULL `dateobs` and reprocessing (note: stack timeout is being reworked a bit in the branch I'm currently working on)
- E2E fixtures: (… `host.docker.internal` resolution), container-prefix support so the fixture stack and `just site-up` can coexist, absolute `HOST_*_DIR` requirement so file paths resolve identically inside and outside containers
- `argparse` entry points for site workers; removed unused `OPENTSDB_PYTHON_METRICS_TEST_MODE` plumbing