Skip to content

Added daily requeuing of missing frames.#461

Open
cmccully wants to merge 1 commit intomainfrom
requeue-missing
Open

Added daily requeuing of missing frames.#461
cmccully wants to merge 1 commit intomainfrom
requeue-missing

Conversation

@cmccully
Copy link
Copy Markdown
Collaborator

@cmccully cmccully commented May 8, 2026

This PR adds a cron to requeue missing frames using the same cron container as the calibration scheduler. The query only relies on the archive api.

I also refactored some of the archive querying for better reuse of the retry logic and less code duplication.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a Celery beat (cron) job to detect and requeue “missing” frames by querying the archive API, and refactors archive querying / queue routing to reduce duplication.

Changes:

  • Introduces a new celery.requeue_missing_frames periodic task and wires it into the existing cron container/entrypoint.
  • Adds a shared banzai.query module with retrying archive GET + archive frame pagination helpers.
  • Refactors queue selection into get_processing_queue() and reuses archive query helper in FITS downloading / BPM ingestion.

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
uv.lock Bumps lco-banzai to 1.36.0 and updates lock metadata.
pyproject.toml Updates version and switches console script to new cron entrypoint.
helm-chart/banzai/templates/listener.yaml Updates the cron container command to banzai_cron.
CHANGES.md Adds 1.36.0 changelog entry for daily requeueing.
banzai/utils/observation_utils.py Adds tenacity retry to calibration-block archive query.
banzai/utils/instrument_utils.py Adds shared get_processing_queue() helper.
banzai/utils/fits_utils.py Replaces direct requests.get with archive_get helper for downloads.
banzai/settings.py Adds requeue cron configuration knobs.
banzai/scheduling.py Adds the requeue_missing_frames Celery task and archive querying usage.
banzai/query.py New module for archive querying helpers and cross-matching.
banzai/main.py Adds new cron entrypoint scheduling and reuses archive_get / get_processing_queue.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread banzai/settings.py

REFERENCE_CATALOG_URL = os.getenv('REFERENCE_CATALOG_URL', 'http://phot-catalog.lco.gtn/')

REQUEUE_MISSING_FRAMES_TIME = datetime.datetime(0, 0, 0, hour=14, minute=30)
Comment thread banzai/query.py
@@ -0,0 +1,69 @@
from tenacity import retry, wait_exponential, stop_after_attempt
import requests
from banzai.utils.logs import get_logger
Comment thread banzai/query.py
logger.error(message,
extra_tags={'attempt_number': archive_get.statistics['attempt_number']}
)
response.raise_for_status()
Comment thread banzai/query.py


def frames_from_archive(start, end, obstype, reduction_level, runtime_context, raw=False, related_frames=False):
archive_params = {'OBSTYPE': obstype, 'reduction_level': reduction_level, 'related_frames': related_frames}
Comment thread banzai/scheduling.py
Comment on lines +266 to +274
start = datetime.datetime.now(timezone.utc) - datetime.timedelta(hours=runtime_context.REQUEUE_LOOKBACK_HOURS)
end = datetime.datetime.now(timezone.utc)
raw_frames += query.from_archive(start, end, obstype, 0, runtime_context, related_frames=False)
# Get the reduced frames
reduced_frames += query.from_archive(start, end, obstype, runtime_context.reduction_level, runtime_context, related_frames=True)

# cross match to find any that are missing
missing_frames = cross_match_missing_frames(raw_frames, reduced_frames)

Comment thread banzai/scheduling.py
Comment on lines +274 to +275

with dbs.get_session(os.environ['DB_ADDRESS']) as db_session:
Comment thread banzai/main.py
archive_auth_header = settings.ARCHIVE_AUTH_HEADER
response = requests.get(url, headers=archive_auth_header)
response.raise_for_status()
response = archive_get(url, params={}, headers=archive_auth_header)
Comment on lines 86 to 96
if is_raw_frame:
url = f'{context.RAW_DATA_FRAME_URL}/{frame_id}/?include_related_frames=false'
archive_auth_header = context.RAW_DATA_AUTH_HEADER
else:
url = f'{context.ARCHIVE_FRAME_URL}/{frame_id}/?include_related_frames=false'
archive_auth_header = context.ARCHIVE_AUTH_HEADER
logger.info(f"Requesting archive URL {url} (auth header present: {bool(archive_auth_header)})")

try:
response = requests.get(url, headers=archive_auth_header, timeout=30)
response.raise_for_status()
except requests.exceptions.HTTPError:
message = 'Error downloading file from archive.'
if int(response.status_code) == 429:
message += ' Rate limited.'
logger.error(
message,
extra_tags={
'filename': file_info.get('filename'),
'attempt_number': download_from_s3.statistics['attempt_number']
}
)
raise
except requests.exceptions.RequestException as e:
message = "Archive download connection error."
logger.error(
f"{message} {e}",
extra_tags={
'filename': file_info.get('filename'),
'attempt_number': download_from_s3.statistics['attempt_number']
}
)
raise
response = archive_get(url, params={'related_frames': False},
auth_headers=archive_auth_header)

Comment on lines +106 to +107
response = archive_get(response_data['url'], params={},
auth_headers=archive_auth_header, timeout=60)
Comment thread banzai/main.py
Comment on lines +62 to +66
try:
queue_name = get_processing_queue(body, self.runtime_context)
except Exception:
message.ack()
return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants