-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathreducer.py
179 lines (139 loc) · 6.96 KB
/
reducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import logging
import time
from intervaltree import Interval, IntervalTree
from typing import List
from copy import deepcopy
from model.model_base import Scanner, ScanSpeed
from model.model_data import Data, Match
from model.file_model import BaseFile
from myutils import *
PRINT_DELAY_SECONDS = 2
class Reducer():
    """Reducer will scan data in file with scanner, and return List of matches

    Works as a recursive binary search over a byte range of the file: each half
    of the current section is zeroed out in turn and re-scanned with the scanner
    to locate the smallest byte ranges the scanner still detects. Detected
    ranges are collected and merged in an IntervalTree and converted into Match
    objects at the end of scan().
    """

    def __init__(self, file: BaseFile, scanner: Scanner, iteration: int = 0, scanSpeed: ScanSpeed = ScanSpeed.Normal, matchIdx: int = 0):
        self.file: BaseFile = file             # file whose data is patched and re-scanned
        self.scanner: Scanner = scanner        # external detection oracle (detected: yes/no)
        self.scanSpeed: ScanSpeed = scanSpeed  # only logged in this class; no behavior switch visible here
        self.iteration: int = iteration        # recorded on every produced Match
        self.matchIdx: int = matchIdx          # index for the next Match; advances across scan() calls
        self.matchesAdded: int = 0             # raw count of _addMatch() calls (before overlap merging)
        self.chunks_tested: int = 0            # number of sections visited by _scanDataPart()
        self.minMatchSize: int = 4             # sections at/below this size stop subdividing (see _scanDataPart)
        self.minChunkSize: int = 4 # sane default for now. Will be adjusted based on section size on scan()
        # re-init for every scan
        self.lastPrintTime: float = 0  # time.time() of the last progress log (float seconds)
        self.it = IntervalTree()       # merged intervals of detected byte ranges

    def init(self):
        """Reset per-scan state: the result interval tree and the progress-print timer."""
        self.it = IntervalTree()
        self.lastPrintTime = 0

    def scan(self, offsetStart, offsetEnd) -> List[Match]:
        """Scan self.file.Data() from offsetStart to offsetEnd, return matches"""
        self.init()
        data = deepcopy(self.file.Data()) # get the data of the file to work on as copy
        size = offsetEnd - offsetStart
        # Pick the bisection granularity from the size of the scanned region:
        # small regions are split down to tiny chunks, large regions stop earlier.
        if size < 50000: # 50kb
            self.minChunkSize = 2
        elif size < 100000: # 100kb
            self.minChunkSize = 8
        elif size < 500000: # 500kb
            self.minChunkSize = 16
        elif size < 1000000: # 1mb
            self.minChunkSize = 32
        else: # >1mb
            self.minChunkSize = 64
        self.minMatchSize = self.minChunkSize * 2
        logging.info("Reducer Start: ScanSpeed:{} Iteration:{} MinChunkSize:{} MinMatchSize:{}".format(
            self.scanSpeed.name, self.iteration, self.minChunkSize, self.minMatchSize))
        timeStart = time.time()
        self._scanDataPart(data, offsetStart, offsetEnd)
        timeEnd = time.time()
        scanTime = round(timeEnd - timeStart)
        logging.info("Reducer Result: Time:{} Chunks:{} MatchesAdded:{} MatchesFinal:{}".format(
            scanTime, self.chunks_tested, self.matchesAdded, len(self.it)))
        # Convert merged intervals to Match objects, continuing the running index
        matches = convertMatchesIt(self.it, self.iteration, self.matchIdx)
        self.matchIdx += len(matches)
        return matches

    def _scanData(self, data: Data):
        """Use self.file with data, scan it and return true/false"""
        newFileData: Data = self.file.getFileDataWith(data)
        return self.scanner.scannerDetectsBytes(newFileData.getBytes(), self.file.filename)

    def _addMatch(self, sectionStart: int, sectionEnd: int):
        """Record [sectionStart, sectionEnd) as a detected range and merge overlaps."""
        self.it.add ( Interval(sectionStart, sectionEnd) )
        self.matchesAdded += 1
        # Always merge, so we have accurate information about the amount of real matches
        # (strict=False also merges adjacent intervals, not just overlapping ones)
        self.it.merge_overlaps(strict=False)

    # recursive
    def _scanDataPart(self, data: Data, sectionStart: int, sectionEnd: int):
        """Bisect [sectionStart, sectionEnd): null each half, re-scan, recurse into
        whichever half/halves the scanner still detects; record minimal ranges via _addMatch()."""
        size = sectionEnd - sectionStart
        chunkSize = int(size // 2)
        self.chunks_tested += 1
        self._printStatus()
        #logging.info(f"Testing: {sectionStart}-{sectionEnd} with size {sectionEnd-sectionStart} (chunkSize {chunkSize} bytes)")
        #logging.info(f"Testing Top: {sectionStart}-{sectionStart+chunkSize}")
        #logging.info(f"Testing Bot: {sectionStart+chunkSize}-{sectionStart+chunkSize+chunkSize}")
        # Safety valve: every 100 tested chunks, coarsen the granularity so very
        # scattered detections still terminate in reasonable time. This mutates
        # shared state mid-recursion, so later branches use the doubled sizes.
        # NOTE(review): chunks_tested was incremented just above, so the `> 0`
        # guard is always true here.
        if self.chunks_tested > 0 and self.chunks_tested % 100 == 0:
            logging.info("Doubling: minChunkSize: {} minMatchSize: {}".format(
                self.minChunkSize, self.minMatchSize
            ))
            self.minChunkSize *= 2
            self.minMatchSize *= 2
        # dangling bytes
        # note that these have been detected, thats why they are being scanned.
        # so we can just add them
        if chunkSize <= self.minChunkSize:
            dataBytes = data.getBytesRange(sectionStart, sectionEnd)
            logging.info(f"Result: {sectionStart}-{sectionEnd} ({sectionEnd-sectionStart}b minChunk:{self.minChunkSize} X)"
                + "\n" + hexdmp(dataBytes, offset=sectionStart))
            self._addMatch(sectionStart, sectionEnd)
            return
        # Null out each half in its own copy and ask the scanner about both variants
        dataChunkTopNull = deepcopy(data)
        dataChunkTopNull.patchDataFill(sectionStart, chunkSize)
        dataChunkBotNull = deepcopy(data)
        dataChunkBotNull.patchDataFill(sectionStart+chunkSize, chunkSize)
        detectTopNull = self._scanData(dataChunkTopNull)
        detectBotNull = self._scanData(dataChunkBotNull)
        if detectTopNull and detectBotNull:
            #logging.info("--> Both Detected")
            # Both halves are detected
            # Continue scanning both halves independantly, but with each other halve
            # zeroed out (instead of the complete file)
            self._scanDataPart(dataChunkBotNull, sectionStart, sectionStart+chunkSize)
            self._scanDataPart(dataChunkTopNull, sectionStart+chunkSize, sectionEnd)
        elif not detectTopNull and not detectBotNull:
            #logging.info("--> Both UNdetected")
            # both parts arent detected anymore
            if chunkSize <= self.minMatchSize:
                # Small enough, no more detections.
                # The "previous" section is our match
                dataBytes = data.getBytesRange(sectionStart, sectionStart+size)
                logging.info(f"Result: {sectionStart}-{sectionEnd} ({sectionEnd-sectionStart} bytes)"
                    + "\n" + hexdmp(dataBytes, offset=sectionStart))
                self._addMatch(sectionStart, sectionStart+size)
            else:
                # make it smaller still.
                # Take complete data (not nulled)
                self._scanDataPart(data, sectionStart, sectionStart+chunkSize)
                self._scanDataPart(data, sectionStart+chunkSize, sectionEnd)
        elif not detectTopNull:
            # Detection in the top half
            #logging.info("--> Do Top")
            self._scanDataPart(data, sectionStart, sectionStart+chunkSize)
        elif not detectBotNull:
            # Detection in the bottom half
            #logging.info("--> Do Bot")
            self._scanDataPart(data, sectionStart+chunkSize, sectionEnd)
        return

    def _printStatus(self):
        """Log scan progress, rate-limited to once per PRINT_DELAY_SECONDS."""
        currentTime = time.time()
        if currentTime > self.lastPrintTime + PRINT_DELAY_SECONDS:
            self.lastPrintTime = currentTime
            logging.info("Reducing: {} chunks done, found {} matches ({} added)".format(
                self.chunks_tested, len(self.it), self.matchesAdded))
def convertMatchesIt(matchesIt: IntervalTree, iteration: int = 0, baseIdx: int = 0) -> List[Match]:
    """Turn the intervals of matchesIt into Match objects, sorted by position.

    Indices are assigned sequentially starting at baseIdx; every Match carries
    the given iteration number and the interval's begin/length.
    """
    return [
        Match(baseIdx + offset, interval.begin, interval.end - interval.begin, iteration)
        for offset, interval in enumerate(sorted(matchesIt))
    ]