diff --git a/poetry.lock b/poetry.lock index 2c4db47..7fbd29b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "certifi" @@ -248,6 +248,45 @@ pygments = ">=2.13.0,<3.0.0" [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] +[[package]] +name = "shellingham" +version = "1.5.4" +description = "Tool to Detect Surrounding Shell" +optional = false +python-versions = ">=3.7" +files = [ + {file = "shellingham-1.5.4-py2.py3-none-any.whl", hash = "sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686"}, + {file = "shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de"}, +] + +[[package]] +name = "typer" +version = "0.12.5" +description = "Typer, build great CLIs. Easy to code. Based on Python type hints." +optional = false +python-versions = ">=3.7" +files = [ + {file = "typer-0.12.5-py3-none-any.whl", hash = "sha256:62fe4e471711b147e3365034133904df3e235698399bc4de2b36c8579298d52b"}, + {file = "typer-0.12.5.tar.gz", hash = "sha256:f592f089bedcc8ec1b974125d64851029c3b1af145f04aca64d69410f0c9b722"}, +] + +[package.dependencies] +click = ">=8.0.0" +rich = ">=10.11.0" +shellingham = ">=1.3.0" +typing-extensions = ">=3.7.4.3" + +[[package]] +name = "typing-extensions" +version = "4.12.2" +description = "Backported and Experimental Type Hints for Python 3.8+" +optional = false +python-versions = ">=3.8" +files = [ + {file = "typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d"}, + {file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"}, +] + [[package]] name = "urllib3" version = "2.2.1" @@ -268,4 +307,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "57e46e1249426f51142d4e181221eb271c0b2377b537d569b11b276d138bf6f4" +content-hash = "648c25aae302a168b72548a8558a8ce7e800415bac55c579a58d541cce5696b8" diff --git a/pyproject.toml b/pyproject.toml index 2b978d2..ca753a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,12 +5,18 @@ description = "Downloads and aggregates fresh malware samples collected from OSI authors = ["Robert Thomas", "Prashant Shubham", "Mboula Penda Paul O’neal"] readme = "README.md" + +[tool.poetry.scripts] +dd-run = "your_daily_dose_malware.commands:app" + + + [tool.poetry.dependencies] python = "^3.9" requests = "^2.32.2" -click = "^8.1.7" python-dotenv = "^1.0.1" rich = "^13.7.1" +typer = "^0.12.5" [build-system] requires = ["poetry-core"] diff --git a/your_daily_dose_malware/__main__.py b/your_daily_dose_malware/__main__.py new file mode 100644 index 0000000..30d8eea --- /dev/null +++ b/your_daily_dose_malware/__main__.py @@ -0,0 +1,2 @@ +from your_daily_dose_malware.commands import app +app(prog_name="dd-run") \ No newline at end of file diff --git a/your_daily_dose_malware/backends/malshare.py b/your_daily_dose_malware/backends/malshare.py index 9d5d195..13881c6 100644 --- a/your_daily_dose_malware/backends/malshare.py +++ b/your_daily_dose_malware/backends/malshare.py @@ -1,47 +1,60 @@ import os from datetime import datetime as dt from pathlib import Path +from typing import List import requests import rich from dotenv import load_dotenv -from rich.progress import Progress +from rich.progress import Progress, SpinnerColumn +# from rich import print load_dotenv() API_KEY = os.getenv("MALSHARE_API_KEY") +SELECTED_FILES_AND_CONTENTS = [] -def scrap_malshare(): - """List hashes from the past 24 hours - """ - with Progress() as progress: - if API_KEY is None or API_KEY == "": - return None - try: - response = requests.post( - "https://malshare.com/api.php", - verify=True, - params={"api_key": API_KEY, "action": "getlist"}, - ) - response.raise_for_status() - hashes = response.json() - except requests.RequestException as err: - rich.print(f"[red] Malshare will be skipped. An expected error occurred: {err} ") - return None - rich.print(f"[green] - Malshare: {len(hashes)} Samples") - sha256_ids = [hashe["sha256"] for hashe in hashes] - task = progress.add_task( - "-[green]Downloading Malware Files...", total=len(sha256_ids) - ) - Path("Downloaded-Malwares").mkdir(exist_ok=True) - downloaded_malwares_path = Path("Downloaded-Malwares").absolute() - for sha256_id in sha256_ids: - if (downloaded_malwares_path / f"malware_{sha256_id[:6]}.zip").exists(): - progress.update(task, advance=1) - continue + +class Malshare: + + @classmethod + def parsing_malshare_sha256(cls,fetched_hashes:List[dict],progress: Progress): + sha256_ids = list() + for id_names in fetched_hashes: + sha256_id = id_names["sha256"] + sha256_ids.append(sha256_id) + return sha256_ids + + @classmethod + def get_malshare_treshold(cls,hashes: list[dict], limit: int): + """ + Malshare treshold + """ + if limit == 0 : + return hashes + else: + treshold = hashes[:limit] + return treshold + + + @classmethod + def gather_selected_malwares(cls,new_hashes: list[str], choosen_path: Path , progress: Progress,req_session: requests.Session ): + """ + Download selected malware samples + + Args: + new_hashes (list[str]): list of sha256 hashes + choosen_path (str, optional): path to save the downloaded malware samples. Defaults to None. + progress (Progress, optional): rich progress object. Defaults to None. + + Returns: + None + """ + dl_task = progress.add_task("-[green] gathering malwares ...") + for sha256_id in new_hashes: try: - response = requests.post( + response = req_session.post( "https://malshare.com/api.php", params={"api_key": API_KEY, "action": "getfile", "hash": sha256_id}, verify=True, @@ -49,21 +62,126 @@ def scrap_malshare(): response.raise_for_status() except requests.RequestException as err: raise err - curr_time = dt.now().date().strftime("%Y-%m-%d") if response.status_code == 502: json_response = response.json() if json_response["query_status"] == "file_not_found": - rich.print(f" [red]sha256_hash: {sha256_id[:6]} not found skipping") + rich.print(f"[red]sha256_hash: {sha256_id[:6]} not found skipping") continue if json_response["query_status"] == "illegal_sha256_hash": - rich.print(f" [red]Illegal SHA256 hash provided: {sha256_id[:6]} skipping") + rich.print(f"[red]Illegal SHA256 hash provided: {sha256_id[:6]} skipping") continue - file_path = f"malware_{sha256_id[:6]}_{curr_time}.zip" - final_path = (downloaded_malwares_path/file_path) - with open(file=final_path, mode="wb") as f: - f.write(response.content) - progress.update(task, advance=1) + curr_time = dt.now().date().strftime("%Y-%m-%d") + malware_content = response.content + file_path = f"malware_{sha256_id[:6]}_{curr_time}-{str(malware_content[:4]).replace('\\','').replace(" Path: + """ + Create directory and return path to save the downloaded malware samples + Args: + str_path (str): path to save the downloaded malware samples + + Returns: + Path: path to save the downloaded malware samples + """ + if str_path == "": + (Path()/"Downloaded-Malwares").mkdir(parents=True, exist_ok=True) + return (Path()/"Downloaded-Malwares") + Path(str_path).mkdir(parents=True, exist_ok=True) + return Path(str_path) + + @classmethod + def scrap_malshare(cls,choosen_path: str="", not_twins: bool = False, limit: int = 24): + """ + Scrap malshare.com and download the last 24 hours malware samples + + Args: + choosen_path (str): path to save the downloaded malware samples + not_twins (bool, optional): download already downloaded malware samples. Defaults to False. + + Returns: + None + """ + with Progress( + SpinnerColumn(finished_text="[bold green]finished ✓[/bold green]"), + *Progress.get_default_columns(), + transient=True, + ) as progress: + if API_KEY is None or API_KEY == "": + return None + try: + response = requests.post( + "https://malshare.com/api.php", + verify=True, + params={"api_key": API_KEY, "action": "getlist"}, + ) + response.raise_for_status() + except requests.RequestException as err: + rich.print(f"[red]Malshare will be skipped. An unexpected error occurred: {err} ") + return None + hashes = response.json() + sliced_hashes = cls.get_malshare_treshold(hashes, limit) + rich.print(f"[green] - Malshare: {len(sliced_hashes)} Samples") + sha256_ids = cls.parsing_malshare_sha256(sliced_hashes, progress) + downloaded_malwares_path = cls.create_dir_and_return_path(choosen_path) + with requests.Session() as session: + if not_twins: + new_hashes = cls.catch_new_malwares(sha256_ids, downloaded_malwares_path, progress) + cls.gather_selected_malwares(new_hashes=new_hashes, choosen_path=downloaded_malwares_path, progress=progress,req_session=session) + cls.download_selected_malwares(new_hashes=new_hashes, progress=progress) + else: + new_hashes = sha256_ids + cls.gather_selected_malwares(new_hashes=new_hashes, choosen_path=downloaded_malwares_path, progress=progress,req_session=session) + cls.download_selected_malwares(new_hashes, progress) -if __name__ == "__main__": - scrap_malshare() diff --git a/your_daily_dose_malware/backends/malware_bazaar.py b/your_daily_dose_malware/backends/malware_bazaar.py index 09a2bc6..ce830d7 100644 --- a/your_daily_dose_malware/backends/malware_bazaar.py +++ b/your_daily_dose_malware/backends/malware_bazaar.py @@ -5,67 +5,67 @@ import requests import rich from rich.progress import Progress +import typer +API_KEY = os.getenv("MALSHARE_API_KEY") -def hundred_most_recent(headers_info): - with Progress() as progress: - # TODO Pending Refactor this method to match all_most_recent style - response = requests.post( - "https://mb-api.abuse.ch/api/v1/", - data={"query": "get_recent", "selector": "100"}, - headers=headers_info, - ) - json_response = response.json() - if json_response["query_status"] == "no_selector": - rich.print( - "[red]No selector provided. Please use either time or limit as selector", - ) - if json_response["query_status"] == "unknown_selector": - rich.print("[red]Selector unknown. Please use either time or limit as selector") - return - if json_response["query_status"] == "no_results": - rich.print("[red]Your query yield no results") - return - else: - data_length = len(json_response["data"]) - rich.print(f" [green]Your query yield {data_length} results") - Path("Downloaded-Malwares").mkdir(exist_ok=True) - downloaded_malwares_path = Path("Downloaded-Malwares") - curr_time = dt.now().date().strftime("%Y-%m-%d") - task = progress.add_task( - " - [green]Downloading Malware Files...", - total=len(json_response["data"]) - ) - for data in json_response["data"]: - sha256_name = data["sha256_hash"] - file_path = f"malware_{sha256_name[:6]}_{curr_time}.zip" - if (downloaded_malwares_path/file_path).exists(): - progress.update(task, advance=1) - continue - response = requests.post( - "https://mb-api.abuse.ch/api/v1/", - data={"query": "get_file", "sha256_hash": sha256_name}, - headers=headers_info, - ) - file_path = f"malware_{sha256_name[:6]}_{curr_time}.zip" - path = downloaded_malwares_path/file_path - with open(path.absolute(), "wb+") as f: - f.write(response.content) - progress.update(task, advance=1) -def all_most_recent(headers_info): +class MalwareBazaar: - with Progress() as progress: + @classmethod + def hundred_most_recent(cls,headers_info): + with Progress() as progress: + # TODO Pending Refactor this method to match all_most_recent style + response = requests.post( + "https://mb-api.abuse.ch/api/v1/", + data={"query": "get_recent", "selector": "100"}, + headers=headers_info, + ) + json_response = response.json() - response = requests.post( - "https://bazaar.abuse.ch/export/txt/sha256/recent", - headers=headers_info - ) + if json_response["query_status"] == "no_selector": + rich.print( + "[red]No selector provided. Please use either time or limit as selector", + ) + if json_response["query_status"] == "unknown_selector": + rich.print("[red]Selector unknown. Please use either time or limit as selector") + return + if json_response["query_status"] == "no_results": + rich.print("[red]Your query yield no results") + return + else: + data_length = len(json_response["data"]) + rich.print(f"[green]Your query yield {data_length} results") + Path("Downloaded-Malwares").mkdir(exist_ok=True) + downloaded_malwares_path = Path("Downloaded-Malwares") + curr_time = dt.now().date().strftime("%Y-%m-%d") + task = progress.add_task( + " - [green]Downloading Malware Files...", + total=len(json_response["data"]) + ) + for data in json_response["data"]: + sha256_name = data["sha256_hash"] + file_path = f"malware_{sha256_name[:6]}_{curr_time}.zip" + if (downloaded_malwares_path/file_path).exists(): + progress.update(task, advance=1) + continue + response = requests.post( + "https://mb-api.abuse.ch/api/v1/", + data={"query": "get_file", "sha256_hash": sha256_name}, + headers=headers_info, + ) + file_path = f"malware_{sha256_name[:6]}_{curr_time}.zip" + path = downloaded_malwares_path/file_path + with open(path.absolute(), "wb+") as f: + f.write(response.content) + progress.update(task, advance=1) + @classmethod + def get_new_hashes(cls,malware_response: requests.Response): with open("sha256_names.txt", "wb+") as f: - f.write(response.content) + f.write(malware_response.content) f.seek(0) new_hashes = list() file_lines = [line.strip() for line in f.readlines()] @@ -74,56 +74,69 @@ def all_most_recent(headers_info): if 8 < index < len(file_lines) - 1: new_hashes.append(line) continue - rich.print(f" - [green]Malware Bazaar: {len(new_hashes)} Samples") + return new_hashes + ... - with open("sha256_names.txt", "w") as f: - for line in new_hashes: - f.write(line.decode("utf-8") + "\n") - sha256_hashes = open("sha256_names.txt", "r").readlines() - Path("sha256_names.txt").unlink() - task = progress.add_task( - " - [green]Downloading Malware Files...", total=len(sha256_hashes) - ) - Path("Downloaded-Malwares").mkdir(exist_ok=True) - for sha256_hash in sha256_hashes: - curr_time = dt.now().date().strftime("%Y-%m-%d") - downloaded_malwares_path = Path("Downloaded-Malwares").absolute() - file_path = f"malware_{sha256_hash[:6]}_{curr_time}.zip" - if (downloaded_malwares_path/file_path).exists(): - progress.update(task, advance=1) - continue + @classmethod + def all_most_recent(cls,headers_info): + + with Progress() as progress: response = requests.post( - "https://mb-api.abuse.ch/api/v1/", - params={"query": "get_file", "sha256_hash": sha256_hash}, - headers=headers_info, + "https://bazaar.abuse.ch/export/txt/sha256/recent", + headers=headers_info ) - if response.status_code == 502: - json_response = response.json() - if json_response["query_status"] == "file_not_found": - rich.print(f" [red]sha256_hash: {sha256_hash[:6]} not found skipping") - progress.update(task, advance=1) - continue - if json_response["query_status"] == "illegal_sha256_hash": - rich.print(f" [red]Illegal SHA256 hash provided: {sha256_hash[:6]} skipping") - progress.update(task, advance=1) - continue - targeted_file = f"malware_{sha256_hash[:6]}_{curr_time}.zip" - final_path = downloaded_malwares_path/targeted_file, "wb" - with open(final_path, "wb") as f: - f.write(response.content) - progress.update(task, advance=1) + new_hashes = cls.get_new_hashes(malware_response=response) + rich.print(f"- [green]Malware Bazaar: {len(new_hashes)} Samples") + with open("sha256_names.txt", "w") as f: + for line in new_hashes: + f.write(line.decode("utf-8") + "\n") + sha256_hashes = open("sha256_names.txt", "r").readlines() + print(sha256_hashes) + Path("sha256_names.txt").unlink() + task = progress.add_task( + " - [green]Downloading Malware Files..." + ) + Path("Downloaded-Malwares").mkdir(exist_ok=True) + for sha256_hash in sha256_hashes: + curr_time = dt.now().date().strftime("%Y-%m-%d") + downloaded_malwares_path = Path("Downloaded-Malwares").absolute() + file_path = f"malware_{sha256_hash[:6]}_{curr_time}.zip" + if (downloaded_malwares_path/file_path).exists(): + progress.update(task, advance=100/len(sha256_hashes)) + continue + response = requests.post( + "https://mb-api.abuse.ch/api/v1/", + params={"query": "get_file", "sha256_hash": sha256_hash}, + headers=headers_info, + ) + if response.status_code == 502: + json_response = response.json() + if json_response["query_status"] == "file_not_found": + rich.print(f" [red]sha256_hash: {sha256_hash[:6]} not found skipping") + progress.update(task, advance=1) + continue + if json_response["query_status"] == "illegal_sha256_hash": + rich.print(f" [red]Illegal SHA256 hash provided: {sha256_hash[:6]} skipping") + progress.update(task, advance=1) + continue + targeted_file = f"malware_{sha256_hash[:6]}_{curr_time}.zip" + final_path = downloaded_malwares_path/targeted_file + with open(final_path, "wb") as f: + f.write(response.content) + progress.update(task, advance=1) -def scrap_malware_bazaar(hundred_recent): - if os.getenv("MALWARE_BAZAAR_API_KEY") == "": - rich.print("[red]Please set MALWARE_BAZAAR_API_KEY") - return None - headers = {"API-KEY": os.getenv("MALWARE_BAZAAR_API_KEY")} - if hundred_recent: - hundred_most_recent(headers) - else: - all_most_recent(headers) + @classmethod + def scrap_malware_bazaar(cls,hundred_recent:bool=False): + if API_KEY == "": + rich.print("[red]Please set MALWARE_BAZAAR_API_KEY") + return None + headers = {"API-KEY": API_KEY} + if hundred_recent: + cls.hundred_most_recent(headers) + return typer.Exit(code=1) + else: + cls.all_most_recent(headers) + return typer.Exit(code=1) -if __name__ == "__main__": - scrap_malware_bazaar(hundred_recent=False) diff --git a/your_daily_dose_malware/commands.py b/your_daily_dose_malware/commands.py new file mode 100644 index 0000000..e37d170 --- /dev/null +++ b/your_daily_dose_malware/commands.py @@ -0,0 +1,70 @@ +from typing import Annotated +import typer +from pathlib import Path +from rich.console import Console + +# from .backends import malware_bazaar +from your_daily_dose_malware.backends.malshare import Malshare +from your_daily_dose_malware.constants import MALWARE_BANNER, MALWARE_DESCRIPTION +from your_daily_dose_malware.backends.malware_bazaar import MalwareBazaar + +app = typer.Typer() + +@app.callback() +def callback(): + console = Console() + console.print(MALWARE_BANNER) + console.print(f"{MALWARE_DESCRIPTION} \n") + + +@app.command() +def malshare( + path : str = typer.Option( + None, help="path to save the downloaded malware samples" + ), + not_twins : bool = typer.Option( + False, help="do not download already downloaded malware samples" + ), + limit : int = typer.Option( + 0, help="number of malware samples to download" + ), +): + "download the last 24 hours malware samples from malshare.com" + if path is not None and Path(path).exists(): + Malshare.scrap_malshare( + choosen_path=path, + not_twins=not_twins, + limit=limit + ) + else: + Malshare.scrap_malshare(limit=limit) + + +@app.command(name="m-bazaar") +def malware_bazaar( + hundred: Annotated[ + bool, + typer.Option( + "--hundred", + "-h", + is_flag=True, + ), + ]=False, + most_recent: Annotated[ + bool, + "--most-recent", + "-mr", + typer.Option( + is_flag=True, + ) + ]=False +): + """ + Scrap malshare.com and download the last 24 hours malware samples + """ + if hundred is True and most_recent is False: + MalwareBazaar.scrap_malware_bazaar(hundred_recent=True) + elif hundred is False and most_recent is True: + MalwareBazaar.scrap_malware_bazaar(hundred_recent=False) + else: + raise typer.BadParameter("You must to choose either or options") diff --git a/your_daily_dose_malware/main.py b/your_daily_dose_malware/main.py deleted file mode 100644 index 3f938cc..0000000 --- a/your_daily_dose_malware/main.py +++ /dev/null @@ -1,48 +0,0 @@ -import argparse - -from dotenv import load_dotenv -from rich.console import Console - -from your_daily_dose_malware.backends import malware_bazaar, malshare -from your_daily_dose_malware.constants import MALWARE_BANNER, MALWARE_DESCRIPTION - -load_dotenv() - - -def run_scrapper(args): - malware_bazaar.scrap_malware_bazaar(args.hundred_recent) - malshare.scrap_malshare() - - -def main(): - parser = argparse.ArgumentParser( - description="""Download recent samples from multiple OSINT provider - backends - "MalwareBazaar: all the recent recent malware's uploaded within" - "the last 48 hours sha256 hash" - "MalwareBazaar: By using api from 'https://mb-api.abuse.ch/api/v1/'""" - ) - parser.add_argument( - "-MwBz_48H", - "--last-48H", - action="store_true", - help="get the most recent sha256 hashes", - default=True, - ) - parser.add_argument( - "-MwBz_100", - "--hundred-recent", - action="store_true", - help="""get the most recent hundred recent malware's - uploaded within the last 60 min""", - ) - args = parser.parse_args() - - console = Console() - console.print(MALWARE_BANNER) - console.print(f"{MALWARE_DESCRIPTION} \n") - run_scrapper(args) - - -if __name__ == "__main__": - main()