Adding scraper and processor code
aksh-patel1 committed Apr 14, 2024
0 parents commit d5159dc
Showing 9 changed files with 329 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -0,0 +1,5 @@
# Ignore .env files
**/.env

# Ignore iconic-range-412912-258f129777ff.json files
**/iconic-range-412912-258f129777ff.json
18 changes: 18 additions & 0 deletions processor/Dockerfile.processor
@@ -0,0 +1,18 @@
# Use the official Python image
FROM python:3.8-slim

# Set the working directory in the container
WORKDIR /app

# Copy the Python script, the credentials file, the requirements file, the .env file, and the log file
COPY processor.py .
COPY iconic-range-412912-258f129777ff.json .
COPY requirements.txt .
COPY .env .
COPY processor.log .

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Run the processor.py script
CMD ["python", "processor.py"]
Empty file added processor/processor.log
Empty file.
140 changes: 140 additions & 0 deletions processor/processor.py
@@ -0,0 +1,140 @@
import os
from dotenv import load_dotenv
import json
from datetime import datetime
from bs4 import BeautifulSoup
from google.oauth2 import service_account
from googleapiclient.discovery import build
import boto3
import time
import logging
from logging.handlers import RotatingFileHandler


# Load environment variables from .env file
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO, filename='processor.log', filemode='a',
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

logger.info(f'\n#### Initializing Processing job at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} ####')

# Google Sheets API credentials
SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
SERVICE_ACCOUNT_FILE = os.getenv('SERVICE_ACCOUNT_FILE') # Path to your credentials JSON file

# AWS S3
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
BUCKET_NAME = os.getenv("S3_BUCKET")

# Authenticate with Google Sheets API
def authenticate_google_sheets():
    creds = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    service = build('sheets', 'v4', credentials=creds)
    return service

SHEET = authenticate_google_sheets().spreadsheets()

def update_price(sheet_id, range_name, row_index, price):
    value_input_option = 'RAW'

    current_time = datetime.now().strftime("%Y-%m-%d")

    value_range_body = {
        "values": [
            [price, current_time]
        ]
    }

    try:
        SHEET.values().update(
            spreadsheetId=sheet_id,
            range=f'{range_name}!C{row_index}:D{row_index}',
            valueInputOption=value_input_option,
            body=value_range_body
        ).execute()

        logger.info(f'For row-{row_index}, Updated price: {price}, PriceUpdatedAt: {current_time}\n')
    except Exception as e:
        logger.error(f"Error updating price for row {row_index}: {e}")

def extract_price(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    script_tag = soup.find('script', {'id': '__NEXT_DATA__'})
    if script_tag:
        script_content = script_tag.string
        json_data = json.loads(script_content)

        product_key = 'product:{"productId":"%s"}' % json_data['props']['pageProps']['id']

        if 'props' in json_data and 'pageProps' in json_data['props'] \
                and 'apolloState' in json_data['props']['pageProps'] \
                and 'ROOT_QUERY' in json_data['props']['pageProps']['apolloState'] \
                and product_key in json_data['props']['pageProps']['apolloState']['ROOT_QUERY'] \
                and 'productBasicData' in json_data['props']['pageProps']['apolloState']['ROOT_QUERY'][product_key]:
            product_data = json_data['props']['pageProps']['apolloState']['ROOT_QUERY'][product_key]['productBasicData']
            if 'price' in product_data:
                logger.info(f"extracted_price: {product_data['price'].get('value')}")
                return product_data['price'].get('value')
    return None

def process_page(bucket_name, page_key, sheet_id, range_name, row_index, s3):
    logger.info(f'Processing started for row index: {row_index}')

    try:
        response = s3.get_object(Bucket=bucket_name, Key=page_key)
        html_content = response['Body'].read().decode('utf-8')

        price = extract_price(html_content)

        if price:
            update_price(sheet_id, range_name, row_index, price)
        else:
            logger.warning(f"Unable to find a price for page_{row_index-1}")
    except Exception as e:
        logger.error(f"Error processing page_{row_index-1}: {e}")


def main():
    sheet_id = '1JinOtgZDuD8s8eM0QL72_7mPx3Jhkog59-nyqXLJdm4'
    range_name = 'Sheet1'

    s3_client = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

    response = s3_client.list_objects_v2(Bucket=BUCKET_NAME, Delimiter='/')

    # Get the prefixes (folder names)
    folders = response.get('CommonPrefixes', [])

    # Sort the folders by name and pick the latest one
    latest_updated_at = sorted(folders, key=lambda x: x['Prefix'], reverse=True)[0]['Prefix']

    values = SHEET.values().get(spreadsheetId=sheet_id, range=range_name).execute().get('values', [])

    for row_index, _ in enumerate(values[1:], start=2):  # Skip the header row
        page_key = f'{latest_updated_at}page_{row_index - 1}.html'
        process_page(BUCKET_NAME, page_key, sheet_id, range_name, row_index, s3_client)
        time.sleep(0.6)

    # Upload the log file to S3
    log_file_path = 'processor.log'
    s3_key = 'processor.log'

    with open(log_file_path, 'rb') as f:
        s3_client.put_object(
            Bucket=BUCKET_NAME,
            Key=s3_key,
            Body=f,
            ContentType='text/plain'
        )

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        logger.error(f"Error: {e}")
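
For context, extract_price above assumes the scraped page embeds a Next.js __NEXT_DATA__ JSON payload whose Apollo cache stores the product under a product:{"productId":"<id>"} key. The following minimal, self-contained sketch (the product id and price are invented for illustration) reproduces that assumed structure and walks the same key path the parser walks:

import json
from bs4 import BeautifulSoup

# Hypothetical __NEXT_DATA__ payload mirroring the structure extract_price expects;
# the product id "12345" and price 19.99 are invented for illustration.
payload = {
    "props": {
        "pageProps": {
            "id": "12345",
            "apolloState": {
                "ROOT_QUERY": {
                    'product:{"productId":"12345"}': {
                        "productBasicData": {"price": {"value": 19.99}}
                    }
                }
            }
        }
    }
}
sample_html = f'<html><body><script id="__NEXT_DATA__">{json.dumps(payload)}</script></body></html>'

# Walk the same key path extract_price walks.
soup = BeautifulSoup(sample_html, "html.parser")
data = json.loads(soup.find("script", {"id": "__NEXT_DATA__"}).string)
key = 'product:{"productId":"%s"}' % data["props"]["pageProps"]["id"]
print(data["props"]["pageProps"]["apolloState"]["ROOT_QUERY"][key]["productBasicData"]["price"]["value"])  # 19.99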
7 changes: 7 additions & 0 deletions processor/requirements.txt
@@ -0,0 +1,7 @@
boto3
beautifulsoup4
google-auth
google-auth-httplib2
google-auth-oauthlib
google-api-python-client
python-dotenv
18 changes: 18 additions & 0 deletions scraper/Dockerfile
@@ -0,0 +1,18 @@
# Use the official Python image
FROM python:3.8-slim

# Set the working directory in the container
WORKDIR /app

# Copy the Python script, the credentials file, the requirements file, the .env file, and the log file
COPY scraper.py .
COPY iconic-range-412912-258f129777ff.json .
COPY requirements.txt .
COPY .env .
COPY scraper.log .

# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Run the scraper.py script
CMD ["python", "scraper.py"]
7 changes: 7 additions & 0 deletions scraper/requirements.txt
@@ -0,0 +1,7 @@
boto3
google-auth
google-auth-httplib2
google-auth-oauthlib
google-api-python-client
python-dotenv
requests
Empty file added scraper/scraper.log
Empty file.
134 changes: 134 additions & 0 deletions scraper/scraper.py
@@ -0,0 +1,134 @@
from dotenv import load_dotenv
import os
import requests
from concurrent.futures import ThreadPoolExecutor
import boto3
from datetime import datetime
from google.oauth2 import service_account
from googleapiclient.discovery import build
import logging
import json

# Load environment variables from .env file
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO, filename='scraper.log', filemode='a',
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

logger.info(f'\n#### Initializing Scraping job at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} ####')

# AWS
AWS_REGION = os.getenv("AWS_REGION")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
S3_BUCKET = os.getenv("S3_BUCKET")

# Define the function to scrape a single URL synchronously
def scrape_url(url, retries=4):
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "accept": "*/*",
        "accept-encoding": "gzip, deflate, br",
        "connection": "keep-alive"
    }

    for _ in range(retries):
        try:
            response = requests.get(url, headers=headers, timeout=25)
            response.raise_for_status()  # Raise an exception for 4xx or 5xx status codes
            logger.info(f"Successfully scraped page: {url}")
            return response.text
        except requests.exceptions.RequestException as e:
            logger.error(f"Error scraping page: {url}, Error: {e}")

    return None

# Authenticate with Google Sheets API
def authenticate_google_sheets():
    SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly']
    SERVICE_ACCOUNT_FILE = os.getenv('SERVICE_ACCOUNT_FILE')
    creds = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    service = build('sheets', 'v4', credentials=creds)
    return service

# Read URLs from the Google Sheet
def read_urls_from_sheet(sheet_id, service):
    sheet = service.spreadsheets()
    result = sheet.values().get(spreadsheetId=sheet_id, range='Sheet1!B2:B').execute()
    values = result.get('values', [])
    urls = [url[0] for url in values]
    return urls

# Main function to scrape URLs and store HTML in S3
def main():
    # Authenticate with Google Sheets API
    service = authenticate_google_sheets()

    # Read URLs from the Google Sheet
    sheet_id = '1JinOtgZDuD8s8eM0QL72_7mPx3Jhkog59-nyqXLJdm4'
    urls = read_urls_from_sheet(sheet_id, service)

    # Scrape URLs using multithreading
    with ThreadPoolExecutor(max_workers=5) as executor:
        results = list(executor.map(scrape_url, urls))

    # Store HTML content in S3
    s3_client = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    current_date = datetime.now().strftime('%Y-%m-%d')
    for idx, html_content in enumerate(results):
        if html_content:
            # Store HTML content in the S3 bucket
            s3_key = f'{current_date}/page_{idx + 1}.html'
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=s3_key,
                Body=html_content.encode('utf-8'),
                ContentType='text/html'
            )

            logger.info(f'Stored page {idx + 1} HTML in S3')

    # Upload the log file to S3
    log_file_path = 'scraper.log'
    s3_key = 'scraper.log'

    with open(log_file_path, 'rb') as f:
        s3_client.put_object(
            Bucket=S3_BUCKET,
            Key=s3_key,
            Body=f,
            ContentType='text/plain'
        )

def trigger_processing_batch():
    client = boto3.client('events', region_name=AWS_REGION, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

    response = client.put_events(
        Entries=[
            {
                'Source': "scraperapp.batch",
                'DetailType': "Batch Job State Change",
                'Detail': json.dumps({
                    "state": ["SUCCEEDED"]
                })
            },
        ],
    )

    return response


if __name__ == "__main__":
    try:
        main()

        response = trigger_processing_batch()
        logger.info(f'Response from EventBridge: {response}')

    except Exception as e:
        logger.error(f"Error: {e}")
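
The EventBridge rule that reacts to this custom event is not part of the commit, so the following is only a sketch of how the downstream trigger might be wired up; the rule name, region, and target attachment are hypothetical assumptions:

import json
import boto3

# Hypothetical rule matching the custom event emitted by trigger_processing_batch;
# the rule name and region are invented, and a target (e.g. the processor job)
# would still have to be attached separately with put_targets.
events = boto3.client("events", region_name="us-east-1")

event_pattern = {
    "source": ["scraperapp.batch"],
    "detail-type": ["Batch Job State Change"],
    "detail": {"state": ["SUCCEEDED"]},
}

events.put_rule(
    Name="trigger-processor-on-scrape-success",
    EventPattern=json.dumps(event_pattern),
    State="ENABLED",
)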
