try:
    from collections.abc import Generator
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import streaming_bulk, BulkIndexError
    from elastic_transport import ObjectApiResponse
    from requests import head
    from unicodedata import normalize
    from math import ceil
    from credentials_default import es_cert, es_user, es_password
    from helper_es_index_settings import ANALYZERS
except ImportError as error:
    print(error)


class ES:
    """
    Provides ElasticSearch interactivity.

    Methods
    -------
    - check_connection() -> ObjectApiResponse/bool : checks access to ElasticSearch by requesting server info
    - indices() -> dict/bool : retrieves all indices (excluding hidden ones) from ElasticSearch
    - del_index(index=str) -> ObjectApiResponse : deletes an index (including contents, shards and metadata)
    - add_index(index=str) -> ObjectApiResponse : adds an index (with the settings defined in helper_es_index_settings.py)
    - add_doc(index=str, doc=dict) -> ObjectApiResponse : adds a document to an index
    - save(data=dict) -> Generator : bulk-inserts records into ElasticSearch
    - build_library() -> Generator : compiles records for the 'library' on the site and bulk-inserts them into ElasticSearch
    """

    def __init__(self):
        """
        Establishes a connection to ElasticSearch.
        """
        self.es = Elasticsearch('https://localhost:9200', ca_certs=es_cert, basic_auth=(es_user, es_password))

    def check_connection(self):
        """
        Checks access to ElasticSearch by requesting server info.

        Returns
        -------
        - ObjectApiResponse : response from ElasticSearch
        - bool : False if the request failed
        """
        try:
            return self.es.info()
        except Exception:
            return False

    def indices(self):
        """
        Retrieves all indices (excluding hidden ones) from ElasticSearch.

        Returns
        -------
        - dict : index names mapped to their aliases
        - bool : False if the request failed
        """
        try:
            return self.es.indices.get_alias(expand_wildcards=['open']).body
        except Exception:
            return False

    def del_index(self, index: str) -> ObjectApiResponse:
        """
        Deletes an index (including contents, shards and metadata).

        Parameters
        ----------
        - index (str) : name of the index to delete

        Returns
        -------
        - ObjectApiResponse : response from ElasticSearch
        """
        return self.es.options(ignore_status=[400, 404]).indices.delete(index=index)

    def add_index(self, index: str) -> ObjectApiResponse:
        """
        Adds a new index (with the settings defined for it in helper_es_index_settings.py).

        Parameters
        ----------
        - index (str) : name of the index to add

        Returns
        -------
        - ObjectApiResponse : response from ElasticSearch
        """
        return self.es.indices.create(index=index, settings=ANALYZERS[index])
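
    # ANALYZERS (imported from helper_es_index_settings.py) is assumed to map each index
    # name to an index-settings body, roughly like the hypothetical sketch below; the
    # actual analyzer definitions live in that helper module and are not shown here.
    #
    # ANALYZERS = {
    #     'publisheddocuments': {'analysis': {'analyzer': {'default': {'type': 'standard'}}}},
    #     'library': {'analysis': {'analyzer': {'default': {'type': 'standard'}}}},
    # }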

    def add_doc(self, index: str, doc: dict) -> ObjectApiResponse:
        """
        Adds a document to an index.

        Parameters
        ----------
        - index (str) : name of the index to add to
        - doc (dict) : dictionary with document data

        Returns
        -------
        - ObjectApiResponse : response from ElasticSearch
        """
        return self.es.index(index=index, document=doc)

    def save(self, data: dict) -> Generator:
        """
        Generator that writes data to ElasticSearch in batches (chunk_size=5000).

        NOTE: a connection time-out may occur if chunk_size is set too high and/or
        request_timeout too low, which interrupts the insertion process; in that case the
        safest option is to simply restart the program. Also note that documents are
        indexed upon insertion, which may take long depending on the complexity of the
        documents. The refresh parameter of the streaming_bulk API must be set to
        'wait_for' so that the indices are ready for querying, ensuring the build_library
        method can request all documents in the publisheddocuments index.

        Parameters
        ----------
        - data (dict) : the dictionary with compiled resources to push to ElasticSearch

        Yields
        ------
        - result (dict) : result of the streaming insert operation
        """
        indices = list({v['ES_index'] for v in data.values()})
        for index in indices:
            # Recreate each target index from scratch so stale documents are removed.
            self.del_index(index)
            self.add_index(index)  # THIS CHANGES SETTINGS OF THE NEW INDEX AND NEEDS TO BE TESTED

        def data_generator(data):
            # Wrap every record in a top-level 'doc' field and give it a '<index>-<RecID>' id.
            for v in data.values():
                yield {
                    "_index": v['ES_index'],
                    "_id": f'{v["ES_index"]}-{v["RecID"]}',
                    "doc": v
                }

        try:
            for ok, result in streaming_bulk(self.es, data_generator(data), chunk_size=5000, request_timeout=60, refresh='wait_for'):
                if not ok:
                    print(str(result))
                else:
                    yield result
        except BulkIndexError:
            # Re-raise so the caller sees which documents failed to index.
            raise

    def build_library(self) -> Generator:
        """
        Builds a 'library' of published documents with PDFs only, grouped by author.
        Uses the class save method to bulk-insert the library into ElasticSearch.

        Yields
        ------
        - dict : result of the streaming insert operation
        """
        published_query = {
            "query": {
                "bool": {
                    "must": {
                        "exists": {
                            "field": "doc.PDF"
                        }
                    },
                    "must_not": {
                        "term": {
                            "doc.PDF": ""
                        }
                    }
                }
            },
            "size": 1000
        }
        documents = self.es.search(index='publisheddocuments', body=published_query)['hits']['hits']
        library = {}

        def fileSize(file):
            # Read the file size from the Content-Length header of a HEAD request and
            # format it as KB or MB; return an empty string if the header is missing
            # or not numeric.
            size = head(file).headers.get("content-length", "")
            if not size.isdigit():
                return ''
            size = int(size)
            return f'{ceil(size / 1000)} KB' if size <= 999999 else f'{round(size / 1000000, 2)} MB'
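        # For example: a Content-Length of '512000' formats as '512 KB', and
        # '2048000' as '2.05 MB' (the KB/MB cut-off lies at 999,999 bytes).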

        def organizeData():
            # Group the publications per author; each author becomes one record in the
            # 'library' index. The per-record 'ES_index'/'RecID' fields and the
            # 'Publications' list match the shape the save method expects (the original
            # grouping left these untyped, so this structure is a best guess).
            for result in documents:
                result = result['_source']['doc']
                if len(result['Authors']):
                    for author in result['Authors']:
                        if author['ConstituentID'] not in library:
                            library[author['ConstituentID']] = {
                                'ES_index': 'library',
                                'RecID': author['ConstituentID'],
                                'Publications': []
                            }
                        title = str(normalize('NFD', result['Title']).encode('ascii', 'ignore').decode("utf-8"))
                        sortText = result['Notes'] if result['Notes'] is not None else title
                        sortText = str(normalize('NFD', sortText).encode('ascii', 'ignore').decode("utf-8"))
                        library[author['ConstituentID']]['Publications'].append({
                            'Author': author['DisplayName'],
                            'Title': title,
                            'AlphaSort': str(normalize('NFD', author['AlphaSort']).encode('ascii', 'ignore').decode("utf-8")),
                            'DisplayText': str(normalize('NFD', result['BoilerText']).encode('ascii', 'ignore').decode("utf-8")),
                            'SortText': sortText,
                            'Format': result['Format'],
                            'FileSize': fileSize(result['PDF']),
                            'URL': result['PDF']
                        })

        organizeData()
        yield from self.save(library)
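

# Minimal usage sketch (not part of the original module): the index name and record
# fields follow the code above, but all sample values are hypothetical.
if __name__ == '__main__':
    es = ES()
    if not es.check_connection():
        raise SystemExit('ElasticSearch is not reachable')

    # Push one hypothetical record, then rebuild the author library from it.
    records = {
        'publisheddocuments-1': {
            'ES_index': 'publisheddocuments',
            'RecID': 1,
            'Title': 'Example Title',
            'Notes': None,
            'BoilerText': 'Example display text',
            'Format': 'PDF',
            'PDF': 'https://example.org/example.pdf',
            'Authors': [{'ConstituentID': 42, 'DisplayName': 'Doe, J.', 'AlphaSort': 'Doe J'}],
        },
    }
    for result in es.save(records):
        print(result)
    for result in es.build_library():
        print(result)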