diff --git a/docs/conf.py b/docs/conf.py index d6b1e5ef3..c068c61aa 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -9,8 +9,8 @@ project = 'pyk' author = 'Runtime Verification, Inc' copyright = '2024, Runtime Verification, Inc' -version = '0.1.631' -release = '0.1.631' +version = '0.1.632' +release = '0.1.632' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/package/version b/package/version index 6c3237570..05fafe81e 100644 --- a/package/version +++ b/package/version @@ -1 +1 @@ -0.1.631 +0.1.632 diff --git a/pyproject.toml b/pyproject.toml index 04a2b81e1..03e9dc2b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyk" -version = "0.1.631" +version = "0.1.632" description = "" authors = [ "Runtime Verification, Inc. ", diff --git a/src/pyk/proof/implies.py b/src/pyk/proof/implies.py index 9e27b9840..122554338 100644 --- a/src/pyk/proof/implies.py +++ b/src/pyk/proof/implies.py @@ -12,7 +12,7 @@ from ..prelude.kbool import BOOL, TRUE from ..prelude.ml import is_bottom, is_top, mlAnd, mlEquals, mlEqualsFalse, mlEqualsTrue from ..utils import ensure_dir_path -from .proof import Proof, ProofStatus, ProofSummary, Prover +from .proof import Proof, ProofStatus, ProofStep, ProofSummary, Prover, StepResult if TYPE_CHECKING: from collections.abc import Iterable, Mapping @@ -66,6 +66,32 @@ def status(self) -> ProofStatus: else: return ProofStatus.PASSED + def get_steps(self) -> Iterable[ProofStep]: + proof_type = type(self).__name__ + _LOGGER.info(f'Attempting {proof_type} {self.id}') + + if self.status is not ProofStatus.PENDING: + _LOGGER.info(f'{proof_type} finished {self.id}: {self.status}') + return [] + return [ + ImpliesProofStep( + antecedent=self.antecedent, + consequent=self.consequent, + proof_id=self.id, + bind_universally=self.bind_universally, + ) + ] + + def commit(self, result: StepResult) -> None: + proof_type = type(self).__name__ + if isinstance(result, ImpliesProofResult): + self.csubst = result.csubst + self.simplified_antecedent = result.simplified_antecedent + self.simplified_consequent = result.simplified_consequent + _LOGGER.info(f'{proof_type} finished {self.id}: {self.status}') + else: + raise ValueError('Incorrect result type') + @property def can_progress(self) -> bool: return self.simplified_antecedent is None or self.simplified_consequent is None @@ -372,6 +398,21 @@ def lines(self) -> list[str]: ] +@dataclass +class ImpliesProofResult(StepResult): + csubst: CSubst | None + simplified_antecedent: KInner | None + simplified_consequent: KInner | None + + +@dataclass +class ImpliesProofStep(ProofStep): + antecedent: KInner + consequent: KInner + bind_universally: bool + proof_id: str + + class ImpliesProver(Prover): proof: ImpliesProof @@ -379,40 +420,36 @@ def __init__(self, proof: ImpliesProof, kcfg_explore: KCFGExplore): super().__init__(kcfg_explore) self.proof = proof - def step_proof(self) -> None: - proof_type = type(self.proof).__name__ - _LOGGER.info(f'Attempting {proof_type} {self.proof.id}') - - if self.proof.status is not ProofStatus.PENDING: - _LOGGER.info(f'{proof_type} finished {self.proof.id}: {self.proof.status}') - return + def step_proof(self, step: ProofStep) -> StepResult: + assert isinstance(step, ImpliesProofStep) # to prove the equality, we check the implication of the form `constraints #Implies LHS #Equals RHS`, i.e. # "LHS equals RHS under these constraints" - antecedent_simplified_kast, _ = self.kcfg_explore.kast_simplify(self.proof.antecedent) - consequent_simplified_kast, _ = self.kcfg_explore.kast_simplify(self.proof.consequent) - self.proof.simplified_antecedent = antecedent_simplified_kast - self.proof.simplified_consequent = consequent_simplified_kast - _LOGGER.info(f'Simplified antecedent: {self.kcfg_explore.kprint.pretty_print(antecedent_simplified_kast)}') - _LOGGER.info(f'Simplified consequent: {self.kcfg_explore.kprint.pretty_print(consequent_simplified_kast)}') + simplified_antecedent, _ = self.kcfg_explore.kast_simplify(step.antecedent) + simplified_consequent, _ = self.kcfg_explore.kast_simplify(step.consequent) + _LOGGER.info(f'Simplified antecedent: {self.kcfg_explore.kprint.pretty_print(simplified_antecedent)}') + _LOGGER.info(f'Simplified consequent: {self.kcfg_explore.kprint.pretty_print(simplified_consequent)}') - if is_bottom(antecedent_simplified_kast): - _LOGGER.warning(f'Antecedent of implication (proof constraints) simplifies to #Bottom {self.proof.id}') - self.proof.csubst = CSubst(Subst({}), ()) + csubst: CSubst | None = None - elif is_top(consequent_simplified_kast): - _LOGGER.warning(f'Consequent of implication (proof equality) simplifies to #Top {self.proof.id}') - self.proof.csubst = CSubst(Subst({}), ()) + if is_bottom(simplified_antecedent): + _LOGGER.warning(f'Antecedent of implication (proof constraints) simplifies to #Bottom {step.proof_id}') + csubst = CSubst(Subst({}), ()) + + elif is_top(simplified_consequent): + _LOGGER.warning(f'Consequent of implication (proof equality) simplifies to #Top {step.proof_id}') + csubst = CSubst(Subst({}), ()) else: - # TODO: we should not be forced to include the dummy configuration in the antecedent and consequent dummy_config = self.kcfg_explore.kprint.definition.empty_config(sort=GENERATED_TOP_CELL) - result = self.kcfg_explore.cterm_implies( - antecedent=CTerm(config=dummy_config, constraints=[self.proof.simplified_antecedent]), - consequent=CTerm(config=dummy_config, constraints=[self.proof.simplified_consequent]), - bind_universally=self.proof.bind_universally, + implies_result = self.kcfg_explore.cterm_implies( + antecedent=CTerm(config=dummy_config, constraints=[simplified_antecedent]), + consequent=CTerm(config=dummy_config, constraints=[simplified_consequent]), + bind_universally=step.bind_universally, ) - if result is not None: - self.proof.csubst = result + if implies_result is not None: + csubst = implies_result - _LOGGER.info(f'{proof_type} finished {self.proof.id}: {self.proof.status}') + return ImpliesProofResult( + csubst=csubst, simplified_antecedent=simplified_antecedent, simplified_consequent=simplified_consequent + ) diff --git a/src/pyk/proof/proof.py b/src/pyk/proof/proof.py index 52f77477d..66cf3e91f 100644 --- a/src/pyk/proof/proof.py +++ b/src/pyk/proof/proof.py @@ -36,6 +36,14 @@ class Proof(ABC): _subproofs: dict[str, Proof] admitted: bool + @abstractmethod + def get_steps(self) -> Iterable[ProofStep]: + ... + + @abstractmethod + def commit(self, result: StepResult) -> None: + ... + @property def proof_subdir(self) -> Path | None: if self.proof_dir is None: @@ -286,25 +294,36 @@ def lines(self) -> list[str]: return [line for lines in (summary.lines for summary in self.summaries) for line in lines] +class StepResult: + ... + + +class ProofStep: + ... + + class Prover: kcfg_explore: KCFGExplore proof: Proof - def __init__(self, kcfg_explore: KCFGExplore): - self.kcfg_explore = kcfg_explore - @abstractmethod - def step_proof(self) -> None: + def step_proof(self, step: ProofStep) -> StepResult: ... + def __init__(self, kcfg_explore: KCFGExplore): + self.kcfg_explore = kcfg_explore + def advance_proof(self, max_iterations: int | None = None, fail_fast: bool = False) -> None: iterations = 0 while self.proof.can_progress: - if fail_fast and self.proof.failed: - _LOGGER.warning(f'Terminating proof early because fail_fast is set: {self.proof.id}') - return - if max_iterations is not None and max_iterations <= iterations: - return - iterations += 1 - self.step_proof() - self.proof.write_proof_data() + steps = self.proof.get_steps() + for step in steps: + iterations += 1 + result = self.step_proof(step) + self.proof.commit(result) + self.proof.write_proof_data() + if fail_fast and self.proof.failed: + _LOGGER.warning(f'Terminating proof early because fail_fast is set: {self.proof.id}') + return + if max_iterations is not None and max_iterations <= iterations: + return diff --git a/src/pyk/proof/reachability.py b/src/pyk/proof/reachability.py index 35e701760..9a75104f1 100644 --- a/src/pyk/proof/reachability.py +++ b/src/pyk/proof/reachability.py @@ -18,15 +18,17 @@ from ..prelude.ml import mlAnd, mlTop from ..utils import FrozenDict, ensure_dir_path, hash_str, shorten_hashes, single from .implies import ProofSummary, Prover, RefutationProof -from .proof import CompositeSummary, Proof, ProofStatus +from .proof import CompositeSummary, Proof, ProofStatus, ProofStep, StepResult if TYPE_CHECKING: from collections.abc import Iterable, Mapping from pathlib import Path from typing import Any, Final, TypeVar + from ..cterm import CSubst, CTerm from ..kast.outer import KClaim, KDefinition, KFlatModuleList, KRuleLike from ..kcfg import KCFGExplore + from ..kcfg.explore import ExtendResult from ..kcfg.kcfg import NodeIdLike T = TypeVar('T', bound='Proof') @@ -44,6 +46,7 @@ class APRProof(Proof, KCFGExploration): to specify an optional loop iteration bound for each loop in execution. """ + kcfg_explore: KCFGExplore node_refutations: dict[int, RefutationProof] # TODO _node_refutatations init: int target: int @@ -52,11 +55,16 @@ class APRProof(Proof, KCFGExploration): logs: dict[int, tuple[LogEntry, ...]] circularity: bool failure_info: APRFailureInfo | None + _checked_for_bounded: set[int] + _checked_for_terminal: set[int] + _checked_for_subsumption: set[int] + fast_check_subsumption: bool def __init__( self, id: str, kcfg: KCFG, + kcfg_explore: KCFGExplore, terminal: Iterable[int], init: NodeIdLike, target: NodeIdLike, @@ -68,10 +76,12 @@ def __init__( subproof_ids: Iterable[str] = (), circularity: bool = False, admitted: bool = False, + fast_check_subsumption: bool = False, ): Proof.__init__(self, id, proof_dir=proof_dir, subproof_ids=subproof_ids, admitted=admitted) KCFGExploration.__init__(self, kcfg, terminal) + self.kcfg_explore = kcfg_explore self.failure_info = None self.init = kcfg._resolve(init) self.target = kcfg._resolve(target) @@ -81,6 +91,11 @@ def __init__( self.circularity = circularity self.node_refutations = {} self.kcfg.cfg_dir = self.proof_subdir / 'kcfg' if self.proof_subdir else None + self.fast_check_subsumption = fast_check_subsumption + + self._checked_for_bounded = set() + self._checked_for_terminal = set() + self._checked_for_subsumption = set() if self.proof_dir is not None and self.proof_subdir is not None: ensure_dir_path(self.proof_dir) @@ -99,10 +114,122 @@ def __init__( assert type(subproof) is RefutationProof self.node_refutations[node_id] = subproof + + def nonzero_depth(self, node: KCFG.Node) -> bool: + return not self.kcfg.zero_depth_between(self.init, node.id) + + def _may_subsume(self, node: KCFG.Node) -> bool: + node_k_cell = node.cterm.try_cell('K_CELL') + target_k_cell = self.kcfg.node(self.target).cterm.try_cell('K_CELL') + if node_k_cell and target_k_cell and not target_k_cell.match(node_k_cell): + return False + return True + + def _check_terminal(self, node: KCFG.Node) -> None: + if node.id not in self._checked_for_terminal: + _LOGGER.info(f'Checking terminal: {node.id}') + self._checked_for_terminal.add(node.id) + if self.kcfg_explore.kcfg_semantics.is_terminal(node.cterm): + _LOGGER.info(f'Terminal node: {node.id}.') + self._terminal.add(node.id) + elif self.fast_check_subsumption and self._may_subsume(node): + _LOGGER.info(f'Marking node as terminal because of fast may subsume check {self.id}: {node.id}') + self._terminal.add(node.id) + + def _check_all_terminals(self) -> None: + for node in self.kcfg.nodes: + self._check_terminal(node) + + def get_steps(self) -> Iterable[APRProofStep]: + if not self.pending: + return [] + steps = [] + target_node = self.kcfg.node(self.target) + for curr_node in self.pending: + if self.bmc_depth is not None and curr_node.id not in self._checked_for_bounded: + _LOGGER.info(f'Checking bmc depth for node {self.id}: {curr_node.id}') + self._checked_for_bounded.add(curr_node.id) + _prior_loops = [ + succ.source.id + for succ in self.shortest_path_to(curr_node.id) + if self.kcfg_explore.kcfg_semantics.same_loop(succ.source.cterm, curr_node.cterm) + ] + prior_loops: list[NodeIdLike] = [] + for _pl in _prior_loops: + if not ( + self.kcfg.zero_depth_between(_pl, curr_node.id) + or any(self.kcfg.zero_depth_between(_pl, pl) for pl in prior_loops) + ): + prior_loops.append(_pl) + _LOGGER.info(f'Prior loop heads for node {self.id}: {(curr_node.id, prior_loops)}') + if len(prior_loops) > self.bmc_depth: + _LOGGER.warning(f'Bounded node {self.id}: {curr_node.id} at bmc depth {self.bmc_depth}') + self.add_bounded(curr_node.id) + continue + steps.append( + APRProofStep( + bmc_depth=self.bmc_depth, + checked_for_bounded=self._checked_for_bounded, + cterm=curr_node.cterm, + is_terminal=self.kcfg_explore.kcfg_semantics.is_terminal(curr_node.cterm), + kcfg_explore=self.kcfg_explore, + module_name=( + self.circularities_module_name + if self.nonzero_depth(curr_node) + else self.dependencies_module_name + ), + node_id=curr_node.id, + proof_id=self.id, + target_cterm=target_node.cterm, + target_is_terminal=self.target in self._terminal, + target_node_id=target_node.id, + ) + ) + return steps + + def commit(self, result: StepResult) -> None: + if isinstance(result, APRProofExtendResult): + self.kcfg_explore.extend_kcfg( + result.extend_result, self.kcfg, self.kcfg.node(result.node_id), self.logs + ) + elif isinstance(result, APRProofSubsumeResult): + if result.csubst is None: + self._terminal.add(result.node_id) + else: + self.kcfg.create_cover(result.node_id, self.target, csubst=result.csubst) + _LOGGER.info( + f'Subsumed into target node {self.id}: {shorten_hashes((result.node_id, self.target))}' + ) + else: + raise ValueError('Incorrect result type') + + self._check_all_terminals() + + for node in self.terminal: + if ( + not node.id in self._checked_for_subsumption + and self.kcfg.is_leaf(node.id) + and not self.is_target(node.id) + ): + self._checked_for_subsumption.add(node.id) + self._check_subsume(node) + + if self.failed: + self.failure_info = self.failure_info() + + @property def module_name(self) -> str: return self._make_module_name(self.id) + @property + def circularities_module_name(self) -> str: + return self.module_name + '-CIRCULARITIES-MODULE' + + @property + def dependencies_module_name(self) -> str: + return self.module_name + '-DEPENDS-MODULE' + @property def pending(self) -> list[KCFG.Node]: return [node for node in self.explorable if self.is_pending(node.id)] @@ -538,10 +665,6 @@ class APRProver(Prover): always_check_subsumption: bool fast_check_subsumption: bool - _checked_for_terminal: set[int] - _checked_for_subsumption: set[int] - _checked_for_bounded: set[int] - def __init__( self, proof: APRProof, @@ -589,126 +712,159 @@ def _inject_module(module_name: str, import_name: str, sentences: list[KRuleLike _inject_module(self.dependencies_module_name, self.main_module_name, dependencies_as_rules) _inject_module(self.circularities_module_name, self.dependencies_module_name, [circularity_rule]) - self._checked_for_terminal = set() - self._checked_for_subsumption = set() - self._checked_for_bounded = set() - self._check_all_terminals() - - def nonzero_depth(self, node: KCFG.Node) -> bool: - return not self.proof.kcfg.zero_depth_between(self.proof.init, node.id) - - def _check_terminal(self, node: KCFG.Node) -> None: - if node.id not in self._checked_for_terminal: - _LOGGER.info(f'Checking terminal: {node.id}') - self._checked_for_terminal.add(node.id) - if self.kcfg_explore.kcfg_semantics.is_terminal(node.cterm): - _LOGGER.info(f'Terminal node: {node.id}.') - self.proof._terminal.add(node.id) - elif self.fast_check_subsumption and self._may_subsume(node): - _LOGGER.info(f'Marking node as terminal because of fast may subsume check {self.proof.id}: {node.id}') - self.proof._terminal.add(node.id) - - def _check_all_terminals(self) -> None: - for node in self.proof.kcfg.nodes: - self._check_terminal(node) + self.proof._check_all_terminals() + +# def _may_subsume(self, node: KCFG.Node) -> bool: +# node_k_cell = node.cterm.try_cell('K_CELL') +# target_k_cell = self.proof.kcfg.node(self.proof.target).cterm.try_cell('K_CELL') +# if node_k_cell and target_k_cell and not target_k_cell.match(node_k_cell): +# return False +# return True + +# def _check_subsume(self, node: KCFG.Node) -> bool: +# _LOGGER.info( +# f'Checking subsumption into target state {self.proof.id}: {shorten_hashes((node.id, self.proof.target))}' +# ) +# if self.fast_check_subsumption and not self._may_subsume(node): +# _LOGGER.info( +# f'Skipping full subsumption check because of fast may subsume check {self.proof.id}: {node.id}' +# ) +# return False +# csubst = self.kcfg_explore.cterm_implies(node.cterm, self.proof.kcfg.node(self.proof.target).cterm) +# if csubst is not None: +# self.proof.kcfg.create_cover(node.id, self.proof.target, csubst=csubst) +# _LOGGER.info(f'Subsumed into target node {self.proof.id}: {shorten_hashes((node.id, self.proof.target))}') +# return True +# else: +# return False + +# def advance_pending_node( +# self, +# node: KCFG.Node, +# execute_depth: int | None = None, +# cut_point_rules: Iterable[str] = (), +# terminal_rules: Iterable[str] = (), +# ) -> None: +# if self.proof.bmc_depth is not None and node.id not in self._checked_for_bounded: +# _LOGGER.info(f'Checking bmc depth for node {self.proof.id}: {node.id}') +# self._checked_for_bounded.add(node.id) +# _prior_loops = [ +# succ.source.id +# for succ in self.proof.shortest_path_to(node.id) +# if self.kcfg_explore.kcfg_semantics.same_loop(succ.source.cterm, node.cterm) +# ] +# prior_loops: list[NodeIdLike] = [] +# for _pl in _prior_loops: +# if not ( +# self.proof.kcfg.zero_depth_between(_pl, node.id) +# or any(self.proof.kcfg.zero_depth_between(_pl, pl) for pl in prior_loops) +# ): +# prior_loops.append(_pl) +# _LOGGER.info(f'Prior loop heads for node {self.proof.id}: {(node.id, prior_loops)}') +# if len(prior_loops) > self.proof.bmc_depth: +# _LOGGER.warning(f'Bounded node {self.proof.id}: {node.id} at bmc depth {self.proof.bmc_depth}') +# self.proof.add_bounded(node.id) +# return +# +# if self.proof.target not in self.proof._terminal: +# if self.always_check_subsumption and self._check_subsume(node): +# return +# +# module_name = self.circularities_module_name if self.nonzero_depth(node) else self.dependencies_module_name +# self.kcfg_explore.extend( +# self.proof, +# node, +# self.proof.logs, +# execute_depth=execute_depth, +# cut_point_rules=cut_point_rules, +# terminal_rules=terminal_rules, +# module_name=module_name, +# ) + + + def step_proof(self, step: ProofStep) -> APRProofResult: + if self.is_terminal or not self.target_is_terminal: + if self.always_check_subsumption: + csubst = self._check_subsume() + if csubst is not None or self.is_terminal: + return APRProofSubsumeResult(csubst=csubst, node_id=self.node_id) + + extend_result = self.kcfg_explore.extend_cterm( + self.cterm, + execute_depth=self.execute_depth, + cut_point_rules=self.cut_point_rules, + terminal_rules=self.terminal_rules, + module_name=self.module_name, + ) + return APRProofExtendResult(node_id=self.node_id, extend_result=extend_result) - def _may_subsume(self, node: KCFG.Node) -> bool: - node_k_cell = node.cterm.try_cell('K_CELL') - target_k_cell = self.proof.kcfg.node(self.proof.target).cterm.try_cell('K_CELL') - if node_k_cell and target_k_cell and not target_k_cell.match(node_k_cell): - return False - return True - def _check_subsume(self, node: KCFG.Node) -> bool: - _LOGGER.info( - f'Checking subsumption into target state {self.proof.id}: {shorten_hashes((node.id, self.proof.target))}' - ) - if self.fast_check_subsumption and not self._may_subsume(node): - _LOGGER.info( - f'Skipping full subsumption check because of fast may subsume check {self.proof.id}: {node.id}' - ) - return False - csubst = self.kcfg_explore.cterm_implies(node.cterm, self.proof.kcfg.node(self.proof.target).cterm) - if csubst is not None: - self.proof.kcfg.create_cover(node.id, self.proof.target, csubst=csubst) - _LOGGER.info(f'Subsumed into target node {self.proof.id}: {shorten_hashes((node.id, self.proof.target))}') - return True - else: - return False + def failure_info(self) -> APRFailureInfo: + return APRFailureInfo.from_proof(self.proof, self.kcfg_explore, counterexample_info=self.counterexample_info) - def advance_pending_node( - self, - node: KCFG.Node, - execute_depth: int | None = None, - cut_point_rules: Iterable[str] = (), - terminal_rules: Iterable[str] = (), - ) -> None: - if self.proof.bmc_depth is not None and node.id not in self._checked_for_bounded: - _LOGGER.info(f'Checking bmc depth for node {self.proof.id}: {node.id}') - self._checked_for_bounded.add(node.id) - _prior_loops = [ - succ.source.id - for succ in self.proof.shortest_path_to(node.id) - if self.kcfg_explore.kcfg_semantics.same_loop(succ.source.cterm, node.cterm) - ] - prior_loops: list[NodeIdLike] = [] - for _pl in _prior_loops: - if not ( - self.proof.kcfg.zero_depth_between(_pl, node.id) - or any(self.proof.kcfg.zero_depth_between(_pl, pl) for pl in prior_loops) - ): - prior_loops.append(_pl) - _LOGGER.info(f'Prior loop heads for node {self.proof.id}: {(node.id, prior_loops)}') - if len(prior_loops) > self.proof.bmc_depth: - _LOGGER.warning(f'Bounded node {self.proof.id}: {node.id} at bmc depth {self.proof.bmc_depth}') - self.proof.add_bounded(node.id) - return - - if self.proof.target not in self.proof._terminal: - if self.always_check_subsumption and self._check_subsume(node): - return - - module_name = self.circularities_module_name if self.nonzero_depth(node) else self.dependencies_module_name - self.kcfg_explore.extend( - self.proof, - node, - self.proof.logs, - execute_depth=execute_depth, - cut_point_rules=cut_point_rules, - terminal_rules=terminal_rules, - module_name=module_name, - ) - def step_proof(self) -> None: - self._check_all_terminals() +@dataclass +class APRProofResult(StepResult): + node_id: int - if not self.proof.pending: - return - curr_node = self.proof.pending[0] - self.advance_pending_node( - node=curr_node, - execute_depth=self.execute_depth, - cut_point_rules=self.cut_point_rules, - terminal_rules=self.terminal_rules, - ) +@dataclass +class APRProofExtendResult(APRProofResult): + extend_result: ExtendResult - self._check_all_terminals() - for node in self.proof.terminal: - if ( - not node.id in self._checked_for_subsumption - and self.proof.kcfg.is_leaf(node.id) - and not self.proof.is_target(node.id) - ): - self._checked_for_subsumption.add(node.id) - self._check_subsume(node) +@dataclass +class APRProofSubsumeResult(APRProofResult): + csubst: CSubst | None - if self.proof.failed: - self.proof.failure_info = self.failure_info() - def failure_info(self) -> APRFailureInfo: - return APRFailureInfo.from_proof(self.proof, self.kcfg_explore, counterexample_info=self.counterexample_info) +@dataclass(frozen=True, eq=True) +class APRProofStep(ProofStep): + bmc_depth: int | None + node_id: int + target_node_id: int + proof_id: str + cterm: CTerm + target_cterm: CTerm + kcfg_explore: KCFGExplore + checked_for_bounded: Iterable[int] + target_is_terminal: bool + module_name: str + is_terminal: bool + +# def _may_subsume(self) -> bool: +# node_k_cell = self.cterm.try_cell('K_CELL') +# target_k_cell = self.target_cterm.try_cell('K_CELL') +# if node_k_cell and target_k_cell and not target_k_cell.match(node_k_cell): +# return False +# return True +# +# def _check_subsume(self) -> CSubst | None: +# _LOGGER.info( +# f'Checking subsumption into target state {self.proof_id}: {shorten_hashes((self.node_id, self.target_node_id))}' +# ) +# if self.fast_check_subsumption and not self._may_subsume(): +# _LOGGER.info( +# f'Skipping full subsumption check because of fast may subsume check {self.proof_id}: {self.node_id}' +# ) +# return None +# return self.kcfg_explore.cterm_implies(self.cterm, self.target_cterm) +# +# def exec(self) -> APRProofResult: +# if self.is_terminal or not self.target_is_terminal: +# if self.always_check_subsumption: +# csubst = self._check_subsume() +# if csubst is not None or self.is_terminal: +# return APRProofSubsumeResult(csubst=csubst, node_id=self.node_id) +# +# extend_result = self.kcfg_explore.extend_cterm( +# self.cterm, +# execute_depth=self.execute_depth, +# cut_point_rules=self.cut_point_rules, +# terminal_rules=self.terminal_rules, +# module_name=self.module_name, +# ) +# return APRProofExtendResult(node_id=self.node_id, extend_result=extend_result) @dataclass(frozen=True)