Skip to content

Commit

Permalink
some tweaks for cisagov#457, don't bother keeping track of when suric…
Browse files Browse the repository at this point in the history
…ata is done with a PCAP file. just let filebeat handle it and pick up the resultant eve.json files directly
  • Loading branch information
mmguero committed Jan 22, 2025
1 parent 4e23d61 commit 53e117d
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 190 deletions.
11 changes: 8 additions & 3 deletions filebeat/filebeat-logs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,23 @@ filebeat.inputs:
#-------------------------- Suricata EVE JSON logs -----------------------------
- type: log
paths:
- ${FILEBEAT_SURICATA_LOG_PATH:/suricata}/eve*.json
- ${FILEBEAT_SURICATA_LOG_PATH:/suricata}/suricata-*/eve*.json
symlinks: true
fields_under_root: true
tags: ["_filebeat_suricata_malcolm_upload"]
compression_level: 0
scan_frequency: ${FILEBEAT_SCAN_FREQUENCY:10s}
clean_inactive: ${FILEBEAT_CLEAN_INACTIVE:180m}
ignore_older: ${FILEBEAT_IGNORE_OLDER:120m}
close_inactive: ${FILEBEAT_CLOSE_INACTIVE:120s}
close_inactive: ${FILEBEAT_CLOSE_INACTIVE_LIVE:90m}
close_renamed: ${FILEBEAT_CLOSE_RENAMED:true}
close_removed: ${FILEBEAT_CLOSE_REMOVED:true}
close_eof: ${FILEBEAT_CLOSE_EOF:true}
# We're now submitting uploaded PCAP to Suricata over a socket,
# but Suricata doesn't let us know when processing is complete.
# Reaching EOF doesn't necessarily mean the file is done
# being written to. We need to treat the results more like "live"
# even though the traffic itself isn't.
close_eof: false
clean_removed: ${FILEBEAT_CLEAN_REMOVED:true}

- type: log
Expand Down
27 changes: 14 additions & 13 deletions filebeat/scripts/clean-processed-folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,22 @@ def pruneFiles():
print(f'removing dead symlink "{currentFileSpec}"')
silentRemove(currentFileSpec)

# clean up any old and empty directories in Zeek processed/ directory
# check the suricata logs (live and otherwise) as well
for surDir in [suricataDir, suricataLiveDir]:
if os.path.isdir(surDir):
for eve in os.listdir(surDir):
eveFile = os.path.join(surDir, eve)
if os.path.isfile(eveFile):
checkFile(eveFile, filebeatReg=fbReg, checkLogs=True, checkArchives=False)

# clean up any old and empty directories in Zeek processed/ and suricata non-live directories
cleanDirSeconds = min(i for i in (cleanLogSeconds, cleanZipSeconds) if i > 0)
candidateDirs = []
if os.path.isdir(zeekProcessedDir):
for root, dirs, files in os.walk(zeekProcessedDir, topdown=False):
if root and dirs:
candidateDirs += [os.path.join(root, tmpDir) for tmpDir in dirs]
for processedDir in [zeekProcessedDir, suricataDir]:
if os.path.isdir(processedDir):
for root, dirs, files in os.walk(processedDir, topdown=False):
if root and dirs:
candidateDirs += [os.path.join(root, tmpDir) for tmpDir in dirs]
candidateDirs = list(set(candidateDirs))
candidateDirs.sort(reverse=True)
candidateDirs.sort(key=len, reverse=True)
Expand All @@ -161,14 +170,6 @@ def pruneFiles():
except OSError:
pass

# check the suricata logs (live and otherwise) as well
for surDir in [suricataDir, suricataLiveDir]:
if os.path.isdir(surDir):
for eve in os.listdir(surDir):
eveFile = os.path.join(surDir, eve)
if os.path.isfile(eveFile):
checkFile(eveFile, filebeatReg=fbReg, checkLogs=True, checkArchives=False)


def main():
with open(lockFilename, 'w') as lock_file:
Expand Down
23 changes: 14 additions & 9 deletions logstash/pipelines/suricata/11_suricata_logs.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,23 @@ filter {
source => "message"
target => "suricata" }

