From 74024af28c4d01ef6974d11521e37beee82a0ab1 Mon Sep 17 00:00:00 2001 From: Ryan Co Date: Mon, 1 Jun 2026 16:08:17 -0400 Subject: [PATCH] add custom configs to specify tasks for viz --- egomimic/algo/hpt.py | 2 + egomimic/algo/pi.py | 72 ++++++++++++++- egomimic/eval/eval_train_viz.py | 3 +- egomimic/eval/eval_video.py | 87 +++++++++++++------ .../hydra_configs/data/mecka_pi_eval.yaml | 2 +- .../data/mecka_pi_fold_clothes_freeform.yaml | 3 + .../hydra_configs/evaluator/eval_hpt.yaml | 8 ++ egomimic/hydra_configs/evaluator/eval_pi.yaml | 12 ++- .../hydra_configs/evaluator/train_viz_pi.yaml | 6 ++ .../hydra/launcher/submitit_cpu_pace.yaml | 2 +- egomimic/pl_utils/pl_data_utils.py | 4 + 11 files changed, 166 insertions(+), 35 deletions(-) diff --git a/egomimic/algo/hpt.py b/egomimic/algo/hpt.py index 46f7150c2..466486246 100644 --- a/egomimic/algo/hpt.py +++ b/egomimic/algo/hpt.py @@ -1021,6 +1021,8 @@ def process_batch_for_training(self, batch): processed_batch[embodiment_id]["embodiment"] = torch.tensor( [embodiment_id], device=self.device, dtype=torch.int64 ) + if "task" in _batch: + processed_batch[embodiment_id]["task"] = _batch["task"] # TODO make this work with any fp type for key, value in processed_batch[embodiment_id].items(): if isinstance(value, torch.Tensor): diff --git a/egomimic/algo/pi.py b/egomimic/algo/pi.py index 0415ee478..21345c785 100644 --- a/egomimic/algo/pi.py +++ b/egomimic/algo/pi.py @@ -34,6 +34,71 @@ logger.propagate = True # Explicitly enable propagation (default, but ensures it works) +class _OpenPIPaligemmaPromptTokenizer: + """Small wrapper matching the Hugging Face tokenizer call used below.""" + + def __init__(self, max_length: int | None): + from openpi.models.tokenizer import PaligemmaTokenizer + + self.max_length = max_length + tokenizer = PaligemmaTokenizer(max_len=max_length or 128) + self._sentencepiece = tokenizer._tokenizer + + def __call__( + self, + prompts, + padding="max_length", + truncation=True, + max_length=None, + 
return_tensors=None, + ): + del return_tensors + max_len = max_length or self.max_length + encoded = [] + masks = [] + for prompt in prompts: + ids = self._sentencepiece.encode(str(prompt), add_bos=True) + if max_len is not None: + if truncation and len(ids) > max_len: + ids = ids[:max_len] + mask = [True] * len(ids) + if padding == "max_length" and len(ids) < max_len: + pad_len = max_len - len(ids) + ids = ids + [0] * pad_len + mask = mask + [False] * pad_len + else: + mask = [True] * len(ids) + encoded.append(ids) + masks.append(mask) + + if max_len is None and padding == "longest": + max_len = max(len(ids) for ids in encoded) + for i, ids in enumerate(encoded): + pad_len = max_len - len(ids) + if pad_len > 0: + encoded[i] = ids + [0] * pad_len + masks[i] = masks[i] + [False] * pad_len + + return { + "input_ids": torch.tensor(encoded, dtype=torch.long), + "attention_mask": torch.tensor(masks, dtype=torch.long), + } + + +def _load_prompt_tokenizer(tokenizer_model_name: str, tokenizer_max_length: int | None): + try: + return AutoTokenizer.from_pretrained(tokenizer_model_name) + except OSError: + if tokenizer_model_name != "google/paligemma-3b-mix-224": + raise + logger.warning( + "Falling back to bundled OpenPI PaliGemma tokenizer because " + "Hugging Face access to %s is unavailable.", + tokenizer_model_name, + ) + return _OpenPIPaligemmaPromptTokenizer(tokenizer_max_length) + + class PI(Algo): """ """ @@ -76,7 +141,9 @@ def __init__( self.norm_stats = norm_stats # ---- Prompt assembly + tokenization (was in collate_fn) ---- - self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_model_name) + self.tokenizer = _load_prompt_tokenizer( + tokenizer_model_name, tokenizer_max_length + ) self.tokenizer_max_length = tokenizer_max_length self.sampling_mode = sampling_mode self.annotation_key = annotation_key @@ -352,8 +419,7 @@ def process_batch_for_training(self, batch): processed_batch[embodiment_id]["embodiment"] = torch.tensor( [embodiment_id], device=self.device, 
dtype=torch.int64 ) - # Forward per-sample task tags (list[str], length B) so EvalVideo's - # one_video_per_task bucketing can read batch[emb]["task"]. + # Forward per-sample task tags so EvalVideo can bucket task videos. if "task" in _batch: processed_batch[embodiment_id]["task"] = _batch["task"] diff --git a/egomimic/eval/eval_train_viz.py b/egomimic/eval/eval_train_viz.py index e63cee828..73cc84aca 100644 --- a/egomimic/eval/eval_train_viz.py +++ b/egomimic/eval/eval_train_viz.py @@ -28,8 +28,9 @@ def __init__(self, base: EvalVideo, limit_val_batches: int = 50): limit_val_batches=limit_val_batches, viz_func=base.viz_func, transform_lists=base.transform_lists, - one_video_per_task=base.one_video_per_task, max_frames_per_task=base.max_frames_per_task, + tasks=base.tasks, + videos_per_task=base.videos_per_task, ) @property diff --git a/egomimic/eval/eval_video.py b/egomimic/eval/eval_video.py index 77e293ffe..153e41cc8 100644 --- a/egomimic/eval/eval_video.py +++ b/egomimic/eval/eval_video.py @@ -22,10 +22,13 @@ def __init__( limit_val_batches: int = 400, viz_func: dict = None, transform_lists: dict | None = None, - one_video_per_task: bool = False, max_frames_per_task: int | None = 1000, + tasks: list[str] | None = None, + videos_per_task: int = 1, ): super().__init__() + if videos_per_task < 1: + raise ValueError("videos_per_task must be >= 1") self.trainer = None self.model = None self.viz_func = viz_func @@ -33,13 +36,14 @@ def __init__( # the model's wrist-frame actions back into cam (head) frame. Reused for # both cam-frame MSE and the viz video so we don't transform twice. self.transform_lists = transform_lists or {} - # When True, key the buffer by (embodiment_id, task) and emit exactly - # one mp4 per (embodiment, task) at the end of validation. Used in - # eval-only mode when the valid filter spans multiple tasks. - self.one_video_per_task = one_video_per_task - # Cap each (embodiment, task) bucket at this many frames. 
Only takes - # effect when one_video_per_task=True. Set None to disable. + task_list = [str(task) for task in (tasks or [])] + self.tasks = task_list + self._task_filter = set(task_list) if task_list else None + self._write_task_videos = self._task_filter is not None + # Cap each task video at this many frames. With multiple videos per + # task, the buffered task limit is this value times videos_per_task. self.max_frames_per_task = max_frames_per_task + self.videos_per_task = int(videos_per_task) self.val_image_buffer = {} self.val_counter = {} self.override_dict = { @@ -81,12 +85,26 @@ def _sanitize_task(task: str) -> str: # Filesystem-safe: collapse whitespace and replace path separators. return re.sub(r"[^\w.-]+", "_", str(task)).strip("_") or "unknown" + def _task_frame_limit(self): + if self.max_frames_per_task is None: + return None + return self.max_frames_per_task * self.videos_per_task + + def _task_video_chunks(self, buffer): + if self.videos_per_task == 1: + return [buffer] + num_chunks = min(self.videos_per_task, len(buffer)) + chunk_size = (len(buffer) + num_chunks - 1) // num_chunks + return [ + buffer[start : start + chunk_size] + for start in range(0, len(buffer), chunk_size) + ][:num_chunks] + def on_validation_end(self): for key, buffer in self.val_image_buffer.items(): - if self.one_video_per_task: + if self._write_task_videos: embodiment_id, task = key emb_dir = str(get_embodiment(embodiment_id)) - filename = f"{self._sanitize_task(task)}.mp4" else: emb_dir = str(get_embodiment(key)) filename = f"validation_video_{self.val_counter[key]}.mp4" @@ -100,14 +118,31 @@ def on_validation_end(self): exist_ok=True, ) if len(buffer) != 0: - frames = torch.stack(buffer) - path = os.path.join( - self.video_dir(), - f"epoch_{self.trainer.current_epoch}", - emb_dir, - filename, - ) - tvio.write_video(path, frames, fps=30, video_codec="h264") + if self._write_task_videos: + task_name = self._sanitize_task(task) + for video_idx, chunk in 
enumerate(self._task_video_chunks(buffer)): + filename = ( + f"{task_name}.mp4" + if self.videos_per_task == 1 + else f"{task_name}_{video_idx}.mp4" + ) + frames = torch.stack(chunk) + path = os.path.join( + self.video_dir(), + f"epoch_{self.trainer.current_epoch}", + emb_dir, + filename, + ) + tvio.write_video(path, frames, fps=30, video_codec="h264") + else: + frames = torch.stack(buffer) + path = os.path.join( + self.video_dir(), + f"epoch_{self.trainer.current_epoch}", + emb_dir, + filename, + ) + tvio.write_video(path, frames, fps=30, video_codec="h264") self.val_counter[key] = 0 self.val_image_buffer[key] = [] @@ -133,27 +168,29 @@ def on_validation_step(self, batch, batch_idx, dataloader_idx=0): ) frames_tensor = torch.from_numpy(images) - if self.one_video_per_task: + if self._write_task_videos: tasks = batch[embodiment_id].get("task") if tasks is None: raise KeyError( - "one_video_per_task=True requires 'task' in each batch " - "sample. Confirm ZarrDataset.__getitem__ attaches it." + "Per-task video output requires 'task' in each batch sample. " + "Confirm ZarrDataset.__getitem__ attaches it." ) # Group sample indices by task so each bucket only takes one # extend call even when a batch straddles two tasks. 
per_task = defaultdict(list) for i, t in enumerate(tasks): - per_task[str(t)].append(i) + task = str(t) + if self._task_filter is not None and task not in self._task_filter: + continue + per_task[task].append(i) for task, idxs in per_task.items(): key = (embodiment_id, task) if key not in self.val_image_buffer: self.val_image_buffer[key] = [] self.val_counter[key] = 0 - if self.max_frames_per_task is not None: - remaining = self.max_frames_per_task - len( - self.val_image_buffer[key] - ) + frame_limit = self._task_frame_limit() + if frame_limit is not None: + remaining = frame_limit - len(self.val_image_buffer[key]) if remaining <= 0: continue idxs = idxs[:remaining] diff --git a/egomimic/hydra_configs/data/mecka_pi_eval.yaml b/egomimic/hydra_configs/data/mecka_pi_eval.yaml index 9ca0dbc44..389af593d 100644 --- a/egomimic/hydra_configs/data/mecka_pi_eval.yaml +++ b/egomimic/hydra_configs/data/mecka_pi_eval.yaml @@ -2,7 +2,7 @@ _target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper # Eval-only variant of mecka_pi: train side is unchanged (single task), but the # valid filter is broadened to all mecka tasks so eval mode can score across -# them. Use together with evaluator.one_video_per_task=true. +# them. Set data.viz_tasks or evaluator.tasks to emit per-task videos. train_datasets: mecka_bimanual: diff --git a/egomimic/hydra_configs/data/mecka_pi_fold_clothes_freeform.yaml b/egomimic/hydra_configs/data/mecka_pi_fold_clothes_freeform.yaml index e34d0a8e5..64787b857 100644 --- a/egomimic/hydra_configs/data/mecka_pi_fold_clothes_freeform.yaml +++ b/egomimic/hydra_configs/data/mecka_pi_fold_clothes_freeform.yaml @@ -1,5 +1,8 @@ _target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper +# Visualization task allowlist consumed by evaluator.tasks by default. 
+viz_tasks: [folding_clothes] + train_datasets: mecka_bimanual: _target_: egomimic.rldb.zarr.zarr_dataset_multi.MultiDataset._from_resolver diff --git a/egomimic/hydra_configs/evaluator/eval_hpt.yaml b/egomimic/hydra_configs/evaluator/eval_hpt.yaml index 901be3074..9caadd21f 100644 --- a/egomimic/hydra_configs/evaluator/eval_hpt.yaml +++ b/egomimic/hydra_configs/evaluator/eval_hpt.yaml @@ -3,3 +3,11 @@ defaults: - _self_ _target_: egomimic.eval.eval_hpt.HPTEvalVideo + +# Optional task allowlist for visualization. A non-empty list emits up to +# `videos_per_task` mp4s per listed task and skips frames from other tasks. +# Data configs can set `viz_tasks`; evaluator.tasks overrides take precedence. +tasks: ${oc.select:data.viz_tasks,[]} + +# Number of videos to write per task when per-task video output is enabled. +videos_per_task: 1 diff --git a/egomimic/hydra_configs/evaluator/eval_pi.yaml b/egomimic/hydra_configs/evaluator/eval_pi.yaml index 13fc4945e..212d0259a 100644 --- a/egomimic/hydra_configs/evaluator/eval_pi.yaml +++ b/egomimic/hydra_configs/evaluator/eval_pi.yaml @@ -4,12 +4,16 @@ defaults: _target_: egomimic.eval.eval_pi.PIEvalVideo -# When true, key the buffer by (embodiment, task) and emit one mp4 per task -# in on_validation_end. Opt-in for eval-only runs that span multiple tasks. -one_video_per_task: false +# Optional task allowlist for visualization. A non-empty list emits up to +# `videos_per_task` mp4s per listed task and skips frames from other tasks. +# Data configs can set `viz_tasks`; evaluator.tasks overrides take precedence. +tasks: ${oc.select:data.viz_tasks,[]} + +# Number of videos to write per task when per-task video output is enabled. +videos_per_task: 1 # Cap each (embodiment, task) video at this many frames. Only takes effect -# when one_video_per_task=true. Set null to disable. +# when per-task video output is enabled. Set null to disable. max_frames_per_task: 1000 # Per-embodiment revert transform. 
Applied once during validation to project diff --git a/egomimic/hydra_configs/evaluator/train_viz_pi.yaml b/egomimic/hydra_configs/evaluator/train_viz_pi.yaml index d69b0bf4d..3263bf291 100644 --- a/egomimic/hydra_configs/evaluator/train_viz_pi.yaml +++ b/egomimic/hydra_configs/evaluator/train_viz_pi.yaml @@ -17,3 +17,9 @@ _target_: egomimic.eval.eval_train_viz.TrainVizEvalVideo # Train data is iterated as a "spot check"; keep this small relative to the # canonical valid loader's limit_val_batches. limit_val_batches: 50 + +# Mirror canonical validation-viz task settings unless explicitly overridden. +base: + tasks: ${evaluator.tasks} + videos_per_task: ${evaluator.videos_per_task} + max_frames_per_task: ${evaluator.max_frames_per_task} diff --git a/egomimic/hydra_configs/hydra/launcher/submitit_cpu_pace.yaml b/egomimic/hydra_configs/hydra/launcher/submitit_cpu_pace.yaml index 178b91c90..1a5276ef6 100644 --- a/egomimic/hydra_configs/hydra/launcher/submitit_cpu_pace.yaml +++ b/egomimic/hydra_configs/hydra/launcher/submitit_cpu_pace.yaml @@ -5,7 +5,7 @@ _target_: hydra_plugins.hydra_submitit_launcher.submitit_launcher.SlurmLauncher # CPU-only PACE launcher for norm-stats / preprocessing jobs (no GPU). name: ${hydra.job.name} -partition: "cpu-small" # PACE CPU partition — confirm before first submit +partition: "cpu-medium" # PACE CPU partition for higher-memory CPU jobs account: "gts-dxu345-rl2" cpus_per_task: 24 nodes: 1 diff --git a/egomimic/pl_utils/pl_data_utils.py b/egomimic/pl_utils/pl_data_utils.py index 28fec7e8b..50ec2e731 100644 --- a/egomimic/pl_utils/pl_data_utils.py +++ b/egomimic/pl_utils/pl_data_utils.py @@ -62,6 +62,7 @@ def __init__( valid_dataloader_params: dict, train_viz_datasets: dict | None = None, train_viz_dataloader_params: dict | None = None, + viz_tasks: list[str] | None = None, ): """ Args: @@ -77,6 +78,8 @@ def __init__( ``dataloader_idx=1`` on validation_step. 
train_viz_dataloader_params: dict of per-dataset DataLoader kwargs for the train-viz loader (parallels valid_dataloader_params). + viz_tasks: optional visualization task allowlist from data configs. + Stored as metadata; evaluators consume it through Hydra config. Tokenization (sampling a prompt from per-sample annotation lists, splicing in embodiment / control-mode / proprio blocks, and running @@ -97,6 +100,7 @@ def __init__( self.train_dataloader_params = train_dataloader_params self.valid_dataloader_params = valid_dataloader_params self.train_viz_dataloader_params = train_viz_dataloader_params or {} + self.viz_tasks = list(viz_tasks or []) self.collate_fn = annotation_collate def train_dataloader(self):