Skip to content

Commit

Permalink
Port DeXRAY to Python (#2)
Browse files Browse the repository at this point in the history
* Begin port to use Python only
Use metadata from quarantined files

* Fix JSON section format

* Update dexray_lib.py

Now extracts some strings from Microsoft Defender metadata files.

Co-authored-by: Ewifly <[email protected]>
  • Loading branch information
malvidin and Ewifly authored Jun 7, 2021
1 parent d2ee38d commit 069b003
Show file tree
Hide file tree
Showing 6 changed files with 692 additions and 33 deletions.
22 changes: 22 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Dockerfile
.idea
.git

pipelines
venv
env
test
tests
exemples
docs

pip-log.txt
pip-delete-this-directory.txt
.tox
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
*.log
34 changes: 27 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
FROM cccs/assemblyline-v4-service-base:latest
FROM cccs/assemblyline-v4-service-base:latest AS base

ENV SERVICE_PATH dexray.dexray.Dexray

FROM base AS build

USER assemblyline

# Install pip packages
RUN touch /tmp/before-pip
RUN pip install --no-cache-dir --user olefile pycryptodome && rm -rf ~/.cache/pip

USER root
# Remove files that existed before the pip install so that our copy command below doesn't take a snapshot of
# files that already exist in the base image
RUN find /var/lib/assemblyline/.local -type f ! -newer /tmp/before-pip -delete

# change the ownership of the files to be copied due to bitbucket pipeline uid nonsense
RUN chown root:root -R /var/lib/assemblyline/.local

FROM base

# Install any service dependencies here
RUN apt-get update && apt-get install -y perl libarchive-extract-perl libarchive-zip-perl libcrypt-rc4-perl libdigest-crc-perl libcrypt-blowfish-perl libole-storage-lite-perl wget
RUN mkdir -p /opt/al_support
RUN wget -O /opt/al_support/dexray.pl https://raw.githubusercontent.com/Ewifly/assemblyline-service-dexray/main/dexray/dexray.pl
COPY --chown=assemblyline:assemblyline --from=build /var/lib/assemblyline/.local /var/lib/assemblyline/.local

RUN chmod +x /opt/al_support/dexray.pl
# Switch to assemblyline user
USER assemblyline

# Copy mobsf service code
# Clone Extract service code
WORKDIR /opt/al_service
COPY . .

# Patch version in manifest
ARG version=4.0.0.dev1
USER root
RUN sed -i -e "s/\$SERVICE_TAG/$version/g" service_manifest.yml

# Switch to assemblyline user
USER assemblyline
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Assemblyline service :
dequarantinize files before resubmitting them to the system using Dexray (http://hexacorn.com/d/DeXRAY.pl) (a little bit modified for IO)
#DeXRAY Assemblyline v4 Service
Extract files from quarantined files and resubmit using partial Python3 DeXRAY port.

http://hexacorn.com/d/DeXRAY.pl
97 changes: 77 additions & 20 deletions dexray/dexray.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,91 @@
import os
import glob
from subprocess import Popen

import json

from assemblyline_v4_service.common.base import ServiceBase
from assemblyline_v4_service.common.result import Result, ResultSection

from assemblyline_v4_service.common.request import ServiceRequest
from assemblyline_v4_service.common.result import Result, ResultSection, BODY_FORMAT
from assemblyline_v4_service.common.task import MaxExtractedExceeded

from dexray_lib import extract_ahnlab, extract_avast_avg, extract_mcafee_bup, extract_defender



class Dexray(ServiceBase):
def __init__(self, config=None):
super(Dexray, self).__init__(config)
self.dexraytool = self.config.get("dexray_tool_path", None)
self.extract_methods = [
extract_ahnlab,
extract_avast_avg,
extract_mcafee_bup,
extract_defender,
]
self.sha = None

def start(self):
if not os.path.exists(self.dexraytool):
self.log.error(f"Could not find dexray perl script at: {self.dexraytool}")

def execute(self, request):
file = request.file_path
"""Main Module. See README for details."""
result = Result()
unquar = Popen(["perl", self.dexraytool, file])
unquar.wait()

lnewfile_path = glob.glob(file + ".*") # should find the newly created file (filename.<offset>_<AV Name>.out)
if lnewfile_path:
text_section = ResultSection('DexRay found files:')
for newfile_path in lnewfile_path:
vendor = newfile_path.rsplit("_", 1)[1].split(".out")[0]
offset = newfile_path.rsplit("_", 1)[0].rsplit(".", 1)[1]
newfile_name = f"{request.file_name}.{offset}_{vendor}"
request.add_extracted(newfile_path, newfile_name, f"{vendor} un-quarantined file")
text_section.add_line(f"Resubmitted {vendor} un-quarantined file as : {newfile_name}")
result.add_section(text_section)
request.result = result
self.sha = request.sha256
local = request.file_path

text_section = None
kv_section = None

extracted, metadata = self.dexray(request, local)

num_extracted = len(request.extracted)
if num_extracted != 0:
text_section = ResultSection("DeXRAY found files:")
for extracted in request.extracted:
file_name = extracted.get('name')
text_section.add_line(f"Resubmitted un-quarantined file as : {file_name}")

if metadata:
# Can contain live URLs to the original content source
kv_section = ResultSection("DeXRAY Quarantine Metadata",
body_format=BODY_FORMAT.JSON,
body=json.dumps(metadata))
result.add_section(kv_section)

for section in (text_section, kv_section):
if section:
result.add_section(section)

def dexray(self, request: ServiceRequest, local: str):
"""Iterate through quarantine decrypt methods.
Args:
request: AL request object.
local: File path of AL sample.
Returns:
True if archive is password protected, and number of white-listed embedded files.
"""
encoding = request.file_type.replace("quarantine/", "")
extracted = []
metadata = {}

# Try all extracting methods
for extract_method in self.extract_methods:
# noinspection PyArgumentList
extracted, metadata = extract_method(local, self.sha, self.working_directory, encoding)
if extracted or metadata:
break

extracted_count = len(extracted)
# safe_str the file name (fn)
extracted = [[fp, safe_str(fn), e] for fp, fn, e in extracted]
for child in extracted:
try:
# If the file is not successfully added as extracted, then decrease the extracted file counter
if not request.add_extracted(*child):
extracted_count -= 1
except MaxExtractedExceeded:
raise MaxExtractedExceeded(f"This file contains {extracted_count} extracted files, exceeding the "
f"maximum of {request.max_extracted} extracted files allowed. "
"None of the files were extracted.")

return metadata
**
Loading

0 comments on commit 069b003

Please sign in to comment.