From a94303557664d06bd086f987883daa743d0b6961 Mon Sep 17 00:00:00 2001 From: Sarieh1 Date: Fri, 1 Nov 2024 07:30:49 +0300 Subject: [PATCH] Scrapping virus exchange --- .../backends/virus_exchange.py | 85 +++++++++++++++++++ your_daily_dose_malware/commands.py | 25 ++++++ 2 files changed, 110 insertions(+) create mode 100644 your_daily_dose_malware/backends/virus_exchange.py diff --git a/your_daily_dose_malware/backends/virus_exchange.py b/your_daily_dose_malware/backends/virus_exchange.py new file mode 100644 index 0000000..0cef331 --- /dev/null +++ b/your_daily_dose_malware/backends/virus_exchange.py @@ -0,0 +1,85 @@ +import os +import requests +import time +from pathlib import Path +from datetime import datetime as dt +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from rich.progress import Progress + +class VirusExchangeScraper: + def __init__(self): + print("init") + self.driver = webdriver.Chrome() + self.login_url = "https://virus.exchange/users/log_in" + self.samples_url = "https://virus.exchange/samples" + self.wait = WebDriverWait(self.driver, 10) + + def login(self, email, password): + # Login to the Virus Exchange site + print('login') + self.driver.get(self.login_url) + email_field = self.wait.until(EC.presence_of_element_located((By.NAME, "user[email]"))) + password_field = self.driver.find_element(By.NAME, "user[password]") + email_field.send_keys(email) + password_field.send_keys(password) + password_field.send_keys(Keys.RETURN) + self.driver.get(self.samples_url) + return 1 + + def get_samples_data(self): + # Wait for the sample list to load + print('getting samples') + self.wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, "li.relative.flex.items-center"))) + + # Find all sample items on the page + sample_elements = self.driver.find_elements(By.CSS_SELECTOR, "li.relative.flex.items-center") + samples = [] + + for element in sample_elements: + sha256 = element.find_element(By.CSS_SELECTOR, "h2 a span.whitespace-nowrap").text.strip() + try: + # Check for the presence of download link and ensure it's ready + download_link = element.find_element(By.CSS_SELECTOR, "a[download]").get_attribute("href") + samples.append({"sha256": sha256, "download_link": download_link}) + except: + print(f"Sample with SHA256 {sha256} is not yet ready. Skipping...") + + return samples + + def download_samples(self, samples): + # Directory setup for downloads + print('downloading samples') + download_dir = Path("Downloaded-Malwares") + download_dir.mkdir(exist_ok=True) + date_str = dt.now().strftime("%Y-%m-%d") + + # Download each sample file with progress + with Progress() as progress: + task = progress.add_task("Downloading samples...", total=len(samples)) + + for sample in samples: + sha256_hash = sample['sha256'] + download_link = sample['download_link'] + file_name = f"malware_{sha256_hash[:6]}_{date_str}.zip" + file_path = download_dir / file_name + + # Skip download if file already exists + if file_path.exists(): + progress.update(task, advance=1) + continue + + response = requests.get(download_link) + + if response.status_code == 200: + with open(file_path, "wb") as f: + f.write(response.content) + progress.update(task, advance=1) + else: + print(f"Failed to download {sha256_hash[:6]}") + + def close(self): + self.driver.quit() diff --git a/your_daily_dose_malware/commands.py b/your_daily_dose_malware/commands.py index 2772202..2741b17 100644 --- a/your_daily_dose_malware/commands.py +++ b/your_daily_dose_malware/commands.py @@ -7,6 +7,8 @@ from your_daily_dose_malware.backends.malshare import Malshare from your_daily_dose_malware.constants import MALWARE_BANNER, MALWARE_DESCRIPTION from your_daily_dose_malware.backends.malware_bazaar import MalwareBazaar +from your_daily_dose_malware.backends.virus_exchange import VirusExchangeScraper + app = typer.Typer() @@ -63,3 +65,26 @@ def malware_bazaar( raise typer.BadParameter( "You must to choose either or options" ) +@app.command(name="virus-exchange") +def virus_exchange( + email: str = typer.Option(..., help="Virus Exchange email"), + password: str = typer.Option(..., help="Virus Exchange password"), +): + """ + Log in to Virus Exchange and scrape malware samples + """ + console = Console() + console.print("Logging in to Virus Exchange...") + + # Initialize the scraper + scraper = VirusExchangeScraper() + + # Attempt to log in + if scraper.login(email, password): + console.print("Login successful! Scraping samples...") + samples = scraper.get_samples_data() + console.print(samples) + scraper.download_samples(samples) + console.print("Sample scraping completed.") + else: + console.print("Login failed. Check your credentials and try again.")