#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =====================================
# @Time : 2020/8/10
# @Author : Yang Guan (Tsinghua Univ.)
# @FileName: model.py
# =====================================
import tensorflow as tf
from tensorflow import Variable
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Dense
import numpy as np

# Run everything on CPU with single-threaded ops; this is commonly done for
# reproducibility and to keep per-process resource usage predictable when
# many worker processes each build their own networks.
tf.config.experimental.set_visible_devices([], 'GPU')
tf.config.threading.set_inter_op_parallelism_threads(1)
tf.config.threading.set_intra_op_parallelism_threads(1)

class MLPNet(Model):
    """Fully connected network: an input layer, (num_hidden_layers - 1) further
    hidden layers, and an output layer whose activation defaults to linear."""

    def __init__(self, input_dim, num_hidden_layers, num_hidden_units,
                 hidden_activation, output_dim, **kwargs):
        super(MLPNet, self).__init__(name=kwargs['name'])
        self.first_ = Dense(num_hidden_units,
                            activation=hidden_activation,
                            kernel_initializer=tf.keras.initializers.HeNormal(),
                            dtype=tf.float32)
        self.hidden = Sequential([Dense(num_hidden_units,
                                        activation=hidden_activation,
                                        kernel_initializer=tf.keras.initializers.HeNormal(),
                                        dtype=tf.float32)
                                  for _ in range(num_hidden_layers - 1)])
        output_activation = kwargs.get('output_activation') or 'linear'
        output_bias = kwargs.get('output_bias')
        self.outputs = Dense(output_dim,
                             activation=output_activation,
                             kernel_initializer=tf.keras.initializers.HeNormal(),
                             bias_initializer=tf.keras.initializers.Constant(
                                 output_bias if output_bias is not None else 0.),
                             dtype=tf.float32)
        self.build(input_shape=(None, input_dim))

    def call(self, x, **kwargs):
        x = self.first_(x)
        x = self.hidden(x)
        x = self.outputs(x)
        return x
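
# A minimal usage sketch (illustrative, not part of the training code): build a
# small critic with MLPNet and differentiate a dummy loss through it. The 'elu'
# activation and all dimensions below are assumptions chosen for the example.
def example_mlp_usage():
    critic = MLPNet(8, 2, 128, 'elu', 1, name='example_critic')
    obs = np.random.random((32, 8)).astype(np.float32)
    with tf.GradientTape() as tape:
        values = critic(obs)                      # shape (32, 1)
        loss = tf.reduce_mean(tf.square(values))  # dummy regression-to-zero loss
    grads = tape.gradient(loss, critic.trainable_weights)
    print(values.shape, len(grads))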

class AlphaModel(Model):
    """Wraps the entropy-temperature parameter log(alpha) (as in SAC) in a
    Model so it exposes trainable_weights like the networks do."""

    def __init__(self, **kwargs):
        super(AlphaModel, self).__init__(name=kwargs['name'])
        self.log_alpha = tf.Variable(0., dtype=tf.float32)
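
# A hedged sketch of how such a temperature parameter is typically trained; the
# SAC-style loss, the target entropy, and the learning rate below are
# illustrative assumptions, not values taken from this repo.
def example_alpha_update(log_pi, target_entropy=-1., lr=3e-4):
    alpha_model = AlphaModel(name='alpha')
    opt = tf.keras.optimizers.Adam(lr)
    with tf.GradientTape() as tape:
        # Descend on -log_alpha * (log_pi + target_entropy), the usual SAC
        # temperature objective with the policy term held fixed.
        alpha_loss = -alpha_model.log_alpha * tf.stop_gradient(log_pi + target_entropy)
    grads = tape.gradient(alpha_loss, alpha_model.trainable_weights)
    opt.apply_gradients(zip(grads, alpha_model.trainable_weights))
    return tf.exp(alpha_model.log_alpha).numpy()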

class LamModel(Model):
    """Wraps the scalar parameter behind the Lagrange multiplier of the safety
    constraint as a trainable variable (initialized to -1)."""

    def __init__(self, **kwargs):
        super(LamModel, self).__init__(name=kwargs['name'])
        self.var = tf.Variable(-1., dtype=tf.float32)
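
# A hedged sketch of a Lagrangian-style multiplier update; mapping var to a
# nonnegative multiplier via softplus and ascending on the constraint
# violation are illustrative assumptions, not necessarily this repo's scheme.
def example_lam_update(constraint_violation, lr=3e-4):
    lam_model = LamModel(name='lam')
    opt = tf.keras.optimizers.Adam(lr)
    with tf.GradientTape() as tape:
        lam = tf.nn.softplus(lam_model.var)  # keep the multiplier >= 0
        # Gradient ascent on lam * violation, written as descent on its negative.
        lam_loss = -lam * tf.stop_gradient(constraint_violation)
    grads = tape.gradient(lam_loss, lam_model.trainable_weights)
    opt.apply_gradients(zip(grads, lam_model.trainable_weights))
    return tf.nn.softplus(lam_model.var).numpy()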

class SiSParaModel(Model):
    """Holds what appear to be the adaptable parameters of the safety index
    (SiS: safety index synthesis) as one trainable vector, defaulting to
    [0.1, 1.0, 2.0]."""

    def __init__(self, **kwargs):
        super(SiSParaModel, self).__init__(name=kwargs['name'])
        init_var = kwargs.get('init_var', [0.1, 1.0, 2.0])
        self.var = tf.Variable(init_var, dtype=tf.float32)
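
# A hedged sketch of a common three-parameter safety index from the
# safety-index-synthesis literature, phi = sigma + d_min**n - d**n - k * d_dot;
# reading var as (sigma, k, n) is an assumption made for illustration only.
def example_safety_index(d, d_dot, d_min=1.):
    sis_model = SiSParaModel(name='sis_para', init_var=[0.1, 1.0, 2.0])
    sigma, k, n = tf.unstack(sis_model.var)
    phi = sigma + d_min ** n - d ** n - k * d_dot  # phi > 0 flags unsafe states
    return phi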

def test_alpha():
    alpha_model = AlphaModel(name='alpha')
    print(alpha_model.trainable_weights)
    print(len(alpha_model.trainable_weights))
    print(alpha_model.get_weights())
    print(alpha_model.log_alpha)
    b = alpha_model.log_alpha
    alpha_model.set_weights([np.array(3.)])  # set_weights expects a list of arrays
    print(b)
    with tf.GradientTape() as tape:
        b = 3. * alpha_model.log_alpha
    print(tape.gradient(b, alpha_model.trainable_weights[0]))

def test_lam():
    with tf.GradientTape() as tape:
        lam_model = LamModel(name='lam')
        print(lam_model.trainable_weights)
        print(len(lam_model.trainable_weights))
        print(lam_model.get_weights())
        print(lam_model.var)  # the parameter is stored as `var`, not `log_lam`
        b = lam_model.var
        lam_model.set_weights([np.array(3.)])
        print(b)
        c = 3. * lam_model.var
    print(tape.gradient(c, lam_model.trainable_weights[0]))

def test_attrib():
    a = Variable(0, name='d')
    # MLPNet needs a hidden activation before output_dim; 'elu' here is arbitrary.
    p = MLPNet(2, 2, 128, 'elu', 1, name='ttt')
    print(hasattr(p, 'get_weights'))
    print(hasattr(p, 'trainable_weights'))
    print(hasattr(a, 'get_weights'))
    print(hasattr(a, 'trainable_weights'))
    print(type(a))
    print(type(p))
    # print(a.name)
    # print(p.name)
    # p.build((None, 2))
    p.summary()
    # inp = np.random.random([10, 2])
    # out = p(inp)  # Keras models are called directly; there is no .forward()
    # print(p.get_weights())
    # print(p.trainable_weights)

def test_clone():
    p = MLPNet(2, 2, 128, 'elu', 1, name='ttt')
    print(p._is_graph_network)  # False: MLPNet is a subclassed model
    s = tf.keras.models.clone_model(p)  # note: clone_model is limited for subclassed models
    print(s)

def test_out():
    Qs = tuple(MLPNet(8, 2, 128, 'elu', 1, name='Q' + str(i)) for i in range(2))
    inp = np.random.random((128, 8))
    out = [Q(inp) for Q in Qs]
    print(out)

def test_memory():
    import time
    Q = MLPNet(8, 2, 128, 'elu', 1, name='Q')  # MLPNet requires a `name` kwarg
    time.sleep(111111)

def test_memory2():
    import time
    model = tf.keras.Sequential([tf.keras.layers.Dense(10, input_shape=(30,), activation='relu'),
                                 tf.keras.layers.Dense(20, activation='relu'),
                                 tf.keras.layers.Dense(20, activation='relu'),
                                 tf.keras.layers.Dense(10, activation='relu')])
    time.sleep(10000)

if __name__ == '__main__':
    test_lam()