# tags may have been specified, like: eve-123456789-1-(tagA,tagB,tagC).json, extract the tags
# Suricata tags may have been specified in the directory name, like:
# .../suricata-1737561275800982-1-(SNMP,NBSITEID0)/eve.json
# Extract the tags to apply to the event.
ruby {
id => "ruby_suricata_tags_extract"
code => "
if fileParts = event.get('[log][file][path]').split('/').last.match(/eve-\d+-\d+-(?:\((.*)\))?\.json/i) then
tags = fileParts.captures[0]
unless tags.nil?
filenameTags = tags.split(',')
nbsiteid = filenameTags.find { |str| str[/NBSITEID(.+)/] }[/NBSITEID(.+)/, 1].to_s rescue nil
filenameTags.delete_if{|v| ((v == nil) or (v == '') or (v !~ /\D/) or (v =~ /\A\s*NBSITEID/) or (v =~ /\A\s*(eve|pcap|dmp|log|bro|zeek|suricata|m?tcpdump|m?netsniff|autozeek|autosuricata)s?\s*\z/i))}
event.set('[@metadata][suricata_log_tags]', filenameTags.uniq) unless (filenameTags.length == 0)
event.set('[@metadata][nbsiteid]', nbsiteid) unless (nbsiteid.nil? || nbsiteid.empty?)
if filePath = event.get('[log][file][path]') then
parentDir = File.dirname(filePath).split('/').last
if matches = parentDir.match(/suricata-\d+-\d+-(?:\((.*)\))/i) then
tags = matches.captures[0]
unless tags.nil?
filenameTags = tags.split(',')
nbsiteid = filenameTags.find { |str| str[/NBSITEID(.+)/] }[/NBSITEID(.+)/, 1].to_s rescue nil
filenameTags.delete_if{|v| ((v == nil) or (v == '') or (v !~ /\D/) or (v =~ /\A\s*NBSITEID/) or (v =~ /\A\s*(eve|pcap|dmp|log|bro|zeek|suricata|m?tcpdump|m?netsniff|autozeek|autosuricata)s?\s*\z/i))}
event.set('[@metadata][suricata_log_tags]', filenameTags.uniq) unless (filenameTags.length == 0)
event.set('[@metadata][nbsiteid]', nbsiteid) unless (nbsiteid.nil? || nbsiteid.empty?)
end
end
end"
}
Expand Down
164 changes: 6 additions & 158 deletions shared/bin/pcap_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
)
from malcolm_utils import eprint, str2bool, AtomicInt, run_process, same_file_or_dir
from multiprocessing.pool import ThreadPool
from subprocess import PIPE, STDOUT, DEVNULL, Popen, TimeoutExpired
from collections import deque, defaultdict
from collections import deque
from itertools import chain, repeat

try:
Expand Down Expand Up @@ -400,7 +399,6 @@ def suricataFileWorker(suricataWorkerArgs):

(
newFileQueue,
suricataOutputDirMap,
pcapBaseDir,
autoSuricata,
forceSuricata,
Expand All @@ -423,7 +421,6 @@ def suricataFileWorker(suricataWorkerArgs):
suricataWorkerArgs[8],
suricataWorkerArgs[9],
suricataWorkerArgs[10],
suricataWorkerArgs[11],
)

if not logger:
Expand Down Expand Up @@ -485,7 +482,9 @@ def suricataFileWorker(suricataWorkerArgs):

# Create unique output directory for this PCAP's suricata output
processTimeUsec = int(round(time.time() * 1000000))
output_dir = os.path.join(uploadDir, f"suricata-{processTimeUsec}-{workerId}")
output_dir = os.path.join(
uploadDir, f"suricata-{processTimeUsec}-{workerId}-({','.join(fileInfo[FILE_INFO_DICT_TAGS])})"
)

try:
logger.info(
Expand All @@ -496,15 +495,8 @@ def suricataFileWorker(suricataWorkerArgs):
output_dir=output_dir,
):
# suricata over socket mode doesn't let us know when a PCAP file is done processing,
# so all we do here is store some information about it in suricataOutputDirMap and
# move on to the next one. suricataResultsMonitor will watch for the PCAP finished
# message in suricata.log and handle processing the eve.json at that point.
suricataOutputDirMap[fileInfo[FILE_INFO_DICT_NAME]] = (
output_dir,
workerId,
processTimeUsec,
fileInfo[FILE_INFO_DICT_TAGS],
)
# so all we do here is submit it and then we'll let filebeat tail the results
# as long as it needs to
logger.info(
f"{scriptName}[{workerId}]:\t\t{os.path.basename(fileInfo[FILE_INFO_DICT_NAME])}"
)
Expand All @@ -521,125 +513,6 @@ def suricataFileWorker(suricataWorkerArgs):
logger.info(f"{scriptName}[{workerId}]:\tfinished")


