-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrlbench.py
75 lines (63 loc) · 2.72 KB
/
rlbench.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import threading
import numpy as np
from pyrep.const import RenderMode
from rlpyt.envs.base import Env, EnvStep
from rlpyt.spaces.int_box import IntBox
from rlpyt.spaces.float_box import FloatBox
from rlbench.environment import Environment
from rlbench.action_modes import ArmActionMode, ActionMode
from rlbench.observation_config import ObservationConfig, CameraConfig
from rlbench.tasks import FastSingle2xtarget
from rlbench.tasks import SmallTargetReachNoDistractors
from rlbench.tasks import LargeTargetNoDistractorsDt200
from dreamer.envs.env import EnvInfo
class RLBench(Env):
def __init__(self, config):
self.config = config
self._env, self._task = self._initialize()
def _initialize(self):
self.obs_sz = (64, 64)
cam_config = CameraConfig(image_size=self.obs_sz, render_mode=RenderMode.OPENGL3)
obs_config = ObservationConfig(wrist_camera=cam_config)
obs_config.left_shoulder_camera.set_all(False)
obs_config.right_shoulder_camera.set_all(False)
obs_config.overhead_camera.set_all(False)
obs_config.wrist_camera.set_all(True)
obs_config.front_camera.set_all(False) # note: TODO: test whether shoulder camera works better
# print("\nOBS1", vars(obs_config))
# for ok, ov in vars(obs_config).items():
# if "camera" in ok and "matrix" not in ok:
# print(ok, vars(ov))
action_mode = self.config.get("action_mode",
ActionMode(ArmActionMode.ABS_JOINT_VELOCITY))
headless = self.config.get("headless", False)
env = Environment(action_mode, obs_config=obs_config, headless=headless)
env.launch()
task = env.get_task(self.config.get("task", FastSingle2xtarget))
return env, task
@property
def observation_space(self):
return IntBox(low=0, high=255, shape=(3,) + self.config.get("size", self.obs_sz),
dtype="uint8")
@property
def action_space(self):
return FloatBox(low=-1.0,
high=1.0,
shape=(self._env.action_size,))
def step(self, action):
obs, reward, done = self._task.step(action)
# remember to change the camera here as well
obs = np.transpose(obs.wrist_rgb, (2, 0, 1))
info = EnvInfo(None, None, done)
return EnvStep(obs, reward, done, info)
def reset(self):
descriptions, obs = self._task.reset()
# obs = np.transpose(obs.front_rgb, (2, 0, 1))
obs = np.transpose(obs.wrist_rgb, (2, 0, 1))
del descriptions # Not used.
return obs
def render(self, *args, **kwargs):
pass
@property
def horizon(self):
raise NotImplementedError