-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulation.py
87 lines (67 loc) · 1.93 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Pedagogical Value-Aligned Crowdsourcing
# @Tony Runzhe Yang
from utils import *
def strong(H, belief, examples):
    """Reweight ``belief`` by how well each hypothesis explains ``examples``.

    Each hypothesis ``h`` in ``H`` is scored by the product, over all
    ``(x, y)`` pairs, of the sigmoid likelihood
    ``1 / (1 + exp(-3 * (1 - 2 * |h(x) - y|)))``.  The scores are multiplied
    element-wise by ``belief`` and divided by their belief-weighted sum, so
    the result sums to one.  The slope of 3 is the steepest of the three
    sigmoid-based updates in this file (``median`` uses 1.2, ``weak`` 0.4).

    Args:
        H: sequence of callables; each is invoked as ``h(x)``.
        belief: weights indexable by hypothesis position, one per entry of H.
        examples: iterable of ``(x, y)`` pairs.  It is traversed once per
            hypothesis, so it must be re-iterable (e.g. a list).
            NOTE(review): presumably ``h(x)`` and ``y`` lie in [0, 1] -- confirm.

    Returns:
        numpy array with the updated belief.  With no examples every score is
        1.0 and the result is ``belief`` divided by its own sum.
    """
    def likelihood(hx, y):
        return 1.0 / (1.0 + np.exp(-3 * (1 - 2 * np.abs(hx - y))))

    scores = []
    normalizer = 0
    # enumerate() instead of xrange(len(H)): identical on Python 2, and it
    # does not raise NameError on Python 3.
    for i, h in enumerate(H):
        score = 1.0
        for x, y in examples:
            score *= likelihood(h(x), y)
        scores.append(score)
        normalizer += score * belief[i]
    return np.array(scores) * belief / normalizer
def median(H, belief, examples):
    """Reweight ``belief`` by how well each hypothesis explains ``examples``.

    Each hypothesis ``h`` in ``H`` is scored by the product, over all
    ``(x, y)`` pairs, of the sigmoid likelihood
    ``1 / (1 + exp(-1.2 * (1 - 2 * |h(x) - y|)))``.  The scores are
    multiplied element-wise by ``belief`` and divided by their
    belief-weighted sum, so the result sums to one.  The slope of 1.2 sits
    between the ``strong`` (3) and ``weak`` (0.4) updates in this file.

    Args:
        H: sequence of callables; each is invoked as ``h(x)``.
        belief: weights indexable by hypothesis position, one per entry of H.
        examples: iterable of ``(x, y)`` pairs.  It is traversed once per
            hypothesis, so it must be re-iterable (e.g. a list).
            NOTE(review): presumably ``h(x)`` and ``y`` lie in [0, 1] -- confirm.

    Returns:
        numpy array with the updated belief.  With no examples every score is
        1.0 and the result is ``belief`` divided by its own sum.
    """
    def likelihood(hx, y):
        return 1.0 / (1.0 + np.exp(-1.2 * (1 - 2 * np.abs(hx - y))))

    scores = []
    normalizer = 0
    # enumerate() instead of xrange(len(H)): identical on Python 2, and it
    # does not raise NameError on Python 3.
    for i, h in enumerate(H):
        score = 1.0
        for x, y in examples:
            score *= likelihood(h(x), y)
        scores.append(score)
        normalizer += score * belief[i]
    return np.array(scores) * belief / normalizer
