Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
Spartan859 committed Jan 30, 2025
1 parent 7e264d5 commit 1c52199
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 133 deletions.
3 changes: 2 additions & 1 deletion base_compile_result_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

from abc import ABC, abstractmethod


class BaseCompileResultSender(ABC):
"""Abstract base class for compile result senders."""

@abstractmethod
async def send(self, status: bool, message: str) -> None:
async def send(self, code_id: str, status: bool, message: str) -> None:
"""Send the result of a compile.
Args:
Expand Down
4 changes: 2 additions & 2 deletions base_task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class BaseTaskScheduler(ABC):
"""Abstract base class for task schedulers."""

@abstractmethod
def can_accept_judge_task(self) -> bool:
"""Check if the scheduler can accept a new judge task.
Expand All @@ -18,7 +18,7 @@ def can_accept_judge_task(self) -> bool:
True if the scheduler can accept a new judge task, False otherwise
"""
raise NotImplementedError

@abstractmethod
def get_finished_judge_tasks_queue(self) -> asyncio.Queue:
raise NotImplementedError
Expand Down
2 changes: 1 addition & 1 deletion compile_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def execute(self) -> str:
"""

compile_result = await self._build_task.execute()
if compile_result.split(":")[0]=='E':
if compile_result.split(":")[0] == "E":
self._result = compile_result[2:]
await self._sender.send(self._code_id, False, self._result)
return compile_result[2:]
Expand Down
4 changes: 2 additions & 2 deletions judge_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ async def execute(self) -> MatchResult:
agent_image_tags = await asyncio.gather(
*[t.execute() for t in self._build_tasks]
)

for tag in agent_image_tags:
if tag.split(":")[0] == "E":
match_result = MatchResult(
self._match_id, scores=[0, 0], record_file_path=""
)
await self._reporter.report(match_result)
return match_result

match_result = await self._judger.judge(
self._match_id, self._game_host_image_tag, agent_image_tags
)
Expand Down
45 changes: 29 additions & 16 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,42 @@
from thuai_task_scheduler import ThuaiTaskScheduler
from ws_client import WsClient


async def fetch():
await ThuaiFetcher().fetch("cbd96c3c5a934e0cabac0a3f006a823b")


async def clean():
await ThuaiFetcher().clean()


async def buildTask():
return await BuildTask("cbd96c3c5a934e0cabac0a3f006a823b",
ThuaiFetcher(),
ThuaiBuilder()).execute()

return await BuildTask(
"cbd96c3c5a934e0cabac0a3f006a823b", ThuaiFetcher(), ThuaiBuilder()
).execute()


async def compileTask():
return await CompileTask("cbd96c3c5a934e0cabac0a3f006a823b",
ThuaiFetcher(), ThuaiBuilder(), ThuaiCRSender()).execute()
return await CompileTask(
"cbd96c3c5a934e0cabac0a3f006a823b",
ThuaiFetcher(),
ThuaiBuilder(),
ThuaiCRSender(),
).execute()


async def testWsClient():
ws_client=WsClient("wss://api.dev.saiblo.net/ws/", "thuai8judger",
ThuaiTaskScheduler(),
ThuaiFetcher(),
ThuaiBuilder(),
ThuaiCRSender(),
ThuaiJudger(),
ThuaiReporter(),
"thuai7judger:latest")
ws_client = WsClient(
"wss://api.dev.saiblo.net/ws/",
"thuai8judger",
ThuaiTaskScheduler(),
ThuaiFetcher(),
ThuaiBuilder(),
ThuaiCRSender(),
ThuaiJudger(),
ThuaiReporter(),
"thuai7judger:latest",
)
await ws_client.start()
print("WsClient started")
# # print('qwdhkdjwqieuo')
Expand All @@ -46,10 +58,12 @@ async def testWsClient():
# ws_client.start()
# time.sleep(20)
# ws_client.stop()



async def main():
await testWsClient()


if __name__ == "__main__":
# asyncio.run(fetch())
# asyncio.run(clean())
Expand All @@ -58,4 +72,3 @@ async def main():
# print(asyncio.run(compileTask()))
# testWsClient()
asyncio.run(main())

13 changes: 7 additions & 6 deletions thuai_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ def __init__(self):

# # ID for image, to avoid name conflict
# self.image_id = 0

def _build_image(self, file_path: Path, code_id: str):
"""Block in a separate thread to build Docker image."""
self.client.images.build(path=str(file_path), tag=code_id, rm=True)

