Skip to content

Commit

Permalink
Move generate_srdf/replace_urdf_package_keyword functions to mplib.ur…
Browse files Browse the repository at this point in the history
…df_utils
  • Loading branch information
KolinGuo committed Apr 15, 2024
1 parent 041a305 commit b0a320d
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 127 deletions.
143 changes: 16 additions & 127 deletions mplib/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from mplib.pymp.collision_detection import AllowedCollisionMatrix, WorldCollisionResult
from mplib.pymp.collision_detection.fcl import CollisionGeometry, FCLObject
from mplib.pymp.planning import ompl
from mplib.urdf_utils import generate_srdf, replace_urdf_package_keyword


class Planner:
Expand All @@ -28,8 +29,8 @@ def __init__(
urdf: str,
move_group: str,
*,
srdf: str = "",
package_keyword_replacement: str = "",
srdf: Optional[str] = None,
new_package_keyword: str = "",
use_convex: bool = False,
user_link_names: Sequence[str] = [],
user_joint_names: Sequence[str] = [],
Expand All @@ -48,7 +49,7 @@ def __init__(
:param urdf: Unified Robot Description Format file.
:param move_group: target link to move, usually the end-effector.
:param srdf: Semantic Robot Description Format file.
:param package_keyword_replacement: replace ``package://`` keyword in URDF
:param new_package_keyword: the string to replace 'package://' keyword
:param use_convex: if True, load collision mesh as convex mesh.
If mesh is not convex, a ``RuntimeError`` will be raised.
:param user_link_names: names of links, the order matters.
Expand All @@ -62,23 +63,27 @@ def __init__(
:param objects: list of FCLObject as non-articulated collision objects
:param verbose: if True, print verbose logs for debugging
"""
self.urdf = urdf
self.srdf = srdf
if self.srdf == "" and os.path.exists(urdf.replace(".urdf", ".srdf")):
self.srdf = urdf.replace(".urdf", ".srdf")
print(f"No SRDF file provided but found {self.srdf}")
self.urdf, self.srdf = urdf, srdf
if self.srdf is None:
if os.path.exists(srdf := urdf.replace(".urdf", ".srdf")):
print(f"No SRDF file provided but found {self.srdf}")
self.srdf = srdf
else:
self.srdf = generate_srdf(self.urdf, new_package_keyword, verbose=True)

# replace package:// keyword if exists
urdf = self.replace_package_keyword(package_keyword_replacement)
self.urdf = replace_urdf_package_keyword(self.urdf, new_package_keyword)

self.robot = ArticulatedModel(
self.urdf,
self.srdf,
str(self.urdf),
str(self.srdf),
link_names=user_link_names, # type: ignore
joint_names=user_joint_names, # type: ignore
convex=use_convex,
verbose=verbose,
)
# Remove the temporary URDF file from replace_urdf_package_keyword()
self.urdf.unlink()
self.pinocchio_model = self.robot.get_pinocchio_model()
self.user_link_names = self.pinocchio_model.get_link_names()
self.user_joint_names = self.pinocchio_model.get_joint_names()
Expand Down Expand Up @@ -127,124 +132,8 @@ def __init__(
self.planning_world = PlanningWorld([self.robot], objects)
self.acm = self.planning_world.get_allowed_collision_matrix()

if self.srdf == "":
self.generate_collision_pair()
self.robot.update_SRDF(self.srdf)

self.planner = ompl.OMPLPlanner(world=self.planning_world)

def replace_package_keyword(self, package_keyword_replacement: str):
# TODO(merge): fix file writing
# TODO(merge): convert to staticmethod
"""
some ROS URDF files use package:// keyword to refer the package dir
replace it with the given string (default is empty)
Args:
package_keyword_replacement: the string to replace 'package://' keyword
"""
rtn_urdf = self.urdf
with open(self.urdf) as in_f:
content = in_f.read()
if "package://" in content:
rtn_urdf = self.urdf.replace(".urdf", "_package_keyword_replaced.urdf")
content = content.replace("package://", package_keyword_replacement)
if not os.path.exists(rtn_urdf):
with open(rtn_urdf, "w") as out_f:
out_f.write(content)
return rtn_urdf

def generate_collision_pair(self, num_samples=100000):
# TODO(merge): convert to staticmethod
"""
We read the srdf file to get the link pairs that should not collide.
If not provided, we need to randomly sample configurations
to find the link pairs that will always collide.
:param num_samples: number of samples to find the link that will always collide
"""
print(
"Since no SRDF file is provided. We will first detect link pairs that will"
" always collide. This may take several minutes."
)

root = ET.Element("robot")
robot_name = self.urdf.split("/")[-1].split(".")[0]
root.set("name", robot_name)
self.srdf = self.urdf.replace(".urdf", ".srdf")

acm = AllowedCollisionMatrix()

# 1. disable adjacent link pairs
for link1, link2 in self.pinocchio_model.get_adjacent_links():
print(f"Ignore collision pair: ({link1}, {link2}), reason: Adjacent")
acm.set_entry(link1, link2, True)
_ = ET.SubElement(
root,
"disable_collisions",
attrib={"link1": link1, "link2": link2, "reason": "Adjacent"},
)

# 2. disable all-zeros qpos (default) collision
self.robot.set_qpos(np.zeros(len(self.user_joint_names)), True)
for collision in self.check_for_self_collision():
link1, link2 = collision.link_name1, collision.link_name2
if acm.get_entry(link1, link2) is not None: # already ignored
continue
print(
f"Ignore collision pair: ({link1}, {link2}), "
"reason: Default (collides at all-zeros qpos)"
)
acm.set_entry(link1, link2, True)
_ = ET.SubElement(
root,
"disable_collisions",
attrib={"link1": link1, "link2": link2, "reason": "Default"},
)

# 3. disable collision pairs that always collide and never collide via sampling
n_links = len(self.user_link_names)
collision_cnt = np.zeros((n_links, n_links), dtype=int)
for _ in range(num_samples):
self.robot.set_qpos(self.pinocchio_model.get_random_configuration(), True)
for collision in self.check_for_self_collision():
u = self.link_name_2_idx[collision.link_name1]
v = self.link_name_2_idx[collision.link_name2]
collision_cnt[u][v] += 1

for i, link1 in enumerate(self.user_link_names):
for j in range(i + 1, n_links):
link2 = self.user_link_names[j]
if acm.get_entry(link1, link2) is not None: # already ignored
continue
if (cnt := (collision_cnt[i][j] + collision_cnt[j][i])) == num_samples:
print(
f"Ignore collision pair: ({link1}, {link2}), "
"reason: Always collide"
)
_ = ET.SubElement(
root,
"disable_collisions",
attrib={"link1": link1, "link2": link2, "reason": "Always"},
)
elif cnt == 0:
print(
f"Ignore collision pair: ({link1}, {link2}), "
"reason: Never collide"
)
_ = ET.SubElement(
root,
"disable_collisions",
attrib={"link1": link1, "link2": link2, "reason": "Never"},
)

# Save SRDF
with open(self.srdf, "w") as srdf_file:
srdf_file.write(
minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ")
)
print(f"Saving the SRDF file to {self.srdf}")

def wrap_joint_limit(self, qpos: np.ndarray) -> bool:
"""
Checks if the joint configuration can be wrapped to be within the joint limits.
Expand Down
188 changes: 188 additions & 0 deletions mplib/urdf_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional
from xml.dom import minidom

import numpy as np

from mplib.pymp import ArticulatedModel
from mplib.pymp.collision_detection import AllowedCollisionMatrix


def compute_default_collisions(
robot: ArticulatedModel, *, num_samples=100000, verbose=False
) -> str:
"""
Compute default collision pairs for generating SRDF.
This function mimics MoveIt2's
``moveit_setup::srdf_setup::computeDefaultCollisions()``
Reference:
https://moveit.picknik.ai/main/api/html/namespacemoveit__setup_1_1srdf__setup.html#a2812f73b447d838cd7dba1b0ee1a0c95
:param robot: an ``ArticulatedModel``
:param num_samples: number of samples to find the link that will always collide
:param verbose: print debug info
:return: SRDF content as an XML string
"""
if verbose:
print(
"Generating SRDF with default collision pairs. "
"This may take several minutes."
)

root = ET.Element("robot", {"name": robot.name})

pinocchio_model = robot.get_pinocchio_model()
user_link_names = pinocchio_model.get_link_names()
user_joint_names = pinocchio_model.get_joint_names()
link_name_2_idx = {link: i for i, link in enumerate(user_link_names)}
fcl_model = robot.get_fcl_model()
acm = AllowedCollisionMatrix()

# 1. disable adjacent link pairs
for link1, link2 in pinocchio_model.get_adjacent_links():
if verbose:
print(f"Ignore collision pair: ({link1}, {link2}), reason: Adjacent")
acm.set_entry(link1, link2, True)
_ = ET.SubElement(
root,
"disable_collisions",
attrib={"link1": link1, "link2": link2, "reason": "Adjacent"},
)

# 2. disable all-zeros qpos (default) collision
robot.set_qpos(np.zeros(len(user_joint_names)), True)
for collision in fcl_model.check_self_collision():
link1, link2 = collision.link_name1, collision.link_name2
if acm.get_entry(link1, link2) is not None: # already ignored
continue
if verbose:
print(
f"Ignore collision pair: ({link1}, {link2}), "
"reason: Default (collides at all-zeros qpos)"
)
acm.set_entry(link1, link2, True)
_ = ET.SubElement(
root,
"disable_collisions",
attrib={"link1": link1, "link2": link2, "reason": "Default"},
)

# 3. disable collision pairs that always collide and never collide via sampling
n_links = len(user_link_names)
collision_cnt = np.zeros((n_links, n_links), dtype=int)
for _ in range(num_samples):
robot.set_qpos(pinocchio_model.get_random_configuration(), True)
for collision in fcl_model.check_self_collision():
u = link_name_2_idx[collision.link_name1]
v = link_name_2_idx[collision.link_name2]
collision_cnt[u][v] += 1

for i, link1 in enumerate(user_link_names):
for j in range(i + 1, n_links):
link2 = user_link_names[j]
if acm.get_entry(link1, link2) is not None: # already ignored
continue
if (cnt := (collision_cnt[i][j] + collision_cnt[j][i])) == num_samples:
if verbose:
print(
f"Ignore collision pair: ({link1}, {link2}), "
"reason: Always collide"
)
_ = ET.SubElement(
root,
"disable_collisions",
attrib={"link1": link1, "link2": link2, "reason": "Always"},
)
elif cnt == 0:
if verbose:
print(
f"Ignore collision pair: ({link1}, {link2}), "
"reason: Never collide"
)
_ = ET.SubElement(
root,
"disable_collisions",
attrib={"link1": link1, "link2": link2, "reason": "Never"},
)

return minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ")


def replace_urdf_package_keyword(
urdf_path: str | Path,
new_package_keyword: str = "",
*,
save_path: Optional[str | Path] = None,
) -> Path:
"""
Some ROS URDF files use package:// keyword to refer the package dir.
Replace it with the given string (default is empty)
:param urdf_path: Path to a Unified Robot Description Format file.
:param new_package_keyword: the string to replace 'package://' keyword
:param save_path: Path to save the modified URDF file.
If ``None``, create a temporary file.
:return: Path to the modified URDF file
"""
if save_path is None:
_, save_path = tempfile.mkstemp(suffix=".urdf")
save_path = Path(save_path)
if not (save_dir := save_path.parent).is_dir():
save_dir.mkdir(parents=True)

with Path(urdf_path).open("r") as in_f:
if "package://" in (content := in_f.read()):
with save_path.open("w") as out_f:
out_f.write(content.replace("package://", new_package_keyword))
return save_path


def generate_srdf(
urdf_path: str | Path,
new_package_keyword: str = "",
*,
num_samples=100000,
save_path: Optional[str | Path] = None,
verbose=False,
) -> Path:
"""
Generate SRDF from URDF similar to MoveIt2's setup assistant.
:param urdf_path: Path to a Unified Robot Description Format file.
:param new_package_keyword: the string to replace 'package://' keyword
:param num_samples: number of samples to find the link that will always collide
:param save_path: Path to save the generated SRDF file.
If ``None``, create a temporary file.
:param verbose: print debug info
:return: Path to the generated SRDF file
"""
assert Path(urdf_path).is_file(), f"URDF file {urdf_path} does not exist"

# Replace 'package://' keyword
urdf_path = replace_urdf_package_keyword(urdf_path, new_package_keyword)

robot = ArticulatedModel(str(urdf_path), "")
srdf_str = compute_default_collisions(
robot, num_samples=num_samples, verbose=verbose
)

if save_path is None:
_, save_path = tempfile.mkstemp(suffix=".srdf")
save_path = Path(save_path)
if not (save_dir := save_path.parent).is_dir():
save_dir.mkdir(parents=True)

# Save SRDF
with save_path.open("w") as f:
f.write(srdf_str)

if verbose:
print(f"Saved the SRDF file to {save_path}")

# Remove the temporary URDF file from replace_urdf_package_keyword()
urdf_path.unlink()
return save_path

0 comments on commit b0a320d

Please sign in to comment.