-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
86 lines (74 loc) · 2.89 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from typing import List
import numpy as np
import pyshark
import pathlib
import json
import os
def open_json(filename, folder):
"""
Open a json file given its name and its containing folder
:param filename: name of the file to be opened
:param folder: relative path of the folder containing the file to open
"""
result_path = os.path.join(pathlib.Path(__file__).parent.absolute(), folder)
my_path = os.path.join(result_path, filename)
with open(my_path) as json_file:
return json.load(json_file)
def open_pcapng(filename, folder):
"""
Open a pcapng file given its name and it containing folder
:param filename: name of the file to be opened
:param folder: relative path of the folder containing the file to open
"""
result_path = os.path.join(pathlib.Path(__file__).parent.absolute(), folder)
my_path = os.path.join(result_path, filename)
return pyshark.FileCapture(my_path)
def get_activities(data):
"""
Find all the activities available in the reference file
:param data: the reference file
:return: a list of activities
"""
activities = []
for d in data:
activities.append(d)
return activities
def get_reference_values(data, activities):
"""
Find the all the referenced value of the activities
:param data: the reference file
:param activities: the list of activities
:return: a list of tuple (frame size, data rate) ordered by activity
"""
values = []
for a in activities:
temp = []
for i in range(len(data[a])):
temp.append(list(data[a].values())[i])
values.append(tuple(temp))
return values
def load_tr_data(frame_size: List[int], activity: str, feature_set, feature_label, p: int, interval_time):
"""
Load the training set list with the extrapolated data
:param interval_time: time between packets arriving
:param frame_size: up_frame_size or down_frame_size
:param activity: name of the training file
:param feature_set: up_features or down_features
:param feature_label: up_labels or down_labels
:param p: percentile deliminator
:return: the feature set, the label set and two empty lists, in order to clean up/down frame_size and data_rate
"""
# feature_set is (mean frame size, mean data rate, percentage of packets with size < p
feature_set.append([np.mean(frame_size), np.std(frame_size), (sum(i < p for i in frame_size) / len(frame_size)),
np.mean(interval_time)])
# feature_labels is index of file (will be useful to retrieve activity name)
feature_label.append(activity)
# Reset variables
return feature_set, feature_label, [], []
def get_activity_name(activity: str) -> str:
"""
Returns the name of the activity
:param activity: name of training set files (ActivityName_XX)
:return: the name of the activity
"""
return activity[0:-3]