forked from hirowatari-s/ExploreSearchSystem
-
Notifications
You must be signed in to change notification settings - Fork 0
/
som.py
133 lines (111 loc) · 5.98 KB
/
som.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
import numpy as np
from scipy.spatial import distance as dist
from sklearn.decomposition import PCA
class ManifoldModeling:
    """SOM / UKR manifold-learning model.

    Maps N observations X (N x D) onto a regular latent grid Zeta
    (K nodes in a 1-D or 2-D latent space), learning per-sample latent
    coordinates Z (N x L) and per-node reference vectors Y (K x D).
    """

    def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, model_name, init='random', metric="sqeuclidean"):
        """
        Parameters
        ----------
        X : ndarray, shape (N, D)
            Observation matrix.
        latent_dim : int
            Latent dimensionality L; must be 1 or 2.
        resolution : int
            Grid nodes per latent axis (K = resolution ** L).
        sigma_max, sigma_min : float
            Initial and final neighborhood radius.
        tau : float
            Number of epochs over which sigma decays linearly.
        model_name : str
            'SOM' for the classic SOM loop; anything else adds a UKR
            gradient step per epoch (see `fit`).
        init : str or ndarray
            'random' (uniform Z in [-1, 1]), 'random_bmu' (Z snapped to
            random grid nodes), 'PCA' (Z from the first principal
            components), an integer ndarray of BMU indices, or a float
            ndarray of shape (N, latent_dim) used directly as Z.
        metric : str
            'sqeuclidean' or 'KLdivergence' — winner-selection metric.

        Raises
        ------
        ValueError
            If latent_dim, init, or metric is not one of the supported values.
        """
        self.X = X
        self.N = self.X.shape[0]
        self.sigma_max = sigma_max
        self.sigma_min = sigma_min
        self.tau = tau
        self.D = X.shape[1]
        self.L = latent_dim
        self.model_name = model_name
        self.history = {}
        use_pca = isinstance(init, str) and init == 'PCA'
        if use_pca:
            pca = PCA(n_components=latent_dim)
            pca.fit(X)
        # --- latent grid Zeta ---
        if latent_dim == 1:
            self.Zeta = np.linspace(-1.0, 1.0, resolution)[:, np.newaxis]
            if use_pca:
                # BUGFIX: comp1 was previously only defined in the 2-D branch,
                # so init='PCA' with latent_dim=1 raised NameError at Z init.
                comp1 = pca.singular_values_[0]
        elif latent_dim == 2:
            if use_pca:
                comp1, comp2 = pca.singular_values_[0], pca.singular_values_[1]
                # Scale the second latent axis by the singular-value ratio so
                # the grid aspect matches the data's principal spread.
                zeta = np.meshgrid(np.linspace(-1, 1, resolution), np.linspace(-comp2/comp1, comp2/comp1, resolution))
            else:
                zeta = np.meshgrid(np.linspace(-1, 1, resolution), np.linspace(-1, 1, resolution))
            self.Zeta = np.dstack(zeta).reshape(resolution**2, latent_dim)
        else:
            raise ValueError("invalid latent dimension: {}".format(latent_dim))
        self.K = resolution**self.L
        # --- latent coordinates Z ---
        if isinstance(init, str) and init == 'random':
            self.Z = np.random.rand(self.N, latent_dim) * 2.0 - 1.0
        elif isinstance(init, str) and init == 'random_bmu':
            # BUGFIX: randint's upper bound is exclusive; the original
            # `shape[0] - 1` made the last grid node unreachable.
            init_bmus = np.random.randint(0, self.Zeta.shape[0], self.N)
            self.Z = self.Zeta[init_bmus, :]
        elif use_pca:
            self.Z = pca.transform(X) / comp1
        elif isinstance(init, np.ndarray) and np.issubdtype(init.dtype, np.integer):
            # Accept any integer dtype (int32/int64/...), not just the
            # platform default that `dtype == int` matched.
            init_bmus = init.copy()
            self.Z = self.Zeta[init_bmus, :]
        elif isinstance(init, np.ndarray) and init.shape == (self.N, latent_dim):
            self.Z = init.copy()
        else:
            raise ValueError("invalid init: {}".format(init))
        # --- winner-selection metric ---
        if metric in ("sqeuclidean", "KLdivergence"):
            self.metric = metric
        else:
            raise ValueError("invalid metric: {}".format(metric))

    def _cooperative_process(self, epoch):
        """Cooperative step: set this epoch's neighborhood radius and compute
        the K x N learning-weight matrix H."""
        # Linear decay from sigma_max to sigma_min over `tau` epochs,
        # clamped at sigma_min afterwards.
        self.sigma = max(self.sigma_min, self.sigma_min + (self.sigma_max - self.sigma_min) * (1 - (epoch / self.tau)))
        Dist = dist.cdist(self.Zeta, self.Z, 'sqeuclidean')  # K x N squared latent distances
        self.H = np.exp(-Dist / (2 * self.sigma * self.sigma))  # Gaussian neighborhood weights

    def _adaptive_process(self):
        """Adaptive step: update reference vectors Y as H-weighted means of X."""
        G = np.sum(self.H, axis=1)[:, np.newaxis]  # total learning weight per node (K x 1)
        R = self.H * np.reciprocal(G)  # normalize so each node's weights sum to 1
        self.Y = R @ self.X  # weighted data means become the reference vectors

    def _competitive_process(self):
        """Competitive step: assign each sample its best-matching unit (BMU)
        and snap Z to the winning grid node."""
        if self.metric == "sqeuclidean":
            Dist = dist.cdist(self.X, self.Y)  # N x K distance matrix
            self.bmus = Dist.argmin(axis=1)  # winner index per sample
            self.Z = self.Zeta[self.bmus, :]
        elif self.metric == "KLdivergence":
            # Cross term of KL(x || y); maximizing it minimizes the divergence
            # (original comment wrongly said argmin — argmax is correct here).
            Dist = np.sum(self.X[:, np.newaxis, :] * np.log(self.Y)[np.newaxis, :, :], axis=2)  # N x K
            self.bmus = np.argmax(Dist, axis=1)
            self.Z = self.Zeta[self.bmus, :]

    def _ukr_process(self, sigma, alpha=5.1):
        """One gradient-descent step of unsupervised kernel regression on Z.

        Parameters
        ----------
        sigma : float
            Kernel bandwidth in latent space.
        alpha : float
            Gradient step size.
        """
        ZZ = self.Z[:, None, :] - self.Z[None, :, :]  # pairwise latent differences (N x N x L)
        D = np.sum(np.square(ZZ), axis=2)
        H = np.exp((-0.5 / sigma**2) * D)  # Gaussian kernel matrix
        R = H / np.sum(H, axis=1, keepdims=True)  # Nadaraya-Watson weights
        Y = R @ self.X  # kernel-regression estimate of each sample
        Dyx = Y[:, None, :] - self.X[None, :, :]
        A = R * np.einsum("nd,nid->ni", Y - self.X, Dyx)
        grad = np.einsum("ni,nid->nd", A + A.T, ZZ)
        # Descend and clip to keep latent points inside the [-1, 1] box.
        self.Z = np.clip(self.Z - alpha * grad, -1, 1)

    def update_mapping(self, Y):
        """Overwrite the current reference vectors (e.g. with an externally
        smoothed mapping)."""
        self.Y = Y

    def fit(self, nb_epoch=100, verbose=True):
        """Train for `nb_epoch` iterations, recording Z, Y and sigma per epoch
        in self.history under keys 'z', 'y', 'sigma'."""
        self.history['z'] = np.zeros((nb_epoch, self.N, self.L))
        self.history['y'] = np.zeros((nb_epoch, self.K, self.D))
        self.history['sigma'] = np.zeros(nb_epoch)
        for epoch in range(nb_epoch):
            if self.model_name == 'SOM':
                self._cooperative_process(epoch)   # neighborhood weights
                self._adaptive_process()           # update reference vectors
                self._competitive_process()        # reassign BMUs
            else:
                # UKR variant: refine Z by gradient descent, then re-estimate
                # Y on the grid (no competitive snap-to-node step).
                self._ukr_process(np.sqrt(1 / self.N * np.pi))
                self._cooperative_process(epoch)
                self._adaptive_process()
            self.history['z'][epoch] = self.Z
            self.history['y'][epoch] = self.Y
            self.history['sigma'][epoch] = self.sigma