###################################################################################################
def suricataResultsMonitor(suricataResultsMonitorArgs):
    """Thread worker: watch suricata.log for per-PCAP completion and publish results.

    Tails ``suricataLogPath`` with an external ``tail -F`` subprocess and scans each
    line for Suricata's "pcap file <name> end of file reached" message. When a PCAP
    is reported finished, looks up its pending entry in ``suricataOutputDirMap``
    (populated by suricataFileWorker), copies the non-empty ``eve.json`` from that
    PCAP's private output directory into ``uploadDir`` under a unique, tag-encoded
    filename, and removes the now-unneeded output directory.

    Args (packed as a single sequence, suricataResultsMonitorArgs):
        [0] suricataOutputDirMap: shared mapping of PCAP filespec ->
            (output_dir, worker_id, process_time_usec, tags); entries are popped here.
        [1] suricataLogPath: path to suricata.log to tail.
        [2] uploadDir: destination directory for the renamed eve JSON files.
        [3] logger: logging-like object; falls back to the logging module if falsy.
        [4] debug: when truthy, let tail's stderr pass through instead of DEVNULL.

    Runs until the global ``shuttingDown`` flag is set. Returns nothing.
    """
    global shuttingDown
    global workersCount

    # There is no good way to know if a PCAP file has finished processing with Suricata in socket
    # mode (see https://forum.suricata.io/t/suricata-in-socket-mode-determining-if-a-pcap-is-done-processing).
    # So what we have to do is tail suricata.log and watch for the "end of file reached" message
    # to know we can process its eve.log. This is a pain in the 🍑.

    workerId = workersCount.increment()  # unique ID for this thread

    # unpack the positional argument bundle (see Args in the docstring)
    (
        suricataOutputDirMap,
        suricataLogPath,
        uploadDir,
        logger,
        debug,
    ) = (
        suricataResultsMonitorArgs[0],
        suricataResultsMonitorArgs[1],
        suricataResultsMonitorArgs[2],
        suricataResultsMonitorArgs[3],
        suricataResultsMonitorArgs[4],
    )

    if not logger:
        logger = logging

    logger.info(f"{scriptName}[{workerId}]:\t🪵\tstarted")

    # matches Suricata's log line announcing it has finished reading a submitted PCAP;
    # group(1) is the PCAP filespec as Suricata logged it (must match the key used by
    # suricataFileWorker when it stored the entry in suricataOutputDirMap)
    finishedRegEx = re.compile(r'pcap file (.+?) end of file reached')

    process = None

    # outer loop: (re)start the tail subprocess whenever it is missing or has died,
    # until a shutdown is requested
    while not shuttingDown:

        # clean up any previous tail subprocess before spawning a new one:
        # terminate politely, then kill if it doesn't exit within 5 seconds
        if process is not None:
            try:
                process.terminate()
                try:
                    process.wait(timeout=5.0)
                except TimeoutExpired:
                    process.kill()
            except Exception:
                # best-effort cleanup; the process may already be gone
                pass
            process = None

        if os.path.isfile(suricataLogPath):
            # -F (follow by name) survives log rotation; -n 1000 replays recent lines
            # so completion messages logged before this monitor started aren't missed
            if process := Popen(
                ['tail', '-n', '1000', '-F', suricataLogPath],
                stdout=PIPE,
                stderr=None if debug else DEVNULL,
            ):
                logger.info(f"{scriptName}[{workerId}]:\t🪵\tmonitoring {suricataLogPath}")
                # inner loop: consume tail output line-by-line until shutdown or tail exits
                while not shuttingDown:

                    if output := process.stdout.readline():
                        try:
                            if pcapDoneMsgMatch := finishedRegEx.search(output.decode('utf-8').strip()):
                                pcapFileSpec = pcapDoneMsgMatch.group(1)
                                # claim (and remove) this PCAP's pending entry; the default
                                # tuple means "unknown PCAP" and falls to the warning below
                                outputDir, origWorkerId, processTimeUsec, tags = suricataOutputDirMap.pop(
                                    pcapFileSpec, (None, workerId, 0, None)
                                )
                                if outputDir and os.path.isdir(str(outputDir)):
                                    logger.debug(
                                        f"{scriptName}[{workerId}]:\t🏁\t{pcapFileSpec} done processing in {outputDir}"
                                    )
                                    eveJsonFile = os.path.join(outputDir, "eve.json")
                                    if os.path.isfile(eveJsonFile):
                                        eveJsonFileStats = os.stat(eveJsonFile)
                                        if eveJsonFileStats.st_size > 0:
                                            # relocate the .json to be processed (do it this way instead of with a shutil.move because of
                                            # the way Docker volume mounts work, ie. avoid "OSError: [Errno 18] Invalid cross-device link").
                                            # filename encodes timestamp, originating worker, and tags for downstream parsing
                                            eveJsonFileFinal = f"eve-{processTimeUsec}-{origWorkerId}-({','.join(tags if tags else [])}).json"
                                            shutil.copy(eveJsonFile, os.path.join(uploadDir, eveJsonFileFinal))
                                            logger.info(
                                                f"{scriptName}[{workerId}]:\t📄\tGenerated {eveJsonFileFinal} ({eveJsonFileStats.st_size}) for {os.path.basename(pcapFileSpec)}"
                                            )
                                        else:
                                            # zero-byte eve.json: nothing to publish, discard it
                                            logger.info(
                                                f"{scriptName}[{workerId}]:\t🫙\tEmpty eve.json generated for {os.path.basename(pcapFileSpec)}"
                                            )
                                            os.unlink(eveJsonFile)

                                    else:
                                        logger.warning(
                                            f"{scriptName}[{workerId}]:\t⚠️\tNo eve.json generated for {os.path.basename(pcapFileSpec)}"
                                        )

                                    # the per-PCAP output directory is disposable once the
                                    # eve.json has been copied out (unless it IS the upload dir)
                                    if not same_file_or_dir(uploadDir, outputDir):
                                        shutil.rmtree(outputDir, ignore_errors=True)

                                else:
                                    logger.warning(
                                        f"{scriptName}[{workerId}]:\t⚠️\tNo output directory for {os.path.basename(pcapFileSpec)}"
                                    )

                        except Exception as e:
                            # keep the monitor alive; one bad line shouldn't stop log watching
                            logger.error(f"{scriptName}[{workerId}]:\t💥\tError watching logs: {e}")

                    else:
                        # empty read: either tail exited (respawn via outer loop) or
                        # there's just no new output yet (idle briefly)
                        if process.poll() is not None:
                            break
                        else:
                            time.sleep(1)
            else:
                logger.error(f"{scriptName}[{workerId}]:\t💥\tUnable to tail {suricataLogPath}")
                time.sleep(1)

        else:
            # suricata.log doesn't exist (yet); poll for it
            time.sleep(1)

    # NOTE(review): on shutdown this only polls the tail subprocess rather than
    # terminating it; presumably process teardown is handled elsewhere — confirm
    if process is not None:
        process.poll()

    logger.info(f"{scriptName}[{workerId}]:\t🪵\tfinished")


