Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

15 make the tests work #40

Merged
merged 13 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*.pyc
.pytest_cache/
.spyproject
**/__pycache__/

# Virtual environment
env/
Expand Down
179 changes: 104 additions & 75 deletions contrib/scripts/sensitive_strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ def __init__(
self.sensitive_strings_csv = sensitive_strings_csv
self.allowed_binary_files_csv = allowed_binary_files_csv
self.cache_file_csv = cache_file_csv
self.interactive = False
self._interactive = False
self.verify_all_on_behalf_of_user = False
self.remove_unfound_binaries = False
self.date_time_str = tdt.current_date_time_string_forfile()
self.tmp_dir_base = ft.norm_path(
os.path.join(orp.opencsp_temporary_dir(), "SensitiveStringSearcher")
)
self.git_files_only = True
self.is_hdf5_searcher = False
self.has_backed_up_allowed_binaries_csv = False

self.matchers = self.build_matchers()
self.matches: dict[str, list[ssm.Match]] = {}
Expand All @@ -56,6 +59,14 @@ def __init__(
self.cached_cleared_files: list[fc.FileCache] = []
self.new_cached_cleared_files: list[fc.FileCache] = []

@property
def interactive(self):
return self._interactive or self.verify_all_on_behalf_of_user

@interactive.setter
def interactive(self, val: bool):
self._interactive = val

def __del__(self):
if ft.directory_exists(self.tmp_dir_base):
tmp_dirs = ft.files_in_directory(self.tmp_dir_base, files_only=False)
Expand Down Expand Up @@ -194,6 +205,7 @@ def search_hdf5_file(self, hdf5_file: ff.FileFingerprint):
h5_dir, self.sensitive_strings_csv, tmp_allowed_binary_csv
)
hdf5_searcher.interactive = self.interactive
hdf5_searcher.verify_all_on_behalf_of_user = self.verify_all_on_behalf_of_user
hdf5_searcher.date_time_str = self.date_time_str
hdf5_searcher.tmp_dir_base = self.tmp_dir_base
hdf5_searcher.git_files_only = False
Expand Down Expand Up @@ -221,12 +233,7 @@ def search_hdf5_file(self, hdf5_file: ff.FileFingerprint):

# Ask the user about signing off
if self.interactive:
lt.info("Do you want to sign off on this file anyways (y/n)?")
val = input("")[0]
lt.info(f" User responded '{val}'")
if val.lower() == 'y':
matches = []
else:
if not self.verify_interactively(file_relpath_name_ext):
matches.append(
ssm.Match(0, 0, 0, "", "", None, "HDF5 file denied by user")
)
Expand Down Expand Up @@ -259,6 +266,49 @@ def search_hdf5_file(self, hdf5_file: ff.FileFingerprint):

return matches

def verify_interactively(self, relative_path_name_ext: str, cv_img: Image.Image = None, cv_title: str = None):
if cv_img is None:
lt.info("")
lt.info("Unknown binary file:")
lt.info(" " + relative_path_name_ext)
lt.info(
"Is this unknown binary file safe to add, and doesn't contain any sensitive information (y/n)?"
)
if self.verify_all_on_behalf_of_user:
val = 'y'
else:
resp = input("").strip()
val = 'n' if len(resp) == 0 else resp[0]
lt.info(f" User responded '{val}'")

else:
lt.info("")
lt.info(
"Is this image safe to add, and doesn't contain any sensitive information (y/n)?"
)
if self.verify_all_on_behalf_of_user:
val = 'y'
else:
cv2.imshow(cv_title, cv_img)
key = cv2.waitKey(0)
cv2.destroyAllWindows()
time.sleep(0.1) # small delay to prevent accidental double-bounces

# Check for 'y' or 'n'
if key == ord('y') or key == ord('Y'):
val = 'y'
elif key == ord('n') or key == ord('N'):
val = 'n'
else:
val = '?'
if val.lower() in ["y", "n"]:
lt.info(f" User responded '{val}'")
else:
lt.error("Did not respond with either 'y' or 'n'. Assuming 'n'.")
val = 'n'

return val.lower() == 'y'

def search_binary_file(self, binary_file: ff.FileFingerprint) -> list[ssm.Match]:
norm_path = self.norm_path(binary_file.relative_path, binary_file.name_ext)
_, _, ext = ft.path_components(norm_path)
Expand All @@ -280,15 +330,7 @@ def search_binary_file(self, binary_file: ff.FileFingerprint) -> list[ssm.Match]
matches += self.search_hdf5_file(binary_file)

else:
lt.info("")
lt.info("Unknown binary file:")
lt.info(" " + relative_path_name_ext)
lt.info(
"Is this unknown binary file safe to add, and doesn't contain any sensitive information (y/n)?"
)
val = input("")[0]
lt.info(f" User responded '{val}'")
if val.lower() != 'y':
if not self.verify_interactively(relative_path_name_ext):
matches.append(ssm.Match(0, 0, 0, "", "", None, "Unknown binary file"))

return matches
Expand Down Expand Up @@ -318,15 +360,7 @@ def interactive_image_sign_off(
description=f"{file_ff.relative_path}/{file_ff.name_ext}",
)
else:
lt.info(
"Unknown image file failed to open. Do you want to sign off on this file anyways (y/n)?"
)
val = input("")[0]
lt.info(f" User responded '{val}'")
if val.lower() == 'y':
return True
else:
return False
return self.verify_interactively(file_ff.relative_path)
# if img is not None
else:
return False
Expand All @@ -347,29 +381,7 @@ def interactive_image_sign_off(
rescaled = " (downscaled)"

# Show the image and prompt the user
lt.info("")
lt.info(
"Is this image safe to add, and doesn't contain any sensitive information (y/n)?"
)
cv2.imshow(description + rescaled, np_image)
key = cv2.waitKey(0)
cv2.destroyAllWindows()
time.sleep(0.1) # small delay to prevent accidental double-bounces

# Check for 'y' or 'n'
if key == ord('y') or key == ord('Y'):
val = 'y'
elif key == ord('n') or key == ord('N'):
val = 'n'
else:
val = '?'
if val.lower() in ["y", "n"]:
lt.info(f" User responded '{val}'")
else:
lt.error("Did not respond with either 'y' or 'n'. Assuming 'n'.")
val = 'n'

ret = val == 'y'
ret = self.verify_interactively(description, np_image, description+rescaled)
return ret

def _init_files_lists(self):
Expand Down Expand Up @@ -404,6 +416,29 @@ def _init_files_lists(self):
self.cached_cleared_files.clear()
self.new_cached_cleared_files.append(sensitive_strings_cache)

def create_backup_allowed_binaries_csv(self):
path, name, ext = ft.path_components(self.allowed_binary_files_csv)
backup_name_ext = f"{name}_backup_{self.date_time_str}{ext}"
backup_path_name_ext = os.path.join(path, backup_name_ext)
if ft.file_exists(backup_path_name_ext):
ft.delete_file(backup_path_name_ext)
ft.copy_file(self.allowed_binary_files_csv, path, backup_name_ext)
self.has_backed_up_allowed_binaries_csv = True

def update_allowed_binaries_csv(self):
# Overwrite the allowed list csv file with the updated allowed_binary_files
if not self.has_backed_up_allowed_binaries_csv:
self.create_backup_allowed_binaries_csv()
path, name, ext = ft.path_components(self.allowed_binary_files_csv)
self.allowed_binary_files = sorted(self.allowed_binary_files)

self.allowed_binary_files[0].to_csv(
"Allowed Binary Files",
path,
name,
rows=self.allowed_binary_files,
)

def search_files(self):
self._init_files_lists()
if self.git_files_only:
Expand Down Expand Up @@ -463,6 +498,13 @@ def search_files(self):
else:
self._register_file_in_cleared_cache(file_path, file_name_ext)

# Potentially remove unfound binary files
if len(self.unfound_allowed_binary_files) > 0 and self.remove_unfound_binaries:
for file in self.unfound_allowed_binary_files:
self.allowed_binary_files.remove(file)
self.unfound_allowed_binary_files.clear()
self.update_allowed_binaries_csv()

# Print initial information about matching files and problematic binary files
if len(matches) > 0:
lt.error(f"Found {len(matches)} files containing sensitive strings:")
Expand Down Expand Up @@ -498,28 +540,9 @@ def search_files(self):
self.unknown_binary_files.remove(file_ff)
self.allowed_binary_files.append(file_ff)

# First, make a backup copy of the allowed list csv file
if num_signed_binary_files == 0:
path, name, ext = ft.path_components(
self.allowed_binary_files_csv
)
backup_name_ext = f"{name}_backup_{self.date_time_str}{ext}"
backup_path_name_ext = os.path.join(path, backup_name_ext)
if ft.file_exists(backup_path_name_ext):
ft.delete_file(backup_path_name_ext)
ft.copy_file(
self.allowed_binary_files_csv, path, backup_name_ext
)

# Overwrite the allowed list csv file with the updated allowed_binary_files
path, name, ext = ft.path_components(self.allowed_binary_files_csv)
self.allowed_binary_files = sorted(self.allowed_binary_files)
file_ff.to_csv(
"Allowed Binary Files",
path,
name,
rows=self.allowed_binary_files,
)
# and make a backup as necessary.
self.update_allowed_binaries_csv()

num_signed_binary_files += 1

Expand Down Expand Up @@ -620,14 +643,18 @@ def search_files(self):
parser = argparse.ArgumentParser(
prog=__file__.rstrip(".py"), description='Sensitive strings searcher'
)
parser.add_argument(
'--no-interactive',
action='store_true',
dest="ninteractive",
help="Don't interactively ask the user about unknown binary files. Simply fail instead.",
)
parser.add_argument('--no-interactive', action='store_true', dest="ninteractive",
help="Don't interactively ask the user about unknown binary files. Simply fail instead.")
parser.add_argument('--accept-all', action='store_true', dest="acceptall",
help="Don't interactively ask the user about unknown binary files. Simply accept all as verified on the user's behalf. " +
"This can be useful when you're confident that the only changes have been that the binary files have moved but not changed.")
parser.add_argument('--accept-unfound', action='store_true', dest="acceptunfound",
help="Don't fail because of unfound expected binary files. Instead remove the expected files from the list of allowed binaries. " +
"This can be useful when you're confident that the only changes have been that the binary files have moved but not changed.")
args = parser.parse_args()
not_interactive: bool = args.ninteractive
accept_all: bool = args.acceptall
remove_unfound_binaries: bool = args.acceptunfound

ss_log_dir = ft.norm_path(
opencsp_settings['sensitive_strings']['sensitive_strings_dir']
Expand All @@ -652,6 +679,8 @@ def search_files(self):
root_search_dir, sensitive_strings_csv, allowed_binary_files_csv, ss_cache_file
)
searcher.interactive = not not_interactive
searcher.verify_all_on_behalf_of_user = accept_all
searcher.remove_unfound_binaries = remove_unfound_binaries
searcher.date_time_str = date_time_str
num_errors = searcher.search_files()

Expand Down
2 changes: 1 addition & 1 deletion contrib/scripts/test/test_sensitive_strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_directory_matcher(self):
def test_all_matches(self):
sensitive_strings_csv = os.path.join(self.ss_dir, "test_all_matches.csv")
searcher = ss.SensitiveStringsSearcher(
self.root_search_dir, sensitive_strings_csv, self.all_binaries
self.root_search_dir, sensitive_strings_csv, self.no_binaries
)
searcher.git_files_only = False
# 6 matches:
Expand Down
3 changes: 3 additions & 0 deletions opencsp/common/lib/camera/ImageAcquisition_DCAM_color.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from opencsp.common.lib.camera.image_processing import encode_RG_to_RGB
from opencsp.common.lib.camera.ImageAcquisitionAbstract import ImageAcquisitionAbstract
from opencsp.common.lib.camera.ImageAcquisition_DCAM_mono import ImageAcquisition as MonoIA


class ImageAcquisition(ImageAcquisitionAbstract):
Expand All @@ -23,6 +24,8 @@ def __init__(self, instance: int = 0, pixel_format: str = 'BayerRG12'):
- Other RGB based formats as defined by Basler

"""
MonoIA._check_pypylon_version()

# Find all instances of DCAM cameras
tlFactory = pylon.TlFactory.GetInstance()
devices = tlFactory.EnumerateDevices()
Expand Down
21 changes: 21 additions & 0 deletions opencsp/common/lib/camera/ImageAcquisition_DCAM_mono.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from importlib.metadata import version
import numpy as np

from pypylon import pylon

from opencsp.common.lib.camera.ImageAcquisitionAbstract import ImageAcquisitionAbstract
import opencsp.common.lib.tool.log_tools as lt


class ImageAcquisition(ImageAcquisitionAbstract):
_has_checked_pypylon_version = False

def __init__(self, instance: int = 0, pixel_format: str = 'Mono8'):
"""
Class to control a Basler DCAM monochromatic camera. Grabs one frame
Expand All @@ -22,6 +27,8 @@ def __init__(self, instance: int = 0, pixel_format: str = 'Mono8'):
- Others as defined by Basler

"""
ImageAcquisition._check_pypylon_version()

# Find all instances of DCAM cameras
tlFactory = pylon.TlFactory.GetInstance()
devices = tlFactory.EnumerateDevices()
Expand Down Expand Up @@ -64,6 +71,20 @@ def __init__(self, instance: int = 0, pixel_format: str = 'Mono8'):
shutter_min, shutter_max, 2**13
).astype(int)

@classmethod
def _check_pypylon_version(cls):
if not cls._has_checked_pypylon_version:
pypylon_version = version('pypylon')
suggested_pypylon_version = "3.0" # latest release as of 2024/03/21

if pypylon_version < suggested_pypylon_version:
lt.warn("Warning in ImageAcquisition_DCAM_mono.py: " +
f"pypylon version {pypylon_version} is behind the suggested version {suggested_pypylon_version}. " +
"If you have trouble grabbing frames with the basler camera, try upgrading your version of pypylon " +
"with \"python -m pip install --upgrade pypylon\".")

cls._has_checked_pypylon_version = True

def get_frame(self) -> np.ndarray:
# Start frame capture
self.cap.StartGrabbingMax(1)
Expand Down
3 changes: 2 additions & 1 deletion opencsp/common/lib/geometry/test/test_LoopXY.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from opencsp.common.lib.geometry.LoopXY import LoopXY
from opencsp.common.lib.geometry.LineXY import LineXY
from opencsp.common.lib.geometry.Vxy import Vxy
import opencsp.common.lib.render.figure_management as fm


class TestLoopXY(unittest.TestCase):
Expand Down Expand Up @@ -124,7 +125,7 @@ def test_draw(self):
verts = Vxy(([1, 0, 0, 1], [1, 1, 0, 0]))
loop = LoopXY.from_vertices(verts)

fig = plt.figure()
fig = fm._mpl_pyplot_figure()
loop.draw(fig.gca())
plt.close(fig)

Expand Down
3 changes: 2 additions & 1 deletion opencsp/common/lib/geometry/test/test_RegionXY.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from opencsp.common.lib.geometry.LoopXY import LoopXY
from opencsp.common.lib.geometry.RegionXY import RegionXY
from opencsp.common.lib.geometry.Vxy import Vxy
import opencsp.common.lib.render.figure_management as fm


class TestRegionXY:
Expand All @@ -30,6 +31,6 @@ def test_draw(self):
loop = LoopXY.from_vertices(verts)
region = RegionXY(loop)

fig = plt.figure()
fig = fm._mpl_pyplot_figure()
region.draw(fig.gca())
plt.close(fig)
Loading
Loading