From b464e03281cc6e6e78f3def23fef2079e8af6ac4 Mon Sep 17 00:00:00 2001
From: Willi Ballenthin
Date: Thu, 16 Jan 2025 09:49:10 +0000
Subject: [PATCH] sequence: refactor SequenceMatcher

---
 capa/capabilities/dynamic.py | 168 +++++++++++++++++++----------------
 1 file changed, 93 insertions(+), 75 deletions(-)

diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py
index 7a13862568..37316c1eda 100644
--- a/capa/capabilities/dynamic.py
+++ b/capa/capabilities/dynamic.py
@@ -76,70 +76,48 @@ class ThreadCapabilities:
     call_matches: MatchResults
 
 
-def find_thread_capabilities(
-    ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
-) -> ThreadCapabilities:
-    """
-    find matches for the given rules within the given thread,
-    which includes matches for all the sequences and calls within it.
-    """
-    # all features found within this thread,
-    # includes features found within calls.
-    features: FeatureSet = collections.defaultdict(set)
-
-    # matches found at the call scope.
-    # might be found at different calls, that's ok.
-    call_matches: MatchResults = collections.defaultdict(list)
-
-    # matches found at the sequence scope.
-    sequence_matches: MatchResults = collections.defaultdict(list)
+class SequenceMatcher:
+    def __init__(self, ruleset: RuleSet):
+        super().__init__()
+        self.ruleset = ruleset
 
-    # We matches sequences as the sliding window of calls with size SEQUENCE_SIZE.
-    #
-    # For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
-    # merging all their features and doing a match.
-    #
-    # We track these features in two data structures:
-    #   1. a deque of those features found in the prior calls.
-    #      We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
-    #   2. a live set of features seen in the sequence.
-    #      As we pop from the deque, we remove features from the current set,
-    #      and as we push to the deque, we insert features to the current set.
-    # With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
-    # The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
-    # (that is, runtime gets slower the larger SEQUENCE_SIZE is).
-    sequence_call_addresses: collections.deque[DynamicCallAddress] = collections.deque(maxlen=SEQUENCE_SIZE)
-    sequence_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
-    sequence_features: FeatureSet = collections.defaultdict(set)
-
-    # the names of rules matched at the last sequence,
-    # so that we can deduplicate long strings of the same matche.
-    last_sequence_matches: set[str] = set()
-
-    call_count = 0
-    for ch in extractor.get_calls(ph, th):
-        call_count += 1
-        call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
-        for feature, vas in call_capabilities.features.items():
-            features[feature].update(vas)
-
-        for rule_name, res in call_capabilities.matches.items():
-            call_matches[rule_name].extend(res)
+        # matches found at the sequence scope.
+        self.matches: MatchResults = collections.defaultdict(list)
 
+        # We match sequences as a sliding window of calls with size SEQUENCE_SIZE.
         #
-        # sequence scope matching
+        # For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
+        # merging all their features and doing a match.
         #
-        sequence_call_addresses.append(ch.address)
+        # We track these features in two data structures:
+        #   1. a deque of those features found in the prior calls.
+        #      We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
+        #   2. a live set of features seen in the sequence.
+        #      As we pop from the deque, we remove features from the current set,
+        #      and as we push to the deque, we insert features to the current set.
+        # With this approach, our algorithm performance is independent of SEQUENCE_SIZE.
+        # The naive algorithm, of merging all the trailing feature sets at each call, is dependent upon SEQUENCE_SIZE
+        # (that is, runtime gets slower the larger SEQUENCE_SIZE is).
+        self.current_call_addresses: collections.deque[DynamicCallAddress] = collections.deque(maxlen=SEQUENCE_SIZE)
+        self.current_feature_sets: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)
+        self.current_features: FeatureSet = collections.defaultdict(set)
+
+        # the names of rules matched at the last sequence,
+        # so that we can deduplicate long strings of the same match.
+        self.last_sequence_matches: set[str] = set()
+
+    def next(self, ch: CallHandle, call_features: FeatureSet):
+        self.current_call_addresses.append(ch.address)
 
         # TODO: it would be nice to create this only when needed, since it generates garbage.
         sequence_address = DynamicSequenceAddress(
-            th.address, id=ch.address.id, calls=tuple(address.id for address in sequence_call_addresses)
+            ch.address.thread, id=ch.address.id, calls=tuple(address.id for address in self.current_call_addresses)
         )
 
         # As we add items to the end of the deque, overflow and drop the oldest items (at the left end).
         # While we could rely on `deque.append` with `maxlen` set (which we provide above),
         # we want to use the dropped item first, to remove the old features, so we manually pop it here.
-        if len(sequence_feature_sets) == SEQUENCE_SIZE:
-            overflowing_feature_set = sequence_feature_sets.popleft()
+        if len(self.current_feature_sets) == SEQUENCE_SIZE:
+            overflowing_feature_set = self.current_feature_sets.popleft()
 
             for feature, vas in overflowing_feature_set.items():
                 if len(vas) == 1 and isinstance(next(iter(vas)), _NoAddress):
@@ -149,35 +127,75 @@ def find_thread_capabilities(
                     # like arch/os/format.
                     continue
 
-                feature_vas = sequence_features[feature]
+                feature_vas = self.current_features[feature]
                 feature_vas.difference_update(vas)
                 if not feature_vas:
-                    del sequence_features[feature]
+                    del self.current_features[feature]
 
         # update the deque and set of features with the latest call's worth of features.
-        latest_features = call_capabilities.features
-        sequence_feature_sets.append(latest_features)
-        for feature, vas in latest_features.items():
-            sequence_features[feature].update(vas)
+        self.current_feature_sets.append(call_features)
+        for feature, vas in call_features.items():
+            self.current_features[feature].update(vas)
 
-        _, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, sequence_address)
+        _, matches = self.ruleset.match(Scope.SEQUENCE, self.current_features, sequence_address)
+
+        newly_encountered_rules = set(matches.keys()) - self.last_sequence_matches
+
+        # don't emit match results for rules seen during the immediately preceding sequence.
+        #
+        # This means that we won't emit duplicate matches when there are multiple sequences
+        # that overlap a single matching event.
+        # It also handles the case of a tight loop containing matched logic;
+        # only the first match will be recorded.
+        #
+        # In theory, this means the result document doesn't have *every* possible match location,
+        # but in practice, humans will only be interested in the first handful anyway.
+        suppressed_rules = set(self.last_sequence_matches)
+
+        # however, if a newly encountered rule depends on a suppressed rule,
+        # don't suppress that rule match, or we won't be able to reconstruct the vverbose output.
+        # see: https://github.com/mandiant/capa/pull/2532#issuecomment-2548508130
+        for new_rule in newly_encountered_rules:
+            suppressed_rules -= set(self.ruleset.rules[new_rule].get_dependencies(self.ruleset.rules_by_namespace))
+
-        # TODO: if smatches: create the sequence location
+        # TODO: if matches: create the sequence location
-        for rule_name, res in smatches.items():
+        for rule_name, res in matches.items():
             # TODO: maybe just garbage collect here better.
-            if rule_name in last_sequence_matches:
-                # don't emit match results for rules seen during the immediately preceeding sequence.
-                #
-                # This means that we won't emit duplicate matches when there are multiple sequences
-                # that overlap a single matching event.
-                # It also handles the case of a tight loop containing matched logic;
-                # only the first match will be recorded.
-                #
-                # In theory, this means the result document doesn't have *every* possible match location,
-                # but in practice, humans will only be interested in the first handful anyways.
+            if rule_name in suppressed_rules:
                 continue
 
-            sequence_matches[rule_name].extend(res)
+            self.matches[rule_name].extend(res)
+
+        self.last_sequence_matches = set(matches.keys())
+
+
+def find_thread_capabilities(
+    ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
+) -> ThreadCapabilities:
+    """
+    find matches for the given rules within the given thread,
+    which includes matches for all the sequences and calls within it.
+    """
+    # all features found within this thread,
+    # includes features found within calls.
+    features: FeatureSet = collections.defaultdict(set)
+
+    # matches found at the call scope.
+    # might be found at different calls, that's ok.
+    call_matches: MatchResults = collections.defaultdict(list)
+
+    sequence_matcher = SequenceMatcher(ruleset)
+
+    call_count = 0
+    for ch in extractor.get_calls(ph, th):
+        call_count += 1
+        call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
+        for feature, vas in call_capabilities.features.items():
+            features[feature].update(vas)
+
+        for rule_name, res in call_capabilities.matches.items():
+            call_matches[rule_name].extend(res)
 
-        last_sequence_matches = set(smatches.keys())
+        sequence_matcher.next(ch, call_capabilities.features)
 
     for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
         features[feature].add(va)
@@ -196,9 +214,9 @@ def find_thread_capabilities(
         th.address.tid,
         call_count,
         len(features),
-        len(matches) + len(sequence_matches) + len(call_matches),
+        len(matches) + len(sequence_matcher.matches) + len(call_matches),
     )
 
-    return ThreadCapabilities(features, matches, sequence_matches, call_matches)
+    return ThreadCapabilities(features, matches, sequence_matcher.matches, call_matches)
 
 
 @dataclass
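
Note for reviewers: the deque-plus-live-set pairing in `SequenceMatcher` is what keeps per-call matching cost independent of `SEQUENCE_SIZE`. Here is a minimal standalone sketch of that sliding-window technique; `SlidingFeatureWindow`, `WINDOW_SIZE`, and the string/int feature stand-ins are illustrative only, not capa's actual types or API.

```python
import collections

WINDOW_SIZE = 5  # hypothetical stand-in for capa's SEQUENCE_SIZE


class SlidingFeatureWindow:
    """Maintain the union of the most recent WINDOW_SIZE per-call feature sets.

    Each push does work proportional to that call's feature count, not to
    WINDOW_SIZE: we add the newest feature set and subtract only the one
    that falls off the left end of the deque.
    """

    def __init__(self) -> None:
        self.window: collections.deque[dict[str, set[int]]] = collections.deque()
        self.features: dict[str, set[int]] = collections.defaultdict(set)

    def push(self, call_features: dict[str, set[int]]) -> None:
        # evict the oldest call's features first, so we can remove them
        # from the live union before adding the newest call's features.
        if len(self.window) == WINDOW_SIZE:
            oldest = self.window.popleft()
            for feature, addresses in oldest.items():
                live = self.features[feature]
                live.difference_update(addresses)
                if not live:
                    del self.features[feature]

        self.window.append(call_features)
        for feature, addresses in call_features.items():
            self.features[feature].update(addresses)


if __name__ == "__main__":
    window = SlidingFeatureWindow()
    for i in range(10):
        window.push({f"api-{i}": {i}})

    # only the features of the last five calls (5..9) remain in the union.
    assert sorted(window.features) == [f"api-{i}" for i in range(5, 10)]
```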
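The suppression logic in `next()` can likewise be read in isolation: matches repeated from the immediately preceding window are dropped, except when a newly encountered rule depends on a repeated one, in which case the dependency must still be emitted so the dependent's output can be rendered. A simplified sketch under stated assumptions: `RULE_DEPENDENCIES` and `select_matches_to_emit` are hypothetical names, and the flat dependency dict stands in for capa's `RuleSet.rules[name].get_dependencies(...)` resolution shown in the patch.

```python
# hypothetical: a flat map from rule name to the names of rules it depends on.
RULE_DEPENDENCIES: dict[str, set[str]] = {
    "persist via scheduled task": {"create scheduled task"},
    "create scheduled task": set(),
}


def select_matches_to_emit(current: set[str], previous: set[str]) -> set[str]:
    """Suppress rules that matched in the immediately preceding window,
    except dependencies of newly encountered rules, whose match details
    are needed to reconstruct the dependents' output."""
    newly_encountered = current - previous
    suppressed = set(previous)
    for rule in newly_encountered:
        suppressed -= RULE_DEPENDENCIES.get(rule, set())
    return current - suppressed


# window N: first sighting of the dependency, so it is emitted.
assert select_matches_to_emit({"create scheduled task"}, set()) == {"create scheduled task"}

# window N+1: the dependency would normally be suppressed as a repeat,
# but the newly matched dependent rule needs it, so both are emitted.
assert select_matches_to_emit(
    {"create scheduled task", "persist via scheduled task"},
    {"create scheduled task"},
) == {"create scheduled task", "persist via scheduled task"}
```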