memory.py
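An episode-level replay buffer implementing the HER (Hindsight Experience Replay) "future" strategy: with probability future_p, a sampled transition's desired goal is replaced by a goal actually achieved later in the same episode, and the reward is recomputed against the relabeled goal.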
import numpy as np
from copy import deepcopy as dc


class Memory:
    """Episode-based replay buffer with HER future-strategy goal relabeling."""

    def __init__(self, capacity, k_future, env):
        self.capacity = capacity
        self.memory = []
        self.memory_counter = 0
        self.memory_length = 0
        self.env = env
        # Probability of relabeling a transition's desired goal with a future
        # achieved goal; e.g. k_future=4 gives future_p = 0.8.
        self.future_p = 1 - (1. / (1 + k_future))

    def sample(self, batch_size):
        # Pick a random episode and a random timestep for each batch entry.
        ep_indices = np.random.randint(0, len(self.memory), batch_size)
        time_indices = np.random.randint(0, len(self.memory[0]["next_state"]), batch_size)
        states = []
        actions = []
        desired_goals = []
        next_states = []
        next_achieved_goals = []

        for episode, timestep in zip(ep_indices, time_indices):
            states.append(dc(self.memory[episode]["state"][timestep]))
            actions.append(dc(self.memory[episode]["action"][timestep]))
            desired_goals.append(dc(self.memory[episode]["desired_goal"][timestep]))
            next_achieved_goals.append(dc(self.memory[episode]["next_achieved_goal"][timestep]))
            next_states.append(dc(self.memory[episode]["next_state"][timestep]))

        states = np.vstack(states)
        actions = np.vstack(actions)
        desired_goals = np.vstack(desired_goals)
        next_achieved_goals = np.vstack(next_achieved_goals)
        next_states = np.vstack(next_states)

        # HER "future" strategy: for a random subset of the batch, replace the
        # desired goal with a goal achieved later in the same episode.
        her_indices = np.where(np.random.uniform(size=batch_size) < self.future_p)
        future_offset = np.random.uniform(size=batch_size) * (len(self.memory[0]["next_state"]) - time_indices)
        future_offset = future_offset.astype(int)
        future_t = (time_indices + 1 + future_offset)[her_indices]

        future_ag = []
        for episode, f_offset in zip(ep_indices[her_indices], future_t):
            future_ag.append(dc(self.memory[episode]["achieved_goal"][f_offset]))
        future_ag = np.vstack(future_ag)

        desired_goals[her_indices] = future_ag
        # Recompute rewards against the (possibly relabeled) goals.
        rewards = np.expand_dims(self.env.compute_reward(next_achieved_goals, desired_goals, None), 1)

        return self.clip_obs(states), actions, rewards, self.clip_obs(next_states), self.clip_obs(desired_goals)

    def add(self, transition):
        self.memory.append(transition)
        # FIFO eviction: drop the oldest episode once capacity is exceeded.
        if len(self.memory) > self.capacity:
            self.memory.pop(0)
        assert len(self.memory) <= self.capacity

    def __len__(self):
        return len(self.memory)

    @staticmethod
    def clip_obs(x):
        return np.clip(x, -200, 200)

    def sample_for_normalization(self, batch):
        # Sample (state, goal) pairs, with the same HER relabeling as sample(),
        # for updating observation/goal normalizer statistics.
        size = len(batch[0]["next_state"])
        ep_indices = np.random.randint(0, len(batch), size)
        time_indices = np.random.randint(0, len(batch[0]["next_state"]), size)
        states = []
        desired_goals = []

        for episode, timestep in zip(ep_indices, time_indices):
            states.append(dc(batch[episode]["state"][timestep]))
            desired_goals.append(dc(batch[episode]["desired_goal"][timestep]))

        states = np.vstack(states)
        desired_goals = np.vstack(desired_goals)

        her_indices = np.where(np.random.uniform(size=size) < self.future_p)
        future_offset = np.random.uniform(size=size) * (len(batch[0]["next_state"]) - time_indices)
        future_offset = future_offset.astype(int)
        future_t = (time_indices + 1 + future_offset)[her_indices]

        future_ag = []
        for episode, f_offset in zip(ep_indices[her_indices], future_t):
            future_ag.append(dc(batch[episode]["achieved_goal"][f_offset]))
        future_ag = np.vstack(future_ag)

        desired_goals[her_indices] = future_ag
        return self.clip_obs(states), self.clip_obs(desired_goals)
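A minimal usage sketch, not part of the original file. It assumes the class above is in scope, a Gym-style goal environment exposing compute_reward(achieved_goal, desired_goal, info), and an episode dict keyed exactly as Memory.sample() reads it; DummyEnv, the dimensions, and the episode length are invented for illustration. Note that "achieved_goal" is given T + 1 entries here so the future-goal index t + 1 + offset (which can reach T) stays in range, following the OpenAI Baselines HER convention; the original training loop's layout may differ.

# --- Usage sketch (illustrative, appended below the class) ---

class DummyEnv:
    @staticmethod
    def compute_reward(achieved_goal, desired_goal, info):
        # Sparse reward: 0 when within 5 cm of the goal, else -1.
        d = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        return -(d > 0.05).astype(np.float32)

T, obs_dim, goal_dim, act_dim = 50, 10, 3, 4
memory = Memory(capacity=1000, k_future=4, env=DummyEnv())

# One episode stored as a dict of per-timestep arrays, using the keys
# that Memory.sample() expects.
episode = {
    "state": np.random.randn(T, obs_dim),
    "action": np.random.randn(T, act_dim),
    "achieved_goal": np.random.randn(T + 1, goal_dim),  # includes final step
    "desired_goal": np.random.randn(T, goal_dim),
    "next_state": np.random.randn(T, obs_dim),
    "next_achieved_goal": np.random.randn(T, goal_dim),
}
memory.add(episode)

states, actions, rewards, next_states, goals = memory.sample(batch_size=32)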