-
Notifications
You must be signed in to change notification settings - Fork 0
/
pick_and_place_three_arm_sm.py
126 lines (103 loc) · 5.59 KB
/
pick_and_place_three_arm_sm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from enum import Enum
import math
from rospy import loginfo, logwarn
import PyKDL
import pprint
import time
from pick_and_place_arm_sm import PickAndPlaceStateMachine, PickAndPlaceState, PSM_HOME_POS, vector_eps_eq
PSM_HOME_POS = PyKDL.Vector(0., 0., -0.1)
class PickAndPlaceThreeArmStateMachine:
# this class isn't actually a state machine, but rather a way to connect
# the states of PSM1 and PSM2 state machines to effectively create a
# 2-arm FSM
def _get_objects_for_psms(self):
'''
Returns a dict of PSM index -> list of objects that are closest to that PSM
'''
result = dict()
for object in self.world.objects:
closest_psm_idx = self.world_to_psm_tfs.index(
min(self.world_to_psm_tfs, key=lambda tf : (tf * object.pos).Norm()))
if closest_psm_idx not in result:
result[closest_psm_idx] = list()
result[closest_psm_idx].append(object)
if self.log_verbose:
loginfo("Unpicked objects: {}".format(pprint.pformat(result)))
return result
def __init__(self, psms, world_to_psm_tfs, world, approach_vec, log_verbose=False):
self.world = world
self.psms = psms
self.world_to_psm_tfs = world_to_psm_tfs
self.log_verbose = log_verbose
self.approach_vec = approach_vec
# get objects for psms
psm_to_objects_map = self._get_objects_for_psms()
print(psm_to_objects_map)
if 0 in psm_to_objects_map:
self.current_sm = PickAndPlaceStateMachine(self.psms[0], self.world,
self.world_to_psm_tfs[0], psm_to_objects_map[0][0],
approach_vec, closed_loop=False,
home_when_done=bool(len(psm_to_objects_map[0]) <= 1))
elif 1 in psm_to_objects_map:
self.current_sm = PickAndPlaceStateMachine(self.psms[1], self.world,
self.world_to_psm_tfs[1], psm_to_objects_map[1][0],
approach_vec, closed_loop=False,
home_when_done=bool(len(psm_to_objects_map[1]) <= 1))
elif 2 in psm_to_objects_map:
self.current_sm = PickAndPlaceStateMachine(self.psms[2], self.world,
self.world_to_psm_tfs[2], psm_to_objects_map[2][0],
approach_vec, closed_loop=False,
home_when_done=bool(len(psm_to_objects_map[2]) <= 1))
def update_world(self, world):
self.world = world
self.current_sm.update_world(world)
def run_once(self):
if self.current_sm.is_done() and \
self.current_sm.psm.get_current_jaw_position() > math.pi / 3:
objects = self._get_objects_for_psms()
if 0 in objects:
print(len(objects[0]))
self.current_sm = \
PickAndPlaceStateMachine(self.psms[0], self.world,
self.world_to_psm_tfs[0], objects[0][0],
self.approach_vec, closed_loop=False,
home_when_done=bool(len(objects[0]) <= 1))
elif 1 in objects:
if self.current_sm.psm == self.psms[0]:
# go home
self.current_sm.state = PickAndPlaceState.HOME
if not self.current_sm.is_done():
self.current_sm.run_once()
# this state check is here because i want to sleep
while not vector_eps_eq(self.psms[0].get_current_position().p, PSM_HOME_POS):
# spaghetti code
time.sleep(0.1)
self.current_sm = \
PickAndPlaceStateMachine(self.psms[1], self.world,
self.world_to_psm_tfs[1], objects[1][0],
self.approach_vec, closed_loop=False,
home_when_done=bool(len(objects[1]) <= 1))
elif 2 in objects:
if self.current_sm.psm == (self.psms[0] or self.psms[1]):
# go home
self.current_sm.state = PickAndPlaceState.HOME
if not self.current_sm.is_done():
self.current_sm.run_once()
# this state check is here because i want to sleep
while not vector_eps_eq(self.psms[0].get_current_position().p, PSM_HOME_POS) \
and not vector_eps_eq(self.psms[1].get_current_position().p, PSM_HOME_POS):
# spaghetti code
time.sleep(0.1)
self.current_sm = \
PickAndPlaceStateMachine(self.psms[2], self.world,
self.world_to_psm_tfs[2], objects[2][0],
self.approach_vec, closed_loop=False,
home_when_done=bool(len(objects[2]) <= 1))
if not self.current_sm.is_done():
self.current_sm.run_once()
def is_done(self):
return bool(len(self.world.objects) == 0) and self.current_sm.jaw_fully_open()
def __str__(self):
return str(self.__dict__)
def __repr__(self):
return str(self.__dict__)