sae_evolution.py
import os
import logging
import random
import uuid

import yaml
class Candidate:
    # One member of the evolving population. `file_path` is the YAML file
    # the candidate is saved under (see save_candidate below).
    def __init__(self, file_path, layers, initial_population=False, generation=0):
        self.file_path = file_path
        self.initial_population = initial_population
        self.layers = layers
        self.generation = generation
        self.last_gen = None

    def to_dict(self):
        return {
            "model": self.file_path,
            "generation": self.generation,
            "last_gen": self.last_gen,
            "layers": self.layers,
        }
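
# Illustrative shape of Candidate.layers, inferred from how mutation() and
# crossover_layers() use it: layer name -> {neuron_id: integer weight}.
# The hook-style layer name below is an assumed example, not from this file:
#
#     {"blocks.8.hook_resid_pre": {1337: 12, 2048: -7}}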
def crossover(parent1, parent2):
    # Recombine two parents' layers into a child with a fresh file name.
    file_path = f"{uuid.uuid4()}.yaml"
    child = Candidate(file_path, crossover_layers(parent1.layers, parent2.layers))
    child.generation = max(parent1.generation, parent2.generation) + 1
    return child
def merge_dicts(*dicts):
    # Merge any number of {key: dict} maps; inner dicts for a shared key
    # are combined, with later dicts overriding earlier ones.
    result = {}
    for d in dicts:
        for key, value in d.items():
            if key in result:
                result[key].update(value)
            else:
                result[key] = value.copy()
    return result
def random_filter(d, fraction=0.8):
    # Keep a random subset of d's keys (always at least one).
    keys = list(d.keys())
    num_to_keep = max(1, int(len(keys) * fraction))
    keys_to_keep = random.sample(keys, num_to_keep)
    return {k: d[k] for k in keys_to_keep}
def merge_single_level_dicts(a, b):
    # Union of the outer keys; inner dicts are merged, with b's entries
    # winning on collisions.
    result = a.copy()
    for key, value in b.items():
        if key in result:
            result[key] = {**result[key], **value}
        else:
            result[key] = value
    return result
def crossover_layers(a, b):
    # Merge the single-level nested dictionaries
    combined = merge_single_level_dicts(a, b)
    # Apply a random filter to each layer of the combined result
    filtered_result = {k: random_filter(v) for k, v in combined.items()}
    return filtered_result
def mutation(candidate, mutation_rate=0.01, mutation_scale=20, rare_change_rate=0.0001):
    mutated_layers = {}
    for layer_name, layer in candidate.layers.items():
        mutated_layer = {}
        for neuron_id, weight in layer.items():
            if random.random() < mutation_rate:
                # Perturb the weight with Gaussian noise
                mutated_weight = weight + random.gauss(0, mutation_scale)
                mutated_layer[neuron_id] = int(mutated_weight)
            elif random.random() < rare_change_rate:
                # Rare chance to drop this neuron entirely
                continue
            else:
                mutated_layer[neuron_id] = weight
        # Rare chance to add a brand-new neuron to the layer
        if random.random() < rare_change_rate:
            new_neuron_id = max(layer.keys()) + 1 if layer else 0
            # Cast to int for consistency with the mutated weights above
            mutated_layer[new_neuron_id] = int(random.gauss(0, mutation_scale))
        # Drop layers that have lost all of their neurons
        if mutated_layer:
            mutated_layers[layer_name] = mutated_layer
    candidate.layers = mutated_layers
def save_candidate(candidate, file_path):
    # Ensure the 'candidates' directory exists
    os.makedirs('candidates', exist_ok=True)
    full_path = os.path.join('candidates', file_path)
    # Save the candidate's full dict (not just its layers) as YAML
    with open(full_path, 'w') as file:
        yaml.dump(candidate.to_dict(), file)
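
# For illustration, save_candidate produces YAML shaped roughly like the
# sketch below (field values assumed, not taken from this file):
#
#     model: <uuid4>.yaml
#     generation: 3
#     last_gen: null
#     layers:
#       layer_0:
#         0: 10
#         1: -5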
def load_candidate(file_name):
    full_path = os.path.join('candidates', file_name)
    # Load the YAML file back into a Candidate
    with open(full_path, 'r') as file:
        data = yaml.safe_load(file)
    return Candidate(file_path=full_path, layers=data['layers'], initial_population=True)
def breed(parents, mutation_rate):
    logging.info("breed %s", [p.file_path for p in parents])
    offspring = crossover(parents[0], parents[1])
    mutation(offspring, mutation_rate)
    # Persist the offspring so it can be evaluated later
    save_candidate(offspring, offspring.file_path)
    return offspring
def selection(population):
    return random.sample(population, 2)
def evolve(population, population_size, mutation_rate):
    # Grow the population to population_size by breeding pairs drawn at
    # random from the original (seed) population.
    seed_population = list(population)
    while len(population) < population_size:
        parents = selection(seed_population)
        offspring = breed(parents, mutation_rate)
        population.append(offspring)
    return population
async def run_evolution(population, elite_size, population_size, mutation_rate, evaluation_criteria):
    logging.info("Before evolve")
    log_candidates(population)
    population = evolve(population, population_size, mutation_rate)
    logging.info("Before sorting")
    log_candidates(population)
    population = await sort_with_correction(population, evaluation_criteria)
    logging.info("After sorting")
    log_candidates(population)
    # Keep only the elite_size best-ranked candidates
    return population[:elite_size]
def log_candidates(population):
    format_str = "{0}. {1:<24}"
    for index, candidate in enumerate(population, start=1):
        logging.info(format_str.format(index, candidate.file_path))
async def correct_insert_element(item, sorted_list, compare, top_k):
    if not sorted_list:
        return [item]
    # Find a place for insertion
    insert_pos = await find_insertion_point(item, sorted_list, compare, top_k)
    # Insert the item tentatively
    sorted_list.insert(insert_pos, item)
    return sorted_list
async def find_insertion_point(item, sorted_list, compare, top_k):
    # Binary-search variant that tolerates potentially noisy comparisons;
    # compare(a, b) is expected to return 1 when a ranks above b.
    low, high = 0, len(sorted_list) - 1
    while low <= high:
        # With a positive top_k, stop early once the item can no longer
        # land inside the top_k prefix.
        if top_k > 0 and low > top_k:
            return low
        mid = (low + high) // 2
        result = await compare(item, sorted_list[mid])
        if result == 1:
            high = mid - 1
        else:
            low = mid + 1
    return low
async def sort_with_correction(buffer, compare, top_k=-1):
    sorted_list = []
    for item in buffer:
        sorted_list = await correct_insert_element(item, sorted_list, compare, top_k)
    # A correction mechanism could re-check adjacent pairs here, e.g.:
    # sorted_list = await correction_pass(sorted_list)
    return sorted_list
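
# Minimal usage sketch (illustrative, not part of the original module).
# The compare coroutine below simply ranks candidates by generation, as a
# stand-in for a real evaluation_criteria coroutine that scores candidates.
if __name__ == "__main__":
    import asyncio

    logging.basicConfig(level=logging.INFO)

    async def compare_by_generation(a, b):
        # Return 1 when `a` should rank above `b` (see find_insertion_point)
        return 1 if a.generation >= b.generation else 0

    async def main():
        # Assumed toy layer map; real layer names/weights depend on the SAE
        seed = [
            Candidate(f"{uuid.uuid4()}.yaml", {"layer_0": {0: 10, 1: -5}},
                      initial_population=True)
            for _ in range(4)
        ]
        elite = await run_evolution(seed, elite_size=2, population_size=8,
                                    mutation_rate=0.05,
                                    evaluation_criteria=compare_by_generation)
        log_candidates(elite)

    asyncio.run(main())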