forked from octodns/octodns
-
Notifications
You must be signed in to change notification settings - Fork 0
/
envvar.py
100 lines (77 loc) · 3.17 KB
/
envvar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import logging
import os
from ..record import Record
from .base import BaseSource
class EnvVarSourceException(Exception):
pass
class EnvironmentVariableNotFoundException(EnvVarSourceException):
def __init__(self, data):
super(EnvironmentVariableNotFoundException, self).__init__(
'Unknown environment variable {}'.format(data))
class EnvVarSource(BaseSource):
'''
This source allows for environment variables to be embedded at octodns
execution time into zones. Intended to capture artifacts of deployment to
facilitate operational objectives.
The TXT record generated will only have a single value.
The record name cannot conflict with any other co-existing sources. If
this occurs, an exception will be thrown.
Possible use cases include:
- Embedding a version number into a TXT record to monitor update
propagation across authoritative providers.
- Capturing identifying information about the deployment process to
record where and when the zone was updated.
version:
class: octodns.source.envvar.EnvVarSource
# The environment variable in question, in this example the username
# currently executing octodns
variable: USER
# The TXT record name to embed the value found at the above
# environment variable
name: deployuser
# The TTL of the TXT record (optional, default 60)
ttl: 3600
This source is then combined with other sources in the octodns config
file:
zones:
netflix.com.:
sources:
- yaml
- version
targets:
- ultra
- ns1
'''
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
SUPPORTS = set(('TXT'))
DEFAULT_TTL = 60
def __init__(self, id, variable, name, ttl=DEFAULT_TTL):
self.log = logging.getLogger('{}[{}]'.format(
self.__class__.__name__, id))
self.log.debug('__init__: id=%s, variable=%s, name=%s, '
'ttl=%d', id, variable, name, ttl)
super(EnvVarSource, self).__init__(id)
self.envvar = variable
self.name = name
self.ttl = ttl
def _read_variable(self):
value = os.environ.get(self.envvar)
if value is None:
raise EnvironmentVariableNotFoundException(self.envvar)
self.log.debug('_read_variable: successfully loaded var=%s val=%s',
self.envvar, value)
return value
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
before = len(zone.records)
value = self._read_variable()
# We don't need to worry about conflicting records here because the
# manager will deconflict sources on our behalf.
payload = {'ttl': self.ttl, 'type': 'TXT', 'values': [value]}
record = Record.new(zone, self.name, payload, source=self,
lenient=lenient)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records, exists=False',
len(zone.records) - before)