sequence: refactor SequenceMatcher
williballenthin committed Jan 16, 2025
1 parent ffcec47 commit b464e03
Showing 1 changed file with 93 additions and 75 deletions.
capa/capabilities/dynamic.py (93 additions & 75 deletions)
@@ -76,70 +76,48 @@ class ThreadCapabilities:
     call_matches: MatchResults
 
 
-def find_thread_capabilities(
-    ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
-) -> ThreadCapabilities:
-    """
-    find matches for the given rules within the given thread,
-    which includes matches for all the sequences and calls within it.
-    """
-    # all features found within this thread,
-    # includes features found within calls.
-    features: FeatureSet = collections.defaultdict(set)
-
-    # matches found at the call scope.
-    # might be found at different calls, that's ok.
-    call_matches: MatchResults = collections.defaultdict(list)
-
-    # matches found at the sequence scope.
-    sequence_matches: MatchResults = collections.defaultdict(list)
-
-    # We match sequences as a sliding window of calls with size SEQUENCE_SIZE.
-    #
-    # For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
-    # merging all their features and doing a match.
-    #
-    # We track these features in two data structures:
-    #  1. a deque of those features found in the prior calls.
-    #     We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
-    #  2. a live set of features seen in the sequence.
-    #     As we pop from the deque, we remove features from the current set,
-    #     and as we push to the deque, we insert features to the current set.
-    # With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
-    # The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
-    # (that is, runtime gets slower the larger SEQUENCE_SIZE is).
-    sequence_call_addresses: collections.deque[DynamicCallAddress] = collections.deque(maxlen=SEQUENCE_SIZE)
-    sequence_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
-    sequence_features: FeatureSet = collections.defaultdict(set)
-
-    # the names of rules matched at the last sequence,
-    # so that we can deduplicate long strings of the same match.
-    last_sequence_matches: set[str] = set()
-
-    call_count = 0
-    for ch in extractor.get_calls(ph, th):
-        call_count += 1
-        call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
-        for feature, vas in call_capabilities.features.items():
-            features[feature].update(vas)
-
-        for rule_name, res in call_capabilities.matches.items():
-            call_matches[rule_name].extend(res)
-
-        #
-        # sequence scope matching
-        #
-        sequence_call_addresses.append(ch.address)
-        sequence_address = DynamicSequenceAddress(
-            th.address, id=ch.address.id, calls=tuple(address.id for address in sequence_call_addresses)
-        )
+class SequenceMatcher:
+    def __init__(self, ruleset: RuleSet):
+        super().__init__()
+        self.ruleset = ruleset
+
+        # matches found at the sequence scope.
+        self.matches: MatchResults = collections.defaultdict(list)
+
+        # We match sequences as a sliding window of calls with size SEQUENCE_SIZE.
+        #
+        # For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
+        # merging all their features and doing a match.
+        #
+        # We track these features in two data structures:
+        #  1. a deque of those features found in the prior calls.
+        #     We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
+        #  2. a live set of features seen in the sequence.
+        #     As we pop from the deque, we remove features from the current set,
+        #     and as we push to the deque, we insert features to the current set.
+        # With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
+        # The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
+        # (that is, runtime gets slower the larger SEQUENCE_SIZE is).
+        self.current_call_addresses: collections.deque[DynamicCallAddress] = collections.deque(maxlen=SEQUENCE_SIZE)
+        self.current_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
+        self.current_features: FeatureSet = collections.defaultdict(set)
+
+        # the names of rules matched at the last sequence,
+        # so that we can deduplicate long strings of the same match.
+        self.last_sequence_matches: set[str] = set()
+
+    def next(self, ch: CallHandle, call_features: FeatureSet):
+        self.current_call_addresses.append(ch.address)
+        # TODO: it would be nice to create this only when needed, since it generates garbage.
+        sequence_address = DynamicSequenceAddress(
+            ch.address.thread, id=ch.address.id, calls=tuple(address.id for address in self.current_call_addresses)
+        )
 
         # As we add items to the end of the deque, overflow and drop the oldest items (at the left end).
         # While we could rely on `deque.append` with `maxlen` set (which we provide above),
         # we want to use the dropped item first, to remove the old features, so we manually pop it here.
-        if len(sequence_feature_sets) == SEQUENCE_SIZE:
-            overflowing_feature_set = sequence_feature_sets.popleft()
+        if len(self.current_feature_sets) == SEQUENCE_SIZE:
+            overflowing_feature_set = self.current_feature_sets.popleft()
 
             for feature, vas in overflowing_feature_set.items():
                 if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress):
@@ -149,35 +127,75 @@ def find_thread_capabilities(
                     # like arch/os/format.
                     continue
 
-                feature_vas = sequence_features[feature]
+                feature_vas = self.current_features[feature]
                 feature_vas.difference_update(vas)
                 if not feature_vas:
-                    del sequence_features[feature]
+                    del self.current_features[feature]
 
         # update the deque and set of features with the latest call's worth of features.
-        latest_features = call_capabilities.features
-        sequence_feature_sets.append(latest_features)
-        for feature, vas in latest_features.items():
-            sequence_features[feature].update(vas)
+        self.current_feature_sets.append(call_features)
+        for feature, vas in call_features.items():
+            self.current_features[feature].update(vas)
 
-        _, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, sequence_address)
+        _, matches = self.ruleset.match(Scope.SEQUENCE, self.current_features, sequence_address)
 
+        newly_encountered_rules = set(matches.keys()) - self.last_sequence_matches
+
+        # don't emit match results for rules seen during the immediately preceding sequence.
+        #
+        # This means that we won't emit duplicate matches when there are multiple sequences
+        # that overlap a single matching event.
+        # It also handles the case of a tight loop containing matched logic;
+        # only the first match will be recorded.
+        #
+        # In theory, this means the result document doesn't have *every* possible match location,
+        # but in practice, humans will only be interested in the first handful anyways.
+        suppressed_rules = set(self.last_sequence_matches)
+
+        # however, if a newly encountered rule depends on a suppressed rule,
+        # don't suppress that rule match, or we won't be able to reconstruct the verbose output.
+        # see: https://github.com/mandiant/capa/pull/2532#issuecomment-2548508130
+        for new_rule in newly_encountered_rules:
+            suppressed_rules -= set(self.ruleset.rules[new_rule].get_dependencies(self.ruleset.rules_by_namespace))
+
         # TODO: if smatches: create the sequence location
-        for rule_name, res in smatches.items():
+        for rule_name, res in matches.items():
             # TODO: maybe just garbage collect here better.
-            if rule_name in last_sequence_matches:
-                # don't emit match results for rules seen during the immediately preceding sequence.
-                #
-                # This means that we won't emit duplicate matches when there are multiple sequences
-                # that overlap a single matching event.
-                # It also handles the case of a tight loop containing matched logic;
-                # only the first match will be recorded.
-                #
-                # In theory, this means the result document doesn't have *every* possible match location,
-                # but in practice, humans will only be interested in the first handful anyways.
+            if rule_name in suppressed_rules:
                 continue
-            sequence_matches[rule_name].extend(res)
+            self.matches[rule_name].extend(res)
+
+        self.last_sequence_matches = set(matches.keys())
+
+
+def find_thread_capabilities(
+    ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
+) -> ThreadCapabilities:
+    """
+    find matches for the given rules within the given thread,
+    which includes matches for all the sequences and calls within it.
+    """
+    # all features found within this thread,
+    # includes features found within calls.
+    features: FeatureSet = collections.defaultdict(set)
+
+    # matches found at the call scope.
+    # might be found at different calls, that's ok.
+    call_matches: MatchResults = collections.defaultdict(list)
+
+    sequence_matcher = SequenceMatcher(ruleset)
+
+    call_count = 0
+    for ch in extractor.get_calls(ph, th):
+        call_count += 1
+        call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
+        for feature, vas in call_capabilities.features.items():
+            features[feature].update(vas)
+
+        for rule_name, res in call_capabilities.matches.items():
+            call_matches[rule_name].extend(res)
 
-        last_sequence_matches = set(smatches.keys())
+        sequence_matcher.next(ch, call_capabilities.features)
 
     for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
         features[feature].add(va)
@@ -196,9 +214,9 @@ def find_thread_capabilities
         th.address.tid,
         call_count,
         len(features),
-        len(matches) + len(sequence_matches) + len(call_matches),
+        len(matches) + len(sequence_matcher.matches) + len(call_matches),
     )
-    return ThreadCapabilities(features, matches, sequence_matches, call_matches)
+    return ThreadCapabilities(features, matches, sequence_matcher.matches, call_matches)
 
 
 @dataclass
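
For readers skimming the diff: the core trick SequenceMatcher inherits from the old loop is pairing a bounded deque of per-call feature sets with a single live merged feature set, so each call costs work proportional only to the features entering and leaving the window, never to SEQUENCE_SIZE. Below is a minimal, self-contained sketch of that bookkeeping; the class name, feature types, and window size are simplified stand-ins, not capa's actual API.

import collections

WINDOW_SIZE = 5  # stand-in for capa's SEQUENCE_SIZE


class SlidingFeatureWindow:
    """Toy model of the deque-plus-live-set bookkeeping in SequenceMatcher."""

    def __init__(self) -> None:
        # per-call feature sets, oldest at the left
        self.window: collections.deque[dict] = collections.deque(maxlen=WINDOW_SIZE)
        # merged view of every feature currently inside the window
        self.live: dict = collections.defaultdict(set)

    def push(self, call_features: dict) -> dict:
        # evict the oldest call's features from the live set first,
        # mirroring the manual popleft() in SequenceMatcher.next()
        if len(self.window) == WINDOW_SIZE:
            for feature, addresses in self.window.popleft().items():
                remaining = self.live[feature]
                remaining.difference_update(addresses)
                if not remaining:
                    del self.live[feature]
        # then merge the newest call's features in
        self.window.append(call_features)
        for feature, addresses in call_features.items():
            self.live[feature].update(addresses)
        return self.live


w = SlidingFeatureWindow()
for i in range(8):
    live = w.push({f"api:call_{i}": {i}})
print(sorted(live))  # only the last WINDOW_SIZE calls' features remain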
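The other subtle piece is the deduplication at the end of next(): rules matched in the immediately preceding window are suppressed, except when a newly matched rule depends on one of them, in which case the dependency must survive so the verbose output can still be reconstructed. Here is a hedged sketch of just that set logic, with hypothetical rule names and a plain dict standing in for the ruleset's dependency lookup:

def emit_matches(matches: set[str], last_matches: set[str], deps: dict[str, set[str]]) -> set[str]:
    """Return the matches to report, suppressing repeats from the previous window."""
    newly_encountered = matches - last_matches
    suppressed = set(last_matches)
    # a new rule may depend on a suppressed one; un-suppress those dependencies
    for rule in newly_encountered:
        suppressed -= deps.get(rule, set())
    return matches - suppressed

# a tight loop re-matches "create pipe": only the first window reports it...
print(emit_matches({"create pipe"}, set(), {}))            # {'create pipe'}
print(emit_matches({"create pipe"}, {"create pipe"}, {}))  # set()
# ...until "create reverse shell", which depends on it, appears alongside it
print(emit_matches({"create pipe", "create reverse shell"},
                   {"create pipe"},
                   {"create reverse shell": {"create pipe"}}))  # both reported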

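Finally, each window match needs a location. The TODO in next() notes that building it per call generates garbage, but its shape is simple: a sequence is addressed by its thread, the id of the newest call, and the ids of every call currently in the window. A rough stand-in for DynamicSequenceAddress (the real class lives elsewhere in capa; the field names here are assumptions):

from dataclasses import dataclass


@dataclass(frozen=True)
class ToySequenceAddress:
    thread: int             # owning thread
    id: int                 # id of the newest call in the window
    calls: tuple[int, ...]  # ids of all calls currently in the window


window_call_ids = (41, 42, 43)
addr = ToySequenceAddress(thread=0x1234, id=window_call_ids[-1], calls=window_call_ids)
print(addr)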