Skip to content

Commit

Permalink
Use new attribute-based attachments interface
Browse files Browse the repository at this point in the history
Problem:
These samples use the legacy arc_attachments approach

Solution:
Updated to use the new attributes approach

Signed-off-by: Henry Jewell [email protected]
  • Loading branch information
Henry Jewell authored and henry739 committed Feb 9, 2023
1 parent 2f04ba0 commit c9ad3c4
Show file tree
Hide file tree
Showing 15 changed files with 261 additions and 215 deletions.
57 changes: 36 additions & 21 deletions archivist_samples/door_entry/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
############


def attachment_create(doors, idx, name):
def attachment_create(doors, name):
with resources.open_binary(images_assets, name) as fd:
attachment = doors.attachments.upload(fd)
result = {
"arc_attachment_identity": attachment["identity"],
"arc_hash_alg": attachment["hash"]["alg"],
"arc_hash_value": attachment["hash"]["value"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
"arc_file_name": name,
}
if idx == 0:
result["arc_display_name"] = "arc_primary_image"

return result

Expand Down Expand Up @@ -294,6 +294,23 @@ def create_cards(cards):
return cards_map


def find_attachment_attributes(attrs):
if attrs is None:
return []

attachment_attrs = []
for _, attribute in attrs.items():
if not isinstance(attribute, dict):
continue

if attribute.get("arc_attribute_type") != "arc_attachment":
continue

attachment_attrs.append(attribute)

return attachment_attrs


# Use case functions
####################

Expand All @@ -303,18 +320,18 @@ def list_doors(doors):
for door in doors.assets.list():
attrs = door["attributes"]
location = doors.locations.read(attrs["arc_home_location_identity"])
attachments = attrs["arc_attachments"] or []
attachments = find_attachment_attributes(attrs)

print(f"\tAsset name:\t{attrs['arc_display_name']}")
print(f"\tAsset type:\t{attrs['arc_display_type']}")
print(f"\tAsset location:\t{location['display_name']}")
print(f"\tAsset address:\t{location['attributes']['address']}")
print(f"\tArchivist ID:\t{door['identity']}")
for a in attachments:
print(f"\tAttachment identity: \t{a['arc_attachment_identity']}")
print(f"\tAttachment name: \t{a['arc_display_name']}")
print(f"\tAttachment identity: \t{a['arc_blob_identity']}")
print(f"\tAttachment name: \t{a.get('arc_display_name')}")
with open("/tmp/xxx", "wb") as fd:
doors.attachments.download(a["arc_attachment_identity"], fd)
doors.attachments.download(a["arc_blob_identity"], fd)

print("-----")

Expand Down Expand Up @@ -455,7 +472,7 @@ def open_door(doors, doorid, cards, cardid):
# delivery people take photos of parcels left in safe places
door_image = door.primary_image
if not door_image:
LOGGER.error("Door location is missing")
LOGGER.error("Door image is missing")
return

# Capture a picture from the built-in camera on the door
Expand Down Expand Up @@ -504,16 +521,14 @@ def open_door(doors, doorid, cards, cardid):
"wavestone_evt_type": "door_open",
# Attachments list is optional, and allows attaching extra evidence
# such as photographs, scans, PDFs etc to the event record. Adding
# one with dispaly name arc_primary_image will instruct the UI to
# one called arc_primary_image will instruct the UI to
# display it (although it's not required)
"arc_attachments": [
{
"arc_display_name": "arc_primary_image",
"arc_attachment_identity": image["identity"],
"arc_hash_value": image["hash"]["value"],
"arc_hash_alg": image["hash"]["alg"],
}
],
"arc_primary_image": {
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": image["identity"],
"arc_blob_hash_alg": image["hash"]["alg"],
"arc_blob_hash_value": image["hash"]["value"],
},
},
confirm=True,
)
Expand Down Expand Up @@ -550,7 +565,7 @@ def open_door(doors, doorid, cards, cardid):
"wavestone_door_name": wsext_doorname,
"wavestone_door_archivist_id": door["identity"],
"wavestone_evt_type": "door_open",
"arc_attachments": [door_image],
"arc_primary_image": [door_image],
},
confirm=True,
)
Expand Down
1 change: 0 additions & 1 deletion archivist_samples/software_bill_of_materials/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ In order to control data sharing to restrict general customers to see only the r
"arc_display_type",
"arc_display_name",
"arc_description",
"arc_attachments",
"sbom_component",
"sbom_supplier",
"sbom_uuid",
Expand Down
189 changes: 99 additions & 90 deletions archivist_samples/software_bill_of_materials/software_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,17 @@ def installation(
"sbom_installation_supplier": sbom_installation["supplier"],
"sbom_installation_uuid": sbom_installation["uuid"],
"sbom_installation_environment": sbom_installation["environment"],
"arc_attachments": [
{
"arc_display_name": sbom_installation["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in self._attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": sbom_installation["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)
asset_attrs = {
Expand Down Expand Up @@ -166,16 +167,17 @@ def decommission(
"sbom_decomission_target_date": sbom_decomission["target_date"],
"sbom_decomission_status": sbom_decomission["status"],
"sbom_decomission_environment": sbom_decomission["environment"],
"arc_attachments": [
{
"arc_display_name": sbom_decomission["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in self._attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": sbom_decomission["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)
asset_attrs = {
Expand Down Expand Up @@ -222,16 +224,17 @@ def upgrade(
"sbom_upgrade_supplier": sbom_upgrade["supplier"],
"sbom_upgrade_uuid": sbom_upgrade["uuid"],
"sbom_upgrade_environment": sbom_upgrade["environment"],
"arc_attachments": [
{
"arc_display_name": sbom_upgrade["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in self._attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": sbom_upgrade["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)
asset_attrs = {
Expand Down Expand Up @@ -282,16 +285,17 @@ def upgrade_plan(
"sbom_planned_version": sbom_planned["version"],
"sbom_planned_reference": sbom_planned["reference"],
"sbom_planned_environment": sbom_planned["environment"],
"arc_attachments": [
{
"arc_display_name": sbom_planned["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": sbom_planned["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)
return self.arch.events.create(
Expand Down Expand Up @@ -327,16 +331,17 @@ def upgrade_accepted(
"sbom_accepted_version": sbom_accepted["version"],
"sbom_accepted_reference": sbom_accepted["reference"],
"sbom_accepted_environment": sbom_accepted["environment"],
"arc_attachments": [
{
"arc_display_name": sbom_accepted["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in self._attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": sbom_accepted["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)
return self.arch.events.create(
Expand Down Expand Up @@ -375,16 +380,17 @@ def rollback(
"sbom_rollback_supplier": sbom_rollback["supplier"],
"sbom_rollback_uuid": sbom_rollback["uuid"],
"sbom_rollback_environment": sbom_rollback["environment"],
"arc_attachments": [
{
"arc_display_name": sbom_rollback["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in self._attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": sbom_rollback["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)
asset_attrs = {
Expand Down Expand Up @@ -435,16 +441,17 @@ def rollback_plan(
"sbom_planned_version": sbom_planned["version"],
"sbom_planned_reference": sbom_planned["reference"],
"sbom_planned_environment": sbom_planned["environment"],
"arc_attachments": [
{
"arc_display_name": sbom_planned["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": sbom_planned["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)
return self.arch.events.create(
Expand Down Expand Up @@ -480,16 +487,17 @@ def rollback_accepted(
"sbom_accepted_version": sbom_accepted["version"],
"sbom_accepted_reference": sbom_accepted["reference"],
"sbom_accepted_environment": sbom_accepted["environment"],
"arc_attachments": [
{
"arc_display_name": sbom_accepted["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in self._attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": sbom_accepted["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)
return self.arch.events.create(
Expand Down Expand Up @@ -525,17 +533,17 @@ def vuln_disclosure(
"vuln_author": vuln["author"],
"vuln_target_component": vuln["target_component"],
"vuln_target_version": vuln["target_version"],
"arc_attachments": [
{
"arc_display_name": vuln["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in self._attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": vuln["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)

Expand Down Expand Up @@ -568,16 +576,17 @@ def vuln_update(
"vuln_author": vuln["author"],
"vuln_target_component": vuln["target_component"],
"vuln_target_version": vuln["target_version"],
"arc_attachments": [
{
"arc_display_name": vuln["description"],
"arc_attachment_identity": attachment["identity"],
"arc_hash_value": attachment["hash"]["value"],
"arc_hash_alg": attachment["hash"]["alg"],
}
for attachment in self._attachments
],
}

for i, attachment in enumerate(self._attachments):
attrs[f"attachment_attr_{i}"] = {
"arc_display_name": vuln["description"],
"arc_attribute_type": "arc_attachment",
"arc_blob_identity": attachment["identity"],
"arc_blob_hash_alg": attachment["hash"]["alg"],
"arc_blob_hash_value": attachment["hash"]["value"],
}

if custom_attrs is not None:
attrs.update(custom_attrs)

Expand Down
Loading

0 comments on commit c9ad3c4

Please sign in to comment.