Commit 2286ad0

Fixed minor bug

Givanna Putri committed Aug 5, 2019
1 parent d9c9a5a commit 2286ad0
Showing 4 changed files with 214 additions and 105 deletions.
Binary file modified: .DS_Store (binary file not shown)
169 changes: 109 additions & 60 deletions chronoclust/hddstream.py
@@ -93,22 +93,25 @@ def _set_dataset_dependent_parameters(self, input_dataset):
None
"""

self.dataset_dimensionality = input_dataset.shape[1]
dataset_dim = input_dataset.shape[1]
config = self.config

self.dataset_dimensionality = dataset_dim

# Set projected_dimensionality_threshold. This will be set only once.
config_pi = int(float(self.config.find("pi").text))
config_pi = float(config.find("pi").text)
if config_pi <= 0:
# If the config value for projected_dimensionality_threshold is absurd, i.e. 0 or negative,
# the dimensionality of the dataset is used instead.
self.pi = self.dataset_dimensionality
self.pi = dataset_dim
else:
# Doesn't make sense for this not to be a whole number.
self.pi = round(config_pi)

# Setting the outlier deletion point. It's given as a proportion of the number of data points,
# so it needs to be converted to a whole number.
# It'll be the given proportion * the number of data points in the previous day's dataset.
self.omicron = float(self.config.find("omicron").text) * self.dataset_size
self.omicron = float(config.find("omicron").text) * self.dataset_size

# Make sure this is done only after we set the outlier deletion point! This is because we want the
# deletion point to be based on the "previous day's dataset size"!
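
As a worked illustration of the two derived parameters above (a sketch only; the values are hypothetical, not from any shipped config):

# Illustrative sketch, not part of the commit. All values are hypothetical.
dataset_dim = 8          # would come from input_dataset.shape[1]
config_pi = -1.0         # non-positive, so fall back to the dataset dimensionality
pi = dataset_dim if config_pi <= 0 else round(config_pi)   # -> 8
omicron = 0.01 * 10000   # omicron proportion * previous day's dataset size -> 100.0
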
@@ -173,7 +176,9 @@ def online_microcluster_maintenance(self, input_dataset, input_dataset_daystamp,
if reset_param:
self._set_dataset_dependent_parameters(input_dataset)

self.logger.info(f"Setting up online phase for timepoint {input_dataset_daystamp} with following params:\n"
logger = self.logger

logger.info(f"Setting up online phase for timepoint {input_dataset_daystamp} with following params:\n"
f"Pcore density threshold factor(beta) = {self.beta}\n"
f"Decay rate(lambda) = {self.lambbda}\n"
f"Radius threshold(epsilon) = {self.epsilon}\n"
@@ -187,7 +192,7 @@ def online_microcluster_maintenance(self, input_dataset, input_dataset_daystamp,
# Check whether we need to decay the cluster. We only decay it if this dataset is not for the same day as the
# previous dataset processed by the online microcluster maintenance.
if (self.last_data_timestamp - input_dataset_daystamp) != 0:
self.logger.info("Decaying and downgrading microclusters")
logger.info("Decaying and downgrading microclusters")
# The time difference is converted to days because we decay once for each day that has passed between datasets.
interval = input_dataset_daystamp - self.last_data_timestamp
self._decay_clusters_weight(interval)
@@ -204,9 +209,9 @@ def online_microcluster_maintenance(self, input_dataset, input_dataset_daystamp,

num_datapoints = input_dataset.shape[0]

self.logger.info("Starting online microcluster maintenance for timepoint {}".format(input_dataset_daystamp))
logger.info("Starting online microcluster maintenance for timepoint {}".format(input_dataset_daystamp))
# progress bar widget
progress_bar = TqdmToLogger(self.logger, level=logging.INFO)
progress_bar = TqdmToLogger(logger, level=logging.INFO)
for row in tqdm(range(num_datapoints), file=progress_bar, mininterval=1):
# You may find that sometimes the progress line doesn't work well. In that case, uncomment the line below.
datapoint = input_dataset[row]
@@ -226,8 +231,8 @@ def online_microcluster_maintenance(self, input_dataset, input_dataset_daystamp,
# We create a new outlier cluster for the datapoint.
self._create_new_outlier_cluster(datapoint, input_dataset_daystamp)

self.logger.info("Finish online microcluster maintenance for timepoint {}".format(input_dataset_daystamp))
self.logger.info("Online maintenance yield {} pcores and {} outlier".format(
logger.info("Finish online microcluster maintenance for timepoint {}".format(input_dataset_daystamp))
logger.info("Online maintenance yield {} pcores and {} outlier".format(
len(self.pcore_MC), len(self.outlier_MC)))

self.last_data_timestamp = input_dataset_daystamp
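
The loop above applies the usual HDDStream cascade to each point; a condensed, self-contained sketch of that control flow (process_point is a hypothetical wrapper, but the three methods it calls are the ones defined in this file):

# Condensed sketch of the per-point cascade above; not part of the commit.
def process_point(clusterer, datapoint, daystamp):
    if clusterer._add_to_pcore(datapoint, daystamp):
        return "pcore"        # absorbed by an existing pcore microcluster
    if clusterer._add_to_outlier(datapoint, daystamp):
        return "outlier"      # absorbed by an existing outlier microcluster
    clusterer._create_new_outlier_cluster(datapoint, daystamp)
    return "new outlier"      # nothing could absorb it, so start a new outlier MC
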
@@ -246,15 +251,21 @@ def _decay_clusters_weight(self, interval):
Returns:
None.
"""
for pcore in self.pcore_MC:
pcore_MCs = self.pcore_MC
outlier_MCs = self.outlier_MC

for pcore in pcore_MCs:
self.decay_a_cluster_weight(interval, pcore)

for outlier_mc in self.outlier_MC:
for outlier_mc in outlier_MCs:
self.decay_a_cluster_weight(interval, outlier_mc)

def decay_a_cluster_weight(self, interval, microcluster):
"""
Method to decay a microcluster's weight.
There is no need to update the preferred dimensions, as the variance remains the same.
Remember, decay scales CF1, CF2, and the weight by the same factor, and the variance is calculated
purely from their ratios, so when all of them are scaled together the variance does not change.
Args:
interval (float): Time difference between last dataset processed by online cluster maintenance
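
To spell out the invariance claimed in the docstring above (a sketch assuming the usual cluster-feature definitions; the exact formulas in the Microcluster class may differ slightly): decay multiplies CF1, CF2, and the weight w by the same factor f = 2^(-lambda * interval), and the per-dimension variance is computed purely from their ratios:

Var_j = CF2_j / w - (CF1_j / w)^2

Replacing (CF1_j, CF2_j, w) with (f*CF1_j, f*CF2_j, f*w) leaves CF2_j/w and CF1_j/w unchanged, so Var_j, and therefore the preferred dimension vector, is unchanged.
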
@@ -282,18 +293,25 @@ def _add_to_pcore(self, datapoint, datapoint_timestamp):
closest_distance = None
closest_cluster_index = None

# Cache these locally to save time rather than repeatedly referring to the self attributes.
pcore_MCs = self.pcore_MC
delta_squared = self.delta_squared
k = self.k
pi = self.pi
epsilon_squared = self.epsilon_squared

# calculate distances between datapoint and all pcore MCs.
for index, pmc in enumerate(self.pcore_MC):
for index, pmc in enumerate(pcore_MCs):

# In Figure 2 of paper[1], lines 3-4, we temporarily add the data point to each microcluster to
# see if the datapoint can fit in it by checking the microcluster's pdim. We don't want to interfere with the
# original microcluster, so we clone it and pretend to add the point to it.
temp_pmc = pmc.get_copy_with_new_point(datapoint, self.delta_squared, self.k)
temp_pmc = pmc.get_copy_with_new_point(datapoint, delta_squared, k)

pdim_temp_pmc = (np.array(temp_pmc.preferred_dimension_vector) != 1).sum()

if pdim_temp_pmc <= self.pi:
if pdim_temp_pmc <= pi:
# Rather than keeping an array of potential microclusters, we just calculate the distance to each one
# and compare it against the closest seen so far. If this one is closer (or is the first),
# store it as the closest. Otherwise leave it.
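
The pdim computed above is just the number of entries in the preferred dimension vector that differ from 1; a tiny self-contained example (the vector values are made up, with k = 20 marking preferred dimensions):

import numpy as np

# Hypothetical preferred dimension vector: k (here 20) marks a preferred dimension, 1 otherwise.
pref_dim_vector = [1, 20, 1, 20, 20]
pdim = (np.array(pref_dim_vector) != 1).sum()   # -> 3 preferred dimensions
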
@@ -306,16 +324,14 @@ def _add_to_pcore(self, datapoint, datapoint_timestamp):
# We get here when there exists a potential microcluster that can accommodate the point. We then check
# whether the potential microcluster can actually accommodate the point, i.e. its radius will not blow out
# beyond the radius threshold. See lines 14-15 in Figure 2 of paper[1].
tmp_closest_cluster = self.pcore_MC[closest_cluster_index].get_copy_with_new_point(datapoint,
self.delta_squared,
self.k)
tmp_closest_cluster = pcore_MCs[closest_cluster_index].get_copy_with_new_point(datapoint, delta_squared, k)
projected_radius_squared = tmp_closest_cluster.calculate_projected_radius_squared()

if projected_radius_squared <= self.epsilon_squared:
if projected_radius_squared <= epsilon_squared:

self.pcore_MC[closest_cluster_index].add_new_point(datapoint, datapoint_timestamp)
self.pcore_MC[closest_cluster_index].update_preferred_dimensions(self.delta_squared,
self.k)
# TODO: check if referring to the local variable affects the actual self as well. If not, revert to self.
pcore_MCs[closest_cluster_index].add_new_point(datapoint, datapoint_timestamp)
pcore_MCs[closest_cluster_index].update_preferred_dimensions(delta_squared, k)
return True
return False
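
Regarding the TODO above: in Python, pcore_MCs = self.pcore_MC binds a second name to the same list object, so in-place mutations made through the local name are visible through self as well. A minimal self-contained check:

# Sketch only: a local name aliases the same list object, it is not a copy.
class Holder:
    def __init__(self):
        self.items = []

h = Holder()
local_items = h.items         # same object as h.items
local_items.append("x")
assert h.items == ["x"]       # mutation through the alias shows up on self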

@@ -337,30 +353,34 @@ def _add_to_outlier(self, datapoint, datapoint_timestamp):
closest_distance = None
closest_cluster_index = None

outlier_MCs = self.outlier_MC
delta_squared = self.delta_squared
k = self.k
epsilon_squared = self.epsilon_squared

# Find closest outlier microcluster.
for index, omc in enumerate(self.outlier_MC):
for index, omc in enumerate(outlier_MCs):
distance = omc.get_projected_dist_to_point(datapoint)
if closest_distance is None or distance < closest_distance:
closest_distance = distance
closest_cluster_index = index

# TODO: merge this with the pcore one as it is very similar.
if closest_distance is not None:
# We get here when there exists an outlier microcluster that can accommodate the point. We then check
# whether the outlier microcluster can actually accommodate the point, i.e. its radius will not blow out
# beyond the radius threshold.

tmp_outlier_mc = self.outlier_MC[closest_cluster_index].get_copy_with_new_point(datapoint,
self.delta_squared,
self.k)
tmp_outlier_mc = outlier_MCs[closest_cluster_index].get_copy_with_new_point(datapoint, delta_squared, k)
projected_radius_squared = tmp_outlier_mc.calculate_projected_radius_squared()

if projected_radius_squared <= self.epsilon_squared:
self.outlier_MC[closest_cluster_index].add_new_point(datapoint, datapoint_timestamp)
self.outlier_MC[closest_cluster_index].update_preferred_dimensions(self.delta_squared,
self.k)
if projected_radius_squared <= epsilon_squared:
# TODO: check if referring to the local variable affects the actual self as well. If not, revert to self.
outlier_MCs[closest_cluster_index].add_new_point(datapoint, datapoint_timestamp)
outlier_MCs[closest_cluster_index].update_preferred_dimensions(delta_squared, k)

# From here on, we check whether the outlier microcluster can be upgraded to a pcore microcluster.
self._upgrade_outlier_microcluster(self.outlier_MC[closest_cluster_index])
self._upgrade_outlier_microcluster(outlier_MCs[closest_cluster_index])

return True
return False
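
One possible shape for the merge suggested by the TODO above (a sketch under the assumption that both branches keep the clone-check-commit pattern; _try_absorb is a hypothetical helper, not part of this commit):

# Hypothetical shared helper for _add_to_pcore and _add_to_outlier; not part of the commit.
def _try_absorb(self, clusters, closest_index, datapoint, timestamp):
    candidate = clusters[closest_index].get_copy_with_new_point(
        datapoint, self.delta_squared, self.k)
    if candidate.calculate_projected_radius_squared() <= self.epsilon_squared:
        clusters[closest_index].add_new_point(datapoint, timestamp)
        clusters[closest_index].update_preferred_dimensions(self.delta_squared, self.k)
        return True
    return False
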
@@ -379,15 +399,23 @@ def _upgrade_outlier_microcluster(self, outlier_mc):
Returns:
None.
"""
weight_threshold_obeyed = outlier_mc.cumulative_weight >= self.beta * \
self.mu
pdim_threshold_obeyed = np.array(outlier_mc.preferred_dimension_vector > 1).sum() <= \
self.pi
beta = self.beta
mu = self.mu
pi = self.pi
pcore_MCs = self.pcore_MC

weight_threshold_obeyed = outlier_mc.cumulative_weight >= beta * mu
pdim_threshold_obeyed = np.array(outlier_mc.preferred_dimension_vector > 1).sum() <= pi

if weight_threshold_obeyed and pdim_threshold_obeyed:
outlier_mc.id = list(range(len(self.pcore_MC), len(self.pcore_MC) + 1))
# TODO: remove if it doesn't break the code
# num_pcore_MCs = len(pcore_MCs)
# outlier_mc.id = list(range(num_pcore_MCs, num_pcore_MCs + 1))

outlier_mc.id = [len(pcore_MCs)]
self.outlier_MC.remove(outlier_mc)
self.pcore_MC.append(outlier_mc)
# TODO: check if referring to the local variable affects the actual self as well. If not, revert to self.
pcore_MCs.append(outlier_mc)
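
As a worked check of the two upgrade thresholds above (the numbers are hypothetical, not from the paper or this repo's config):

# Hypothetical numbers for the upgrade test.
beta, mu, pi = 0.5, 10.0, 4
cumulative_weight, pdim = 6.2, 3
can_upgrade = (cumulative_weight >= beta * mu) and (pdim <= pi)   # 6.2 >= 5.0 and 3 <= 4 -> True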

def _create_new_outlier_cluster(self, datapoint, creation_time):
"""
@@ -401,12 +429,21 @@ def _create_new_outlier_cluster(self, datapoint, creation_time):
Returns:
None.
"""
outlier_mc_id = set(range(len(self.outlier_MC), len(self.outlier_MC) + 1))
outlier_mc = Microcluster(cf1=np.zeros(len(datapoint)), cf2=np.zeros(len(datapoint)), id=outlier_mc_id,
outlier_MCs = self.outlier_MC
# TODO: remove if new code doesn't break
# num_outlier_MCs = len(self.outlier_MC)
# outlier_mc_id = set(range(num_outlier_MCs, num_outlier_MCs + 1))
outlier_mc_id = set()
outlier_mc_id.add(len(outlier_MCs))

# len(datapoint) is the dimensionality of the point, not a point count.
num_dims = len(datapoint)
outlier_mc = Microcluster(cf1=np.zeros(num_dims), cf2=np.zeros(num_dims), id=outlier_mc_id,
creation_time_in_hrs=creation_time)
outlier_mc.add_new_point(datapoint, creation_time)
outlier_mc.update_preferred_dimensions(self.delta_squared, self.k)
self.outlier_MC.append(outlier_mc)

# TODO: check if referring to the local variable affects the actual self as well. If not, revert to self.
outlier_MCs.append(outlier_mc)
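
A small aside on the id construction above: the set()/add() pair can be written as a set literal with identical behaviour:

outlier_mc_id = {len(outlier_MCs)}   # equivalent to set() followed by add(len(outlier_MCs))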

def offline_clustering(self, dataset_daystamp):
"""
@@ -421,35 +458,40 @@ def offline_clustering(self, dataset_daystamp):
datapoints = {}

num_core = 0
for cluster in self.pcore_MC:
pcore_MCs = self.pcore_MC
epsilon_squared = self.epsilon_squared
mu = self.mu
pi = self.pi
logger = self.logger

for cluster in pcore_MCs:

# For offline clustering, the core status of each cluster is determined by the cluster itself rather than
# by PreDeCon.
cluster_id = next(iter(cluster.id))

cluster_is_core = cluster.is_core(self.epsilon_squared, self.mu,
self.pi)
cluster_is_core = cluster.is_core(epsilon_squared, mu, pi)
if cluster_is_core:
num_core += 1

datapoints[cluster_id] = MicroclusterAsDatapoint(datapoint_dimension_values=cluster.cluster_centroids,
datapoint_id=cluster_id, is_core_cluster=cluster_is_core,
cluster_CF1=cluster.CF1, cluster_CF2=cluster.CF2,
cluster_cumulative_weight=cluster.cumulative_weight)
num_pcore = len(self.pcore_MC) - num_core
self.logger.info(f'Starting offline clustering with {num_core} core clusters and {num_pcore} pcore clusters.')
num_pcore = len(pcore_MCs) - num_core
logger.info(f'Starting offline clustering with {num_core} core clusters and {num_pcore} pcore clusters.')

predecon_offline = PreDeCon(datapoints=datapoints, dataset_dimensionality=self.dataset_dimensionality,
epsilon=self.upsilon,
delta=self.delta,
lambbda=self.pi,
mu=self.mu,
lambbda=pi,
mu=mu,
k=self.k)
predecon_offline.run()

self.final_clusters = predecon_offline.clusters
self.logger.info('Finish offline clustering for dataset with timepoint: {}'.format(dataset_daystamp))
self.logger.info("Offline clustering yield {} cores.".format(len(self.final_clusters)))
logger.info('Finished offline clustering for dataset with timepoint: {}'.format(dataset_daystamp))
logger.info("Offline clustering yielded {} clusters.".format(len(self.final_clusters)))

def downgrade_microclusters(self):
self._downgrade_potential_microclusters()
@@ -461,26 +503,33 @@ def _downgrade_potential_microclusters(self):
no longer obeyed as in Definition 6 in paper[1].
"""

for potential_cluster in self.pcore_MC:
weight_threshold_obeyed = potential_cluster.cumulative_weight < self.beta * \
self.mu
pdim_threshold_obeyed = np.array(potential_cluster.preferred_dimension_vector > 1).sum() > \
self.pi
pcore_MCs = self.pcore_MC
outlier_MCs = self.outlier_MC
beta = self.beta
mu = self.mu
pi = self.pi

for potential_cluster in pcore_MCs:
weight_threshold_obeyed = potential_cluster.cumulative_weight < beta * mu
pdim_threshold_obeyed = np.array(potential_cluster.preferred_dimension_vector > 1).sum() > pi

if weight_threshold_obeyed or pdim_threshold_obeyed:

potential_cluster.id = list(range(len(self.outlier_MC), len(self.outlier_MC) + 1))
self.pcore_MC.remove(potential_cluster)
self.outlier_MC.append(potential_cluster)
# potential_cluster.id = list(range(len(self.outlier_MC), len(self.outlier_MC) + 1))
potential_cluster.id = [len(outlier_MCs)]
pcore_MCs.remove(potential_cluster)
outlier_MCs.append(potential_cluster)

def _downgrade_outlier_microclusters(self):
"""
Delete outlier microcluster. See section 4.4 in paper [1].
"""
for outlier_cluster in self.outlier_MC:
outlier_MCs = self.outlier_MC
omicron = self.omicron
for outlier_cluster in outlier_MCs:

if outlier_cluster.cumulative_weight <= self.omicron:
self.outlier_MC.remove(outlier_cluster)
if outlier_cluster.cumulative_weight <= omicron:
outlier_MCs.remove(outlier_cluster)
del outlier_cluster
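
One caution about both downgrade loops above (an observation, not a change made by this commit): calling list.remove() while iterating over the same list can skip the element after each removal. A remove-safe sketch iterates over a shallow copy:

# Sketch of a remove-safe variant; not part of the commit.
def prune_outliers(outlier_MCs, omicron):
    for outlier_cluster in list(outlier_MCs):       # iterate over a shallow copy
        if outlier_cluster.cumulative_weight <= omicron:
            outlier_MCs.remove(outlier_cluster)     # mutate only the original list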

