forked from DvaMishkiLapa/VKArchiveDownloader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_downloader.py
242 lines (219 loc) · 10.4 KB
/
data_downloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import asyncio
from configparser import ConfigParser
from os.path import join
from traceback import format_exc
from typing import Coroutine, Dict
import aiofiles
import aiohttp
from bs4 import BeautifulSoup
import tools
from logger import create_logger
from latest_user_agents import get_random_user_agent
config = ConfigParser()
config_read = config.read('config.ini', encoding='utf8')
if config_read is None:
logger = create_logger('logs/vk_parser.log', 'data_downloader', 'DEBUG')
else:
log_level = config['main_parameters'].get('log_level', 'DEBUG')
logger = create_logger('logs/vk_parser.log', 'data_downloader', log_level)
error_titles = [
'Ошибка | ВКонтакте',
'Ошибка',
'Error | VK',
'Error'
]
error_titles_html = [f'<title>{x}</title>' for x in error_titles]
def get_response_info(content_type: str) -> Dict[str, str]:
'''
Возвращает информацию из заголовка запроса `content-type`
`content_type`: заголовок запроса
Максимальная информация, которая может быть возвращена:
```
{
'encoding': 'Кодировка контента, если он текстовый, иначе ключ будет отсутствовать',
'data_type': 'Тип контента, например, text или image',
'extension': 'Конкретное расширение для контента, например, html или gif',
'full_type_info': 'Комбинация data_type и extension через /'
}
```
'''
result = {}
find_res = content_type.find('charset=')
if find_res != -1:
result.update({'encoding': content_type[find_res + len('charset='):]})
find_res = content_type.find(';')
if find_res != -1:
full_type_info = content_type[:find_res]
data_type, extension = full_type_info.split('/')
else:
full_type_info = content_type
data_type, extension = full_type_info.split('/')
result.update({
'data_type': data_type,
'extension': extension,
'full_type_info': full_type_info
})
return result
def get_file_name_by_link(link: str) -> str | None:
try:
if '?extra=' in link:
return link.split('/')[-1].split('?extra=')[0]
elif '?size=' in link:
return link.split('/')[-1].split('?size=')[0]
else:
return link.split('/')[-1]
except Exception:
return None
def check_vk_title_error(soup: BeautifulSoup) -> str | bool:
'''
Проверяет наличие ошибки доступа к содержимому VK
`soup`: экземпляр `BeautifulSoup` для работы с `HTML` контентом
Возвращает:
- `True`: Ошибки нет
- `False`: Ошибки есть (ошибки доступа, недоступности, скрытия)
'''
mes = soup.find('div', class_='message_page_title')
if mes is not None:
if any(ext in mes.text for ext in error_titles):
details = soup.find('div', class_='message_page_body').text.strip().split('\n')[0]
return details
return False
async def find_link_by_url(session: aiohttp.ClientSession, url: str, pattern: str, cookies=None) -> str:
'''
Находит ссылку на файл из документа VK. Если не найдено, возвращает переданный `url`
`session`: сессия
`url`: ссылка на документ VK, где нужно найти ссылку
`pattern`: паттерн для поиска
- `doc`: паттерн для поиска ссылок в документах
`cookies` куки для `aiohttp.ClientResponse`
'''
async with session.get(url, timeout=60, cookies=cookies) as response:
if 'doc' in pattern:
doc_url_pattern = 'docUrl":"'
doc_buy_pattern = '","docBuyLink'
assert response.status == 200, f'Response status: {response.status}'
if 'text/html' in response.headers['content-type']:
text = await response.text()
assert not any(ext in text for ext in error_titles_html), 'Ошибка доступа к документу'
first = text.find(doc_url_pattern)
second = text.find(doc_buy_pattern)
return text[first+len(doc_url_pattern):second].replace('\/', '/')
elif 'photo' in pattern:
soup = BeautifulSoup(await response.text(), 'html.parser')
check = check_vk_title_error(soup)
assert not check, f'Ошибка доступа к фото: {check}'
link = soup.find('meta', attrs={'name': 'og:image:secure_url'})
if link:
return link['value']
redirect_url = str(response.url)
if redirect_url != url:
return redirect_url
return url
async def downloader(response: aiohttp.ClientResponse, path: str, name: str) -> Coroutine:
'''
Скачивает файл из `response`:
`response`: ответ на запрос
`path`: путь, куда будет сохранен файл
`name`: имя сохраняемого файла
'''
block_size = 16384
async with aiofiles.open(join(path, name), 'wb') as f:
while True:
data = await response.content.read(block_size)
if not data:
break
await f.write(data)
async def get_info(url: str, save_path: str, file_name: str, sema: asyncio.BoundedSemaphore, cookies=None) -> Dict[str, str] | None:
'''
Скачивает файл из `response`, возвращая о нем информацию:
```
{
'url': URL скаченного файла,
'file_info': информация о типе файла
}
```
`url`: ссылка на файл или ресурс
`save_path`: путь до папки, куда будет сохранен файл
`file_name`: имя файла
`sema`: семафор для асинхронного скачивания
`cookies` куки для `aiohttp.ClientSession`
'''
try:
if 'vk.com/video' in url:
return {'url': url, 'file_info': 'vk_video'}
elif 'vk.com/id' in url or 'vk.com/public' in url:
return {'url': url, 'file_info': 'vk_contact'}
elif 'vk.com/story' in url:
return {'url': url, 'file_info': 'vk_story'}
elif 'github.com' in url:
return {'url': url, 'file_info': 'github_link'}
elif 'aliexpress.com' in url:
return {'url': url, 'file_info': 'aliexpress_link'}
elif 'pastebin.com' in url:
return {'url': url, 'file_info': 'pastebin_link'}
elif 'drive.google.com' in url:
return {'url': url, 'file_info': 'gdrive_link'}
elif 'google.com' in url:
return {'url': url, 'file_info': 'google_link'}
elif 'wikipedia.org' in url:
return {'url': url, 'file_info': 'wikipedia_link'}
elif 'pornhub.com' in url:
return {'url': url, 'file_info': '🍓'}
elif 't.me' in url:
return {'url': url, 'file_info': 'telegram_contact'}
elif 'dns-shop.ru' in url:
return {'url': url, 'file_info': 'dns_shop_link'}
headers = {
'Accept-Language': 'ru',
'User-Agent': get_random_user_agent()
}
async with sema, aiohttp.ClientSession(headers=headers) as session:
async with session.get(url, timeout=45) as response:
assert response.status == 200, f'Response status: {response.status}'
if any(t in response.headers['content-type'] for t in ('image', 'audio')):
response_info = get_response_info(response.headers['content-type'])
download_path = join(save_path, response_info['full_type_info'])
tools.create_folder(download_path)
download_file_name = get_file_name_by_link(str(response.url))
if download_file_name is None:
download_file_name = f'{file_name}.{response_info["extension"]}'
await asyncio.create_task(
downloader(
response=response,
path=download_path,
name=download_file_name
)
)
return {'url': str(response.url), 'file_info': response_info['full_type_info']}
target_content_type = response.headers['content-type']
if 'text/html' in target_content_type:
if 'vk.com/doc' in url:
find_res = await asyncio.create_task(find_link_by_url(session, url, 'doc', cookies))
elif 'vk.com/photo':
find_res = await asyncio.create_task(find_link_by_url(session, url, 'photo', cookies))
if find_res == url:
return {'url': find_res, 'file_info': 'not_parse'}
async with session.get(find_res, timeout=900) as response:
response_info = get_response_info(response.headers['content-type'])
download_path = join(save_path, response_info['full_type_info'])
tools.create_folder(download_path)
download_file_name = get_file_name_by_link(find_res)
if download_file_name is None:
download_file_name = f'{file_name}.{response_info["extension"]}'
await asyncio.create_task(
downloader(
response=response,
path=download_path,
name=download_file_name
)
)
return {'url': str(response.url), 'file_info': response_info['full_type_info']}
return {'url': url, 'file_info': 'not_parse'}
except asyncio.TimeoutError as e:
logger.error(f'Ошибка 🔗 {url}: тайм-аут скачивания')
logger.debug(format_exc())
return {'url': url, 'file_info': 'timeout_error'}
except Exception as e:
logger.error(f'Ошибка 🔗 {url}: {e}')
logger.debug(format_exc())
return {'url': url, 'file_info': 'error'}