-
Notifications
You must be signed in to change notification settings - Fork 7
/
test_etl_pipeline.py
117 lines (88 loc) · 4.23 KB
/
test_etl_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import csv
import unittest
import os
from etl_pipeline import FlattenSqlDB, CreateDbFromCsv
from create_sensor_db import MySQLCreate
class MySqlCreateTest(unittest.TestCase):
    """Tests for ``MySQLCreate``: table creation, loading and CSV output.

    Each test builds its own ``MySQLCreate`` with host ``localhost``, user
    ``root`` and an empty password, and inspects the result through
    ``self.mysql.connection``.
    """

    def setUp(self):
        """Create the ``MySQLCreate`` instance used by the test."""
        self.mysql = MySQLCreate(host='localhost',
                                 user='root',
                                 password='')

    def tearDown(self):
        """Drop the reference to the ``MySQLCreate`` instance."""
        # NOTE(review): the connection is never closed explicitly here;
        # confirm that MySQLCreate releases it when the instance is dropped.
        self.mysql = None

    @staticmethod
    def _discard_file(path):
        """Delete *path* if it is an existing regular file."""
        if os.path.isfile(path):
            os.remove(path)

    def _fetch_single_row(self, cursor, query):
        """Run *query* and return its only row.

        The row count is asserted before indexing so that an empty or
        duplicated result is reported as an assertion failure rather than
        an ``IndexError``.
        """
        cursor.execute(query)
        rows = cursor.fetchall()
        self.assertEqual(len(rows), 1)
        return rows[0]

    def test_create_table(self):
        """``create_table`` leaves an existing but empty ``SensorTable``."""
        self.mysql.create_table()
        with self.mysql.connection.cursor() as cursor:
            # Check that table is created and empty. This relies on the
            # cursor's execute() returning the number of selected rows.
            self.assertEqual(0, cursor.execute("select * from `SensorTable`;"))

    def test_write_to_db(self):
        """``write_to_db`` fills ``SensorTable`` with the expected readings."""
        self.mysql.create_table()
        self.mysql.write_to_db()
        with self.mysql.connection.cursor() as cursor:
            # Check that table is created and filled.
            # NOTE(review): 4320 is presumably 3 sensors x 1440 minutes;
            # confirm against the sample input.
            self.assertEqual(4320, cursor.execute("select * from `SensorTable`;"))

            # Chose a time and sensor reading that was compressed in the
            # initial csv input
            row = self._fetch_single_row(
                cursor,
                "select S.Reading from `SensorTable` S "
                "where S.Time = '2017-01-01 0:06' and S.SensorId = 1 ;")
            self.assertEqual(row["Reading"], 303)

            # Chose a time and sensor reading that was not compressed in
            # the initial csv input
            row = self._fetch_single_row(
                cursor,
                "select S.Reading from `SensorTable` S "
                "where S.Time = '2017-01-01 0:10' and S.SensorId = 2 ;")
            self.assertEqual(row["Reading"], 304)

    def test_write_table_to_csv(self):
        """``fill_db`` produces a CSV whose first data row matches the sample."""
        filepath = 'test.csv'
        # Start from a clean slate and make sure the scratch file does not
        # outlive the test, whether it passes or fails.
        self._discard_file(filepath)
        self.addCleanup(self._discard_file, filepath)

        self.mysql.fill_db(filepath)

        with open(filepath, 'r') as raw_file:
            raw_data = csv.reader(raw_file, delimiter=',', quoting=csv.QUOTE_NONE)
            # Drop the first row (presumably the header line).
            data_list = list(raw_data)[1:]

        self.assertTrue(data_list, "no data rows were written to the csv file")
        row = data_list[0]
        self.assertEqual(row[0], '2017-01-01 00:00')
        self.assertEqual(int(row[1]), 1)
        self.assertEqual(int(row[2]), 1)
        self.assertEqual(int(row[3]), 305)
class EtlPipelineTest(unittest.TestCase):
    """Tests for ``FlattenSqlDB``: building and exporting the flat table.

    Each test builds its own ``MySQLCreate`` with host ``localhost``, user
    ``root`` and an empty password, loads ``SensorTable`` through it and
    then passes ``self.mysql.connection`` to ``FlattenSqlDB``.
    """

    def setUp(self):
        """Create the ``MySQLCreate`` instance used by the test."""
        self.mysql = MySQLCreate(host='localhost',
                                 user='root',
                                 password='')

    def tearDown(self):
        """Drop the reference to the ``MySQLCreate`` instance."""
        # NOTE(review): the connection is never closed explicitly here;
        # confirm that MySQLCreate releases it when the instance is dropped.
        self.mysql = None

    @staticmethod
    def _discard_file(path):
        """Delete *path* if it is an existing regular file."""
        if os.path.isfile(path):
            os.remove(path)

    def test_generate_flat_sensor_table(self):
        """``generate_flat_sensor_table`` creates a populated ``SensorsData``."""
        self.mysql.create_table()
        self.mysql.write_to_db()
        FlattenSqlDB.generate_flat_sensor_table(self.mysql.connection)
        with self.mysql.connection.cursor() as cursor:
            # Check whether there is data in the new table, SensorsData.
            # assertGreater reports the actual count when the check fails.
            self.assertGreater(cursor.execute("select * from `SensorsData`;"), 0)
            self.assertGreater(len(cursor.fetchall()), 0)

            # Fetch sensor readings for time 0 and use the sample dataset
            # to validate whether the values are correct
            cursor.execute("select S.Sensor1Reading, S.Sensor2Reading, S.Sensor3Reading from `SensorsData` S "
                           "where S.Time = '2017-01-01 0:00' ;")
            rows = cursor.fetchall()
            self.assertEqual(1, len(rows))
            first = rows[0]
            self.assertEqual(int(first["Sensor1Reading"]), 305)
            self.assertEqual(int(first["Sensor2Reading"]), 301)
            self.assertEqual(int(first["Sensor3Reading"]), 304)

    def test_write_table_to_csv(self):
        """``write_flat_table_to_csv`` writes the flat table's first row correctly."""
        self.mysql.create_table()
        self.mysql.write_to_db()
        FlattenSqlDB.generate_flat_sensor_table(self.mysql.connection)

        filepath = 'test.csv'
        # Start from a clean slate and make sure the scratch file does not
        # outlive the test, whether it passes or fails.
        self._discard_file(filepath)
        self.addCleanup(self._discard_file, filepath)

        FlattenSqlDB.write_flat_table_to_csv(self.mysql.connection, filepath)

        with open(filepath, 'r') as raw_file:
            raw_data = csv.reader(raw_file, delimiter=',', quoting=csv.QUOTE_NONE)
            # Drop the first row (presumably the header line).
            data_list = list(raw_data)[1:]

        self.assertTrue(data_list, "no data rows were written to the csv file")
        row = data_list[0]
        self.assertEqual(row[0], '2017-01-01 00:00')
        self.assertEqual(int(row[1]), 1)
        self.assertEqual(int(row[2]), 305)
        self.assertEqual(int(row[3]), 301)
        self.assertEqual(int(row[4]), 304)