forked from bulik/ldsc
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathsimulate.py
105 lines (91 loc) · 2.77 KB
/
simulate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
Generates .sumstats and .l2.ldscore/.l2.M files used for simulation testing.
"""
from pathlib import Path
import numpy as np
import pandas as pd
N_INDIV = 10000
N_SIMS = 1000
N_SNP = 1000
h21 = 0.3
h22 = 0.6
Path("tests/simulate_test/ldscore/").mkdir(exist_ok=True)
Path("tests/simulate_test/sumstats/").mkdir(exist_ok=True)
def print_ld(x, fh, M):
l2 = ".l2.ldscore"
m = ".l2.M_5_50"
x.to_csv(fh + l2, sep="\t", index=False, float_format="%.3f")
print("\t".join(map(str, M)), file=open(fh + m, "w"))
# chr1
y = x.iloc[0 : int(len(x) / 2),]
y.to_csv(fh + "1" + l2, sep="\t", index=False, float_format="%.3f")
print("\t".join((str(x / 2) for x in M)), file=open(fh + "1" + m, "w"))
# chr2
y = x.iloc[int(len(x) / 2) : len(x),]
y.to_csv(fh + "2" + l2, sep="\t", index=False, float_format="%.3f")
print("\t".join((str(x / 2) for x in M)), file=open(fh + "2" + m, "w"))
two_ldsc = np.abs(100 * np.random.normal(size=2 * N_SNP)).reshape((N_SNP, 2))
single_ldsc = np.sum(two_ldsc, axis=1).reshape((N_SNP, 1))
M_two = np.sum(two_ldsc, axis=0)
M = np.sum(single_ldsc)
ld = pd.DataFrame(
{
"CHR": np.ones(N_SNP),
"SNP": ["rs" + str(i) for i in range(1000)],
"BP": np.arange(N_SNP),
}
)
# 2 LD Scores 2 files
split_ldsc = ld.copy()
split_ldsc["LD"] = two_ldsc[:, 0]
print_ld(split_ldsc, "tests/simulate_test/ldscore/twold_firstfile", [M_two[0]])
split_ldsc = ld.copy()
split_ldsc["LD"] = two_ldsc[:, 1] # both have same colname to test that this is ok
print_ld(split_ldsc, "tests/simulate_test/ldscore/twold_secondfile", [M_two[1]])
# 1 LD Score 1 file
ldsc = ld.copy()
ldsc["LD"] = single_ldsc
print_ld(ldsc, "tests/simulate_test/ldscore/oneld_onefile", [M])
# 2 LD Scores 1 file
ldsc = ld.copy()
ldsc["LD1"] = two_ldsc[:, 0]
ldsc["LD2"] = two_ldsc[:, 1]
print_ld(ldsc, "tests/simulate_test/ldscore/twold_onefile", M_two)
# Weight LD Scores
w_ld = ld.copy()
w_ld["LD"] = np.ones(N_SNP)
w_ld.to_csv(
"tests/simulate_test/ldscore/w.l2.ldscore",
index=False,
sep="\t",
float_format="%.3f",
)
# split across chromosomes
df = pd.DataFrame(
{
"SNP": ["rs" + str(i) for i in range(1000)],
"A1": ["A" for _ in range(1000)],
"A2": ["G" for _ in range(1000)],
"N": np.ones(1000) * N_INDIV,
}
)
for i in range(N_SIMS):
z = np.random.normal(size=N_SNP).reshape((N_SNP,))
c = np.sqrt(
1
+ N_INDIV
* (
h21 * two_ldsc[:, 0] / float(M_two[0])
+ h22 * two_ldsc[:, 1] / float(M_two[1])
)
)
z = np.multiply(z, c)
dfi = df.copy()
dfi["Z"] = z
dfi.reindex(np.random.permutation(dfi.index))
dfi.to_csv(
"tests/simulate_test/sumstats/" + str(i),
sep="\t",
index=False,
float_format="%.3f",
)