diff --git a/panos/__init__.py b/panos/__init__.py index a63d74e1..fedab2dd 100755 --- a/panos/__init__.py +++ b/panos/__init__.py @@ -127,6 +127,12 @@ def isstring(arg): pan.DEBUG3 = pan.DEBUG2 - 1 +def stringToVersion(other): + if isstring(other): + other = PanOSVersion(other) + return other + + class PanOSVersion(LooseVersion): """LooseVersion with convenience properties to access version components""" @@ -150,6 +156,10 @@ def patch(self): def mainrelease(self): return self.version[0:3] + @property + def minorrelease(self): + return self.version[0:2] + @property def subrelease(self): try: @@ -174,6 +184,19 @@ def subrelease_num(self): subrelease_num = None return subrelease_num + @property + def baseimage(self): + # Account for lack of PAN-OS 7.0.0 + if self.major == 7 and self.minor == 0: + base_patch = "1" + else: + base_patch = "0" + version_string = str(self) + version_tokens = version_string.split("-")[0].split(".") + version_tokens[2] = base_patch + base_image_string = ".".join(version_tokens) + return PanOSVersion(base_image_string) + def __repr__(self): return "PanOSVersion ('%s')" % str(self) @@ -221,12 +244,6 @@ def __ne__(self, other): return not self.__eq__(other) -def stringToVersion(other): - if isstring(other): - other = PanOSVersion(other) - return other - - def tree_legend_dot(): """Create a graphviz dot string for a legend graph""" modules = ["firewall", "policies", "objects", "network", "device", "panorama", "ha"] diff --git a/panos/updater.py b/panos/updater.py index bbf025e4..8005d333 100644 --- a/panos/updater.py +++ b/panos/updater.py @@ -17,6 +17,8 @@ """Device updater handles software versions and updates for devices""" +from typing import Any, List, Literal, Optional, Union, cast + from pan.config import PanConfig import panos.errors as err @@ -151,7 +153,12 @@ def _parse_current_version(self, response_element): self._logger.debug("Found current version: %s" % current_version) return current_version - def download_install(self, version, load_config=None, sync=False): + def download_install( + self, + version: Union[str, PanOSVersion], + load_config: Optional[str] = None, + sync: bool = False, + ) -> Any: """Download and install the requested PAN-OS version. Like a combinations of the ``check()``, ``download()``, and @@ -173,13 +180,11 @@ def download_install(self, version, load_config=None, sync=False): If sync, returns result of PAN-OS install job """ - if isstring(version): - version = PanOSVersion(version) # Get list of software if needed if not self.versions: self.check() # Get versions as StrictVersion objects - available_versions = map(PanOSVersion, self.versions.keys()) + available_versions = list(map(PanOSVersion, self.versions.keys())) target_version = PanOSVersion(str(version)) current_version = PanOSVersion(self.pandevice.version) @@ -202,7 +207,12 @@ def download_install(self, version, load_config=None, sync=False): result = self.install(target_version, load_config=load_config, sync=sync) return result - def download_install_reboot(self, version, load_config=None, sync=False): + def download_install_reboot( + self, + version: Union[str, PanOSVersion], + load_config: Optional[str] = None, + sync: bool = False, + ) -> Optional[str]: """Download and install the requested PAN-OS version, then reboot. Like a combinations of the ``check()``, ``download()``, and @@ -219,17 +229,16 @@ def download_install_reboot(self, version, load_config=None, sync=False): err.PanDeviceError: problem found in pre-download checks or after reboot """ - if isstring(version): - version = PanOSVersion(version) - self.download_install(version, load_config, sync=True) + target_version = PanOSVersion(str(version)) + self.download_install(target_version, load_config, sync=True) # Reboot the device self._logger.info( - "Device %s is rebooting after upgrading to version %s. This will take a while." + "Device %s is rebooting after upgrading to version %s. This will take a while." % (self.pandevice.id, version) ) self.pandevice.restart() if sync: - new_version = self.pandevice.syncreboot() + new_version: str = self.pandevice.syncreboot() if version != new_version: raise err.PanDeviceError( "Attempt to upgrade to version %s failed." @@ -241,14 +250,82 @@ def download_install_reboot(self, version, load_config=None, sync=False): else: return None - def upgrade_to_version(self, target_version, dryrun=False): + def _next_upgrade_version( + self, + target_version: Union[PanOSVersion, Literal["latest"]], + install_base: bool, + ) -> PanOSVersion: + current_version = PanOSVersion(self.pandevice.version) + if target_version != "latest" and current_version == target_version: + return None + available_versions = list(map(PanOSVersion, self.versions.keys())) + latest_version = max(available_versions) + next_minor_version = self._next_minor_version(current_version) + if next_minor_version not in available_versions: + next_minor_version = None + if install_base: + if target_version == "latest": + return ( + next_minor_version + if next_minor_version is not None + else latest_version + ) + elif self._direct_upgrade_possible( + current_version, target_version, install_base + ): + # No minor upgrade needed to target + return target_version + elif next_minor_version is None: + return latest_version + else: + return next_minor_version + else: + if target_version == "latest": + if next_minor_version is None: + return latest_version + else: + return self._latest_patch_version( + next_minor_version, available_versions + ) + elif self._direct_upgrade_possible( + current_version, target_version, install_base + ): + return target_version + else: + # More than one minor upgrade needed to target + return self._latest_patch_version( + next_minor_version, available_versions + ) + + def _current_version_is_target( + self, target_version: Union[PanOSVersion, Literal["latest"]] + ) -> bool: + current_version = PanOSVersion(self.pandevice.version) + available_versions = list(map(PanOSVersion, self.versions.keys())) + latest_version = max(available_versions) + if target_version == "latest" and current_version == latest_version: + return True + elif current_version == target_version: + return True + else: + return False + + def upgrade_to_version( + self, + target_version: Union[str, PanOSVersion], + dryrun: bool = False, + install_base: bool = True, + ) -> List[str]: """Upgrade to the target version, completing all intermediate upgrades. For example, if firewall is running version 9.0.5 and target version is 10.0.2, then this method will proceed through the following steps: + - Download 9.1.0 - Upgrade to 9.1.0 and reboot - - Upgrade to 10.0.0 and reboot + - Download 10.0.0 + - Upgrade to 10.0.0 and reboot (to skip this step, set `install_base` to False) + - Download 10.0.2 - Upgrade to 10.0.2 and reboot Does not account for HA pairs. @@ -259,13 +336,17 @@ def upgrade_to_version(self, target_version, dryrun=False): from panos.firewall import Firewall - fw = Firewall("10.0.0.5", "admin", "password") + fw = Firewall("192.168.1.1", "admin", "password") fw.software.upgrade_to_version("10.0.2") Args: target_version (string): PAN-OS version (eg. "10.0.2") or "latest" dryrun (bool, optional): Log what steps would be taken, but don't make any changes to the live device. Defaults to False. + install_base (bool, optional): The upgrade path will include an + upgrade to each base image (eg. 10.0.0) before upgrade to the + patch version. If this is False, the base image will download + but not install. Raises: err.PanDeviceError: any problem during the upgrade process @@ -279,10 +360,11 @@ def upgrade_to_version(self, target_version, dryrun=False): starting_version = self.pandevice.version # Get versions as StrictVersion objects - available_versions = map(PanOSVersion, self.versions.keys()) + target_is_latest = target_version == "latest" + target_version = ( + PanOSVersion(str(target_version)) if not target_is_latest else "latest" + ) current_version = PanOSVersion(self.pandevice.version) - latest_version = max(available_versions) - next_minor_version = self._next_minor_version(current_version) # Check that this is an upgrade, not a downgrade if current_version > target_version: @@ -291,55 +373,41 @@ def upgrade_to_version(self, target_version, dryrun=False): % (self.pandevice.id, self.pandevice.version, target_version) ) - # Determine the next version to upgrade to - if target_version == "latest": - next_version = min(latest_version, next_minor_version) - elif latest_version < target_version: - next_version = next_minor_version - elif not self._direct_upgrade_possible(current_version, target_version): - next_version = next_minor_version - else: - next_version = PanOSVersion(str(target_version)) - - if next_version not in available_versions and not dryrun: - self._logger.info( - "Device %s upgrading to %s, currently on %s. Checking for newer versions." - % (self.pandevice.id, target_version, self.pandevice.version) - ) - self.check() - available_versions = map(PanOSVersion, self.versions.keys()) - latest_version = max(available_versions) - # Check if done upgrading - if current_version == target_version: + if self._current_version_is_target(target_version): self._logger.info( "Device %s is running target version: %s" - % (self.pandevice.id, target_version) - ) - return True - elif target_version == "latest" and current_version == latest_version: - self._logger.info( - "Device %s is running latest version: %s" - % (self.pandevice.id, latest_version) + % (self.pandevice.id, current_version) ) if dryrun: self._logger.info( - "NOTE: dryrun with 'latest' does not show all upgrades," - ) - self._logger.info( - "as new versions are learned through the upgrade process," + "NOTE: dryrun with 'latest' does not show all upgrades, as new versions are learned through the upgrade process, so results may be different than dryrun output when using 'latest'." ) + return [str(current_version)] + + # Determine the next version to upgrade to + next_version = self._next_upgrade_version(target_version, install_base) + + # Download base image if needed + if ( + not install_base + and not self.versions[str(next_version.baseimage)]["downloaded"] + ): + if dryrun: self._logger.info( - "so results may be different than dryrun output when using 'latest'." + "Device %s will download base image: %s" + % (self.pandevice.id, next_version.baseimage) ) - return True + else: + self.download(next_version.baseimage, sync=True) # Ensure the content pack is upgraded to the latest - self.pandevice.content.download_and_install_latest(sync=True) + if not dryrun: + self.pandevice.content.download_and_install_latest(sync=True) # Upgrade to the next version self._logger.info( - "Device %s will be upgraded to version: %s" + "Device %s will download and upgrade to version: %s" % (self.pandevice.id, next_version) ) if dryrun: @@ -347,10 +415,12 @@ def upgrade_to_version(self, target_version, dryrun=False): else: self.download_install_reboot(next_version, sync=True) self.check() - result = self.upgrade_to_version(target_version, dryrun=dryrun) + result = self.upgrade_to_version( + target_version, dryrun=dryrun, install_base=install_base + ) if result and dryrun: self.pandevice.version = starting_version - return result + return [str(current_version)] + result def _next_major_version(self, version): if isstring(version): @@ -361,12 +431,15 @@ def _next_major_version(self, version): next_version = PanOSVersion("7.0.1") return next_version - def _next_minor_version(self, version): + def _next_minor_version(self, version: Union[PanOSVersion, str]) -> PanOSVersion: from panos.firewall import Firewall - if isstring(version): - next_version = PanOSVersion(version) - if version.minor == 1: + version = PanOSVersion(str(version)) + + # Account for 10.2.x (only release with minor version of '2') + if version.major == 10 and version.minor == 1: + next_version = PanOSVersion("10.2.0") + elif version.minor > 0: next_version = PanOSVersion(str(version.major + 1) + ".0.0") # There is no PAN-OS 5.1 for firewalls, so next minor release from 5.0.x is 6.0.0. elif ( @@ -390,7 +463,22 @@ def _next_patch_version(self, version): ) return next_version - def _direct_upgrade_possible(self, current_version, target_version): + def _latest_patch_version( + self, version: Union[str, PanOSVersion], available_versions: List[PanOSVersion] + ): + if isstring(version): + version = PanOSVersion(version) + found_patch = False + latest_patch: PanOSVersion = PanOSVersion("0.0.0") + for v in available_versions: + if v.major == version.major and v.minor == version.minor: + latest_patch = max(latest_patch, v) + found_patch = True + return latest_patch if found_patch else None + + def _direct_upgrade_possible( + self, current_version, target_version, install_base=True + ): """Check if current version can directly upgrade to target version :returns True if a direct upgrade is possible, False if not @@ -415,7 +503,7 @@ def _direct_upgrade_possible(self, current_version, target_version): current_version.major == target_version.major and current_version.minor == 0 and target_version.minor == 1 - and target_version.patch == 0 + and (not install_base or target_version.patch == 0) ): return True @@ -425,10 +513,12 @@ def _direct_upgrade_possible(self, current_version, target_version): current_version.major + 1 == target_version.major and current_version.minor == 1 and target_version.minor == 0 - and target_version.patch == 0 + and (not install_base or target_version.patch == 0) ): return True + # SPECIAL CASES + # Upgrading a firewall from PAN-OS 5.0.x to 6.0.x # This is a special case because there is no PAN-OS 5.1.x from panos.firewall import Firewall @@ -441,6 +531,17 @@ def _direct_upgrade_possible(self, current_version, target_version): ): return True + # Upgrade from PAN-OS 10.1.x to 10.2.x + # This is a special case because only minor release with a 2 + if ( + current_version.major == 10 + and current_version.minor == 1 + and target_version.major == 10 + and target_version.minor == 2 + and (not install_base or target_version.patch == 0) + ): + return True + return False diff --git a/tests/test_updater.py b/tests/test_updater.py new file mode 100644 index 00000000..f7139658 --- /dev/null +++ b/tests/test_updater.py @@ -0,0 +1,112 @@ +try: + from unittest import mock +except ImportError: + import mock + +from panos import PanOSVersion +from panos.firewall import Firewall + + +def _fw(version): + fw = Firewall("127.0.0.1", "admin", "admin", "secret") + fw._set_version_and_version_info(version) + return fw + + +def _updater_fw_setup(*args): + fw = _fw() + + return fw + + +def versionStrToTuple(version_string): + tokens = version_string.split(".")[:3] + tokens[2] = tokens[2].split("-")[0] + return tuple(int(x) for x in tokens) + + +def _create_mock_check(fw): + patches = range(5) + + def mock_check(): + version_info = versionStrToTuple(fw.version) + current_minor = ".".join(map(lambda x: str(x), version_info[0:-1])) + if version_info[1] == 0: + next_minor = ".".join([str(version_info[0]), "1"]) + else: + next_minor = ".".join([str(version_info[0] + 1), "0"]) + versions = [".".join((str(current_minor), str(patch))) for patch in patches] + versions += [".".join((str(next_minor), str(patch))) for patch in patches] + fw.software.versions = {version: {"downloaded": False} for version in versions} + + return mock_check + + +def _create_mock_download_install_reboot(fw): + def mock_download_install_reboot(next_version, sync): + fw.version = str(next_version) + return next_version + + return mock_download_install_reboot + + +def test_upgrade_to_version_with_install_base(): + fw = _fw("8.0.2") + + fw.software.check = mock.Mock(side_effect=_create_mock_check(fw)) + fw.software.download_install_reboot = mock.Mock( + side_effect=_create_mock_download_install_reboot(fw) + ) + fw.content.download_and_install_latest = mock.Mock() + fw.software.download = mock.Mock() + + result = fw.software.upgrade_to_version("10.1.3") + assert result == ["8.0.2", "8.1.0", "9.0.0", "9.1.0", "10.0.0", "10.1.0", "10.1.3"] + + +def test_upgrade_to_version_without_install_base(): + fw = _fw("8.0.2") + + fw.software.check = mock.Mock(side_effect=_create_mock_check(fw)) + fw.software.download_install_reboot = mock.Mock( + side_effect=_create_mock_download_install_reboot(fw) + ) + fw.content.download_and_install_latest = mock.Mock() + fw.software.download = mock.Mock() + + result = fw.software.upgrade_to_version("10.1.3", install_base=False) + assert result == ["8.0.2", "8.1.4", "9.0.4", "9.1.4", "10.0.4", "10.1.3"] + + +def test_next_upgrade_version_with_10_2_with_install_base(): + fw = _fw("10.1.3") + fw.software.versions = { + "10.1.0": "", + "10.1.1": "", + "10.1.2": "", + "10.1.3": "", + "10.1.4": "", + "10.2.0": "", + "10.2.1": "", + "10.2.2": "", + "10.2.3": "", + } + result = fw.software._next_upgrade_version("11.0.2", install_base=True) + assert result == PanOSVersion("10.2.0") + + +def test_next_upgrade_version_with_10_2_without_install_base(): + fw = _fw("10.1.3") + fw.software.versions = { + "10.1.0": "", + "10.1.1": "", + "10.1.2": "", + "10.1.3": "", + "10.1.4": "", + "10.2.0": "", + "10.2.1": "", + "10.2.2": "", + "10.2.3": "", + } + result = fw.software._next_upgrade_version("11.0.2", install_base=False) + assert result == PanOSVersion("10.2.3")