diff --git a/docs/configtree.py b/docs/configtree.py index bb7a3214..85a382c9 100755 --- a/docs/configtree.py +++ b/docs/configtree.py @@ -70,8 +70,10 @@ def node_style(cls): style = "style=filled " + nodestyle[module] + " " except: pass - result = ' {0} [{1}URL="../module-{2}.html#panos.{3}" target="_top"];\n'.format( - cls_name, style, module, cls + result = ( + ' {0} [{1}URL="../module-{2}.html#panos.{3}" target="_top"];\n'.format( + cls_name, style, module, cls + ) ) else: if style: diff --git a/examples/dyn_address_group.py b/examples/dyn_address_group.py index 248769ca..501e3c35 100755 --- a/examples/dyn_address_group.py +++ b/examples/dyn_address_group.py @@ -57,7 +57,6 @@ def main(): - # Get command line arguments parser = argparse.ArgumentParser( description="Tag an IP address on a Palo Alto Networks Next generation Firewall" @@ -111,7 +110,11 @@ def main(): logging.basicConfig(format=logging_format, level=logging_level) # Connect to the device and determine its type (Firewall or Panorama). - device = PanDevice.create_from_device(args.hostname, args.username, args.password,) + device = PanDevice.create_from_device( + args.hostname, + args.username, + args.password, + ) # Panorama does not have a userid API, so exit. # You can use the userid API on a firewall with the Panorama 'target' diff --git a/examples/upgrade.py b/examples/upgrade.py index 12e55236..70902615 100755 --- a/examples/upgrade.py +++ b/examples/upgrade.py @@ -53,7 +53,6 @@ def main(): - # Get command line arguments parser = argparse.ArgumentParser( description="Upgrade a Palo Alto Networks Firewall or Panorama to the specified version" @@ -95,7 +94,11 @@ def main(): # Connect to the device and determine its type (Firewall or Panorama). # This is important to know what version to upgrade to next. - device = PanDevice.create_from_device(args.hostname, args.username, args.password,) + device = PanDevice.create_from_device( + args.hostname, + args.username, + args.password, + ) # Perform the upgrades in sequence with reboots between each upgrade device.software.upgrade_to_version(args.version, args.dryrun) diff --git a/examples/userid.py b/examples/userid.py index 0ba04887..f94365e0 100755 --- a/examples/userid.py +++ b/examples/userid.py @@ -53,7 +53,6 @@ def main(): - # Get command line arguments parser = argparse.ArgumentParser( description="Update User-ID by adding or removing a user-to-ip mapping" @@ -90,7 +89,11 @@ def main(): logging.basicConfig(format=logging_format, level=logging_level) # Connect to the device and determine its type (Firewall or Panorama). - device = PanDevice.create_from_device(args.hostname, args.username, args.password,) + device = PanDevice.create_from_device( + args.hostname, + args.username, + args.password, + ) logging.debug("Detecting type of device") diff --git a/panos/__init__.py b/panos/__init__.py index b160f7f7..a63d74e1 100755 --- a/panos/__init__.py +++ b/panos/__init__.py @@ -178,13 +178,12 @@ def __repr__(self): return "PanOSVersion ('%s')" % str(self) def __lt__(self, other): - # Handle 'latest' is always higher if isstring(other) and other == "latest": return True other = stringToVersion(other) - for (x, y) in zip(self.mainrelease, other.mainrelease): + for x, y in zip(self.mainrelease, other.mainrelease): if x < y: return True if x > y: @@ -203,7 +202,6 @@ def __ge__(self, other): return not self.__lt__(other) def __eq__(self, other): - # Handle 'latest' which is always different if isstring(other) and other == "latest": return False @@ -241,7 +239,9 @@ def tree_legend_dot(): result += ( '{module} [style=filled fillcolor={color} URL="{url}' '/module-{module}.html" target="_blank"];'.format( - module=module, color=node_color(module), url=DOCUMENTATION_URL, + module=module, + color=node_color(module), + url=DOCUMENTATION_URL, ) ) result += "}" @@ -286,7 +286,11 @@ def string_or_list(value): value, ] return ( - list(value) if "__iter__" in dir(value) else [value,] + list(value) + if "__iter__" in dir(value) + else [ + value, + ] ) @@ -463,8 +467,8 @@ def node_color(module): def object_classes(): import inspect - from panos import errors - from panos import base + + from panos import base, errors current_module = sys.modules[__name__] @@ -476,15 +480,17 @@ def object_classes(): if the_cls not in omits: omits.append(the_cls) - from panos import device - from panos import firewall - from panos import ha - from panos import network - from panos import objects - from panos import panorama - from panos import plugins - from panos import policies - from panos import predefined + from panos import ( + device, + firewall, + ha, + network, + objects, + panorama, + plugins, + policies, + predefined, + ) classes = {} for pkg in (device, firewall, ha, network, objects, panorama, policies, predefined): diff --git a/panos/base.py b/panos/base.py index 120b91a8..e7d3aa61 100644 --- a/panos/base.py +++ b/panos/base.py @@ -446,7 +446,6 @@ def element(self, with_children=True, comparable=False): nextelement = root for section in path: - if section.find("|") != -1: # This is an element variable, so create an element containing # the variables's value @@ -1648,7 +1647,13 @@ def _set_reference( if var_type == "list": if var is None: update_needed = True - setattr(obj, reference_var, [self,]) + setattr( + obj, + reference_var, + [ + self, + ], + ) if update: obj.update(reference_var) elif not isinstance(var, list): @@ -1992,7 +1997,9 @@ def delete_similar(self): for chunk in chunk_instances_for_delete_similar(instances): dev.xapi.delete( "{0}/{1}[{2}]".format( - xpath, prefix, " or ".join(joiner.format(x.uid) for x in chunk), + xpath, + prefix, + " or ".join(joiner.format(x.uid) for x in chunk), ), retry_on_peer=self.HA_SYNC, ) @@ -2124,14 +2131,13 @@ def hierarchy_info(self): dict: Hierarchy information about this object. """ from panos.firewall import Firewall - from panos.panorama import DeviceGroup - from panos.panorama import Panorama - from panos.panorama import Template - from panos.panorama import TemplateStack + from panos.panorama import DeviceGroup, Panorama, Template, TemplateStack classes = panos.object_classes() configs = [ - [self.__class__,], + [ + self.__class__, + ], ] updated_configs = [] @@ -2143,7 +2149,10 @@ def hierarchy_info(self): configs.pop(num) for p in parents: configs.append( - chain + [p,] + chain + + [ + p, + ] ) break else: @@ -2801,7 +2810,8 @@ def __getattr__(self, name): raise AttributeError( "'{0}' object has no attribute '{1}'".format( - self.__class__.__name__, str(name), + self.__class__.__name__, + str(name), ) ) @@ -2886,9 +2896,7 @@ def __repr__(self): class ValueEntry(VersionedPanObject): - """Base class for objects that only have a value element. - - """ + """Base class for objects that only have a value element.""" ROOT = Root.VSYS SUFFIX = ENTRY @@ -3722,7 +3730,12 @@ def get_device_version(self): @classmethod def create_from_device( - cls, hostname, api_username=None, api_password=None, api_key=None, port=443, + cls, + hostname, + api_username=None, + api_password=None, + api_key=None, + port=443, ): """Factory method to create a :class:`panos.firewall.Firewall` or :class:`panos.panorama.Panorama` object from a live device @@ -3744,18 +3757,33 @@ def create_from_device( # Create generic PanDevice to connect and get information from panos import firewall, panorama - device = PanDevice(hostname, api_username, api_password, api_key, port,) + device = PanDevice( + hostname, + api_username, + api_password, + api_key, + port, + ) system_info = device.refresh_system_info() version = system_info[0] model = system_info[1] if model == "Panorama" or model.startswith("M-"): instance = panorama.Panorama( - hostname, api_username, api_password, device.api_key, port, + hostname, + api_username, + api_password, + device.api_key, + port, ) else: serial = system_info[2] instance = firewall.Firewall( - hostname, api_username, api_password, device.api_key, serial, port, + hostname, + api_username, + api_password, + device.api_key, + serial, + port, ) instance._set_version_and_version_info(version) return instance @@ -3903,10 +3931,16 @@ def method(self, *args, **kwargs): def classify_exception(self, e): if str(e) == "Invalid credentials.": - return err.PanInvalidCredentials(str(e), pan_device=self.pan_device,) + return err.PanInvalidCredentials( + str(e), + pan_device=self.pan_device, + ) elif str(e).startswith("URLError:"): if str(e).endswith("timed out"): - return err.PanConnectionTimeout(str(e), pan_device=self.pan_device,) + return err.PanConnectionTimeout( + str(e), + pan_device=self.pan_device, + ) else: # This could be that we have an old version of OpenSSL # that doesn't support TLSv1.1, so check for that and give @@ -4836,7 +4870,12 @@ def _commit( logger.debug( self.id + ": commit requested: commit_all:%s sync:%s sync_all:%s cmd:%s" - % (str(commit_all), str(sync), str(sync_all), cmd,) + % ( + str(commit_all), + str(sync), + str(sync_all), + cmd, + ) ) if commit_all: action = "all" @@ -5603,7 +5642,8 @@ def is_ready(self, minutes=None, seconds=None): end = None if minutes is not None or seconds is not None: end = datetime.datetime.now() + datetime.timedelta( - minutes=minutes or 0, seconds=seconds or 0, + minutes=minutes or 0, + seconds=seconds or 0, ) while True: diff --git a/panos/device.py b/panos/device.py index dca3688f..2e1c7e6e 100644 --- a/panos/device.py +++ b/panos/device.py @@ -1290,7 +1290,11 @@ def _setup(self): ) ) params.append( - VersionedParamPath("disabled", vartype="yesno", path="disabled",), + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ), ) self._params = tuple(params) @@ -1436,7 +1440,10 @@ def _setup(self): "facility", default="LOG_USER", path="facility", - values=["LOG_USER",] + ["LOG_LOCAL{0}".format(x) for x in range(8)], + values=[ + "LOG_USER", + ] + + ["LOG_LOCAL{0}".format(x) for x in range(8)], ) ) @@ -2244,12 +2251,28 @@ def _setup(self): ) params.append( VersionedParamPath( - "username_field_value", path="username-field/{username_field}", + "username_field_value", + path="username-field/{username_field}", + ) + ) + params.append( + VersionedParamPath( + "domain", + path="domain", + ) + ) + params.append( + VersionedParamPath( + "use_crl", + path="use-crl", + ) + ) + params.append( + VersionedParamPath( + "use_ocsp", + path="use-ocsp", ) ) - params.append(VersionedParamPath("domain", path="domain",)) - params.append(VersionedParamPath("use_crl", path="use-crl",)) - params.append(VersionedParamPath("use_ocsp", path="use-ocsp",)) params.append( VersionedParamPath( "crl_receive_timeout", @@ -2276,27 +2299,50 @@ def _setup(self): ) params.append( VersionedParamPath( - "block_unknown_certificate", vartype="yesno", path="block-unknown-cert", + "block_unknown_certificate", + vartype="yesno", + path="block-unknown-cert", ) ) params.append( VersionedParamPath( - "block_certificate_timeout", vartype="yesno", path="block-timeout-cert", + "block_certificate_timeout", + vartype="yesno", + path="block-timeout-cert", ) ) params.append( - VersionedParamPath("block_unauthenticated_certificate", exclude=True,) + VersionedParamPath( + "block_unauthenticated_certificate", + exclude=True, + ) ) params[-1].add_profile( - "7.1.0", vartype="yesno", path="block-unauthenticated-cert", + "7.1.0", + vartype="yesno", + path="block-unauthenticated-cert", + ) + params.append( + VersionedParamPath( + "block_expired_certificate", + exclude=True, + ) ) - params.append(VersionedParamPath("block_expired_certificate", exclude=True,)) params[-1].add_profile( - "8.1.0", vartype="yesno", path="block-expired-cert", + "8.1.0", + vartype="yesno", + path="block-expired-cert", + ) + params.append( + VersionedParamPath( + "ocsp_exclude_nonce", + exclude=True, + ) ) - params.append(VersionedParamPath("ocsp_exclude_nonce", exclude=True,)) params[-1].add_profile( - "9.0.0", path="ocsp-exclude-nonce", vartype="yesno", + "9.0.0", + path="ocsp-exclude-nonce", + vartype="yesno", ) self._params = tuple(params) @@ -2323,13 +2369,27 @@ def _setup(self): # params params = [] - params.append(VersionedParamPath("default_ocsp_url", path="default-ocsp-url",)) params.append( - VersionedParamPath("ocsp_verify_certificate", path="ocsp-verify-cert",) + VersionedParamPath( + "default_ocsp_url", + path="default-ocsp-url", + ) + ) + params.append( + VersionedParamPath( + "ocsp_verify_certificate", + path="ocsp-verify-cert", + ) + ) + params.append( + VersionedParamPath( + "template_name", + exclude=True, + ) ) - params.append(VersionedParamPath("template_name", exclude=True,)) params[-1].add_profile( - "9.0.0", path="template-name", + "9.0.0", + path="template-name", ) self._params = tuple(params) @@ -2364,7 +2424,8 @@ def _setup(self): params.append( VersionedParamPath( - "forward_trust_certificate_rsa", path="forward-trust-certificate/rsa", + "forward_trust_certificate_rsa", + path="forward-trust-certificate/rsa", ) ) params.append( @@ -2387,13 +2448,17 @@ def _setup(self): ) params.append( VersionedParamPath( - "root_ca_excludes", vartype="member", path="root-ca-exclude-list", + "root_ca_excludes", + vartype="member", + path="root-ca-exclude-list", ) ) # Only option present on Panorama. params.append( VersionedParamPath( - "trusted_root_cas", vartype="member", path="trusted-root-CA", + "trusted_root_cas", + vartype="member", + path="trusted-root-CA", ) ) params.append( @@ -2428,8 +2493,19 @@ def _setup(self): # params params = [] - params.append(VersionedParamPath("description", path="description",)) - params.append(VersionedParamPath("exclude", vartype="yesno", path="exclude",)) + params.append( + VersionedParamPath( + "description", + path="description", + ) + ) + params.append( + VersionedParamPath( + "exclude", + vartype="yesno", + path="exclude", + ) + ) self._params = tuple(params) @@ -2459,9 +2535,19 @@ def _setup(self): params = [] params.append( - VersionedParamPath("password_hash", vartype="encrypted", path="phash",) + VersionedParamPath( + "password_hash", + vartype="encrypted", + path="phash", + ) + ) + params.append( + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ) ) - params.append(VersionedParamPath("disabled", vartype="yesno", path="disabled",)) self._params = tuple(params) @@ -2502,6 +2588,12 @@ def _setup(self): # params params = [] - params.append(VersionedParamPath("users", vartype="member", path="user",)) + params.append( + VersionedParamPath( + "users", + vartype="member", + path="user", + ) + ) self._params = tuple(params) diff --git a/panos/ha.py b/panos/ha.py index 824d9734..9a072e66 100644 --- a/panos/ha.py +++ b/panos/ha.py @@ -560,5 +560,6 @@ def _setup(self): # stubs self._stubs.add_profile( - "0.0.0", "interface/ha1", + "0.0.0", + "interface/ha1", ) diff --git a/panos/network.py b/panos/network.py index 05d2cf49..b184ecfd 100644 --- a/panos/network.py +++ b/panos/network.py @@ -153,22 +153,48 @@ def _setup(self): ) ) params.append( - VersionedParamPath("enable_packet_buffer_protection", exclude=True,) + VersionedParamPath( + "enable_packet_buffer_protection", + exclude=True, + ) ) params[-1].add_profile( - "8.0.0", path="network/enable-packet-buffer-protection", vartype="yesno", + "8.0.0", + path="network/enable-packet-buffer-protection", + vartype="yesno", + ) + params.append( + VersionedParamPath( + "enable_device_identification", + exclude=True, + ) ) - params.append(VersionedParamPath("enable_device_identification", exclude=True,)) params[-1].add_profile( - "10.0.0", path="enable-device-identification", vartype="yesno", + "10.0.0", + path="enable-device-identification", + vartype="yesno", + ) + params.append( + VersionedParamPath( + "device_include_acl", + exclude=True, + ) ) - params.append(VersionedParamPath("device_include_acl", exclude=True,)) params[-1].add_profile( - "10.0.0", path="device-acl/include-list", vartype="member", + "10.0.0", + path="device-acl/include-list", + vartype="member", + ) + params.append( + VersionedParamPath( + "device_exclude_acl", + exclude=True, + ) ) - params.append(VersionedParamPath("device_exclude_acl", exclude=True,)) params[-1].add_profile( - "10.0.0", path="device-acl/exclude-acl", vartype="member", + "10.0.0", + path="device-acl/exclude-acl", + vartype="member", ) self._params = tuple(params) @@ -1377,7 +1403,12 @@ def _setup(self): "mode", path="{mode}", default="layer3", - values=["layer3", "layer2", "virtual-wire", "ha",], + values=[ + "layer3", + "layer2", + "virtual-wire", + "ha", + ], ) ) params.append( @@ -2179,7 +2210,11 @@ def _setup(self): VersionedParamPath("enable", path="enable", default=True, vartype="yesno") ) params.append( - VersionedParamPath("reject_default_route", default=True, vartype="yesno",) + VersionedParamPath( + "reject_default_route", + default=True, + vartype="yesno", + ) ) params.append( VersionedParamPath( @@ -4211,7 +4246,11 @@ def _setup(self): ) params[-1].add_profile( "8.1.0", - values=("ip", "dynamic", "fqdn",), + values=( + "ip", + "dynamic", + "fqdn", + ), path="peer-address/{peer_ip_type}", ) params.append( @@ -4617,7 +4656,14 @@ def _setup(self): params.append( VersionedParamPath( "mk_esp_encryption", - values=("des", "3des", "aes128", "aes192", "aes256", "null",), + values=( + "des", + "3des", + "aes128", + "aes192", + "aes256", + "null", + ), path="{type}/{mk_protocol}/encryption/algorithm", ) ) @@ -5457,6 +5503,8 @@ def _setup(self): params = [] - params.append(VersionedParamPath("interface", path="interface"),) + params.append( + VersionedParamPath("interface", path="interface"), + ) self._params = tuple(params) diff --git a/panos/objects.py b/panos/objects.py index 014466ec..e5509cb6 100644 --- a/panos/objects.py +++ b/panos/objects.py @@ -240,7 +240,9 @@ def _setup(self): ) ) params[-1].add_profile( - "8.1.0", path="protocol/{protocol}", values=["tcp", "udp", "sctp"], + "8.1.0", + path="protocol/{protocol}", + values=["tcp", "udp", "sctp"], ) params.append( VersionedParamPath("source_port", path="protocol/{protocol}/source-port") @@ -269,14 +271,20 @@ def _setup(self): params[-1].add_profile( "8.1.0", vartype="int", - condition={"enable_override_timeout": "yes", "protocol": "tcp",}, + condition={ + "enable_override_timeout": "yes", + "protocol": "tcp", + }, path="protocol/{protocol}/override/{enable_override_timeout}/halfclose-timeout", ) params.append(VersionedParamPath("override_time_wait_timeout", exclude=True)) params[-1].add_profile( "8.1.0", vartype="int", - condition={"enable_override_timeout": "yes", "protocol": "tcp",}, + condition={ + "enable_override_timeout": "yes", + "protocol": "tcp", + }, path="protocol/{protocol}/override/{enable_override_timeout}/timewait-timeout", ) @@ -355,7 +363,8 @@ def _setup(self): # xpaths self._xpaths.add_profile(value="/application") self._xpaths.add_profile( - value='//*[contains(local-name(), "application")]', parents=("Predefined",), + value='//*[contains(local-name(), "application")]', + parents=("Predefined",), ) # params @@ -651,7 +660,8 @@ def _setup(self): # xpaths self._xpaths.add_profile(value="/application-container") self._xpaths.add_profile( - value='//*[contains(local-name(), "application")]', parents=("Predefined",), + value='//*[contains(local-name(), "application")]', + parents=("Predefined",), ) # params @@ -911,7 +921,9 @@ def _setup(self): VersionedParamPath( "action_type", default="tagging", - values=["tagging",], + values=[ + "tagging", + ], path="type/{action_type}", ) ) @@ -1201,7 +1213,10 @@ def _setup(self): params.append( VersionedParamPath( - "edl_type", default="ip", path="type", values=("ip", "domain", "url"), + "edl_type", + default="ip", + path="type", + values=("ip", "domain", "url"), ), ) params[-1].add_profile( @@ -1214,38 +1229,77 @@ def _setup(self): path="type/{edl_type}", values=("ip", "domain", "url", "predefined-ip", "predefined-url"), ) - params.append(VersionedParamPath("description", path="description",),) + params.append( + VersionedParamPath( + "description", + path="description", + ), + ) params[-1].add_profile( - "8.0.0", path="type/{edl_type}/description", + "8.0.0", + path="type/{edl_type}/description", + ) + params.append( + VersionedParamPath( + "source", + path="url", + ), ) - params.append(VersionedParamPath("source", path="url",),) params[-1].add_profile( - "8.0.0", path="type/{edl_type}/url", + "8.0.0", + path="type/{edl_type}/url", + ) + params.append( + VersionedParamPath( + "exceptions", + exclude=True, + ), ) - params.append(VersionedParamPath("exceptions", exclude=True,),) params[-1].add_profile( - "8.0.0", vartype="member", path="type/{edl_type}/exception-list", + "8.0.0", + vartype="member", + path="type/{edl_type}/exception-list", + ) + params.append( + VersionedParamPath( + "certificate_profile", + exclude=True, + ) ) - params.append(VersionedParamPath("certificate_profile", exclude=True,)) params[-1].add_profile( "8.0.0", path="type/{edl_type}/certificate-profile", condition={"edl_type": ["ip", "domain", "url"]}, ) - params.append(VersionedParamPath("username", exclude=True,)) + params.append( + VersionedParamPath( + "username", + exclude=True, + ) + ) params[-1].add_profile( "8.0.0", path="type/{edl_type}/auth/username", condition={"edl_type": ["ip", "domain", "url"]}, ) - params.append(VersionedParamPath("password", exclude=True,)) + params.append( + VersionedParamPath( + "password", + exclude=True, + ) + ) params[-1].add_profile( "8.0.0", path="type/{edl_type}/auth/password", vartype="encrypted", condition={"edl_type": ["ip", "domain", "url"]}, ) - params.append(VersionedParamPath("expand_domain", exclude=True,),) + params.append( + VersionedParamPath( + "expand_domain", + exclude=True, + ), + ) params[-1].add_profile( "9.0.0", path="type/{edl_type}/expand-domain", @@ -1308,7 +1362,10 @@ def _setup(self): "friday", "saturday", ), - condition={"edl_type": ["ip", "domain", "url"], "repeat": "weekly",}, + condition={ + "edl_type": ["ip", "domain", "url"], + "repeat": "weekly", + }, ) params.append( VersionedParamPath( @@ -1322,7 +1379,10 @@ def _setup(self): "8.0.0", vartype="int", path="type/{edl_type}/recurring/{repeat}/day-of-month", - condition={"edl_type": ["ip", "domain", "url"], "repeat": "monthly",}, + condition={ + "edl_type": ["ip", "domain", "url"], + "repeat": "monthly", + }, ) self._params = tuple(params) diff --git a/panos/plugins.py b/panos/plugins.py index b6e8ddc5..8ef7b16b 100644 --- a/panos/plugins.py +++ b/panos/plugins.py @@ -61,7 +61,10 @@ def _get_jobs(self, jobtype, svc): """ dev = self.obj.nearest_pandevice() - res = dev.op(XML.format(jobtype=jobtype, svc=svc), cmd_xml=False,) + res = dev.op( + XML.format(jobtype=jobtype, svc=svc), + cmd_xml=False, + ) logger.debug("%s jobs for %s: %s", jobtype, svc, ET.tostring(res)) status = res.find("./result/result/status") if status is None or (status is not None and status.text != "pass"): @@ -225,7 +228,9 @@ def _setup(self): ) params.append( VersionedParamPath( - "multi_tenant_enable", vartype="yesno", path="multi-tenant-enable", + "multi_tenant_enable", + vartype="yesno", + path="multi-tenant-enable", ) ) @@ -250,10 +255,18 @@ def _setup(self): # params params = [] params.append( - VersionedParamPath("device_groups", vartype="member", path="device-groups",) + VersionedParamPath( + "device_groups", + vartype="member", + path="device-groups", + ) ) params.append( - VersionedParamPath("templates", vartype="member", path="templates",) + VersionedParamPath( + "templates", + vartype="member", + path="templates", + ) ) self._params = tuple(params) @@ -280,20 +293,47 @@ def _setup(self): # params params = [] - params.append(VersionedParamPath("access_domain", path="access-domain",)) + params.append( + VersionedParamPath( + "access_domain", + path="access-domain", + ) + ) - params.append(VersionedParamPath("bandwidth", vartype="int", path="bandwidth",)) params.append( - VersionedParamPath("bandwidth_adem", vartype="int", path="bandwidth-adem",) + VersionedParamPath( + "bandwidth", + vartype="int", + path="bandwidth", + ) + ) + params.append( + VersionedParamPath( + "bandwidth_adem", + vartype="int", + path="bandwidth-adem", + ) + ) + params.append( + VersionedParamPath( + "bandwidth_cleanpipe", + vartype="int", + path="bandwidth-clean-pipe", + ) ) params.append( VersionedParamPath( - "bandwidth_cleanpipe", vartype="int", path="bandwidth-clean-pipe", + "users", + vartype="int", + path="users", ) ) - params.append(VersionedParamPath("users", vartype="int", path="users",)) params.append( - VersionedParamPath("adem_users", vartype="int", path="adem-users",) + VersionedParamPath( + "adem_users", + vartype="int", + path="adem-users", + ) ) self._params = tuple(params) @@ -613,7 +653,9 @@ def _setup(self): params.append( VersionedParamPath( - "same_as_primary", vartype="yesno", path="same-as-primary", + "same_as_primary", + vartype="yesno", + path="same-as-primary", ) ) params.append(VersionedParamPath("peer_ip_address", path="peer-ip-address")) diff --git a/panos/policies.py b/panos/policies.py index 9de9d9a1..5538dcfe 100644 --- a/panos/policies.py +++ b/panos/policies.py @@ -48,7 +48,10 @@ def _setup(self, name=None, elm=None): def refresh(self, elm=None): if elm is None and self.obj is not None: self.obj.parent.opstate.hit_count.refresh( - self.obj.HIT_COUNT_STYLE, [self.name,], + self.obj.HIT_COUNT_STYLE, + [ + self.name, + ], ) else: self._refresh_xml(elm) @@ -278,7 +281,8 @@ def current(self): ET.SubElement(sub, "xpath").text = self.obj.xpath() ans = self.obj.nearest_pandevice().op( - ET.tostring(cmd, encoding="utf-8"), cmd_xml=False, + ET.tostring(cmd, encoding="utf-8"), + cmd_xml=False, ) resp = ans.find("./result/entry/comment") @@ -298,7 +302,8 @@ def update(self, comment): ET.SubElement(sub, "comment").text = comment self.obj.nearest_pandevice().op( - ET.tostring(cmd, encoding="utf-8"), cmd_xml=False, + ET.tostring(cmd, encoding="utf-8"), + cmd_xml=False, ) @@ -384,7 +389,12 @@ def _setup(self): for var_name, path in any_defaults: params.append( VersionedParamPath( - var_name, default=["any",], vartype="member", path=path + var_name, + default=[ + "any", + ], + vartype="member", + path=path, ) ) @@ -406,7 +416,12 @@ def _setup(self): ) params.append( VersionedParamPath( - "category", default=["any",], vartype="member", path="category" + "category", + default=[ + "any", + ], + vartype="member", + path="category", ) ) params.append(VersionedParamPath("action", path="action")) @@ -469,11 +484,23 @@ def _setup(self): params.append(VersionedParamPath("uuid", exclude=True)) params[-1].add_profile("9.0.0", vartype="attrib", path="uuid") params.append( - VersionedParamPath("source_devices", default=["any",], exclude=True) + VersionedParamPath( + "source_devices", + default=[ + "any", + ], + exclude=True, + ) ) params[-1].add_profile("10.0.0", vartype="member", path="source-hip") params.append( - VersionedParamPath("destination_devices", default=["any",], exclude=True) + VersionedParamPath( + "destination_devices", + default=[ + "any", + ], + exclude=True, + ) ) params[-1].add_profile("10.0.0", vartype="member", path="destination-hip") params.append(VersionedParamPath("group_tag", exclude=True)) @@ -577,7 +604,12 @@ def _setup(self): ) params.append( VersionedParamPath( - "fromzone", default=["any",], vartype="member", path="from" + "fromzone", + default=[ + "any", + ], + vartype="member", + path="from", ) ) params.append(VersionedParamPath("tozone", vartype="member", path="to")) @@ -585,12 +617,22 @@ def _setup(self): params.append(VersionedParamPath("service", default="any", path="service")) params.append( VersionedParamPath( - "source", default=["any",], vartype="member", path="source" + "source", + default=[ + "any", + ], + vartype="member", + path="source", ) ) params.append( VersionedParamPath( - "destination", default=["any",], vartype="member", path="destination" + "destination", + default=[ + "any", + ], + vartype="member", + path="destination", ) ) params.append( @@ -894,7 +936,12 @@ def _setup(self): for var_name, path in any_defaults: params.append( VersionedParamPath( - var_name, default=["any",], vartype="member", path=path + var_name, + default=[ + "any", + ], + vartype="member", + path=path, ) ) params.append(VersionedParamPath("application", path="application")) @@ -1177,43 +1224,103 @@ def _setup(self): params.append(VersionedParamPath("uuid", exclude=True)) params[-1].add_profile("9.0.0", vartype="attrib", path="uuid") params.append( - VersionedParamPath("source_zones", vartype="member", path="from",) + VersionedParamPath( + "source_zones", + vartype="member", + path="from", + ) ) params.append( - VersionedParamPath("source_addresses", vartype="member", path="source",) + VersionedParamPath( + "source_addresses", + vartype="member", + path="source", + ) ) params.append( - VersionedParamPath("negate_source", vartype="yesno", path="negate-source",) + VersionedParamPath( + "negate_source", + vartype="yesno", + path="negate-source", + ) ) params.append( - VersionedParamPath("source_users", vartype="member", path="source-user",) + VersionedParamPath( + "source_users", + vartype="member", + path="source-user", + ) + ) + params.append( + VersionedParamPath( + "source_hip", + exclude=True, + ) ) - params.append(VersionedParamPath("source_hip", exclude=True,)) params[-1].add_profile( - "10.0.0", path="source-hip", vartype="member", + "10.0.0", + path="source-hip", + vartype="member", ) params.append( - VersionedParamPath("destination_zones", vartype="member", path="to",) + VersionedParamPath( + "destination_zones", + vartype="member", + path="to", + ) + ) + params.append( + VersionedParamPath( + "destination_addresses", + vartype="member", + path="destination", + ) ) params.append( VersionedParamPath( - "destination_addresses", vartype="member", path="destination", + "negate_destination", + vartype="yesno", + path="negate-destination", ) ) params.append( VersionedParamPath( - "negate_destination", vartype="yesno", path="negate-destination", + "destination_hip", + exclude=True, ) ) - params.append(VersionedParamPath("destination_hip", exclude=True,)) params[-1].add_profile( - "10.0.0", path="destination-hip", vartype="member", + "10.0.0", + path="destination-hip", + vartype="member", + ) + params.append( + VersionedParamPath( + "tags", + vartype="member", + path="tag", + ) ) - params.append(VersionedParamPath("tags", vartype="member", path="tag",)) - params.append(VersionedParamPath("disabled", vartype="yesno", path="disabled",)) - params.append(VersionedParamPath("services", vartype="member", path="service",)) params.append( - VersionedParamPath("url_categories", vartype="member", path="category",) + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ) + ) + params.append( + VersionedParamPath( + "services", + vartype="member", + path="service", + ) + ) + params.append( + VersionedParamPath( + "url_categories", + vartype="member", + path="category", + ) ) params.append( VersionedParamPath( @@ -1232,38 +1339,79 @@ def _setup(self): VersionedParamPath( "decryption_type", path="type/{decryption_type}", - values=("ssl-forward-proxy", "ssh-proxy", "ssl-inbound-inspection",), + values=( + "ssl-forward-proxy", + "ssh-proxy", + "ssl-inbound-inspection", + ), ) ) params.append( VersionedParamPath( "ssl_certificate", path="type/{decryption_type}", - condition={"decryption_type": "ssl-inbound-inspection",}, + condition={ + "decryption_type": "ssl-inbound-inspection", + }, + ) + ) + params.append( + VersionedParamPath( + "decryption_profile", + path="profile", + ) + ) + params.append( + VersionedParamPath( + "forwarding_profile", + exclude=True, ) ) - params.append(VersionedParamPath("decryption_profile", path="profile",)) - params.append(VersionedParamPath("forwarding_profile", exclude=True,)) params[-1].add_profile( - "8.1.0", path="forwarding-profile", + "8.1.0", + path="forwarding-profile", + ) + params.append( + VersionedParamPath( + "group_tag", + exclude=True, + ) ) - params.append(VersionedParamPath("group_tag", exclude=True,)) params[-1].add_profile( - "9.0.0", path="group-tag", + "9.0.0", + path="group-tag", ) params.append( - VersionedParamPath("log_successful_tls_handshakes", exclude=True,) + VersionedParamPath( + "log_successful_tls_handshakes", + exclude=True, + ) ) params[-1].add_profile( - "10.0.0", path="log-success", vartype="yesno", + "10.0.0", + path="log-success", + vartype="yesno", + ) + params.append( + VersionedParamPath( + "log_failed_tls_handshakes", + exclude=True, + ) ) - params.append(VersionedParamPath("log_failed_tls_handshakes", exclude=True,)) params[-1].add_profile( - "10.0.0", path="log-fail", vartype="yesno", + "10.0.0", + path="log-fail", + vartype="yesno", + ) + params.append( + VersionedParamPath( + "log_setting", + exclude=True, + ) ) - params.append(VersionedParamPath("log_setting", exclude=True,)) params[-1].add_profile( - "10.0.0", path="log-setting", + "10.0.0", + path="log-setting", ) params.append( VersionedParamPath("negate_target", path="target/negate", vartype="yesno") @@ -1328,55 +1476,117 @@ def _setup(self): params.append(VersionedParamPath("uuid", exclude=True)) params[-1].add_profile("9.0.0", vartype="attrib", path="uuid") params.append( - VersionedParamPath("source_zones", vartype="member", path="from",) + VersionedParamPath( + "source_zones", + vartype="member", + path="from", + ) ) params.append( VersionedParamPath( - "source_addresses", default=["any",], vartype="member", path="source" + "source_addresses", + default=[ + "any", + ], + vartype="member", + path="source", ) ) params.append( - VersionedParamPath("negate_source", vartype="yesno", path="negate-source",) + VersionedParamPath( + "negate_source", + vartype="yesno", + path="negate-source", + ) ) params.append( - VersionedParamPath("destination_zones", vartype="member", path="to",) + VersionedParamPath( + "destination_zones", + vartype="member", + path="to", + ) ) params.append( VersionedParamPath( "destination_addresses", - default=["any",], + default=[ + "any", + ], vartype="member", path="destination", ) ) params.append( VersionedParamPath( - "negate_destination", vartype="yesno", path="negate-destination", + "negate_destination", + vartype="yesno", + path="negate-destination", + ) + ) + params.append( + VersionedParamPath( + "tag", + vartype="member", + path="tag", + ) + ) + params.append( + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ) + ) + params.append( + VersionedParamPath( + "service", + vartype="member", + path="service", + ) + ) + params.append( + VersionedParamPath( + "source_hip", + exclude=True, ) ) - params.append(VersionedParamPath("tag", vartype="member", path="tag",)) - params.append(VersionedParamPath("disabled", vartype="yesno", path="disabled",)) - params.append(VersionedParamPath("service", vartype="member", path="service",)) - params.append(VersionedParamPath("source_hip", exclude=True,)) params[-1].add_profile( - "10.0.0", path="source-hip", vartype="member", + "10.0.0", + path="source-hip", + vartype="member", ) params.append( VersionedParamPath("source_users", vartype="member", path="source-user") ) params.append( - VersionedParamPath("url_categories", vartype="member", path="category",) + VersionedParamPath( + "url_categories", + vartype="member", + path="category", + ) + ) + params.append( + VersionedParamPath( + "group_tag", + exclude=True, + ) ) - params.append(VersionedParamPath("group_tag", exclude=True,)) params[-1].add_profile( - "9.0.0", path="group-tag", + "9.0.0", + path="group-tag", + ) + params.append( + VersionedParamPath( + "authentication_enforcement", + path="authentication-enforcement", + ) ) params.append( VersionedParamPath( - "authentication_enforcement", path="authentication-enforcement", + "timeout", + path="timeout", ) ) - params.append(VersionedParamPath("timeout", path="timeout",)) params.append( VersionedParamPath("negate_target", path="target/negate", vartype="yesno") ) diff --git a/panos/predefined.py b/panos/predefined.py index c522792d..3c8fd28d 100644 --- a/panos/predefined.py +++ b/panos/predefined.py @@ -111,10 +111,20 @@ def _refresh_application(self, name=None): ) def _refresh_service(self, name=None): - return self._refresh([(objects.ServiceObject, "service_objects", None),], name,) + return self._refresh( + [ + (objects.ServiceObject, "service_objects", None), + ], + name, + ) def _refresh_tag(self, name=None): - return self._refresh([(objects.Tag, "tag_objects", None),], name,) + return self._refresh( + [ + (objects.Tag, "tag_objects", None), + ], + name, + ) def refresh_application(self, name): """Refresh a Single Predefined Application diff --git a/panos/updater.py b/panos/updater.py index c40cab95..bbf025e4 100644 --- a/panos/updater.py +++ b/panos/updater.py @@ -121,7 +121,10 @@ def install(self, version, load_config=None, sync=False): ) response = self._op( 'request system software install %s version "%s"' - % ('load-config "{0}"'.format(load_config) if load_config else "", version,) + % ( + 'load-config "{0}"'.format(load_config) if load_config else "", + version, + ) ) if sync: result = self.pandevice.syncjob(response) diff --git a/panos/userid.py b/panos/userid.py index f8865ed1..4e42580f 100644 --- a/panos/userid.py +++ b/panos/userid.py @@ -718,7 +718,13 @@ def tag_user(self, user, tags, timeout=None, prefix=None): te = entry.find("./tag") break else: - entry = ET.SubElement(ru, "entry", {"user": user,}) + entry = ET.SubElement( + ru, + "entry", + { + "user": user, + }, + ) te = ET.SubElement(entry, "tag") # Now add in the tags with the specified timeout. @@ -761,7 +767,13 @@ def untag_user(self, user, tags=None, prefix=None): if entry.attrib["user"] == user: break else: - entry = ET.SubElement(uu, "entry", {"user": user,}) + entry = ET.SubElement( + uu, + "entry", + { + "user": user, + }, + ) # Do tag removal. te = entry.find("./tag") diff --git a/tests/live/conftest.py b/tests/live/conftest.py index d324367c..8d379d35 100644 --- a/tests/live/conftest.py +++ b/tests/live/conftest.py @@ -122,7 +122,10 @@ def init(): fw = random.choice(fws) panorama_fw_combinations.append( - ((pano, fw), desc(pano_version, fw_version),) + ( + (pano, fw), + desc(pano_version, fw_version), + ) ) diff --git a/tests/live/test_network.py b/tests/live/test_network.py index e62238df..f3f3ba69 100644 --- a/tests/live/test_network.py +++ b/tests/live/test_network.py @@ -6,7 +6,10 @@ class TestZoneBasic(testlib.FwFlow): def setup_state_obj(self, fw, state): - state.obj = network.Zone(testlib.random_name(), mode="layer3",) + state.obj = network.Zone( + testlib.random_name(), + mode="layer3", + ) fw.add(state.obj) def update_state_obj(self, fw, state): @@ -65,7 +68,10 @@ def create_dependencies(self, fw, state): state.parent.create() def setup_state_obj(self, fw, state): - state.obj = network.StaticMac(testlib.random_mac(), state.eths[0],) + state.obj = network.StaticMac( + testlib.random_mac(), + state.eths[0], + ) state.parent.add(state.obj) def update_state_obj(self, fw, state): @@ -102,7 +108,9 @@ def create_dependencies(self, fw, state): def setup_state_obj(self, fw, state): state.obj = network.Vlan( - testlib.random_name(), state.eths[0], state.vlan_interface.uid, + testlib.random_name(), + state.eths[0], + state.vlan_interface.uid, ) fw.add(state.obj) @@ -324,7 +332,9 @@ def create_dependencies(self, fw, state): state.eth = testlib.get_available_interfaces(fw)[0] state.parent = network.EthernetInterface( - state.eth, "layer3", ip=testlib.random_ip("/24"), + state.eth, + "layer3", + ip=testlib.random_ip("/24"), ) fw.add(state.parent) state.parent.create() @@ -368,7 +378,10 @@ def create_dependencies(self, fw, state): state.eth = None state.eth = testlib.get_available_interfaces(fw)[0] - state.parent = network.EthernetInterface(state.eth, "layer2",) + state.parent = network.EthernetInterface( + state.eth, + "layer2", + ) fw.add(state.parent) state.parent.create() @@ -376,7 +389,9 @@ def setup_state_obj(self, fw, state): tag = random.randint(1, 4000) name = "{0}.{1}".format(state.eth, tag) state.obj = network.Layer2Subinterface( - name, tag, comment="This is my L2 subinterface", + name, + tag, + comment="This is my L2 subinterface", ) state.parent.add(state.obj) @@ -861,14 +876,16 @@ def create_dependencies(self, fw, state): if self.WITH_BGP_IMPORT_RULE: state.import_rule = network.BgpPolicyImportRule( - name=testlib.random_name(), enable=True, + name=testlib.random_name(), + enable=True, ) state.bgp.add(state.import_rule) state.bgp.apply() if self.WITH_BGP_EXPORT_RULE: state.export_rule = network.BgpPolicyExportRule( - name=testlib.random_name(), enable=True, + name=testlib.random_name(), + enable=True, ) state.bgp.add(state.export_rule) state.bgp.apply() @@ -1409,7 +1426,9 @@ def setup_state_obj(self, fw, state): # 'match_afi': 'ip', # 'match_safi': 'ip', "match_route_table": "unicast", - "match_nexthop": [testlib.random_ip("/32"),], + "match_nexthop": [ + testlib.random_ip("/32"), + ], "match_from_peer": state.peer.name, "match_med": random.randint(0, 4294967295), "match_as_path_regex": "as-path-regex", @@ -1491,7 +1510,8 @@ class MakeBgpPolicyAddressPrefix(MakeVirtualRouter): def setup_state_obj(self, fw, state): state.obj = network.BgpPolicyAddressPrefix( - name=testlib.random_netmask(), exact=True, + name=testlib.random_netmask(), + exact=True, ) if self.WITH_BGP_IMPORT_RULE: state.import_rule.add(state.obj) @@ -1547,7 +1567,9 @@ def setup_state_obj(self, fw, state): ) advert.extend(prefixes) state.obj = network.BgpPolicyConditionalAdvertisement( - name=testlib.random_name(), enable=True, used_by=state.pg.name, + name=testlib.random_name(), + enable=True, + used_by=state.pg.name, ) state.obj.add(non_exist) state.obj.add(advert) @@ -1654,8 +1676,12 @@ class TestIkeCryptoProfile(testlib.FwFlow): def setup_state_obj(self, fw, state): state.obj = network.IkeCryptoProfile( testlib.random_name(), - authentication=["sha256",], - dh_group=["group1",], + authentication=[ + "sha256", + ], + dh_group=[ + "group1", + ], lifetime_minutes=42, ) fw.add(state.obj) @@ -1741,7 +1767,8 @@ def create_dependencies(self, fw, state): raise ValueError("IkeGateway not supported for version < 7.0") state.lbi = network.LoopbackInterface( - "loopback.{0}".format(random.randint(5, 20)), ipv6_enabled=True, + "loopback.{0}".format(random.randint(5, 20)), + ipv6_enabled=True, ) state.lbi.add(network.IPv6Address(testlib.random_ipv6())) state.lbi.add(network.IPv6Address(testlib.random_ipv6())) diff --git a/tests/live/test_objects.py b/tests/live/test_objects.py index c0bd5993..daa4dad9 100644 --- a/tests/live/test_objects.py +++ b/tests/live/test_objects.py @@ -29,7 +29,8 @@ def create_dependencies(self, dev, state): def setup_state_obj(self, dev, state): state.obj = objects.AddressGroup( - testlib.random_name(), [x.name for x in state.aos[:2]], + testlib.random_name(), + [x.name for x in state.aos[:2]], ) dev.add(state.obj) @@ -71,7 +72,8 @@ def setup_state_obj(self, dev, state): def update_state_obj(self, dev, state): state.obj.dynamic_value = "'{0}' and '{1}'".format( - state.tags[2].name, state.tags[3].name, + state.tags[2].name, + state.tags[3].name, ) state.obj.tag = state.tags[1].name @@ -86,7 +88,9 @@ def cleanup_dependencies(self, dev, state): class TestTag(testlib.DevFlow): def setup_state_obj(self, dev, state): state.obj = objects.Tag( - testlib.random_name(), color="color1", comments="My new tag", + testlib.random_name(), + color="color1", + comments="My new tag", ) dev.add(state.obj) diff --git a/tests/live/test_userid.py b/tests/live/test_userid.py index ead99456..7e2adc5a 100644 --- a/tests/live/test_userid.py +++ b/tests/live/test_userid.py @@ -67,7 +67,11 @@ def test_08_get_registered_ip(self, fw, state_map): test4 = set(fw.userid.get_registered_ip(ips, tags[0:5])) assert test4 == set(ips) test5 = set(fw.userid.get_registered_ip(ips[0], tags[0])) - assert test5 == set([ips[0],]) + assert test5 == set( + [ + ips[0], + ] + ) tests = [test1, test2, test3, test4, test5] assert len(test5) != 0 assert all([test1 >= x for x in tests]) diff --git a/tests/live/testlib.py b/tests/live/testlib.py index 3a23f20b..9138ea33 100644 --- a/tests/live/testlib.py +++ b/tests/live/testlib.py @@ -21,7 +21,9 @@ def random_ip(netmask=None): def random_netmask(): return "{0}.{1}.{2}.0/24".format( - random.randint(11, 150), random.randint(1, 200), random.randint(1, 200), + random.randint(11, 150), + random.randint(1, 200), + random.randint(1, 200), ) diff --git a/tests/test_base.py b/tests/test_base.py index 235a55d4..1eb0c6db 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -84,7 +84,13 @@ def test_add_without_children(self): ret_value = self.obj.add(child) self.assertEqual(child, ret_value) - self.verify_object(self.obj, name=OBJECT_NAME, children=[child,]) + self.verify_object( + self.obj, + name=OBJECT_NAME, + children=[ + child, + ], + ) self.verify_object(child, name=CHILD_NAME, parent=self.obj) def test_add_with_children(self): @@ -113,7 +119,13 @@ def test_insert_without_children(self): ret_val = self.obj.insert(0, child) self.assertEqual(child, ret_val) - self.verify_object(self.obj, name=OBJECT_NAME, children=[child,]) + self.verify_object( + self.obj, + name=OBJECT_NAME, + children=[ + child, + ], + ) self.verify_object(child, name=CHILD_NAME, parent=self.obj) def test_insert_with_children(self): @@ -215,7 +227,13 @@ def test_remove(self): ret_val = self.obj.remove(child2) self.assertIsNone(ret_val) - self.verify_object(self.obj, name=OBJECT_NAME, children=[child1,]) + self.verify_object( + self.obj, + name=OBJECT_NAME, + children=[ + child1, + ], + ) self.verify_object(child1, name=CHILD1_NAME, parent=self.obj) self.verify_object(child2, name=CHILD2_NAME) @@ -455,7 +473,9 @@ def test_apply_with_ha_sync(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.active().xapi.edit.assert_called_once_with( - PanDeviceXpath, PanDeviceElementStr, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + PanDeviceElementStr, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath.assert_called_once_with() self.obj.element_str.assert_called_once_with() @@ -487,7 +507,9 @@ def test_apply_without_ha_sync(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.xapi.edit.assert_called_once_with( - PanDeviceXpath, PanDeviceElementStr, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + PanDeviceElementStr, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath.assert_called_once_with() self.obj.element_str.assert_called_once_with() @@ -517,7 +539,9 @@ def test_create_with_ha_sync(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.active().xapi.set.assert_called_once_with( - PanDeviceXpath, PanDeviceElementStr, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + PanDeviceElementStr, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath_short.assert_called_once_with() self.obj.element_str.assert_called_once_with() @@ -549,7 +573,9 @@ def test_create_without_ha_sync(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.xapi.set.assert_called_once_with( - PanDeviceXpath, PanDeviceElementStr, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + PanDeviceElementStr, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath_short.assert_called_once_with() self.obj.element_str.assert_called_once_with() @@ -577,7 +603,8 @@ def test_delete_with_ha_sync_no_parent(self, m_uid): self.assertIsNone(ret_val) m_panos.set_config_changed.assert_called_once_with() m_panos.active().xapi.delete.assert_called_once_with( - PanDeviceXpath, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath.assert_called_once_with() for c in self.obj.children: @@ -607,7 +634,8 @@ def test_delete_with_ha_sync_and_parent(self, m_uid): self.obj.parent.remove.assert_called_once_with(self.obj) m_panos.set_config_changed.assert_called_once_with() m_panos.active().xapi.delete.assert_called_once_with( - PanDeviceXpath, retry_on_peer=self.obj.HA_SYNC, + PanDeviceXpath, + retry_on_peer=self.obj.HA_SYNC, ) self.obj.xpath.assert_called_once_with() for c in self.obj.children: @@ -1152,7 +1180,8 @@ def test_no_fallback_raises_value_error(self): parent = None obj = Base.ParentAwareXpath() obj.add_profile( - parents=("ParentClass1",), value="/some/path", + parents=("ParentClass1",), + value="/some/path", ) self.assertRaises(ValueError, obj._get_versioned_value, (1, 0, 0), parent) @@ -1290,7 +1319,8 @@ def test_entry_refresh(self): ans = o.refresh_variable("entries") m.xapi.get.assert_called_once_with( - o.xpath() + "/multiple/entries", retry_on_peer=o.HA_SYNC, + o.xpath() + "/multiple/entries", + retry_on_peer=o.HA_SYNC, ) self.assertEqual(ans, o.entries) self.assertEqual(ans, ["one", "two"]) @@ -1303,7 +1333,8 @@ def test_member_refresh(self): ans = o.refresh_variable("members") m.xapi.get.assert_called_once_with( - o.xpath() + "/multiple/members", retry_on_peer=o.HA_SYNC, + o.xpath() + "/multiple/members", + retry_on_peer=o.HA_SYNC, ) self.assertEqual(ans, o.members) self.assertEqual(ans, ["first", "second"]) @@ -1314,7 +1345,8 @@ def test_int_refresh(self): ans = o.refresh_variable("someint") m.xapi.get.assert_called_once_with( - o.xpath() + "/someint", retry_on_peer=o.HA_SYNC, + o.xpath() + "/someint", + retry_on_peer=o.HA_SYNC, ) self.assertEqual(ans, o.someint) self.assertEqual(ans, 42) @@ -1334,7 +1366,8 @@ def test_string_refresh(self): ans = o.refresh_variable("action") m.xapi.get.assert_called_once_with( - o.xpath() + "/config/action", retry_on_peer=o.HA_SYNC, + o.xpath() + "/config/action", + retry_on_peer=o.HA_SYNC, ) self.assertEqual(ans, o.action) self.assertEqual(ans, "DENY") @@ -1345,7 +1378,8 @@ def test_nested_attrib_refresh(self): ans = o.refresh_variable("action_uuid") m.xapi.get.assert_called_once_with( - o.xpath() + "/config/action", retry_on_peer=o.HA_SYNC, + o.xpath() + "/config/action", + retry_on_peer=o.HA_SYNC, ) self.assertEqual(ans, o.action_uuid) self.assertEqual(ans, "1234-56-789") @@ -1382,7 +1416,14 @@ def test_values_are_unchanged_after_comparison(self): self.assertEqual(o2.members, ["d", "c"]) def test_str_list_field_is_equal(self): - o1 = MyVersionedObject("a", ["a",], ["c", "d"], 5) + o1 = MyVersionedObject( + "a", + [ + "a", + ], + ["c", "d"], + 5, + ) o2 = MyVersionedObject("a", "a", ["c", "d"], 5) self.assertTrue(o1.equal(o2)) @@ -1527,14 +1568,18 @@ def config(self, length=10, count=1, suffix="entry"): listing = [] for x in range(count): - obj = Base.PanObject("".join(random.choice(chars) for y in range(length)),) + obj = Base.PanObject( + "".join(random.choice(chars) for y in range(length)), + ) obj.parent = mock.Mock() listing.append(obj) # Now tweak the first element for the tests, the rest don't matter. obj = listing[0] - obj._gather_bulk_info = mock.Mock(return_value=(dev, listing, None),) + obj._gather_bulk_info = mock.Mock( + return_value=(dev, listing, None), + ) if suffix == "member": obj.SUFFIX = Base.MEMBER else: @@ -1551,7 +1596,8 @@ def test_delete_one_entry(self): dev.xapi.delete.assert_called_once() dev.xapi.delete.assert_called_once_with( - "/mock/xpath/entry[@name='{0}']".format(obj.uid), retry_on_peer=obj.HA_SYNC, + "/mock/xpath/entry[@name='{0}']".format(obj.uid), + retry_on_peer=obj.HA_SYNC, ) def test_delete_one_member(self): @@ -1642,7 +1688,12 @@ def test_ok(self, mocksleep): @mock.patch("time.sleep") def test_times_out(self, mocksleep): fw = Base.PanDevice("127.0.0.1", "admin", "secret", api_key="apikey") - fw.xapi.op = mock.Mock(side_effect=[Err.PanURLError, ValueError,],) + fw.xapi.op = mock.Mock( + side_effect=[ + Err.PanURLError, + ValueError, + ], + ) ans = fw.is_ready(seconds=0) diff --git a/tests/test_device_profile_xpaths.py b/tests/test_device_profile_xpaths.py index 87a14424..05bcc01a 100644 --- a/tests/test_device_profile_xpaths.py +++ b/tests/test_device_profile_xpaths.py @@ -60,9 +60,18 @@ OBJECTS = { SnmpServerProfile: [None, SnmpV2cServer, SnmpV3Server], - EmailServerProfile: [None, EmailServer,], - LdapServerProfile: [None, LdapServer,], - SyslogServerProfile: [None, SyslogServer,], + EmailServerProfile: [ + None, + EmailServer, + ], + LdapServerProfile: [ + None, + LdapServer, + ], + SyslogServerProfile: [ + None, + SyslogServer, + ], HttpServerProfile: [ None, HttpServer, diff --git a/tests/test_firewall.py b/tests/test_firewall.py index fa6d1162..e883dcb0 100644 --- a/tests/test_firewall.py +++ b/tests/test_firewall.py @@ -22,7 +22,9 @@ class TestFirewall(unittest.TestCase): def test_id_returns_serial(self): expected = "serial#" - fw = panos.firewall.Firewall(serial=expected,) + fw = panos.firewall.Firewall( + serial=expected, + ) ret_val = fw.id @@ -31,7 +33,9 @@ def test_id_returns_serial(self): def test_id_returns_hostname(self): expected = "hostName" - fw = panos.firewall.Firewall(hostname=expected,) + fw = panos.firewall.Firewall( + hostname=expected, + ) ret_val = fw.id diff --git a/tests/test_init.py b/tests/test_init.py index f7bf6a6e..b8642aaf 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -28,7 +28,6 @@ class TestPanOSVersion(unittest.TestCase): """ def setUp(self): - self.c1 = panos.PanOSVersion("7.0.0-c1") self.c2 = panos.PanOSVersion("7.0.0-c2") self.b1 = panos.PanOSVersion("7.0.0-b1") @@ -178,7 +177,8 @@ def test_base_is_single_key_value(self): for x in self.quotes(): self.assertEqual( - panos.string_to_xml("hello {0}world{0}".format(x), x), self._str(root), + panos.string_to_xml("hello {0}world{0}".format(x), x), + self._str(root), ) def test_base_root_with_one_key_value(self): @@ -188,7 +188,8 @@ def test_base_root_with_one_key_value(self): for x in self.quotes(): self.assertEqual( - panos.string_to_xml("foo bar {0}baz{0}".format(x), x), self._str(root), + panos.string_to_xml("foo bar {0}baz{0}".format(x), x), + self._str(root), ) def test_base_root_with_two_key_values(self): diff --git a/tests/test_integration.py b/tests/test_integration.py index 654de5c3..3dabf0e2 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -89,11 +89,26 @@ def setUp(self): self.assertEqual(self.firewall, self.address_object.parent) self.assertEqual([], self.address_object.children) self.assertEqual(self.device_group, self.firewall.parent) - self.assertEqual([self.address_object,], self.firewall.children) + self.assertEqual( + [ + self.address_object, + ], + self.firewall.children, + ) self.assertEqual(self.panorama, self.device_group.parent) - self.assertEqual([self.firewall,], self.device_group.children) + self.assertEqual( + [ + self.firewall, + ], + self.device_group.children, + ) self.assertEqual(None, self.panorama.parent) - self.assertEqual([self.device_group,], self.panorama.children) + self.assertEqual( + [ + self.device_group, + ], + self.panorama.children, + ) def test_nearest_pandevice_from_addressobject_in_pano_dg_fw_ao_chain(self): """Runs nearest_pandevice() on the AddressObject. @@ -325,7 +340,10 @@ def test_element_str_from_ethernetinterface_for_aggregate_group(self): def test_element_str_from_firewall_with_pano_parent_and_systemsettings_child(self): expected = b"".join( - [b'', b'',] + [ + b'', + b'', + ] ) fw = panos.firewall.Firewall( @@ -357,7 +375,10 @@ def test_element_str_from_firewall_without_serial_number_raises_error(self): def test_element_str_from_firewall_with_dg_pano_parents_and_multi_vsys(self): expected = b"".join( - [b'', b"",] + [ + b'', + b"", + ] ) fw = panos.firewall.Firewall( @@ -615,7 +636,11 @@ def test_set_xpath_from_arp_with_l3s_ei_fw_parents(self): def test_edit_xpath_from_firewall(self): # This is not a valid xpath, but its what should happen # if there is no parent - expected = "".join(["/devices/entry[@name='serial']",]) + expected = "".join( + [ + "/devices/entry[@name='serial']", + ] + ) fw = panos.firewall.Firewall("foo", vsys="vsys2", serial="serial") @@ -626,7 +651,11 @@ def test_edit_xpath_from_firewall(self): def test_set_xpath_from_firewall(self): # This is not a valid xpath, but its what should happen # if there is no parent - expected = "".join(["/devices",]) + expected = "".join( + [ + "/devices", + ] + ) fw = panos.firewall.Firewall("foo", vsys="vsys2", serial="serial") diff --git a/tests/test_opstate.py b/tests/test_opstate.py index f99d3276..bf579e2f 100644 --- a/tests/test_opstate.py +++ b/tests/test_opstate.py @@ -48,7 +48,9 @@ def _hit_count_fw_setup(*args): inner = "".join(ET.tostring(x, encoding="utf-8").decode("utf-8") for x in args) fw.op = mock.Mock( - return_value=ET.fromstring(HIT_COUNT_PREFIX + inner + HIT_COUNT_SUFFIX,) + return_value=ET.fromstring( + HIT_COUNT_PREFIX + inner + HIT_COUNT_SUFFIX, + ) ) rb = Rulebase() diff --git a/tests/test_params.py b/tests/test_params.py index 4073ae2e..e2231514 100644 --- a/tests/test_params.py +++ b/tests/test_params.py @@ -17,22 +17,74 @@ def _setup(self): params = [] - params.append(VersionedParamPath("uuid", vartype="attrib", path="uuid",),) - params.append(VersionedParamPath("size", vartype="int", path="size",),) - params.append(VersionedParamPath("listing", vartype="member", path="listing",),) - params.append(VersionedParamPath("pb1", vartype="exist", path="pb1",),) - params.append(VersionedParamPath("pb2", vartype="exist", path="pb2",),) - params.append(VersionedParamPath("live", vartype="yesno", path="live",),) params.append( - VersionedParamPath("disabled", vartype="yesno", path="disabled",), + VersionedParamPath( + "uuid", + vartype="attrib", + path="uuid", + ), + ) + params.append( + VersionedParamPath( + "size", + vartype="int", + path="size", + ), + ) + params.append( + VersionedParamPath( + "listing", + vartype="member", + path="listing", + ), + ) + params.append( + VersionedParamPath( + "pb1", + vartype="exist", + path="pb1", + ), + ) + params.append( + VersionedParamPath( + "pb2", + vartype="exist", + path="pb2", + ), + ) + params.append( + VersionedParamPath( + "live", + vartype="yesno", + path="live", + ), + ) + params.append( + VersionedParamPath( + "disabled", + vartype="yesno", + path="disabled", + ), + ) + params.append( + VersionedParamPath( + "uuid2", + vartype="attrib", + path="level-2/uuid", + ), ) params.append( - VersionedParamPath("uuid2", vartype="attrib", path="level-2/uuid",), + VersionedParamPath( + "age", + vartype="int", + path="level-2/age", + ), ) - params.append(VersionedParamPath("age", vartype="int", path="level-2/age",),) params.append( VersionedParamPath( - "interfaces", vartype="member", path="level-2/interface", + "interfaces", + vartype="member", + path="level-2/interface", ), ) @@ -84,7 +136,8 @@ def _refresh_xml(): # int at base level def test_render_int(): _verify_render( - FakeObject("test", size=5), '5', + FakeObject("test", size=5), + '5', ) @@ -111,7 +164,8 @@ def test_parse_member(): # exist at base level def test_render_exist(): _verify_render( - FakeObject("test", pb1=True), '', + FakeObject("test", pb1=True), + '', ) @@ -139,7 +193,8 @@ def test_parse_yesno(): # attrib def test_render_attrib(): _verify_render( - FakeObject("test", uuid="123-456"), '', + FakeObject("test", uuid="123-456"), + '', ) diff --git a/tests/test_predefined.py b/tests/test_predefined.py index 9ea6eded..1a1b913a 100644 --- a/tests/test_predefined.py +++ b/tests/test_predefined.py @@ -45,9 +45,13 @@ ( """//*[contains(local-name(), "application")]/entry[@name='{0}']""", '//*[contains(local-name(), "application")]/entry', - ApplicationContainer(name="ap container 1", applications=["func1", "func2"],), ApplicationContainer( - name="application container deux", applications=["a", "la", "mode"], + name="ap container 1", + applications=["func1", "func2"], + ), + ApplicationContainer( + name="application container deux", + applications=["a", "la", "mode"], ), ), ( @@ -97,8 +101,16 @@ ( None, "/tag/entry", - Tag(name="foo", color="color1", comments="First color",), - Tag(name="bar", color="color42", comments="Another color for another time",), + Tag( + name="foo", + color="color1", + comments="First color", + ), + Tag( + name="bar", + color="color42", + comments="Another color for another time", + ), ), ) @@ -142,7 +154,11 @@ def _fw(*args): prefix = "" suffix = "" inner = "".join(x.element_str().decode("utf-8") for x in args) - fw.xapi.get = mock.Mock(return_value=ET.fromstring(prefix + inner + suffix,)) + fw.xapi.get = mock.Mock( + return_value=ET.fromstring( + prefix + inner + suffix, + ) + ) return fw diff --git a/tests/test_standards.py b/tests/test_standards.py index 3fe2824d..c284b448 100644 --- a/tests/test_standards.py +++ b/tests/test_standards.py @@ -107,6 +107,7 @@ # -- Fixtures -- + # PanObject / VersionedPanObject that has no NAME. @pytest.fixture( scope="function", diff --git a/tests/test_userid.py b/tests/test_userid.py index 0f4af6a1..a3a2052a 100644 --- a/tests/test_userid.py +++ b/tests/test_userid.py @@ -67,8 +67,18 @@ def test_batch_tag_user(self): ) fw.xapi fw.userid.batch_start() - fw.userid.tag_user("user1", ["tag1",]) - fw.userid.tag_user("user2", ["tag1",]) + fw.userid.tag_user( + "user1", + [ + "tag1", + ], + ) + fw.userid.tag_user( + "user2", + [ + "tag1", + ], + ) def test_batch_untag_user(self): fw = panos.firewall.Firewall( @@ -76,8 +86,18 @@ def test_batch_untag_user(self): ) fw.xapi fw.userid.batch_start() - fw.userid.untag_user("user1", ["tag1",]) - fw.userid.untag_user("user2", ["tag1",]) + fw.userid.untag_user( + "user1", + [ + "tag1", + ], + ) + fw.userid.untag_user( + "user2", + [ + "tag1", + ], + ) if __name__ == "__main__": diff --git a/tests/test_versioning.py b/tests/test_versioning.py index e32575db..d4fe86a5 100644 --- a/tests/test_versioning.py +++ b/tests/test_versioning.py @@ -39,7 +39,8 @@ def test_empty_objects_are_equal(self): raise unittest.SkipTest("OLD_CLS does not have element_str()") self.assertEqual( - old.element_str(), new.element_str(), + old.element_str(), + new.element_str(), ) def test_positionally_populated_objects_are_equal(self): @@ -56,7 +57,8 @@ def test_positionally_populated_objects_are_equal(self): raise unittest.SkipTest("OLD_CLS does not have element_str()") self.assertEqual( - old.element_str(), new.element_str(), + old.element_str(), + new.element_str(), ) def test_keyword_populated_objects_are_equal(self): @@ -73,7 +75,8 @@ def test_keyword_populated_objects_are_equal(self): raise unittest.SkipTest("OLD_CLS does not have element_str()") self.assertEqual( - old.element_str(), new.element_str(), + old.element_str(), + new.element_str(), ) def test_parsing_old_elmstring_works(self): @@ -305,14 +308,18 @@ def test_add_profile_raises_error_on_adding_lower_version_after_adding_a_higher_ def test_security_rule_hip_profiles(): "security rules on 10.1.5 should not have hip-profiles" rule = panos.policies.SecurityRule( - name="test_rule", source=["0.0.0.0/0"], destination=["0.0.0.0/0"], + name="test_rule", + source=["0.0.0.0/0"], + destination=["0.0.0.0/0"], ) rule._UNKNOWN_PANOS_VERSION = (10, 1, 5) st = rule.element_str(False).decode() assert "" not in st rule = panos.policies.SecurityRule( - name="test_rule", source=["0.0.0.0/0"], destination=["0.0.0.0/0"], + name="test_rule", + source=["0.0.0.0/0"], + destination=["0.0.0.0/0"], ) rule._UNKNOWN_PANOS_VERSION = (9, 0, 0) st = rule.element_str(False).decode()