commit 983b192
replace our memcached-based fact cache implementation with local files

ryanpetrello committed Jan 15, 2018
1 parent e0c04df

Showing 6 changed files with 192 additions and 330 deletions.
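For background: with ANSIBLE_CACHE_PLUGIN=jsonfile, ansible reads and writes its fact cache as standalone JSON files in the directory named by ANSIBLE_CACHE_PLUGIN_CONNECTION, one file per inventory host, containing that host's facts as a JSON object. That on-disk contract is what this commit leans on. A minimal sketch of the layout (the directory and hostname are hypothetical illustration, not code from the commit):

    import codecs
    import json
    import os

    # Hypothetical cache directory and host name; a playbook run reads and
    # rewrites <cache_dir>/<hostname> as it gathers facts.
    cache_dir = '/tmp/facts'
    hostname = 'web1.example.org'

    if not os.path.isdir(cache_dir):
        os.makedirs(cache_dir)

    # Pre-seed the cache for one host, the way start_job_fact_cache() does.
    with codecs.open(os.path.join(cache_dir, hostname), 'w', encoding='utf-8') as f:
        json.dump({'ansible_distribution': 'Fedora'}, f)

    # Harvesting is symmetric: one file, one host, one JSON document.
    with codecs.open(os.path.join(cache_dir, hostname), 'r', encoding='utf-8') as f:
        print(json.load(f))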
134 changes: 55 additions & 79 deletions awx/main/models/jobs.py
@@ -2,26 +2,26 @@
 # All Rights Reserved.
 
 # Python
+import codecs
 import datetime
 import hashlib
 import hmac
 import logging
 import os
 import time
 import json
-import base64
 from urlparse import urljoin
 
+import six
+
 # Django
 from django.conf import settings
 from django.db import models
-#from django.core.cache import cache
-import memcache
 from django.db.models import Q, Count
 from django.utils.dateparse import parse_datetime
-from dateutil import parser
-from dateutil.tz import tzutc
 from django.utils.encoding import force_text, smart_str
-from django.utils.timezone import utc
+from django.utils.timezone import utc, now
 from django.utils.translation import ugettext_lazy as _
 from django.core.exceptions import ValidationError
@@ -714,85 +714,61 @@ def get_notification_templates(self):
     def get_notification_friendly_name(self):
         return "Job"
 
-    @property
-    def memcached_fact_key(self):
-        return '{}'.format(self.inventory.id)
-
-    def memcached_fact_host_key(self, host_name):
-        return '{}-{}'.format(self.inventory.id, base64.b64encode(host_name.encode('utf-8')))
-
-    def memcached_fact_modified_key(self, host_name):
-        return '{}-{}-modified'.format(self.inventory.id, base64.b64encode(host_name.encode('utf-8')))
-
-    def _get_inventory_hosts(self, only=['name', 'ansible_facts', 'modified',]):
-        return self.inventory.hosts.only(*only)
-
-    def _get_memcache_connection(self):
-        return memcache.Client([settings.CACHES['default']['LOCATION']], debug=0)
-
-    def start_job_fact_cache(self):
-        if not self.inventory:
-            return
-
-        cache = self._get_memcache_connection()
-
-        host_names = []
-
-        for host in self._get_inventory_hosts():
-            host_key = self.memcached_fact_host_key(host.name)
-            modified_key = self.memcached_fact_modified_key(host.name)
-
-            if cache.get(modified_key) is None:
-                if host.ansible_facts_modified:
-                    host_modified = host.ansible_facts_modified.replace(tzinfo=tzutc()).isoformat()
-                else:
-                    host_modified = datetime.datetime.now(tzutc()).isoformat()
-                cache.set(host_key, json.dumps(host.ansible_facts))
-                cache.set(modified_key, host_modified)
-
-            host_names.append(host.name)
-
-        cache.set(self.memcached_fact_key, host_names)
-
-    def finish_job_fact_cache(self):
-        if not self.inventory:
-            return
-
-        cache = self._get_memcache_connection()
-
-        for host in self._get_inventory_hosts():
-            host_key = self.memcached_fact_host_key(host.name)
-            modified_key = self.memcached_fact_modified_key(host.name)
-
-            modified = cache.get(modified_key)
-            if modified is None:
-                cache.delete(host_key)
-                continue
-
-            # Save facts if cache is newer than DB
-            modified = parser.parse(modified, tzinfos=[tzutc()])
-            if not host.ansible_facts_modified or modified > host.ansible_facts_modified:
-                ansible_facts = cache.get(host_key)
-                try:
-                    ansible_facts = json.loads(ansible_facts)
-                except Exception:
-                    ansible_facts = None
-
-                if ansible_facts is None:
-                    cache.delete(host_key)
-                    continue
-                host.ansible_facts = ansible_facts
-                host.ansible_facts_modified = modified
-                if 'insights' in ansible_facts and 'system_id' in ansible_facts['insights']:
-                    host.insights_system_id = ansible_facts['insights']['system_id']
-                host.save()
-                system_tracking_logger.info(
-                    'New fact for inventory {} host {}'.format(
-                        smart_str(host.inventory.name), smart_str(host.name)),
-                    extra=dict(inventory_id=host.inventory.id, host_name=host.name,
-                               ansible_facts=host.ansible_facts,
-                               ansible_facts_modified=host.ansible_facts_modified.isoformat()))
+    def _get_inventory_hosts(self, only=['name', 'ansible_facts', 'ansible_facts_modified', 'modified',]):
+        if not self.inventory:
+            return []
+        return self.inventory.hosts.only(*only)
+
+    def start_job_fact_cache(self, destination, modification_times, timeout=None):
+        destination = os.path.join(destination, 'facts')
+        os.makedirs(destination, mode=0700)
+        hosts = self._get_inventory_hosts()
+        if timeout is None:
+            timeout = settings.ANSIBLE_FACT_CACHE_TIMEOUT
+        if timeout > 0:
+            # exclude hosts with fact data older than `settings.ANSIBLE_FACT_CACHE_TIMEOUT` seconds
+            timeout = now() - datetime.timedelta(seconds=timeout)
+            hosts = hosts.filter(ansible_facts_modified__gte=timeout)
+        for host in hosts:
+            filepath = os.sep.join(map(six.text_type, [destination, host.name]))
+            with codecs.open(filepath, 'w', encoding='utf-8') as f:
+                os.chmod(f.name, 0600)
+                json.dump(host.ansible_facts, f)
+            # make note of the time we wrote the file so we can check if it changed later
+            modification_times[filepath] = os.path.getmtime(filepath)
+
+    def finish_job_fact_cache(self, destination, modification_times):
+        destination = os.path.join(destination, 'facts')
+        for host in self._get_inventory_hosts():
+            filepath = os.sep.join(map(six.text_type, [destination, host.name]))
+            if os.path.exists(filepath):
+                # If the file changed since we wrote it pre-playbook run...
+                modified = os.path.getmtime(filepath)
+                if modified > modification_times.get(filepath, 0):
+                    with codecs.open(filepath, 'r', encoding='utf-8') as f:
+                        try:
+                            ansible_facts = json.load(f)
+                        except ValueError:
+                            continue
+                        host.ansible_facts = ansible_facts
+                        host.ansible_facts_modified = now()
+                        if 'insights' in ansible_facts and 'system_id' in ansible_facts['insights']:
+                            host.insights_system_id = ansible_facts['insights']['system_id']
+                        host.save()
+                        system_tracking_logger.info(
+                            'New fact for inventory {} host {}'.format(
+                                smart_str(host.inventory.name), smart_str(host.name)),
+                            extra=dict(inventory_id=host.inventory.id, host_name=host.name,
+                                       ansible_facts=host.ansible_facts,
+                                       ansible_facts_modified=host.ansible_facts_modified.isoformat()))
+            else:
+                # if the file goes missing, ansible removed it (likely via clear_facts)
+                host.ansible_facts = {}
+                host.ansible_facts_modified = now()
+                system_tracking_logger.info(
+                    'Facts cleared for inventory {} host {}'.format(
+                        smart_str(host.inventory.name), smart_str(host.name)))
+                host.save()
 
 
 class JobHostSummary(CreatedModifiedModel):
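Taken together, the two new methods above bracket a playbook run. A rough sketch of the intended call pattern follows (job and private_data_dir are hypothetical stand-ins; the real call sites are in awx/main/tasks.py below):

    # Sketch only: assumes `job` is a Job with use_fact_cache enabled and
    # `private_data_dir` is an existing scratch directory for this run.
    modification_times = {}

    # 1. Dump each host's cached facts from the database to
    #    <private_data_dir>/facts/<hostname>, recording the mtime of each file.
    job.start_job_fact_cache(private_data_dir, modification_times)

    # 2. ansible-playbook runs with ANSIBLE_CACHE_PLUGIN=jsonfile pointed at
    #    <private_data_dir>/facts, reading and rewriting those files.

    # 3. Write back only hosts whose file mtime advanced (their facts changed),
    #    and clear facts for hosts whose file ansible deleted (clear_facts).
    job.finish_job_fact_cache(private_data_dir, modification_times)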
23 changes: 15 additions & 8 deletions awx/main/tasks.py
@@ -812,6 +812,15 @@ def run(self, pk, isolated_host=None, **kwargs):
         # Fetch ansible version once here to support version-dependent features.
         kwargs['ansible_version'] = get_ansible_version()
         kwargs['private_data_dir'] = self.build_private_data_dir(instance, **kwargs)
+
+        # Fetch "cached" fact data from prior runs and put it on the disk
+        # where ansible expects to find it
+        if getattr(instance, 'use_fact_cache', False) and not kwargs.get('isolated'):
+            instance.start_job_fact_cache(
+                os.path.join(kwargs['private_data_dir']),
+                kwargs.setdefault('fact_modification_times', {})
+            )
+
         # May have to serialize the value
         kwargs['private_data_files'] = self.build_private_data_files(instance, **kwargs)
         kwargs['passwords'] = self.build_passwords(instance, **kwargs)
@@ -1032,10 +1041,8 @@ def build_env(self, job, **kwargs):
             env['INVENTORY_ID'] = str(job.inventory.pk)
         if job.use_fact_cache and not kwargs.get('isolated'):
             env['ANSIBLE_LIBRARY'] = self.get_path_to('..', 'plugins', 'library')
-            env['ANSIBLE_CACHE_PLUGINS'] = self.get_path_to('..', 'plugins', 'fact_caching')
-            env['ANSIBLE_CACHE_PLUGIN'] = "awx"
-            env['ANSIBLE_CACHE_PLUGIN_TIMEOUT'] = str(settings.ANSIBLE_FACT_CACHE_TIMEOUT)
-            env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
+            env['ANSIBLE_CACHE_PLUGIN'] = "jsonfile"
+            env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = os.path.join(kwargs['private_data_dir'], 'facts')
         if job.project:
             env['PROJECT_REVISION'] = job.project.scm_revision
         env['ANSIBLE_RETRY_FILES_ENABLED'] = "False"
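Outside of AWX, the two variables set above are ansible's standard fact-cache settings. A sketch of the same configuration driving a bare ansible-playbook invocation (the playbook name and cache path are hypothetical):

    import os
    import subprocess

    # Hypothetical: run a playbook with the same cache configuration AWX
    # exports above; gathered facts land in /tmp/facts/<hostname>.
    env = dict(os.environ)
    env['ANSIBLE_CACHE_PLUGIN'] = 'jsonfile'
    env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] = '/tmp/facts'
    subprocess.check_call(['ansible-playbook', 'site.yml'], env=env)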
@@ -1286,14 +1293,14 @@ def pre_run_hook(self, job, **kwargs):
                     ('project_update', local_project_sync.name, local_project_sync.id)))
                 raise
 
-        if job.use_fact_cache and not kwargs.get('isolated'):
-            job.start_job_fact_cache()
-
-
     def final_run_hook(self, job, status, **kwargs):
         super(RunJob, self).final_run_hook(job, status, **kwargs)
         if job.use_fact_cache and not kwargs.get('isolated'):
-            job.finish_job_fact_cache()
+            job.finish_job_fact_cache(
+                kwargs['private_data_dir'],
+                kwargs['fact_modification_times']
+            )
         try:
             inventory = job.inventory
         except Inventory.DoesNotExist:
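Note that the fact_modification_times mapping seeded with kwargs.setdefault(...) in run() is the same dict object consumed here, which is how finish_job_fact_cache can skip files the playbook never rewrote. A toy demonstration of that mtime guard (hypothetical file; not AWX code):

    import json
    import os
    import tempfile
    import time

    filepath = os.path.join(tempfile.mkdtemp(), 'host.example.org')

    # Pre-run: write the cached facts and remember when we wrote them.
    with open(filepath, 'w') as f:
        json.dump({'ansible_distribution': 'Fedora'}, f)
    modification_times = {filepath: os.path.getmtime(filepath)}

    # A playbook run would rewrite this file via the jsonfile cache plugin.
    time.sleep(1)
    with open(filepath, 'w') as f:
        json.dump({'ansible_distribution': 'Fedora', 'ansible_kernel': '4.14'}, f)

    # Post-run: only files whose mtime advanced get written back to the DB.
    assert os.path.getmtime(filepath) > modification_times[filepath]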
(diff for the remaining changed files not shown)