-
Notifications
You must be signed in to change notification settings - Fork 268
/
Copy pathenvironment.py
205 lines (176 loc) · 7.95 KB
/
environment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import importlib
from functools import partial
from os.path import exists, dirname, abspath, join
from typing import Type, List
from pyrep import PyRep
from pyrep.objects import VisionSensor
from pyrep.robots.arms.panda import Panda
from rlbench import utils
from rlbench.action_modes.action_mode import ActionMode
from rlbench.backend.const import *
from rlbench.backend.robot import Robot
from rlbench.backend.scene import Scene
from rlbench.backend.task import Task
from rlbench.const import SUPPORTED_ROBOTS
from rlbench.demo import Demo
from rlbench.observation_config import ObservationConfig
from rlbench.sim2real.domain_randomization import RandomizeEvery, \
VisualRandomizationConfig, DynamicsRandomizationConfig
from rlbench.sim2real.domain_randomization_scene import DomainRandomizationScene
from rlbench.task_environment import TaskEnvironment
DIR_PATH = dirname(abspath(__file__))
class Environment(object):
"""Each environment has a scene."""
def __init__(self,
action_mode: ActionMode,
dataset_root: str = '',
obs_config: ObservationConfig = ObservationConfig(),
headless: bool = False,
static_positions: bool = False,
robot_setup: str = 'panda',
randomize_every: RandomizeEvery = None,
frequency: int = 1,
visual_randomization_config: VisualRandomizationConfig = None,
dynamics_randomization_config: DynamicsRandomizationConfig = None,
attach_grasped_objects: bool = True,
shaped_rewards: bool = False,
arm_max_velocity: float = 1.0,
arm_max_acceleration: float = 4.0,
):
self._dataset_root = dataset_root
self._action_mode = action_mode
self._obs_config = obs_config
self._headless = headless
self._static_positions = static_positions
self._robot_setup = robot_setup.lower()
self._randomize_every = randomize_every
self._frequency = frequency
self._visual_randomization_config = visual_randomization_config
self._dynamics_randomization_config = dynamics_randomization_config
self._attach_grasped_objects = attach_grasped_objects
self._shaped_rewards = shaped_rewards
self._arm_max_velocity = arm_max_velocity
self._arm_max_acceleration = arm_max_acceleration
if robot_setup not in SUPPORTED_ROBOTS.keys():
raise ValueError('robot_configuration must be one of %s' %
str(SUPPORTED_ROBOTS.keys()))
if (randomize_every is not None and
visual_randomization_config is None and
dynamics_randomization_config is None):
raise ValueError(
'If domain randomization is enabled, must supply either '
'visual_randomization_config or dynamics_randomization_config')
self._check_dataset_structure()
self._pyrep = None
self._robot = None
self._scene = None
self._prev_task = None
def _check_dataset_structure(self):
if len(self._dataset_root) > 0 and not exists(self._dataset_root):
raise RuntimeError(
'Data set root does not exists: %s' % self._dataset_root)
def _string_to_task(self, task_name: str):
task_name = task_name.replace('.py', '')
try:
class_name = ''.join(
[w[0].upper() + w[1:] for w in task_name.split('_')])
mod = importlib.import_module("rlbench.tasks.%s" % task_name)
except Exception as e:
raise RuntimeError(
'Tried to interpret %s as a task, but failed. Only valid tasks '
'should belong in the tasks/ folder' % task_name) from e
return getattr(mod, class_name)
def launch(self):
if self._pyrep is not None:
raise RuntimeError('Already called launch!')
self._pyrep = PyRep()
self._pyrep.launch(join(DIR_PATH, TTT_FILE), headless=self._headless)
arm_class, gripper_class, _ = SUPPORTED_ROBOTS[
self._robot_setup]
arm_class = partial(
arm_class,
max_velocity=self._arm_max_velocity,
max_acceleration=self._arm_max_acceleration)
# We assume the panda is already loaded in the scene.
if self._robot_setup != 'panda':
# Remove the panda from the scene
panda_arm = Panda()
panda_pos = panda_arm.get_position()
panda_arm.remove()
arm_path = join(DIR_PATH, 'robot_ttms', self._robot_setup + '.ttm')
self._pyrep.import_model(arm_path)
arm, gripper = arm_class(), gripper_class()
arm.set_position(panda_pos)
else:
arm, gripper = arm_class(), gripper_class()
self._robot = Robot(arm, gripper)
if self._randomize_every is None:
self._scene = Scene(
self._pyrep, self._robot, self._obs_config, self._robot_setup)
else:
self._scene = DomainRandomizationScene(
self._pyrep, self._robot, self._obs_config, self._robot_setup,
self._randomize_every, self._frequency,
self._visual_randomization_config,
self._dynamics_randomization_config)
self._action_mode.arm_action_mode.set_control_mode(self._robot)
def shutdown(self):
if self._pyrep is not None:
self._pyrep.shutdown()
self._pyrep = None
def get_task(self, task_class: Type[Task]) -> TaskEnvironment:
# If user hasn't called launch, implicitly call it.
if self._pyrep is None:
self.launch()
self._scene.unload()
task = task_class(self._pyrep, self._robot)
self._prev_task = task
return TaskEnvironment(
self._pyrep, self._robot, self._scene, task,
self._action_mode, self._dataset_root, self._obs_config,
self._static_positions, self._attach_grasped_objects,
self._shaped_rewards)
@property
def action_shape(self):
return self._action_mode.action_shape(self._scene),
def get_demos(self, task_name: str, amount: int,
variation_number=0,
image_paths=False,
random_selection: bool = True,
from_episode_number: int = 0) -> List[Demo]:
if self._dataset_root is None or len(self._dataset_root) == 0:
raise RuntimeError(
"Can't ask for a stored demo when no dataset root provided.")
demos = utils.get_stored_demos(
amount, image_paths, self._dataset_root, variation_number,
task_name, self._obs_config, random_selection, from_episode_number)
return demos
def get_scene_data(self) -> dict:
"""Get the data of various scene/camera information.
This temporarily starts the simulator in headless mode.
:return: A dictionary containing scene data.
"""
def _get_cam_info(cam: VisionSensor):
if not cam.still_exists():
return None
intrinsics = cam.get_intrinsic_matrix()
return dict(
intrinsics=intrinsics,
near_plane=cam.get_near_clipping_plane(),
far_plane=cam.get_far_clipping_plane(),
extrinsics=cam.get_matrix())
headless = self._headless
self._headless = True
self.launch()
d = dict(
left_shoulder_camera=_get_cam_info(
self._scene._cam_over_shoulder_left),
right_shoulder_camera=_get_cam_info(
self._scene._cam_over_shoulder_right),
front_camera=_get_cam_info(self._scene._cam_front),
wrist_camera=_get_cam_info(self._scene._cam_wrist),
overhead_camera=_get_cam_info(self._scene._cam_overhead)
)
self.shutdown()
self._headless = headless
return d