Skip to content

Commit

Permalink
more start_* service starting consistency (#18775)
Browse files Browse the repository at this point in the history
* more `start_*` service starting consistency

* fixup

* Update start_full_node.py
  • Loading branch information
altendky authored Nov 5, 2024
1 parent e07d1eb commit 3f6977f
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 63 deletions.
28 changes: 19 additions & 9 deletions chia/server/start_data_layer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import os
import pathlib
import sys
from typing import Any, Optional, cast
Expand All @@ -21,11 +22,13 @@
from chia.util.config import load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.ints import uint16
from chia.util.task_timing import maybe_manage_task_instrumentation

# See: https://bugs.python.org/issue29288
"".encode("idna")

SERVICE_NAME = "data_layer"

log = logging.getLogger(__name__)


Expand All @@ -42,7 +45,11 @@ def create_data_layer_service(
uploaders = []
if downloaders is None:
downloaders = []

service_config = config[SERVICE_NAME]

network_id = service_config["selected_network"]

self_hostname = config["self_hostname"]
wallet_rpc_port = service_config["wallet_peer"]["port"]
if wallet_service is None:
Expand All @@ -53,26 +60,26 @@ def create_data_layer_service(
wallet_config = wallet_service.config
wallet_rpc_init = WalletRpcClient.create(self_hostname, uint16(wallet_rpc_port), wallet_root_path, wallet_config)

data_layer = DataLayer.create(
# dont add Fil)
node = DataLayer.create(
config=service_config,
root_path=root_path,
wallet_rpc_init=wallet_rpc_init,
downloaders=downloaders,
uploaders=uploaders,
) # dont add Fil)
api = DataLayerAPI(data_layer)
network_id = service_config["selected_network"]
rpc_port = service_config.get("rpc_port")
)
peer_api = DataLayerAPI(node)

rpc_info: Optional[RpcInfo[DataLayerRpcApi]] = None
if rpc_port is not None:
if service_config.get("start_rpc_server", True):
rpc_info = (DataLayerRpcApi, cast(int, service_config["rpc_port"]))

return Service(
root_path=root_path,
config=config,
node=data_layer,
node=node,
# TODO: not for peers...
peer_api=api,
peer_api=peer_api,
node_type=NodeType.DATA_LAYER,
advertised_port=None,
service_name=SERVICE_NAME,
Expand Down Expand Up @@ -132,7 +139,10 @@ async def async_main() -> int:


def main() -> int:
return async_run(async_main())
with maybe_manage_task_instrumentation(
enable=os.environ.get(f"CHIA_INSTRUMENT_{SERVICE_NAME.upper()}") is not None
):
return async_run(coro=async_main())


if __name__ == "__main__":
Expand Down
20 changes: 14 additions & 6 deletions chia/server/start_farmer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import pathlib
import sys
from typing import Any, Optional
Expand All @@ -18,6 +19,7 @@
from chia.util.config import get_unresolved_peer_infos, load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.keychain import Keychain
from chia.util.task_timing import maybe_manage_task_instrumentation

# See: https://bugs.python.org/issue29288
"".encode("idna")
Expand All @@ -40,23 +42,25 @@ def create_farmer_service(
update_testnet_overrides(network_id, overrides)
updated_constants = replace_str_to_bytes(consensus_constants, **overrides)

farmer = Farmer(
node = Farmer(
root_path, service_config, config_pool, consensus_constants=updated_constants, local_keychain=keychain
)
peer_api = FarmerAPI(farmer)
peer_api = FarmerAPI(node)

rpc_info: Optional[RpcInfo[FarmerRpcApi]] = None
if service_config["start_rpc_server"]:
if service_config.get("start_rpc_server", True):
rpc_info = (FarmerRpcApi, service_config["rpc_port"])

return Service(
root_path=root_path,
config=config,
node=farmer,
node=node,
peer_api=peer_api,
node_type=NodeType.FARMER,
advertised_port=service_config["port"],
service_name=SERVICE_NAME,
connect_peers=get_unresolved_peer_infos(service_config, NodeType.FULL_NODE),
on_connect_callback=farmer.on_connect,
on_connect_callback=node.on_connect,
network_id=network_id,
rpc_info=rpc_info,
connect_to_daemon=connect_to_daemon,
Expand All @@ -72,6 +76,7 @@ async def async_main() -> int:
config_pool = load_config_cli(DEFAULT_ROOT_PATH, "config.yaml", "pool")
config["pool"] = config_pool
initialize_service_logging(service_name=SERVICE_NAME, config=config)

service = create_farmer_service(DEFAULT_ROOT_PATH, config, config_pool, DEFAULT_CONSTANTS)
async with SignalHandlers.manage() as signal_handlers:
await service.setup_process_global_state(signal_handlers=signal_handlers)
Expand All @@ -81,7 +86,10 @@ async def async_main() -> int:


def main() -> int:
return async_run(async_main())
with maybe_manage_task_instrumentation(
enable=os.environ.get(f"CHIA_INSTRUMENT_{SERVICE_NAME.upper()}") is not None
):
return async_run(coro=async_main())


if __name__ == "__main__":
Expand Down
29 changes: 16 additions & 13 deletions chia/server/start_full_node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import logging
import os
import pathlib
import sys
Expand All @@ -27,7 +26,6 @@
"".encode("idna")

SERVICE_NAME = "full_node"
log = logging.getLogger(__name__)


async def create_full_node_service(
Expand All @@ -39,31 +37,33 @@ async def create_full_node_service(
) -> FullNodeService:
service_config = config[SERVICE_NAME]

full_node = await FullNode.create(
network_id = service_config["selected_network"]
upnp_list = []
if service_config["enable_upnp"]:
upnp_list = [service_config["port"]]

node = await FullNode.create(
service_config,
root_path=root_path,
consensus_constants=consensus_constants,
)
api = FullNodeAPI(full_node)
peer_api = FullNodeAPI(node)

upnp_list = []
if service_config["enable_upnp"]:
upnp_list = [service_config["port"]]
network_id = service_config["selected_network"]
rpc_info: Optional[RpcInfo[FullNodeRpcApi]] = None
if service_config["start_rpc_server"]:
if service_config.get("start_rpc_server", True):
rpc_info = (FullNodeRpcApi, service_config["rpc_port"])

return Service(
root_path=root_path,
config=config,
node=api.full_node,
peer_api=api,
node=node,
peer_api=peer_api,
node_type=NodeType.FULL_NODE,
advertised_port=service_config["port"],
service_name=SERVICE_NAME,
upnp_ports=upnp_list,
connect_peers=get_unresolved_peer_infos(service_config, NodeType.FULL_NODE),
on_connect_callback=full_node.on_connect,
on_connect_callback=node.on_connect,
network_id=network_id,
rpc_info=rpc_info,
connect_to_daemon=connect_to_daemon,
Expand All @@ -81,6 +81,7 @@ async def async_main(service_config: dict[str, Any]) -> int:
update_testnet_overrides(network_id, overrides)
updated_constants = replace_str_to_bytes(DEFAULT_CONSTANTS, **overrides)
initialize_service_logging(service_name=SERVICE_NAME, config=config)

service = await create_full_node_service(DEFAULT_ROOT_PATH, config, updated_constants)
async with SignalHandlers.manage() as signal_handlers:
await service.setup_process_global_state(signal_handlers=signal_handlers)
Expand All @@ -92,7 +93,9 @@ async def async_main(service_config: dict[str, Any]) -> int:
def main() -> int:
freeze_support()

with maybe_manage_task_instrumentation(enable=os.environ.get("CHIA_INSTRUMENT_NODE") is not None):
with maybe_manage_task_instrumentation(
enable=os.environ.get(f"CHIA_INSTRUMENT_{SERVICE_NAME.upper()}") is not None
):
service_config = load_config_cli(DEFAULT_ROOT_PATH, "config.yaml", SERVICE_NAME)
target_peer_count = service_config.get("target_peer_count", 40) - service_config.get(
"target_outbound_peer_count", 8
Expand Down
21 changes: 15 additions & 6 deletions chia/server/start_harvester.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import pathlib
import sys
from typing import Any, Optional
Expand All @@ -18,6 +19,7 @@
from chia.util.chia_logging import initialize_service_logging
from chia.util.config import get_unresolved_peer_infos, load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.task_timing import maybe_manage_task_instrumentation

# See: https://bugs.python.org/issue29288
"".encode("idna")
Expand All @@ -34,19 +36,22 @@ def create_harvester_service(
) -> HarvesterService:
service_config = config[SERVICE_NAME]

overrides = service_config["network_overrides"]["constants"][service_config["selected_network"]]
network_id = service_config["selected_network"]
overrides = service_config["network_overrides"]["constants"][network_id]
updated_constants = replace_str_to_bytes(consensus_constants, **overrides)

harvester = Harvester(root_path, service_config, updated_constants)
peer_api = HarvesterAPI(harvester)
node = Harvester(root_path, service_config, updated_constants)
peer_api = HarvesterAPI(node)
network_id = service_config["selected_network"]

rpc_info: Optional[RpcInfo[HarvesterRpcApi]] = None
if service_config["start_rpc_server"]:
if service_config.get("start_rpc_server", True):
rpc_info = (HarvesterRpcApi, service_config["rpc_port"])

return Service(
root_path=root_path,
config=config,
node=harvester,
node=node,
peer_api=peer_api,
node_type=NodeType.HARVESTER,
advertised_port=None,
Expand All @@ -66,6 +71,7 @@ async def async_main() -> int:
config[SERVICE_NAME] = service_config
initialize_service_logging(service_name=SERVICE_NAME, config=config)
farmer_peers = get_unresolved_peer_infos(service_config, NodeType.FARMER)

service = create_harvester_service(DEFAULT_ROOT_PATH, config, DEFAULT_CONSTANTS, farmer_peers)
async with SignalHandlers.manage() as signal_handlers:
await service.setup_process_global_state(signal_handlers=signal_handlers)
Expand All @@ -75,7 +81,10 @@ async def async_main() -> int:


def main() -> int:
return async_run(async_main())
with maybe_manage_task_instrumentation(
enable=os.environ.get(f"CHIA_INSTRUMENT_{SERVICE_NAME.upper()}") is not None
):
return async_run(coro=async_main())


if __name__ == "__main__":
Expand Down
24 changes: 16 additions & 8 deletions chia/server/start_introducer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import pathlib
import sys
from typing import Any, Optional
Expand All @@ -14,6 +15,7 @@
from chia.util.chia_logging import initialize_service_logging
from chia.util.config import load_config, load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.task_timing import maybe_manage_task_instrumentation

# See: https://bugs.python.org/issue29288
"".encode("idna")
Expand All @@ -29,21 +31,23 @@ def create_introducer_service(
) -> IntroducerService:
service_config = config[SERVICE_NAME]

network_id = service_config["selected_network"]

if advertised_port is None:
advertised_port = service_config["port"]

introducer = Introducer(service_config["max_peers_to_send"], service_config["recent_peer_threshold"])
node__api = IntroducerAPI(introducer)
network_id = service_config["selected_network"]
node = Introducer(service_config["max_peers_to_send"], service_config["recent_peer_threshold"])
peer_api = IntroducerAPI(node)

return Service(
root_path=root_path,
config=config,
node=introducer,
peer_api=node__api,
node=node,
peer_api=peer_api,
node_type=NodeType.INTRODUCER,
advertised_port=advertised_port,
service_name=SERVICE_NAME,
network_id=network_id,
advertised_port=advertised_port,
connect_to_daemon=connect_to_daemon,
class_for_type=ApiProtocolRegistry,
)
Expand All @@ -54,8 +58,9 @@ async def async_main() -> int:
config = load_config(DEFAULT_ROOT_PATH, "config.yaml")
service_config = load_config_cli(DEFAULT_ROOT_PATH, "config.yaml", SERVICE_NAME)
config[SERVICE_NAME] = service_config
service = create_introducer_service(DEFAULT_ROOT_PATH, config)
initialize_service_logging(service_name=SERVICE_NAME, config=config)

service = create_introducer_service(DEFAULT_ROOT_PATH, config)
async with SignalHandlers.manage() as signal_handlers:
await service.setup_process_global_state(signal_handlers=signal_handlers)
await service.run()
Expand All @@ -64,7 +69,10 @@ async def async_main() -> int:


def main() -> int:
return async_run(async_main())
with maybe_manage_task_instrumentation(
enable=os.environ.get(f"CHIA_INSTRUMENT_{SERVICE_NAME.upper()}") is not None
):
return async_run(coro=async_main())


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 3f6977f

Please sign in to comment.