-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathnetbox-powerdns-sync.py
304 lines (250 loc) · 10.2 KB
/
netbox-powerdns-sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
#!/usr/bin/env python3
import argparse
import ipaddress
import logging
import re
import sys
from collections import Counter
import powerdns
import pynetbox
from systemd.journal import JournalHandler
from config import DRY_RUN, FORWARD_ZONES, REVERSE_ZONES
from config import NB_TOKEN, NB_URL, PDNS_API_URL, PDNS_KEY
from config import PTR_ONLY_CF
from config import SOURCE_DEVICE, SOURCE_IP, SOURCE_VM
def make_canonical(zone):
# return a zone in canonical form
return f'{zone}.'
def get_host_ips_ip(nb, zone):
# return list of tuples for ip addresses
host_ips = []
# get IPs with DNS name ending in forward_zone from NetBox
if PTR_ONLY_CF:
nb_ips = nb.ipam.ip_addresses.filter(
dns_name__iew=zone,
status=['active', 'dhcp', 'slaac'],
cf_ptr_only=False
)
else:
nb_ips = nb.ipam.ip_addresses.filter(
dns_name__iew=zone,
status=['active', 'dhcp', 'slaac']
)
# assemble list with tupels containing the canonical name, the record
# type and the IP address without the subnet from NetBox IPs
for nb_ip in nb_ips:
nb_zone = nb_ip.dns_name.split('.')
if zone != '.'.join(nb_zone[1:]):
continue
if nb_ip.family.value == 6:
type = 'AAAA'
else:
type = 'A'
host_ips.append((
make_canonical(nb_ip.dns_name),
type,
re.sub('/[0-9]*', '', str(nb_ip)),
make_canonical(zone)
))
return host_ips
def get_host_ips_ip_reverse(nb, prefix, zone):
# return list of reverse zone tupels for ip addresses
host_ips = []
# get IPs within the prefix from NetBox
nb_ips = nb.ipam.ip_addresses.filter(
parent=prefix,
status=['active', 'dhcp', 'slaac']
)
# assemble list with tupels containing the canonical name, the record type
# and the IP address without the subnet from NetBox IPs
for nb_ip in nb_ips:
if nb_ip.dns_name != '':
ip = re.sub('/[0-9]*', '', str(nb_ip))
reverse_pointer = ipaddress.ip_address(ip).reverse_pointer
host_ips.append((
make_canonical(reverse_pointer),
'PTR',
make_canonical(nb_ip.dns_name),
make_canonical(zone)
))
return host_ips
def get_host_ips_device(nb, zone):
# return list of tupels for devices
# get devices with name ending in forward_zone from NetBox
nb_devices = nb.dcim.devices.filter(
name__iew=zone,
status=['active', 'failed', 'offline', 'staged']
)
return get_host_ips_host(nb_devices, zone)
def get_host_ips_vm(nb, zone):
# return list of tupels for VMs
# get VMs with name ending in forward_zone from NetBox
nb_vms = nb.virtualization.virtual_machines.filter(
name__iew=zone,
status=['active',
'failed',
'offline',
'staged'])
return get_host_ips_host(nb_vms, zone)
def get_host_ips_host(nb_hosts, zone):
# return list of tupels for hosts (NetBox devices/VMs)
host_ips = []
# assemble list with tupels containing the canonical name, the record
# type and the IP addresses without the subnet of the device/vm
for nb_host in nb_hosts:
if nb_host.primary_ip4:
host_ips.append((
make_canonical(nb_host.name),
'A',
re.sub('/[0-9]*', '', str(nb_host.primary_ip4)),
make_canonical(zone)
))
if nb_host.primary_ip6:
host_ips.append((
make_canonical(nb_host.name),
'AAAA',
re.sub('/[0-9]*', '', str(nb_host.primary_ip6)),
make_canonical(zone)
))
return host_ips
def main():
parser = argparse.ArgumentParser(
description='Sync DNS name entries from NetBox to PowerDNS',
epilog='''This script uses the REST API of NetBox to retriev
IP addresses and their DNS name. It then syncs the DNS names
to PowerDNS to create A, AAAA and PTR records.
It does this for forward and reverse zones specified in the config
file.
''')
parser.add_argument('--dry_run', '-d', action='store_true',
help='Perform a dry run (make no changes to PowerDNS)')
parser.add_argument('--loglevel', '-l', type=str, default='INFO',
choices=['WARNING', 'INFO', ''],
help='Log level for the console logger')
parser.add_argument('--loglevel_journal', '-j', type=str, default='',
choices=['WARNING', 'INFO', ''],
help='Log level for the systemd journal logger')
args = parser.parse_args()
# merge dry_run directives from config and arguments
dry_run = False
if args.dry_run or DRY_RUN:
dry_run = True
logger = logging.getLogger(__name__)
# set overall log level to debug to catch all
logger.setLevel(logging.DEBUG)
# loglevel for console logging
if args.loglevel != '':
handler = logging.StreamHandler()
handler.setLevel(getattr(logging, args.loglevel))
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# loglevel for journal logging
if args.loglevel_journal != '':
journal_handler = JournalHandler()
journal_handler.setLevel(getattr(logging, args.loglevel_journal))
logger.addHandler(journal_handler)
nb = pynetbox.api(NB_URL, token=NB_TOKEN)
pdns_api_client = powerdns.PDNSApiClient(api_endpoint=PDNS_API_URL,
api_key=PDNS_KEY)
pdns = powerdns.PDNSEndpoint(pdns_api_client).servers[0]
host_ips = []
record_ips = []
for forward_zone in FORWARD_ZONES:
# Source IP: Create domains based on DNS name attached to IPs
if SOURCE_IP:
host_ips += get_host_ips_ip(nb, forward_zone)
# Source device: Create domains based on the name of devices
if SOURCE_DEVICE:
host_ips += get_host_ips_device(nb, forward_zone)
# Source VM: Create domains based on the name of VMs
if SOURCE_VM:
host_ips += get_host_ips_vm(nb, forward_zone)
# get zone forward_zone_canonical form PowerDNS
zone = pdns.get_zone(make_canonical(forward_zone))
if zone is None:
logger.critical(f'Zone {forward_zone} not found in PowerDNS. Skipping it.')
continue
# assemble list with tupels containing the canonical name, the record
# type, the IP address and forward_zone_canonical without the subnet
# from PowerDNS zone records with the
# comment 'NetBox'
for record in zone.records:
for comment in record['comments']:
if comment['content'] == 'NetBox':
for ip in record['records']:
record_ips.append((
record['name'],
record['type'],
ip['content'],
make_canonical(forward_zone)
))
for reverse_zone in REVERSE_ZONES:
host_ips += get_host_ips_ip_reverse(nb, reverse_zone['prefix'],
reverse_zone['zone'])
# get reverse zone records form PowerDNS
zone = pdns.get_zone(make_canonical(reverse_zone['zone']))
if zone is None:
logger.critical(f'Zone {reverse_zone["zone"]} not found in PowerDNS. Skipping it.')
continue
# assemble list with tupels containing the canonical name, the record
# type, the IP address and forward_zone_canonical without the subnet
# from PowerDNS zone records with the
# comment 'NetBox'
for record in zone.records:
for comment in record['comments']:
if comment['content'] == 'NetBox':
for ip in record['records']:
record_ips.append((
record['name'],
record['type'],
ip['content'],
make_canonical(reverse_zone['zone'])
))
# find duplicates in host_ips
duplicate_records = [(host_ip[0], host_ip[1]) for host_ip in host_ips]
duplicate_records = [duplicate for duplicate, amount in
Counter(duplicate_records).items() if amount > 1]
for duplicate_record in duplicate_records:
logger.critical(f'''Detected duplicate record from NetBox \
{duplicate_record[0]} of type {duplicate_record[1]}.
Not continuing execution. Please resolve the duplicate.''')
if len(duplicate_records) > 0:
sys.exit()
# create set with tupels that have to be created
# tupels from NetBox without tupels that already exists in PowerDNS
to_create = set(host_ips) - set(record_ips)
# create set with tupels that have to be deleted
# tupels from PowerDNS without tupels that are documented in NetBox
to_delete = set(record_ips) - set(host_ips)
logger.info(f'{len(to_create)} records to create')
for record in to_create:
logger.info(f'Will create record {record[0]}')
logger.info(f'{len(to_delete)} records to delete')
for record in to_delete:
logger.info(f'Will delete record {record[0]}')
if dry_run:
logger.info('Skipping Create/Delete due to Dry Run')
sys.exit()
for record in to_create:
logger.info(f'Now creating {record}')
zone = pdns.get_zone(record[3])
zone.create_records([
powerdns.RRSet(
record[0],
record[1],
[(record[2], False)],
comments=[powerdns.Comment('NetBox')])
])
for record in to_delete:
logger.info(f'Now deleting {record}')
zone = pdns.get_zone(record[3])
zone.delete_records([
powerdns.RRSet(
record[0],
record[1],
[(record[2], False)],
comments=[powerdns.Comment('NetBox')])
])
if __name__ == '__main__':
main()