Skip to content

Commit

Permalink
refactor: replace argparse with typer and create an entry script command
Browse files Browse the repository at this point in the history
  • Loading branch information
poneoneo committed Sep 23, 2024
1 parent 7a6835c commit 4be6c07
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 189 deletions.
43 changes: 41 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ description = "Downloads and aggregates fresh malware samples collected from OSI
authors = ["Robert Thomas", "Prashant Shubham", "Mboula Penda Paul O’neal"]
readme = "README.md"


[tool.poetry.scripts]
dd-run = "your_daily_dose_malware.commands:app"



[tool.poetry.dependencies]
python = "^3.9"
requests = "^2.32.2"
click = "^8.1.7"
python-dotenv = "^1.0.1"
rich = "^13.7.1"
typer = "^0.12.5"

[build-system]
requires = ["poetry-core"]
Expand Down
2 changes: 2 additions & 0 deletions your_daily_dose_malware/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Package entry script (your_daily_dose_malware/__main__.py).
# `app` is the command object exposed by the package's `commands` module; the
# same object is registered as the `dd-run` console script in pyproject.toml.
from your_daily_dose_malware.commands import app
# Use the console-script name as the program name so that running this module
# reports itself as `dd-run`.
app(prog_name="dd-run")
198 changes: 158 additions & 40 deletions your_daily_dose_malware/backends/malshare.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,187 @@
import os
from datetime import datetime as dt
from pathlib import Path
from typing import List

import requests
import rich
from dotenv import load_dotenv
from rich.progress import Progress
from rich.progress import Progress, SpinnerColumn
# from rich import print

# Load variables from a local .env file (python-dotenv) before reading the
# environment below.
load_dotenv()

# Malshare API key; None when MALSHARE_API_KEY is not set in the environment.
API_KEY = os.getenv("MALSHARE_API_KEY")
# Module-level buffer of (destination path, raw content) tuples: filled while
# samples are fetched and consumed when they are written to disk.
SELECTED_FILES_AND_CONTENTS = []


def scrap_malshare():
"""List hashes from the past 24 hours
"""
with Progress() as progress:
if API_KEY is None or API_KEY == "":
return None
try:
response = requests.post(
"https://malshare.com/api.php",
verify=True,
params={"api_key": API_KEY, "action": "getlist"},
)
response.raise_for_status()
hashes = response.json()
except requests.RequestException as err:
rich.print(f"[red] Malshare will be skipped. An expected error occurred: {err} ")
return None
rich.print(f"[green] - Malshare: {len(hashes)} Samples")
sha256_ids = [hashe["sha256"] for hashe in hashes]
task = progress.add_task(
"-[green]Downloading Malware Files...", total=len(sha256_ids)
)
Path("Downloaded-Malwares").mkdir(exist_ok=True)
downloaded_malwares_path = Path("Downloaded-Malwares").absolute()
for sha256_id in sha256_ids:
if (downloaded_malwares_path / f"malware_{sha256_id[:6]}.zip").exists():
progress.update(task, advance=1)
continue

