Skip to content

Commit

Permalink
Fixing linter checks
Browse files Browse the repository at this point in the history
  • Loading branch information
lucifercr07 committed Jun 3, 2024
1 parent a59bb8b commit b7c976c
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pre-commit-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
IGNORE_GITIGNORED_FILES: true
VALIDATE_ALL_CODEBASE: true
VALIDATE_PYTHON_BLACK: true
VALIDATE_PYTHON_FLAKE8: true
VALIDATE_HTML: true
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Your Daily Dose of Malware
Tool to Harvest Fresh Malware Samples for Security Research.

[![Super-Linter](https://github.com/Anti-Malware-Alliance/your-daily-dose-malware/actions/workflows/pre-commit-actions.yml/badge.svg)](https://github.com/marketplace/actions/super-linter)

# The Problem

Security Analysts are constantly in need of Fresh Malware Samples.
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ requests = "^2.32.2"
click = "^8.1.7"
python-dotenv = "^1.0.1"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
2 changes: 1 addition & 1 deletion your_daily_dose_malware/app/utils/constants.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
APP_NAME = 'your-daily-dose-malware'
APP_NAME = "your-daily-dose-malware"
64 changes: 47 additions & 17 deletions your_daily_dose_malware/app/utils/http_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@


class HttpMethod(Enum):
GET = 'GET'
POST = 'POST'
PUT = 'PUT'
DELETE = 'DELETE'
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"


class HttpUtilsException(Exception):
Expand All @@ -22,25 +22,52 @@ def __init__(self, message, status_code=None):
class HttpUtils:
@staticmethod
def get(url, headers=None, params=None, timeout=10):
return HttpUtils._make_request(HttpMethod.GET, url, headers=headers, params=params, timeout=timeout)
return HttpUtils._make_request(
HttpMethod.GET, url, headers=headers, params=params,
timeout=timeout
)

@staticmethod
def post(url, headers=None, params=None, data=None, json=None, timeout=10):
return HttpUtils._make_request(HttpMethod.POST, url, headers=headers, params=params, data=data, json=json,
timeout=timeout)
return HttpUtils._make_request(
HttpMethod.POST,
url,
headers=headers,
params=params,
data=data,
json=json,
timeout=timeout,
)

@staticmethod
def put(url, headers=None, params=None, data=None, json=None, timeout=10):
return HttpUtils._make_request(HttpMethod.PUT, url, headers=headers, params=params, data=data, json=json,
timeout=timeout)
return HttpUtils._make_request(
HttpMethod.PUT,
url,
headers=headers,
params=params,
data=data,
json=json,
timeout=timeout,
)

@staticmethod
def delete(url, headers=None, params=None, data=None, json=None, timeout=10):
return HttpUtils._make_request(HttpMethod.DELETE, url, headers=headers, params=params, data=data, json=json,
timeout=timeout)
def delete(url, headers=None, params=None, data=None, json=None,
timeout=10):
return HttpUtils._make_request(
HttpMethod.DELETE,
url,
headers=headers,
params=params,
data=data,
json=json,
timeout=timeout,
)

@staticmethod
def _make_request(method, url, headers=None, params=None, data=None, json=None, timeout=10):
def _make_request(
method, url, headers=None, params=None, data=None, json=None,
timeout=10):
try:
response = requests.request(
method=method,
Expand All @@ -49,7 +76,7 @@ def _make_request(method, url, headers=None, params=None, data=None, json=None,
params=params,
data=data,
json=json,
timeout=timeout
timeout=timeout,
)

response.raise_for_status()
Expand All @@ -58,8 +85,11 @@ def _make_request(method, url, headers=None, params=None, data=None, json=None,
except ValueError:
return response.text
except HTTPError as http_err:
raise HttpUtilsException(f'HTTP error occurred: {http_err}', status_code=http_err.response.status_code)
raise HttpUtilsException(
f"HTTP error occurred: {http_err}",
status_code=http_err.response.status_code,
)
except RequestException as req_err:
raise HttpUtilsException(f'Request error occurred: {req_err}')
raise HttpUtilsException(f"Request error occurred: {req_err}")
except Exception as err:
raise HttpUtilsException(f'An unexpected error occurred: {err}')
raise HttpUtilsException(f"An unexpected error occurred: {err}")
2 changes: 1 addition & 1 deletion your_daily_dose_malware/commands/scraper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import click


@click.group(name='scraper')
@click.group(name="scraper")
def scraper():
"""Commands for scraping data from your daily dose malware."""
pass
97 changes: 60 additions & 37 deletions your_daily_dose_malware/commands/utils/retrieve_malware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,71 @@
import requests
import os


def hundred_most_recent(headers_info):
response = requests.post('https://mb-api.abuse.ch/api/v1/', data={'query':'get_recent','selector':'100'}, headers=headers_info)
response = requests.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_recent", "selector": "100"},
headers=headers_info,
)
json_response = response.json()
if json_response['query_status'] == 'no_selector':
click.echo(' No selector provided. Please use either time or limit as selector',color=True)
if json_response['query_status'] == 'unknown_selector':
click.echo(' Selector unknown. Please use either time or limit as selector')
if json_response['query_status'] == 'no_results':
click.echo(' Your query yield no results')
if json_response["query_status"] == "no_selector":
click.echo(
"No selector provided. Please use either time or limit "
"as selector",
color=True,
)
if json_response["query_status"] == "unknown_selector":
click.echo("Selector unknown. Please use either time or limit "
"as selector")
if json_response["query_status"] == "no_results":
click.echo("Your query yield no results")
else:
data_lenght = len(json_response['data'])
click.echo(f' Your query yield {data_lenght} results')
for data in json_response['data']:
sha256_name = data['sha256_hash']
if os.path.exists(f'malware_{sha256_name[:4]}.zip'):
data_length = len(json_response["data"])
click.echo(f" Your query yield {data_length} results")
for data in json_response["data"]:
sha256_name = data["sha256_hash"]
if os.path.exists(f"malware_{sha256_name[:4]}.zip"):
continue
response = requests.post('https://mb-api.abuse.ch/api/v1/', data={'query':'get_file','sha256_hash':sha256_name}, headers=headers_info)
with open(f'malware_{sha256_name[:4]}.zip', 'wb+') as f:
response = requests.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_file", "sha256_hash": sha256_name},
headers=headers_info,
)
with open(f"malware_{sha256_name[:4]}.zip", "wb+") as f:
f.write(response.content)
click.echo(f' malware_{sha256_name[:4]}.zip downloaded')
click.echo(f" malware_{sha256_name[:4]}.zip downloaded")


def all_most_recent(headers_info):
response = requests.post('https://bazaar.abuse.ch/export/txt/sha256/recent', headers=headers_info)
with open('sha256_names.txt', 'wb+') as f:
response = requests.post(
"https://bazaar.abuse.ch/export/txt/sha256/recent",
headers=headers_info
)
with open("sha256_names.txt", "wb+") as f:
f.write(response.content)
f.seek(0) # go back to the top of the file
new_hashes = list()
file_lines = [line.strip() for line in f.readlines()]
for index, line in enumerate(file_lines, start=0):
# skip the first 9 lines and last line
if 8 < index < len(file_lines) - 1:
click.echo(line)
new_hashes.append(line)
continue
with open("sha256_names.txt", "w") as f:
for line in new_hashes:
f.write(line.decode("utf-8") + "\n")
sha256_names = open("sha256_names.txt", "r").readlines()
click.echo(f" {len(sha256_names)} hashes downloaded")
for sha256_hash in sha256_names:
if os.path.exists(f"malware_{sha256_hash[:4]}.zip"):
continue
response = requests.post(
"https://mb-api.abuse.ch/api/v1/",
data={"query": "get_file", "sha256_hash": sha256_hash},
headers=headers_info,
)
with open(f"malware_{sha256_hash[:4]}.zip", "wb") as f:
f.write(response.content)
f.seek(0) # go back to the top of the file
new_hashes = list()
file_lines = [line.strip() for line in f.readlines()]
for index, line in enumerate(file_lines,start=0):
if index > 8 and index < len(file_lines)-1: # skip the first 9 lines and last line
click.echo(line)
new_hashes.append(line)
continue
with open('sha256_names.txt', 'w') as f:
for line in new_hashes:
f.write(line.decode('utf-8') + '\n')
sha256_names = open('sha256_names.txt', 'r').readlines()
click.echo(f' {len(sha256_names)} hashes downloaded')
for sha256_hash in sha256_names:
if os.path.exists(f'malware_{sha256_hash[:4]}.zip'):
continue
response = requests.post('https://mb-api.abuse.ch/api/v1/', data={'query':'get_file','sha256_hash':sha256_hash}, headers=headers_info)
with open(f'malware_{sha256_hash[:4]}.zip', 'wb') as f:
f.write(response.content)
click.echo(f' malware_{sha256_hash[:4]}.zip downloaded')
click.echo(f" malware_{sha256_hash[:4]}.zip downloaded")
35 changes: 23 additions & 12 deletions your_daily_dose_malware/main.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,42 @@
import click
from .commands.scraper import scraper
from .commands.utils.retrieve_malware import all_most_recent, hundred_most_recent
from .commands.utils.retrieve_malware import (all_most_recent,
hundred_most_recent)
from dotenv import load_dotenv
import os

load_dotenv()


@click.command(
help="""
Download eitheir hundred recent malwares uploaded within the last 60 min or all the recent malwares uploaded within the last 48 hours sha256 hash by using api from 'https://mb-api.abuse.ch/api/v1/'
Download either hundred recent malware's uploaded within the last 60 min or
all the recent malware's uploaded within the last 48 hours sha256 hash
by using api from 'https://mb-api.abuse.ch/api/v1/'
-s256 or --by-sha256: get the lastest sha256 hashes from 'https://bazaar.abuse.ch/export/txt/sha256/recent' save them in sha256_names.txt then for each file download all malwares in zip file
-s256 or --by-sha256: get the most recent sha256 hashes from
'https://bazaar.abuse.ch/export/txt/sha256/recent' save them in
sha256_names.txt then for each file download all malware's in zip file
-hr or --hundred-recent: get the lastest hundred recent malwares uploaded within the last 60 min
""")
@click.option('-s256','--by-sha256',is_flag=True)
@click.option('-hr','--hundred-recent',is_flag=True)
-hr or --hundred-recent: get the most recent hundred recent malware's
uploaded within the last 60 min
"""
)
@click.option("-s256", "--by-sha256", is_flag=True)
@click.option("-hr", "--hundred-recent", is_flag=True)
def run_scrapper(by_sha256, hundred_recent):
headers = {
'API-KEY':os.getenv('API_KEY')
}
headers = {"API-KEY": os.getenv("API_KEY")}
if hundred_recent:
hundred_most_recent(headers)
elif by_sha256:
all_most_recent(headers)
else:
click.echo(' No selector provided. Please use either by_sha256, hundred_recent as selector',)
click.echo(
" No selector provided. Please use either by_sha256, "
"hundred_recent as selector",
)


if __name__ == '__main__':
if __name__ == "__main__":
scraper.add_command(run_scrapper)
scraper()

0 comments on commit b7c976c

Please sign in to comment.