Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Mutation Strategy from RankInvariantChecker and Update Tests #51

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions skcriteria/cmp/ranks_rev/default_mutation_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .mutation_strategy import MutationStrategy
import pandas as pd
import numpy as np

class DefaultMutationStrategy(MutationStrategy):
def generate_mutations(self, dm: pd.DataFrame, rank: 'Rank') -> pd.DataFrame:
# Example mutation: Add random noise to the decision matrix
noise = np.random.uniform(-0.1, 0.1, size=dm.shape)
mutated_dm = dm + noise
return mutated_dm
8 changes: 8 additions & 0 deletions skcriteria/cmp/ranks_rev/mutation_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np

class MutationStrategy(ABC):
@abstractmethod
def generate_mutations(self, dm: pd.DataFrame, rank: 'Rank') -> pd.DataFrame:
pass
46 changes: 23 additions & 23 deletions skcriteria/cmp/ranks_rev/rank_inv_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@

import numpy as np
import numpy.lib.arraysetops as arrset

import pandas as pd

from .. import RanksComparator
from ...agg import RankResult
from ...core import SKCMethodABC
from ...utils import Bunch, unique_names
from .mutation_strategy import MutationStrategy
from .default_mutation_strategy import DefaultMutationStrategy

# =============================================================================
# CONSTANT
Expand Down Expand Up @@ -109,7 +110,6 @@ class RankInvariantChecker(SKCMethodABC):
random_state: int, numpy.random.default_rng or None (default: None)
Controls the random state to generate variations in the sub-optimal
alternatives.

"""

_skcriteria_dm_type = "rank_reversal"
Expand All @@ -129,6 +129,7 @@ def __init__(
allow_missing_alternatives=False,
last_diff_strategy="median",
random_state=None,
mutation_strategy: MutationStrategy = DefaultMutationStrategy(),
):
if not (hasattr(dmaker, "evaluate") and callable(dmaker.evaluate)):
raise TypeError("'dmaker' must implement 'evaluate()' method")
Expand All @@ -155,6 +156,11 @@ def __init__(
# RANDOM
self._random_state = np.random.default_rng(random_state)

# MUTATION STRATEGY
if not isinstance(mutation_strategy, MutationStrategy):
raise TypeError("'mutation_strategy' must be an instance of MutationStrategy")
self.mutation_strategy = mutation_strategy

def __repr__(self):
"""x.__repr__() <==> repr(x)."""
name = self.get_method_name()
Expand All @@ -181,21 +187,21 @@ def repeat(self):

@property
def allow_missing_alternatives(self):
"""True if any mutation is allowed that does not possess all the \
"""True if any mutation is allowed that does not possess all the
alternatives of the original decision matrix."""
return self._allow_missing_alternatives

@property
def last_diff_strategy(self):
"""Since the least preferred alternative has no lower bound (since \
there is nothing immediately below it), this function calculates a \
limit ceiling based on the bounds of all the other suboptimal \
"""Since the least preferred alternative has no lower bound (since
there is nothing immediately below it), this function calculates a
limit ceiling based on the bounds of all the other suboptimal
alternatives."""
return self._last_diff_strategy

@property
def random_state(self):
"""Controls the random state to generate variations in the \
"""Controls the random state to generate variations in the
sub-optimal alternatives."""
return self._random_state

Expand Down Expand Up @@ -225,7 +231,6 @@ def _maximum_abs_noises(self, *, dm, rank):
Each row contains the maximum possible absolute noise to worsen
the current alternative (``mutate``) with respect to the next
(``mute_next``).