async def build(self, file_path: Path, code_id: str) -> str:
# get all image tags
built_image_tags = [tag.split(':')[0]
for image in self.client.images.list()
for tag in image.tags]
built_image_tags = [
tag.split(":")[0]
for image in self.client.images.list()
for tag in image.tags
]
# print(f"Built image tags: {built_image_tags}")
# if not built yet...
if code_id not in built_image_tags:
Expand All @@ -53,8 +55,7 @@ async def build(self, file_path: Path, code_id: str) -> str:
error_msg += log_line
# print(error_msg)
return f"E:{error_msg}"



self.built_images[file_path] = code_id

return self.built_images[file_path]
Expand Down
14 changes: 9 additions & 5 deletions thuai_cr_sender.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
"""Compile result sender for thuai."""

import requests

from base_compile_result_sender import BaseCompileResultSender

# https API
COMPILE_RESULT_API = "https://api.dev.saiblo.net/judger/codes/{}/"


class ThuaiCRSender(BaseCompileResultSender):
"""Compile result sender for thuai."""

async def send(self, code_id: str, status: bool, message: str) -> None:
status_str = "编译成功" if status else "编译失败"
print(f"Sending compile result for {code_id}: {status_str} with message: {message}")
print(
f"Sending compile result for {code_id}: {status_str} with message: {message}"
)
# use method PUT to update compile result
requests.put(COMPILE_RESULT_API.format(code_id),
json={"compile_status": status_str, "compile_message": message})


requests.put(
COMPILE_RESULT_API.format(code_id),
json={"compile_status": status_str, "compile_message": message},
)
6 changes: 3 additions & 3 deletions thuai_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CODE_INFO_API = "https://api.dev.saiblo.net/judger/codes/{}"
CODE_DOWNLOAD_API = "https://api.dev.saiblo.net/judger/codes/{}/download"


class ThuaiFetcher(BaseAgentCodeFetcher):
"""Fetches agent code for THUAI."""

Expand All @@ -18,7 +19,6 @@ async def clean(self) -> None:
# remove fetched_codes
shutil.rmtree("fetched_codes")


