Skip to content

Commit

Permalink
Fix cache invalidation
Browse files Browse the repository at this point in the history
Old caches for the same step were not being deleted; as a result, the experiment caches occupied a large amount of disk space.
  • Loading branch information
thalessr committed Oct 2, 2020
1 parent a97ceaa commit 28ad684
Show file tree
Hide file tree
Showing 11 changed files with 166,907 additions and 23 deletions.
166,822 changes: 166,822 additions & 0 deletions examples/datasets/black_friday.csv

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions examples/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env python3
##
## Authors: Adriano Marques
## Nathan Martins
## Thales Ribeiro
##
## Copyright (C) 2019 Exponential Ventures LLC
##
## This library is free software; you can redistribute it and/or
## modify it under the terms of the GNU Library General Public
## License as published by the Free Software Foundation; either
## version 2 of the License, or (at your option) any later version.
##
## This library is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## Library General Public License for more details.
##
## You should have received a copy of the GNU Library General Public
## License along with this library; if not, write to the Free Software
## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
##


import logging
from os.path import split, join

import pandas as pd

from stripping import setup_stripping

# Initialise stripping with a '.stripping' directory located next to this
# script. The call yields two objects: `st`, used below to register steps and
# to run them, and `c`, on which the steps store the values they share.
# NOTE(review): the path is presumably the cache directory -- confirm against
# setup_stripping.
st, c = setup_stripping(join(split(__file__)[0], '.stripping'))
# Configure the root logger so records of level DEBUG and above are emitted.
logging.basicConfig(level=logging.DEBUG)


@st.step()
def load_dataset():
    """Read the bundled Black Friday CSV and keep it on the shared object ``c``.

    Writes the CSV path (``datasets/black_friday.csv`` relative to this
    script) to ``c.bf_file`` and the parsed DataFrame to ``c.bf``.
    """
    script_dir = split(__file__)[0]
    csv_path = join(script_dir, "datasets", "black_friday.csv")
    c.bf_file = csv_path
    logging.info(f"Processing file '{c.bf_file}' without using the Catalysis acceleration framework.")
    c.bf = pd.read_csv(c.bf_file)


@st.step()
def split_dataset():
    """Slice the frame held in ``c.bf`` into two arrays.

    ``c.X`` receives the values of the first six columns and ``c.y`` the
    values of the column at position 9.
    """
    frame = c.bf
    c.X = frame.iloc[:, :6].values
    c.y = frame.iloc[:, 9].values

@st.step()
def hello_stripping():
    """Print a greeting to standard output.

    Takes no arguments and returns ``None``.
    """
    # The original message was misspelled ('Hellow').
    print('Hello')

# Kick off the pipeline.
# NOTE(review): presumably runs the steps registered above in definition
# order, reusing cached results where available -- confirm against stripping.
st.execute()
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
asynctest==0.13.0
freezegun==0.3.15
numpy==1.18.1
pandas==1.0.1
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

