Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions egomimic/algo/hpt.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,8 @@ def process_batch_for_training(self, batch):
processed_batch[embodiment_id]["embodiment"] = torch.tensor(
[embodiment_id], device=self.device, dtype=torch.int64
)
if "task" in _batch:
processed_batch[embodiment_id]["task"] = _batch["task"]
# TODO make this work with any fp type
for key, value in processed_batch[embodiment_id].items():
if isinstance(value, torch.Tensor):
Expand Down
72 changes: 69 additions & 3 deletions egomimic/algo/pi.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,71 @@
logger.propagate = True # Explicitly enable propagation (default, but ensures it works)


class _OpenPIPaligemmaPromptTokenizer:
"""Small wrapper matching the Hugging Face tokenizer call used below."""

def __init__(self, max_length: int | None):
from openpi.models.tokenizer import PaligemmaTokenizer

self.max_length = max_length
tokenizer = PaligemmaTokenizer(max_len=max_length or 128)
self._sentencepiece = tokenizer._tokenizer

def __call__(
self,
prompts,
padding="max_length",
truncation=True,
max_length=None,
return_tensors=None,
):
del return_tensors
max_len = max_length or self.max_length
encoded = []
masks = []
for prompt in prompts:
ids = self._sentencepiece.encode(str(prompt), add_bos=True)
if max_len is not None:
if truncation and len(ids) > max_len:
ids = ids[:max_len]
mask = [True] * len(ids)
if padding == "max_length" and len(ids) < max_len:
pad_len = max_len - len(ids)
ids = ids + [0] * pad_len
mask = mask + [False] * pad_len
else:
mask = [True] * len(ids)
encoded.append(ids)
masks.append(mask)

if max_len is None and padding == "longest":
max_len = max(len(ids) for ids in encoded)
for i, ids in enumerate(encoded):
pad_len = max_len - len(ids)
if pad_len > 0:
encoded[i] = ids + [0] * pad_len
masks[i] = masks[i] + [False] * pad_len

return {
"input_ids": torch.tensor(encoded, dtype=torch.long),
"attention_mask": torch.tensor(masks, dtype=torch.long),
}


def _load_prompt_tokenizer(tokenizer_model_name: str, tokenizer_max_length: int | None):
try:
return AutoTokenizer.from_pretrained(tokenizer_model_name)
except OSError:
if tokenizer_model_name != "google/paligemma-3b-mix-224":
raise
logger.warning(
"Falling back to bundled OpenPI PaliGemma tokenizer because "
"Hugging Face access to %s is unavailable.",
tokenizer_model_name,
)
return _OpenPIPaligemmaPromptTokenizer(tokenizer_max_length)


class PI(Algo):
""" """

Expand Down Expand Up @@ -76,7 +141,9 @@ def __init__(
self.norm_stats = norm_stats

# ---- Prompt assembly + tokenization (was in collate_fn) ----
self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_model_name)
self.tokenizer = _load_prompt_tokenizer(
tokenizer_model_name, tokenizer_max_length
)
self.tokenizer_max_length = tokenizer_max_length
self.sampling_mode = sampling_mode
self.annotation_key = annotation_key
Expand Down Expand Up @@ -352,8 +419,7 @@ def process_batch_for_training(self, batch):
processed_batch[embodiment_id]["embodiment"] = torch.tensor(
[embodiment_id], device=self.device, dtype=torch.int64
)
# Forward per-sample task tags (list[str], length B) so EvalVideo's
# one_video_per_task bucketing can read batch[emb]["task"].
# Forward per-sample task tags so EvalVideo can bucket task videos.
if "task" in _batch:
processed_batch[embodiment_id]["task"] = _batch["task"]

Expand Down
3 changes: 2 additions & 1 deletion egomimic/eval/eval_train_viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ def __init__(self, base: EvalVideo, limit_val_batches: int = 50):
limit_val_batches=limit_val_batches,
viz_func=base.viz_func,
transform_lists=base.transform_lists,
one_video_per_task=base.one_video_per_task,
max_frames_per_task=base.max_frames_per_task,
tasks=base.tasks,
videos_per_task=base.videos_per_task,
)

