-
Notifications
You must be signed in to change notification settings - Fork 0
/
COMP565_A1_ldsr.py
114 lines (79 loc) · 2.98 KB
/
COMP565_A1_ldsr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import pandas as pd
import numpy as np
# README: https://github.com/mikemikezhu/ld_score_regression
"""
Constants
"""
SAMPLE_SIZE = 1000
TOTAL_FEATURES = 4268
"""
Data Loader
"""
class DataLoaderService:
""" Load LD Matrix (Pearson Correlation) """
def load_correlation(self, path: str,
compression: str) -> np.ndarray:
result = pd.read_csv(path, compression=compression)
print("LD matrix:")
print(result.head())
result = result.to_numpy()[:, 1:]
print("LD matrix shape: {}".format(result.shape))
return result
""" Load marginal statistics """
def load_marginal_statistics(self, path: str,
compression: str) -> np.ndarray:
result = pd.read_csv(path, compression=compression)
print("Beta marginal statistics:")
print(result.head())
result = result.to_numpy()[:, 1]
print("Beta marginal statistics shape: {}".format(result.shape))
return result
"""
LD Score
"""
class LdScoreCalculator:
def calculate_ld_score(self, correlation: np.ndarray) -> np.ndarray:
# LD score is the sum of the squared Pearson correlation between SNP j and SNP k
result = np.sum(np.square(correlation), axis=1)
print("LD Score:")
print(result)
print("LD Score shape: {}".format(result.shape))
return result
"""
LD Score Regression
"""
class LdScoreRegressionCalculator:
def calculate_heritability(self, ld_score: np.ndarray,
marginal_statistics: np.ndarray,
sample_size: int,
total_features: int) -> float:
numerator = np.sum(
ld_score * (np.square(marginal_statistics) - 1 / sample_size))
denominator = np.sum(np.square(ld_score)) / total_features
return numerator / denominator
"""
Main
"""
def main():
# Load data
data_loader = DataLoaderService()
r = data_loader.load_correlation("data/LD.csv.gz", 'gzip')
beta = data_loader.load_marginal_statistics(
"data/beta_marginal.csv.gz", 'gzip')
assert r.shape[0] == TOTAL_FEATURES
assert r.shape[1] == TOTAL_FEATURES
assert len(beta) == TOTAL_FEATURES
# Calculate LD score
ld_score_calculator = LdScoreCalculator()
ld_score = ld_score_calculator.calculate_ld_score(r)
assert len(ld_score) == TOTAL_FEATURES
# Calculate heritability
regression_calculator = LdScoreRegressionCalculator()
heritability = regression_calculator.calculate_heritability(ld_score=ld_score,
marginal_statistics=beta,
sample_size=SAMPLE_SIZE,
total_features=TOTAL_FEATURES)
# Heritability: 0.19017122782624796
print("Heritability: {}".format(heritability))
if __name__ == "__main__":
main()