setup(
name='stripping',
version='0.2.0',
version='0.2.1',
description='An easy to use pipeline solution for AI/ML experiments',
author='Adriano Marques, Nathan Martins, Thales Ribeiro',
author_email='[email protected], [email protected], [email protected]',
Expand Down
2 changes: 1 addition & 1 deletion stripping/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def execute_or_retrieve(self, step_fn, *args, **kwargs):
else:
step_return = step_fn(*args, **kwargs)

self.storage.save_step(step_fn.code, step_return, self.context, *args, **kwargs)
self.storage.save_step(step_fn.code, step_fn.name, step_return, self.context, *args, **kwargs)

return step_return

Expand Down
1 change: 0 additions & 1 deletion stripping/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from tempfile import TemporaryFile

import numpy as np
import pandas as pd

from .cache import StepCache
from .singleton import SingletonDecorator
Expand Down
15 changes: 11 additions & 4 deletions stripping/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pickle
import sys
from pathlib import Path
from shutil import rmtree
from tempfile import TemporaryFile
from typing import Iterable

Expand Down Expand Up @@ -66,18 +67,19 @@ def __init__(self, cache_dir: str, catalysis_credential_name: str = '') -> None:
self.exec_args = sorted(sys.argv[1:])
self.hashed_args = hashlib.sha1(",".join(self.exec_args).encode()).hexdigest()

def step_location(self, step_code: str, *args, **kwargs) -> Iterable[Path]:
def step_location(self, step_code: str, step_name: str, *args, **kwargs) -> Iterable[Path]:
input_args = list(args) + [i for pair in sorted(kwargs.items(), key=lambda x: x[0]) for i in pair]
input_args = ",".join([str(i) for i in input_args]).encode()
loc = Path(os.path.join(self.cache_dir,
self.hashed_name,
self.hashed_args,
hashlib.sha1(step_name.encode()).hexdigest(),
hashlib.sha1(step_code.encode()).hexdigest(),
hashlib.sha1(input_args).hexdigest()))
return loc, loc / 'return', loc / 'context'

def get_step(self, step_name: str, step_code: str, context, *args, **kwargs):
location, return_location, context_location = self.step_location(step_code, *args, **kwargs)
location, return_location, context_location = self.step_location(step_code, step_name, *args, **kwargs)
return_file_name = return_location / '0'

if self.catalysis_client is not None:
Expand Down Expand Up @@ -114,12 +116,17 @@ def get_step(self, step_name: str, step_code: str, context, *args, **kwargs):
return None

return None
elif Path(os.path.join(self.cache_dir, self.hashed_name, self.hashed_args,
hashlib.sha1(step_name.encode()).hexdigest())).exists():
LOG.info(f'Deleting cache for step {step_name}')
rmtree(os.path.join(self.cache_dir, self.hashed_name, self.hashed_args,
hashlib.sha1(step_name.encode()).hexdigest()), ignore_errors=True)

raise StepNotCached(f"The step '{step_name}' is not yet cached.")

def save_step(self, step_code: str, step_return, context, *args, **kwargs) -> None:
def save_step(self, step_code: str, step_name: str, step_return, context, *args, **kwargs) -> None:

location, return_location, context_location = self.step_location(step_code, *args, **kwargs)
location, return_location, context_location = self.step_location(step_code, step_name, *args, **kwargs)

# Only create dirs if we don't have a catalysis client, otherwise the driver already takes
# care of creating our directories whenever we write to a file with non-existent path.
Expand Down
4 changes: 2 additions & 2 deletions tests/storage_with_catalysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async def test_save_step_remotely(self):
stripping, context = setup_stripping_with_catalysis(cache_dir, "local")

storage = stripping.cache.storage
storage.save_step('Pass', 'RETURN_HERE', context)
storage.save_step('Pass', 'step_name', 'RETURN_HERE', context)

async def test_get_step_remotely(self):
random = str(uuid.uuid4())
Expand All @@ -61,7 +61,7 @@ async def test_get_step_remotely(self):
stripping, context = setup_stripping_with_catalysis(cache_dir, "local")

storage = stripping.cache.storage
storage.save_step('Pass', 'RETURN_HERE', context)
storage.save_step('Pass', 'step_name', 'RETURN_HERE', context)

aux = storage.step_location('Pass')
# The len(aux) should be 3 because it means
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cache_invalidation.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def tearDown(self):
shutil.rmtree(tmp_dir, ignore_errors=True)

async def test_strategy(self):
self.storage.save_step('Pass', 'RETURN_HERE', context)
self.storage.save_step('Pass', 'name', 'RETURN_HERE', context)
await self.cache_invalidation.strategy()
self.assertTrue(len(glob(f'{tmp_dir}{os.sep}*')) >= 0)

Expand Down
18 changes: 9 additions & 9 deletions tests/test_cache_invalidation_with_catalysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
##


from os.path import split, join, exists
import asynctest
import shutil
import asyncio
import tracemalloc
import datetime
import os
from freezegun import freeze_time
import shutil
import tracemalloc
from glob import glob
from os.path import split, join

import asynctest
from freezegun import freeze_time

from stripping.cache import StepCache, CacheInvalidation
from stripping import setup_stripping
from stripping.cache import CacheInvalidation
from stripping.storage import CacheStorage

tmp_dir = join(split(__file__)[0], '.test_cache')
Expand All @@ -56,11 +56,11 @@ def tearDownClass(cls):
shutil.rmtree(tmp_dir, ignore_errors=True)

async def test_strategy(self):
self.storage.save_step('Pass', 'RETURN_HERE', context)
self.storage.save_step('Pass', 'step_name', 'RETURN_HERE', context)
await self.cache_invalidation.strategy()
self.assertTrue(len(glob(f'{tmp_dir}{os.sep}*')) >= 0)

with freeze_time(datetime.datetime.now() + datetime.timedelta(days=0.25*365)):
with freeze_time(datetime.datetime.now() + datetime.timedelta(days=0.25 * 365)):
await self.cache_invalidation.strategy()

self.assertEqual(0, len(glob(f'{tmp_dir}{os.sep}*')))
8 changes: 4 additions & 4 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ def tearDown(self):
shutil.rmtree(tmp_dir, ignore_errors=True)

def test_save_step(self):
self.storage.save_step('Pass', 'RETURN_HERE', context)
aux = self.storage.get_step('', 'Pass', context)
self.storage.save_step(step_code='Pass', step_name='step_name', step_return='RETURN_HERE', context=context)
aux = self.storage.get_step(step_name='step_name', step_code='Pass', context=context)
self.assertEqual(aux, 'RETURN_HERE') # That means the code/step was found
with self.assertRaises(StepNotCached):
self.storage.get_step('', 'PHONY_CODE', context)
self.storage.get_step('', 'bla', 'PHONY_CODE', context)

def test_step_location(self):
aux = self.storage.step_location('Pass')
aux = self.storage.step_location(step_code='Pass', step_name='step_name')
# The len(aux) should be 3 because it means
# location, return location and context location path
self.assertEqual(3, len(aux))

0 comments on commit 28ad684

Please sign in to comment.