diff --git a/diffyscan/diffyscan.py b/diffyscan/diffyscan.py index 18cd7d7..9c4d083 100755 --- a/diffyscan/diffyscan.py +++ b/diffyscan/diffyscan.py @@ -10,7 +10,11 @@ from .utils.common import load_config, load_env from .utils.constants import * from .utils.explorer import get_contract_from_explorer -from .utils.github import get_file_from_github, get_file_from_github_recursive, resolve_dep +from .utils.github import ( + get_file_from_github, + get_file_from_github_recursive, + resolve_dep, +) from .utils.helpers import create_dirs from .utils.logger import logger from .utils.binary_verifier import * @@ -21,53 +25,85 @@ g_skip_user_input: bool = False + def prettify_solidity(solidity_contract_content: str): - github_file_name = os.path.join(tempfile.gettempdir(), "9B91E897-EA51-4FCC-8DAF-FCFF135A6963.sol") + github_file_name = os.path.join( + tempfile.gettempdir(), "9B91E897-EA51-4FCC-8DAF-FCFF135A6963.sol" + ) with open(github_file_name, "w") as fp: fp.write(solidity_contract_content) prettier_return_code = subprocess.call( - ["npx", "prettier", "--plugin=prettier-plugin-solidity", "--write", github_file_name], - stdout=subprocess.DEVNULL) + [ + "npx", + "prettier", + "--plugin=prettier-plugin-solidity", + "--write", + github_file_name, + ], + stdout=subprocess.DEVNULL, + ) if prettier_return_code != 0: logger.error("Prettier/npx subprocess failed (see the error above)") sys.exit() with open(github_file_name, "r") as fp: return fp.read() + def run_binary_diff(remote_contract_address, contract_source_code, config): - logger.info(f'Started binary checking for {remote_contract_address}') - - contract_creation_code, immutables, is_valid_constructor = get_contract_creation_code_from_etherscan(contract_source_code, config, remote_contract_address) - + logger.info(f"Started binary checking for {remote_contract_address}") + + contract_creation_code, immutables, is_valid_constructor = ( + get_contract_creation_code_from_etherscan( + contract_source_code, config, remote_contract_address + ) + ) + if not is_valid_constructor: - logger.error(f'Failed to find constructorArgs, binary diff skipped') + logger.error(f"Failed to find constructorArgs, binary diff skipped") return - + deployer_account = get_account(LOCAL_RPC_URL) - - if (deployer_account is None): - logger.error(f'Failed to receive the account, binary diff skipped') + + if deployer_account is None: + logger.error(f"Failed to receive the account, binary diff skipped") return - - local_contract_address = deploy_contract(LOCAL_RPC_URL, deployer_account, contract_creation_code) - - if (local_contract_address is None): - logger.error(f'Failed to deploy bytecode to {LOCAL_RPC_URL}, binary diff skipped') + + local_contract_address = deploy_contract( + LOCAL_RPC_URL, deployer_account, contract_creation_code + ) + + if local_contract_address is None: + logger.error( + f"Failed to deploy bytecode to {LOCAL_RPC_URL}, binary diff skipped" + ) return local_deployed_bytecode = get_bytecode(local_contract_address, LOCAL_RPC_URL) - if (local_deployed_bytecode is None): - logger.error(f'Failed to receive bytecode from {LOCAL_RPC_URL}') + if local_deployed_bytecode is None: + logger.error(f"Failed to receive bytecode from {LOCAL_RPC_URL}") return - + remote_deployed_bytecode = get_bytecode(remote_contract_address, REMOTE_RPC_URL) if remote_deployed_bytecode is None: - logger.error(f'Failed to receive bytecode from {REMOTE_RPC_URL}') + logger.error(f"Failed to receive bytecode from {REMOTE_RPC_URL}") return - - to_match(local_deployed_bytecode, remote_deployed_bytecode, immutables, remote_contract_address) -def run_source_diff(contract_address_from_config, contract_code, config, github_api_token, recursive_parsing=False, prettify=False): + to_match( + local_deployed_bytecode, + remote_deployed_bytecode, + immutables, + remote_contract_address, + ) + + +def run_source_diff( + contract_address_from_config, + contract_code, + config, + github_api_token, + recursive_parsing=False, + prettify=False, +): logger.divider() logger.okay("Contract", contract_address_from_config) logger.okay("Blockchain explorer Hostname", config["explorer_hostname"]) @@ -81,7 +117,11 @@ def run_source_diff(contract_address_from_config, contract_code, config, github_ f"Fetching source code from blockchain explorer {config['explorer_hostname']} ..." ) - source_files = contract_code["solcInput"].items() if not "sources" in contract_code["solcInput"] else contract_code["solcInput"]["sources"].items() + source_files = ( + contract_code["solcInput"].items() + if not "sources" in contract_code["solcInput"] + else contract_code["solcInput"]["sources"].items() + ) files_count = len(source_files) logger.okay("Contract", contract_code["name"]) logger.okay("Files", files_count) @@ -119,9 +159,13 @@ def run_source_diff(contract_address_from_config, contract_code, config, github_ file_found = bool(repo) if recursive_parsing: - github_file = get_file_from_github_recursive(github_api_token, repo, path_to_file, dep_name) + github_file = get_file_from_github_recursive( + github_api_token, repo, path_to_file, dep_name + ) else: - github_file = get_file_from_github(github_api_token, repo, path_to_file, dep_name) + github_file = get_file_from_github( + github_api_token, repo, path_to_file, dep_name + ) if not github_file: github_file = "" @@ -137,7 +181,9 @@ def run_source_diff(contract_address_from_config, contract_code, config, github_ explorer_lines = explorer_content.splitlines() diff_html = difflib.HtmlDiff().make_file(github_lines, explorer_lines) - diff_report_filename = f"{DIFFS_DIR}/{contract_address_from_config}/{filename}.html" + diff_report_filename = ( + f"{DIFFS_DIR}/{contract_address_from_config}/{filename}.html" + ) create_dirs(diff_report_filename) with open(diff_report_filename, "w") as f: @@ -167,55 +213,97 @@ def run_source_diff(contract_address_from_config, contract_code, config, github_ logger.report_table(report) -def process_config(path: str, recursive_parsing: bool, unify_formatting: bool, binary_check: bool, autoclean: bool): + +def process_config( + path: str, + recursive_parsing: bool, + unify_formatting: bool, + binary_check: bool, + autoclean: bool, +): logger.info(f"Loading config {path}...") config = load_config(path) explorer_token = None if "explorer_token_env_var" in config: - explorer_token = load_env(config["explorer_token_env_var"], masked=True, required=False) - if (explorer_token is None): - explorer_token = os.getenv('ETHERSCAN_EXPLORER_TOKEN', default=None) - if (explorer_token is None): - raise ValueError(f'Failed to find "ETHERSCAN_EXPLORER_TOKEN" in env') - + explorer_token = load_env( + config["explorer_token_env_var"], masked=True, required=False + ) + if explorer_token is None: + explorer_token = os.getenv("ETHERSCAN_EXPLORER_TOKEN", default=None) + if explorer_token is None: + raise ValueError(f'Failed to find "ETHERSCAN_EXPLORER_TOKEN" in env') + contracts = config["contracts"] - + try: - if (binary_check): - ganache.start() - + if binary_check: + ganache.start() + for contract_address, contract_name in contracts.items(): - contract_code = get_contract_from_explorer(explorer_token, config["explorer_hostname"], contract_address, contract_name) - run_source_diff(contract_address, contract_code, config, GITHUB_API_TOKEN, recursive_parsing, unify_formatting) - if (binary_check): + contract_code = get_contract_from_explorer( + explorer_token, + config["explorer_hostname"], + contract_address, + contract_name, + ) + run_source_diff( + contract_address, + contract_code, + config, + GITHUB_API_TOKEN, + recursive_parsing, + unify_formatting, + ) + if binary_check: run_binary_diff(contract_address, contract_code, config) except KeyboardInterrupt: - logger.info(f'Keyboard interrupt by user') + logger.info(f"Keyboard interrupt by user") finally: ganache.stop() - - if (autoclean): + if autoclean: shutil.rmtree(SOLC_DIR) - logger.okay(f'{SOLC_DIR} deleted') + logger.okay(f"{SOLC_DIR} deleted") + def parse_arguments(): parser = argparse.ArgumentParser() - parser.add_argument("--version", "-V", action="store_true", help="Display version information") - parser.add_argument("path", nargs="?", default=None, help="Path to config or directory with configs") - parser.add_argument("--yes", "-y", help="If set don't ask for input before validating each contract", action="store_true") + parser.add_argument( + "--version", "-V", action="store_true", help="Display version information" + ) + parser.add_argument( + "path", nargs="?", default=None, help="Path to config or directory with configs" + ) + parser.add_argument( + "--yes", + "-y", + help="If set don't ask for input before validating each contract", + action="store_true", + ) parser.add_argument( "--support-brownie", help="Support recursive retrieving for contracts. It may be useful for contracts whose sources have been verified by the brownie tooling, which automatically replaces relative paths to contracts in imports with plain contract names.", action=argparse.BooleanOptionalAction, ) - parser.add_argument("--prettify", "-p", help="Unify formatting by prettier before comparing", action="store_true") - parser.add_argument("--binary-check", "-binary", help="Match contracts by binaries such as verify-bytecode.ts", default=True) - parser.add_argument("--autoclean", "-clean", help="Autoclean build dir after work", default=True) + parser.add_argument( + "--prettify", + "-p", + help="Unify formatting by prettier before comparing", + action="store_true", + ) + parser.add_argument( + "--binary-check", + "-binary", + help="Match contracts by binaries such as verify-bytecode.ts", + default=True, + ) + parser.add_argument( + "--autoclean", "-clean", help="Autoclean build dir after work", default=True + ) return parser.parse_args() - - + + def main(): global g_skip_user_input @@ -228,14 +316,32 @@ def main(): logger.divider() if args.path is None: - process_config(DEFAULT_CONFIG_PATH, args.support_brownie, args.prettify, args.binary_check, args.autoclean) + process_config( + DEFAULT_CONFIG_PATH, + args.support_brownie, + args.prettify, + args.binary_check, + args.autoclean, + ) elif os.path.isfile(args.path): - process_config(args.path, args.support_brownie, args.prettify, args.binary_check, args.autoclean) + process_config( + args.path, + args.support_brownie, + args.prettify, + args.binary_check, + args.autoclean, + ) elif os.path.isdir(args.path): for filename in os.listdir(args.path): config_path = os.path.join(args.path, filename) if os.path.isfile(config_path): - process_config(config_path, args.support_brownie, args.prettify, args.binary_check, args.autoclean) + process_config( + config_path, + args.support_brownie, + args.prettify, + args.binary_check, + args.autoclean, + ) else: logger.error(f"Specified config path {args.path} not found") sys.exit(1) diff --git a/diffyscan/utils/binary_verifier.py b/diffyscan/utils/binary_verifier.py index ec43f23..8388b0f 100644 --- a/diffyscan/utils/binary_verifier.py +++ b/diffyscan/utils/binary_verifier.py @@ -324,4 +324,4 @@ def parse(bytecode): 'bytecode': buffer[i:i+length].hex() }) i += length - return instructions \ No newline at end of file + return instructions diff --git a/diffyscan/utils/common.py b/diffyscan/utils/common.py index 00e9f6f..34b2507 100644 --- a/diffyscan/utils/common.py +++ b/diffyscan/utils/common.py @@ -8,6 +8,7 @@ from .logger import logger from .types import Config + def load_env(variable_name, required=True, masked=False): value = os.getenv(variable_name, default=None) @@ -29,6 +30,7 @@ def load_config(path: str) -> Config: with open(path, mode="r") as config_file: return json.load(config_file) + def handle_response(response, url): if response.status_code == 404: return None @@ -40,18 +42,21 @@ def handle_response(response, url): return response + def fetch(url, headers={}): logger.log(f"Fetch: {url}") response = requests.get(url, headers=headers) return handle_response(response, url) + def pull(url, payload={}): logger.log(f"Pull: {url}") response = requests.post(url, data=payload) return handle_response(response, url) - + + def mask_text(text, mask_start=3, mask_end=3): text_length = len(text) mask = "*" * (text_length - mask_start - mask_end) @@ -64,11 +69,12 @@ def parse_repo_link(repo_link): user_slash_repo = repo_location[0] return user_slash_repo + def get_solc_native_platform_from_os(): platform_name = sys.platform - if platform_name == 'linux': - return 'linux-amd64' - elif platform_name == 'darwin': - return 'macosx-amd64' + if platform_name == "linux": + return "linux-amd64" + elif platform_name == "darwin": + return "macosx-amd64" else: - raise ValueError(f'Unsupported platform {platform_name}') \ No newline at end of file + raise ValueError(f"Unsupported platform {platform_name}") diff --git a/diffyscan/utils/constants.py b/diffyscan/utils/constants.py index e0f1e8a..81b4ac4 100644 --- a/diffyscan/utils/constants.py +++ b/diffyscan/utils/constants.py @@ -8,18 +8,18 @@ LOGS_PATH = f"{DIGEST_DIR}/{START_TIME_INT}/logs.txt" DEFAULT_CONFIG_PATH = "config.json" -REMOTE_RPC_URL = os.getenv('REMOTE_RPC_URL', '') +REMOTE_RPC_URL = os.getenv("REMOTE_RPC_URL", "") if not REMOTE_RPC_URL: - raise ValueError('REMOTE_RPC_URL variable is not set') + raise ValueError("REMOTE_RPC_URL variable is not set") -SOLC_DIR = os.getenv('SOLC_DIR', '') -if SOLC_DIR == '': - raise ValueError('SOLC_DIR variable is not set') - -LOCAL_RPC_URL = os.getenv('LOCAL_RPC_URL', '') +SOLC_DIR = os.getenv("SOLC_DIR", "") +if SOLC_DIR == "": + raise ValueError("SOLC_DIR variable is not set") + +LOCAL_RPC_URL = os.getenv("LOCAL_RPC_URL", "") if not LOCAL_RPC_URL: - raise ValueError('LOCAL_RPC_URL variable is not set') - -GITHUB_API_TOKEN = os.getenv('GITHUB_API_TOKEN', '') + raise ValueError("LOCAL_RPC_URL variable is not set") + +GITHUB_API_TOKEN = os.getenv("GITHUB_API_TOKEN", "") if not GITHUB_API_TOKEN: - raise ValueError('GITHUB_API_TOKEN variable is not set') + raise ValueError("GITHUB_API_TOKEN variable is not set") diff --git a/diffyscan/utils/encoder.py b/diffyscan/utils/encoder.py index 0069cf8..b5e41b2 100644 --- a/diffyscan/utils/encoder.py +++ b/diffyscan/utils/encoder.py @@ -1,16 +1,20 @@ from .logger import logger + def encode_address(address): number = int(address, 16) formatted_address = f"{number:064x}" return formatted_address + def encode_uint256(number): return format(number, "064x") + def encode_bytes32(data): return data.replace("0x", "") + def encode_bytes(data): bytes_str = data.lstrip("0x") data_length = len(bytes_str) // 2 @@ -20,6 +24,7 @@ def encode_bytes(data): bytes_str += "0" * (encoded_length - len(bytes_str)) return format(data_length, "064x") + bytes_str + def encode_tuple(types, args): args_length = len(types) encoded_offsets = "" @@ -29,21 +34,25 @@ def encode_tuple(types, args): arg_value = args[arg_index] if arg_type == "address": encoded_offsets += encode_address(arg_value) - elif arg_type == "uint256" or arg_type == "bool" or arg_type == 'uint8': + elif arg_type == "uint256" or arg_type == "bool" or arg_type == "uint8": encoded_offsets += encode_uint256(arg_value) elif arg_type == "bytes32": encoded_offsets += encode_bytes32(arg_value) elif arg_type == "address[]" and not arg_value: - encoded_data += '0' * 64 + encoded_data += "0" * 64 offset = format((arg_index + args_length) * 32, "064x") encoded_offsets += offset else: - logger.warn(f"Unknown constructor argument type '{arg_type}', use --constructor-calldata instead") - return encoded_offsets+encoded_data + logger.warn( + f"Unknown constructor argument type '{arg_type}', use --constructor-calldata instead" + ) + return encoded_offsets + encoded_data + def to_hex_with_alignment(value): return format(value, "064x") + def encode_constructor_arguments(constructor_abi, constructor_config_args): arg_length = len(constructor_abi) @@ -52,45 +61,49 @@ def encode_constructor_arguments(constructor_abi, constructor_config_args): constructor_calldata = "" compl_data = [] if arg_length > 0: - for argument_index in range(arg_length): - arg_type = constructor_abi[argument_index]["type"] - arg_value = constructor_config_args[argument_index] - if arg_type == "address": - constructor_calldata += encode_address(arg_value) - elif arg_type == "uint256" or arg_type == "bool" or arg_type == 'uint8': - constructor_calldata += encode_uint256(arg_value) - elif arg_type == "bytes32": - constructor_calldata += encode_bytes32(arg_value) - elif arg_type == "bytes": - data = format((argument_index+1)*32, "064x") - constructor_calldata+=data - data2 = encode_bytes(arg_value) - compl_data.append(data2) - elif arg_type == "string": - offset_arg = to_hex_with_alignment((arg_length + len(compl_data))*32) - constructor_calldata += offset_arg - length_arg = to_hex_with_alignment(len(arg_value.encode('utf-8'))) - hex_text = arg_value.encode('utf-8').hex().ljust(64, '0') - compl_data.append(length_arg) - compl_data.append(hex_text) - elif arg_type == "tuple": - args_tuple_types = [component["type"] for component in constructor_abi[argument_index]["components"]] - if len(args_tuple_types) and args_tuple_types[0] == "address[]": - dynamic_type_length = format((len(constructor_calldata)//64 + 1) *32, "064x") - constructor_calldata += dynamic_type_length - logger.info(f'dynamic_type_length {dynamic_type_length}') - compl_data.append(encode_tuple(args_tuple_types, arg_value)) - else: - constructor_calldata += encode_tuple(args_tuple_types, arg_value) - elif arg_type.endswith("[]"): - data = format((argument_index+1)*32, "064x") - constructor_calldata+=data - data2 = encode_bytes(arg_value) - compl_data.append(data2) - else: - raise ValueError(f"Unknown constructor argument type: {arg_type}") - for data in compl_data: + for argument_index in range(arg_length): + arg_type = constructor_abi[argument_index]["type"] + arg_value = constructor_config_args[argument_index] + if arg_type == "address": + constructor_calldata += encode_address(arg_value) + elif arg_type == "uint256" or arg_type == "bool" or arg_type == "uint8": + constructor_calldata += encode_uint256(arg_value) + elif arg_type == "bytes32": + constructor_calldata += encode_bytes32(arg_value) + elif arg_type == "bytes": + data = format((argument_index + 1) * 32, "064x") + constructor_calldata += data + data2 = encode_bytes(arg_value) + compl_data.append(data2) + elif arg_type == "string": + offset_arg = to_hex_with_alignment((arg_length + len(compl_data)) * 32) + constructor_calldata += offset_arg + length_arg = to_hex_with_alignment(len(arg_value.encode("utf-8"))) + hex_text = arg_value.encode("utf-8").hex().ljust(64, "0") + compl_data.append(length_arg) + compl_data.append(hex_text) + elif arg_type == "tuple": + args_tuple_types = [ + component["type"] + for component in constructor_abi[argument_index]["components"] + ] + if len(args_tuple_types) and args_tuple_types[0] == "address[]": + dynamic_type_length = format( + (len(constructor_calldata) // 64 + 1) * 32, "064x" + ) + constructor_calldata += dynamic_type_length + logger.info(f"dynamic_type_length {dynamic_type_length}") + compl_data.append(encode_tuple(args_tuple_types, arg_value)) + else: + constructor_calldata += encode_tuple(args_tuple_types, arg_value) + elif arg_type.endswith("[]"): + data = format((argument_index + 1) * 32, "064x") + constructor_calldata += data + data2 = encode_bytes(arg_value) + compl_data.append(data2) + else: + raise ValueError(f"Unknown constructor argument type: {arg_type}") + for data in compl_data: constructor_calldata += data return constructor_calldata - diff --git a/diffyscan/utils/explorer.py b/diffyscan/utils/explorer.py index 9a8e686..e482cee 100644 --- a/diffyscan/utils/explorer.py +++ b/diffyscan/utils/explorer.py @@ -11,9 +11,7 @@ def _errorNoSourceCodeAndExit(address): def _get_contract_from_etherscan(token, etherscan_hostname, contract): - etherscan_link = ( - f"https://{etherscan_hostname}/api?module=contract&action=getsourcecode&address={contract}" - ) + etherscan_link = f"https://{etherscan_hostname}/api?module=contract&action=getsourcecode&address={contract}" if token is not None: etherscan_link = f"{etherscan_link}&apikey={token}" @@ -26,46 +24,43 @@ def _get_contract_from_etherscan(token, etherscan_hostname, contract): if "ContractName" not in result: _errorNoSourceCodeAndExit(contract) - solc_input = result['SourceCode'] + solc_input = result["SourceCode"] - if solc_input.startswith('{{'): + if solc_input.startswith("{{"): return { - 'name': result['ContractName'], - 'solcInput': json.loads(solc_input[1:-1]), - 'compiler': result['CompilerVersion'] + "name": result["ContractName"], + "solcInput": json.loads(solc_input[1:-1]), + "compiler": result["CompilerVersion"], } else: return { - 'name': result['ContractName'], - 'compiler': result['CompilerVersion'], - 'solcInput': { - 'language': 'Solidity', - 'sources': { - result['ContractName']: { - 'content': solc_input - } - }, - 'settings': { - 'optimizer': { - 'enabled': result['OptimizationUsed'] == '1', - 'runs': int(result['Runs']) + "name": result["ContractName"], + "compiler": result["CompilerVersion"], + "solcInput": { + "language": "Solidity", + "sources": {result["ContractName"]: {"content": solc_input}}, + "settings": { + "optimizer": { + "enabled": result["OptimizationUsed"] == "1", + "runs": int(result["Runs"]), }, - 'outputSelection': { - '*': { - '*': [ - 'abi', - 'evm.bytecode', - 'evm.deployedBytecode', - 'evm.methodIdentifiers', - 'metadata' + "outputSelection": { + "*": { + "*": [ + "abi", + "evm.bytecode", + "evm.deployedBytecode", + "evm.methodIdentifiers", + "metadata", ], - '': ['ast'] + "": ["ast"], } - } - } - } + }, + }, + }, } + def _get_contract_from_zksync(zksync_explorer_hostname, contract): zksync_explorer_link = ( f"https://{zksync_explorer_hostname}/contract_verification/info/{contract}" @@ -83,15 +78,14 @@ def _get_contract_from_zksync(zksync_explorer_hostname, contract): _errorNoSourceCodeAndExit(contract) return { - 'name': data['ContractName'], - 'sources': json.loads(data["sourceCode"]["sources"]), - 'compiler': data["CompilerVersion"] + "name": data["ContractName"], + "sources": json.loads(data["sourceCode"]["sources"]), + "compiler": data["CompilerVersion"], } + def _get_contract_from_mantle(mantle_explorer_hostname, contract): - etherscan_link = ( - f"https://{mantle_explorer_hostname}/api?module=contract&action=getsourcecode&address={contract}" - ) + etherscan_link = f"https://{mantle_explorer_hostname}/api?module=contract&action=getsourcecode&address={contract}" response = fetch(etherscan_link).json data = response["result"][0] @@ -103,12 +97,15 @@ def _get_contract_from_mantle(mantle_explorer_hostname, contract): source_files.append((entry["Filename"], {"content": entry["SourceCode"]})) return { - 'name': data['ContractName'], - 'sources': json.loads(data["sourceCode"]["sources"]), - 'compiler': data["CompilerVersion"] + "name": data["ContractName"], + "sources": json.loads(data["sourceCode"]["sources"]), + "compiler": data["CompilerVersion"], } -def get_contract_from_explorer(token, explorer_hostname, contract_address, contract_name_from_config): + +def get_contract_from_explorer( + token, explorer_hostname, contract_address, contract_name_from_config +): result = {} if explorer_hostname.startswith("zksync"): result = _get_contract_from_zksync(explorer_hostname, contract_address) @@ -117,12 +114,14 @@ def get_contract_from_explorer(token, explorer_hostname, contract_address, contr elif explorer_hostname.endswith("lineascan.build"): result = _get_contract_from_etherscan(None, explorer_hostname, contract_address) else: - result =_get_contract_from_etherscan(token, explorer_hostname, contract_address) - - contract_name_from_etherscan = result['name'] + result = _get_contract_from_etherscan( + token, explorer_hostname, contract_address + ) + + contract_name_from_etherscan = result["name"] if contract_name_from_etherscan != contract_name_from_config: - raise ValueError( - f"Contract name in config does not match with Blockchain explorer {contract_address}: {contract_name_from_config} != {contract_name_from_etherscan}", - ) - + raise ValueError( + f"Contract name in config does not match with Blockchain explorer {contract_address}: {contract_name_from_config} != {contract_name_from_etherscan}", + ) + return result diff --git a/diffyscan/utils/ganache.py b/diffyscan/utils/ganache.py index 4c1694f..130c015 100644 --- a/diffyscan/utils/ganache.py +++ b/diffyscan/utils/ganache.py @@ -6,54 +6,65 @@ from .logger import logger from .constants import LOCAL_RPC_URL, REMOTE_RPC_URL from .binary_verifier import get_chain_id + + class Ganache: - sub_process = None - TIMEOUT_FOR_INIT_SEC = 5 - - def __init__(self): - pass - - def is_port_in_use(self, parsed_url) -> bool: - import socket - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - return s.connect_ex((parsed_url.hostname, parsed_url.port)) == 0 - - def start(self): - parsed_url = urlparse(LOCAL_RPC_URL) - - local_node_command = ( - f'npx ganache --host {parsed_url.hostname} ' \ - f'--port {parsed_url.port} ' \ - f'--chain.chainId {get_chain_id (REMOTE_RPC_URL)} ' \ - f'--fork.url {REMOTE_RPC_URL} ' \ - f'-l 92000000 --hardfork istanbul -d' - ) - - logger.info(f'Trying to start Ganache: "{local_node_command}"') - is_port_used = self.is_port_in_use(parsed_url) - if (is_port_used): - answer = input(f'Port {parsed_url.port} is busy. Fix it? write "yes": ') - if (answer.lower() == 'yes'): - return_code = subprocess.call(f'exec npx kill-port {parsed_url.port}', shell=True) - if (return_code == 0): - is_port_used = self.is_port_in_use(parsed_url) - if (is_port_used): - raise ValueError(f'Failed to start Ganache: {parsed_url.netloc} is busy') - self.sub_process = subprocess.Popen("exec "+ local_node_command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) - try: - _, errs = self.sub_process.communicate(timeout=self.TIMEOUT_FOR_INIT_SEC) - if (errs): - raise ValueError(f'Failed to start Ganache: {errs.decode()}') - except subprocess.TimeoutExpired: + sub_process = None + TIMEOUT_FOR_INIT_SEC = 5 + + def __init__(self): + pass + + def is_port_in_use(self, parsed_url) -> bool: + import socket + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex((parsed_url.hostname, parsed_url.port)) == 0 + + def start(self): + parsed_url = urlparse(LOCAL_RPC_URL) + + local_node_command = ( + f"npx ganache --host {parsed_url.hostname} " + f"--port {parsed_url.port} " + f"--chain.chainId {get_chain_id (REMOTE_RPC_URL)} " + f"--fork.url {REMOTE_RPC_URL} " + f"-l 92000000 --hardfork istanbul -d" + ) + + logger.info(f'Trying to start Ganache: "{local_node_command}"') is_port_used = self.is_port_in_use(parsed_url) - if (is_port_used): - logger.okay(f'Ganache successfully started, PID {self.sub_process.pid}') - else: - raise ValueError(f'Failed to start Ganache: something is wrong') - - def stop(self): - if self.sub_process is not None and self.sub_process.poll() is None: - os.kill(self.sub_process.pid, signal.SIGTERM) - logger.info(f'Ganache stopped, PID {self.sub_process.pid}') - -ganache = Ganache() \ No newline at end of file + if is_port_used: + answer = input(f'Port {parsed_url.port} is busy. Fix it? write "yes": ') + if answer.lower() == "yes": + return_code = subprocess.call( + f"exec npx kill-port {parsed_url.port}", shell=True + ) + if return_code == 0: + is_port_used = self.is_port_in_use(parsed_url) + if is_port_used: + raise ValueError(f"Failed to start Ganache: {parsed_url.netloc} is busy") + self.sub_process = subprocess.Popen( + "exec " + local_node_command, + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + ) + try: + _, errs = self.sub_process.communicate(timeout=self.TIMEOUT_FOR_INIT_SEC) + if errs: + raise ValueError(f"Failed to start Ganache: {errs.decode()}") + except subprocess.TimeoutExpired: + is_port_used = self.is_port_in_use(parsed_url) + if is_port_used: + logger.okay(f"Ganache successfully started, PID {self.sub_process.pid}") + else: + raise ValueError(f"Failed to start Ganache: something is wrong") + + def stop(self): + if self.sub_process is not None and self.sub_process.poll() is None: + os.kill(self.sub_process.pid, signal.SIGTERM) + logger.info(f"Ganache stopped, PID {self.sub_process.pid}") + + +ganache = Ganache()