Skip to content

Commit

Permalink
style: make baby flake happy
Browse files Browse the repository at this point in the history
  • Loading branch information
poneoneo committed Sep 23, 2024
1 parent 4be6c07 commit e10870f
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 80 deletions.
3 changes: 2 additions & 1 deletion your_daily_dose_malware/__main__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from your_daily_dose_malware.commands import app
app(prog_name="dd-run")

app(prog_name="dd-run")
87 changes: 55 additions & 32 deletions your_daily_dose_malware/backends/malshare.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,34 @@
SELECTED_FILES_AND_CONTENTS = []



class Malshare:

@classmethod
def parsing_malshare_sha256(cls,fetched_hashes:List[dict],progress: Progress):
def parsing_malshare_sha256(cls, fetched_hashes: List[dict], progress: Progress):
sha256_ids = list()
for id_names in fetched_hashes:
sha256_id = id_names["sha256"]
sha256_ids.append(sha256_id)
return sha256_ids

@classmethod
def get_malshare_treshold(cls,hashes: list[dict], limit: int):
"""
Malshare treshold
def get_malshare_treshold(cls, hashes: list[dict], limit: int):
"""
if limit == 0 :
Malshare treshold
"""
if limit == 0:
return hashes
else:
treshold = hashes[:limit]
return treshold


@classmethod
def gather_selected_malwares(cls,new_hashes: list[str], choosen_path: Path , progress: Progress,req_session: requests.Session ):
def gather_selected_malwares(
cls,
new_hashes: list[str],
choosen_path: Path,
progress: Progress,
req_session: requests.Session,
):
"""
Download selected malware samples
Expand Down Expand Up @@ -68,18 +71,20 @@ def gather_selected_malwares(cls,new_hashes: list[str], choosen_path: Path , pro
rich.print(f"[red]sha256_hash: {sha256_id[:6]} not found skipping")
continue
if json_response["query_status"] == "illegal_sha256_hash":
rich.print(f"[red]Illegal SHA256 hash provided: {sha256_id[:6]} skipping")
rich.print(
f"[red]Illegal SHA256 hash provided: {sha256_id[:6]} skipping"
)
continue
curr_time = dt.now().date().strftime("%Y-%m-%d")
malware_content = response.content
file_path = f"malware_{sha256_id[:6]}_{curr_time}-{str(malware_content[:4]).replace('\\','').replace("<!","").replace("<h","")}.zip"
progress.update(dl_task, advance=100/len(new_hashes))
path_and_content = ((choosen_path/file_path),malware_content)
progress.update(dl_task, advance=100 / len(new_hashes))
path_and_content = ((choosen_path / file_path), malware_content)
global SELECTED_FILES_AND_CONTENTS
SELECTED_FILES_AND_CONTENTS.append(path_and_content)

@classmethod
def download_selected_malwares(cls,new_hashes: list[str], progress: Progress ):
def download_selected_malwares(cls, new_hashes: list[str], progress: Progress):
"""
Download selected malware samples
Expand All @@ -97,11 +102,12 @@ def download_selected_malwares(cls,new_hashes: list[str], progress: Progress ):
for malwares_path_and_content in SELECTED_FILES_AND_CONTENTS:
with open(file=malwares_path_and_content[0], mode="wb") as f:
f.write(malwares_path_and_content[1])
progress.update(dl_task, advance=100/total_file)

progress.update(dl_task, advance=100 / total_file)

@classmethod
def catch_new_malwares(cls,hashes: list[str], choosen_path: Path ,current_progress: Progress ):
def catch_new_malwares(
cls, hashes: list[str], choosen_path: Path, current_progress: Progress
):
"""
Catch new malware samples since the last fecth from malshare.com to avoid duplicates
Expand All @@ -114,8 +120,8 @@ def catch_new_malwares(cls,hashes: list[str], choosen_path: Path ,current_progre
"""
new_hashes = []
task = current_progress.add_task(
"-[green]Checking existing malwares ...", total=len(hashes),start=False
)
"-[green]Checking existing malwares ...", total=len(hashes), start=False
)
for sha256_id in hashes:
if (choosen_path / f"malware_{sha256_id[:6]}.zip").exists():
continue
Expand All @@ -126,7 +132,7 @@ def catch_new_malwares(cls,hashes: list[str], choosen_path: Path ,current_progre
return new_hashes

@classmethod
def create_dir_and_return_path(cls,str_path: str) -> Path:
def create_dir_and_return_path(cls, str_path: str) -> Path:
"""
Create directory and return path to save the downloaded malware samples
Args:
Expand All @@ -136,13 +142,15 @@ def create_dir_and_return_path(cls,str_path: str) -> Path:
Path: path to save the downloaded malware samples
"""
if str_path == "":
(Path()/"Downloaded-Malwares").mkdir(parents=True, exist_ok=True)
return (Path()/"Downloaded-Malwares")
(Path() / "Downloaded-Malwares").mkdir(parents=True, exist_ok=True)
return Path() / "Downloaded-Malwares"
Path(str_path).mkdir(parents=True, exist_ok=True)
return Path(str_path)

@classmethod
def scrap_malshare(cls,choosen_path: str="", not_twins: bool = False, limit: int = 24):
def scrap_malshare(
cls, choosen_path: str = "", not_twins: bool = False, limit: int = 24
):
"""
Scrap malshare.com and download the last 24 hours malware samples
Expand All @@ -154,9 +162,9 @@ def scrap_malshare(cls,choosen_path: str="", not_twins: bool = False, limit: int
None
"""
with Progress(
SpinnerColumn(finished_text="[bold green]finished ✓[/bold green]"),
*Progress.get_default_columns(),
transient=True,
SpinnerColumn(finished_text="[bold green]finished ✓[/bold green]"),
*Progress.get_default_columns(),
transient=True,
) as progress:
if API_KEY is None or API_KEY == "":
return None
Expand All @@ -168,7 +176,9 @@ def scrap_malshare(cls,choosen_path: str="", not_twins: bool = False, limit: int
)
response.raise_for_status()
except requests.RequestException as err:
rich.print(f"[red]Malshare will be skipped. An unexpected error occurred: {err} ")
rich.print(
f"[red]Malshare will be skipped. An unexpected error occurred: {err} "
)
return None
hashes = response.json()
sliced_hashes = cls.get_malshare_treshold(hashes, limit)
Expand All @@ -177,11 +187,24 @@ def scrap_malshare(cls,choosen_path: str="", not_twins: bool = False, limit: int
downloaded_malwares_path = cls.create_dir_and_return_path(choosen_path)
with requests.Session() as session:
if not_twins:
new_hashes = cls.catch_new_malwares(sha256_ids, downloaded_malwares_path, progress)
cls.gather_selected_malwares(new_hashes=new_hashes, choosen_path=downloaded_malwares_path, progress=progress,req_session=session)
cls.download_selected_malwares(new_hashes=new_hashes, progress=progress)
new_hashes = cls.catch_new_malwares(
sha256_ids, downloaded_malwares_path, progress
)
cls.gather_selected_malwares(
new_hashes=new_hashes,
choosen_path=downloaded_malwares_path,
progress=progress,
req_session=session,
)
cls.download_selected_malwares(
new_hashes=new_hashes, progress=progress
)
else:
new_hashes = sha256_ids
cls.gather_selected_malwares(new_hashes=new_hashes, choosen_path=downloaded_malwares_path, progress=progress,req_session=session)
cls.download_selected_malwares(new_hashes, progress)

cls.gather_selected_malwares(
new_hashes=new_hashes,
choosen_path=downloaded_malwares_path,
progress=progress,
req_session=session,
)
cls.download_selected_malwares(new_hashes, progress)
45 changes: 21 additions & 24 deletions your_daily_dose_malware/backends/malware_bazaar.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
API_KEY = os.getenv("MALSHARE_API_KEY")




class MalwareBazaar:

@classmethod
def hundred_most_recent(cls,headers_info):
def hundred_most_recent(cls, headers_info):
with Progress() as progress:
# TODO Pending Refactor this method to match all_most_recent style
response = requests.post(
Expand All @@ -30,7 +27,9 @@ def hundred_most_recent(cls,headers_info):
"[red]No selector provided. Please use either time or limit as selector",
)
if json_response["query_status"] == "unknown_selector":
rich.print("[red]Selector unknown. Please use either time or limit as selector")
rich.print(
"[red]Selector unknown. Please use either time or limit as selector"
)
return
if json_response["query_status"] == "no_results":
rich.print("[red]Your query yield no results")
Expand All @@ -43,12 +42,12 @@ def hundred_most_recent(cls,headers_info):
curr_time = dt.now().date().strftime("%Y-%m-%d")
task = progress.add_task(
" - [green]Downloading Malware Files...",
total=len(json_response["data"])
total=len(json_response["data"]),
)
for data in json_response["data"]:
sha256_name = data["sha256_hash"]
file_path = f"malware_{sha256_name[:6]}_{curr_time}.zip"
if (downloaded_malwares_path/file_path).exists():
if (downloaded_malwares_path / file_path).exists():
progress.update(task, advance=1)
continue
response = requests.post(
Expand All @@ -57,13 +56,13 @@ def hundred_most_recent(cls,headers_info):
headers=headers_info,
)
file_path = f"malware_{sha256_name[:6]}_{curr_time}.zip"
path = downloaded_malwares_path/file_path
path = downloaded_malwares_path / file_path
with open(path.absolute(), "wb+") as f:
f.write(response.content)
progress.update(task, advance=1)

@classmethod
def get_new_hashes(cls,malware_response: requests.Response):
def get_new_hashes(cls, malware_response: requests.Response):
with open("sha256_names.txt", "wb+") as f:
f.write(malware_response.content)
f.seek(0)
Expand All @@ -78,12 +77,10 @@ def get_new_hashes(cls,malware_response: requests.Response):
...

@classmethod
def all_most_recent(cls,headers_info):

def all_most_recent(cls, headers_info):
with Progress() as progress:
response = requests.post(
"https://bazaar.abuse.ch/export/txt/sha256/recent",
headers=headers_info
"https://bazaar.abuse.ch/export/txt/sha256/recent", headers=headers_info
)
new_hashes = cls.get_new_hashes(malware_response=response)
rich.print(f"- [green]Malware Bazaar: {len(new_hashes)} Samples")
Expand All @@ -94,16 +91,14 @@ def all_most_recent(cls,headers_info):
sha256_hashes = open("sha256_names.txt", "r").readlines()
print(sha256_hashes)
Path("sha256_names.txt").unlink()
task = progress.add_task(
" - [green]Downloading Malware Files..."
)
task = progress.add_task(" - [green]Downloading Malware Files...")
Path("Downloaded-Malwares").mkdir(exist_ok=True)
for sha256_hash in sha256_hashes:
curr_time = dt.now().date().strftime("%Y-%m-%d")
downloaded_malwares_path = Path("Downloaded-Malwares").absolute()
file_path = f"malware_{sha256_hash[:6]}_{curr_time}.zip"
if (downloaded_malwares_path/file_path).exists():
progress.update(task, advance=100/len(sha256_hashes))
if (downloaded_malwares_path / file_path).exists():
progress.update(task, advance=100 / len(sha256_hashes))
continue
response = requests.post(
"https://mb-api.abuse.ch/api/v1/",
Expand All @@ -113,21 +108,25 @@ def all_most_recent(cls,headers_info):
if response.status_code == 502:
json_response = response.json()
if json_response["query_status"] == "file_not_found":
rich.print(f" [red]sha256_hash: {sha256_hash[:6]} not found skipping")
rich.print(
f" [red]sha256_hash: {sha256_hash[:6]} not found skipping"
)
progress.update(task, advance=1)
continue
if json_response["query_status"] == "illegal_sha256_hash":
rich.print(f" [red]Illegal SHA256 hash provided: {sha256_hash[:6]} skipping")
rich.print(
f" [red]Illegal SHA256 hash provided: {sha256_hash[:6]} skipping"
)
progress.update(task, advance=1)
continue
targeted_file = f"malware_{sha256_hash[:6]}_{curr_time}.zip"
final_path = downloaded_malwares_path/targeted_file
final_path = downloaded_malwares_path / targeted_file
with open(final_path, "wb") as f:
f.write(response.content)
progress.update(task, advance=1)

@classmethod
def scrap_malware_bazaar(cls,hundred_recent:bool=False):
def scrap_malware_bazaar(cls, hundred_recent: bool = False):
if API_KEY == "":
rich.print("[red]Please set MALWARE_BAZAAR_API_KEY")
return None
Expand All @@ -138,5 +137,3 @@ def scrap_malware_bazaar(cls,hundred_recent:bool=False):
else:
cls.all_most_recent(headers)
return typer.Exit(code=1)


41 changes: 18 additions & 23 deletions your_daily_dose_malware/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Annotated
import typer
import typer
from pathlib import Path
from rich.console import Console

Expand All @@ -10,6 +10,7 @@

app = typer.Typer()


@app.callback()
def callback():
console = Console()
Expand All @@ -19,23 +20,15 @@ def callback():

@app.command()
def malshare(
path : str = typer.Option(
None, help="path to save the downloaded malware samples"
),
not_twins : bool = typer.Option(
path: str = typer.Option(None, help="path to save the downloaded malware samples"),
not_twins: bool = typer.Option(
False, help="do not download already downloaded malware samples"
),
limit : int = typer.Option(
0, help="number of malware samples to download"
),
limit: int = typer.Option(0, help="number of malware samples to download"),
):
"download the last 24 hours malware samples from malshare.com"
if path is not None and Path(path).exists():
Malshare.scrap_malshare(
choosen_path=path,
not_twins=not_twins,
limit=limit
)
Malshare.scrap_malshare(choosen_path=path, not_twins=not_twins, limit=limit)
else:
Malshare.scrap_malshare(limit=limit)

Expand All @@ -45,26 +38,28 @@ def malware_bazaar(
hundred: Annotated[
bool,
typer.Option(
"--hundred",
"-h",
is_flag=True,
),
]=False,
"--hundred",
"-h",
is_flag=True,
),
] = False,
most_recent: Annotated[
bool,
"--most-recent",
"-mr",
typer.Option(
is_flag=True,
)
]=False
is_flag=True,
),
] = False,
):
"""
Scrap malshare.com and download the last 24 hours malware samples
Scrap malshare.com and download the last 24 hours malware samples
"""
if hundred is True and most_recent is False:
MalwareBazaar.scrap_malware_bazaar(hundred_recent=True)
elif hundred is False and most_recent is True:
MalwareBazaar.scrap_malware_bazaar(hundred_recent=False)
else:
raise typer.BadParameter("You must to choose either <hundred> or <most-recent> options")
raise typer.BadParameter(
"You must to choose either <hundred> or <most-recent> options"
)

0 comments on commit e10870f

Please sign in to comment.