web_scraper.py · 390 lines (319 loc) · 15.9 KB
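"""Web scraping utilities.

Combines Google Custom Search (URL discovery), readability-lxml and
BeautifulSoup (main-content extraction), and NLTK (keyword-based relevance
filtering) behind a single WebScraper class.
"""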
import requests
from bs4 import BeautifulSoup
import logging
from urllib.parse import quote_plus, urlparse
import os
import validators # for URL validation
from readability import Document # for main content extraction
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.probability import FreqDist
import nltk
from urllib3.util import Retry
from requests.adapters import HTTPAdapter
import string
import time
import random
from datetime import datetime
from logger_config import logger_config
# Download required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)  # required by the tokenizers in newer NLTK releases
nltk.download('stopwords', quiet=True)
class WebScraper:
    """Searches the web for query-relevant pages and extracts their main content."""

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Cache-Control': 'max-age=0',
            'Sec-Ch-Ua': '"Not_A Brand";v="8", "Chromium";v="120"',
            'Sec-Ch-Ua-Mobile': '?0',
            'Sec-Ch-Ua-Platform': '"Windows"',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Upgrade-Insecure-Requests': '1'
        }
        # Set up session with retry strategy
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        # Initialize NLTK resources
        self.stop_words = set(stopwords.words('english'))
        self.punctuation = set(string.punctuation)
        # Get API keys from environment variables
        self.google_api_key = os.getenv('GOOGLE_API_KEY')
        self.google_cx = os.getenv('GOOGLE_CX')
        # Get logger
        self.logger = logger_config.get_scraping_logger()
    def extract_keywords(self, text, num_keywords=10):
        """Extract keywords from text using NLTK."""
        try:
            # Tokenize and convert to lowercase
            words = word_tokenize(text.lower())
            # Remove stopwords, punctuation, and short words
            words = [word for word in words
                     if word not in self.stop_words
                     and word not in self.punctuation
                     and len(word) > 2
                     and word.isalnum()]
            # Calculate word frequencies
            freq_dist = FreqDist(words)
            # Get the most common words
            keywords = [word for word, freq in freq_dist.most_common(num_keywords)]
            self.logger.debug(f"Extracted keywords: {keywords}")
            return keywords
        except Exception as e:
            self.logger.error(f"Error extracting keywords: {str(e)}")
            return []
    def is_content_relevant(self, text, query):
        """Check if content is relevant to the query using keyword analysis."""
        try:
            # Extract keywords from the content
            content_keywords = self.extract_keywords(text, num_keywords=15)
            # Convert query to lowercase words and remove stopwords
            query_words = set(word.lower() for word in query.split()
                              if word.lower() not in self.stop_words)
            # Get the first few sentences for context
            sentences = sent_tokenize(text)[:3]
            first_paragraph = ' '.join(sentences)
            self.logger.debug(f"Content keywords: {content_keywords}")
            self.logger.debug(f"Query words: {query_words}")
            self.logger.debug(f"First paragraph: {first_paragraph}")
            # Check relevance criteria
            keyword_match = any(word in content_keywords for word in query_words)
            has_enough_sentences = len(sentences) >= 3
            has_query_words = any(word.lower() in text.lower() for word in query_words)
            # Calculate relevance score
            matching_keywords = sum(1 for word in query_words if word in content_keywords)
            relevance_score = matching_keywords / len(query_words) if query_words else 0
            self.logger.debug(f"Relevance score: {relevance_score}")
            # Content is relevant if it meets all criteria and has a minimum relevance score
            is_relevant = (keyword_match and has_enough_sentences and
                           has_query_words and relevance_score >= 0.3)
            if not is_relevant:
                self.logger.debug("Content rejected due to:")
                if not keyword_match:
                    self.logger.debug("- No keyword match")
                if not has_enough_sentences:
                    self.logger.debug("- Not enough sentences")
                if not has_query_words:
                    self.logger.debug("- Query words not found")
                if relevance_score < 0.3:
                    self.logger.debug(f"- Low relevance score: {relevance_score}")
            return is_relevant
        except Exception as e:
            self.logger.error(f"Error checking content relevance: {str(e)}")
            return False
    def is_valid_url(self, url):
        """Enhanced URL validation."""
        try:
            # Basic URL validation
            if not validators.url(url):
                return False
            # Parse URL
            parsed = urlparse(url)
            # Check for valid scheme
            if parsed.scheme not in ['http', 'https']:
                return False
            # Check for suspicious TLDs
            suspicious_tlds = ['.xyz', '.top', '.win', '.bid']
            if any(parsed.netloc.endswith(tld) for tld in suspicious_tlds):
                return False
            # Check for valid domain structure
            if len(parsed.netloc.split('.')) < 2:
                return False
            return True
        except Exception:
            return False
    def clean_text(self, text):
        """Clean text while preserving structure and readability."""
        # Remove extra whitespace within lines while preserving paragraph breaks
        lines = text.split('\n')
        cleaned_lines = []
        for line in lines:
            # Clean each line individually
            cleaned = ' '.join(word for word in line.split() if word)
            if cleaned:
                cleaned_lines.append(cleaned)
        # Join with double newlines to preserve paragraph structure
        return '\n\n'.join(cleaned_lines)
    def extract_content_from_element(self, element, is_header=False):
        """Extract and clean content from a BeautifulSoup element."""
        text = element.get_text(separator=' ', strip=True)
        if not text:
            return None
        # Clean the text
        text = self.clean_text(text)
        # Add formatting for headers
        if is_header:
            text = f"\n## {text}\n"
        return text
    def extract_paragraphs(self, elements):
        """Extract and format paragraphs from a list of HTML elements."""
        content_parts = []
        for elem in elements:
            if elem.name:  # Check if it's a tag
                is_header = elem.name.startswith('h')
                text = self.extract_content_from_element(elem, is_header)
                if text:
                    content_parts.append(text)
        return '\n\n'.join(content_parts) if content_parts else None
    def scrape_url(self, url, query=None):
        """Scrape content from a given URL with enhanced content extraction."""
        try:
            if not self.is_valid_url(url):
                self.logger.warning(f"Invalid URL format: {url}")
                return None
            self.logger.debug(f"Scraping URL: {url}")
            # Add a random delay to avoid rate limiting
            time.sleep(random.uniform(1, 3))
            response = self.session.get(url, headers=self.headers, timeout=15)
            response.raise_for_status()
            # First try to extract using readability
            doc = Document(response.text)
            title = doc.title()
            # Parse the HTML with BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            # Remove unwanted elements
            for element in soup.find_all(['script', 'style', 'iframe', 'form', 'nav', 'footer']):
                element.decompose()
            # Try multiple content extraction strategies
            content = None
            # Strategy 1: Try readability's main content
            if not content:
                main_content = doc.summary()
                content_soup = BeautifulSoup(main_content, 'html.parser')
                paragraphs = content_soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
                if paragraphs:
                    # Preserve paragraph structure and spacing
                    content_parts = []
                    for p in paragraphs:
                        # Clean the text while preserving internal spacing
                        text = ' '.join(p.get_text().split())
                        if text:
                            # Add header markers for headers
                            if p.name.startswith('h'):
                                text = f"\n## {text}\n"
                            content_parts.append(text)
                    content = '\n\n'.join(content_parts)
            # Strategy 2: Look for article or main content tags
            if not content or len(content.split()) < 50:
                main_tags = soup.find_all(
                    ['article', 'main', 'div'],
                    class_=lambda x: x and any(word in str(x).lower()
                                               for word in ['content', 'article', 'post', 'entry', 'body']))
                for tag in main_tags:
                    paragraphs = tag.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
                    if paragraphs:
                        content_parts = []
                        for p in paragraphs:
                            text = ' '.join(p.get_text().split())
                            if text:
                                if p.name.startswith('h'):
                                    text = f"\n## {text}\n"
                                content_parts.append(text)
                        content = '\n\n'.join(content_parts)
                        if len(content.split()) >= 50:
                            break
            # Strategy 3: Find the div with the most paragraph tags
            if not content or len(content.split()) < 50:
                divs = soup.find_all('div')
                max_p_count = 0
                best_div = None
                for div in divs:
                    p_count = len(div.find_all('p'))
                    if p_count > max_p_count:
                        max_p_count = p_count
                        best_div = div
                if best_div:
                    paragraphs = best_div.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
                    content_parts = []
                    for p in paragraphs:
                        text = ' '.join(p.get_text().split())
                        if text:
                            if p.name.startswith('h'):
                                text = f"\n## {text}\n"
                            content_parts.append(text)
                    content = '\n\n'.join(content_parts)
            if not content:
                self.logger.warning(f"Could not extract content from {url}")
                return None
            # Clean the content while preserving structure:
            # replace runs of newlines with a single blank line between paragraphs
            content = '\n\n'.join(line.strip() for line in content.split('\n') if line.strip())
            # Basic content validation
            if len(content.split()) < 50:
                self.logger.warning(f"Content from {url} too short ({len(content.split())} words)")
                return None
            # Check content relevance if a query is provided
            if query and not self.is_content_relevant(content, query):
                self.logger.warning(f"Content from {url} not relevant to query: {query}")
                return None
            word_count = len(content.split())
            self.logger.info(f"Successfully extracted {word_count} words from {url}")
            # Log scraped content in debug mode
            logger_config.log_scraped_content(url, title, content)
            return content
        except Exception as e:
            self.logger.error(f"Error scraping URL {url}: {str(e)}")
            return None
    def search_urls(self, query, num_results=5):
        """
        Search for relevant URLs using the Google Custom Search API.
        Returns a list of URLs related to the query.
        """
        try:
            if not self.google_api_key or not self.google_cx:
                self.logger.error("Google API key or Custom Search Engine ID not found in environment variables")
                return []
            # Bias the query toward educational and documentation content
            search_query = f"{query} (site:.org OR site:.edu OR site:.io OR site:docs.* OR site:wikipedia.org)"
            # Google Custom Search API endpoint
            search_url = "https://www.googleapis.com/customsearch/v1"
            params = {
                'key': self.google_api_key,
                'cx': self.google_cx,
                'q': search_query,
                'num': min(num_results, 10)  # Max 10 results per request
            }
            self.logger.info(f"Searching with query: {search_query}")
            response = self.session.get(search_url, params=params, timeout=10)
            response.raise_for_status()
            search_results = response.json()
            if 'items' not in search_results:
                self.logger.warning("No search results found")
                return []
            urls = []
            seen_urls = set()
            for item in search_results['items']:
                url = item['link']
                # Validate URL
                if not self.is_valid_url(url):
                    self.logger.debug(f"Skipping invalid URL: {url}")
                    continue
                # Strip the query string (drops tracking and other parameters)
                url = url.split('?')[0]
                # Skip if we've seen this URL already
                if url in seen_urls:
                    continue
                # Skip common non-content URLs
                if any(skip in url.lower() for skip in ['/search', '/login', '/signup', '/ads', '/tracking']):
                    continue
                urls.append(url)
                seen_urls.add(url)
                if len(urls) >= num_results:
                    break
            if not urls:
                self.logger.warning(f"No valid URLs found in the search results. Query: {search_query}")
            else:
                self.logger.info(f"Found {len(urls)} valid URLs")
                for i, url in enumerate(urls, 1):
                    self.logger.debug(f"{i}. {url}")
            return urls[:num_results]
        except Exception as e:
            self.logger.error(f"Error searching URLs: {str(e)}")
            import traceback
            self.logger.error(traceback.format_exc())
            return []
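

# ---------------------------------------------------------------------------
# Hypothetical usage sketch (not part of the original module): assumes the
# GOOGLE_API_KEY and GOOGLE_CX environment variables are set and that
# logger_config is importable from this project. The query string below is
# only an example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    scraper = WebScraper()
    query = "python web scraping best practices"

    # Discover candidate URLs via Google Custom Search, then scrape each one,
    # keeping only pages whose content passes the relevance check.
    results = {}
    for url in scraper.search_urls(query, num_results=3):
        content = scraper.scrape_url(url, query=query)
        if content:
            results[url] = content
            print(f"{url}: {len(content.split())} words extracted")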