# prepare_abc.py
import pickle
import unicodedata
from argparse import ArgumentParser
from collections import Counter
from dataclasses import asdict
from functools import lru_cache, wraps
from typing import Any, Dict, List, Optional, Tuple, Type, Union
import networkx as nx
import numpy as np
import pandas as pd
import streamlit as st
from ipapy.ipastring import IPAString
from lingpy.sequence.sound_classes import ipa2tokens
from networkx.algorithms.components import connected_components
from networkx.algorithms.shortest_paths import shortest_path
from tqdm import tqdm
from dev_misc import NDA
from dev_misc.utils import ErrorRecord, recorded_try
from pypheature.nphthong import InvalidNphthong, Nphthong
from pypheature.process import (FeatureProcessor, InvalidBaseSegment,
NoMappingFound, NonUniqueMapping, Segment)
from pypheature.segment import ExclusivityFailure, InvalidSegment, Segment
from sound_law.utils import PDF, run_section, run_with_argument
@run_section('Loading data...', 'Loading done.')
def load_data(path: str) -> PDF:
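    # Note: `error_bad_lines=False` was deprecated in pandas 1.3 and removed in 2.0;
    # `on_bad_lines='skip'` is the newer equivalent if this script is run on a recent pandas.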
return pd.read_csv(path, sep='\t', error_bad_lines=False)
class I2tException(Exception):
"""Raise this when you have any `i2t` issue."""
def i2t(ipa: str) -> List[str]:
"""ipa2token call. Raises error if return is empty."""
ret = ipa2tokens(ipa, merge_vowels=True, merge_geminates=False)
if not ret:
raise I2tException
return ret
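# A rough illustration of the tokenization (example adapted from lingpy's documentation;
# exact output may vary across lingpy versions):
#     i2t('t͡sɔyɡə')  # -> ['t͡s', 'ɔy', 'ɡ', 'ə'], with adjacent vowels merged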
def recorded_assign(df: PDF, new_name: str, old_name: str, func, error_cls=AssertionError) -> Tuple[PDF, PDF]:
errors = list()
df = df.assign(**{new_name: recorded_try(df, old_name, func, error_cls, errors=errors)})
error_df = df.iloc[[error.idx for error in errors]]
return df, error_df
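# Minimal usage sketch, mirroring how this script calls it below: tokenize `rawIPA` into a new
# `raw_toks` column while collecting the rows that raised `I2tException` into `error_df`:
#     words_df, error_df = recorded_assign(words_df, 'raw_toks', 'rawIPA', i2t, I2tException)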
def standardize(ph: str, ignore: bool = False) -> str:
ph = str(IPAString(unicode_string=ph, ignore=ignore))
return unicodedata.normalize('NFD', ph)
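# For example, NFD decomposition splits precomposed characters into base + combining marks:
#     unicodedata.normalize('NFD', 'ã') == 'a\u0303'
# so diacritics end up in a consistent form across phones; the IPAString round-trip additionally
# validates/canonicalizes the symbols.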
def fv_func(seg):
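    """Return a hashable feature-vector key for a `Segment` or `Nphthong`.

    For a plain `Segment`, this is a one-element tuple containing its sorted (feature, value)
    pairs, excluding `ipa`, `diacritics` and `base` so that transcription variants collapse;
    for an `Nphthong`, the per-vowel keys are concatenated.
    """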
if isinstance(seg, Segment):
return (tuple([(k, str(v.value)) for k, v in sorted(asdict(seg).items()) if k not in ['ipa', 'diacritics', 'base']]), )
ret = tuple()
for v in seg.vowels:
ret += fv_func(v)
return ret
# Compute distance matrix.
@lru_cache(maxsize=None)
def get_sub_cost(seg1: Segment, seg2: Segment, quiet: bool = True) -> float:
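    """Weighted feature-level substitution cost between two segments.

    Rough weighting, as encoded below: the four sonority-hierarchy features weigh 1/4 each;
    presence of each articulator (dorsal, coronal, labial) weighs 1/3; dependent sub-features
    (obstruent, liquid, dorsal, coronal, labial groups) are only compared when both segments
    qualify and share a total weight of 0.5 per group; laryngeal and a few minor features
    weigh 0.25 each.
    """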
cost = 0.0
def helper(name, weight):
v1 = getattr(seg1, name).value
v2 = getattr(seg2, name).value
ret = (v1 is not v2) * weight
if not quiet:
print(f'{name} cost', ret, 'from', v1, 'and', v2)
return ret
# syllabic, consonantal, approximant, sonorant for sonority hierarchy.
for name in ['syllabic', 'consonantal', 'approximant', 'sonorant']:
cost += helper(name, 1.0 / 4)
# continuant and delayed_release for different obstruents, but delayed_release is optional.
if seg1.is_obstruent() and seg2.is_obstruent():
cost += helper('continuant', 0.5 / 2)
cost += helper('delayed_release', 0.5 / 2)
# trill and tap
if seg1.is_liquid() and seg2.is_liquid():
cost += helper('trill', 0.5 / 2)
cost += helper('tap', 0.5 / 2)
# dorsal features
cost += helper('dorsal', 1.0 / 3)
if seg1.is_dorsal() and seg2.is_dorsal():
for name in ['high', 'low', 'front', 'back']:
cost += helper(name, 0.5 / 4)
# coronal features
cost += helper('coronal', 1.0 / 3)
if seg1.is_coronal() and seg2.is_coronal():
for name in ['anterior', 'distributed', 'strident', 'lateral']:
cost += helper(name, 0.5 / 4)
# labial features.
cost += helper('labial', 1.0 / 3)
if seg1.is_labial() and seg2.is_labial():
for name in ['labiodental', 'round']:
cost += helper(name, 0.5 / 2)
# laryngeal features
for name in ['voice', 'spread_glottis', 'constricted_glottis']:
cost += helper(name, 0.25)
# some minor features.
for name in ['tense', 'round', 'long', 'nasal', 'overlong']:
cost += helper(name, 0.25)
return cost
def iter_seg(seg):
if isinstance(seg, Segment):
yield seg
else:
yield from seg.vowels
def edit_dist(seg1, seg2, ins_cost=1):
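    """Constrained edit distance between two (possibly multi-vowel) segments.

    `dist[i, j, k]` is the cost of aligning the first `i` sub-segments of `seg1` with the
    first `j` of `seg2` using exactly `k` substitutions; insertions/deletions cost `ins_cost`
    and substitutions cost `get_sub_cost`. The answer requires `k == min(l1, l2)`, i.e. every
    sub-segment of the shorter side must be matched against one of the longer side.
    """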
def get_len(seg):
return 1 if isinstance(seg, Segment) else len(seg)
l1 = get_len(seg1)
l2 = get_len(seg2)
subcost = np.zeros([l1, l2], dtype='float32')
for i, s1 in enumerate(iter_seg(seg1)):
for j, s2 in enumerate(iter_seg(seg2)):
subcost[i, j] = get_sub_cost(s1, s2)
l = min(l1, l2)
dist = np.full([l1 + 1, l2 + 1, l + 1], 10000, dtype='float32')
for i in range(l1 + 1):
dist[i, 0, 0] = i * ins_cost
for j in range(l2 + 1):
dist[0, j, 0] = j * ins_cost
for i in range(1, l1 + 1):
for j in range(1, l2 + 1):
for k in range(1, l + 1):
dist[i, j, k] = min(dist[i - 1, j - 1, k - 1] + subcost[i - 1, j - 1],
min(dist[i - 1, j, k], dist[i, j - 1, k]) + ins_cost)
return dist[l1, l2, l]
@run_section('Adding phones specific to our dataset...', 'Phones added.')
def add_phones(df: PDF, added_phones: List[str]) -> PDF:
df = df['rawIPA'].str.split().explode().reset_index().rename(columns={'index': 'raw_word_id'})
df = df.dropna(subset=['rawIPA'])
for ph in added_phones:
# ipa2tokens cannot properly deal with some overlong vowels.
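        # Note: `DataFrame.append` was removed in pandas 2.0; `pd.concat` is the modern
        # replacement if this script needs to run on a recent pandas.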
df = df.append({
'raw_word_id': len(df),
'rawIPA': (f'{ph} ' * 100).strip()},
ignore_index=True)
return df
@run_section('Removing stress from transcriptions...', 'Removal done.')
def destress(df: PDF) -> PDF:
return df.assign(rawIPA=df['rawIPA'].apply(lambda ipa: ipa.replace('ˌ', '').replace('ˈ', '')))
@run_section('Tokenizing transcriptions...', 'Tokenization done.')
def tokenize(df: PDF) -> Tuple[PDF, PDF, str]:
old_name = 'rawIPA'
df, error_df = recorded_assign(df, 'raw_toks', old_name, i2t, I2tException)
df = df.dropna(subset=['raw_toks'])
return df, error_df, old_name
@run_section('Standardizing phones...', 'Standardization done.')
def standardize_phones(df: PDF) -> Tuple[PDF, PDF, str]:
raw_ph_df = PDF(set(df.explode('raw_toks')['raw_toks']), columns=['raw_ph'])
old_name = 'raw_ph'
# Use `recorded_assign` to record the errors, but use normal `assign` with `ignore=True` to get the output df.
raw_ph_df = raw_ph_df.assign(std_ph=raw_ph_df[old_name].apply(standardize, ignore=True))
_, error_df = recorded_assign(raw_ph_df, 'std_ph_with_error', old_name, standardize, ValueError)
return raw_ph_df, error_df, old_name
@run_section('Obtaining feature vectors using `pypheature`...', 'Feature vectors obtained.')
def get_feature_vectors(df: PDF, processor: FeatureProcessor) -> Tuple[PDF, PDF, str]:
std_ph_df = PDF(set(df['std_ph']), columns=['std_ph'])
assert len(set(std_ph_df['std_ph'])) == len(std_ph_df)
old_name = 'std_ph'
std_ph_df, error_df = recorded_assign(std_ph_df, 'segment', old_name, processor.process,
(InvalidBaseSegment, InvalidNphthong, InvalidSegment, ExclusivityFailure))
std_ph_df = std_ph_df.dropna().reset_index(drop=True)
std_ph_df = std_ph_df.assign(fv=std_ph_df['segment'].apply(fv_func))
return std_ph_df, error_df, old_name
@run_section('Getting the prototypical sounds -- merging sounds that share identical feature vectors into one...',
'Merging done.')
def get_proto_phones(std_ph_df: PDF, raw_ph_df: PDF, words_df: PDF) -> PDF:
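    """Collapse standardized phones that share an identical feature vector into one prototype.

    Within each feature-vector group, the prototype is the phone with the highest corpus count
    (ties broken by shortest standardized form); the returned frame maps every standardized
    phone to its prototype together with its count.
    """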
std_ph_lst = std_ph_df.pivot_table(index='fv', values='std_ph', aggfunc=list)
raw2std = raw_ph_df[['raw_ph', 'std_ph']].set_index('raw_ph', verify_integrity=True).to_dict()['std_ph']
std2cnt = words_df['raw_toks'].explode().apply(raw2std.get).value_counts().to_dict()
def get_proto_ph(lst: List[str]) -> str:
        # Pick the segment with the highest corpus count, breaking ties by shortest standardized form.
stats = [(std2cnt[seg], -len(standardize(seg, ignore=True))) for seg in lst]
max_stat = max(stats)
return lst[stats.index(max_stat)]
std_ph_lst = std_ph_lst.assign(proto_ph=std_ph_lst['std_ph'].apply(get_proto_ph))
std2proto = std_ph_lst.explode('std_ph').set_index('std_ph').to_dict()['proto_ph']
merged_cnt = pd.merge(PDF(std2cnt.items(), columns=['std_ph', 'cnt']),
PDF(std2proto.items(), columns=['std_ph', 'proto_ph']),
left_on='std_ph', right_on='std_ph', how='inner')
return merged_cnt
def show_errors(error_df: PDF, old_name: str):
st.write(f'{len(error_df)} errors in total, results computed from `{old_name}` column:')
st.write(error_df)
@run_section('Getting phones to keep (frequency >= 50)...', 'Done.')
def get_kept_phones(df: PDF, processor: FeatureProcessor) -> Tuple[PDF, List[str], Dict[str, int], List[int], List[Union[Segment, Nphthong]]]:
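    """Keep prototypes that occur at least 50 times.

    Returns the per-prototype count table, the index<->prototype mappings, the indices of the
    kept prototypes, and the `pypheature` segment for every prototype.
    """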
proto2cnt = df.pivot_table(index='proto_ph', values='cnt',
aggfunc='sum').sort_values('cnt', ascending=False)
i2pp = list(proto2cnt.index)
pp2i = {pp: i for i, pp in enumerate(i2pp)}
kept_ids = [i for pp, i in pp2i.items() if proto2cnt.loc[pp]['cnt'] >= 50]
segments = [processor.process(pp) for pp in i2pp]
return proto2cnt, i2pp, pp2i, kept_ids, segments
@run_section('Loading feature processor...', 'Loading done.')
def load_processor() -> FeatureProcessor:
return FeatureProcessor()
@run_section('Getting edit dist between prototypes...', 'Computation done.', suppress_st_warning=True)
def get_edit_dist(i2pp: List[str], segments: List[Union[Segment, Nphthong]], insert_cost: float) -> NDA:
dist_mat = np.zeros([len(i2pp), len(i2pp)], dtype='float32')
pbar = st.progress(0.0)
pbar_status = st.empty()
for i, seg1 in tqdm(enumerate(segments)):
for j, seg2 in enumerate(segments):
dist_mat[i, j] = edit_dist(seg1, seg2, ins_cost=insert_cost)
pbar.progress(i / len(segments))
pbar_status.text(f'{i + 1} / {len(segments)} = {((i + 1) / len(segments) * 100.0):.1f}% done.')
return dist_mat
def should_proceed(key: str) -> bool:
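    # `st._is_running_with_streamlit` is a private Streamlit attribute and may be missing in
    # newer releases (where `streamlit.runtime.exists()` plays a similar role); when the script
    # is run outside Streamlit we simply proceed.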
if st._is_running_with_streamlit:
return st.radio('Proceed?', ['Yes', 'No'], index=1, key=key) == 'Yes'
return True
def get_connected_sounds(ph, g, kept_dist_mat, kept_i2pp, kept_pp2i) -> PDF:
i = kept_pp2i[ph]
ret = list()
for u, v in g.edges(i):
sound = kept_i2pp[v]
ret.append((sound, float(kept_dist_mat[i, v])))
return PDF(ret, columns=['IPA', 'distance'])
if __name__ == "__main__":
parser = ArgumentParser()
st.title('Prepare alphabet.')
st.header('Specify the arguments first:')
data_path = run_with_argument('data_path',
parser=parser,
default='data/northeuralex-0.9-forms.tsv',
msg="Path to the NorthEuraLex dataset.")
raw_words_df = load_data(data_path)
# Add some phones to the dataset -- they might not be present in the original data.
added_phones = ['oːː', 'eːː', 'õː', 'ĩː', 'xʷ', 'gʷ', 'hʷ', 'ay', 'iuː', 'ioː',
'io', 'eːo', 'æa', 'æːa', 'eo', 'iːu', 'iu', 'ɣː', 'ðː', 'wː', 'θː', 'βː', 'øy',
'tʲː', 'dʲː']
words_df = add_phones(raw_words_df, added_phones)
st.write(f'{", ".join(added_phones)}')
words_df = destress(words_df)
words_df, error_df, old_name = tokenize(words_df)
show_errors(error_df, old_name)
if should_proceed('tokenized'):
raw_ph_df, error_df, old_name = standardize_phones(words_df)
show_errors(error_df, old_name)
if should_proceed('standardized'):
processor = load_processor()
std_ph_df, error_df, old_name = get_feature_vectors(raw_ph_df, processor)
show_errors(error_df, old_name)
if should_proceed('prototype'):
merged_cnt = get_proto_phones(std_ph_df, raw_ph_df, words_df)
proto2cnt, i2pp, pp2i, kept_ids, segments = get_kept_phones(merged_cnt, processor)
st.write(f'{len(kept_ids)} sounds are kept.')
insert_cost = 1.0
dist_mat = get_edit_dist(i2pp, segments, insert_cost)
kept_dist_mat = dist_mat[np.asarray(kept_ids).reshape(-1, 1), kept_ids]
kept_i2pp = [i2pp[i] for i in kept_ids]
kept_pp2i = {pp: i for i, pp in enumerate(kept_i2pp)}
# Building graphs of connection.
top_k = 10
g = nx.Graph()
for i, pp in enumerate(kept_i2pp):
g.add_node(i)
                    # There are two ways of adding an edge. First, connect i to every sound within its
                    # top-k nearest neighbors (the `insert_cost` floor mentioned below is currently commented out).
sort_i = kept_dist_mat[i, :].copy().argsort()
dists = kept_dist_mat[i, sort_i]
# Since the closest sound is always itself, we need to use index top_k, instead of top_k - 1.
# max_dist = max(dists[top_k], insert_cost)
max_dist = dists[top_k]
for j, d in zip(sort_i, dists):
if d > max_dist:
break
if i != j:
g.add_edge(i, j)
                    # Second, connect a vowel to two-vowel nphthongs (and an nphthong to vowels or to
                    # nphthongs differing in length by one) whenever the distance equals a single insertion.
seg = segments[pp2i[pp]]
if isinstance(seg, Segment) and seg.is_vowel():
for j, pp in enumerate(kept_i2pp):
seg_j = segments[pp2i[pp]]
if isinstance(seg_j, Nphthong) and len(seg_j) == 2 and kept_dist_mat[i, j] == insert_cost:
g.add_edge(i, j)
elif isinstance(seg, Nphthong):
for j, pp in enumerate(kept_i2pp):
seg_j = segments[pp2i[pp]]
if isinstance(seg_j, Segment) and seg_j.is_vowel() and kept_dist_mat[i, j] == insert_cost:
g.add_edge(i, j)
elif isinstance(seg_j, Nphthong) and abs(len(seg) - len(seg_j)) == 1 and kept_dist_mat[i, j] == insert_cost:
g.add_edge(i, j)
query_sound = st.selectbox('Query sound', sorted(kept_i2pp))
st.write(get_connected_sounds(query_sound, g, kept_dist_mat, kept_i2pp, kept_pp2i))
cc = list(connected_components(g))
assert len(cc) == 1
# Compute average number of connected sounds.
cnt = dict()
for i in kept_ids:
cnt[i2pp[i]] = len(g.edges(i))
st.write(f'Average number of connected sounds: {(sum(cnt.values()) / len(kept_ids)):.3f}')
if should_proceed('about_to_save'):
proto_ph_map = dict()
for i in kept_ids:
ph = i2pp[i]
proto_ph_map[ph] = ph
lengths = [len(pp) for pp in kept_i2pp]
neg_counts = [-proto2cnt.loc[pp]['cnt'] for pp in kept_i2pp]
for i, pp in tqdm(enumerate(i2pp)):
if i not in kept_ids:
dists = dist_mat[i, kept_ids].copy()
stats = list(zip(dists, neg_counts, lengths))
j = stats.index(min(stats))
proto_ph_map[pp] = kept_i2pp[j]
g_edges = set(g.edges)
edges = set(g_edges)
for i, j in g_edges:
edges.add((j, i))
for i in range(len(i2pp)):
assert (i, i) not in edges
processor.load_repository(kept_i2pp)
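                    # Two derived maps feed the saved pickle: `cl_map` sends a kept short segment to its
                    # [+long] counterpart (when `change_features` succeeds), and `gb_map` sends a kept
                    # two-vowel nphthong whose first vowel is i/u to its second vowel (when that vowel is
                    # itself kept).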
cl_map = dict()
gb_map = dict()
for ph in kept_i2pp:
segment = processor.process(ph)
if isinstance(segment, Segment) and segment.is_short():
                            # Skip this phone if no unique [+long] counterpart exists; otherwise `after`
                            # would be undefined or stale from a previous iteration.
                            try:
                                after = processor.change_features(segment, ['+long'])
                            except NonUniqueMapping:
                                print(f"non-unique mapping for {ph}")
                                continue
                            except NoMappingFound:
                                print(f"no mapping for {ph}")
                                continue
after_id = kept_pp2i[str(after)]
before_id = kept_pp2i[ph]
if (before_id, after_id) in edges:
print(ph, after)
cl_map[ph] = str(after)
if isinstance(segment, Nphthong) and len(segment.vowels) == 2:
first = str(segment.vowels[0])
second = str(segment.vowels[1])
if first in ['i', 'u'] and second in kept_pp2i:
gb_map[ph] = second
out_path = 'data/nel_segs.pkl'
with open(out_path, 'wb') as fout:
pickle.dump({
'proto_ph_map': proto_ph_map,
'proto_ph_lst': kept_i2pp,
'dist_mat': kept_dist_mat,
'edges': [(kept_i2pp[i], kept_i2pp[j]) for i, j in edges],
'cl_map': cl_map,
'gb_map': gb_map},
fout)
st.write(f'Saved to {out_path}.')
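# A minimal sketch of how the saved artifact can be read back (keys as written above):
#     with open('data/nel_segs.pkl', 'rb') as fin:
#         segs = pickle.load(fin)
#     segs['proto_ph_map']   # prototype sound -> closest kept prototype
#     segs['edges']          # symmetric list of (sound, sound) pairs judged to be close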