# CSN_utils.py
import os
import math
import json

import numpy as np
from tqdm import tqdm


class Utils:
    """Scaling helpers for 2D embedding coordinates."""

    def __init__(self, minScale=-25, maxScale=25):
        self.minScale = minScale
        self.maxScale = maxScale

    def normalize(self, embeddings):
        # Rescale both axes into [minScale, maxScale); the 0.9999999999 keeps
        # the largest value just below maxScale.
        minX = min(embeddings, key=lambda x: x[0])[0]
        rangeX = max(embeddings, key=lambda x: x[0])[0] - minX
        minY = min(embeddings, key=lambda x: x[1])[1]
        rangeY = max(embeddings, key=lambda x: x[1])[1] - minY
        rangeScale = self.maxScale + 0.9999999999 - self.minScale
        for index, e in enumerate(embeddings):
            embeddings[index][0] = (embeddings[index][0] - minX) / rangeX * rangeScale + self.minScale
            embeddings[index][1] = (embeddings[index][1] - minY) / rangeY * rangeScale + self.minScale
        return embeddings

    def center(self, embeddings):
        # Shift the bounding box of the points so it is centered on the origin.
        offsetA = (max(embeddings, key=lambda x: x[0])[0] + min(embeddings, key=lambda x: x[0])[0]) / 2
        offsetB = (max(embeddings, key=lambda x: x[1])[1] + min(embeddings, key=lambda x: x[1])[1]) / 2
        for index, e in enumerate(embeddings):
            embeddings[index][0] = embeddings[index][0] - offsetA
            embeddings[index][1] = embeddings[index][1] - offsetB
        return embeddings
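
# Usage sketch (hypothetical data): normalize() rescales both axes into
# [minScale, maxScale) and center() shifts the bounding box onto the origin.
#   u = Utils(minScale=-25, maxScale=25)
#   coords = u.center(u.normalize([[3.2, 1.1], [0.4, 5.6], [2.0, 2.0]]))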

def write_metadata(directory, metadata, imageFileColumn):
    # rename metadata.imageFileColumn to metadata.filename
    metadata.rename(columns={imageFileColumn: "filename"}, inplace=True)
    metadata.reset_index(inplace=True)
    # save metadata file
    result = metadata.to_json(orient="records")
    with open(f'build/datasets/{directory}/metadata.json', "w") as f:
        f.write(result)
    print("saved metadata.json")

def write_config(directory, title=None, description='', mappings=None, clusters=None,
                 total=0, sliderSetting=None, infoColumns=None, searchFields=None,
                 imageWebLocation=None, spriteRows=32, spriteNumb=None,
                 squareSize=2048, spriteSize=64, spriteDir=None):
    """Assemble and write the dataset's config.json."""
    configData = {}
    configData["title"] = title
    configData["datasetInfo"] = description
    configData["embeddings"] = mappings
    configData["clusters"] = clusters
    configData["total"] = total
    configData["sliders"] = sliderSetting
    if infoColumns:
        configData["info"] = infoColumns
    configData["search"] = searchFields
    configData["url_prefix"] = imageWebLocation
    configData["sprite_side"] = spriteRows
    if spriteDir:
        configData["sprite_dir"] = spriteDir
    else:
        configData["sprite_dir"] = directory
    if spriteNumb:
        configData["sprite_number"] = spriteNumb
    else:
        # count sprite tiles in the sprite directory
        spriteNumb = 0
        for file in os.listdir(f'build/datasets/{configData["sprite_dir"]}'):
            if file.startswith("tile_"):
                spriteNumb += 1
        configData["sprite_number"] = spriteNumb
    configData["sprite_image_size"] = spriteSize
    configData["sprite_actual_size"] = squareSize
    with open(f'build/datasets/{directory}/config.json', 'w') as f:
        json.dump(configData, f, indent=4, cls=NumpyEncoder)
    print("saved config.json")

class NumpyEncoder(json.JSONEncoder):
    """Special json encoder for numpy types."""
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)
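
# Usage sketch: lets json.dumps serialize numpy scalars and arrays.
#   json.dumps({"v": np.arange(3)}, cls=NumpyEncoder)  # '{"v": [0, 1, 2]}'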

class ImageSpriteGenerator:
    """Pack images into square sprite sheets (tile_<n>.png) for the viewer."""

    def __init__(self, directory, spriteSize=2048, spriteRows=32, imageFolder=None,
                 files=None, reduced_colors=False):
        self.directory = directory
        self.imageFolder = imageFolder
        self.files = files
        self.reduced_colors = reduced_colors
        self.spriteSize = spriteSize
        self.spriteRows = spriteRows
        self.columns = spriteRows
        self.squareSize = int(spriteSize / spriteRows)
        self.imgPerSprite = spriteRows * spriteRows

    def generate(self):
        try:
            from PIL import Image
        except ImportError:
            print("Pillow is not installed. Please run: !pip install Pillow")
            raise

        def resizeImgSprite(image):
            # Fit the image into a squareSize cell, preserving aspect ratio and
            # leaving an 8px margin for the border drawn below.
            (w, h) = image.size
            max_dim = max(w, h)
            new_w = int(w / max_dim * self.squareSize)
            new_h = int(h / max_dim * self.squareSize)
            x_dif = int((self.squareSize - new_w) / 2)
            y_dif = int((self.squareSize - new_h) / 2)
            new_w = max(9, new_w)
            new_h = max(9, new_h)
            return image.resize((new_w - 8, new_h - 8), Image.LANCZOS), new_w, new_h, x_dif, y_dif

        imgPerSprite = self.spriteRows * self.spriteRows
        self.numbSprites = math.ceil(len(self.files) / imgPerSprite)
        for spriteNum in tqdm(range(self.numbSprites), desc="Generating sprites"):
            result = Image.new("RGBA", (self.spriteSize, self.spriteSize), (255, 0, 0, 0))
            for i in range(imgPerSprite):
                img_idx = i + (spriteNum * imgPerSprite)
                if img_idx >= len(self.files):
                    break
                entry = self.files[img_idx]
                try:
                    image = Image.open(os.path.join(self.imageFolder, entry))
                except Exception:
                    print(f"Skipping invalid image file: {entry}")
                    continue
                resizedImage, w, h, x_dif, y_dif = resizeImgSprite(image)
                r_result = Image.new("RGBA", (w, h), (255, 0, 0, 0))
                r_inner = Image.new("RGBA", (w - 4, h - 4), (1, 1, 1, 1))  # almost transparent border to indicate clusters in the tool
                r_result.paste(r_inner, (2, 2))
                r_result.paste(resizedImage, (4, 4))
                x = i % self.spriteRows * self.squareSize + x_dif
                y = i // self.spriteRows * self.squareSize + y_dif
                result.paste(r_result, (x, y, x + w, y + h))
            result = result.resize((self.spriteSize, self.spriteSize), Image.LANCZOS)  # LANCZOS filter for better image quality
            if self.reduced_colors:
                # convert to 256 colors for faster loading online
                result = result.convert("P", palette=Image.WEB, dither=Image.FLOYDSTEINBERG)
            result.save(f'build/datasets/{self.directory}/tile_{spriteNum}.png', "PNG", optimize=True)
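
# Usage sketch (hypothetical folder names; writes tile_0.png, tile_1.png, ...
# into build/datasets/demo/):
#   files = sorted(os.listdir("images"))
#   ImageSpriteGenerator("demo", imageFolder="images", files=files,
#                        reduced_colors=True).generate()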

class SimplePlot:
    """Write a 2D plot file from two numeric metadata columns."""

    def __init__(self, directory, A=None, B=None, metadata=None):
        self.directory = directory
        self.makePlot(A, B, metadata)

    def makePlot(self, A, B, metadata):
        # metadata is expected to be a pandas DataFrame with numeric columns A and B
        plot = metadata[[A, B]]
        normalizedPlot = Utils().normalize(plot.values)
        print("normalized plot")
        centeredEmbedding = Utils().center(normalizedPlot)
        print("centered embedding")
        self.filename = (A + "_" + B).replace(" ", "")
        # save file
        with open(f'build/datasets/{self.directory}/{self.filename}.json', "w") as out_file:
            out = json.dumps(centeredEmbedding, cls=NumpyEncoder)
            out_file.write(out)
        print(f"saved {self.filename}.json")
        return {"name": self.filename, "file": self.filename + ".json"}

class PCAGenerator:
    # See PCA documentation: https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
    def __init__(self, directory, scale=True, data=None, components=3):
        self.directory = directory
        self.data = data
        self.components = components
        self.scale = scale

    def generate(self):
        try:
            from sklearn.decomposition import PCA
            from sklearn.preprocessing import StandardScaler
        except ImportError:
            print("scikit-learn is not installed. Please run: !pip install scikit-learn")
            raise
        print("Performing PCA...")
        x = StandardScaler().fit_transform(self.data)
        pca = PCA(n_components=self.components)
        embedding = pca.fit_transform(x)
        if self.scale:
            normalized = Utils().normalize(embedding)
            centeredEmbedding = Utils().center(normalized)
        else:
            centeredEmbedding = embedding
        print("...done")
        PCMap = centeredEmbedding.reshape(-1, 2)
        # save file
        with open(f'build/datasets/{self.directory}/PCA.json', "w") as out_file:
            out = json.dumps(PCMap, cls=NumpyEncoder)
            out_file.write(out)
        print("saved PCA.json")
        return centeredEmbedding
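
# Usage sketch (`features` is a hypothetical (n_samples, n_features) array;
# writes PCA.json):
#   embedding = PCAGenerator("demo", data=features, components=2).generate()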

class UMAPGenerator:
    def __init__(self, directory, data=None, n_neighbors=15, min_dist=0.18,
                 metric="correlation", verbose=True):
        self.directory = directory
        self.data = data
        self.n_neighbors = n_neighbors
        self.min_dist = min_dist
        self.metric = metric
        self.verbose = verbose

    def generate(self):
        try:
            import umap
            from sklearn.preprocessing import StandardScaler
        except ImportError:
            print("umap is not installed. Please run: !pip install umap-learn")
            raise
        print("Generating UMAP...")
        scaled_data = StandardScaler().fit_transform(self.data)
        reducer = umap.UMAP(n_neighbors=self.n_neighbors,
                            min_dist=self.min_dist,
                            metric=self.metric,
                            verbose=self.verbose)
        embedding = reducer.fit_transform(scaled_data)
        normalized = Utils().normalize(embedding)
        centeredEmbedding = Utils().center(normalized)
        print("...done")
        # save file
        with open(f'build/datasets/{self.directory}/UMAP.json', "w") as out_file:
            out = json.dumps(centeredEmbedding, cls=NumpyEncoder)
            out_file.write(out)
        print("saved UMAP.json")

class TSNEGenerator:
    def __init__(self, directory, data=None, n_components=2, verbose=1, random_state=123):
        self.directory = directory
        self.data = data
        self.n_components = n_components
        self.verbose = verbose
        self.random_state = random_state

    def generate(self):
        try:
            from sklearn.manifold import TSNE
            from sklearn.preprocessing import StandardScaler
        except ImportError:
            print("scikit-learn is not installed. Please run: !pip install scikit-learn")
            raise
        print("Generating t-SNE...")
        x = StandardScaler().fit_transform(self.data)
        tsne = TSNE(n_components=self.n_components, verbose=self.verbose, random_state=self.random_state)
        embedding = tsne.fit_transform(x)
        normalized = Utils().normalize(embedding)
        centeredEmbedding = Utils().center(normalized)
        print("...done")
        # save file
        with open(f'build/datasets/{self.directory}/TSNE.json', "w") as out_file:
            out = json.dumps(centeredEmbedding, cls=NumpyEncoder)
            out_file.write(out)
        print("saved TSNE.json")

class HistogramGenerator:
    """Precompute slider-bar histogram data (barData.json) for numeric columns."""

    def __init__(self, directory, data=None, selection=None, bucketCount=50):
        self.directory = directory
        self.data = data
        self.selection = selection
        self.bucketCount = bucketCount

    def prepareBuckets(self, MIN, MAX, data):
        # prepare slider-bar histogram buckets
        buckets = {}
        bucketsSize = {}
        # bucket width; equivalent to the original two-branch computation and
        # also correct when both bounds are negative
        stepSize = (MAX - MIN) / self.bucketCount
        if stepSize == 0:
            stepSize = 1  # all values identical; everything lands in bucket 0
        for i in range(0, self.bucketCount):
            buckets[i] = []
            bucketsSize[i] = 0
        for index, e in enumerate(data):
            try:
                targetBucket = math.floor((e - MIN) / stepSize)
                # clamp the maximum value into the last bucket instead of overflowing it
                targetBucket = min(targetBucket, self.bucketCount - 1)
                buckets[targetBucket].append(index)
                bucketsSize[targetBucket] += 1
            except (ValueError, TypeError):
                pass  # skip non-numeric entries
        return {"histogram": list(bucketsSize.values()), "selections": list(buckets.values()), "range": [int(MIN), int(MAX)]}

    def generate(self):
        bucketData = {}
        for element in self.selection:
            print("preparing slider-bar histogram data:", element)
            bucketData[element] = self.prepareBuckets(self.data[element].min(), self.data[element].max(), self.data[element].values.tolist())
        with open(f'build/datasets/{self.directory}/barData.json', "w") as f:
            json.dump(bucketData, f)
        print("saved barData.json")
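
# Usage sketch (hypothetical DataFrame `df` with a numeric "year" column;
# writes barData.json with one histogram per selected column):
#   HistogramGenerator("demo", data=df, selection=["year"]).generate()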