async def fetch(self, code_id: str) -> Path:
"""Fetches the code for an agent and saves it to a directory(containing the dockerfile)
Expand Down Expand Up @@ -47,7 +47,7 @@ async def fetch(self, code_id: str) -> Path:
zip_file.close()
else:
raise Exception("Failed to download code")

return Path(f"fetched_codes/{code_id}")

async def list(self) -> Dict[str, Path]:
Expand All @@ -56,4 +56,4 @@ async def list(self) -> Dict[str, Path]:
Returns:
A dictionary mapping code IDs to the paths of their corresponding tarball files
"""
pass
pass
17 changes: 10 additions & 7 deletions thuai_judger.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ async def judge(
server_name = self.get_name("server")

token = 0

print(f"Starting Server {server_name} with image {game_host_image_tag}.")

print(
f"Starting Server {server_name} with image {game_host_image_tag}."
)

# Run server container.
record_folder = self.get_name("record", match_id)
Path(record_folder).mkdir(parents=True)
server_mount = Mount("/record", record_folder, type="bind")

self.client.containers.run(
game_host_image_tag,
# ports={"14514/tcp": 14514},
Expand All @@ -83,8 +85,10 @@ async def judge(
detach=True,
name=server_name,
)

print(f"Server {server_name} is running with image {game_host_image_tag}.")

print(
f"Server {server_name} is running with image {game_host_image_tag}."
)

# Run agent containers, create networks
for agent_image_tag in agent_image_tags:
Expand All @@ -108,13 +112,12 @@ async def judge(
network=network_name,
detach=True,
name=container_name,
remove=True
remove=True,
)
self.judge_containers[match_id].append(container_name)

token = token + 1


# for network in self.judge_networks[match_id]:
# self.client.networks.get(network).connect(server_name)

Expand Down
35 changes: 20 additions & 15 deletions thuai_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
MATCH_REPORT_API = "https://api.dev.saiblo.net/judger/matches/{}/"
NO_RECORD_FILE = "no_record.txt"


class ThuaiReporter(BaseMatchResultReporter):
"""Concrete implementation of BaseMatchResultReporter for THUAI."""

Expand All @@ -17,24 +18,28 @@ async def report(self, result: MatchResult) -> None:
"""
print(f"Reporting match result: {result}")
scores = result.scores
state = '评测成功'
states = [{'position': i, 'status': 'OK', 'code': 0, 'stderr': ''} for i in range(len(scores))]
state = "评测成功"
states = [
{"position": i, "status": "OK", "code": 0, "stderr": ""}
for i in range(len(scores))
]
message = {}
data = {
'message': json.dumps(message),
'state': state,
'scores': json.dumps(scores),
'states': json.dumps(states)
"message": json.dumps(message),
"state": state,
"scores": json.dumps(scores),
"states": json.dumps(states),
}
# json_data = json.dumps(data)
try:
with open(result.record_file_path, 'rb') as dat_file:
files = {'file': (f'{result.match_id}.dat', dat_file)}
response = requests.put(MATCH_REPORT_API.format(result.match_id), files=files, data=data)
with open(result.record_file_path, "rb") as dat_file:
files = {"file": (f"{result.match_id}.dat", dat_file)}
response = requests.put(
MATCH_REPORT_API.format(result.match_id), files=files, data=data
)
except Exception as e:
with open(NO_RECORD_FILE, 'rb') as no_record_file:
files = {'file': (f'{result.match_id}.txt', no_record_file)}
response = requests.put(MATCH_REPORT_API.format(result.match_id), files=files, data=data)



with open(NO_RECORD_FILE, "rb") as no_record_file:
files = {"file": (f"{result.match_id}.txt", no_record_file)}
response = requests.put(
MATCH_REPORT_API.format(result.match_id), files=files, data=data
)
24 changes: 15 additions & 9 deletions thuai_task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
COMPILATION_TASK_PARALLELISM = 5
JUDGE_TASK_PARALLELISM = 1


class ThuaiTaskScheduler(BaseTaskScheduler):
"""Concrete implementation of a task scheduler."""

_compilation_tasks: Dict[str, CompileTask]
_judge_tasks: Dict[str, JudgeTask]
_compilation_tasks_queue: asyncio.Queue
_judge_tasks_queue: asyncio.Queue
_compilation_tasks_loop: asyncio.Task
_judge_tasks_loop: asyncio.Task
_finished_judge_tasks: asyncio.Queue


def __init__(self) -> None:
"""Initialize the task scheduler."""
Expand All @@ -31,9 +31,9 @@ def __init__(self) -> None:
self._compilation_tasks_queue = asyncio.Queue()
self._judge_tasks_queue = asyncio.Queue()
self._finished_judge_tasks = asyncio.Queue()

def can_accept_judge_task(self) -> bool:
return self._judge_tasks_queue.qsize() < JUDGE_TASK_PARALLELISM*2
return self._judge_tasks_queue.qsize() < JUDGE_TASK_PARALLELISM * 2

async def clean(self) -> None:
"""Cleans up scheduled tasks."""
Expand Down Expand Up @@ -63,8 +63,10 @@ async def schedule(self, task: BaseTask) -> str:
raise ValueError("Invalid task type.")
print("Scheduled task: {}".format(task_id))
return task_id

async def task_loop(self, scheduled_tasks_queue: asyncio.Queue, task_parallelism: int) -> None:

async def task_loop(
self, scheduled_tasks_queue: asyncio.Queue, task_parallelism: int
) -> None:
"""Loop for executing compilation tasks."""
print("Task loop started.")
while True:
Expand All @@ -88,13 +90,17 @@ async def task_loop(self, scheduled_tasks_queue: asyncio.Queue, task_parallelism
print("Task {} finished.".format(task_result.match_id))
if task_result:
self._finished_judge_tasks.put_nowait(task_result.match_id)

def get_finished_judge_tasks_queue(self) -> asyncio.Queue:
return self._finished_judge_tasks

async def start(self) -> None:
"""Starts the task scheduler and begins executing tasks."""
self._compilation_tasks_loop = asyncio.create_task(self.task_loop(self._compilation_tasks_queue, COMPILATION_TASK_PARALLELISM))
self._judge_tasks_loop = asyncio.create_task(self.task_loop(self._judge_tasks_queue, JUDGE_TASK_PARALLELISM))
self._compilation_tasks_loop = asyncio.create_task(
self.task_loop(self._compilation_tasks_queue, COMPILATION_TASK_PARALLELISM)
)
self._judge_tasks_loop = asyncio.create_task(
self.task_loop(self._judge_tasks_queue, JUDGE_TASK_PARALLELISM)
)
await self._compilation_tasks_loop
await self._judge_tasks_loop
Loading

0 comments on commit 1c52199

Please sign in to comment.