"""
# TODO: room for improvement: pandas to numpy

Expand Down Expand Up @@ -261,7 +266,7 @@ def _maximum_abs_noises(self, *, dm, rank):
return maximum_abs_noises

def _mutate_dm(self, *, dm, mutate, alternative_max_abs_noise, random):
"""Create a new decision matrix by replacing a suboptimal alternative \
"""Create a new decision matrix by replacing a suboptimal alternative
with a slightly worse one.

The algorithm operates as follows:
Expand Down Expand Up @@ -296,7 +301,6 @@ def _mutate_dm(self, *, dm, mutate, alternative_max_abs_noise, random):
Decision matrix with the 'mutate' alternative "worsened".
noise: ``pandas.Series``
Noise used to worsen the alternative.

"""
# TODO: room for improvement: pandas to numpy

Expand Down Expand Up @@ -352,7 +356,6 @@ def _generate_mutations(self, *, dm, orank, repeat, random):
worsen.
noise: ``pandas.Series``
Noise used to worsen the 'mutated' alternative.

"""
# check the maximum absolute difference between any alternative and
# the next one in the ranking to establish a worse-limit
Expand Down Expand Up @@ -381,7 +384,7 @@ def _add_mutation_info_to_rank(
full_alternatives,
allow_missing_alternatives,
):
"""Adds information on how an alternative was "worsened" in the \
"""Adds information on how an alternative was "worsened" in the
decision matrix with respect to the original.

All aggregated information is included within the ``rrt1`` (Rank
Expand Down Expand Up @@ -414,7 +417,6 @@ def _add_mutation_info_to_rank(
patched_rank : ``skcriteria.agg.Rank``
Ranking with all the information about the worsened alternative and
the rank reversal test added to the `extra_.rrt1` attribute.

"""
# extract the original data
method = str(rank.method)
Expand Down Expand Up @@ -454,7 +456,7 @@ def _add_mutation_info_to_rank(
"iteration": iteration,
"mutated": mutated,
"noise": noise,
"missing_alternatives": alts_diff,
"missing_alternatives": alts_diff.tolist(),
},
)

