-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathutil.py
86 lines (72 loc) · 2.23 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""Utility functions for search"""
from contextlib import contextmanager
import json
from tempfile import NamedTemporaryFile
def traverse_mapping(mapping, parent_key):
    """
    Walk a mapping depth-first, emitting every dict found inside it

    The mapping passed in is emitted first, paired with parent_key. After
    that each dict-valued entry is visited in iteration order, paired with
    the key it is stored under, and fully descended into before its next
    sibling is looked at. Values that are not dicts (lists included) are
    skipped and not searched.

    Args:
        mapping (dict): The mapping itself, or a nested dict
        parent_key (str): The key for the mapping

    Returns:
        generator: A generator of (key, dict) pairs within the mapping
    """
    yield parent_key, mapping
    nested_items = (
        (child_key, child)
        for child_key, child in mapping.items()
        if isinstance(child, dict)
    )
    for child_key, child in nested_items:
        yield from traverse_mapping(child, child_key)
class _JsonStream:
    """
    Handles storing large amounts of newline separated JSON data

    Wraps a seekable, text-mode file object. Each object is stored as one
    JSON document per line, so the data can be read back one line at a time
    without holding the whole file in memory.
    """

    def __init__(self, file):
        """
        Args:
            file (file): A text-mode file object open for reading and writing
                that supports seek() and truncate()
        """
        self.file = file

    def write_stream(self, gen):
        """
        Write objects to the JSON file, replacing any previous contents

        Args:
            gen (iterable): JSON-serializable objects, written one per line
        """
        self.file.seek(0)
        # Without truncating, a rewrite that is shorter than the previous one
        # would leave stale lines at the end for read_stream to pick up.
        self.file.truncate()
        for obj in gen:
            self.file.write(json.dumps(obj) + "\n")

    def read_stream(self):
        """
        Reads stream of json objects from a file

        The file is rewound when iteration starts and then read lazily, one
        line per object, so the underlying file must stay open until the
        generator has been consumed.

        Returns:
            generator: A generator of the objects decoded from each line
        """
        self.file.seek(0)
        # Iterate the file directly instead of readlines() so only one line
        # is held in memory at a time.
        for line in self.file:
            yield json.loads(line)
@contextmanager
def open_json_stream():
    """
    Context manager providing a temporary store for JSON objects

    A named temporary file is opened in "w+" text mode and handed to the
    caller wrapped in a _JsonStream. The temporary file is closed when the
    with block is left.

    Yields:
        _JsonStream: A stream for writing and reading back json objects
    """
    with NamedTemporaryFile("w+") as backing_file:
        stream = _JsonStream(backing_file)
        yield stream
def fix_nested_filter(query, parent_key):
    """
    Fix the invalid 'filter' in the Opensearch queries

    Any dict stored under a 'nested' key that contains a 'filter' entry is
    rebuilt with that entry renamed to 'query'. The whole structure is
    walked, including the contents of the renamed filter, so 'nested'
    clauses at any depth are fixed. The input is not modified; dicts and
    lists are rebuilt, other values are returned as they are.

    Args:
        query (dict): An Opensearch query
        parent_key (any): The parent key. This is the dict key the value is
            stored under, or the list index for items of a list

    Returns:
        dict: An updated Opensearch query with filter replaced with query

    Raises:
        ValueError: If a 'nested' dict has both a 'filter' and a 'query'
    """
    if isinstance(query, dict):
        if parent_key == 'nested' and 'filter' in query:
            if 'query' in query:
                raise ValueError("Unexpected 'query' found")
            # Recurse into the sibling values and into the filter itself so a
            # 'nested' clause inside this one is fixed as well.
            fixed = {
                key: fix_nested_filter(value, key)
                for key, value in query.items()
                if key != 'filter'
            }
            fixed['query'] = fix_nested_filter(query['filter'], 'query')
            return fixed
        return {
            key: fix_nested_filter(value, key) for key, value in query.items()
        }
    if isinstance(query, list):
        return [
            fix_nested_filter(piece, index) for index, piece in enumerate(query)
        ]
    return query