From 7f51e4635619328d69acd64582bdfa37e46296ff Mon Sep 17 00:00:00 2001 From: JoeanAmier Date: Sun, 7 Jul 2024 15:45:04 +0800 Subject: [PATCH] fix: Update a_bogus generation --- crawlers/douyin/web/abogus.py | 299 +++++++++++++++++++++------------- crawlers/douyin/web/utils.py | 2 +- 2 files changed, 189 insertions(+), 112 deletions(-) diff --git a/crawlers/douyin/web/abogus.py b/crawlers/douyin/web/abogus.py index a19d3483c..c22a5fa01 100644 --- a/crawlers/douyin/web/abogus.py +++ b/crawlers/douyin/web/abogus.py @@ -15,83 +15,25 @@ 1. Changed the ua_code to compatible with the current config file User-Agent string in https://github.com/Evil0ctal/Douyin_TikTok_Download_API/blob/main/crawlers/douyin/web/config.yaml """ +from random import choice from random import randint from random import random from re import compile from time import time -from urllib.parse import urlencode, quote +from urllib.parse import urlencode +from urllib.parse import quote +from gmssl import sm3, func + +__all__ = ["ABogus", ] class ABogus: __filter = compile(r'%([0-9A-F]{2})') __arguments = [0, 1, 14] + __ua_key = "\u0000\u0001\u000e" __end_string = "cus" __version = [1, 0, 1, 5] - __env = [ - 49, - 53, - 51, - 54, - 124, - 55, - 52, - 50, - 124, - 49, - 53, - 51, - 54, - 124, - 56, - 54, - 52, - 124, - 48, - 124, - 48, - 124, - 48, - 124, - 48, - 124, - 49, - 53, - 51, - 54, - 124, - 56, - 54, - 52, - 124, - 49, - 53, - 51, - 54, - 124, - 56, - 54, - 52, - 124, - 49, - 53, - 51, - 54, - 124, - 55, - 52, - 50, - 124, - 50, - 52, - 124, - 50, - 52, - 124, - 87, - 105, - 110, - 51, - 50] + __browser = "1536|742|1536|864|0|0|0|0|1536|864|1536|864|1536|742|24|24|MacIntel" __reg = [ 1937774191, 1226093241, @@ -107,12 +49,55 @@ class ABogus: "s1": "Dkdpgh4ZKsQB80/Mfvw36XI1R25+WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=", "s2": "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=", "s3": "ckdp1h4ZKsUB80/Mfvw36XIgR25+WQAlEi7NLboqYTOPuzmFjJnryx9HVGDaStCe", - "s4": "Dkdpgh2ZmsQB80/MfvV36XI1R45-WUAlEixNLwoqYTOPuzKFjJnry79HbGcaStCe"} + "s4": "Dkdpgh2ZmsQB80/MfvV36XI1R45-WUAlEixNLwoqYTOPuzKFjJnry79HbGcaStCe", + } - def __init__(self, ): + def __init__(self, + # user_agent: str = USERAGENT, + platform: str = None, ): self.chunk = [] self.size = 0 self.reg = self.__reg[:] + # self.ua_code = self.generate_ua_code(user_agent) + # Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, + # like Gecko) Chrome/90.0.4430.212 Safari/537.36 + self.ua_code = [ + 76, + 98, + 15, + 131, + 97, + 245, + 224, + 133, + 122, + 199, + 241, + 166, + 79, + 34, + 90, + 191, + 128, + 126, + 122, + 98, + 66, + 11, + 14, + 40, + 49, + 110, + 110, + 173, + 67, + 96, + 138, + 252] + self.browser = self.generate_browser_info( + platform) if platform else self.__browser + self.browser_len = len(self.browser) + self.browser_code = self.char_code_at(self.browser) @classmethod def list_1(cls, random_num=None, a=170, b=85, c=45, ) -> list: @@ -193,47 +178,50 @@ def generate_string_1( def generate_string_2( self, url_params: str, - user_agent: str, + method="GET", start_time=0, end_time=0, ) -> str: a = self.generate_string_2_list( url_params, - user_agent, + method, start_time, end_time, ) e = self.end_check_num(a) - a.extend(self.__env) + a.extend(self.browser_code) a.append(e) return self.rc4_encrypt(self.from_char_code(*a), "y") def generate_string_2_list( self, url_params: str, - user_agent: str, + method="GET", start_time=0, end_time=0, ) -> list: start_time = start_time or int(time() * 1000) end_time = end_time or (start_time + randint(4, 8)) - params_array = self.sum(self.sum(url_params)) - # TODO: 需要编写一个函数来生成ua_code 2024年6月13日17:13:08 - # Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36 - ua_code = [76, 98, 15, 131, 97, 245, 224, 133, 122, 199, 241, 166, 79, 34, 90, 191, 128, 126, 122, 98, 66, 11, 14, 40, 49, 110, 110, 173, 67, 96, 138, 252] + params_array = self.generate_params_code(url_params) + method_array = self.generate_method_code(method) return self.list_4( (end_time >> 24) & 255, params_array[21], - ua_code[23], + self.ua_code[23], (end_time >> 16) & 255, params_array[22], - ua_code[24], + self.ua_code[24], (end_time >> 8) & 255, (end_time >> 0) & 255, (start_time >> 24) & 255, (start_time >> 16) & 255, (start_time >> 8) & 255, (start_time >> 0) & 255, + method_array[21], + method_array[22], + int(end_time / 256 / 256 / 256 / 256) >> 0, + int(start_time / 256 / 256 / 256 / 256) >> 0, + self.browser_len, ) @staticmethod @@ -325,6 +313,11 @@ def list_4( j: int, k: int, m: int, + n: int, + o: int, + p: int, + q: int, + r: int, ) -> list: return [ 44, @@ -335,18 +328,18 @@ def list_4( 0, 24, b, - 58, + n, 0, c, d, 0, - 24, - 97, + 0, + 0, 1, 0, 239, e, - 51, + o, f, g, 0, @@ -363,11 +356,11 @@ def list_4( k, m, 3, - 399, + p, 1, - 399, + q, 1, - 64, + r, 0, 0, 0] @@ -432,10 +425,10 @@ def char_code_at(s): return [ord(char) for char in s] def write(self, e, ): + self.size = len(e) if isinstance(e, str): - e = self.decode_string(e + self.__end_string) + e = self.decode_string(e) e = self.char_code_at(e) - self.size = len(e) if len(e) <= 64: self.chunk = e else: @@ -454,9 +447,7 @@ def sum(self, e, length=60): self.write(e) self.fill(length) self.compress(self.chunk) - a = self.reg_to_array(self.reg) - self.reset() - return a + return self.reg_to_array(self.reg) @classmethod def generate_result_unit(cls, n, s): @@ -475,13 +466,40 @@ def generate_result_end(cls, s, e="s4"): return r @classmethod - def generate_result(cls, s, n, e="s4"): - r = "" - for i in range(n): - b = ((ord(s[i * 3]) << 16) | (ord(s[i * 3 + 1])) - << 8) | ord(s[i * 3 + 2]) - r += cls.generate_result_unit(b, e) - return r + def generate_result(cls, s, e="s4"): + # r = "" + # for i in range(len(s)//4): + # b = ((ord(s[i * 3]) << 16) | (ord(s[i * 3 + 1])) + # << 8) | ord(s[i * 3 + 2]) + # r += cls.generate_result_unit(b, e) + # return r + + r = [] + + for i in range(0, len(s), 3): + if i + 2 < len(s): + n = ( + (ord(s[i]) << 16) + | (ord(s[i + 1]) << 8) + | ord(s[i + 2]) + ) + elif i + 1 < len(s): + n = (ord(s[i]) << 16) | ( + ord(s[i + 1]) << 8 + ) + else: + n = ord(s[i]) << 16 + + for j, k in zip(range(18, -1, -6), + (0xFC0000, 0x03F000, 0x0FC0, 0x3F)): + if j == 6 and i + 1 >= len(s): + break + if j == 0 and i + 2 >= len(s): + break + r.append(cls.__str[e][(n & k) >> j]) + + r.append("=" * ((4 - len(r) % 4) % 4)) + return "".join(r) @classmethod def generate_args_code(cls): @@ -496,12 +514,74 @@ def generate_args_code(cls): a.append(cls.__arguments[2] >> j) return [int(i) & 255 for i in a] + def generate_method_code(self, method: str = "GET") -> list[int]: + return self.sm3_to_array(self.sm3_to_array(method + self.__end_string)) + # return self.sum(self.sum(method + self.__end_string)) + + def generate_params_code(self, params: str) -> list[int]: + return self.sm3_to_array(self.sm3_to_array(params + self.__end_string)) + # return self.sum(self.sum(params + self.__end_string)) + + @classmethod + def sm3_to_array(cls, data: str | list) -> list[int]: + """ + 代码参考: https://github.com/Johnserf-Seed/f2/blob/main/f2/utils/abogus.py + + 计算请求体的 SM3 哈希值,并将结果转换为整数数组 + Calculate the SM3 hash value of the request body and convert the result to an array of integers + + Args: + data (Union[str, List[int]]): 输入数据 (Input data). + + Returns: + List[int]: 哈希值的整数数组 (Array of integers representing the hash value). + """ + + if isinstance(data, str): + b = data.encode("utf-8") + else: + b = bytes(data) # 将 List[int] 转换为字节数组 + + # 将字节数组转换为适合 sm3.sm3_hash 函数处理的列表格式 + h = sm3.sm3_hash(func.bytes_to_list(b)) + + # 将十六进制字符串结果转换为十进制整数列表 + return [int(h[i: i + 2], 16) for i in range(0, len(h), 2)] + + @classmethod + def generate_browser_info(cls, platform: str = "Win32") -> str: + inner_width = randint(1280, 1920) + inner_height = randint(720, 1080) + outer_width = randint(inner_width, 1920) + outer_height = randint(inner_height, 1080) + screen_x = 0 + screen_y = choice((0, 30)) + value_list = [ + inner_width, + inner_height, + outer_width, + outer_height, + screen_x, + screen_y, + 0, + 0, + outer_width, + outer_height, + outer_width, + outer_height, + inner_width, + inner_height, + 24, + 24, + platform, + ] + return "|".join(str(i) for i in value_list) + @staticmethod def rc4_encrypt(plaintext, key): s = list(range(256)) j = 0 - # Key Scheduling Algorithm (KSA) for i in range(256): j = (j + s[i] + ord(key[i % len(key)])) % 256 s[i], s[j] = s[j], s[i] @@ -510,7 +590,6 @@ def rc4_encrypt(plaintext, key): j = 0 cipher = [] - # Pseudo-Random Generation Algorithm (PRGA) for k in range(len(plaintext)): i = (i + 1) % 256 j = (j + s[i]) % 256 @@ -521,8 +600,8 @@ def rc4_encrypt(plaintext, key): return ''.join(cipher) def get_value(self, - url_params: dict, - user_agent: str, + url_params: dict | str, + method="GET", start_time=0, end_time=0, random_num_1=None, @@ -534,15 +613,12 @@ def get_value(self, random_num_2, random_num_3, ) - string_2 = self.generate_string_2( - urlencode(url_params), - user_agent, - start_time, - end_time, - ) + string_2 = self.generate_string_2(urlencode(url_params) if isinstance( + url_params, dict) else url_params, method, start_time, end_time, ) string = string_1 + string_2 - return self.generate_result( - string, 40, "s4") + self.generate_result_end(string, "s4") + # return self.generate_result( + # string, "s4") + self.generate_result_end(string, "s4") + return self.generate_result(string, "s4") if __name__ == "__main__": @@ -550,9 +626,10 @@ def get_value(self, USERAGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36" url_str = "https://www.douyin.com/aweme/v1/web/aweme/detail/?device_platform=webapp&aid=6383&channel=channel_pc_web&pc_client_type=1&version_code=190500&version_name=19.5.0&cookie_enabled=true&browser_language=zh-CN&browser_platform=Win32&browser_name=Firefox&browser_online=true&engine_name=Gecko&os_name=Windows&os_version=10&platform=PC&screen_width=1920&screen_height=1080&browser_version=124.0&engine_version=122.0.0.0&cpu_core_num=12&device_memory=8&aweme_id=7345492945006595379" # 将url参数转换为字典 - url_params = dict([param.split("=") for param in url_str.split("?")[1].split("&")]) + url_params = dict([param.split("=") + for param in url_str.split("?")[1].split("&")]) print(f"URL参数: {url_params}") - a_bogus = bogus.get_value(url_params, USERAGENT) + a_bogus = bogus.get_value(url_params, ) # 使用url编码a_bogus a_bogus = quote(a_bogus, safe='') print(a_bogus) diff --git a/crawlers/douyin/web/utils.py b/crawlers/douyin/web/utils.py index c963b47c5..d5497e86f 100644 --- a/crawlers/douyin/web/utils.py +++ b/crawlers/douyin/web/utils.py @@ -297,7 +297,7 @@ def ab_model_2_endpoint(cls, params: dict, user_agent: str) -> str: raise TypeError("参数必须是字典类型") try: - ab_value = AB().get_value(params, user_agent) + ab_value = AB().get_value(params, ) except Exception as e: raise RuntimeError("生成A-Bogus失败: {0})".format(e))