def weak(H, belief, examples):
    """Reweight ``belief`` by how well each hypothesis explains ``examples``.

    Each hypothesis ``h`` in ``H`` is scored by the product, over all
    ``(x, y)`` pairs, of the sigmoid likelihood
    ``1 / (1 + exp(-0.4 * (1 - 2 * |h(x) - y|)))``.  The scores are
    multiplied element-wise by ``belief`` and divided by their
    belief-weighted sum, so the result sums to one.  The slope of 0.4 is the
    flattest of the three sigmoid-based updates in this file (``strong``
    uses 3, ``median`` 1.2), so each example moves the belief the least.

    Args:
        H: sequence of callables; each is invoked as ``h(x)``.
        belief: weights indexable by hypothesis position, one per entry of H.
        examples: iterable of ``(x, y)`` pairs.  It is traversed once per
            hypothesis, so it must be re-iterable (e.g. a list).
            NOTE(review): presumably ``h(x)`` and ``y`` lie in [0, 1] -- confirm.

    Returns:
        numpy array with the updated belief.  With no examples every score is
        1.0 and the result is ``belief`` divided by its own sum.
    """
    def likelihood(hx, y):
        return 1.0 / (1.0 + np.exp(-0.4 * (1 - 2 * np.abs(hx - y))))

    scores = []
    normalizer = 0
    # enumerate() instead of xrange(len(H)): identical on Python 2, and it
    # does not raise NameError on Python 3.
    for i, h in enumerate(H):
        score = 1.0
        for x, y in examples:
            score *= likelihood(h(x), y)
        scores.append(score)
        normalizer += score * belief[i]
    return np.array(scores) * belief / normalizer
def random(H, __, ___,):
    """Return a freshly drawn random belief over ``H``.

    The second and third positional arguments are accepted only so the
    signature lines up with the other update functions; they are ignored.
    One uniform sample is drawn per hypothesis and passed through
    ``l1normalize`` (from ``utils``; presumably rescales the vector to sum to
    one -- confirm against its definition).
    """
    raw_weights = np.random.rand(len(H))
    return l1normalize(raw_weights)
# Dispatch table: student-type name -> belief-update function.  Student looks
# the function up by name and calls it as update(H, belief, examples).
policy = {
    "strong": strong,
    "median": median,
    "weak": weak,
    "random": random,
}
class Student:
    """Simulated learner holding a belief (one weight per hypothesis) over H.

    Attributes:
        H: sequence of hypothesis callables, each invoked as ``h(x)``.
        belief: current weights over ``H``; used as sampling probabilities in
            ``practice`` and as mixture weights in ``real_eta``.
        update: function ``update(H, belief, examples)`` taken from the
            module-level ``policy`` table and used by ``learn``.
    """

    def __init__(self, H, stu_type="strong"):
        """Start from a random belief and select the update rule by name.

        Raises:
            KeyError: if ``stu_type`` is not a key of ``policy``.
        """
        self.H = H
        self.belief = l1normalize(np.random.rand(len(H)))
        self.update = policy[stu_type]

    def reset(self, belief=None, stu_type=None):
        """Replace the belief and, optionally, the update rule.

        Args:
            belief: new belief to store as-is; when None a new random belief
                is drawn the same way as in ``__init__``.
            stu_type: key of ``policy``; when None the current update rule is
                kept.
        """
        if belief is None:
            self.belief = l1normalize(np.random.rand(len(self.H)))
        else:
            self.belief = belief
        if stu_type is not None:
            self.update = policy[stu_type]

    def practice(self, queries):
        """Answer each query with a hypothesis sampled from the belief.

        One hypothesis index is drawn per query with probabilities
        ``self.belief``; the query is then evaluated with that hypothesis.

        Returns:
            list of ``(query, answer)`` tuples in the order of ``queries``.
        """
        h_ids = np.random.choice(
            len(self.H), size=len(queries), p=self.belief)
        # zip() instead of xrange(k) indexing: same pairs, and it runs on
        # both Python 2 and Python 3.
        return [(q, self.H[h_id](q)) for q, h_id in zip(queries, h_ids)]

    def learn(self, examples):
        """Replace the belief with ``update(H, belief, examples)``."""
        self.belief = self.update(self.H, self.belief, examples)

    def real_eta(self, G):
        """Return the belief-weighted mean agreement with the pairs in ``G``.

        For every ``(x, y)`` in ``G`` the terms ``(1 - |h(x) - y|) * weight``
        are summed over all hypotheses; the per-pair sums are then averaged
        over ``len(G)``.

        Raises:
            ZeroDivisionError: if ``G`` is empty.
        """
        total = sum(
            sum((1 - np.abs(h(x) - y)) * weight
                for h, weight in zip(self.H, self.belief))
            for (x, y) in G)
        return total / len(G)