Skip to content

Commit

Permalink
v1.8
Browse files Browse the repository at this point in the history
  • Loading branch information
WaterEmissary committed Jan 2, 2025
1 parent 0f5f623 commit 5f732f0
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 75 deletions.
234 changes: 180 additions & 54 deletions WatchDog_QT.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import sys
import time
import shlex
import signal
import psutil
import pickle
Expand All @@ -20,17 +21,19 @@
from ps_dialog import Ui_Dialog # pyside6-uic .\process_setup.ui -o .\ps_dialog.py
# pyinstaller -w -F -i ./ico.png --add-data "ico.png;." -n WatchDog.exe .\WatchDog_QT.py

# todo:
# todo: 右键启动看门狗和禁止未正确显示, 后台启动无法正常拉起程序

DEFAULT_CONFIG = {
'AUTO_HIDDEN': False,
'MEM_UNIT': 0,
'FLUSH_TIME': 0.5,
'LISTENING': {},
'REOPEN_OPT': 0
'REOPEN_OPT': 1,
'START_WAY': 1
}
MEM_UNIT_LIST = ['MB', 'GB']
REOPEN_LIST = ['无操作', '重启软件']
START_WAY = ['cmd', 'powershell']
# 临时文件锁路径
temp_file_path = os.path.join(tempfile.gettempdir(), ".mydog")

Expand All @@ -46,18 +49,21 @@
['v1.4', '添加了浏览功能,一键跳转到软件目录'], ['v1.41', '修改浏览为打开工作目录'],
['v1.5', '添加了移除前询问功能'], ['v1.6', "添加重复打开相关功能, 添加单进程右键菜单"],
['v1.7', '修复了在某些情况下无法正常拉起进程的bug'],
['v1.8', '添加了排序功能, 添加了自动创建启动脚本功能']]
['v1.8', '添加了排序功能, 添加了自动创建启动脚本功能'],
['v1.9', '更改启动功能到WMI中, 添加了对powershell支持']]

# WMI控制程序
class WMI:
def __init__(self, flush_time=0.1):
def __init__(self, flush_time=0.1, qt=None):
self.execution = False
self.sleep_time = flush_time
self.processes_dict = {}
self.run_time_dict = {}
self.system_usage = {'cpu_percent': 0.0, 'memory_percent': 0.0}
threading.Thread(target=self.get_processes).start()
threading.Thread(target=self.get_system_usage).start()
self.qt = qt
threading.Thread(target=self.get_processes, daemon=True).start()
threading.Thread(target=self.get_system_usage, daemon=True).start()
threading.Thread(target=self.wmi_watch_dog, daemon=True).start()

# 获取系统运行状态
def get_system_usage(self):
Expand Down Expand Up @@ -88,6 +94,51 @@ def get_processes(self):
pythoncom.CoUninitialize() # 卸载 COM 库
time.sleep(self.sleep_time)

# wmi启动
def wmi_watch_dog(self):
while True:
# 获取在监听的程序列表
self.qt.load_config()
listening = self.qt.config.get('LISTENING')
self.qt.process_list = []
# 遍历存储的进程, 从实时进程列表中获取对应的数据
for index, (name, info) in enumerate(listening.items()):
# 初始化数据
process = dict(
id=index,
use_listen=info.get('use_listen'),
other_name=info.get('other_name'),
exe_path=info.get('exe_path'),
work_dir=info.get('work_dir'),
arguments=info.get('arguments'),
hidden=info.get('hidden'),
run_status=False,
pid='--',
name='--',
command_line='--',
thread_count='--',
cpu_percent='--',
memory_usage='--',
visual_memory='--',
running_time='--',
)
if self.check_process(name): # 进程存在, 更新数据
process['run_status'] = True
process.update(self.optimize_process_data(name, self.qt.config.get('MEM_UNIT')))

if process.get('use_listen') and self.execution is False: # 如果进程启用监听状态, 并且不在执行状态
if self.processes_dict: # 至少已经检查完一次进程状态
if process.get('run_status'): # 如果进程在运行, 不做任何操作
pass
else: # 进程不存在, 启动
# self.start_process(process)
self.start_process(process)
# time.sleep(1)

self.qt.process_list.append(process)
self.qt.update_table_row(process)
time.sleep(self.qt.config.get('FLUSH_TIME'))

# 将进程对象转换成实际的数据
def optimize_process_data(self, process_name, mem_unit=0):
process = self.processes_dict.get(process_name.split('.')[0], None)
Expand Down Expand Up @@ -122,7 +173,10 @@ def check_process(self, process_name):
def start_process(self, process):
def thread_run():
self.execution = True
self._start_process(process)
if self.qt.config.get('START_WAY') == 0:
self._start_process(process)
elif self.qt.config.get('START_WAY') == 1:
self._start_process_powershell(process)
time.sleep(3)
self.execution = False

Expand All @@ -131,8 +185,7 @@ def thread_run():
threading.Thread(target=thread_run).start()

