-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathjson_sqlalchemy.py
152 lines (121 loc) · 4.89 KB
/
json_sqlalchemy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# from https://gist.github.com/dbarnett/1730610
import json
import sqlalchemy
from sqlalchemy import String
from sqlalchemy.ext.mutable import Mutable
class JSONEncodedObj(sqlalchemy.types.TypeDecorator):
"""Represents an immutable structure as a json-encoded string."""
impl = String
def process_bind_param(self, value, dialect):
if value is not None:
value = json.dumps(value)
return value
def process_result_value(self, value, dialect):
if value is not None:
try:
value = json.loads(value)
except:
return value
return value
class MutationObj(Mutable):
@classmethod
def coerce(cls, key, value):
if isinstance(value, dict) and not isinstance(value, MutationDict):
return MutationDict.coerce(key, value)
if isinstance(value, list) and not isinstance(value, MutationList):
return MutationList.coerce(key, value)
return value
@classmethod
def _listen_on_attribute(cls, attribute, coerce, parent_cls):
key = attribute.key
if parent_cls is not attribute.class_:
return
# rely on "propagate" here
parent_cls = attribute.class_
def load(state, *args):
val = state.dict.get(key, None)
if coerce:
val = cls.coerce(key, val)
state.dict[key] = val
if isinstance(val, cls):
val._parents[state.obj()] = key
def set(target, value, oldvalue, initiator):
if not isinstance(value, cls):
value = cls.coerce(key, value)
if isinstance(value, cls):
value._parents[target.obj()] = key
if isinstance(oldvalue, cls):
oldvalue._parents.pop(target.obj(), None)
return value
def pickle(state, state_dict):
val = state.dict.get(key, None)
if isinstance(val, cls):
if 'ext.mutable.values' not in state_dict:
state_dict['ext.mutable.values'] = []
state_dict['ext.mutable.values'].append(val)
def unpickle(state, state_dict):
if 'ext.mutable.values' in state_dict:
for val in state_dict['ext.mutable.values']:
val._parents[state.obj()] = key
sqlalchemy.event.listen(parent_cls, 'load', load, raw=True, propagate=True)
sqlalchemy.event.listen(parent_cls, 'refresh', load, raw=True, propagate=True)
sqlalchemy.event.listen(attribute, 'set', set, raw=True, retval=True, propagate=True)
sqlalchemy.event.listen(parent_cls, 'pickle', pickle, raw=True, propagate=True)
sqlalchemy.event.listen(parent_cls, 'unpickle', unpickle, raw=True, propagate=True)
class MutationDict(MutationObj, dict):
@classmethod
def coerce(cls, key, value):
"""Convert plain dictionary to MutationDict"""
self = MutationDict((k,MutationObj.coerce(key,v)) for (k,v) in value.items())
self._key = key
return self
def __setitem__(self, key, value):
dict.__setitem__(self, key, MutationObj.coerce(self._key, value))
self.changed()
def __delitem__(self, key):
dict.__delitem__(self, key)
self.changed()
class MutationList(MutationObj, list):
@classmethod
def coerce(cls, key, value):
"""Convert plain list to MutationList"""
self = MutationList((MutationObj.coerce(key, v) for v in value))
self._key = key
return self
def __setitem__(self, idx, value):
list.__setitem__(self, idx, MutationObj.coerce(self._key, value))
self.changed()
def __setslice__(self, start, stop, values):
list.__setslice__(self, start, stop, (MutationObj.coerce(self._key, v) for v in values))
self.changed()
def __delitem__(self, idx):
list.__delitem__(self, idx)
self.changed()
def __delslice__(self, start, stop):
list.__delslice__(self, start, stop)
self.changed()
def append(self, value):
list.append(self, MutationObj.coerce(self._key, value))
self.changed()
def insert(self, idx, value):
list.insert(self, idx, MutationObj.coerce(self._key, value))
self.changed()
def extend(self, values):
list.extend(self, (MutationObj.coerce(self._key, v) for v in values))
self.changed()
def pop(self, *args, **kw):
value = list.pop(self, *args, **kw)
self.changed()
return value
def remove(self, value):
list.remove(self, value)
self.changed()
def JSONAlchemy(sqltype):
"""A type to encode/decode JSON on the fly
sqltype is the string type for the underlying DB column.
You can use it like:
Column(JSONAlchemy(Text(600)))
"""
class _JSONEncodedObj(JSONEncodedObj):
impl = sqltype
return MutationObj.as_mutable(_JSONEncodedObj)