Expand All @@ -468,7 +470,7 @@ def _add_mutation_info_to_rank(
return patched_rank

def evaluate(self, dm):
"""Executes a the invariance test.
"""Executes the invariance test.

Parameters
----------
Expand All @@ -481,14 +483,12 @@ def evaluate(self, dm):
An object containing multiple rankings of the alternatives, with
information on any changes made to the original decision matrix in
the `extra_` attribute. Specifically, the `extra_` attribute
contains a an object in the key `rrt1` that provides
contains an object in the key `rrt1` that provides
information on any changes made to the original decision matrix,
including the the noise applied to worsen any sub-optimal
including the noise applied to worsen any sub-optimal
alternative.

"""
# FIRST THE DATA THAT WILL BE USED IN ALL THE ITERATIONS ==============

# the test configuration
dmaker = self.dmaker
allow_missing_alternatives = self.allow_missing_alternatives
Expand All @@ -509,7 +509,7 @@ def evaluate(self, dm):
allow_missing_alternatives=allow_missing_alternatives,
)

# Here we create a containers for the rank comparator starting with
# Here we create containers for the rank comparator starting with
# the original rank
names, results = ["Original"], [patched_orank]

Expand All @@ -524,7 +524,7 @@ def evaluate(self, dm):
# calculate the new rank
mrank = dmaker.evaluate(mdm)

# add info about the mutation to rhe rank
# add info about the mutation to the rank
patched_mrank = self._add_mutation_info_to_rank(
rank=mrank,
mutated=mutated,
Expand All @@ -535,9 +535,9 @@ def evaluate(self, dm):
)

# store the information
names.append(f"M.{mutated}")
names.append(f"M.{mutated}_{it}")
results.append(patched_mrank)

# manually creates a new RankComparator
named_ranks = unique_names(names=names, elements=results)
return RanksComparator(named_ranks)
return RanksComparator(named_ranks)
49 changes: 27 additions & 22 deletions tests/cmp/ranks_rev/test_rank_inv_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,19 @@
# DOCS
# =============================================================================

"""test for skcriteria.cmp.rrtest1

"""

"""test for skcriteria.cmp.ranks_rev.rank_inv_check"""

# =============================================================================
# IMPORTS
# =============================================================================


import numpy as np

import pytest

import skcriteria as skc
from skcriteria.agg.similarity import TOPSIS
from skcriteria.cmp.ranks_rev.rank_inv_check import RankInvariantChecker
from skcriteria.utils import rank
from skcriteria.cmp.ranks_rev.default_mutation_strategy import DefaultMutationStrategy

# =============================================================================
# TESTS
Expand Down Expand Up @@ -54,18 +49,32 @@ class EvaluateNoCallable:
def test_RankInvariantChecker_invalid_last_diff_strategy():
class FakeDM:
def evaluate(self): ...

dmaker = FakeDM()
with pytest.raises(TypeError):
RankInvariantChecker(dmaker, last_diff_strategy=None)


# CHECK DOMINANCE =============================================================
def test_RankInvariantChecker_with_custom_mutation_strategy():
class CustomMutationStrategy(DefaultMutationStrategy):
def generate_mutations(self, dm: pd.DataFrame, rank: 'Rank') -> pd.DataFrame:
# Custom mutation logic
noise = np.random.uniform(-0.05, 0.05, size=dm.shape)
return dm + noise

dm = skc.datasets.load_simple_stock_selection()
dmaker = TOPSIS()
mutation_strategy = CustomMutationStrategy()
rrt1 = RankInvariantChecker(dmaker, mutation_strategy=mutation_strategy, random_state=42)
result = rrt1.evaluate(dm)
assert isinstance(result, skc.cmp.RanksComparator)
assert "Original" in result.names
# Additional assertions can be added based on custom mutation logic


def original_dominates_mutated(dm, result, alt_name):
original = dm.alternatives[alt_name]
noise = result[f"M.{alt_name}"].e_.rrt1.noise
noise = result.ranks[f"M.{alt_name}"].extra_.rrt1.noise
mutated = original + noise

dom = rank.dominance(original, mutated, dm.minwhere)
Expand Down Expand Up @@ -102,9 +111,6 @@ def test_RankInvariantChecker_van2021evaluation(windows_size):
assert original_dominates_mutated(dm, result, "DOGE")


# REMOVE AN ALTERNATIVE =======================================================


class RemoveAlternativeDMaker:
def __init__(self, dmaker, remove, remove_at_call):
self.dmaker = dmaker
Expand Down Expand Up @@ -154,9 +160,9 @@ def test_RankInvariantChecker_remove_one_alternative():

result = rrt1.evaluate(dm)

_, rank = result.ranks[1]
_, rank = result.ranks["M.AA"]

np.testing.assert_array_equal(rank.e_.rrt1.missing_alternatives, ["AA"])
np.testing.assert_array_equal(rank.extra_.rrt1.missing_alternatives, ["AA"])
assert rank.to_series()["AA"] == 6


Expand All @@ -170,10 +176,10 @@ def test_RankInvariantChecker_remove_two_alternatives():

result = rrt1.evaluate(dm)

_, rank = result.ranks[1]

_, rank = result.ranks["M.AA"]
np.testing.assert_array_equal(
rank.e_.rrt1.missing_alternatives, ["AA", "MM"]
rank.extra_.rrt1.missing_alternatives, ["AA", "MM"]
)

assert rank.to_series()["AA"] == 5
Expand All @@ -187,9 +193,8 @@ def test_RankInvariantChecker_repr():

result = repr(rrt1)
expected = (
f"<RankInvariantChecker {dmaker!r} repeats={1}, "
f"allow_missing_alternatives={False} "
f"last_diff_strategy={np.median!r}>"
f"<RankInvariantChecker {repr(dmaker)} repeats=1, "
f"allow_missing_alternatives=False last_diff_strategy={np.median!r}>"
)

assert result == expected
assert result == expected