diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index da036b7..524e441 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -21,8 +21,10 @@ jobs: with: python-version: "3.10" cache: "pip" + - name: Install library + run: python -m pip install -e . - name: Install linting and formatting dependencies - run: python -m pip install '.[dev]' + run: python -m pip install 'pip>=25.1' && python -m pip install --group dev - name: Check formatting run: make checkformat # - name: Check linting diff --git a/README.md b/README.md index 8d3a097..75e2e8c 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ On top of vlagents you can then install a simulation environment where the agent We currently the following environments: - [maniskill](https://github.com/haosulab/ManiSkill) - [robot control stack](https://github.com/RobotControlStack/robot-control-stack) +- [duobench](https://github.com/RobotControlStack/duobench) - [libero](https://github.com/Lifelong-Robot-Learning/LIBERO) @@ -41,6 +42,14 @@ We currently support the following policies: - [openpi](https://github.com/Physical-Intelligence/openpi) - [vjepa2-ac](https://github.com/facebookresearch/vjepa2) - [diffusion policy](https://github.com/real-stanford/diffusion_policy) +- [lerobot policies](https://github.com/huggingface/lerobot/tree/main/src/lerobot/policies) + + +### LeRobot +```shell +pip install 'lerobot[all]' +``` + ### Octo @@ -74,7 +83,7 @@ pip install git+https://github.com/juelg/vlagents.git For more details, see the [Octo github page](https://github.com/octo-models/octo). #### Troubleshooting -If pip conplains about dependency issues than it might have happened that torch somehow slipped in. +If pip complains about dependency issues than it might have happened that torch somehow slipped in. Check if you have any torch packages installed by ```shell pip freeze | grep torch @@ -152,6 +161,16 @@ Currently located on the branch `diffusion_policy`. ## Usage To start an vlagents server use the `start-server` command where `kwargs` is a dictionary of the constructor arguments of the policy you want to start e.g. ```shell +# lerobot act (n_action_steps is the executed horizon of the action chunk) +python -m vlagents start-server lerobot --port 8080 --host 0.0.0.0 --kwargs '{"policy_name": "act", "checkpoint_path": "", "n_action_steps": 1}' + +# lerobot pi05 +python -m vlagents start-server lerobot --port 20000 --host 0.0.0.0 --kwargs '{"policy_name": "pi05", "checkpoint_path": "", "n_action_steps": 1}' + +# lerobot xvla +uv run python -m vlagents start-server lerobot --port 20000 --host 0.0.0.0 --kwargs '{"policy_name": "xvla", "checkpoint_path": "", "n_action_steps": 1, "rename_map": {"head": "image", "left_wrist": "image2", "right_wrist": "image3"}}' + + # octo python -m vlagents start-server octo --host localhost --port 8080 --kwargs '{"checkpoint_path": "hf://Juelg/octo-base-1.5-finetuned-maniskill", "checkpoint_step": None, "horizon": 1, "unnorm_key": []}' @@ -245,7 +264,8 @@ In order to extend the library with a new agent environment, extend the `Evaluat ### Developer Tools Install the following dev dependencies: ```shell -pip install -ve '.[dev]' +pip install 'pip>=25.1' +pip install --group dev ``` The following dev tools are provided: @@ -261,12 +281,18 @@ make test ``` ## Citation -If you find the agent useful for your work, please consider citing the original work behind it: +If you find the agent useful for your work, please consider citing the original works behind it: ``` @inproceedings{juelg2025refinedpolicydistillationvla, title={{Refined Policy Distillation}: {F}rom {VLA} Generalists to {RL} Experts}, author={Tobias J{\"u}lg and Wolfram Burgard and Florian Walter}, year={2025}, - booktitle={Proc.~of the IEEE/RSJ Int.~Conf.~on Intelligent Robots and Systems (IROS)}, + booktitle={Proc.~of the IEEE/RSJ Int.~Conf.~on Intelligent Robots and Systems (IROS)} +} +@misc{juelg2026vlagentspolicyserverefficient, + title={VLAgents: A Policy Server for Efficient VLA Inference}, + author={Tobias J{\"u}lg and Khaled Gamal and Nisarga Nilavadi and Pierre Krack and Seongjin Bien and Michael Krawez and Florian Walter and Wolfram Burgard}, + year={2026}, + howpublished={\url{https://arxiv.org/abs/2601.11250}} } ``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 50a946a..a05e530 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ Homepage = "https://github.com/RobotControlStack/vlagents" Issues = "https://github.com/RobotControlStack/vlagents/issues" -[project.optional-dependencies] +[dependency-groups] dev = [ "ruff", "black", diff --git a/src/tests/test_eval_merge.py b/src/tests/test_eval_merge.py new file mode 100644 index 0000000..cae3697 --- /dev/null +++ b/src/tests/test_eval_merge.py @@ -0,0 +1,36 @@ +import numpy as np + +from vlagents.__main__ import _merge_env_split_results +from vlagents.evaluator_envs import EvalConfig + + +def test_merge_env_split_results_keeps_distinct_seeded_cfgs(): + cfg_a = EvalConfig("duobench/spring_door", {}, max_steps_per_episode=900, seed=0, jpeg_encoding=True) + cfg_b = EvalConfig("duobench/spring_door", {}, max_steps_per_episode=900, seed=10, jpeg_encoding=True) + + results = [ + ( + np.array([[[1.0, 0.1, 100.0], [0.0, 0.2, 200.0]]]), + [[[0.1], [0.2]]], + [0.15], + 40000, + ), + ( + np.array([[[0.0, 0.3, 300.0], [1.0, 0.4, 400.0]]]), + [[[0.3], [0.4]]], + [0.35], + 40000, + ), + ] + + merged_last_reward, merged_rewards, merged_mean_rewards, merged_step = _merge_env_split_results( + results=results, + worker_eval_cfgs=[[cfg_a], [cfg_b]], + eval_cfgs=[cfg_a, cfg_b], + ) + + assert merged_step == 40000 + assert np.array_equal(merged_last_reward[0], results[0][0][0]) + assert np.array_equal(merged_last_reward[1], results[1][0][0]) + assert merged_rewards == [results[0][1][0], results[1][1][0]] + assert merged_mean_rewards == [0.15, 0.35] diff --git a/src/tests/test_libero.py b/src/tests/test_libero.py index 67d422f..b37cc53 100644 --- a/src/tests/test_libero.py +++ b/src/tests/test_libero.py @@ -3,9 +3,9 @@ import os import numpy as np + from lerobot.envs.libero import LiberoEnv from PIL import Image - from lerobot.envs.libero import LiberoEnv from vlagents.__main__ import _run_eval from vlagents.evaluator_envs import AgentConfig, EvalConfig diff --git a/src/vlagents/__main__.py b/src/vlagents/__main__.py index 74727ce..5176894 100644 --- a/src/vlagents/__main__.py +++ b/src/vlagents/__main__.py @@ -69,10 +69,13 @@ def _per_process( os.environ["CUDA_VISIBLE_DEVICES"] = str(nth_gpu) os.environ["CAM_PATH"] = f"{os.environ['RUN_PATH']}/videos/{step}" agent_cfg = copy.deepcopy(_agent_cfg) + # Each worker masks itself to a single CUDA device, so `cuda:0` resolves + # to the GPU assigned via CUDA_VISIBLE_DEVICES. + agent_cfg.agent_kwargs.setdefault("device", "cuda:0") agent_cfg.agent_kwargs["checkpoint_step"] = step per_env_results_last_reward, per_env_results_rewards = evaluation( - agent_cfg=agent_cfg, eval_cfgs=eval_cfgs, episodes=episodes, n_processes=n_processes + agent_cfg=agent_cfg, eval_cfgs=eval_cfgs, episodes=episodes ) logging.info(f"Finished evaluation for step {step}") flatten_rewards = [[item for sublist in env_rewards for item in sublist] for env_rewards in per_env_results_rewards] @@ -81,6 +84,115 @@ def _per_process( return per_env_results_last_reward, per_env_results_rewards, mean_rewards, step +def _eval_cfg_key(cfg: EvalConfig) -> str: + return json.dumps( + { + "env_id": cfg.env_id, + "env_kwargs": cfg.env_kwargs, + "max_steps_per_episode": cfg.max_steps_per_episode, + "seed": cfg.seed, + "same_machine": cfg.same_machine, + "jpeg_encoding": cfg.jpeg_encoding, + }, + sort_keys=True, + ) + + +def _report_eval_cfg_key(cfg: EvalConfig) -> str: + return json.dumps( + { + "env_id": cfg.env_id, + "env_kwargs": cfg.env_kwargs, + "max_steps_per_episode": cfg.max_steps_per_episode, + }, + sort_keys=True, + ) + + +def _group_eval_cfgs_for_reporting(eval_cfgs: list[EvalConfig]) -> list[list[EvalConfig]]: + grouped_cfgs: dict[str, list[EvalConfig]] = {} + ordered_keys: list[str] = [] + + for cfg in eval_cfgs: + key = _report_eval_cfg_key(cfg) + if key not in grouped_cfgs: + grouped_cfgs[key] = [] + ordered_keys.append(key) + grouped_cfgs[key].append(cfg) + + return [grouped_cfgs[key] for key in ordered_keys] + + +def _aggregate_results_for_reporting( + per_env_results_last_reward: np.ndarray, + per_env_results_rewards: list[list[list[float]]], + eval_cfgs: list[EvalConfig], +) -> tuple[np.ndarray, list[list[list[float]]], list[float], list[EvalConfig], list[list[EvalConfig]]]: + grouped_cfgs = _group_eval_cfgs_for_reporting(eval_cfgs) + + aggregated_last_reward = [] + aggregated_rewards = [] + aggregated_mean_rewards = [] + representative_cfgs = [] + + for cfg_group in grouped_cfgs: + indices = [ + idx for idx, cfg in enumerate(eval_cfgs) if _report_eval_cfg_key(cfg) == _report_eval_cfg_key(cfg_group[0]) + ] + merged_last_reward = np.concatenate([per_env_results_last_reward[idx] for idx in indices], axis=0) + merged_rewards = [episode_rewards for idx in indices for episode_rewards in per_env_results_rewards[idx]] + flatten_rewards = [reward for episode_rewards in merged_rewards for reward in episode_rewards] + + aggregated_last_reward.append(merged_last_reward) + aggregated_rewards.append(merged_rewards) + aggregated_mean_rewards.append(np.mean(flatten_rewards) if flatten_rewards else 0.0) + representative_cfgs.append(copy.deepcopy(cfg_group[0])) + + return ( + np.stack(aggregated_last_reward, axis=0), + aggregated_rewards, + aggregated_mean_rewards, + representative_cfgs, + grouped_cfgs, + ) + + +def _merge_env_split_results( + results: list[tuple[np.ndarray, list[list[list[float]]], list[float], int]], + worker_eval_cfgs: list[list[EvalConfig]], + eval_cfgs: list[EvalConfig], +) -> tuple[np.ndarray, list[list[list[float]]], list[float], int]: + per_cfg_results: dict[str, tuple[np.ndarray, list[list[float]], float]] = {} + merged_step = results[0][3] if results else None + + for result, cfgs in zip(results, worker_eval_cfgs, strict=True): + per_env_results_last_reward, per_env_results_rewards, mean_rewards, step = result + if merged_step is None: + merged_step = step + + assert len(cfgs) == per_env_results_last_reward.shape[0] == len(per_env_results_rewards) == len(mean_rewards) + for idx, cfg in enumerate(cfgs): + per_cfg_results[_eval_cfg_key(cfg)] = ( + per_env_results_last_reward[idx], + per_env_results_rewards[idx], + mean_rewards[idx], + ) + + ordered_last_reward = [] + ordered_rewards = [] + ordered_mean_rewards = [] + for cfg in eval_cfgs: + key = _eval_cfg_key(cfg) + if key not in per_cfg_results: + raise KeyError(f"Missing worker result for evaluation config {cfg}") + last_reward, rewards, mean_reward = per_cfg_results[key] + ordered_last_reward.append(last_reward) + ordered_rewards.append(rewards) + ordered_mean_rewards.append(mean_reward) + + return np.stack(ordered_last_reward, axis=0), ordered_rewards, ordered_mean_rewards, merged_step + + @main_app.command() def run_eval( output_path: Annotated[str, typer.Option(help="Path to store the run results.")], @@ -142,10 +254,12 @@ def _run_eval( else: steps = json.loads(steps) - agent_cfgs = [agent_cfg] * len(steps) # TODO: make this a prober argument that is passed os.environ["RUN_PATH"] = output_path + report_cfg_groups = _group_eval_cfgs_for_reporting(eval_cfgs) + report_eval_cfgs = [copy.deepcopy(cfg_group[0]) for cfg_group in report_cfg_groups] + use_wandb = wandb_project is not None and wandb_entity is not None if use_wandb: wandb.init( @@ -157,6 +271,14 @@ def _run_eval( job_type="eval", name=wandb_name, group=wandb_group, + config={ + "eval_cfgs": eval_cfgs, + "agent_cfg": agent_cfg, + "episodes": episodes, + "n_processes": n_processes, + "n_gpus": n_gpus, + "steps": steps, + }, ) wandb_log_git_diff(output_path) wandb.run.log_code(".") @@ -193,7 +315,7 @@ def _run_eval( hidden=False, summary="max", ) - for idx, env in enumerate(eval_cfgs): + for env in report_eval_cfgs: wandb.define_metric( f"{env.env_id}/success", step_metric="train_step", @@ -227,47 +349,77 @@ def _run_eval( summary="max", ) - # distribute gpus equally - gpus_ids = [i % n_gpus for i in range(len(steps))] + if len(steps) > 1: + # Evaluate different checkpoints in parallel and keep the full env set together. + worker_eval_cfgs = [eval_cfgs for _ in steps] + args = [] + for idx, step in enumerate(steps): + worker_agent_cfg = copy.deepcopy(agent_cfg) + worker_agent_cfg.port += idx + args.append((step, worker_agent_cfg, eval_cfgs, episodes, 1, idx % n_gpus)) + max_workers = len(args) + else: + # For a single checkpoint, fan out across environments so multiple GPUs + # can each host one policy server and evaluate a subset of tasks. + worker_eval_cfgs = [[cfg] for cfg in eval_cfgs] + args = [] + for idx, cfg in enumerate(eval_cfgs): + worker_agent_cfg = copy.deepcopy(agent_cfg) + worker_agent_cfg.port += idx + args.append((steps[0], worker_agent_cfg, [cfg], episodes, 1, idx % n_gpus)) + max_workers = len(args) - # spawn n processes and run in parallel + pool_size = min(max_workers, n_processes or max_workers) + if pool_size > 1: + with Pool(pool_size) as p: + results = p.map(_per_process, args) + else: + results = [_per_process(arg) for arg in args] - for idx in range(len(steps)): - agent_cfgs[idx].port += idx - with Pool(n_processes) as p: - args = [(step, agent_cfgs[idx], eval_cfgs, episodes, 1, gpus_ids[idx]) for idx, step in enumerate(steps)] - results = p.map(_per_process, args) logging.info("Finished evaluation") - for result in results: - per_env_results_last_reward, per_env_results_rewards, mean_rewards, step = result + if len(steps) > 1: + merged_results = results + else: + merged_results = [_merge_env_split_results(results, worker_eval_cfgs, eval_cfgs)] + + for result in merged_results: + per_env_results_last_reward, per_env_results_rewards, _, step = result + ( + report_results_last_reward, + report_results_rewards, + report_mean_rewards, + report_eval_cfgs, + report_cfg_groups, + ) = _aggregate_results_for_reporting(per_env_results_last_reward, per_env_results_rewards, eval_cfgs) if use_wandb: step = step if step is not None else 0 wandb_log_dict = { - "total/success": per_env_results_last_reward.mean(axis=(0, 1))[0], - "total/last_step_reward": per_env_results_last_reward.mean(axis=(0, 1))[1], - "total/total_steps": per_env_results_last_reward.mean(axis=(0, 1))[2], - "total/mean_reward": np.mean(mean_rewards), + "total/success": report_results_last_reward.mean(axis=(0, 1))[0], + "total/last_step_reward": report_results_last_reward.mean(axis=(0, 1))[1], + "total/total_steps": report_results_last_reward.mean(axis=(0, 1))[2], + "total/mean_reward": np.mean(report_mean_rewards), "train_step": step, } # log for each env - for idx, env in enumerate(eval_cfgs): + for idx, env in enumerate(report_eval_cfgs): wandb_log_dict.update( { - f"{env.env_id}/success": per_env_results_last_reward[idx].mean(axis=0)[0], - f"{env.env_id}/last_step_reward": per_env_results_last_reward[idx].mean(axis=0)[1], - f"{env.env_id}/total_steps": per_env_results_last_reward[idx].mean(axis=0)[2], - f"{env.env_id}/mean_reward": mean_rewards[idx], + f"{env.env_id}/success": report_results_last_reward[idx].mean(axis=0)[0], + f"{env.env_id}/last_step_reward": report_results_last_reward[idx].mean(axis=0)[1], + f"{env.env_id}/total_steps": report_results_last_reward[idx].mean(axis=0)[2], + f"{env.env_id}/mean_reward": report_mean_rewards[idx], } ) wandb.log(wandb_log_dict, step=step, commit=True) path = write_results( - per_env_results_last_reward, - per_env_results_rewards, - eval_cfgs, - agent_cfg=agent_cfgs[0], + report_results_last_reward, + report_results_rewards, + report_eval_cfgs, + agent_cfg=agent_cfg, out=output_path, + grouped_eval_cfgs=report_cfg_groups, ) if use_wandb: wandb.log_artifact(path, type="file", name="results", aliases=[f"step_{step}"]) diff --git a/src/vlagents/client.py b/src/vlagents/client.py index c0bde08..4e4f670 100644 --- a/src/vlagents/client.py +++ b/src/vlagents/client.py @@ -34,13 +34,54 @@ def __init__(self, host: str, port: int, model: str, on_same_machine: bool = Fal jpeg_encoding (bool, optional): If True the image data is jpeg encoded for smaller transfer size. Defaults to False. """ + self.host = host + self.port = port + self.model = model self.on_same_machine = on_same_machine self.jpeg_encoding = jpeg_encoding self._shm: dict[str, shared_memory.SharedMemory] = {} + self.c = None + self._connect() + + def _connect(self): self.c = rpyc.connect( - host, port, config={"allow_pickle": True, "allow_public_attrs": True, "sync_request_timeout": 300} + self.host, + self.port, + config={"allow_pickle": True, "allow_public_attrs": True, "sync_request_timeout": 300}, ) - assert model == self.c.root.name() + assert self.model == self.c.root.name() + + def reconnect( + self, + host: str | None = None, + port: int | None = None, + model: str | None = None, + on_same_machine: bool | None = None, + jpeg_encoding: bool | None = None, + ): + if self.c is not None: + try: + self.c.close() + except Exception: + pass + if host is not None: + self.host = host + if port is not None: + self.port = port + if model is not None: + self.model = model + if on_same_machine is not None: + self.on_same_machine = on_same_machine + if jpeg_encoding is not None: + self.jpeg_encoding = jpeg_encoding + self._connect() + + def ensure_connected(self): + try: + assert self.c is not None + self.c.ping() + except Exception: + self.reconnect() def _process(self, obs: Obs) -> Obs: if self.on_same_machine: @@ -75,18 +116,32 @@ def act(self, obs: Obs) -> Act: obs = self._process(obs) obs = json_numpy.dumps(asdict(obs)) # action, done, info - return dataclass_from_dict(Act, json_numpy.loads(self.c.root.act(obs))) + try: + assert self.c is not None + return dataclass_from_dict(Act, json_numpy.loads(self.c.root.act(obs))) + except Exception: + self.reconnect() + assert self.c is not None + return dataclass_from_dict(Act, json_numpy.loads(self.c.root.act(obs))) def reset(self, obs: Obs, instruction: Any, **kwargs) -> dict[str, Any]: obs = self._process(obs) obs_dict = asdict(obs) # info - return json_numpy.loads(self.c.root.reset(json_numpy.dumps((obs_dict, instruction, kwargs)))) + try: + assert self.c is not None + return json_numpy.loads(self.c.root.reset(json_numpy.dumps((obs_dict, instruction, kwargs)))) + except Exception: + self.reconnect() + assert self.c is not None + return json_numpy.loads(self.c.root.reset(json_numpy.dumps((obs_dict, instruction, kwargs)))) def git_status(self) -> str: + assert self.c is not None return json_numpy.loads(self.c.root.git_status()) def is_initialized(self) -> bool: + assert self.c is not None return self.c.root.is_initialized() def close(self): @@ -94,7 +149,8 @@ def close(self): shm.close() shm.unlink() self._shm = {} - self.c.close() + if self.c is not None: + self.c.close() if __name__ == "__main__": diff --git a/src/vlagents/evaluator_envs.py b/src/vlagents/evaluator_envs.py index d30c4bf..9b41630 100644 --- a/src/vlagents/evaluator_envs.py +++ b/src/vlagents/evaluator_envs.py @@ -8,6 +8,7 @@ from abc import ABC from contextlib import contextmanager from dataclasses import asdict, dataclass +from multiprocessing import Pool from pathlib import Path from time import sleep from typing import Any @@ -32,12 +33,10 @@ class EvaluatorEnv(ABC): ENVS: dict[str, "EvaluatorEnv"] = {} - def __init__(self, env_id: str, seed: int, **env_kwargs) -> None: + def __init__(self, env_id: str, **env_kwargs) -> None: self.do_import() self.env = gym.make(env_id, **env_kwargs) - self.env.np_random = np.random.RandomState(seed=seed) self.env_id = env_id - self.seed = seed def step(self, action: Act) -> tuple[Obs, float, bool, bool, dict]: raise NotImplementedError @@ -54,63 +53,100 @@ def register(env_id: str, env: "EvaluatorEnv") -> None: EvaluatorEnv.ENVS[env_id] = env @staticmethod - def make(env_id: str, seed: int, **env_kwargs) -> "EvaluatorEnv": - return EvaluatorEnv.ENVS[env_id](env_id, seed, **env_kwargs) + def make(env_id: str, **env_kwargs) -> "EvaluatorEnv": + return EvaluatorEnv.ENVS[env_id](env_id, **env_kwargs) @staticmethod def do_import(): raise NotImplementedError -class RCSPickUpCubeEval(EvaluatorEnv): - INSTRUCTIONS = { - "rcs/FR3SimplePickUpSim-v0": "pick the green box", - "rcs/FR3LabPickUpSimDigitHand-v0": "pick the green box", - } +class RCSDuoBench(EvaluatorEnv): + INSTRUCTIONS = {} + + def __init__(self, env_id, **env_kwargs): + self.robot_keys: str = env_kwargs.pop("robot_keys", ["left", "right"]) + self.control_mode: str = env_kwargs.pop("control_mode", "joints") + self._instruction: str | None = None + super().__init__(env_id, **env_kwargs) def translate_obs(self, obs: dict[str, Any]) -> Obs: - # does not include history + cameras = {} + for key in obs["frames"]: + cameras[key] = obs["frames"][key]["rgb"]["data"] + cameras[key] = np.array(Image.fromarray(cameras[key]).resize((224, 224), Image.Resampling.BILINEAR)) + state = [] + for key in self.robot_keys: + state.append(obs[key]["joints"]) + state.append(obs[key]["gripper"]) - # side = obs["frames"]["arro"]["rgb"]["data"] - side = obs["frames"]["side"]["rgb"]["data"] - wrist = obs["frames"]["wrist"]["rgb"]["data"] - # depth_side = obs["frames"]["side"]["depth"]["data"], return Obs( - cameras=dict(rgb_side=side, rgb_wrist=wrist), - # cameras=dict(rgb_side=side), - gripper=obs["gripper"], - info=dict(joints=obs["joints"]), + cameras=cameras, + gripper=None, + state=np.concatenate(state), + info={"high_res_cameras": {key: obs["frames"][key]["rgb"]["data"] for key in obs["frames"]}}, ) def step(self, action: Act) -> tuple[Obs, float, bool, bool, dict]: - # includes horizon - if action.action.shape[0] != 7: - obs, reward, success, truncated, info = self.env.step( - {"xyzrpy": action.action[0][:6], "gripper": action.action[0][6]} - ) - else: - obs, reward, success, truncated, info = self.env.step( - {"xyzrpy": action.action[:6], "gripper": action.action[6]} - ) - # print(action.action, obs["xyzrpy"], obs["gripper"]) - return self.translate_obs(obs), reward, success, truncated, info + assert ( + len(action.action.shape) == 1 + ), "this function cannot deal with batches or action chunks, please return single actions" + env_action = {} + for idx, robot in enumerate(self.robot_keys): + if self.control_mode == "joints": + env_action[robot] = { + "joints": action.action[idx * 8 : idx * 8 + 7], + "gripper": action.action[idx * 8 + 7 : idx * 8 + 8], + } + else: + env_action[robot] = { + "xyzrpy": action.action[idx * 7 : idx * 7 + 6], + "gripper": action.action[idx * 7 + 6 : idx * 7 + 7], + } + obs, reward, success, truncated, info = self.env.step(env_action) + r = float(reward) + + return self.translate_obs(obs), r, success, truncated, info def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) -> tuple[Obs, dict[str, Any]]: - obs, info = self.env.reset() + obs, info = self.env.reset(seed=seed, options=options) + self._instruction = info["instruction"] return self.translate_obs(obs), info @property def language_instruction(self) -> str: - return self.INSTRUCTIONS[self.env_id] + assert self._instruction is not None + return self._instruction @staticmethod def do_import(): import rcs - import rcs_toolbox + from rcs_duobench.tasks import ( + ball_maze, + bin_sort, + block_balance, + carry_pot, + hinge_chest, + join_blocks, + pour_marbles, + spring_door, + transfer_cube, + transfer_gate, + transfer_reorient, + ) -EvaluatorEnv.register("rcs/FR3SimplePickUpSim-v0", RCSPickUpCubeEval) -EvaluatorEnv.register("rcs/FR3LabPickUpSimDigitHand-v0", RCSPickUpCubeEval) +EvaluatorEnv.register("duobench/ball_maze", RCSDuoBench) +EvaluatorEnv.register("duobench/bin_sort", RCSDuoBench) +EvaluatorEnv.register("duobench/block_balance", RCSDuoBench) +EvaluatorEnv.register("duobench/carry_pot", RCSDuoBench) +EvaluatorEnv.register("duobench/join_blocks", RCSDuoBench) +EvaluatorEnv.register("duobench/hinge_chest", RCSDuoBench) +EvaluatorEnv.register("duobench/pour_marbles", RCSDuoBench) +EvaluatorEnv.register("duobench/spring_door", RCSDuoBench) +EvaluatorEnv.register("duobench/transfer_cube", RCSDuoBench) +EvaluatorEnv.register("duobench/transfer_gate", RCSDuoBench) +EvaluatorEnv.register("duobench/transfer_reorient", RCSDuoBench) class ManiSkill(EvaluatorEnv): @@ -128,12 +164,12 @@ class ManiSkill(EvaluatorEnv): "PokeCube-v1": "push the cube by using the blue tool", } - def __init__(self, env_id, seed, **env_kwargs): + def __init__(self, env_id, **env_kwargs): # TODO: one could save only every nth episode by adding an episode counter which steps the record env only # when the counter is divisible by n otherwise steps the normal env logging.info(f"Creating ManiSkill env {env_id}") output_dir = env_kwargs.pop("video_dir", None) - super().__init__(env_id, seed, **env_kwargs) + super().__init__(env_id, **env_kwargs) logging.info(f"Created ManiSkill env {env_id}") if "human_render_camera_configs" in env_kwargs: self.env = HumanCameraWrapper(self.env) @@ -175,7 +211,8 @@ def step(self, action: Act) -> tuple[Obs, float, bool, bool, dict]: return self.translate_obs(obs), reward, success, truncated, info def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) -> tuple[Obs, dict[str, Any]]: - obs, info = self.env.reset(seed=seed, options=options) + # maniskill has a bug that does not allow None in options + obs, info = self.env.reset(seed=seed) return self.translate_obs(obs), info @property @@ -202,7 +239,7 @@ def do_import(): class Libero(EvaluatorEnv): - def __init__(self, env_id: str, seed: int, reset_steps: int = 14, **env_kwargs) -> None: + def __init__(self, env_id: str, reset_steps: int = 14, **env_kwargs) -> None: """ For supported env_kwargs checkout ControlEnv class in libero. We add the following env_kwargs on top: @@ -215,13 +252,12 @@ def __init__(self, env_id: str, seed: int, reset_steps: int = 14, **env_kwargs) self.reset_steps = reset_steps self.control_mode = self.env_kwargs.pop("control_mode", "relative") self.env, self._language_instruction, self.task_name, self.task_suite, self.task_id, self.task = self._make_gym( - env_id, seed, **self.env_kwargs + env_id, **self.env_kwargs ) logging.info( f"Created Libero env, task suite: {env_id}, task id: {self.task_id}, task name {self.task_name}, instruction: {self._language_instruction}" ) self.env_id = env_id - self.seed = seed @staticmethod def n_tasks(env_id: str) -> int: @@ -232,7 +268,7 @@ def n_tasks(env_id: str) -> int: return task_suite.n_tasks @staticmethod - def _make_gym(env_id, seed, **env_kwargs): + def _make_gym(env_id, **env_kwargs): from libero.libero import benchmark, get_libero_path from libero.libero.envs import OffScreenRenderEnv @@ -247,7 +283,6 @@ def _make_gym(env_id, seed, **env_kwargs): bddl_file_name=task_bddl_file, **env_kwargs, ) - env.seed(seed) return env, task.language, task.name, task_suite, task_id, task @@ -266,6 +301,8 @@ def step(self, action: Act) -> tuple[Obs, float, bool, bool, dict]: return self.translate_obs(obs), reward, success, done, info def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) -> tuple[Obs, dict[str, Any]]: + if seed is not None: + self.env.seed(seed) obs = self.env.reset() init_states = self.task_suite.get_task_init_states( self.task_id @@ -325,11 +362,55 @@ class AgentConfig: port: int = 8080 -def single_eval(env: EvaluatorEnv, agent: Agent, max_steps: int, i) -> tuple[list[float], list[float], list[float]]: +def _write_camera_mp4(frames: list[np.ndarray], output_path: Path, fps: int = 30) -> None: + if not frames: + return + + height, width = frames[0].shape[:2] + process = subprocess.Popen( + [ + "ffmpeg", + "-y", + "-f", + "rawvideo", + "-pix_fmt", + "rgb24", + "-s", + f"{width}x{height}", + "-r", + str(fps), + "-i", + "-", + "-an", + "-vf", + "pad=ceil(iw/2)*2:ceil(ih/2)*2", + "-c:v", + "libx264", + "-pix_fmt", + "yuv420p", + "-movflags", + "+faststart", + str(output_path), + ], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + assert process.stdin is not None + for frame in frames: + process.stdin.write(np.ascontiguousarray(frame).astype(np.uint8).tobytes()) + process.stdin.close() + process.wait() + + +def single_eval( + env: EvaluatorEnv, agent: Agent, max_steps: int, ith_episode: int, start_seed: int +) -> tuple[list[float], list[float], list[float]]: logging.debug(f"Starting evaluation") - obs, _ = env.reset(options={}) + obs, _ = env.reset(seed=start_seed + ith_episode) # ensure different seed for each episode + cameras = obs.info.pop("high_res_cameras", obs.cameras) logging.debug(f"Reset env") - agent.reset(obs, env.language_instruction) + agent.reset(copy.deepcopy(obs), env.language_instruction) logging.debug(f"Reset agent") done = False truncated = False @@ -339,28 +420,25 @@ def single_eval(env: EvaluatorEnv, agent: Agent, max_steps: int, i) -> tuple[lis while not done and not truncated and max_steps > step: action = agent.act(obs) obs, reward, done, truncated, _ = env.step(action) + cameras = obs.info.pop("high_res_cameras", obs.cameras) reward = float(reward) done, truncated = bool(done), bool(truncated) step += 1 rewards.append(reward) - im.append(obs.cameras) - - Path(f"{os.environ['CAM_PATH']}").mkdir(exist_ok=True, parents=True) - for camera in im[0].keys(): - imgs = [] - for img in im: - # skip images that have timestamps closer together than 0.5s - imgs.append(Image.fromarray(img[camera])) - - imgs[0].save( - f"{os.environ['CAM_PATH']}/{i}_{camera}_{str(datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))}.gif", - save_all=True, - append_images=imgs[1:], - duration=0.2 * 1000, - loop=0, - ) + im.append(cameras) + + cam_path = os.environ.get("CAM_PATH", None) + if cam_path is not None and im: + output_dir = Path(os.environ["CAM_PATH"]) / env.env_id + output_dir.mkdir(exist_ok=True, parents=True) + timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + for camera in im[0].keys(): + _write_camera_mp4( + [img[camera] for img in im], + output_dir / f"{ith_episode}_{camera}_{timestamp}.mp4", + ) - env.reset(options={}) + env.reset() logging.debug(f"Finished evaluation with {step} steps and reward {reward}, success {done}") # success, last reward and number of steps return done, rewards, step @@ -369,12 +447,12 @@ def single_eval(env: EvaluatorEnv, agent: Agent, max_steps: int, i) -> tuple[lis per_process_cache = {} -def create_env_agent(agent_config: AgentConfig, cfg: EvalConfig, seed: int) -> tuple[EvaluatorEnv, RemoteAgent]: +def create_env_agent(agent_config: AgentConfig, cfg: EvalConfig) -> tuple[EvaluatorEnv, RemoteAgent]: logging.debug(f"retrieving env {cfg.env_id} and agent") key = (cfg.env_id, agent_config.host, agent_config.port) if key not in per_process_cache: logging.info(f"env {cfg.env_id} not available, creating new env and agent") - env = EvaluatorEnv.make(cfg.env_id, seed=seed, **cfg.env_kwargs) + env = EvaluatorEnv.make(cfg.env_id, **cfg.env_kwargs) logging.info("done creating env") agent = RemoteAgent( agent_config.host, @@ -388,30 +466,24 @@ def create_env_agent(agent_config: AgentConfig, cfg: EvalConfig, seed: int) -> t return per_process_cache[key] -def run_episode(args: tuple[int, list[EvalConfig], int, AgentConfig]) -> tuple[float, float, float]: +def run_episode(args: tuple[int, list[EvalConfig], int, AgentConfig]) -> tuple[list[float], list[float], list[float]]: i, cfgs, episodes, agent_cfg = args cfg = cfgs[i // episodes] - env, agent = create_env_agent(agent_cfg, cfg, seed=i) + env, agent = create_env_agent(agent_cfg, cfg) # busy wait for server to finish initialization while not agent.is_initialized(): logging.info("Waiting for agent to initialize...") sleep(5) - return single_eval(env, agent, cfg.max_steps_per_episode, i) + return single_eval(env, agent, cfg.max_steps_per_episode, i, start_seed=cfg.seed) def multi_eval( - agent_cfg: AgentConfig, cfgs: list[EvalConfig], episodes: int = 100, n_processes: int = 1 + agent_cfg: AgentConfig, cfgs: list[EvalConfig], episodes: int = 100 ) -> tuple[np.ndarray, list[list[list[float]]]]: # return is [envs, episodes, 3(success, reward, steps)], [envs, episodes, rewards for all steps in the episode] logging.info(f"Starting evaluation with {len(cfgs)} environments and {episodes} episodes each") - # with process - # with Pool(n_processes) as p: - # args = [(i, cfgs, episodes, client_cfg) for i in range(len(cfgs) * episodes)] - # single_results = p.map(run_episode, args) - - # without process - np.random.seed(cfgs[0].seed) + # np.random.seed(cfgs[0].seed) args = [(i, cfgs, episodes, agent_cfg) for i in range(len(cfgs) * episodes)] single_results = [run_episode(arg) for arg in tqdm(args)] @@ -478,7 +550,6 @@ def evaluation( agent_cfg: AgentConfig, eval_cfgs: list[EvalConfig], episodes: int = 100, - n_processes: int = 1, ): per_process_cache.clear() logging.info(f"Starting evaluation with {agent_cfg.agent_name} and {agent_cfg.agent_kwargs}") @@ -486,7 +557,8 @@ def evaluation( with start_server( agent_cfg.agent_name, agent_cfg.agent_kwargs, agent_cfg.port, agent_cfg.host, agent_cfg.python_path ): - res = multi_eval(agent_cfg, eval_cfgs, episodes, n_processes) + sleep(30) + res = multi_eval(agent_cfg, eval_cfgs, episodes) except Exception: # Ensures you SEE the client's stack trace and any logged errors. logging.exception("Client failed") @@ -523,7 +595,7 @@ def run_eval( [ "-m", "vlagents", - "run-eval-post-training", + "run-eval", f"--agent-cfg={json.dumps(asdict(agent_cfg))}", f"--episodes={episodes}", f"--n-processes={n_processes}", @@ -550,6 +622,7 @@ def write_results( eval_cfgs: list[EvalConfig], agent_cfg: AgentConfig, out: str = "", + grouped_eval_cfgs: list[list[EvalConfig]] | None = None, ) -> str: # first read json, if not exists write empty list path = os.path.join(out, f"results_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.json") @@ -562,49 +635,51 @@ def write_results( flatten_rewards = [[item for sublist in env_rewards for item in sublist] for env_rewards in rewards] mean_rewards = [np.mean(env_rewards) for env_rewards in flatten_rewards] + grouped_eval_cfgs = grouped_eval_cfgs or [[cfg] for cfg in eval_cfgs] - for idx, cfg in enumerate(eval_cfgs): + for idx, (cfg, cfg_group) in enumerate(zip(eval_cfgs, grouped_eval_cfgs, strict=True)): success_mean, reward_mean, steps_mean = results[idx].mean(axis=0, keepdims=False) success_max, reward_max, steps_max = results[idx].max(axis=0, keepdims=False) success_min, reward_min, steps_min = results[idx].min(axis=0, keepdims=False) sucess_std, reward_std, steps_std = results[idx].std(axis=0, keepdims=False) success_median, reward_median, steps_median = np.median(results[idx], axis=0, keepdims=False) - prev_results.append( - { - "success": { - "mean": success_mean, - "max": success_max, - "min": success_min, - "std": sucess_std, - "median": success_median, - "values": results[idx, :, 0].tolist(), - }, - "reward_last_step": { - "mean": reward_mean, - "max": reward_max, - "min": reward_min, - "std": reward_std, - "median": reward_median, - "values": results[idx, :, 1].tolist(), - }, - "rewards": { - "mean": mean_rewards[idx], - "values": rewards[idx], - }, - "steps": { - "mean": steps_mean, - "max": steps_max, - "min": steps_min, - "std": steps_std, - "median": steps_median, - "values": results[idx, :, 2].tolist(), - }, - "episodes": len(results), - "timestamp": datetime.datetime.now().isoformat(), - "env_cfg": asdict(cfg), - "agent_cfg": asdict(agent_cfg), - } - ) + result_entry = { + "success": { + "mean": success_mean, + "max": success_max, + "min": success_min, + "std": sucess_std, + "median": success_median, + "values": results[idx, :, 0].tolist(), + }, + "reward_last_step": { + "mean": reward_mean, + "max": reward_max, + "min": reward_min, + "std": reward_std, + "median": reward_median, + "values": results[idx, :, 1].tolist(), + }, + "rewards": { + "mean": mean_rewards[idx], + "values": rewards[idx], + }, + "steps": { + "mean": steps_mean, + "max": steps_max, + "min": steps_min, + "std": steps_std, + "median": steps_median, + "values": results[idx, :, 2].tolist(), + }, + "episodes": results.shape[1], + "timestamp": datetime.datetime.now().isoformat(), + "env_cfg": asdict(cfg), + "agent_cfg": asdict(agent_cfg), + } + if len(cfg_group) > 1: + result_entry["merged_env_cfgs"] = [asdict(group_cfg) for group_cfg in cfg_group] + prev_results.append(result_entry) with open(path, "w") as f: json.dump(prev_results, f, indent=2) diff --git a/src/vlagents/policies.py b/src/vlagents/policies.py index 36202cd..932f31e 100644 --- a/src/vlagents/policies.py +++ b/src/vlagents/policies.py @@ -34,6 +34,9 @@ class Obs: cameras: dict[str, np.ndarray | SharedMemoryPayload | str] = field(default_factory=dict) camera_data_type: str = CameraDataType.RAW gripper: float | None = None + # TODO: add context about what the state means, and its dimensions + # theoratically it would be joints, xyzrpy and absolute or relative + state: np.ndarray | None = None info: dict[str, Any] = field(default_factory=dict) @@ -139,6 +142,134 @@ def reset(self, obs: Obs, instruction: Any, **kwargs) -> dict[str, Any]: return info +class LeRobotPolicy(Agent): + + def __init__( + self, + policy_name: str = "pi05", + default_checkpoint_path: str = "lerobot/pi05_base", + device: str = "cuda:0", + n_action_steps: int = 30, + temporal_ensemble_coeff: float | None = None, + rename_map: dict[str, str] | None = None, + **kwargs, + ) -> None: + super().__init__(default_checkpoint_path=default_checkpoint_path, **kwargs) + + self.policy_name = policy_name + self.device = device + self.n_action_steps = n_action_steps + self.temporal_ensemble_coeff = temporal_ensemble_coeff + checkpoint_path = self.checkpoint_path or self.default_checkpoint_path + if self.checkpoint_step is not None: + checkpoint_path = checkpoint_path.format(checkpoint_step=self.checkpoint_step) + self.path = checkpoint_path + + if rename_map is not None: + self.rename_map = rename_map + else: + self.rename_map = {} + + # self.rename_map = { + # "head": "image", + # "left_wrist": "image2", + # "right_wrist": "image3", + # } + + def initialize(self): + from collections import deque + + import torch + from lerobot.policies.factory import get_policy_class, make_pre_post_processors + from torchvision.transforms import v2 + + # from vlagents import train_xvla + + self.policy = get_policy_class(self.policy_name).from_pretrained(self.path) + self.policy.config.n_action_steps = self.n_action_steps + + if self.policy_name == "act": + from lerobot.policies.act.modeling_act import ACTTemporalEnsembler + + if self.temporal_ensemble_coeff is not None: + self.policy.config.temporal_ensemble_coeff = self.temporal_ensemble_coeff + self.policy.temporal_ensembler = ACTTemporalEnsembler( + self.temporal_ensemble_coeff, + self.policy.config.chunk_size, + ) + elif hasattr(self.policy, "temporal_ensembler"): + delattr(self.policy, "temporal_ensembler") + + if self.policy.config.temporal_ensemble_coeff is None: + self.policy._action_queue = deque([], maxlen=self.policy.config.n_action_steps) + + self._expected_image_shapes = { + key.removeprefix("observation.images."): tuple(feature.shape) + for key, feature in self.policy.config.input_features.items() + if key.startswith("observation.images.") + } + self._camera_transforms = { + key: v2.Compose( + [ + v2.ToImage(), + v2.Resize((height, width)), + v2.ToDtype(torch.float32, scale=True), + v2.ToPureTensor(), + ] + ) + for key, (_, height, width) in self._expected_image_shapes.items() + } + # self.policy.config.device = self.device + self.policy.to(self.device) + self.policy.eval() + + preprocessor_overrides = { + "device_processor": {"device": self.device}, + # "rename_observations_processor": {"rename_map": self.rename_map}, + } + + self.preprocessor, self.postprocessor = make_pre_post_processors( + policy_cfg=self.policy.config, + pretrained_path=self.path, + preprocessor_overrides=preprocessor_overrides, + ) + + def act(self, obs: Obs) -> Act: + import torch + + super().act(obs) + + observation = { + "observation.state": torch.as_tensor(np.array(obs.state, copy=True)).to(torch.float32), + "task": self.instruction, + } + + for key, img_data in obs.cameras.items(): + expected_shape = self._expected_image_shapes.get(self.rename_map.get(key, key)) + assert expected_shape is not None + observation[f"observation.images.{self.rename_map.get(key, key)}"] = self._camera_transforms[ + self.rename_map.get(key, key) + ](np.array(img_data, copy=True)) + + observation = self.preprocessor(observation) + + with torch.inference_mode(): + action = self.policy.select_action(observation) + # action = self.policy.predict_action_chunk(observation) + action = self.postprocessor(action) + + if isinstance(action, torch.Tensor): + action = action.detach().float().cpu().numpy() + + action = np.squeeze(action, axis=0) + return Act(action=np.asarray(action, dtype=np.float32)) + + def reset(self, obs: Obs, instruction: Any, **kwargs) -> dict[str, Any]: + info = super().reset(obs, instruction, **kwargs) + self.policy.reset() + return info + + class VjepaAC(Agent): def __init__( @@ -241,12 +372,12 @@ def act(self, obs: Obs) -> Act: import torch from torchvision.io import decode_jpeg + super().act(obs) + with torch.no_grad(): # read from camera-stream - side = base64.urlsafe_b64decode(obs.cameras["rgb_side"]) - side = torch.frombuffer(bytearray(side), dtype=torch.uint8) - side = decode_jpeg(side) + side = obs.cameras["rgb_side"] # [3, 720, 1280] -> [1, 720, 1280, 3] i.e, [T, C, Patches, dim] side = torch.permute(side, (1, 2, 0)).unsqueeze(0) @@ -686,6 +817,7 @@ def act(self, obs: Obs) -> Act: AGENTS = dict( test=TestAgent, octo=OctoModel, + lerobot=LeRobotPolicy, openvla=OpenVLAModel, octodist=OctoActionDistribution, openvladist=OpenVLADistribution, diff --git a/src/vlagents/train_xvla.py b/src/vlagents/train_xvla.py new file mode 100644 index 0000000..53425bd --- /dev/null +++ b/src/vlagents/train_xvla.py @@ -0,0 +1,53 @@ +import torch +import torch.nn as nn +from lerobot.policies.xvla.action_hub import BaseActionSpace, register_action + +XVLA_DOMAIN_ID = 20 + + +@register_action("frankaduo") +class FrankaDuoActionSpace(BaseActionSpace): + """Custom action space for dual Franka setup.""" + + dim_action = 20 + + # Use lists for safe PyTorch advanced indexing + gripper_idx = (7, 15) + # All indices EXCEPT 7 and 15 + joint_idx = [0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14] + + def __init__(self): + super().__init__() + self.mse = nn.MSELoss() + self.bce = nn.BCEWithLogitsLoss() + + def compute_loss(self, pred, target): + """Define your loss computation.""" + # Corrected: Now computes MSE for BOTH Robot 1 and Robot 2 joints + joints_loss = self.mse(pred[..., self.joint_idx], target[..., self.joint_idx]) + + # Computes BCE for both grippers + gripper_loss = self.bce(pred[..., self.gripper_idx], target[..., self.gripper_idx]) + + return { + "joints_loss": joints_loss, + "gripper_loss": gripper_loss, + } + + def preprocess(self, proprio, action, mode="train"): + """Preprocess actions before training.""" + proprio_m = proprio.clone() + action_m = action.clone() if action is not None else None + + # Zero out both grippers + proprio_m[..., self.gripper_idx] = 0.0 + if action_m is not None: + action_m[..., self.gripper_idx] = 0.0 + + return proprio_m, action_m + + def postprocess(self, action): + """Post-process predictions for deployment.""" + # Apply sigmoid to both gripper logits + action[..., self.gripper_idx] = torch.sigmoid(action[..., self.gripper_idx]) + return action[..., :16]