class Malshare:
    """Helpers for listing and downloading recent samples from malshare.com.

    The class is used purely as a namespace: every method is a classmethod and
    no instance state is kept. Fetched samples are staged in the module-level
    ``SELECTED_FILES_AND_CONTENTS`` list before being written to disk.
    """

    _API_URL = "https://malshare.com/api.php"
    # Seconds to wait for the server before giving up; without a timeout a
    # stalled connection would block the whole run indefinitely.
    _REQUEST_TIMEOUT = 30
    # Files are named "malware_<first 6 hash chars>_<date>-<tag>.zip".
    _FILE_PREFIX = "malware_"
    _HASH_PREFIX_LEN = 6
    # Characters that are path separators or reserved in file names on common
    # platforms; they are replaced by "_" in the generated file name.
    _UNSAFE_FILENAME_CHARS = str.maketrans(dict.fromkeys('/:*?"<>|', "_"))

    @classmethod
    def parsing_malshare_sha256(cls, fetched_hashes: list[dict], progress: "Progress"):
        """Return the ``sha256`` value of every entry in *fetched_hashes*.

        Args:
            fetched_hashes: entries as returned by the ``getlist`` API call;
                each one must contain a ``"sha256"`` key.
            progress: unused; kept for interface compatibility.
        Returns:
            list[str]: the sha256 hashes, in input order.
        """
        return [entry["sha256"] for entry in fetched_hashes]

    @classmethod
    def get_malshare_treshold(cls, hashes: list[dict], limit: int):
        """Return at most *limit* leading entries of *hashes*.

        Args:
            hashes: entries to truncate.
            limit: maximum number of entries to keep; ``0`` (or any negative
                value) means "no limit".
        Returns:
            list[dict]: the original list when unlimited, else a sliced copy.
        """
        if limit <= 0:
            return hashes
        return hashes[:limit]

    @classmethod
    def _content_tag(cls, malware_content: bytes) -> str:
        """Build the file-name tag derived from the first four content bytes.

        The tag is the ``repr`` of the leading bytes with backslashes and the
        ``<!`` / ``<h`` markers removed, then stripped of characters that are
        not safe inside a file name (e.g. ``/`` would otherwise be treated as
        a directory separator when the path is opened).
        """
        tag = str(malware_content[:4])
        tag = tag.replace("\\", "").replace("<!", "").replace("<h", "")
        return tag.translate(cls._UNSAFE_FILENAME_CHARS)

    @classmethod
    def _query_status(cls, response):
        """Return the ``query_status`` field of a JSON error body, or None."""
        try:
            payload = response.json()
        except ValueError:
            # Body is not JSON; let the caller fall back to raise_for_status().
            return None
        if isinstance(payload, dict):
            return payload.get("query_status")
        return None

    @classmethod
    def gather_selected_malwares(cls, new_hashes: list[str], choosen_path: Path, progress: "Progress", req_session: "requests.Session"):
        """Fetch the selected samples and stage them for writing.

        Each fetched sample is appended to ``SELECTED_FILES_AND_CONTENTS`` as
        a ``(destination path, content)`` tuple; nothing is written to disk
        here.

        Args:
            new_hashes: sha256 hashes of the samples to fetch.
            choosen_path: directory the samples will later be written to.
            progress: rich progress object used to report advancement.
            req_session: HTTP session used for the ``getfile`` requests.
        Returns:
            None
        Raises:
            requests.RequestException: on connection errors or on an HTTP
                error status that is not a recognised "skip" answer.
        """
        dl_task = progress.add_task("-[green] gathering malwares ...")
        # The task uses rich's default total of 100, so each sample is worth
        # an equal share of it.
        step = 100 / len(new_hashes) if new_hashes else 0
        curr_time = dt.now().strftime("%Y-%m-%d")
        for sha256_id in new_hashes:
            short_id = sha256_id[: cls._HASH_PREFIX_LEN]
            response = req_session.post(
                cls._API_URL,
                params={"api_key": API_KEY, "action": "getfile", "hash": sha256_id},
                verify=True,
                timeout=cls._REQUEST_TIMEOUT,
            )
            # Inspect the 502 body *before* raise_for_status(), otherwise the
            # skip handling below can never run.
            if response.status_code == 502:
                status = cls._query_status(response)
                if status == "file_not_found":
                    rich.print(f"[red]sha256_hash: {short_id} not found skipping")
                    progress.update(dl_task, advance=step)
                    continue
                if status == "illegal_sha256_hash":
                    rich.print(f"[red]Illegal SHA256 hash provided: {short_id} skipping")
                    progress.update(dl_task, advance=step)
                    continue
            response.raise_for_status()
            malware_content = response.content
            # Built outside the f-string: nested quotes/backslashes inside an
            # f-string expression are a SyntaxError before Python 3.12.
            tag = cls._content_tag(malware_content)
            file_path = f"{cls._FILE_PREFIX}{short_id}_{curr_time}-{tag}.zip"
            SELECTED_FILES_AND_CONTENTS.append((choosen_path / file_path, malware_content))
            progress.update(dl_task, advance=step)

    @classmethod
    def download_selected_malwares(cls, new_hashes: list[str], progress: "Progress"):
        """Write every staged sample to its destination path.

        Args:
            new_hashes: unused; kept for interface compatibility. The files
                written are those staged in ``SELECTED_FILES_AND_CONTENTS``.
            progress: rich progress object used to report advancement.
        Returns:
            None
        """
        dl_task = progress.add_task("-[green] Download gathered malwares ...")
        total_file = len(SELECTED_FILES_AND_CONTENTS)
        for destination, content in SELECTED_FILES_AND_CONTENTS:
            with open(destination, mode="wb") as f:
                f.write(content)
            progress.update(dl_task, advance=100 / total_file)

    @classmethod
    def catch_new_malwares(cls, hashes: list[str], choosen_path: Path, current_progress: "Progress"):
        """Filter out hashes whose sample already exists in *choosen_path*.

        A sample counts as already downloaded when the directory contains a
        ``malware_*.zip`` file whose name carries the same leading hash
        characters, whatever date/tag suffix follows. Only the first six hash
        characters are part of the file name, so two hashes sharing that
        prefix are indistinguishable here.

        Args:
            hashes: sha256 hashes to check.
            choosen_path: directory holding previously downloaded samples.
            current_progress: rich progress object used to report advancement.
        Returns:
            list[str]: hashes with no matching file, in input order.
        """
        task = current_progress.add_task(
            "-[green]Checking existing malwares ...", total=len(hashes), start=False
        )
        start = len(cls._FILE_PREFIX)
        end = start + cls._HASH_PREFIX_LEN
        # Scan the directory once instead of probing the disk for every hash.
        known_prefixes = {
            existing.name[start:end]
            for existing in Path(choosen_path).glob(f"{cls._FILE_PREFIX}*.zip")
        }
        current_progress.start_task(task_id=task)
        new_hashes = []
        for sha256_id in hashes:
            if sha256_id[: cls._HASH_PREFIX_LEN] not in known_prefixes:
                new_hashes.append(sha256_id)
            current_progress.update(task, advance=1)
        return new_hashes

    @classmethod
    def create_dir_and_return_path(cls, str_path: str) -> Path:
        """Create the download directory (with parents) and return it.

        Args:
            str_path: target directory; when empty, ``Downloaded-Malwares``
                relative to the current working directory is used.
        Returns:
            Path: the directory that now exists.
        """
        target = Path(str_path) if str_path else Path() / "Downloaded-Malwares"
        target.mkdir(parents=True, exist_ok=True)
        return target

    @classmethod
    def scrap_malshare(cls, choosen_path: str = "", not_twins: bool = False, limit: int = 24):
        """List recent Malshare samples and download them.

        Does nothing when no API key is configured. Any request/decoding
        failure while listing the samples is reported and the backend is
        skipped.

        Args:
            choosen_path: directory to save the samples to; empty selects the
                default ``Downloaded-Malwares`` directory.
            not_twins: when True, samples that already have a file in the
                target directory are skipped instead of being fetched again.
            limit: maximum number of samples to process; ``0`` disables the
                limit.
        Returns:
            None
        """
        if not API_KEY:
            return None
        with Progress(
            SpinnerColumn(finished_text="[bold green]finished ✓[/bold green]"),
            *Progress.get_default_columns(),
            transient=True,
        ) as progress:
            try:
                response = requests.post(
                    cls._API_URL,
                    verify=True,
                    params={"api_key": API_KEY, "action": "getlist"},
                    timeout=cls._REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                # Decoded inside the try so a malformed body is reported like
                # any other request failure instead of crashing the run.
                hashes = response.json()
            except requests.RequestException as err:
                rich.print(f"[red]Malshare will be skipped. An unexpected error occurred: {err} ")
                return None
            sliced_hashes = cls.get_malshare_treshold(hashes, limit)
            rich.print(f"[green] - Malshare: {len(sliced_hashes)} Samples")
            sha256_ids = cls.parsing_malshare_sha256(sliced_hashes, progress)
            downloaded_malwares_path = cls.create_dir_and_return_path(choosen_path)
            # Drop anything staged by a previous run in the same process so it
            # is not written a second time.
            SELECTED_FILES_AND_CONTENTS.clear()
            with requests.Session() as session:
                if not_twins:
                    new_hashes = cls.catch_new_malwares(sha256_ids, downloaded_malwares_path, progress)
                else:
                    new_hashes = sha256_ids
                cls.gather_selected_malwares(
                    new_hashes=new_hashes,
                    choosen_path=downloaded_malwares_path,
                    progress=progress,
                    req_session=session,
                )
            cls.download_selected_malwares(new_hashes=new_hashes, progress=progress)

if __name__ == "__main__":
    # The scraper is a classmethod of Malshare; there is no module-level
    # scrap_malshare function to call any more.
    Malshare.scrap_malshare()
Loading

0 comments on commit 4be6c07

Please sign in to comment.