# 启动进程
@staticmethod
def _start_process(process):
def _start_process(self, process):
exe_path = process.get('exe_path')
work_dir = process.get('work_dir')
if not work_dir:
Expand All @@ -145,6 +198,51 @@ def _start_process(process):
print(command)
subprocess.Popen(command, shell=True, cwd=work_dir)

# 使用powershell启动
def _start_process_powershell(self, process):
"""使用 PowerShell 启动进程,可控制是否隐藏窗口。"""

exe_path = process.get('exe_path')
work_dir = process.get('work_dir')
if not work_dir:
work_dir = None
arguments = process.get('arguments')
hidden = process.get('hidden')

# 使用 shlex.quote() 安全地转义参数,防止命令注入
quoted_exe_path = shlex.quote(exe_path)


# 构建 PowerShell 命令
powershell_command = [
"powershell.exe",
"-NoProfile", # 可选:不加载 PowerShell 配置文件,提高启动速度
"-Command",
"Start-Process",
"-FilePath", quoted_exe_path,
]
# 如果有参数,再传参数
if arguments:
quoted_arguments = " ".join(shlex.quote(arg) for arg in shlex.split(arguments))
powershell_command.append("-ArgumentList")
powershell_command.append(quoted_arguments)

if work_dir:
powershell_command.extend(["-WorkingDirectory", shlex.quote(work_dir)])

if hidden:
powershell_command.extend(["-WindowStyle", "Hidden"])

command = " ".join(powershell_command)
print(command) # 打印最终执行的命令,方便调试
try:
subprocess.Popen(powershell_command,
creationflags=subprocess.CREATE_NO_WINDOW if hidden else 0) # powershell本身窗口也隐藏
except FileNotFoundError:
print(f"Error: PowerShell not found.")
except subprocess.SubprocessError as e:
print(f"Error executing command: {e}")

# 停止进程任务流
def stop_process(self, process):
def thread_run():
Expand Down Expand Up @@ -209,7 +307,7 @@ def __init__(self):
self.function_init()
self.init_tray()

self.wmi = WMI()
self.wmi = WMI(qt=self)

self.flush_data()

Expand Down Expand Up @@ -283,7 +381,7 @@ def create_start_script(self):
self.show_message("脚本已存在, 请勿重复创建!", level='alarm')
else:
with open("./start_dog.bat", "w", encoding='utf-8') as f:
f.write(f'''@echo off\nstart "" {os.path.join(os.getcwd(), 'WatchDog.exe')}''')
f.write(f'''@echo off\nstart "" "{os.path.join(os.getcwd(), 'WatchDog.exe')}"''')
self.show_message("脚本已在当前目录创建成功,如需移动请自行修改")

# QT_切换开机自启状态
Expand Down Expand Up @@ -355,6 +453,7 @@ def _init(self):
self.ui.show_version_label.setStyleSheet("""color: #888888""")
self.ui.autoStartUpCheckBox.setChecked(self.AUTO_START)
self.ui.create_start_script_button.clicked.connect(self.create_start_script)
self.ui.start_way_combobox.addItems(START_WAY)
self.ui.FullSetupMemComboBox.addItems(MEM_UNIT_LIST)
self.ui.FullSetupReopenComboBox.addItems(REOPEN_LIST)
self.ui.FullSetupFlushSpin.setDecimals(1)
Expand All @@ -364,6 +463,7 @@ def _init(self):

self.ui.autoStartUpCheckBox.stateChanged.connect(self.switch_start_up)
self.ui.FullSetup_HiddenButton.toggled.connect(self.setup_auto_hidden_checkbox)
self.ui.start_way_combobox.currentIndexChanged.connect(self.setup_start_way)
self.ui.FullSetupMemComboBox.currentIndexChanged.connect(self.setup_switch_mem_unit)
self.ui.FullSetupReopenComboBox.currentIndexChanged.connect(self.setup_switch_reopen)
self.ui.FullSetupFlushSpin.valueChanged.connect(self.setup_flush_spin)
Expand All @@ -375,6 +475,8 @@ def function_init(self):
self.auto_check_config()
# 读取配置
self.load_config()
# 自动升级配置
self.update_config()
# 初始化表格
self.create_table_row()

Expand Down Expand Up @@ -463,10 +565,26 @@ def save_config(self):
pickle.dump(self.config, f)
self.show_message("保存成功")

# 功能_自动升级配置文件
def update_config(self):
updated = False
for k, v in DEFAULT_CONFIG.items():
if k not in self.config.keys():
updated = True
self.config[k] = v
if updated:
self.save_config()
else:
pass

# 设置_自动隐藏复选框
def setup_auto_hidden_checkbox(self, toggled):
self.config['AUTO_HIDDEN'] = toggled

# 设置_更换启动方式
def setup_start_way(self, index):
self.config['START_WAY'] = index

