Smartstack Scaffolding #444

Open

timbeccue wants to merge 34 commits into main from feature/separate-site-deps

Conversation


@timbeccue timbeccue commented Mar 18, 2026

Originally this PR just split Redis/RabbitMQ out of the site/local
docker-compose files, but it ended up bundling several dependent changes
together rather than maintaining a stack of PRs that kept overriding each other.

What's in here now:

Smart stacking scaffolding

  • New StackFrame model + SiteBase declarative base for site-only tables
    (dbs.py)
  • SubframeListener + process_subframe Celery task: validates incoming
    RabbitMQ messages, inserts a tracking row, runs the reduction pipeline,
    updates the row with the reduced filepath, and pushes a Redis notification
    per camera (main.py, scheduling.py)
  • stacking.py: per-camera worker loop drains Redis notifications, checks
    group completion (all reduced AND all expected frames present, or
    is_last), and finalizes complete or timed-out stacks; supervisor spawns
    one worker per camera (see the sketch after this list)
  • New banzai-subframe-listener, banzai-subframe-worker,
    banzai-stacking-supervisor services + matching env vars
  • E2E coverage in test_smart_stacking.py and a new site-E2E phase
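
To make the stacking logic easier to review, here is a minimal sketch of the completion and timeout rules the per-camera worker applies. The class and function names below are illustrative only, not the actual StackFrame/worker API:

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class TrackedFrame:
    # Illustrative stand-in for a StackFrame row (not the real model).
    reduced_filepath: Optional[str]   # set once the reduction pipeline has written the output
    is_last: bool                     # sender flagged this frame as the last one in its group
    created_at: datetime              # timezone-aware insert time of the tracking row


def group_is_complete(frames, expected_count):
    """Complete when every frame is reduced AND (all expected frames arrived OR one is flagged is_last)."""
    all_reduced = all(f.reduced_filepath is not None for f in frames)
    all_present = len(frames) >= expected_count
    has_last = any(f.is_last for f in frames)
    return all_reduced and (all_present or has_last)


def group_timed_out(frames, timeout_seconds):
    """Timeout is measured from created_at (not dateobs), so NULL dateobs and reprocessing still behave."""
    oldest = min(f.created_at for f in frames)
    return (datetime.now(timezone.utc) - oldest).total_seconds() > timeout_seconds
```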

Redis / RabbitMQ as external dependencies

  • Pulled out of docker-compose-{site,local}.yml into
    docker-compose-dependencies.yml so they can be left running across
    pipeline restarts (and pointed at existing site infrastructure)
  • All workers connect via REDIS_URL / RABBITMQ_URL (defaults sketched
    below); no more hardcoded redis://redis:6379/0
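
Concretely, the broker configuration comes down to two environment-driven settings; the defaults shown here match the new settings.py values and the site env template, and can be overridden to point at brokers already running at a site:

```python
import os

# Overridable so the same images can use the bundled docker-compose-dependencies.yml
# brokers locally, or be pointed at brokers already running at a site.
REDIS_URL = os.getenv('REDIS_URL', 'redis://host.docker.internal:6379/0')
RABBITMQ_URL = os.getenv('RABBITMQ_URL', 'amqp://host.docker.internal:5672')
```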

Site deployment fixes & polish (from running this at a site)

  • Download worker: clearer status logging with heartbeat + state-change
    triggers, NULL-frameid handling, per-frameid failure backoff (the
    heartbeat/backoff pattern is sketched after this list), filepath
    reconciliation when replication overwrites with NULL, "waiting for
    replication" log while calimages is empty during initialization
  • Replication: fixed subscription name, optional SLOT_NAME override,
    reuse existing slot when recreating subscription
  • Listener improvements: log and remove malformed JSON payloads from the
    queue; top-level try/except around the stacking worker loop
  • created_at (not dateobs) for stack-timeout — handles NULL dateobs
    and reprocessing (note: stack timeout is being reworked a bit in the
    branch I'm currently working on)
  • Site E2E: Linux compatibility (named volume for postgres,
    host.docker.internal resolution), container-prefix support so the
    fixture stack and just site-up can coexist, absolute-HOST_*_DIR
    requirement so file paths resolve identically inside and outside
    containers
  • argparse entry points for site workers; removed unused
    OPENTSDB_PYTHON_METRICS_TEST_MODE plumbing
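
For reference, the heartbeat/backoff behaviour mentioned in the download-worker bullet boils down to roughly this pattern. The helper names here are hypothetical; only HEARTBEAT_INTERVAL and FAILURE_RETRY_SECONDS correspond to constants in the diff, and the retry window is questioned in a review thread below:

```python
import time

HEARTBEAT_INTERVAL = 600       # seconds between "still alive" status lines
FAILURE_RETRY_SECONDS = 43200  # per-frameid backoff window (12 h; value discussed below)


def should_log_status(last_heartbeat, last_state, current_state):
    # Hypothetical helper: log on any state change, or when the heartbeat interval has elapsed.
    return current_state != last_state or (time.time() - last_heartbeat) >= HEARTBEAT_INTERVAL


def should_retry(frameid, last_failure_times):
    # Hypothetical helper: skip a frameid that failed recently instead of retrying
    # (and re-logging the same error) on every poll.
    last_failure = last_failure_times.get(frameid)
    return last_failure is None or (time.time() - last_failure) >= FAILURE_RETRY_SECONDS
```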

@timbeccue timbeccue linked an issue Mar 19, 2026 that may be closed by this pull request
@timbeccue timbeccue requested a review from cmccully on March 19, 2026 12:19
@timbeccue timbeccue force-pushed the feature/smartstack-integration branch from 0f7e6d3 to 3512bdb on March 20, 2026 05:49
@timbeccue timbeccue force-pushed the feature/separate-site-deps branch 2 times, most recently from 3502eec to 8b6fb9a on March 23, 2026 18:38
@timbeccue timbeccue force-pushed the feature/smartstack-scaffold branch from 3805921 to ba59ec7 on March 27, 2026 01:32
@timbeccue timbeccue force-pushed the feature/separate-site-deps branch 2 times, most recently from 38dc3d6 to b0189da on March 27, 2026 05:19
@timbeccue timbeccue force-pushed the feature/smartstack-scaffold branch from ba59ec7 to e5350bd on March 27, 2026 05:19
@timbeccue timbeccue force-pushed the feature/smartstack-scaffold branch from e5350bd to 3bc4bc5 on April 6, 2026 21:01
@timbeccue timbeccue force-pushed the feature/separate-site-deps branch 2 times, most recently from 29b6816 to b720aa8 on April 9, 2026 22:02
@timbeccue timbeccue changed the title from "Separate Redis and RabbitMQ into standalone dependencies" to "Smartstack Scaffolding" on May 6, 2026
@cmccully cmccully requested a review from Copilot on May 8, 2026 13:41
logger = logs.get_logger()
- HEARTBEAT_INTERVAL = 300
+ HEARTBEAT_INTERVAL = 600  # seconds
+ FAILURE_RETRY_SECONDS = 43200  # seconds
Collaborator

That's a really long time in seconds... Do we really want to wait this long?

Contributor Author

Probably a poorly chosen balance when I was trying to reduce log clutter from repeated retries. Maybe 2 hours is better than 12? I can improve the readability here too.

update_filepath(db_address, cal.id, dest_dir)
- logger.info(f"Downloaded {cal.filename}")
+ logger.info(f"Cached {cal.filename}")
return True
Collaborator

I don't generally like returning True unless it is a boolean check, e.g., is this object x type? I tend to raise an exception if something didn't work. This is kind of a stylistic thing, but it's a place to make the code more readable for future us.

Contributor Author

This function takes a file that needs to be cached and results in three possible outcomes: downloaded, skipped, or failed. The return value is used by the caller to keep track of how many files were downloaded vs how many were already cached. So I still think this is the cleanest solution, but I'm open to alternatives if you have one in mind.

Copilot AI left a comment

Pull request overview

This PR introduces “smart stacking” scaffolding for site deployments (DB tracking + subframe reduction + Redis-based notifications + per-camera stacking workers), while also refactoring local/site Docker Compose to treat Redis/RabbitMQ as external dependencies and polishing site deployment/E2E workflows.

Changes:

  • Add site-only DB tables (StackFrame via SiteBase) and smart-stacking worker/supervisor plumbing.
  • Externalize Redis/RabbitMQ into docker-compose-dependencies.yml and switch workers to REDIS_URL / RABBITMQ_URL.
  • Expand/adjust unit + site E2E coverage and update site/local env templates and documentation accordingly.

Reviewed changes

Copilot reviewed 30 out of 32 changed files in this pull request and generated 4 comments.

Summary per file:

uv.lock: Dependency lock revision bump.
site-banzai-env.default: Site env defaults updated (absolute paths, broker URLs, smart stacking knobs).
local-banzai-env.default: Local env defaults updated to use broker URLs and queue names.
scripts/queue_images.py: Queueing script changed to publish absolute host paths.
README.rst: Docs updated for new dependencies compose flow and env vars.
pytest.ini: Adds smart stacking markers and updates recursion excludes.
pyproject.toml: Adds new console entry points for stacking/subframe components.
docker-compose-site.yml: Site compose updated for external brokers, absolute-path mounts, new stacking services, and e2e isolation knobs.
docker-compose-local.yml: Local compose updated for external brokers and configurable queue/worker naming.
docker-compose-dependencies.yml: New compose file for Redis + RabbitMQ dependencies.
banzai/utils/stage_utils.py: run_pipeline_stages now returns the processed images list.
banzai/utils/fits_utils.py: Adds optional attempt logging to download_from_s3.
banzai/stacking.py: New stacking worker loop, timeout handling, and supervisor process manager.
banzai/settings.py: Adds env-derived settings for subframe/stacking queues and broker URLs.
banzai/scheduling.py: Adds process_subframe Celery task and wiring for DB + Redis notifications.
banzai/main.py: Adds SubframeListener + CLI entrypoint to consume stack-queue messages and dispatch Celery tasks.
banzai/dbs.py: Introduces SiteBase, StackFrame, site-aware sessions, and stack frame CRUD/upsert helpers.
banzai/celeryconfig.py: Removes hardcoded broker_url from Celery config module.
banzai/cache/replication.py: Subscription naming/slot reuse improvements for replication setup.
banzai/cache/init.py: Ensures site DB schema creation and updated replication setup call signature.
banzai/cache/download_worker.py: Adds argparse CLI, improved logging/heartbeat, NULL-frameid handling, reconciliation and backoff logic.
banzai/tests/test_smart_stacking.py: New unit tests covering stacking DB ops, notifications, listener, task, supervisor, and resilience.
banzai/tests/test_replication.py: Updates replication tests for new subscription/slot behavior.
banzai/tests/test_download_worker.py: Updates tests for new download worker behavior and CLI parsing.
banzai/tests/test_dbs.py: Adds tests for site-table creation and site-aware session enforcement.
banzai/tests/test_cache_init.py: Updates cache-init expectations for site schema and replication call changes.
banzai/tests/site_e2e/test_site_e2e.py: Expands site E2E coverage (cached cal usage, drift reconciliation, stacking path, slot reuse).
banzai/tests/site_e2e/conftest.py: Improves e2e isolation/scoping, absolute path enforcement, compose helpers, and cleanup.
banzai/tests/site_e2e/site_e2e.env.template: Adds isolation knobs + absolute host path requirements + broker URLs.
banzai/tests/site_e2e/README.md: Updates e2e docs for isolation, absolute paths, and log watching.
.gitignore: Ignores local-banzai-env.
.dockerignore: Ignores data/ directory in Docker build context.


Comment thread scripts/queue_images.py
Comment on lines +34 to 48
+ # Queue each file — use absolute host paths (containers mount host dirs at the same path)
  for filepath in sorted(fits_files):
+     abs_path = os.path.abspath(filepath)
      with fits.open(filepath) as hdul:
          header = hdul['SCI'].header
          siteid = header.get('SITEID', '').strip()
          instrume = header.get('INSTRUME', '').strip()

      if siteid and instrume:
-         # Use container path instead of host path
-         container_path = f'{args.container_path}/{os.path.basename(filepath)}'

          post_to_archive_queue(
              filename=os.path.basename(filepath),
              broker_url=args.broker_url,
              exchange_name=args.exchange,
-             path=container_path,
+             path=abs_path,
              SITEID=siteid,
Comment thread banzai/main.py
Comment on lines +249 to +261
def on_message(self, body, message):
    """Validate and dispatch to Celery for processing."""
    if isinstance(body, str):
        try:
            body = json.loads(body)
        except json.JSONDecodeError as e:
            logger.error('Malformed JSON payload, discarding message', extra_tags={'error': str(e)})
            message.ack()
            return
    if not validate_message(body):
        logger.error('Invalid message received, missing required fields')
        message.ack()
        return
Comment thread banzai/dbs.py
'dateobs': stmt.excluded.dateobs,
'filepath': stmt.excluded.filepath,
'status': 'active',
'completed_at': None,
Comment thread banzai/settings.py
Comment on lines +170 to +171
REDIS_URL=os.getenv('REDIS_URL', 'redis://host.docker.internal:6379/0')
RABBITMQ_URL=os.getenv('RABBITMQ_URL', 'amqp://host.docker.internal:5672')
time.sleep(30)


def create_parser():
Collaborator

I'm on the fence about whether I like this logic being a function. If it's reused anywhere, it should definitely be a function. It does help the readability of the other function a little at the cost of splitting the logic into multiple places. Maybe the function just needs a more descriptive name to signal to the reader that this is specific to this module and not some generic library function.

Comment thread banzai/cache/init.py
try:
    logger.info("Creating database schema...")
-   dbs.create_db(args.db_address)
+   dbs.create_db(args.db_address, site=True)
Collaborator

I'm not sure I like this variable being just "site" here. We use site in a lot of places and it is generally a 3-letter site code. Maybe at_site? Or site_deploy?

Comment thread banzai/cache/replication.py
# HOST_*_DIR paths must be absolute so that host-path:host-path volume
# mounts in docker-compose-site.yml resolve correctly (mirrors the
# production site-banzai-env contract).
for _key in ('HOST_RAW_DIR', 'HOST_CALS_DIR', 'HOST_REDUCED_DIR'):
Collaborator

Is there any way we can add sensible defaults for these values so that it doesn't just crash out if they are not set?

LOCAL_DB_ADDRESS = os.environ.get(
    "LOCAL_DB_ADDRESS",
-   "postgresql+psycopg://banzai@localhost:5442/banzai_local"
+   f"postgresql+psycopg://banzai@localhost:{os.environ.get('SITE_PG_HOST_PORT', '5443')}/banzai_local"
Collaborator

Do we want to make the banzai site db hostname configurable?

Contributor Author

It's currently configurable by setting the value of LOCAL_DB_ADDRESS; localhost is just part of the default value. So I think this is fine unless I'm not understanding your question.



def run_docker_compose(compose_file, *args, cwd=None, env=None):
def publish_to_queue(queue_name, body, broker_url='amqp://localhost:5672'):
Collaborator

What's the difference between this code and the code we use to post to the fits exchange? I guess maybe the fits exchange? This is not an important thing if it is different enough, but it just smelled like code duplication.

shutil.rmtree(path, ignore_errors=True)


def run_docker_compose(compose_file, *args, cwd=None, env=None, project=None):
Collaborator

This may just be me, but I'm really nervous about python calling docker when this python code will likely be running in a docker container.

Comment thread banzai/tests/site_e2e/README.md
- `docker compose down` from the e2e fixture is project-scoped — it cannot
tear down a docker-compose-site stack (project `banzai`).

The only manual collision risk is the host data directories (`HOST_RAW_DIR`,
Collaborator

This is another place sensible defaults could help.

DOWNLOAD_WORKER_POLL_INTERVAL=1

# Message brokers
REDIS_URL=redis://host.docker.internal:6379/0
Collaborator

I need to go look at the docker compose more carefully. You might want to actually create a docker network here that isn't just the default. The way docker does its DNS is different between Mac and linux so it can bite you.

result = subprocess.run(
    [
-       'uv', 'run', 'python', str(queue_script),
+       sys.executable, str(queue_script),
Collaborator

The subprocesses are getting nested deeply here. Maybe we should think a little more about this deploy.

f"worker did not reconcile {target} back to {expected} after drift to {drifted_value!r}"
)

@pytest.mark.e2e_site_cache
Collaborator

I don't remember if there is, but is there a way to have pytest skip these by default? Running the unit tests now needs -m "not e2e" -m "not e2e_site_cache", which is starting to get onerous for a newbie user just trying to spin up the project.

# Timeout
# ---------------------------------------------------------------------------

class TestTimeout:
Collaborator

I would check the naming conventions in pytest. Something generic like this may be a little hard to find if it doesn't print the module name right next to it.

Comment thread banzai/dbs.py
tries = Column(Integer, default=0)


class StackFrame(SiteBase):
Collaborator

I'm not sure I like the name of the table or class. frames_to_stack might be epsilon better.

Comment thread banzai/main.py
logger.info('Shutting down pipeline listener.')


class SubframeListener(ConsumerMixin):
Collaborator

This is probably a nit, but is there a way to refactor things to reuse more of the regular listener? I'm just trying to avoid code duplication where possible.

Comment thread banzai/scheduling.py
dateobs = parse_date_obs(dateobs_str) if dateobs_str else None

# Phase 1: Insert DB record before reduction so stacking worker can see it
insert_stack_frame(
Collaborator

Question about the stacking infrastructure. Is the stacking worker checking a database or listening to a queue on say rabbitmq? It seems like a listener would be a better pattern as polling a db for changes is error prone.

Comment thread banzai/stacking.py
redis_client.rename(key, drain_key)
except redis_lib.exceptions.ResponseError:
return set()
raw = redis_client.lrange(drain_key, 0, -1)
Collaborator

This scares me. Can you say why we are having to drain queues manually?

Comment thread banzai/stacking.py
def finalize_stack(db_address, moluid, status='complete'):
"""Mark stack complete and log mock stacking/JPEG/ingester operations."""
dbs.mark_stack_complete(db_address, moluid, status=status)
logger.info(f'Mock stacking complete for {moluid}', extra_tags={'moluid': moluid})
Collaborator

I understand these are mock warnings, but it opens a general point. Info messages should be logs we want to store in something like opensearch that can be used later. The temptation is to leave in debug messages when writing the code. If we can lower those to logger.debug that's fine. banzai is already really noisy and I'm hesitant to add more noise.

Comment thread banzai/stacking.py
self.workers[camera] = proc
logger.info(f'Started stacking worker for camera {camera}')

def monitor(self, check_interval=10):
Collaborator

It feels strange to have the monitoring function and worker function in the same Python class. I'm struggling to visualize the whole infrastructure of processes. My sense is it kind of feels like overkill. Maybe we could add a process flow diagram to the docs?

Comment thread banzai/stacking.py
self.workers.clear()


def create_parser():
Collaborator

Another create_parser method. I'd try to use a little more descriptive name.

Collaborator

@cmccully cmccully left a comment

I think this is a huge step forward. I've left some comments, some more significant than others. Once those are addressed, I'm happy to approve.


Development

Successfully merging this pull request may close these issues.

Separate redis and rabbitmq from site docker-compose
