-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathprocess_semistructured.py
241 lines (193 loc) · 9.17 KB
/
process_semistructured.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# -*- encoding: utf-8 -*-
from __future__ import absolute_import
import json
import logging
from collections import defaultdict
import click
from strephit.commons import io, wikidata, parallel, text
logger = logging.getLogger(__name__)
class SemistructuredSerializer:
    """ Serializes semi-structured scraped items into QuickStatements,
        resolving property values and subject names against Wikidata.
    """

    def __init__(self, language, sourced_only):
        # language: language code used when resolving names/values
        # sourced_only: when truthy, items without a source url are skipped
        self.language = language
        self.sourced_only = sourced_only

    def serialize_item(self, item):
        """ Converts an item to quick statements.
            :param item: Scraped item, either str (json) or dict
            :returns: tuples <success, item> where item is an entity which
             could not be resolved if success is false, otherwise it is a
             <subject, property, object, source> tuple
            :rtype: generator
        """
        if isinstance(item, basestring):
            item = json.loads(item)

        name = item.pop('name', '')
        other = item.pop('other', {})
        url = item.pop('url', '')

        if self.sourced_only and not url:
            # fixed: the original call had a %s placeholder but no argument,
            # which makes logging raise a formatting error when DEBUG is on
            logger.debug('item %s has no url, skipping it', name)
            return

        if not name:
            logger.debug('item %s has no name, skipping it', item)
            return

        # 'other' may be a json string, an already-parsed dict, or junk
        data = {}
        try:
            data = json.loads(other)
        except ValueError:
            # not valid json: fall through with empty data
            pass
        except TypeError:
            if isinstance(other, dict):
                data = other
            else:
                return

        name, honorifics = text.fix_name(name)
        data.update(item)
        data.pop('bio', None)

        # the name will be the last one to be resolved because it is the hardest
        # one to get right, so we will use all the other statements to help
        statements = defaultdict(list)
        for key, value in data.iteritems():
            if not isinstance(value, list):
                value = [value]

            # flatten dict values into their keys and values; keep plain strings
            strings = []
            for val in value:
                if isinstance(val, basestring):
                    strings.append(val)
                elif isinstance(val, dict):
                    strings.extend(val.keys())
                    strings.extend(val.values())

            for val in strings:
                if not val:
                    continue
                elif not isinstance(val, basestring):
                    logger.debug('skipping value "%s" because it is not a string', val)
                    continue

                # renamed from 'property' to avoid shadowing the builtin
                prop = wikidata.PROPERTY_TO_WIKIDATA.get(key)
                if not prop:
                    logger.debug('cannot resolve property %s, skipping', key)
                    continue

                info = dict(data, **statements)  # provide all available info to the resolver
                resolved = wikidata.resolve(prop, val, self.language, **info)
                if not resolved:
                    logger.debug('cannot resolve value %s of property %s, skipping', val, prop)
                    yield False, {'chunk': val, 'additional': {'property': prop, 'url': url}}
                    continue

                statements[prop].append(resolved)

        # resolve the subject last, using every statement gathered so far as a hint
        info = dict(data, **statements)  # provide all available info to the resolver
        info['type_'] = 5  # Q5 = human
        wid = wikidata.resolver_with_hints('P1559', name, self.language, **info)

        if not wid:
            logger.debug('cannot find wikidata id of "%s" with properties %s, skipping',
                         name, repr(info))
            yield False, {'chunk': name, 'additional': {'property': 'P1559', 'url': url}}
            return

        # now that we are sure about the subject we can produce the actual statements
        yield True, (wid, 'P1559', '%s:"%s"' % (self.language, name.title()), url)  # name

        for prop, values in statements.iteritems():
            for val in values:
                yield True, (wid, prop, val, url)

        # honorifics resolve to P1035; unresolved ones are reported as failures
        for each in honorifics:
            hon = wikidata.resolve('P1035', each, self.language)
            if hon:
                yield True, (wid, 'P1035', hon, url)
            else:
                yield False, {'chunk': each, 'additional': {'property': 'P1035', 'url': url}}

    def process_corpus(self, items, output_file, dump_unresolved_file=None, genealogics=None, processes=0):
        """ Serializes an iterable of scraped items into QuickStatements.

            :param items: iterable of scraped items (str json or dict)
            :param output_file: writable file receiving one statement per line
            :param dump_unresolved_file: optional writable file receiving
             unresolved entities as json lines
            :param genealogics: when truthy, remember genealogics.org subjects
             for the later family-resolution pass
            :param processes: worker count forwarded to parallel.map
            :returns: <genealogics url -> subject id map, produced count, skipped count>
        """
        count = skipped = 0
        genealogics_url_to_id = {}
        for success, item in parallel.map(self.serialize_item, items, processes, flatten=True):
            if success:
                subj, prop, val, url = item
                statement = wikidata.finalize_statement(
                    subj, prop, val, self.language, url,
                    resolve_property=False, resolve_value=False
                )

                if not statement:
                    continue

                output_file.write(statement.encode('utf8'))
                output_file.write('\n')

                # remember the subject so the genealogics second pass can link family members
                if genealogics and url.startswith('http://www.genealogics.org/'):
                    genealogics_url_to_id[url] = subj

                count += 1
                if count % 10000 == 0:
                    logger.info('Produced %d statements so far, skipped %d names', count, skipped)
            else:
                skipped += 1
                if dump_unresolved_file:
                    dump_unresolved_file.write(json.dumps(item))
                    dump_unresolved_file.write('\n')

        logger.info('Produced %d statements so far, skipped %d names', count, skipped)
        return genealogics_url_to_id, count, skipped

    def resolve_genealogics_family(self, input_file, url_to_id):
        """ Performs a second pass on genealogics to resolve additional family members

            :param input_file: iterable of json rows of scraped genealogics items
            :param url_to_id: map from genealogics url to already-resolved subject id
            :returns: tuples <success, item>; item is a finalized statement on
             success, otherwise the unresolved member name
            :rtype: generator
        """
        family_properties = {
            'Family': 'P1038',
            'Father': 'P22',
            'Married': 'P26',
            'Mother': 'P25',
            u'Children\xa0': 'P40',  # scraped key contains a trailing non-breaking space
        }

        for row in input_file:
            data = json.loads(row)

            # only rows whose subject was resolved in the first pass are usable
            if 'url' not in data or data['url'] not in url_to_id:
                continue

            subj = url_to_id[data['url']]
            for key, value in data.get('other', {}).iteritems():
                if key in family_properties:
                    prop = family_properties[key]

                    if not isinstance(value, list):
                        logger.debug('unexpected value "%s", property "%s" subject %s',
                                     value, key, subj)
                        continue

                    for member in value:
                        for name, url in member.iteritems():
                            # a family member can only be linked if their own
                            # item was resolved in the first pass
                            if url in url_to_id:
                                val = url_to_id[url]
                                logger.debug('resolved "%s", %s of/with %s to %s',
                                             name.strip(), key, subj, val)
                                statement = wikidata.finalize_statement(
                                    subj, prop, val, self.language, data['url'],
                                    resolve_property=False, resolve_value=False
                                )

                                yield True, statement
                            else:
                                logger.debug('skipping "%s" (%s), %s of/with %s',
                                             name.strip(), url, key, subj)
                                yield False, name
