-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase.py
121 lines (109 loc) · 3.89 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import socket
import struct
from typing import Union
import exceptions
import settings
from utils.module_loading import load_backend
class BaseServer(object):
# 服务类型,填 server 或 agent
_TYPE = ''
backend_cache = {}
def call(self, backend_path: str, method: str, *args, **kwargs):
"""
执行钩子的方法
:param backend_path: 钩子的路径
:param method: 钩子对象的方法
:param args:
:param kwargs:
:return:
"""
backend = self.backend_cache.get(backend_path)
if not backend:
backend = load_backend(backend_path)
self.backend_cache[backend_path] = backend
func = getattr(backend, method)
return func(self, *args, **kwargs)
def send(self, sock: socket.socket, data: Union[bytes, bytearray]):
"""
发送数据到 tunnel 客户端
:param sock: tunnel 套接字
:param data: 待发送的数据
:return:
"""
return self.call(
backend_path=settings.CONNECTION_BACKEND,
method=f'{self._TYPE}_send',
sock=sock,
data=data,
)
def recv(self, sock: socket.socket, bufsize: int):
"""
接收某个套接字的数据
:param sock: 需要接收内容的套接字
:param bufsize: 接收的长度
:return:
"""
return self.call(
backend_path=settings.CONNECTION_BACKEND,
method=f'{self._TYPE}_recv',
sock=sock,
bufsize=bufsize,
)
def parse_socks5_addr_port(self, sock: socket.socket) -> tuple[int, str, int]:
"""
处理 socks5 代理协议中请求阶段的 IP 地址和端口
:param sock: 客户端的 socket 连接对象
:return: 地址类型, 地址, 端口
"""
atyp = struct.unpack('!B', self.recv(sock, 1))[0]
# IPV4
if atyp == 0x01:
addr = socket.inet_ntop(socket.AF_INET, self.recv(sock, 4))
# Domain name
elif atyp == 0x03:
domain_length = ord(self.recv(sock, 1))
addr = self.recv(sock, domain_length).decode()
# IPV6
elif atyp == 0x04:
addr = socket.inet_ntop(socket.AF_INET6, self.recv(sock, 6))
else:
raise exceptions.UnknownTypeException('未知的类型: %s' % atyp)
port = struct.unpack('!H', self.recv(sock, 2))[0]
return atyp, addr, port
@staticmethod
def int2bytes(value: int) -> bytes:
"""
把10进制的内存地址的值转成16进制后,再转成 bytes
:param value:
:return:
"""
hex_value = hex(value)[2:]
hex_value = '0' + hex_value if len(hex_value) % 2 else hex_value
return bytes.fromhex(hex_value)
@staticmethod
def bytes2int(value: bytes) -> int:
"""
把 bytes 的值转换成16进制的整数,再转换成10进制的整形
:param value: 16进制整数的 bytes
:return:
"""
return int(value.hex(), 16)
def get_memory_bytes(self, sock: socket.socket) -> bytes:
"""从套接字中获取对应套接字内存的长度,并获取对应套接字的内存地址"""
length = struct.unpack('!B', self.recv(sock, 1))[0]
return self.recv(sock, length)
def sock2bytes(self, sock: socket.socket) -> bytes:
"""
传入一个套接字,根据内存地址转换成 16 进制后的 bytes
:param sock: 待转换的套接字
:return:
"""
return self.int2bytes(id(sock))
def sock2send_data(self, sock: socket.socket) -> bytes:
"""
传入一个套接字,转换成传输协议中的标识 ID
:param sock: 待转换的套接字
:return:
"""
memory_bytes = self.sock2bytes(sock)
return struct.pack('!B', len(memory_bytes)) + memory_bytes