###################################################################################################
# main
def main():
Expand Down Expand Up @@ -843,15 +716,6 @@ def main():
type=str,
default=SURICATA_SOCKET_PATH,
)
parser.add_argument(
'--suricata-log',
required=False,
dest='suricataLogPath',
help="suricata.log path",
metavar='<STR>',
type=str,
default=SURICATA_LOG_PATH,
)
parser.add_argument(
'--autosuricata',
dest='autoSuricata',
Expand Down Expand Up @@ -980,15 +844,13 @@ def main():
),
)
elif processingMode == PCAP_PROCESSING_MODE_SURICATA:
suricataOutputDirMap = defaultdict(lambda: None)
ThreadPool(
# threading is done inside of Suricata in socket mode, so just use 1 thread to submit PCAP
1,
suricataFileWorker,
(
[
newFileQueue,
suricataOutputDirMap,
args.pcapBaseDir,
args.autoSuricata,
args.forceSuricata,
Expand All @@ -1002,20 +864,6 @@ def main():
],
),
)
ThreadPool(
# threading is done inside of Suricata in socket mode, so just use 1 thread to monitor suricata results
1,
suricataResultsMonitor,
(
[
suricataOutputDirMap,
args.suricataLogPath,
args.suricataUploadDir,
logging,
args.verbose <= logging.DEBUG,
],
),
)

while not shuttingDown:
# for debugging
Expand Down
6 changes: 0 additions & 6 deletions shared/bin/pcap_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@

# Copyright (c) 2025 Battelle Energy Alliance, LLC. All rights reserved.

import os
import re
import sys

from subprocess import PIPE, Popen
from multiprocessing import RawValue
from threading import Lock

###################################################################################################
PCAP_TOPIC_PORT = 30441
Expand Down
1 change: 0 additions & 1 deletion suricata/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ command=python3 /usr/local/bin/pcap_suricata_processor.py %(ENV_PCAP_PIPELINE_VE
--suricata "%(ENV_SURICATA_RUN_DIR)s/suricata-command.socket"
--suricata-config "%(ENV_SURICATA_CONFIG_FILE)s"
--suricata-directory "%(ENV_SURICATA_LOG_DIR)s"
--suricata-log "%(ENV_SURICATA_LOG_DIR)s/suricata.log"
autostart=%(ENV_SURICATA_PCAP_PROCESSOR)s
autorestart=%(ENV_SURICATA_PCAP_PROCESSOR)s
startsecs=15
Expand Down

0 comments on commit 53e117d

Please sign in to comment.