@click.command()
@click.argument('corpus-dir', type=click.Path())
@click.option('--outfile', '-o', type=click.File('w'), default='output/semi_structured.qs')
@click.option('--genealogics', type=click.File('r'))
@click.option('--sourced-only/--allow-unsourced', default=True)
@click.option('--language', default='en', help='The names are searched in this language')
@click.option('--processes', '-p', default=0)
@click.option('--dump-unresolved', type=click.File('w'))
def process_semistructured(corpus_dir, outfile, language, processes,
                           sourced_only, genealogics, dump_unresolved):
    """ Processes the corpus and extracts semi-structured data serialized into QuickStatements.

        Needs a second pass on genealogics to correctly resolve family members.
    """

    serializer = SemistructuredSerializer(language, sourced_only)

    # first pass: turn every scraped item into statements, collecting
    # genealogics subjects so family links can be resolved afterwards
    genealogics_url_to_id, count, skipped = serializer.process_corpus(
        io.load_scraped_items(corpus_dir), outfile, dump_unresolved, genealogics, processes
    )
    logger.info('Done, produced %d statements, skipped %d names', count, skipped)

    if not genealogics:
        # nothing more to do without the genealogics corpus
        logger.info("Dataset serialized to '%s'" % outfile.name)
        return

    logger.info('Starting second pass on genealogics ...')
    second_pass = serializer.resolve_genealogics_family(genealogics, genealogics_url_to_id)
    for success, item in second_pass:
        if not success:
            # unresolved family member: count it and optionally dump it
            skipped += 1
            if dump_unresolved:
                dump_unresolved.write(json.dumps(item))
                dump_unresolved.write('\n')
            continue

        outfile.write(item.encode('utf8'))
        outfile.write('\n')

        count += 1
        if count % 10000 == 0:
            logger.info('Produced %d statements so far, skipped %d names', count, skipped)

    logger.info('Done, produced %d statements, skipped %d names', count, skipped)
    logger.info("Dataset serialized to '%s'" % outfile.name)