From 52207928e790860347ea8f96d556f64d8900e677 Mon Sep 17 00:00:00 2001 From: Rich Piazza Date: Mon, 13 Jan 2025 16:54:47 -0500 Subject: [PATCH] neo4j datastore --- stix2/datastore/__init__.py | 7 +- stix2/datastore/neo4j/STIX2NEO4J.py | 148 +++++++++++ stix2/datastore/neo4j/__init__.py | 0 stix2/datastore/neo4j/demo.py | 25 ++ stix2/datastore/neo4j/neo4j.py | 305 ++++++++++++++++++++++ stix2/datastore/neo4j/neo4j_testing.py | 339 +++++++++++++++++++++++++ 6 files changed, 819 insertions(+), 5 deletions(-) create mode 100644 stix2/datastore/neo4j/STIX2NEO4J.py create mode 100644 stix2/datastore/neo4j/__init__.py create mode 100644 stix2/datastore/neo4j/demo.py create mode 100644 stix2/datastore/neo4j/neo4j.py create mode 100644 stix2/datastore/neo4j/neo4j_testing.py diff --git a/stix2/datastore/__init__.py b/stix2/datastore/__init__.py index 715c6e6b..93f05405 100644 --- a/stix2/datastore/__init__.py +++ b/stix2/datastore/__init__.py @@ -210,11 +210,8 @@ def add(self, *args, **kwargs): stix_objs (list): a list of STIX objects """ - try: - return self.sink.add(*args, **kwargs) - except AttributeError: - msg = "%s has no data sink to put objects in" - raise AttributeError(msg % self.__class__.__name__) + return self.sink.add(*args, **kwargs) + class DataSink(metaclass=ABCMeta): diff --git a/stix2/datastore/neo4j/STIX2NEO4J.py b/stix2/datastore/neo4j/STIX2NEO4J.py new file mode 100644 index 00000000..b6111e3d --- /dev/null +++ b/stix2/datastore/neo4j/STIX2NEO4J.py @@ -0,0 +1,148 @@ +# Reference implementation python script to load STIX 2.1 bundles into +# Neo4J graph database +# Code developed by JHU/APL - First Draft December 2021 + +# DISCLAIMER +# The script developed by JHU/APL for the demonstration are not “turn key” and are +# not safe for deployment without being tailored to production infrastructure. These +# files are not being delivered as software and are not appropriate for direct use on any +# production networks. JHU/APL assumes no liability for the direct use of these files and +# they are provided strictly as a reference implementation. +# +# NO WARRANTY, NO LIABILITY. THIS MATERIAL IS PROVIDED “AS IS.” JHU/APL MAKES NO +# REPRESENTATION OR WARRANTY WITH RESPECT TO THE PERFORMANCE OF THE MATERIALS, INCLUDING +# THEIR SAFETY, EFFECTIVENESS, OR COMMERCIAL VIABILITY, AND DISCLAIMS ALL WARRANTIES IN +# THE MATERIAL, WHETHER EXPRESS OR IMPLIED, INCLUDING (BUT NOT LIMITED TO) ANY AND ALL +# IMPLIED WARRANTIES OF PERFORMANCE, MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, +# AND NON-INFRINGEMENT OF INTELLECTUAL PROPERTY OR OTHER THIRD PARTY RIGHTS. ANY USER OF +# THE MATERIAL ASSUMES THE ENTIRE RISK AND LIABILITY FOR USING THE MATERIAL. IN NO EVENT +# SHALL JHU/APL BE LIABLE TO ANY USER OF THE MATERIAL FOR ANY ACTUAL, INDIRECT, +# CONSEQUENTIAL, SPECIAL OR OTHER DAMAGES ARISING FROM THE USE OF, OR INABILITY TO USE, +# THE MATERIAL, INCLUDING, BUT NOT LIMITED TO, ANY DAMAGES FOR LOST PROFITS. + +## Import python modules for this script +import json +from typing import List +from py2neo import Graph, Node +from getpass import getpass +from tqdm import tqdm + +#Import variables +BundleName = input("Enter the name you want for your bundle: ") +NeoHost = input("Enter the hostname for Neo4j server: ") +NeoUser = input("Neo4j User: ") +NeoPass = getpass("Neo4j Password: ") +JSONFILE = input("Path to STIX JSON: ") + +class NeoUploader(object): + + def __init__(self): + # Connect to neo4j + self.sgraph = Graph(host=NeoHost, auth=(NeoUser, NeoPass)) + self.relations = list() + self.relationship_ids = set() + self.nodes_with_object_ref = list() + self.nodes = list() + self.bundlename = BundleName + self.infer_relation = {"parent_ref": "parent_of", + "created_by_ref": "created_by", + "src_ref": "source_of", + "dst_ref": "destination_of"} + self.__load_json(JSONFILE) + + def __load_json(self, fd): + data = None + with open(fd) as json_file: + data = json.load(json_file) + for entry in data["objects"]: + if entry["type"] == "relationship": + self.relations.append(entry) + else: + self.nodes.append(entry) + + # Make Nodes + def make_nodes(self): + total_nodes=len(self.nodes) + for idx, apobj in tqdm(enumerate(self.nodes), total=total_nodes, desc="Making Nodes", unit="node"): + keys = apobj.keys() + node_contents = dict() + #If the SCO does not have a name field, use the type as name + if 'name' not in keys: + node_name = apobj["type"] + else: + node_name = apobj["name"] + # add id and type to node contents + node_contents["ap_id"] = apobj["id"] + node_contents["type"] = apobj["type"] + # store rest of object contents in node contents + for key in keys: + if key not in ["type", "name", "id"]: + # collections not allowed as neo4j property value + # convert nested collections to string + if isinstance(apobj[key], list) or isinstance(apobj[key], dict): + node_contents[key] = json.dumps(apobj[key]) + else: + node_contents[key] = apobj[key] + # Make the Bundle ID a property + # use dictionary expansion as keywork for optional node properties + node = Node(apobj["type"], + name=node_name, + bundlesource=self.bundlename, + **node_contents) + # if node needs new created_by relation, create the node and then the relationship + self.sgraph.create(node) + # save off these nodes for additional relationship creating + if 'object_refs' in keys: + self.nodes_with_object_ref.append(apobj) + + # create relationships that exist outside of relationship objects + # such as Created_by and Parent_Of + def __make_inferred_relations(self): + total_nodes=len(self.nodes) + for idx, apobj in tqdm(enumerate(self.nodes), total=total_nodes, desc="Checking Inferred Relationships", unit="node"): + for k in apobj.keys(): + k_tokens = k.split("_") + # find refs, but ignore external_references since they aren't objects + if "ref" in k_tokens[len(k_tokens) - 1] and k_tokens[len(k_tokens) - 1] != "references": + rel_type = "_".join(k_tokens[: -1]) + ref_list = [] + # refs are lists, push singular ref into list to make it iterable for loop + if not type(apobj[k]).__name__ == "list": + ref_list.append(apobj[k]) + else: + ref_list = apobj[k] + for ref in ref_list: + # The "b to a" relationship is reversed in this cypher query to ensure the correct relationship direction in the graph + cypher_string = f'MATCH (a),(b) WHERE a.bundlesource="{self.bundlename}" AND b.bundlesource="{self.bundlename}" AND a.ap_id="{str(ref)}" AND b.ap_id="{str(apobj["id"])}" CREATE (b)-[r:{rel_type}]->(a) RETURN a,b' + try: + self.sgraph.run(cypher_string) + except Exception as err: + print(err) + continue + + # Make Relationships + def make_relationships(self): + total_rels=len(self.relations) + for idx, apobj in tqdm(enumerate(self.relations), total=total_rels, desc="Making Relationships", unit="rel"): + # Define Relationship Type + reltype = str(apobj['relationship_type']) + # Fix Relationships with hyphens, neo4j will throw syntax error as + # the hyphen is interpreted as an operation in the query string + reltype = reltype.replace('-', '_') + # create the relationship + cypher_string = f'MATCH (a),(b) WHERE a.bundlesource="{self.bundlename}" AND b.bundlesource="{self.bundlename}" AND a.ap_id="{str(apobj["source_ref"])}" AND b.ap_id="{str(apobj["target_ref"])}" CREATE (a)-[r:{reltype}]->(b) RETURN a,b' + self.sgraph.run(cypher_string) + # maintain set of object ids that are in relationship objects + self.relationship_ids.add(str(apobj['source_ref'])) + self.relationship_ids.add(str(apobj['target_ref'])) + self.__make_inferred_relations() + + # run the helper methods to upload bundle to neo4j database + def upload(self): + self.make_nodes() + self.make_relationships() + + +if __name__ == '__main__': + uploader = NeoUploader() + uploader.upload() diff --git a/stix2/datastore/neo4j/__init__.py b/stix2/datastore/neo4j/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/stix2/datastore/neo4j/demo.py b/stix2/datastore/neo4j/demo.py new file mode 100644 index 00000000..047bfcf9 --- /dev/null +++ b/stix2/datastore/neo4j/demo.py @@ -0,0 +1,25 @@ + +import sys +import json + +import stix2 +from stix2.datastore.neo4j.neo4j import Neo4jStore +import stix2.properties + +# needed so the relational db code knows to create tables for this +from incident import incident, event, task, impact +from identity_contact_information import identity_contact_information +from observed_string import observed_string + + +def main(): + with open(sys.argv[1], "r") as f: + bundle = stix2.parse(json.load(f), allow_custom=True) + store = Neo4jStore(clear_database=True) + + for obj in bundle.objects: + store.add(obj) + + +if __name__ == '__main__': + main() diff --git a/stix2/datastore/neo4j/neo4j.py b/stix2/datastore/neo4j/neo4j.py new file mode 100644 index 00000000..c9acc6c4 --- /dev/null +++ b/stix2/datastore/neo4j/neo4j.py @@ -0,0 +1,305 @@ +import json + +from py2neo import Graph, Node, Relationship + +import stix2 +from stix2.base import _STIXBase +from stix2.datastore import ( + DataSink, DataSource, DataStoreMixin, +) +from stix2.parsing import parse + + +def remove_sro_from_list(sro, sro_list): + for rel in sro_list: + if (rel["source_ref"] == sro["source_ref"] and + rel["target_ref"] == sro["target_ref"] and + rel["relationship_type"] == sro["relationship_type"]): + sro_list.remove(rel) + break + return sro_list + + +def hash_dict_as_string(hash_dict): + hashes = [] + for hash_type, hash in hash_dict.items(): + hashes.append(f'{hash_type}:{hash}') + return ",".join(hashes) + +def _add(sink, stix_data, allow_custom=True, version="2.1"): + """Add STIX objects to MemoryStore/Sink. + + Adds STIX objects to an in-memory dictionary for fast lookup. + Recursive function, breaks down STIX Bundles and lists. + + Args: + store: A MemoryStore, MemorySink or MemorySource object. + stix_data (list OR dict OR STIX object): STIX objects to be added + allow_custom (bool): Whether to allow custom properties as well unknown + custom objects. Note that unknown custom objects cannot be parsed + into STIX objects, and will be returned as is. Default: False. + version (str): Which STIX2 version to lock the parser to. (e.g. "2.0", + "2.1"). If None, the library makes the best effort to figure + out the spec representation of the object. + + """ + if isinstance(stix_data, list): + # STIX objects are in a list- recurse on each object + for stix_obj in stix_data: + _add(sink, stix_obj, allow_custom, version) + + elif stix_data["type"] == "bundle": + # adding a json bundle - so just grab STIX objects + for stix_obj in stix_data.get("objects", []): + _add(sink, stix_obj, allow_custom, version) + + else: + # Adding a single non-bundle object + if isinstance(stix_data, _STIXBase): + stix_obj = stix_data + else: + stix_obj = parse(stix_data, allow_custom, version) + + sink.insert_object(stix_obj) + + +class Neo4jStore(DataStoreMixin): + default_host = "localhost" + default_username = "neo4j" + default_password = "password" + + default_neo4j_connection = "bolt://neo4j:password@localhost:7687" + + def __init__(self, host=default_host, username=default_username, password=default_password, allow_custom=True, version=None, + clear_database=True): + self.sgraph = Graph(host=host, auth=(username, password)) + super().__init__( + source = Neo4jSource( + sgraph=self.sgraph, + allow_custom=allow_custom, + + ), + sink = Neo4jSink( + sgraph=self.sgraph, + allow_custom=allow_custom, + version=version, + clear_database=clear_database, + + + ) + ) + + +class Neo4jSource(DataSource): + def __init__(self, sgraph, allow_custom): + pass + + def all_versions(self, stix_id): + pass + + def query(self, query=None): + pass + + def get(self, stix_id): + pass + + +class Neo4jSink(DataSink): + + def add(self, stix_data, version=None): + _add(self, stix_data, self.allow_custom) + add.__doc__ = _add.__doc__ + + def __init__(self, sgraph, allow_custom=True, version=None, clear_database=False): + super(Neo4jSink, self).__init__() + self.sgraph = sgraph + self.relationships_to_recheck = list() + self.sub_object_relationships = list() + self.counter = 1 + self.allow_custom=allow_custom + if clear_database: + self.sgraph.delete_all() + + def insert_object(self, obj): + # need something better to check for sros, this will not handle sightings + if obj["type"] == "relationship": + self._insert_sro(obj) + else: + self._insert_sdo_sco_smo(obj, obj["type"]) + + def next_id(self): + self.counter += 1 + return str(self.counter) + + def _insert_sdo_sco_smo(self, obj, type_name): + extension_relationships = list() + self.sub_object_relationships = list() + external_references = list() + keys = obj.keys() + node_contents = dict() + # If the SCO does not have a name field, use the type as name + if 'name' not in keys: + node_name = obj["type"] + "_" + self.next_id() + else: + node_name = obj["name"] + # add id and type to node contents + if "id" in obj: + node_contents["id"] = obj["id"] + node_contents["type"] = type_name + # store rest of object contents in node contents + for key in keys: + if key not in ["type", "name", "id"] and not key.endswith("ref") and not key.endswith("refs"): + # collections not allowed as neo4j property value + # convert nested collections to string + if isinstance(obj[key], list): + if isinstance(obj[key][0], str): + node_contents[key] = ",".join(obj[key]) + elif isinstance(obj[key][0], stix2.ExternalReference): + external_references = obj[key] + else: + print(obj[key]) + elif not isinstance(obj[key], dict): + node_contents[key] = obj[key] + elif key == "hashes": + node_contents[key] = hash_dict_as_string(obj[key]) + elif key == "extensions": + for extension_id, value in obj[key].items(): + if hasattr(value, "extension_type") and value.extension_type and value.extension_type.startswith("new-"): + continue + else: + extension_relationships.append(value) + else: + self.sub_object_relationships.append((key, obj[key])) + # Make the Bundle ID a property + # use dictionary expansion as keyword for optional node properties + node = Node(type_name, + name=node_name, + # bundlesource=self.bundlename, + **node_contents) + # if node needs new created_by relation, create the node and then the relationship + self.sgraph.create(node) + # check to see if the addition of this node makes it possible to create a relationship + for rel in self.relationships_to_recheck: + self._insert_sro(obj, True) + self._insert_embedded_relationships(obj) + self._insert_external_references(external_references, node) + self._insert_extensions(extension_relationships, node) + self._insert_sub_objects(self.sub_object_relationships, node) + + def _insert_sub_object(self, sub_prop, sub_obj, parent_node): + node_contents = dict() + node_contents["type"] = sub_prop + for key, value in sub_obj.items(): + if isinstance(value, list): + if isinstance(value[0], str): + node_contents[key] = ",".join(value) + elif isinstance(value[0], dict): + for v in value: + self.sub_object_relationships.append((key, v)) + elif key == "hashes": + node_contents[key] = hash_dict_as_string(value) + elif not isinstance(value, dict): + node_contents[key] = value + else: + self.sub_object_relationships.append((key, value)) + node = Node(sub_prop, + name=sub_prop + "_" + self.next_id(), + # bundlesource=self.bundlename, + **node_contents) + self.sgraph.create(node) + relationship = Relationship(parent_node, sub_prop, node) + self.sgraph.create(relationship) + + def _insert_sub_objects(self, sub_objects, parent_node): + for sub in sub_objects: + self._insert_sub_object(sub[0], sub[1], parent_node) + + def _insert_external_references(self, refs, parent_node): + for ref in refs: + node_contents = dict() + node_contents["type"] = "external_reference" + for key, value in ref.items(): + if key == "hashes": + node_contents[key] = hash_dict_as_string(value) + elif not isinstance(value, dict): + node_contents[key] = value + else: + self.sub_object_relationships.append((key, value)) + node = Node("external_reference", + name="external_reference" + "_" + self.next_id(), + # bundlesource=self.bundlename, + **node_contents) + relationship = Relationship(parent_node, "external_reference", node) + self.sgraph.create(relationship) + + def _insert_extensions(self, extensions, parent_node): + for ext in extensions: + node_contents = dict() + type_name = ext.__class__.__name__ + node_contents["type"] = type_name + for key, value in ext.items(): + if isinstance(value, list): + if isinstance(value[0], str): + node_contents[key] = ",".join(value) + else: + for v in value: + self.sub_object_relationships.append((key, v)) + elif key == "hashes": + node_contents[key] = hash_dict_as_string(value) + else: + node_contents[key] = value + node = Node(type_name, + name=type_name + "_" + self.next_id(), + # bundlesource=self.bundlename, + **node_contents) + relationship = Relationship(parent_node, type_name, node) + self.sgraph.create(relationship) + + def _is_node_available(self, id,): + cypher_string = f'OPTIONAL MATCH (a) WHERE a.id="{str(id)}" RETURN a' + return self.sgraph.run(cypher_string) + + def _insert_sro(self, obj, recheck=False): + reltype = str(obj['relationship_type']) + # Fix Relationships with hyphens, neo4j will throw syntax error as + # the hyphen is interpreted as an operation in the query string + reltype = reltype.replace('-', '_') + # create the relationship + # query for the existence of both source and target objects + # save ones for which both don't exist, and check the list whenever a new S(D|C|M)O is added + if self._is_node_available(obj["source_ref"]) and self._is_node_available(obj["target_ref"]): + cypher_string = f'MATCH (a),(b) WHERE a.id="{str(obj["source_ref"])}" AND b.id="{str(obj["target_ref"])}" CREATE (a)-[r:{reltype}]->(b) RETURN a,b' + self.sgraph.run(cypher_string) + if recheck: + remove_sro_from_list(obj, self.relationships_to_recheck) + else: + if not recheck: + self.relationships_to_recheck.append(obj) + + def _insert_embedded_relationships(self, obj, recheck=False): + for k in obj.keys(): + k_tokens = k.split("_") + # find refs, but ignore external_references since they aren't objects + if "ref" in k_tokens[len(k_tokens) - 1] and k_tokens[len(k_tokens) - 1] != "references": + rel_type = "_".join(k_tokens[: -1]) + ref_list = [] + # refs are lists, push singular ref into list to make it iterable for loop + if not type(obj[k]).__name__ == "list": + ref_list.append(obj[k]) + else: + ref_list = obj[k] + for ref in ref_list: + if self._is_node_available(ref): + # The "b to a" relationship is reversed in this cypher query to ensure the correct relationship direction in the graph + cypher_string = f'MATCH (a),(b) WHERE a.id="{str(ref)}" AND b.id="{str(obj["id"])}" CREATE (b)-[r:{rel_type}]->(a) RETURN a,b' + self.sgraph.run(cypher_string) + if recheck: + remove_sro_from_list(obj, self.relationships_to_recheck) + else: + if not recheck: + embedded_relationship = {"source_ref": obj["id"], + "target_ref": ref, + "relationship_type": rel_type} + self.relationships_to_recheck.append(embedded_relationship) + + diff --git a/stix2/datastore/neo4j/neo4j_testing.py b/stix2/datastore/neo4j/neo4j_testing.py new file mode 100644 index 00000000..13346177 --- /dev/null +++ b/stix2/datastore/neo4j/neo4j_testing.py @@ -0,0 +1,339 @@ +import datetime as dt +import os # noqa: F401 + + +import pytz + +import stix2 +from stix2.datastore.neo4j.neo4j import Neo4jStore +import stix2.properties + +email_message = stix2.EmailMessage( + type="email-message", + spec_version="2.1", + id="email-message--0c57a381-2a17-5e61-8754-5ef96efb286c", + from_ref="email-addr--9b7e29b3-fd8d-562e-b3f0-8fc8134f5dda", + to_refs=["email-addr--d1b3bf0c-f02a-51a1-8102-11aba7959868"], + is_multipart=False, + date="2004-05-19T12:22:23.000Z", + subject="Did you see this?", + additional_header_fields={ + "Reply-To": [ + "steve@example.com", + "jane@example.com", + ], + }, +) + +directory_stix_object = stix2.Directory( + path="/foo/bar/a", + path_enc="latin1", + ctime="1980-02-23T05:43:28.2678Z", + atime="1991-06-09T18:06:33.915Z", + mtime="2000-06-28T13:06:09.5827Z", + contains_refs=[ + "file--8903b558-40e3-43e2-be90-b341c12ff7ae", + "directory--e0604d0c-bab3-4487-b350-87ac1a3a195c", + ], + object_marking_refs=[ + "marking-definition--1b3eec29-5376-4837-bd93-73203e65d73c", + ], +) + +s = stix2.v21.Software( + id="software--28897173-7314-4eec-b1cf-2c625b635bf6", + name="Word", + cpe="cpe:2.3:a:microsoft:word:2000:*:*:*:*:*:*:*", + swid="com.acme.rms-ce-v4-1-5-0", + version="2002", + languages=["c", "lisp"], + vendor="Microsoft", +) + + +def windows_registry_key_example(): + v1 = stix2.v21.WindowsRegistryValueType( + name="Foo", + data="qwerty", + data_type="REG_SZ", + ) + v2 = stix2.v21.WindowsRegistryValueType( + name="Bar", + data="Fred", + data_type="REG_SZ", + ) + w = stix2.v21.WindowsRegistryKey( + key="hkey_local_machine\\system\\bar\\foo", + values=[v1, v2], + ) + return w + + +def malware_with_all_required_properties(): + ref1 = stix2.v21.ExternalReference( + source_name="veris", + external_id="0001AA7F-C601-424A-B2B8-BE6C9F5164E7", + hashes={ + "SHA-256": "6db12788c37247f2316052e142f42f4b259d6561751e5f401a1ae2a6df9c674b", + "MD5": "3773a88f65a5e780c8dff9cdc3a056f3", + }, + url="https://github.com/vz-risk/VCDB/blob/master/data/json/0001AA7F-C601-424A-B2B8-BE6C9F5164E7.json", + ) + ref2 = stix2.v21.ExternalReference( + source_name="ACME Threat Intel", + description="Threat report", + url="http://www.example.com/threat-report.pdf", + ) + now = dt.datetime(2016, 5, 12, 8, 17, 27, tzinfo=pytz.utc) + + malware = stix2.v21.Malware( + external_references=[ref1, ref2], + type="malware", + id="malware--9c4638ec-f1de-4ddb-abf4-1b760417654e", + created=now, + modified=now, + name="Cryptolocker", + is_family=False, + labels=["foo", "bar"], + ) + return malware + + +def file_example_with_PDFExt_Object(): + f = stix2.v21.File( + name="qwerty.dll", + magic_number_hex="504B0304", + extensions={ + "pdf-ext": stix2.v21.PDFExt( + version="1.7", + document_info_dict={ + "Title": "Sample document", + "Author": "Adobe Systems Incorporated", + "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh", + "Producer": "Acrobat Distiller 3.01 for Power Macintosh", + "CreationDate": "20070412090123-02", + }, + pdfid0="DFCE52BD827ECF765649852119D", + pdfid1="57A1E0F9ED2AE523E313C", + ), + }, + ) + return f + + +def extension_definition_insert(): + return stix2.ExtensionDefinition( + created_by_ref="identity--8a5fb7e4-aabe-4635-8972-cbcde1fa4792", + name="test", + schema="a schema", + version="1.2.3", + extension_types=["property-extension", "new-sdo", "new-sro"], + object_marking_refs=[ + "marking-definition--caa0d913-5db8-4424-aae0-43e770287d30", + "marking-definition--122a27a0-b96f-46bc-8fcd-f7a159757e77", + ], + granular_markings=[ + { + "lang": "en_US", + "selectors": ["name", "schema"], + }, + { + "marking_ref": "marking-definition--50902d70-37ae-4f85-af68-3f4095493b42", + "selectors": ["name", "schema"], + }, + ], + ) + + +def dictionary_test(): + return stix2.File( + spec_version="2.1", + name="picture.jpg", + defanged=True, + ctime="1980-02-23T05:43:28.2678Z", + extensions={ + "raster-image-ext": { + "exif_tags": { + "Make": "Nikon", + "Model": "D7000", + "XResolution": 4928, + "YResolution": 3264, + }, + }, + }, + ) + + +def kill_chain_test(): + return stix2.AttackPattern( + spec_version="2.1", + id="attack-pattern--0c7b5b88-8ff7-4a4d-aa9d-feb398cd0061", + created="2016-05-12T08:17:27.000Z", + modified="2016-05-12T08:17:27.000Z", + name="Spear Phishing", + kill_chain_phases=[ + { + "kill_chain_name": "lockheed-martin-cyber-kill-chain", + "phase_name": "reconnaissance", + }, + ], + external_references=[ + { + "source_name": "capec", + "external_id": "CAPEC-163", + }, + ], + granular_markings=[ + { + "lang": "en_US", + "selectors": ["kill_chain_phases"], + }, + { + "marking_ref": "marking-definition--50902d70-37ae-4f85-af68-3f4095493b42", + "selectors": ["external_references"], + }, + ], ) + + +@stix2.CustomObject( + 'x-custom-type', + properties=[ + ("phases", stix2.properties.ListProperty(stix2.KillChainPhase)), + ("something_else", stix2.properties.IntegerProperty()), + ], +) +class CustomClass: + pass + + +def custom_obj(): + obj = CustomClass( + phases=[ + { + "kill_chain_name": "chain name", + "phase_name": "the phase name", + }, + ], + something_else=5, + ) + return obj + + +@stix2.CustomObject( + "test-object", [ + ("prop_name", stix2.properties.ListProperty(stix2.properties.BinaryProperty())), + ], + "extension-definition--15de9cdb-3515-4271-8479-8141154c5647", + is_sdo=True, +) +class TestClass: + pass + + +def test_binary_list(): + return TestClass(prop_name=["AREi", "7t3M"]) + + +@stix2.CustomObject( + "test2-object", [ + ( + "prop_name", stix2.properties.ListProperty( + stix2.properties.HexProperty(), + ), + ), + ], + "extension-definition--15de9cdb-4567-4271-8479-8141154c5647", + is_sdo=True, +) +class Test2Class: + pass + + +def test_hex_list(): + return Test2Class( + prop_name=["1122", "fedc"], + ) + + +@stix2.CustomObject( + "test3-object", [ + ( + "prop_name", + stix2.properties.DictionaryProperty( + valid_types=[ + stix2.properties.IntegerProperty, + stix2.properties.FloatProperty, + stix2.properties.StringProperty, + ], + ), + ), + ( + "list_of_timestamps", + stix2.properties.ListProperty(stix2.properties.TimestampProperty()), + ), + ], + "extension-definition--15de9cdb-1234-4271-8479-8141154c5647", + is_sdo=True, +) +class Test3Class: + pass + + +def test_dictionary(): + return Test3Class( + prop_name={"a": 1, "b": 2.3, "c": "foo"}, + list_of_timestamps=["2016-05-12T08:17:27.000Z", "2024-05-12T08:17:27.000Z"], + ) + + +def main(): + store = Neo4jStore() + + if store.sink: + + ap = kill_chain_test() + store.add(ap) + + x = email_message + + store.add(x) + + td = test_dictionary() + + store.add(td) + + th = test_hex_list() + + store.add(th) + + tb = test_binary_list() + + store.add(tb) + + co = custom_obj() + + store.add(co) + + pdf_file = file_example_with_PDFExt_Object() + store.add(pdf_file) + + store.add(directory_stix_object) + + store.add(s) + + store.add(extension_definition_insert()) + + dict_example = dictionary_test() + store.add(dict_example) + + malware = malware_with_all_required_properties() + store.add(malware) + + # read_obj = store.get(directory_stix_object.id) + # print(read_obj) + else: + print("database does not exist") + + +if __name__ == '__main__': + main()