-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathansible_merge_vars.py
153 lines (127 loc) · 5.63 KB
/
ansible_merge_vars.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python
"""
An Ansible action plugin to allow explicit merging of dict and list facts.
"""
from ansible.plugins.action import ActionBase
from ansible.errors import AnsibleError
from ansible.utils.vars import isidentifier
# Funky import dance for Ansible backwards compatitility (not sure if we
# actually need to do this or not)
try:
from __main__ import display
except ImportError:
from ansible.utils.display import Display # pylint: disable=ungrouped-imports
display = Display()
class ActionModule(ActionBase):
"""
Merge all variables in context with a certain suffix (lists or dicts only)
and create a new variable that contains the result of this merge. These
initial suffixed variables can be definied anywhere in the inventory, or by
any other means; as long as they're in the context for the running play,
they'll be merged.
"""
def run(self, tmp=None, task_vars=None):
suffix_to_merge = self._task.args.get('suffix_to_merge', '')
merged_var_name = self._task.args.get('merged_var_name', '')
dedup = self._task.args.get('dedup', True)
expected_type = self._task.args.get('expected_type')
recursive_dict_merge = bool(self._task.args.get('recursive_dict_merge', False))
if 'cacheable' in self._task.args.keys():
display.deprecated(
"The `cacheable` option does not actually do anything, since Ansible 2.5. "
"No matter what, the variable set by this plugin will be set in the fact "
"cache if you have fact caching enabled. To get rid of this warning, "
"remove the `cacheable` argument from your merge_vars task. This warning "
"will be removed in a future version of this plugin."
)
# Validate args
if expected_type not in ['dict', 'list']:
raise AnsibleError("expected_type must be set ('dict' or 'list').")
if not merged_var_name:
raise AnsibleError("merged_var_name must be set")
if not isidentifier(merged_var_name):
raise AnsibleError("merged_var_name '%s' is not a valid identifier" % merged_var_name)
if not suffix_to_merge.endswith('__to_merge'):
raise AnsibleError("Merge suffix must end with '__to_merge', sorry!")
keys = sorted([key for key in task_vars.keys()
if key.endswith(suffix_to_merge)])
display.v("Merging vars in this order: {}".format(keys))
# We need to render any jinja in the merged var now, because once it
# leaves this plugin, ansible will cleanse it by turning any jinja tags
# into comments.
# And we need it done before merging the variables,
# in case any structured data is specified with templates.
merge_vals = [self._templar.template(task_vars[key]) for key in keys]
# Dispatch based on type that we're merging
if merge_vals == []:
if expected_type == 'list':
merged = []
else:
merged = {}
elif isinstance(merge_vals[0], list):
merged = merge_list(merge_vals, dedup)
elif isinstance(merge_vals[0], dict):
merged = merge_dict(merge_vals, dedup, recursive_dict_merge)
else:
raise AnsibleError(
"Don't know how to merge variables of type: {}".format(type(merge_vals[0]))
)
return {
'ansible_facts': {merged_var_name: merged},
'changed': False,
}
def merge_dict(merge_vals, dedup, recursive_dict_merge):
"""
To merge dicts, just update one with the values of the next, etc.
"""
check_type(merge_vals, dict)
merged = {}
for val in merge_vals:
if not recursive_dict_merge:
merged.update(val)
else:
# Recursive merging of dictionaries with overlapping keys:
# LISTS: merge with merge_list
# DICTS: recursively merge with merge_dict
# any other types: replace (same as usual behaviour)
for key in val.keys():
if not key in merged:
# first hit of the value - just assign
merged[key] = val[key]
elif isinstance(merged[key], list):
merged[key] = merge_list([merged[key], val[key]], dedup)
elif isinstance(merged[key], dict):
merged[key] = merge_dict([merged[key], val[key]], dedup, recursive_dict_merge)
else:
merged[key] = val[key]
return merged
def merge_list(merge_vals, dedup):
""" To merge lists, just concat them. Dedup if wanted. """
check_type(merge_vals, list)
merged = flatten(merge_vals)
if dedup:
merged = deduplicate(merged)
return merged
def check_type(mylist, _type):
""" Ensure that all members of mylist are of type _type. """
if not all(isinstance(item, _type) for item in mylist):
raise AnsibleError("All values to merge must be of the same type, either dict or list")
def flatten(list_of_lists):
"""
Flattens a list of lists:
>>> flatten([[1, 2] [3, 4]])
[1, 2, 3, 4]
I wish Python had this in the standard lib :(
"""
return list((x for y in list_of_lists for x in y))
def deduplicate(mylist):
"""
Just brute force it. This lets us keep order, and lets us dedup unhashable
things, like dicts. Hopefully you won't run into such big lists that
this will ever be a performance issue.
"""
deduped = []
for item in mylist:
if item not in deduped:
deduped.append(item)
return deduped