50 analysis module first rule based analysis implemented #90

Merged · 14 commits · Nov 26, 2024
24 changes: 19 additions & 5 deletions apps/analyzer/metadata_analyzer/analyzer.py
@@ -1,8 +1,9 @@
class Analyzer:
def init(database, backend, simple_analyzer):
def init(database, backend, simple_analyzer, simple_rule_based_analyzer):
Analyzer.database = database
Analyzer.backend = backend
Analyzer.simple_analyzer = simple_analyzer
Analyzer.simple_rule_based_analyzer = simple_rule_based_analyzer

def analyze():
data = list(Analyzer.database.get_results())
@@ -20,7 +21,7 @@ def analyze():
def _convert_result(result):
return {
"id": result.uuid,
"sizeMB": result.data_size // 1_000_000,
"sizeMB": result.data_size / 1_000_000,
"creationDate": result.start_time.isoformat(),
}

@@ -44,13 +45,26 @@ def update_data():

# Send a full batch
if len(batch) == 100:
Analyzer.backend.sendBackupDataBatched(batch)
Analyzer.backend.send_backup_data_batched(batch)
batch = []

# Send the remaining results
if len(batch) > 0:
Analyzer.backend.sendBackupDataBatched(batch)
Analyzer.backend.send_backup_data_batched(batch)

return {"count": count}


def simple_rule_based_analysis(alert_limit):
data = list(Analyzer.database.get_results())
result = Analyzer.simple_rule_based_analyzer.analyze(data, alert_limit)
return result

def simple_rule_based_analysis_diff(alert_limit):
data = list(Analyzer.database.get_results())
result = Analyzer.simple_rule_based_analyzer.analyze_diff(data, alert_limit)
return result

def simple_rule_based_analysis_inc(alert_limit):
data = list(Analyzer.database.get_results())
result = Analyzer.simple_rule_based_analyzer.analyze_inc(data, alert_limit)
return result
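For context, the new entry points on Analyzer simply pull every result from the database and hand the list to the rule-based analyzer. Below is a minimal sketch of how they are wired up and invoked, mirroring the constructor arguments used in main.py; the backend URL is a placeholder.

from metadata_analyzer.analyzer import Analyzer
from metadata_analyzer.backend import Backend
from metadata_analyzer.database import Database
from metadata_analyzer.simple_rule_based_analyzer import SimpleRuleBasedAnalyzer

database = Database()
backend = Backend("http://localhost:3000/")  # placeholder backend URL
simple_rule_based_analyzer = SimpleRuleBasedAnalyzer(backend, 0.2, 0.2, 0.2, 0.2)
Analyzer.init(database, backend, None, simple_rule_based_analyzer)

# alert_limit=-1 sends every alert found; a positive value caps the number sent.
print(Analyzer.simple_rule_based_analysis(-1))        # e.g. {'count': 3}
print(Analyzer.simple_rule_based_analysis_diff(-1))
print(Analyzer.simple_rule_based_analysis_inc(-1))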
8 changes: 7 additions & 1 deletion apps/analyzer/metadata_analyzer/backend.py
@@ -4,6 +4,12 @@ class Backend:
def __init__(self, backend_url):
self.backend_url = backend_url

def sendBackupDataBatched(self, batch):
def send_backup_data_batched(self, batch):
url = self.backend_url + "backupData/batched"
r = requests.post(url, json=batch)
r.raise_for_status()

def create_alert(self, alert):
url = self.backend_url + "alerting"
r = requests.post(url, json=alert)
r.raise_for_status()
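The new create_alert helper posts a single alert as JSON to the backend's alerting route. The payload shape below matches the dictionaries built by SimpleRuleBasedAnalyzer._analyze_pair; the URL and backup id are placeholders.

from metadata_analyzer.backend import Backend

backend = Backend("http://localhost:3000/")  # placeholder backend URL
backend.create_alert({
    "type": 0,                # 0 = size increased beyond the bound, 1 = size decreased
    "value": 130.0,           # size of the newer backup in MB
    "referenceValue": 100.0,  # size of the preceding backup in MB
    "backupId": "some-backup-uuid",  # placeholder id
})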
21 changes: 20 additions & 1 deletion apps/analyzer/metadata_analyzer/main.py
@@ -2,6 +2,7 @@
from dotenv import load_dotenv
from metadata_analyzer.database import Database
from metadata_analyzer.simple_analyzer import SimpleAnalyzer
from metadata_analyzer.simple_rule_based_analyzer import SimpleRuleBasedAnalyzer
from metadata_analyzer.analyzer import Analyzer
from metadata_analyzer.backend import Backend
from flasgger import Swagger
@@ -142,12 +143,30 @@ def update_data():
"""
return jsonify(Analyzer.update_data())

@app.route("/simpleRuleBasedAnalysis", methods=["POST"])
def simple_rule_based_analysis():
json = request.get_json()
alert_limit = json["alertLimit"]
return jsonify(Analyzer.simple_rule_based_analysis(alert_limit))

@app.route("/simpleRuleBasedAnalysisDiff", methods=["POST"])
def simple_rule_based_analysis_diff():
json = request.get_json()
alert_limit = json["alertLimit"]
return jsonify(Analyzer.simple_rule_based_analysis_diff(alert_limit))

@app.route("/simpleRuleBasedAnalysisInc", methods=["POST"])
def simple_rule_based_analysis_inc():
json = request.get_json()
alert_limit = json["alertLimit"]
return jsonify(Analyzer.simple_rule_based_analysis_inc(alert_limit))

def main():
database = Database()
backend = Backend(os.getenv("BACKEND_URL"))
simple_analyzer = SimpleAnalyzer()
Analyzer.init(database, backend, simple_analyzer)
simple_rule_based_analyzer = SimpleRuleBasedAnalyzer(backend, 0.2, 0.2, 0.2, 0.2)
Analyzer.init(database, backend, simple_analyzer, simple_rule_based_analyzer)

new_port = os.getenv("FLASK_RUN_PORT")
int_port = int(new_port or 5000)
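All three new routes expect a JSON body with an alertLimit field and respond with the number of alerts that were sent. A quick sketch of calling one of them from a client, assuming a locally running analyzer (host and port are assumptions):

import requests

response = requests.post(
    "http://localhost:5000/simpleRuleBasedAnalysis",  # assumed local host/port
    json={"alertLimit": 5},   # send at most 5 alerts; -1 sends all of them
)
print(response.json())        # e.g. {'count': 5}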
179 changes: 179 additions & 0 deletions apps/analyzer/metadata_analyzer/simple_rule_based_analyzer.py
@@ -0,0 +1,179 @@
import sys
from collections import defaultdict
import metadata_analyzer.backend
from datetime import datetime, timedelta

class SimpleRuleBasedAnalyzer:
def __init__(self, backend, size_alert_percentage, inc_percentage, inc_date_percentage, diff_percentage):
self.backend = backend
self.size_alert_percentage = size_alert_percentage
self.inc_data_percentage = inc_percentage
self.inc_date_percentage = inc_date_percentage
self.diff_percentage = diff_percentage

# Analyze a pair of consecutive results and return a list of created alerts
def _analyze_pair(self, result1, result2, bound):
relative_change = self.handle_zero(result1, result2)
# Skip pairs of results with changes inside the bounds
if -bound <= relative_change <= bound:
return []

alert = {
"type": 0 if relative_change > 0 else 1,
"value": result2.data_size / 1_000_000,
"referenceValue": result1.data_size / 1_000_000,
"backupId": result2.uuid,
}
return [alert]

def handle_zero(self, result1, result2):
# Handle results with a data_size of zero
if result1.data_size == 0 and result2.data_size == 0:
relative_change = 0
elif result1.data_size == 0:
relative_change = float("inf")
elif result2.data_size == 0:
relative_change = -float("inf")
else:
relative_change = (result2.data_size - result1.data_size) / result1.data_size
return relative_change

# Analyze a pair of consecutive results and return a list of created alerts
def _analyze_pair_diff(self, result1, result2):
relative_change = self.handle_zero(result1, result2)

# Skip pairs where the size increased but stayed within the allowed bound
if relative_change > 0 and relative_change <= self.diff_percentage:
return []

alert = {
"type": 0 if relative_change > 0 else 1,
"value": result2.data_size / 1_000_000,
"referenceValue": result1.data_size / 1_000_000,
"backupId": result2.uuid,
}

return [alert]

# For now only search for size changes and trigger corresponding alerts
def analyze(self, data, alert_limit):
# Group the 'full' results by their task
groups = defaultdict(list)
for result in data:
if (result.task == ""
or result.fdi_type != 'F'
or result.data_size is None
or result.start_time is None):
continue
groups[result.task].append(result)

alerts = []
# Iterate through each group to find drastic size changes
for task, unordered_results in groups.items():
results = sorted(unordered_results, key=lambda result: result.start_time)
# Iterate through each pair of consecutive results and compare their sizes
for result1, result2 in zip(results[:-1], results[1:]):
new_alerts = self._analyze_pair(result1, result2, self.size_alert_percentage)
alerts += new_alerts

# Only send a maximum of alert_limit alerts or all alerts if alert_limit is -1
count = len(alerts) if alert_limit == -1 else min(alert_limit, len(alerts))
# Send the alerts to the backend
for alert in alerts[:count]:
self.backend.create_alert(alert)

return {
"count": count
}

# Search for unexpected size changes in diff backups and trigger corresponding alerts
def analyze_diff(self, data, alert_limit):
# Group the 'diff' results into runs, starting a new group at each 'full' backup
groups = defaultdict(list)
groupNum = 0
for result in data:
if (result.task == ""
or (result.fdi_type != 'F' and result.fdi_type != 'D')
or result.data_size is None
or result.start_time is None):
continue
if (result.fdi_type == 'F'):
groupNum += 1
continue
groups[groupNum].append(result)

alerts = []
# Iterate through each group and check that diff sizes grow, but only within the allowed margin
for task, unordered_results in groups.items():
results = sorted(unordered_results, key=lambda result: result.start_time)
# Iterate through each pair of consecutive results and compare their sizes
for result1, result2 in zip(results[:-1], results[1:]):
new_alerts = self._analyze_pair_diff(result1, result2)
alerts += new_alerts

# Only send a maximum of alert_limit alerts or all alerts if alert_limit is -1
count = len(alerts) if alert_limit == -1 else min(alert_limit, len(alerts))
# Send the alerts to the backend
for alert in alerts[:count]:
self.backend.create_alert(alert)

return {
"count": count
}

# Search for unexpected size changes in inc backups and trigger corresponding alerts
def analyze_inc(self, data, alert_limit):

groups = defaultdict(list)
for result in data:
if (result.task == ""
or result.fdi_type != 'I'
or result.data_size is None
or result.start_time is None):
continue
groups[result.task].append(result)

alerts = []
# Iterate through each group and compare every inc against the group's average size
for task, unordered_results in groups.items():
results = sorted(unordered_results, key=lambda result: result.start_time)

# For now, the average inc size serves as the baseline against which every inc is judged; this may change later
# Iterate through the results to compute the average size and the average interval between results
avg_size = 0
prev_time = results[0].start_time
avg_time = timedelta(0)


for result in results:
avg_size += result.data_size
avg_time += result.start_time - prev_time
prev_time = result.start_time

avg_size = avg_size/(len(results))
avg_time = avg_time/(len(results)-1)


for prev, current in zip(results[:-1], results[1:]):

interval = current.start_time - prev.start_time
# Only compare incs that happened at quasi-regular intervals
if avg_time * (1 - self.inc_date_percentage) <= interval <= avg_time * (1 + self.inc_date_percentage):
# Compare against the group's average size instead of the previous result's actual size
prev.data_size = avg_size
new_alerts = self._analyze_pair(prev, current, self.inc_data_percentage)
alerts += new_alerts

# Only send a maximum of alert_limit alerts or all alerts if alert_limit is -1
count = len(alerts) if alert_limit == -1 else min(alert_limit, len(alerts))
# Send the alerts to the backend
for alert in alerts[:count]:
self.backend.create_alert(alert)

return {
"count": count
}
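All three analyses ultimately compare the relative size change computed in handle_zero against a percentage bound. Here is a small self-contained sketch of that check, using a hypothetical minimal stand-in for the Result model (only the fields the analyzer reads are included) and a fake backend that just prints the alerts it receives.

from dataclasses import dataclass
from datetime import datetime

from metadata_analyzer.simple_rule_based_analyzer import SimpleRuleBasedAnalyzer

@dataclass
class FakeResult:
    uuid: str
    task: str
    fdi_type: str
    data_size: int
    start_time: datetime

class PrintingBackend:
    # Stand-in for the real Backend; just shows what would be sent.
    def create_alert(self, alert):
        print("alert:", alert)

analyzer = SimpleRuleBasedAnalyzer(PrintingBackend(), 0.2, 0.2, 0.2, 0.2)

r1 = FakeResult("id-1", "task1", "F", 100_000_000, datetime(2024, 11, 1))
r2 = FakeResult("id-2", "task1", "F", 130_000_000, datetime(2024, 11, 2))

# (130 - 100) / 100 = 0.3 lies outside the ±0.2 bound, so one alert of type 0.
print(analyzer._analyze_pair(r1, r2, analyzer.size_alert_percentage))

# analyze() groups results by task, sorts them by start_time and checks each
# consecutive pair; alert_limit=-1 means every alert is sent to the backend.
print(analyzer.analyze([r1, r2], -1))   # {'count': 1}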



10 changes: 10 additions & 0 deletions apps/analyzer/tests/mock_backend.py
@@ -0,0 +1,10 @@
class MockBackend:
def __init__(self):
self.backups = []
self.alerts = []

def send_backup_data_batched(self, batch):
self.backups += batch

def create_alert(self, alert):
self.alerts.append(alert)
6 changes: 6 additions & 0 deletions apps/analyzer/tests/mock_database.py
@@ -0,0 +1,6 @@
class MockDatabase:
def __init__(self, results):
self.results = results

def get_results(self):
return iter(self.results)
21 changes: 5 additions & 16 deletions apps/analyzer/tests/test_main.py
@@ -3,21 +3,10 @@
from metadata_analyzer.main import hello_world, update_data
from metadata_analyzer.analyzer import Analyzer
from metadata_analyzer.models import Result
from metadata_analyzer.simple_rule_based_analyzer import SimpleRuleBasedAnalyzer
from datetime import datetime

class MockDatabase:
def __init__(self, results):
self.results = results

def get_results(self):
return iter(self.results)

class MockBackend:
def __init__(self):
self.backups = []

def sendBackupDataBatched(self, batch):
self.backups += batch
from tests.mock_backend import MockBackend
from tests.mock_database import MockDatabase

def test_hello_world():
"""Test the hello_world function."""
@@ -32,8 +21,8 @@ def test_update_data():

database = MockDatabase([mock_result])
backend = MockBackend()
simple_analyzer = None
Analyzer.init(database, backend, simple_analyzer)
Analyzer.init(database, backend, None, None)
Analyzer.update_data()

assert backend.backups == [Analyzer._convert_result(mock_result)]
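With the mocks extracted into their own modules, a test for the new analysis path could look roughly like the sketch below (it is not part of this PR). The Result fields used here are the ones the analyzer reads; the exact constructor of the Result model may differ.

def test_simple_rule_based_analysis():
    # Two full backups whose size doubles, which exceeds the 20% bound.
    result1 = Result(uuid="1", task="task", fdi_type="F",
                     data_size=100_000_000, start_time=datetime(2024, 11, 1))
    result2 = Result(uuid="2", task="task", fdi_type="F",
                     data_size=200_000_000, start_time=datetime(2024, 11, 2))

    database = MockDatabase([result1, result2])
    backend = MockBackend()
    simple_rule_based_analyzer = SimpleRuleBasedAnalyzer(backend, 0.2, 0.2, 0.2, 0.2)
    Analyzer.init(database, backend, None, simple_rule_based_analyzer)

    result = Analyzer.simple_rule_based_analysis(-1)

    assert result == {"count": 1}
    assert len(backend.alerts) == 1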
