"""
The :class:`World` class hosts all Things in a Simulation. It handles the access to Mujoco by implementing a
method, that converts the Constructed World to an xml string and a method used to launch a Mujoco viewer, that
is especially useful during model construction for debugging. You can add and remove Things from the World with
the same methods as in :class:`NodeThing <blueprints.thing.node.NodeThing>` i.e. :meth:`attach <blueprints.thing.node.NodeThing.attach>`
and :meth:`detach <blueprints.thing.node.NodeThing.detach>`.
.. code-block:: python
:caption: World attachment/detachment
>>> world = blue.World()
>>> teapot = blue.geoms.Mesh(filename='utahteapot.stl', pos=[0, 0, 1])
>>> red_teapot = teapot.copy(color='red')
>>> green_teapot = teapot.copy(color='green')
>>> world.attach(red_teapot, green_teapot, copy=False)
>>> world.geoms.color
[Red[#FF0000], Green[#00FF00]]
>>> world.detach(red_teapot)
>>> world.geoms.color
[Green[#00FF00]]
The :meth:`view <World.view>` method can be used. The perspective of the default camera is set such that all
Things attached to the :class:`World` are in view.
.. code-block:: python
:caption: World view
>>> world.view()
Things that are attached with the ``background`` flag argument set to ``True`` will not be used to center the
perspective of the camera.
.. code-block:: python
:caption: World background
>>> plane = blue.geoms.Plane()
>>> world.attach(plane, background=True)
>>> world.view()
"""
import xml.etree.ElementTree as xml
import numpy as np
import os
import sys
import time
import tqdm
import imageio
import mujoco
import mujoco.viewer
from mujoco.glfw import glfw
import blueprints as blue
from collections import defaultdict
from itertools import chain
[docs]
class World(blue.WorldType, blue.thing.NodeThing):
r"""
This class is available through the shortcut :class:`blueprints.World <World>`.
Most attribute descriptions are partially taken from `Mujoco <https://mujoco.readthedocs.io/en/latest/XMLreference.html#mjcf-reference>`__.
Attributes
----------
angle : str
Either ``'radian'`` or ``'degree'`` (avoid if possible).
autolimits : bool
Indicating whether limits are used or not.
contact : bool
Indicating whether collisions and contacts are ignored in the simulations physics.
gravity : bool
Indicating whether gravity exists.
integrator : str
The name of the integrator use, see :attr:`integrator` for a detailed description.
viscosity : float
The viscosity of the Worlds Things.
"""
[docs]
@blue.restrict
def __init__(self,
name: str = 'anonymous_model',
autolimits: bool = True,
viscosity: float|int|None = 0.,
timestep: float|int|None = 0.002,
integrator: str = 'implicit',
cone: str = 'pyramidal',
gravity: bool = True,
contact: bool = True,
angle: str = 'radian',
texture: blue.SkyboxTextureType|None = None,
**kwargs):
r"""
Parameters
----------
name : str, optional
The user specified name for the Actuator. In the case of a naming conflict the name will
be altered by an enumeration scheme.
autolimits : bool, optional
This attribute affects the behavior of attributes such as ``'limited'`` (on <body-joint>
``'forcelimited'``, ``'ctrllimited'``, and ``'actlimited'`` (on <actuator>). If ``True``,
these attributes are unnecessary and their value will be inferred from the presence of their
corresponding ``'range'`` attribute. If ``False``, no such inference will happen: For a Joint
to be limited, both limited=``True`` and range=``min max`` must be specified. In this mode,
it is an error to specify a range without a limit.
timestep : float | int | None, optional
Simulation time step in seconds. This is the single most important parameter affecting
the speed-accuracy trade-off which is inherent in every physics simulation. Smaller values
result in better accuracy and stability. To achieve real-time performance, the time step
must be larger than the CPU time per step (or 4 times larger when using the RK4 integrator).
The CPU time is measured with internal timers. It should be monitored when adjusting the
time step.
viscosity : float | int | None, optional
Viscosity of the medium. This parameter is used to simulate viscous forces, which scale
linearly with velocity. In SI units the viscosity of air is around 0.00002 while the
viscosity of water is around 0.0009 depending on temperature. Setting viscosity to 0
disables viscous forces. Note that the default Euler integrator handles damping in the
:class:`Joints <blueprints.joint.BaseJoint>` implicitly – which improves stability and
accuracy. It does not presently do this with body viscosity. Therefore, if the goal is
merely to create a damped simulation (as opposed to modelling the specific effects of
viscosity), we recommend using joint damping rather than body viscosity, or switching to
the implicit or implicitfast :attr:`integrator`.
cone : str, optional
The type of contact friction cone. Elliptic cones are a better model of the physical reality, but pyramidal cones sometimes make the solver faster and more robust.
integrator : str, optional
This attribute selects the numerical integrator to be used. Currently the available
integrators are the semi-implicit Euler method ``'Euler'``, the fixed-step 4-th order
Runge Kutta method ``'RK4'``, the Implicit-in-velocity Euler method ``'implicit'``, and
``'implicitfast'``, which drops the Coriolis and centrifugal terms.
gravity : bool, optional
A flag indicating whether gravity exists. Be default the `gravitational constant <https://en.wikipedia.org/wiki/Gravitational_constant>`__.
is set to :math:`6.674 \times 10^{-11} N \frac{m^2}{kg^2}`.
contact : bool, optional
If ``False``, collisions and contacts are ignored in the simulations physics.
angle : str, optional
The angle measure either ``'radian'`` or ``'degree'``.
.. warning::
At the moment 'degree' is not supported for any manipulations. It is recommended to
only use 'degree' if no orientation manipulation is performed on the model! To convert
from degrees to radians use the multiplier :attr:`DEGREES_TO_RADIANS <blueprints.utils.geometry.DEGREES_TO_RADIANS>`
**kwargs
Keyword arguments are passed to ``super().__init__``.
"""
self.angle = angle
self.gravity = gravity
self.timestep = float(timestep)
self.viscosity = float(viscosity)
self.integrator = integrator
self.cone = cone
self.contact = contact
self.autolimits = autolimits
self.texture = texture
self._agents = []
self._geoms = []
self._bodies = []
self._sites = []
self._lights = []
self._cameras = []
self._background = []
self._CHILDREN = {'lights': {'type': blue.LightType,
'children': self._lights},
'geoms': {'type': blue.GeomType,
'children': self._geoms},
'sites': {'type': blue.SiteType,
'children': self._sites},
'cameras': {'type': blue.CameraType,
'children': self._cameras},
'agents': {'type': blue.AgentType,
'children': self._agents},
'bodies': {'type': blue.BodyType,
'children': self._bodies}}
self._name_manager = blue.naming.NameManager(self)
self._built = False
self._viewer = None
self._recordings = dict()
super().__init__(name=name,
**kwargs)
# KINEMATIC TREE PROPERTIES/METHODS
[docs]
@blue.restrict
def attach(self,
*items: list[blue.ThingType|blue.LatticeType|blue.ViewType],
copy: bool = False,
background: bool = False) -> None:
"""
This method attaches *items to the parent Thing. If copy is set true, a copy of the items is created.
If copy is set to false, the kinematic graph might no longer be a tree resulting in infinite loops
or naming conflicts if the same item is included in the tree twice. All items of a specific type
can be accessed via the types attribute.
Parameters
----------
*items : list[blue.ThingType]
All items that are to be attached.
copy : bool, optional
A flag indicating whether a copy of the items should be attached.
background : bool, optional
A flag indicating whether the attached Things belong to the Worlds background.
Raises
------
TypeError
If an item is not a valid child an error is raised.
"""
views = list(chain( *filter(lambda x: isinstance(x, blue.ViewType), items)))
lattice = list(chain( *filter(lambda x: isinstance(x, blue.LatticeType), items)))
items = views + lattice + list( filter(lambda x: not isinstance(x, (blue.ViewType, blue.LatticeType)), items))
if copy:
items = [item.copy() for item in items]
super().attach(*items, copy=False)
if background:
self._background.extend(items)
[docs]
@blue.restrict
def detach(self, *items: list[blue.ThingType]) -> None:
"""
This method is used to detach Things from the kinematic tree. They are afterwards
no longer present in this Worlds children attributes and their ``parent`` attribute
will be set to ``None``.
Parameters
----------
*items
Things that are no longer children of this World can be passed, as long as their
type is a valid type for children of :class:`World`.
Raises
------
TypeError
If the arguments to this function are not valid children types an error is raised.
"""
super().detach(*items)
for item in items:
if item in self._background:
self._background.remove(item)
# XML PROPERTIES/METHODS
[docs]
@blue.restrict
def build(self) -> None:
"""
This method constructs the xml structure of the kinematic tree. It is called before the World
is converted to an xml string if any modification to it was done since the last build. It is
not necessary for a user to call this method since it is implicitly called every time the build
needs to be updated.
"""
if self._built:
return
# REGISTERING NAMES
self._name_manager.register()
# BUILD DIRECTORY
pathname = os.path.dirname(sys.argv[0])
self._path = os.path.abspath(pathname)
# BUILD SIZE AND CENTER
min_pos, max_pos = self._location_range
self._center = (min_pos + max_pos)/2
self._size = np.max(max_pos - min_pos)/2
# BUILD TOP LEVEL XML
self._xml_root = xml.Element('mujoco', model=self.name)
self._xml_compiler = xml.SubElement(self._xml_root,
'compiler',
angle=self.angle,
autolimits=str(self.autolimits).lower(),
#inertiafromgeom='true',
coordinate='local')
option = xml.SubElement(self._xml_root,
'option',
timestep=str(self.timestep),
viscosity=str(self.viscosity),
integrator=self.integrator,
cone=self.cone)
#statistic = xml.SubElement(self._xml_root,
# 'statistic',
# meansize=".05"
# )
flag = xml.SubElement(option,
'flag',
gravity='disable' if not self.gravity else 'enable',
contact='disable' if not self.contact else 'enable')
self._xml_asset = xml.SubElement(self._xml_root, 'asset')
self._xml_sensor = xml.SubElement(self._xml_root, 'sensor')
self._xml_actuator = xml.SubElement(self._xml_root, 'actuator')
self._xml_tendon = xml.SubElement(self._xml_root, 'tendon')
if self.all.cameras:
max_width, max_height = np.max(self.all.cameras.resolution, axis=0)
else:
max_width, max_height = 480, 480
self._gl_context = mujoco.GLContext(max_height, max_width)
self._xml_visual = xml.SubElement(self._xml_root, 'visual')
self._xml_global = xml.SubElement(self._xml_visual, 'global', offwidth=str(max_width), offheight=str(max_height))
# BUILD WORLDBODY XML
indicies = {'body': 1,
'geom': 0,
'jnt': 0,
'light': 0,
'mesh': 0,
'site': 0,
'sensors': 0,
'activation': 0,
'force': 0,
'hfield': 0}
self._xml_worldbody = xml.SubElement(self._xml_root, 'worldbody')
self._build_children(parent=self._xml_worldbody,
world=self,
indicies=indicies)
if self.texture is not None:
self.texture._build(self, self, indicies)
# CLEAN UP EMPTY TOP LEVEL CONTAINERS
if len(self._xml_asset) == 0:
assets = self._xml_root.find('asset')
self._xml_root.remove(assets)
if len(self._xml_sensor) == 0:
sensors = self._xml_root.find('sensor')
self._xml_root.remove(sensors)
if len(self._xml_actuator) == 0:
actuators = self._xml_root.find('actuator')
self._xml_root.remove(actuators)
# SET INDENTS
xml.indent(self._xml_root, '\t', 0)
self._built = True
self.reset()
[docs]
def reset(self):
"""
This method resets the Mujoco simulation data and initializes a single first step
to make all runtime data accessable.
"""
if self._built:
xml_string = self.to_xml_string()
self._mj_model = mujoco.MjModel.from_xml_string(xml_string)
self._mj_data = mujoco.MjData(self._mj_model)
mujoco.mj_forward(self._mj_model, self._mj_data)
else:
raise Exception('World can only be reset after it has been built.')
[docs]
@blue.restrict
def unbuild(self) -> None:
"""
Unbuilds the World to anable alterations. This discards the current Mujoco simulation.
"""
if self._built:
self._built = False
for tendon in self.descendants['tendons']['descendants']:
tendon._built = False
# Reset _built on material/texture assets so they
# re-emit their XML elements on the next build.
for thing in self.all:
asset = getattr(thing, 'asset', None)
if asset is not None and hasattr(asset, '_built'):
asset._built = False
tex = getattr(asset, 'texture', None)
if tex is not None:
tex_asset = getattr(tex, 'asset', None)
if tex_asset is not None and hasattr(tex_asset, '_built'):
tex_asset._built = False
if self.texture is not None:
self.texture.asset._built = False
self._name_manager.unregister()
self._gl_context.free()
del self._mj_model, self._mj_data, self._gl_context
self.stop_recording()
[docs]
@blue.restrict
def view(self, callback=lambda: None) -> None:
"""
Launches an interactive window of the Mujoco simulation.
Parameters
----------
callback : function
The callback function is called after each timestep. The user can use this
function to update World properties during the simulation.
"""
self.build()
xml_string = self.to_xml_string()
# xml_string = """
#<mujoco model="humanoid">
# <compiler angle="degree" inertiafromgeom="true"/>
# <default>
# <joint armature="1" damping="1" limited="true"/>
# <geom conaffinity="1" condim="1" contype="1" margin="0.001" material="geom" rgba="0.8 0.6 .4 1"/>
# <motor ctrllimited="true" ctrlrange="-.4 .4"/>
# </default>
# <option integrator="RK4" iterations="50" solver="PGS" timestep="0.003">
# <!-- <flags solverstat="enable" energy="enable"/>-->
# </option>
# <size nkey="5" nuser_geom="1"/>
# <visual>
# <map fogend="5" fogstart="3"/>
# </visual>
# <asset>
# <texture builtin="gradient" height="100" rgb1=".4 .5 .6" rgb2="0 0 0" type="skybox" width="100"/>
# <!-- <texture builtin="gradient" height="100" rgb1="1 1 1" rgb2="0 0 0" type="skybox" width="100"/>-->
# <texture builtin="flat" height="1278" mark="cross" markrgb="1 1 1" name="texgeom" random="0.01" rgb1="0.8 0.6 0.4" rgb2="0.8 0.6 0.4" type="cube" width="127"/>
# <texture builtin="checker" height="100" name="texplane" rgb1="0 0 0" rgb2="0.8 0.8 0.8" type="2d" width="100"/>
# <material name="MatPlane" reflectance="0.5" shininess="1" specular="1" texrepeat="60 60" texture="texplane"/>
# <material name="geom" texture="texgeom" texuniform="true"/>
# </asset>
# <worldbody>
# <light cutoff="100" diffuse="1 1 1" dir="-0 0 -1.3" directional="true" exponent="1" pos="0 0 1.3" specular=".1 .1 .1"/>
# <geom condim="3" friction="1 .1 .1" material="MatPlane" name="floor" pos="0 0 0" rgba="0.8 0.9 0.8 1" size="20 20 0.125" type="plane"/>
# <!-- <geom condim="3" material="MatPlane" name="floor" pos="0 0 0" size="10 10 0.125" type="plane"/>-->
# <body name="torso" pos="0 0 1.4">
# <camera name="track" mode="trackcom" pos="0 -4 0" xyaxes="1 0 0 0 0 1"/>
# <joint armature="0" damping="0" limited="false" name="root" pos="0 0 0" stiffness="0" type="free"/>
# <geom fromto="0 -.07 0 0 .07 0" name="torso1" size="0.07" type="capsule"/>
# <geom name="head" pos="0 0 .19" size=".09" type="sphere" user="258"/>
# <geom fromto="-.01 -.06 -.12 -.01 .06 -.12" name="uwaist" size="0.06" type="capsule"/>
# <body name="lwaist" pos="-.01 0 -0.260" quat="1.000 0 -0.002 0">
# <geom fromto="0 -.06 0 0 .06 0" name="lwaist" size="0.06" type="capsule"/>
# <joint armature="0.02" axis="0 0 1" damping="5" name="abdomen_z" pos="0 0 0.065" range="-45 45" stiffness="20" type="hinge"/>
# <joint armature="0.02" axis="0 1 0" damping="5" name="abdomen_y" pos="0 0 0.065" range="-75 30" stiffness="10" type="hinge"/>
# <body name="pelvis" pos="0 0 -0.165" quat="1.000 0 -0.002 0">
# <joint armature="0.02" axis="1 0 0" damping="5" name="abdomen_x" pos="0 0 0.1" range="-35 35" stiffness="10" type="hinge"/>
# <geom fromto="-.02 -.07 0 -.02 .07 0" name="butt" size="0.09" type="capsule"/>
# <body name="right_thigh" pos="0 -0.1 -0.04">
# <joint armature="0.01" axis="1 0 0" damping="5" name="right_hip_x" pos="0 0 0" range="-25 5" stiffness="10" type="hinge"/>
# <joint armature="0.01" axis="0 0 1" damping="5" name="right_hip_z" pos="0 0 0" range="-60 35" stiffness="10" type="hinge"/>
# <joint armature="0.0080" axis="0 1 0" damping="5" name="right_hip_y" pos="0 0 0" range="-110 20" stiffness="20" type="hinge"/>
# <geom fromto="0 0 0 0 0.01 -.34" name="right_thigh1" size="0.06" type="capsule"/>
# <body name="right_shin" pos="0 0.01 -0.403">
# <joint armature="0.0060" axis="0 -1 0" name="right_knee" pos="0 0 .02" range="-160 -2" type="hinge"/>
# <geom fromto="0 0 0 0 0 -.3" name="right_shin1" size="0.049" type="capsule"/>
# <body name="right_foot" pos="0 0 -0.45">
# <geom name="right_foot" pos="0 0 0.1" size="0.075" type="sphere" user="0"/>
# </body>
# </body>
# </body>
# <body name="left_thigh" pos="0 0.1 -0.04">
# <joint armature="0.01" axis="-1 0 0" damping="5" name="left_hip_x" pos="0 0 0" range="-25 5" stiffness="10" type="hinge"/>
# <joint armature="0.01" axis="0 0 -1" damping="5" name="left_hip_z" pos="0 0 0" range="-60 35" stiffness="10" type="hinge"/>
# <joint armature="0.01" axis="0 1 0" damping="5" name="left_hip_y" pos="0 0 0" range="-110 20" stiffness="20" type="hinge"/>
# <geom fromto="0 0 0 0 -0.01 -.34" name="left_thigh1" size="0.06" type="capsule"/>
# <body name="left_shin" pos="0 -0.01 -0.403">
# <joint armature="0.0060" axis="0 -1 0" name="left_knee" pos="0 0 .02" range="-160 -2" stiffness="1" type="hinge"/>
# <geom fromto="0 0 0 0 0 -.3" name="left_shin1" size="0.049" type="capsule"/>
# <body name="left_foot" pos="0 0 -0.45">
# <geom name="left_foot" type="sphere" size="0.075" pos="0 0 0.1" user="0" />
# </body>
# </body>
# </body>
# </body>
# </body>
# <body name="right_upper_arm" pos="0 -0.17 0.06">
# <joint armature="0.0068" axis="2 1 1" name="right_shoulder1" pos="0 0 0" range="-85 60" stiffness="1" type="hinge"/>
# <joint armature="0.0051" axis="0 -1 1" name="right_shoulder2" pos="0 0 0" range="-85 60" stiffness="1" type="hinge"/>
# <geom fromto="0 0 0 .16 -.16 -.16" name="right_uarm1" size="0.04 0.16" type="capsule"/>
# <body name="right_lower_arm" pos=".18 -.18 -.18">
# <joint armature="0.0028" axis="0 -1 1" name="right_elbow" pos="0 0 0" range="-90 50" stiffness="0" type="hinge"/>
# <geom fromto="0.01 0.01 0.01 .17 .17 .17" name="right_larm" size="0.031" type="capsule"/>
# <geom name="right_hand" pos=".18 .18 .18" size="0.04" type="sphere"/>
# <camera pos="0 0 0"/>
# </body>
# </body>
# <body name="left_upper_arm" pos="0 0.17 0.06">
# <joint armature="0.0068" axis="2 -1 1" name="left_shoulder1" pos="0 0 0" range="-60 85" stiffness="1" type="hinge"/>
# <joint armature="0.0051" axis="0 1 1" name="left_shoulder2" pos="0 0 0" range="-60 85" stiffness="1" type="hinge"/>
# <geom fromto="0 0 0 .16 .16 -.16" name="left_uarm1" size="0.04 0.16" type="capsule"/>
# <body name="left_lower_arm" pos=".18 .18 -.18">
# <joint armature="0.0028" axis="0 -1 -1" name="left_elbow" pos="0 0 0" range="-90 50" stiffness="0" type="hinge"/>
# <geom fromto="0.01 -0.01 0.01 .17 -.17 .17" name="left_larm" size="0.031" type="capsule"/>
# <geom name="left_hand" pos=".18 -.18 .18" size="0.04" type="sphere"/>
# </body>
# </body>
# </body>
# </worldbody>
# <tendon>
# <fixed name="left_hipknee">
# <joint coef="-1" joint="left_hip_y"/>
# <joint coef="1" joint="left_knee"/>
# </fixed>
# <fixed name="right_hipknee">
# <joint coef="-1" joint="right_hip_y"/>
# <joint coef="1" joint="right_knee"/>
# </fixed>
# </tendon>
#
# <actuator>
# <motor gear="100" joint="abdomen_y" name="abdomen_y"/>
# <motor gear="100" joint="abdomen_z" name="abdomen_z"/>
# <motor gear="100" joint="abdomen_x" name="abdomen_x"/>
# <motor gear="100" joint="right_hip_x" name="right_hip_x"/>
# <motor gear="100" joint="right_hip_z" name="right_hip_z"/>
# <motor gear="300" joint="right_hip_y" name="right_hip_y"/>
# <motor gear="200" joint="right_knee" name="right_knee"/>
# <motor gear="100" joint="left_hip_x" name="left_hip_x"/>
# <motor gear="100" joint="left_hip_z" name="left_hip_z"/>
# <motor gear="300" joint="left_hip_y" name="left_hip_y"/>
# <motor gear="200" joint="left_knee" name="left_knee"/>
# <motor gear="25" joint="right_shoulder1" name="right_shoulder1"/>
# <motor gear="25" joint="right_shoulder2" name="right_shoulder2"/>
# <motor gear="25" joint="right_elbow" name="right_elbow"/>
# <motor gear="25" joint="left_shoulder1" name="left_shoulder1"/>
# <motor gear="25" joint="left_shoulder2" name="left_shoulder2"/>
# <motor gear="25" joint="left_elbow" name="left_elbow"/>
# </actuator>
#</mujoco>
#"""
# self._mj_model = mujoco.MjModel.from_xml_string(xml_string)
# self._mj_data = mujoco.MjData(self._mj_model)
# mujoco.mj_forward(self._mj_model, self._mj_data)
print(xml_string)
glfw.init()
with mujoco.viewer.launch_passive(self._mj_model, self._mj_data, show_left_ui=True, show_right_ui=True) as viewer:
self._viewer = viewer
## TIME
#start_time = time.time()
# DEFAULT CAMERA POSITIONING
viewer.cam.distance = self.size * 4
viewer.cam.lookat = self.center
desc = self.descendants['cameras']['descendants']
while viewer.is_running():
mujoco.mj_step(self._mj_model, self._mj_data)
viewer.sync()
# CALLBACK
callback()
#print(viewer.update_hfield.)
#print(*sorted(viewer.__dir__()), sep='\n')
# TIME SYNCH
#model_time = self._mj_data.time
#run_time = time.time() - start_time
#delta = max(0, model_time - run_time)
#time.sleep(delta)
self._viewer = None
[docs]
@blue.restrict
def step(self,
n_steps: int|None = 1,
seconds: int|float|None = None,
pbar: bool = False) -> None:
"""
Performs timesteps in the simmulation. Exactly one argument must be given by the user.
All cameras that are recording are taking images during the simulation at their respective
times.
Parameters
----------
n_steps : int | None
The number of steps to be taken
seconds : int | float | None
The number of seconds to be simulated
"""
#if not hasattr(self, '_mj_model') and not hasattr(self, '_mj_data'):
if not self._built:
raise Exception('The World must first be build before simulation steps can be performed. Use World.build()!')
if n_steps is None and seconds is None:
raise ValueError('Precisely one argument seconds or n_steps must be set, but both were None.')
if seconds is not None:
n_steps = int(seconds/self.timestep)
if n_steps == 0:
raise ValueError(f'With World.timestep={self.timestep} and seconds={seconds} the number of simulation steps is 0. Please decrease the timestep or increase the duration (seconds).')
if pbar:
iterator = tqdm.tqdm(range(n_steps))
else:
iterator = range(n_steps)
for n in iterator:
mujoco.mj_step(self._mj_model, self._mj_data)
time = self._mj_data.time
for camera, rec_dict in self._recordings.items():
fps, last, writer = rec_dict['fps'], rec_dict['last'], rec_dict['writer']
if last + 1/fps <= time:
writer.append_data(camera.observation)
rec_dict['last'] = time
#for descendants in self.descendants.values():
# for descendant in descendants['descendants']:
# descendant._clear_step_cache()
[docs]
@blue.restrict
def start_recording(self,
*cameras: list[blue.CameraType],
filename: str,
fps: int = 60,
**kwargs):
"""
This method starts recordings of the cameras passed to it. The rendering is done
by the mujoco.Renderer using OpenGL. The user should make sure, that their OpenGL
instance is properly functioning. The video files are written to the ``recordings``
directory next to the file instantiating the World. The format is infered from the
file extension in the ``filename`` using imageio. To set additional parameters for
the video formatting in imageio the user can set the ``**kwargs`` which are passed
to the imageio writer.
Example
-------
If the user would like to record a gif image that loops forever (instead of
the default case of a single loop) the user can set the argument ``loop`` to
zero, indicating an infinitely looping gif in the imageio writer:
>>> world.start_recording(camera, filename='my.gif', loop=0)
Now the camera has started recording, but there is nothing yet happening in the
simulation. Using :meth:`World.step` the user can simulate the passage of time with
the recording cameras writing images to their respective video files at correpsonding
moments (as determined by ``fps`` and :attr:`World.timestep`). Additionally the user
can apply actions in between timesteps via an :class:`Agent <blueprints.agent.Agent>`
or through :class:`Actuators <blueprints.actuators.BaseActuator>` directly.
For example, the following lines demonstrate such a simulation loop with some number
of steps in MDP space determined by ``time_steps``, some ``skip_frames``, a blueprints
:class:`Agent <blueprints.agent.Agent>` ``agent`` used to interface with the :class:`Sensors <blueprints.sensors.BaseSensor>`
and :class:`Actuators <blueprints.actuators.BaseActuator>` and an RL-Agent ``policy``
>>> for t in range(time_steps):
>>> world.step(n_steps=skip_frames)
>>> agent.force = policy(agent.observations)
Finally the recordings are stopped using :meth:`stop_recording` to save the recorded
videos to a file.
>>> world.stop_recording()
The final video of the users agent can then be found in ``'recordings/camera_my.gif'``
Parameters
----------
*cameras : list [ blue.CameraType ]
A list of :class:`Cameras <blueprints.camera.Camera>`
filename : str
The name (with a proper file extension indicating the format) of the video.
If multiple cameras are passed, multiple videos will be created with the name
of each camera prefixed to the filename.
fps : int
The frames per second
**kwargs : dict
Additional keyword arguments that are passed to imageio to write the video.
"""
# VALUE CHECKS
if not all(camera in self.all.cameras for camera in cameras):
raise ValueError(f'Not all cameras given are attached within the kinematic hierarchy of the World.')
if not self._built:
raise Exception('World must first be build before recordings can start.')
# MAKE DIRECTORIES
path = f'{self._path}/recordings'
if not os.path.exists(path):
os.mkdir(path)
for camera in cameras:
writer = imageio.get_writer(f'{path}/{camera.name}_{filename}', fps=fps, **kwargs)
self._recordings[camera] = {'fps': fps, 'writer': writer, 'last': 0.}
[docs]
@blue.restrict
def stop_recording(self,
*cameras: list[blue.CameraType]):
"""
This method stops recording for all the cameras passed. If no camera is passed, all recording cameras will be stopped.
Parameters
----------
cameras : list [ blue.CameraType ]
The cameras to be stopped.
"""
# VALUE CHECKS
if not all(camera in self.all.cameras for camera in cameras):
raise ValueError(f'Not all cameras given are attached within the kinematic hierarchy of the World.')
if not cameras:
for rec_dict in self._recordings.values():
rec_dict['writer'].close()
self._recordings = dict()
else:
for camera in cameras:
self._recordings[camera]['writer'].close()
del self._recordings[camera]
[docs]
@blue.restrict
def to_xml_string(self) -> str:
"""
If not build, this method will first build the world, retrieve the XML string and then unbuild the world again.
If the user calls this method repeatedly the world should probably be build first.
Returns
-------
str
The xml string representing the xml model used to launch Mujoco.
"""
built = self._built
if not built:
self.build()
string = xml.tostring(self._xml_root, encoding='unicode')
if not built:
self.unbuild()
return string
[docs]
@blue.restrict
@classmethod
def from_xml_string(cls, string: str) -> blue.WorldType:
"""
This method reconstructs a World from a Mujoco xml string. It currently only supports reading
models, that exclusively consist of blueprints supported features. In a future version it will
be extended to be compatible to all Mujoco models.
Parameters
----------
string : str
The xml description of the Mujoco model.
Returns
-------
blue.WorldType
The reconstructed World.
"""
init_args = dict()
caches = defaultdict(dict)
assets = defaultdict(dict)
materials = dict()
sensors = defaultdict(lambda: defaultdict(list))
actuators = defaultdict(lambda: defaultdict(list))
ref_actuators = defaultdict(lambda: defaultdict(list))
TEXTURE_CLASSES = {'2d': blue.texture.Plane,
'cube': blue.texture.Box,
'skybox': blue.texture.Skybox}
xml_tree = xml.fromstring(string)
xml_compiler = xml_tree.find('compiler')
xml_assets = xml_tree.find('asset')
xml_sensors = xml_tree.find('sensor')
xml_actuators = xml_tree.find('actuator')
xml_worldbody = xml_tree.find('worldbody')
xml_option = xml_tree.find('option')
# COLLECTING ARGS
name = xml_tree.get('model')
if name is not None:
init_args['name'] = name
if xml_compiler is not None:
angle = xml_compiler.get('angle')
if angle is not None:
init_args['angle'] = angle
autolimits = xml_compiler.get('autolimits')
if autolimits is not None:
init_args['autolimits'] = autolimits == 'true'
if xml_option is not None:
init_args['viscosity'] = float(xml_option.get('viscosity')) if xml_option.get('viscosity') is not None else 0.
init_args['integrator'] = xml_option.get('integrator')
timestep = xml_option.get('timestep')
if timestep is not None:
init_args['timestep'] = float(timestep)
cone = xml_option.get('cone')
if cone is not None:
init_args['cone'] = cone
xml_flag = xml_option.find('flag')
if xml_flag is not None:
init_args['gravity'] = xml_flag.get('gravity') == 'enable'
init_args['contact'] = xml_flag.get('contact') == 'enable'
if xml_sensors is not None:
for sensor in xml_sensors:
sensor_obj = blue.REGISTER._get_thing(sensor)
sensor_parent = sensor_obj._PARENT_TYPE
sensors[sensor_parent][sensor.get(sensor_parent)].append(sensor_obj)
if xml_assets is not None:
# XML attr name → Texture constructor param name
TEX_ATTR_MAP = {'file': 'filename', 'rgb1': 'color_1',
'rgb2': 'color_2', 'markrgb': 'color_mark',
'gridsize': 'grid_size', 'gridlayout': 'grid_layout',
'hflip': 'h_flip', 'vflip': 'v_flip',
'nchannel': 'n_channel', 'content_type': 'content',
'fileright': 'filename_right', 'fileleft': 'filename_left',
'fileup': 'filename_up', 'filedown': 'filename_down',
'filefront': 'filename_front', 'fileback': 'filename_back'}
TEX_FLOAT = {'random'}
TEX_INT = {'width', 'height', 'n_channel', 'nchannel'}
TEX_BOOL = {'hflip', 'vflip', 'h_flip', 'v_flip'}
TEX_ARRAY = {'rgb1', 'rgb2', 'markrgb', 'gridsize'}
TEX_SKIP = {'type', 'name'}
# First pass: textures and cache-based assets (mesh, hfield)
for asset in xml_assets:
asset_type = asset.tag
if asset_type == 'texture':
tex_type = asset.get('type', '2d')
tex_class = TEXTURE_CLASSES[tex_type]
tex_kwargs = {}
for k, v in asset.items():
if k in TEX_SKIP:
continue
param = TEX_ATTR_MAP.get(k, k)
if k in TEX_FLOAT:
v = float(v)
elif k in TEX_INT:
v = int(v)
elif k in TEX_BOOL:
v = v == 'true'
elif k in TEX_ARRAY:
v = [float(x) for x in v.split()]
tex_kwargs[param] = v
tex_name = asset.get('name')
tex_obj = tex_class(name=tex_name, **tex_kwargs)
tex_obj.asset._name = tex_name
assets['texture'][tex_name] = tex_obj
elif asset_type in blue.REGISTER.CACHE_THINGS:
asset_name = asset.get('name')
if asset_name in caches[asset_type]:
cache = caches[asset_type][asset_name]
else:
cache_class = blue.REGISTER.CACHE_THINGS[asset_type]
cache = cache_class._from_xml_element(asset)
asset_class = blue.REGISTER._get_thing_class(asset)
asset_obj = asset_class._from_xml_element(xml_element=asset,
cache=cache)
assets[asset_type][asset.get('name')] = asset_obj
# Second pass: materials (reference textures by name)
MAT_FLOAT = {'emission', 'specular', 'shininess', 'reflectance',
'metallic', 'roughness'}
MAT_BOOL = {'texuniform'}
MAT_ARRAY = {'texrepeat', 'rgba'}
MAT_SKIP = {'name', 'texture'}
for asset in xml_assets:
if asset.tag == 'material':
tex_name = asset.get('texture')
texture = assets['texture'].get(tex_name) if tex_name else None
mat_kwargs = {}
for k, v in asset.items():
if k in MAT_SKIP:
continue
if k == 'rgba':
mat_kwargs['color'] = [float(x) for x in v.split()]
elif k in MAT_FLOAT:
mat_kwargs[k] = float(v)
elif k in MAT_BOOL:
mat_kwargs[k] = v == 'true'
elif k in MAT_ARRAY:
mat_kwargs[k] = [float(x) for x in v.split()]
else:
mat_kwargs[k] = v
mat_name = asset.get('name')
mat_obj = blue.Material(texture=texture,
name=mat_name,
**mat_kwargs)
# Preserve the original asset name so the
# round-tripped XML uses the same identifier.
mat_obj.asset._name = mat_name
materials[mat_name] = mat_obj
if xml_actuators is not None:
for actuator in xml_actuators:
actuator_obj = blue.REGISTER._get_thing(actuator)
actuator_obj._IGNORE_CHECKS = True
actuator_name = actuator.get('name')
# Attach sensors that reference this actuator
if actuator_name in sensors['actuator']:
actuator_obj.attach(*sensors['actuator'][actuator_name], copy=False)
# Find the parent reference attribute (joint, site, body, or jointinparent)
parent_name = None
parent_key = None
for attr in ('joint', 'jointinparent', 'site', 'body'):
parent_name = actuator.get(attr)
if parent_name is not None:
parent_key = 'joint' if attr == 'jointinparent' else attr
break
if parent_key is not None:
actuators[parent_key][parent_name].append(actuator_obj)
# Handle secondary references (refsite)
for ref_attr, ref_type in actuator_obj._OTHER_REFERENCES.items():
ref_name = actuator.get(ref_attr)
if ref_name is not None:
ref_actuators[ref_type][ref_name].append(actuator_obj)
# Assign skybox texture to init_args if present
for tex_obj in assets.get('texture', {}).values():
if isinstance(tex_obj, blue.texture.Skybox):
init_args['texture'] = tex_obj
break
# INIT WORLD
world = object.__new__(cls)
world.__init__(**init_args)
for xml_element in xml_worldbody:
cls._build_from_xml(parent=world,
xml_element=xml_element,
assets=assets,
materials=materials,
sensors=sensors,
actuators=actuators,
ref_actuators=ref_actuators)
for tag_actuators in actuators.values():
for name_parent in tag_actuators.values():
for actuator in name_parent:
actuator._IGNORE_CHECKS = False
# Resolve camera/light target references by name
all_things = {t.name: t for t in world.all}
for thing in world.all:
if isinstance(thing, (blue.Camera, blue.Light)):
target_name = getattr(thing, '_target_name', None)
if target_name is not None and target_name in all_things:
thing.target = all_things[target_name]
return world
@property
def model(self):
"""
If a feature is not accessable through blueprints you can
access the Mujoco model directly.
Returns
-------
mj.Model
"""
if self._built:
return self._mj_model
else:
raise Exception('Mujoco model is only accessable after the World has been built.')
@property
def data(self):
"""
If a feature is not accessable through blueprints you can
access the Mujoco data directly.
Returns
-------
mj.Data
"""
if self._built:
return self._mj_data
else:
raise Exception('Mujoco data is only accessable after the World has been built.')
@blue.restrict
@classmethod
def _build_from_xml(cls,
parent,
xml_element,
assets: dict,
materials: dict,
sensors: dict,
actuators: dict,
ref_actuators: dict) -> blue.ThingType:
"""
This method builds the world from the xml Element.
Parameters
----------
parent : WorldType
The World object to which the reconstructed Things are attached.
xml_element : TYPE
The xml element from which the Things used for reconstruction are taken.
assets : dict
A dictionary of assets.
materials : dict
A dictionary of materials.
sensors : dict
A dictionary of sensors.
actuators : dict
A dictionary of actuators.
ref_actuators : dict
A dictionary of ref_actuators.
"""
# CONSTRUCT ELEMENT
xml_type = xml_element.get('type')
xml_tag = xml_element.tag
xml_name = xml_element.get('name', '')
# Detect Agent bodies by AGENT: prefix
if xml_tag == 'body' and xml_name.startswith('AGENT:'):
obj_class = blue.Agent
else:
obj_class = blue.REGISTER._get_thing_class(xml_element)
xml_args = dict()
if xml_type in assets:
xml_name = xml_element.get(xml_type)
xml_args['asset'] = assets[xml_type][xml_name]
mat_name = xml_element.get('material')
if mat_name is not None and mat_name in materials:
xml_args['material'] = materials[mat_name]
if xml_tag in sensors:
xml_name = xml_element.get('name')
xml_args['sensors'] = sensors[xml_tag][xml_name]
if xml_tag in actuators:
xml_name = xml_element.get('name')
xml_args['actuators'] = actuators[xml_tag][xml_name]
if xml_tag in ref_actuators:
xml_name = xml_element.get('name')
xml_args['ref_actuators'] = ref_actuators[xml_tag][xml_name]
obj = obj_class._from_xml_element(xml_element=xml_element, **xml_args)
# CONSTRUCT CHILDREN
for xml_child in xml_element:
cls._build_from_xml(parent=obj,
xml_element=xml_child,
assets=assets,
materials=materials,
sensors=sensors,
actuators=actuators,
ref_actuators=ref_actuators)
parent.attach(obj, copy=False)
# PROPERTIES
@property
def size(self):
"""
The size is computed to sclae the FreeCamera of the viewer (see :meth:`World.view`).
Returns
-------
np.ndarray
The size of the World ignoring background Things.
"""
built = self._built
if not built:
self.build()
size = self._size
if not built:
self.unbuild()
return size
@property
def center(self):
"""
The center is computed to position the FreeCamera of the viewer (see :meth:`World.view`).
Returns
-------
np.ndarray
The center of the World ignoring background Things.
"""
built = self._built
if not built:
self.build()
center = self._center
if not built:
self.unbuild()
return center
@property
def texture(self) -> blue.SkyboxTextureType:
"""
Only a :class:`SkyboxTexture <blueprints.texture.Skybox>` are valid for the World.
.. note::
At the moment, Mujoco is only rendering SkyboxTextures of the World, if also another
texture is taken by a material refferenced by a Thing within the World.
Returns
-------
blue.SkyboxTextureType
The Skybox Texture which is applied to the background of the World.
"""
return self._texture
@texture.setter
@blue.restrict
def texture(self, texture: blue.SkyboxTextureType|None) -> None:
if texture is not None:
self._texture = texture.copy()
self._texture._parent = self
else:
self._texture = texture