Skip to content

Commit

Permalink
NAS-133328 / 25.04 / Use -freebsdCompat flag for sedutil-cli (#15375)
Browse files Browse the repository at this point in the history
* add utils/sed.py

* use fbsd compat flag on MBR shadow

* fix/improve sed_unlock()/sed_unlock_all()
  • Loading branch information
yocalebo authored Jan 13, 2025
1 parent 10286b8 commit b73629f
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 83 deletions.
193 changes: 110 additions & 83 deletions src/middlewared/middlewared/plugins/disk_/sed.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from middlewared.utils.asyncio_ import asyncio_map
from middlewared.service import CallError, Service, private
from middlewared.utils import run
from middlewared.utils.sed import unlock_impl


RE_HDPARM_DRIVE_LOCKED = re.compile(r'Security.*\n\s*locked', re.DOTALL)
Expand All @@ -15,94 +16,107 @@
class DiskService(Service):

@private
async def sed_unlock_all(self, force=False):
async def should_try_unlock(self, force=False):
if force:
# vrrp_master event will set this to True
return True

# on an HA system, if both controllers manage to send
# SED commands at the same time, then it can cause issues
# where, ultimately, the disks don't get unlocked
if not force: # Do not check the status if we are unlocking from vrrp_event
if await self.middleware.call('failover.licensed'):
if await self.middleware.call('failover.status') == 'BACKUP':
return

advconfig = await self.middleware.call('system.advanced.config')
disks = await self.middleware.call('disk.query', [], {'extra': {'passwords': True}})
return await self.middleware.call('failover.status') in ('MASTER', 'SINGLE')

# If no SED password was found we can stop here
if not await self.middleware.call('system.advanced.sed_global_password') and not any(
[d['passwd'] for d in disks]
@private
async def map_disks_to_passwd(self, disk_name=None):
global_passwd = await self.middleware.call('system.advanced.sed_global_password')
disks = []
filters = [] if disk_name is None else [('name', '=', disk_name)]
for disk in await self.middleware.call(
'disk.query', filters, {'extra': {'passwords': True}}
):
path = f'/dev/{disk["name"]}'
# user can specify a per-disk password and/or a global password
# we default to using the per-disk password with fallback to global
passwd = disk['passwd'] if disk['passwd'] else global_passwd
if passwd:
disks.append({'path': path, 'passwd': passwd})
return disks

@private
async def parse_unlock_info(self, info):
"""Purpose of this method is to parse the unlock object
since we have to run multiple commands for each disk.
This will log the appropriate error message and return
the absolute path of the disk that we failed to unlock.
"""
if info.invalid_or_unsupported:
# disk doesn't exist, or doesn't even return
# properly from the --query command
return

result = await asyncio_map(lambda disk: self.sed_unlock(disk['name'], disk, advconfig, True), disks, 16)
locked = list(filter(lambda x: x['locked'] is True, result))
if locked:
disk_names = ', '.join([i['name'] for i in locked])
self.logger.warn(f'Failed to unlock following SED disks: {disk_names}')
raise CallError('Failed to unlock SED disks', errno.EACCES)
return True
failed = None
if info.locked is True:
failed = info.disk_path
errmsg = f'{info.disk_path!r}'
# means disk supports SED and we failed to unlock
# the disk (either bad password or unhandled error)
if info.query_cp and info.query_cp.returncode:
errmsg += f' QUERY ERROR: {info.query_cp.stderr.decode(errors="ignore")!r}'
if info.unlock_cp and info.unlock_cp.returncode:
errmsg += f' UNLOCK ERROR: {info.unlock_cp.stderr.decode(errrors="ignore")!r}'
self.logger.warning(errmsg)

if info.mbr_cp and info.mbr_cp.returncode:
# if we successfully unlock the disk, we disable
# the MBR shadow protection since this is a feature
# used by the OS to protect boot partitions. We
# dont use this functionality since we're only
# locking/unlocking disks used in zpools.
self.logger.warning(
'%r MBR ERROR: %r',
info.disk_path,
info.mbr_cp.stderr.decode(errors="ignore")
)

return failed

@private
async def sed_unlock(self, disk_name, disk=None, advconfig=None, force=False):
# on an HA system, if both controllers manage to send
# SED commands at the same time, then it can cause issues
# where, ultimately, the disks don't get unlocked
if not force: # Do not check the status if we are unlocking from vrrp_event
if await self.middleware.call('failover.licensed'):
if await self.middleware.call('failover.status') == 'BACKUP':
return
async def sed_unlock_all(self, force=False):
if not self.should_try_unlock(force):
return

if advconfig is None:
advconfig = await self.middleware.call('system.advanced.config')
disks_to_unlock = await self.map_disks_to_passwd()
if not disks_to_unlock:
# If no SED password was found for any disk
# then there is no reason to continue
return

devname = f'/dev/{disk_name}'
# We need two states to tell apart when disk was successfully unlocked
locked = None
unlocked = None
password = await self.middleware.call('system.advanced.sed_global_password')

if disk is None:
disk = await self.query([('name', '=', disk_name)], {'extra': {'passwords': True}})
if disk and disk[0]['passwd']:
password = disk[0]['passwd']
elif disk.get('passwd'):
password = disk['passwd']

rv = {'name': disk_name, 'locked': None}

if not password:
# If there is no password no point in continuing
return rv

# Try unlocking TCG OPAL using sedutil
cp = await run('sedutil-cli', '--query', devname, check=False)
if cp.returncode == 0:
output = cp.stdout.decode(errors='ignore')
if 'Locked = Y' in output:
locked = True
cp = await run('sedutil-cli', '--setLockingRange', '0', 'RW', password, devname, check=False)
if cp.returncode == 0:
locked = False
unlocked = True
# If we were able to unlock it, let's set mbrenable to off
cp = await run('sedutil-cli', '--setMBREnable', 'off', password, devname, check=False)
if cp.returncode:
self.logger.error(
'Failed to set MBREnable for %r to "off": %s', devname,
cp.stderr.decode(), exc_info=True
)

elif 'Locked = N' in output:
locked = False
failed_to_unlock = list()
for i in await asyncio_map(unlock_impl, disks_to_unlock, limit=16):
if failed := await self.parse_unlock_info(i):
failed_to_unlock.append(failed)

if failed_to_unlock:
raise CallError(
'Failed to unlock SED disk(s), check /var/log/middlewared.log for details',
errno.EACCES
)

# Try ATA Security if SED was not unlocked and its not locked by OPAL
if not unlocked and not locked:
locked, unlocked = await self.middleware.call('disk.unlock_ata_security', devname, advconfig, password)
return True

@private
async def sed_unlock(self, disk_name, force=False):
if not self.should_try_unlock(force):
return

disk = await self.map_disks_to_passwd(disk_name)
if not disk:
return

if locked:
self.logger.error(f'Failed to unlock {disk_name}')
info = await unlock_impl(disk[0]["path"], disk[0]["passwd"])
failed = await self.parse_unlock_info(info)

rv['locked'] = locked
return rv
return failed is None or not info.locked

@private
async def sed_initial_setup(self, disk_name, password):
Expand Down Expand Up @@ -149,19 +163,32 @@ async def sed_initial_setup(self, disk_name, password):
return 'SUCCESS'

@private
async def unlock_ata_security(self, devname, _adv, password):
locked = unlocked = False
async def unlock_ata_security(self, devname, password):
# FIXME: REMOVE THIS METHOD. We don't sell non-TCG password protected
# disks so there is a high chance this does NOT work for anyone
# with this type of drive. Unless we can test this in-house on real
# drives, we're doing ourselves a disservice by having it. Especially
# since this is dealing with user's data
cp = await run('hdparm', '-I', devname, check=False)
if cp.returncode:
return locked, unlocked
return False

output = cp.stdout.decode()
if RE_HDPARM_DRIVE_LOCKED.search(output):
adv = await self.middleware.call('system.advanced.config')
locked = False
if RE_HDPARM_DRIVE_LOCKED.search(cp.stdout.decode()):
locked = True
cmd = ['hdparm', '--user-master', _adv['sed_user'][0].lower(), '--security-unlock', password, devname]
cp = await run(cmd, check=False)
cp = await run(
[
'hdparm',
'--user-master',
adv['sed_user'][0].lower(),
'--security-unlock',
password,
devname
],
check=False
)
if cp.returncode == 0:
locked = False
unlocked = True

return locked, unlocked
return locked
85 changes: 85 additions & 0 deletions src/middlewared/middlewared/utils/sed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from dataclasses import dataclass
from enum import IntEnum
from subprocess import CompletedProcess

from middlewared.utils import run

__all__ = ("unlock_impl",)


class ReturnCodeMappings(IntEnum):
SUCCESS = 0
AUTH_FAILED = 135
INVALID_OR_UNSUPPORTED = 136


@dataclass(slots=True, frozen=True, kw_only=True)
class UnlockResponses:
disk_path: str
"""The absolute path of the disk."""
invalid_or_unsupported: bool = False
"""Does the disk support SED or is it valid?"""
locked: bool | None = None
"""Is the disk locked?"""
query_cp: CompletedProcess | None = None
"""The response of `sedutil-cli --query` command"""
unlock_cp: CompletedProcess | None = None
"""The response of `sedutil-cli --setLockingRange` command."""
mbr_cp: CompletedProcess | None = None
"""The response of `sedutil-cli --setMBREnable off` command."""


async def run_sedutil_cmd(cmd: list[str]) -> CompletedProcess:
return await run(["sedutil-cli"] + cmd, check=False)


async def unlock_tcg_opal_pyrite(disk_path: str, password: str) -> UnlockResponses:
query_cp = await run_sedutil_cmd(["--query", disk_path])
if (
query_cp.returncode == ReturnCodeMappings.INVALID_OR_UNSUPPORTED
or query_cp.returncode != ReturnCodeMappings.SUCCESS
):
return UnlockResponses(
disk_path=disk_path, invalid_or_unsupported=True, query_cp=query_cp
)
elif b"Locked = N" in query_cp.stdout:
return UnlockResponses(disk_path=disk_path, locked=False, query_cp=query_cp)

use_fbsd_compat = False
cmd = ["--setLockingRange", "0", "RW", password, disk_path]
unlock_cp = await run_sedutil_cmd(cmd)
if unlock_cp.returncode == ReturnCodeMappings.AUTH_FAILED:
# Sigh, sedutil-cli (by default) uses an asinine,
# non-portable "hashing" mechanism for the password
# BY DEFAULT....This means the password you typed
# on freeBSD to setup the drive will NOT work when
# given on linux (and vice versa). Since we're now
# stuck with this head-ache, we'll try again with
# the `-freebsdCompat` flag.
cmd.insert(0, "-freebsdCompat")
unlock_cp = await run_sedutil_cmd(cmd)
use_fbsd_compat = unlock_cp.returncode == ReturnCodeMappings.SUCCESS

if unlock_cp.returncode != ReturnCodeMappings.SUCCESS:
return UnlockResponses(disk_path=disk_path, locked=True, unlock_cp=unlock_cp)

# Disable Master Boot Record (MBR) Shadowing Support.
# The host application can store and execute a
# “Pre-Boot Authentication (PBA) Environment” to
# unlock the range in which the OS is stored so that
# the OS can boot.
mbr_cmd = ["--setMBREnable", "off", password, disk_path]
if use_fbsd_compat:
mbr_cmd.insert(0, "-freebsdCompat")

mbr_cp = await run_sedutil_cmd(mbr_cmd)
return UnlockResponses(
disk_path=disk_path, locked=False, unlock_cp=unlock_cp, mbr_cp=mbr_cp
)


async def unlock_impl(disk: dict[str, str]) -> UnlockResponses:
"""Try to unlock the self encrypting drive (SED). The
drive must conform to one of the Trusted Computing Group (TCG)
Enterprise, Opal, Opalite or Pyrite SSC specifications."""
return await unlock_tcg_opal_pyrite(disk['path'], disk['passwd'])

0 comments on commit b73629f

Please sign in to comment.