# 设置_更换内存全局单位
def setup_switch_mem_unit(self, index):
self.config['MEM_UNIT'] = index
Expand Down Expand Up @@ -527,45 +645,45 @@ def flush_data(self):
self.ui.MEMusageLabel.setText(f" [内存]: {self.wmi.system_usage.get('memory_percent')} %")
self.ui.CPUusageLabel.setStyleSheet(f"""color: {self.get_health_color(self.wmi.system_usage.get('cpu_percent'))};""")
self.ui.MEMusageLabel.setStyleSheet(f"""color: {self.get_health_color(self.wmi.system_usage.get('memory_percent'))};""")
# 获取在监听的程序列表
self.load_config()
listening = self.config.get('LISTENING')
self.process_list = []
# 遍历存储的进程, 从实时进程列表中获取对应的数据
for index, (name, info) in enumerate(listening.items()):
# 初始化数据
process = dict(
id=index,
use_listen=info.get('use_listen'),
other_name=info.get('other_name'),
exe_path=info.get('exe_path'),
work_dir=info.get('work_dir'),
arguments=info.get('arguments'),
hidden=info.get('hidden'),
run_status=False,
pid='--',
name='--',
command_line='--',
thread_count='--',
cpu_percent='--',
memory_usage='--',
visual_memory='--',
running_time='--',
)
if self.wmi.check_process(name): # 进程存在, 更新数据
process['run_status'] = True
process.update(self.wmi.optimize_process_data(name, self.config.get('MEM_UNIT')))

if process.get('use_listen') and self.wmi.execution is False: # 如果进程启用监听状态, 并且不在执行状态
if self.wmi.processes_dict: # 至少已经检查完一次进程状态
if process.get('run_status'): # 如果进程在运行, 不做任何操作
pass
else: # 进程不存在, 启动
self.wmi.start_process(process)
time.sleep(1)

self.process_list.append(process)
self.update_table_row(process)
# # 获取在监听的程序列表
# self.load_config()
# listening = self.config.get('LISTENING')
# self.process_list = []
# # 遍历存储的进程, 从实时进程列表中获取对应的数据
# for index, (name, info) in enumerate(listening.items()):
# # 初始化数据
# process = dict(
# id=index,
# use_listen=info.get('use_listen'),
# other_name=info.get('other_name'),
# exe_path=info.get('exe_path'),
# work_dir=info.get('work_dir'),
# arguments=info.get('arguments'),
# hidden=info.get('hidden'),
# run_status=False,
# pid='--',
# name='--',
# command_line='--',
# thread_count='--',
# cpu_percent='--',
# memory_usage='--',
# visual_memory='--',
# running_time='--',
# )
# if self.wmi.check_process(name): # 进程存在, 更新数据
# process['run_status'] = True
# process.update(self.wmi.optimize_process_data(name, self.config.get('MEM_UNIT')))
#
# if process.get('use_listen') and self.wmi.execution is False: # 如果进程启用监听状态, 并且不在执行状态
# if self.wmi.processes_dict: # 至少已经检查完一次进程状态
# if process.get('run_status'): # 如果进程在运行, 不做任何操作
# pass
# else: # 进程不存在, 启动
# self.wmi.start_process(process)
# time.sleep(1)
#
# self.process_list.append(process)
# self.update_table_row(process)
# 更新当前选中进程的按钮状态
process = self.get_process_by_selected()
if process:
Expand All @@ -583,10 +701,14 @@ def show_message(self, message, timeout=2000, level='normal'):

# QT_获取选中行的进程运行时信息
def get_process_by_selected(self):
row = self.get_selected_row()
if row == -1:
try:
row = self.get_selected_row()
if row == -1:
return None
return self.process_list[row]
except:
return None
return self.process_list[row]


# QT_选中行内容
def table_row_select(self):
Expand Down Expand Up @@ -624,6 +746,7 @@ def switch_page(self, page):
self.load_config()
self.ui.autoStartUpCheckBox.setChecked(self.AUTO_START)
self.ui.FullSetup_HiddenButton.setChecked(self.config.get('AUTO_HIDDEN'))
self.ui.start_way_combobox.setCurrentIndex(self.config.get('START_WAY'))
self.ui.FullSetupMemComboBox.setCurrentIndex(self.config.get('MEM_UNIT'))
self.ui.FullSetupReopenComboBox.setCurrentIndex(self.config.get('REOPEN_OPT'))
self.ui.FullSetupFlushSpin.setValue(self.config.get('FLUSH_TIME'))
Expand Down Expand Up @@ -762,7 +885,10 @@ def show_watchdog_list_menu(self, pos):
label_action.setDefaultWidget(label)
menu.addAction(label_action)
menu.addSeparator()
use = menu.addAction("启用看门狗")
if process.get('use_listen'):
use = menu.addAction("禁用看门狗")
else:
use = menu.addAction("启用看门狗")
remove = menu.addAction("移除")
switch = menu.addAction("停止" if process.get('run_status') else "启动")
restart = menu.addAction("重启")
Expand Down
Loading

0 comments on commit 5f732f0

Please sign in to comment.