@property
Expand Down
87 changes: 62 additions & 25 deletions egomimic/eval/eval_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,28 @@ def __init__(
limit_val_batches: int = 400,
viz_func: dict = None,
transform_lists: dict | None = None,
one_video_per_task: bool = False,
max_frames_per_task: int | None = 1000,
tasks: list[str] | None = None,
videos_per_task: int = 1,
):
super().__init__()
if videos_per_task < 1:
raise ValueError("videos_per_task must be >= 1")
self.trainer = None
self.model = None
self.viz_func = viz_func
# Per-embodiment list[Transform] applied once during eval to project
# the model's wrist-frame actions back into cam (head) frame. Reused for
# both cam-frame MSE and the viz video so we don't transform twice.
self.transform_lists = transform_lists or {}
# When True, key the buffer by (embodiment_id, task) and emit exactly
# one mp4 per (embodiment, task) at the end of validation. Used in
# eval-only mode when the valid filter spans multiple tasks.
self.one_video_per_task = one_video_per_task
# Cap each (embodiment, task) bucket at this many frames. Only takes
# effect when one_video_per_task=True. Set None to disable.
task_list = [str(task) for task in (tasks or [])]
self.tasks = task_list
self._task_filter = set(task_list) if task_list else None
self._write_task_videos = self._task_filter is not None
# Cap each task video at this many frames. With multiple videos per
# task, the buffered task limit is this value times videos_per_task.
self.max_frames_per_task = max_frames_per_task
self.videos_per_task = int(videos_per_task)
self.val_image_buffer = {}
self.val_counter = {}
self.override_dict = {
Expand Down Expand Up @@ -81,12 +85,26 @@ def _sanitize_task(task: str) -> str:
# Filesystem-safe: collapse whitespace and replace path separators.
return re.sub(r"[^\w.-]+", "_", str(task)).strip("_") or "unknown"

def _task_frame_limit(self):
if self.max_frames_per_task is None:
return None
return self.max_frames_per_task * self.videos_per_task

def _task_video_chunks(self, buffer):
if self.videos_per_task == 1:
return [buffer]
num_chunks = min(self.videos_per_task, len(buffer))
chunk_size = (len(buffer) + num_chunks - 1) // num_chunks
return [
buffer[start : start + chunk_size]
for start in range(0, len(buffer), chunk_size)
][:num_chunks]

def on_validation_end(self):
for key, buffer in self.val_image_buffer.items():
if self.one_video_per_task:
if self._write_task_videos:
embodiment_id, task = key
emb_dir = str(get_embodiment(embodiment_id))
filename = f"{self._sanitize_task(task)}.mp4"
else:
emb_dir = str(get_embodiment(key))
filename = f"validation_video_{self.val_counter[key]}.mp4"
Expand All @@ -100,14 +118,31 @@ def on_validation_end(self):
exist_ok=True,
)
if len(buffer) != 0:
frames = torch.stack(buffer)
path = os.path.join(
self.video_dir(),
f"epoch_{self.trainer.current_epoch}",
emb_dir,
filename,
)
tvio.write_video(path, frames, fps=30, video_codec="h264")
if self._write_task_videos:
task_name = self._sanitize_task(task)
for video_idx, chunk in enumerate(self._task_video_chunks(buffer)):
filename = (
f"{task_name}.mp4"
if self.videos_per_task == 1
else f"{task_name}_{video_idx}.mp4"
)
frames = torch.stack(chunk)
path = os.path.join(
self.video_dir(),
f"epoch_{self.trainer.current_epoch}",
emb_dir,
filename,
)
tvio.write_video(path, frames, fps=30, video_codec="h264")
else:
frames = torch.stack(buffer)
path = os.path.join(
self.video_dir(),
f"epoch_{self.trainer.current_epoch}",
emb_dir,
filename,
)
tvio.write_video(path, frames, fps=30, video_codec="h264")

self.val_counter[key] = 0
self.val_image_buffer[key] = []
Expand All @@ -133,27 +168,29 @@ def on_validation_step(self, batch, batch_idx, dataloader_idx=0):
)
frames_tensor = torch.from_numpy(images)

if self.one_video_per_task:
if self._write_task_videos:
tasks = batch[embodiment_id].get("task")
if tasks is None:
raise KeyError(
"one_video_per_task=True requires 'task' in each batch "
"sample. Confirm ZarrDataset.__getitem__ attaches it."
"Per-task video output requires 'task' in each batch sample. "
"Confirm ZarrDataset.__getitem__ attaches it."
)
# Group sample indices by task so each bucket only takes one
# extend call even when a batch straddles two tasks.
per_task = defaultdict(list)
for i, t in enumerate(tasks):
per_task[str(t)].append(i)
task = str(t)
if self._task_filter is not None and task not in self._task_filter:
continue
per_task[task].append(i)
for task, idxs in per_task.items():
key = (embodiment_id, task)
if key not in self.val_image_buffer:
self.val_image_buffer[key] = []
self.val_counter[key] = 0
if self.max_frames_per_task is not None:
remaining = self.max_frames_per_task - len(
self.val_image_buffer[key]
)
frame_limit = self._task_frame_limit()
if frame_limit is not None:
remaining = frame_limit - len(self.val_image_buffer[key])
if remaining <= 0:
continue
idxs = idxs[:remaining]
Expand Down
2 changes: 1 addition & 1 deletion egomimic/hydra_configs/data/mecka_pi_eval.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ _target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper

# Eval-only variant of mecka_pi: train side is unchanged (single task), but the
# valid filter is broadened to all mecka tasks so eval mode can score across
# them. Use together with evaluator.one_video_per_task=true.
# them. Set data.viz_tasks or evaluator.tasks to emit per-task videos.

train_datasets:
mecka_bimanual:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper

# Visualization task allowlist consumed by evaluator.tasks by default.
viz_tasks: [folding_clothes]

train_datasets:
mecka_bimanual:
_target_: egomimic.rldb.zarr.zarr_dataset_multi.MultiDataset._from_resolver
Expand Down
8 changes: 8 additions & 0 deletions egomimic/hydra_configs/evaluator/eval_hpt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@ defaults:
- _self_

_target_: egomimic.eval.eval_hpt.HPTEvalVideo

# Optional task allowlist for visualization. A non-empty list emits one mp4 per
# listed task and skips frames from other tasks. Data configs can set
# `viz_tasks`; direct evaluator.tasks overrides still take precedence.
tasks: ${oc.select:data.viz_tasks,[]}

# Number of videos to write per task when per-task video output is enabled.
videos_per_task: 1
12 changes: 8 additions & 4 deletions egomimic/hydra_configs/evaluator/eval_pi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ defaults:

_target_: egomimic.eval.eval_pi.PIEvalVideo

# When true, key the buffer by (embodiment, task) and emit one mp4 per task
# in on_validation_end. Opt-in for eval-only runs that span multiple tasks.
one_video_per_task: false
# Optional task allowlist for visualization. A non-empty list emits one mp4 per
# listed task and skips frames from other tasks. Data configs can set
# `viz_tasks`; direct evaluator.tasks overrides still take precedence.
tasks: ${oc.select:data.viz_tasks,[]}

# Number of videos to write per task when per-task video output is enabled.
videos_per_task: 1

# Cap each (embodiment, task) video at this many frames. Only takes effect
# when one_video_per_task=true. Set null to disable.
# when per-task video output is enabled. Set null to disable.
max_frames_per_task: 1000

# Per-embodiment revert transform. Applied once during validation to project
Expand Down
6 changes: 6 additions & 0 deletions egomimic/hydra_configs/evaluator/train_viz_pi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ _target_: egomimic.eval.eval_train_viz.TrainVizEvalVideo
# Train data is iterated as a "spot check"; keep this small relative to the
# canonical valid loader's limit_val_batches.
limit_val_batches: 50

# Mirror canonical validation-viz task settings unless explicitly overridden.
base:
tasks: ${evaluator.tasks}
videos_per_task: ${evaluator.videos_per_task}
max_frames_per_task: ${evaluator.max_frames_per_task}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ _target_: hydra_plugins.hydra_submitit_launcher.submitit_launcher.SlurmLauncher

# CPU-only PACE launcher for norm-stats / preprocessing jobs (no GPU).
name: ${hydra.job.name}
partition: "cpu-small" # PACE CPU partition — confirm before first submit
partition: "cpu-medium" # PACE CPU partition for higher-memory CPU jobs
account: "gts-dxu345-rl2"
cpus_per_task: 24
nodes: 1
Expand Down
4 changes: 4 additions & 0 deletions egomimic/pl_utils/pl_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(
valid_dataloader_params: dict,
train_viz_datasets: dict | None = None,
train_viz_dataloader_params: dict | None = None,
viz_tasks: list[str] | None = None,
):
"""
Args:
Expand All @@ -77,6 +78,8 @@ def __init__(
``dataloader_idx=1`` on validation_step.
train_viz_dataloader_params: dict of per-dataset DataLoader kwargs
for the train-viz loader (parallels valid_dataloader_params).
viz_tasks: optional visualization task allowlist from data configs.
Stored as metadata; evaluators consume it through Hydra config.

Tokenization (sampling a prompt from per-sample annotation lists,
splicing in embodiment / control-mode / proprio blocks, and running
Expand All @@ -97,6 +100,7 @@ def __init__(
self.train_dataloader_params = train_dataloader_params
self.valid_dataloader_params = valid_dataloader_params
self.train_viz_dataloader_params = train_viz_dataloader_params or {}
self.viz_tasks = list(viz_tasks or [])
self.collate_fn = annotation_collate

def train_dataloader(self):
Expand Down
Loading